Python多进程

第一:multiprocessing 多进程模块


属性(继承后,子类可以直接使用)
    daemon:  守护进程,和线程的setDaemon()一样
    name: 进程名字,会根据子类名+id,生成一个新名字
    pid: 进程编号


实例方法:

  is_alive():返回进程是否在运行。
  join([timeout]):阻塞当前上下文环境的进程,直到调用此方法的进程终止或    到达指定的timeout(可选参数)。
  start():进程准备就绪,等待CPU调度
  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
  terminate():不管任务是否完成,立即停止工作进程


构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

介绍


import multiprocessing

def pro(p_name):
    print('嗨,%s,我是一个进程' % p_name)


if __name__ == '__main__':

    for i in range(5):
        # 函数名最好别用 p,否则会和Process的一些变量或者方法冲突
        # p = multiprocessing.Process(target=p, args=('tom',)) # 会报异常
        p = multiprocessing.Process(target=pro, args=('tom',))
        p.start()
    p.join()

创建多进程_第一种方式


import multiprocessing

class CusProcess(multiprocessing.Process):


    def run(self):
        print('多进程创建了,我是编号[-%s-]进程' % self.name)

'''
# 注意:必须要加这行代码if __name__ == '__main__':,不加会报异常 
这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,
而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归子进程报错。
所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候 name 不是 main ,
就不会递归运行了。
'''
if __name__ == '__main__':
    for i in range(3):
        c = CusProcess()
        c.start()

'''
# 小结:
    对比多线程,多进程可以说是真正的并行了,我的CPU是四核的,它代表能同一时刻运行4个进程,以此类推。
    另外,C++,JAVA这些语言是可以实现线程并行的.但唯独Python不行,因为GIL的存在。支持超线程技术的
    CPU是能够并行两个线程的
'''

创建进程_第二种


from multiprocessing import Process
import time,random
num = 258


class MyPro(Process):
    def run(self):
        global num
        print('子进程', id(num))


if __name__ == '__main__':

    for i in range(3):
        m = MyPro()
        m.start()

    time.sleep(3)
    print('主进程', id(num))

结果:
子进程 2152790437040
子进程 1688574045360
子进程 2599376727216
主进程 2029646163664
由此可见,进程的内存地址都是独立的

进程内存地址对比

第二:join([timeout])

from multiprocessing import Process
import time,random
num = 258


class MyPro(Process):
def run(self):
time.sleep(10)
print('%s子进程退出' % self.name)


if __name__ == '__main__':
m1 = MyPro()
m2 = MyPro()
m1.start()
m2.start()
m1.join()
m2.join()
print('主进程的主线程退出')

# 注意:
  1.多进程的join()和多线程的join()卡住的不是进程,而是主线程,谁调用了join,就等于告诉主线程:我执行完了,你主线程才能往下执行。
    另外,子进程也是有自己的主线程的
  2.所有子线程或者子进程集中调用了join后,主线程的等待的总时间以 耗费时间最长的那个进程或者线程的运行时间 为准
  3.如果你创建了一个子进程或者线程运行后立马跟着一个join(),会导致串行执行,比如以上代码改成:
    m1.start()
    m1.join()
    m2.start()
    m2.join()
 

from multiprocessing import Process
import time,random
num = 258


class MyPro(Process):
    def run(self):
        print('%s子进程的主线程退出' % self.name)


if __name__ == '__main__':

    m1 = MyPro()
    m1.start()
    print(m1.pid)  # 查看子进程PID
    m1.terminate() # 关闭进程,但不会立马关闭,会稍微等一小会
    print(m1.is_alive())  # 进程是否存活,存在返回True,否则,返回False
    time.sleep(1)
    print(m1.is_alive())  # 进程是否存活,存在返回True,否则,返回False
    print('主进程的主线程退出')

关闭进程,查看pid,检查进程是否存活

第三:守护进程

from multiprocessing import Process
import time,random
num = 258


class MyPro(Process):
    def run(self):
        time.sleep(1)
        print('%s子进程退出' % self.name)


if __name__ == '__main__':

    m1 = MyPro()
    m1.daemon = True # 设置守护进程,要在start()之前设置。主进程执行退出,无论子进程执行到哪里,都跟着主进程退出
    m1.start()
    print('主进程退出')

第四:进程同步

# 每个进程内部的数据是独立的,但是,当共同访问一个文件、数据库、打印机、其他设备等是会带来竞争的。
# 解决的办法是加锁,但这种做法会牺牲效率,并发执行变成了串行执行
如:3个进程,要排队上公共汽车,过程应该是,一个先上完车,下一个跟进
from multiprocessing import Process,Lock
import time
class MyPro(Process):
def run(self):
print('%s在排队上车' % self.name)
time.sleep(2)
print('%s上车完毕' % self.name)

if __name__ == '__main__':
pro_list = []
for i in range(3):
m = MyPro()
m.start()
# 结果:

MyPro-2在排队上车
MyPro-1在排队上车
MyPro-3在排队上车
MyPro-2上车完毕
MyPro-1上车完毕
MyPro-3上车完毕

#得到的结果是 三个同时挤上车

解决办法:

from multiprocessing import Process, Lock
import time
lock = Lock() # 生成一个锁对象
class MyPro(Process):
def __init__(self, lock):
super().__init__()
self.lock = lock

def run(self):
self.lock.acquire() # 谁拿到锁,就有执行权
print('%s正在排队上车' % self.name)
time.sleep(2)
print('%s已经上车' % self.name)
self.lock.release() # 释放锁,让其他进程去抢

if __name__ == '__main__':
for i in range(3):
m = MyPro(lock)
m.start()

from multiprocessing import Process, Lock
import json, time


class BuyTicket(Process):
    def __init__(self, lock):
        super().__init__()
        self.lock = lock

    def run(self):
        self.lock.acquire() # 谁先拿到,谁有执行权
        dic = getattr(self, 'get')()
        if dic['count'] > 0:
            dic['count'] -= 1
            getattr(self, 'put')(dic)
            print('买票成功')
        self.lock.release()  # 释放锁
        
    def get(self):
        with open('db.txt', 'r') as f:
            dic = json.loads(f.read())
            print('剩余票数:%s' % dic)
            return dic

    def put(self, dic):
        with open('db.txt', 'w') as f:
            json.dump(dic, f)


if __name__ == '__main__':
    lock = Lock() # 锁对象
    for i in range(3):
        bt = BuyTicket(lock)
        bt.start()

买票

第五:Queue(maxsize)

1.q.put(blocked,timeout): 将数据推入队列里。可选参数:blocked=True,timeout是正整数,该方法会阻塞到timeout的指定时间,阻塞到指定时间后,队列没有空间,抛出
               queue.Full() 异常;blocked=False,队列满了就会抛queue.Full()异常
2.q.get(blocked, timeout): 从队列里取出一个元素并删除。可选参数:blocked=True,timeout是正整数,这等待时间内没有取到元素,抛出queue.Empty()异常。
                blocked=False,队列为空,抛出queue.Empty()异常
3.q.get_nowait():同q.get(False)
4.q.put_nowait():同q.put(False)

熟悉的味道:生产者消费者

from multiprocessing import Process, Queue
import time


class Chef(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue

def run(self):
n = 5
while n > 0:
time.sleep(1)
print('厨师生产了编号[-%s-]的包子' % n)
self.queue.put(n)
n -= 1


class Foodie(Process):
def __init__(self, queue):
super().__init__()
self.queue = queue

def run(self):
while True:
data = self.queue.get()
if data == None:
break
print('食客[-%s-]吃了编号[-%s-]的包子' % (self.name, data))
time.sleep(1)


if __name__ == '__main__':
q = Queue()
ch = Chef(q)
ch.start()

for i in range(3):
fd = Foodie(q)
fd.start()
ch.join() # 主线程等待生产者生产完成后才往下执行
for i in range(3): # 有多少个消费者就发送多少个None,接受到None的子进程就退出
q.put(None)

JoinableQueue(maxsize) 

JoinableQueue继承了Queue,拥有Queue之外的方法:
1.q.task_done():向q.join()发送信号,表示数据已经被取走
2.q.join() :调用此方法,将会阻塞,直到 q.task_done()的调用

from multiprocessing import Process, Queue, JoinableQueue
import time


class Chef(Process):
    def __init__(self, joinableQueue):
        super().__init__()
        self.queue = joinableQueue

    def run(self):
        n = 5
        while n > 0:
            time.sleep(1)
            print('厨师生产了编号[-%s-]的包子' % n)
            self.queue.put(n)
            n -= 1
            self.queue.join()  # 生产一个包子,就阻塞,直到消费者处理完队列里的所有数据(包子)


class Foodie(Process):
    def __init__(self, joinableQueue):
        super().__init__()
        self.queue = joinableQueue

    def run(self):
        while True:
            print('等待包子')
            time.sleep(1)
            data = self.queue.get()
            if data is None: break
            print('食客[-%s-]吃了编号[-%s-]的包子' % (self.name, data))

            self.queue.task_done()  # 向join发送一个信号,队列里已经没东西了


if __name__ == '__main__':
    q = JoinableQueue()
    ch = Chef(q)
    ch.start()

    for i in range(3):
        fd = Foodie(q)
        fd.start()
    ch.join()  # 生产者生产完成后,主线程执行下面代码,向子进程发送None,代表要子进程退出的信号
    for i in range(3):
        q.put(None)

JoinableQueue 版本

尽量使用消息队列进程并发编程,另外,应要避免共享数据的使用

第八:共享内存(了解,不推荐使用)

转自 : https://www.cnblogs.com/gengyi/p/8661235.html

 

第九:进程池

#在远程控制多主机或者同时操作一些系统管理的时候,并行可以节省时间。另外,如果任务过多,频繁的创建进程太麻烦和繁琐,任务少可以选择不用进程池。

 

# 参数
from multiprocessing import Pool
Pool(self, processes=None, initializer=None, initargs=(),maxtasksperchild=None, context=None)
processes: 指定创建进程数量
initializer: 每个进程启动时要执行的可调用对象
initargs: 进程的参数,元祖

 # 方法

pool.apply(func, args=(), kwds={}) 同步执行,并返回结果
pool.apply_async(func, args=(), kwds={}, callback=None,error_callback=None)
'''
异步执行,CPU有多少个核,就能同时跑多少个任务,参数callback代表当前一次的任务执行完毕后,通知主进程调用一个函数(回调),另外,callback的函数不能有阻塞操作,因为会影响到其他异步任务的结果。
当进程池里的进程数量大于CPU核数和任务数量比进程池数大时,剩余的进程会切换着运行。如,进程池=5,CPU核数=4,任务数量=10,那么同时能跑4个任务,剩下的一个切换着跑,只是太快了感觉不出来
'''
pool.close() 关闭进程池
pool.join() 等待所有工作进程退出,另外,join必须要在close的前面



from multiprocessing import Pool
import os,time


def start(num):
    print('创建了PID为[-%s-]的进程' % os.getpid())
    print('进程[-%s-]执行完毕' % os.getpid())
    time.sleep(3)
    return num


if __name__ == '__main__':
    pool = Pool(4)  # 指定最大进程数

    for i in range(100):  # 创建10个任务
        ''' 同步执行,当本次任务执行完成并返回后, 无论其他子进程是否阻塞,它只能在原地等着,
            等着上一个任务的执行完毕,即使任务发生了阻塞,也会被夺走执行权
            
        '''
        result = pool.apply(func=start, args=(i,))
        print(result)

apply 同步


from multiprocessing import Pool
import os,time


def start(num):
    print('创建了PID为[-%s-]的进程' % os.getpid())
    print('进程[-%s-]执行完毕' % os.getpid())
    time.sleep(3)
    return num


if __name__ == '__main__':
    pool = Pool(4)  # 指定最大进程数
    result = []
    for i in range(10):  # 创建10个任务
        res = pool.apply_async(func=start, args=(i,))
        result.append(res)


    '''
    apply_async()不同于apply(),异步执行,主进程要使用join()等待,等所有任务执行完毕使用get()来收集,
    如果不这么做,主进程结束,进程池没来的及执行,就跟着主进程一起结束了
    '''
    pool.close()
    pool.join()
    for res in result:
        print(res.get())  # 获取结果

apply_async 异步

 

 回调:


from
multiprocessing import Pool import os,time def start(num): print('创建了PID为[-%s-]的进程' % os.getpid()) time.sleep(1) print('进程[-%s-]执行完毕' % os.getpid()) return num def end(arg): print(arg) # 打印每个任务的结果 print('结束了', os.getpid()) # 查看是哪个进程打印(其实是Python解释器的进程) if __name__ == '__main__': pool = Pool(2) # 指定最大进程数 result = [] for i in range(10): # 创建10个任务 ''' 当一个任务执行完毕后,通知主进程调用end, 参数为当前任务的返回结果 ''' res = pool.apply_async(func=start, args=(i,), callback=end) result.append(res) ''' apply_async()不同于apply(),异步执行,主进程要使用join()等待,等所有任务执行完毕使用get()来收集, 如果不这么做,主进程结束,进程池没来的及执行,就跟着主进程一起结束了 ''' pool.close() pool.join() for res in result: print(res.get()) # 获取结果,apply()可直接获取

 

 soket + pool的简单应用

Server:

from socket import *
from multiprocessing import Pool
import os

s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1', 888))
s.listen(5)


def Server_Pro(conn, addr):
    while True:
        data = conn.recv(1024)
        print('收到客户端发来的消息%s' % data)
        talk = 'Server收到你发送过来的消息'.encode('utf-8')
        conn.send(talk)


if __name__ == '__main__':
    pool = Pool(3)  # 能同时处理三个任务
    while True:
        conn, addr = s.accept()
        pool.apply_async(Server_Pro,args=(conn, addr))

 

 

Client:

from socket import *
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 888))
while True:
    data = input('输入发送的消息:').encode('utf-8')
    c.send(data)
    msg = c.recv(1024)
    print(msg.decode('utf8'))