并发编程之多进程

一、什么是进程

一个正在运行的程序称之为进程
是一种抽象概念 表示一个执行某件事情的过程进程的概念 起源于操作系统

第一代计算机 程序是固定 无法修改 某种计算机只能干某种活

第二代批处理系统 需要人工参与 将程序攒成一批 统一执行串行执行 提高计算机的的利用率 但是调试麻烦

第三代计算机 为了更好利用计算机资源,产生了

多道技术: (重点)
1.空间复用
内存分割为多个区域 每个区域存储不同的应用程序

2.时间的复用
1.当一个程序遇到了I/O操作时 会切换到其他程序 (切换前需要保存当前运行状态 以便恢复执行)提高效率

2.当你的应用程序执行时间过长 操作系统会强行切走 以保证其他程序也能正常运行 当然因为cpu速度贼快 用户感觉不到,降低效率

3.有一个优先级更高的任务需要处理 此时也会切走降低了效率

我们编写程序时 只能尽量减少I/O操作

总的来说 有了多道技术之后 操作系统可以同时运行多个程序吧 这种情形称之为并发
但是本质好 这些程序还是一个一个排队执行。

#一 操作系统的作用:
    1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
    2:管理、调度进程,并且将多个进程对硬件的竞争变得有序

#二 多道技术:
    1.产生背景:针对单核,实现并发
    ps:
    现在的主机一般是多核,那么每个核都会利用多道技术
    有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个
    cpu中的任意一个,具体由操作系统调度算法决定。
    
    2.空间上的复用:如内存中同时有多道程序
    3.时间上的复用:复用一个cpu的时间片
       强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样
            才能保证下次切换回来时,能基于上次切走的位置继续运行

进程的并行与并发

并发 看起来像是同时运行中 本质是不停切换执行 多个进程随机执行
并行 同一时刻 多个进程 同时进行 只有多核处理器才有真正的并行

区别:

并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

串行 一个一个 依次执行排队
阻塞 遇到了I/O操作 看起来就是代码卡住了非阻塞 不会卡住代码的执行
阻塞 和 非阻塞 说的是同一个进程的情况下

同步 一个调用必须获得返回结果才能继续执行
异步 一个调用发起后 发起方不需要等待它的返回结果

同步和异步 必须存在多个进程(线程)
无论是进程还是线程都是两条独立的执行路径

 

多进程的执行顺序
主进程必然先执行子进程应该在主进程执行后执行一旦子进程启动了 后续的顺序就无法控制了

python如何使用多进程
1.直接创建Process对象 同时传入要做的事情就是一个函数

p = Process(taget=一个函数,args=(函数的参数))
p.start() 让操作系统启动这个进程
2.创建一个类 继承自Process 把要做的任务放在run方法中

常用属性
start 开启进程
join 父进程等待子进程
name 进程名称
is_alive是否存活
terminate 终止进程
pid 获取进程id

启动进程的方式
1.系统初始化 会产生一个根进程
2.用户的交互请求 鼠标双击某个程序
3.在一个进程中 发起了系统调用启动了另一个进程
4.批处理作业开始 某些专用计算机可能还在使用

不同操作系统创建进程的方式不同
unix < centos mac linux
完全拷贝父进程的所有数据 子进程可以访问父进程的数据吗?不可以 但是可以访问拷贝过来数据副本
windows
创建子进程 加载父进程中所有可执行的文件

二、在python程序中的进程操作

一 multiprocessing模块介绍

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu\_count\(\)查看),在python中大部分情况需要使用多进程。

Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,>提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内

二 Process类的介绍

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程

强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'egon',)

kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}

name为子进程的名称

方法介绍:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

三 Process类的使用

注意:在windows中Process()必须放到# if __name__ == ‘__main__’:下

创建并开启子进程的方式


import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)

if __name__ == '__main__':
    #实例化得到四个对象
    p1=Process(target=piao,args=('egon',)) #必须加,号
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))

    #调用对象下的方法,开启四个进程
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('')

方式一


import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

if __name__ == '__main__':
    #实例化得到四个对象
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')

    #调用对象下的方法,开启四个进程
    p1.start() #start会自动调用run
    p2.start()
    p3.start()
    p4.start()
    print('')

方式二

四、Process对象的join方法

在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况

情况一:在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源。

情况二:如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行,就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是join方法的作用


import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父进程在执行')

多个进程同时运行,再谈join方法(1)


import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父进程在执行')

多个进程同时运行,再谈join方法(2)

除了上面这些开启进程的方法,还有一种以继承Process类的形式开启进程的方式


import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start会自动调用run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主线程')

通过继承Process类开启进程

进程之间的数据隔离问题


from multiprocessing import Process

def work():
    global n
    n=0
    print('子进程内: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)

进程之间的数据隔离问题

三、守护进程

主进程创建子进程,然后将该进程设置成守护自己的进程,守护进程就好比崇祯皇帝身边的老太监,崇祯皇帝已死老太监就跟着殉葬了。

关于守护进程需要强调两点:

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

如果我们有两个任务需要并发执行,那么开一个主进程和一个子进程分别去执行就ok了,如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程。主进程代码运行结束,守护进程随即终止


from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,3))
    print('%s is piao end' %name)


if __name__ == '__main__':
    p=Process(target=task,args=('egon',))
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    p.start()
    print('') #只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了

守护进程

守护 就是看着 陪着  在代码中 进程只能由进程类守护
一个进程守护者另一个进程 指的是两个进程之间的关联关系
特点:守护进程(妃子) 在被守护进程(皇帝)死亡时 会跟随被守护进程死亡

什么时候需要使用守护进程?
例如: qq中有个下载视频 应该用子进程去做 但是 下载的过程中 qq退出 那么下载也没必要继续了

四、互斥锁


from multiprocessing import Process,Lock

# 进程间 内存空间是相互独立的
def task1(lock):
    lock.acquire()
    for i in range(10000):
        print("===")
    lock.release()

def task2(lock):
    lock.acquire()
    for i in range(10000):
        print("===============")
    lock.release()

def task3(lock):
    lock.acquire()
    for i in range(10000):
        print("======================================")
    lock.release()

if __name__ == '__main__':
    # 买了一把锁
    mutex = Lock()

    # for i in range(10):
    #     p = Process(target=)
    p1 = Process(target=task1,args=(mutex,))
    p2 = Process(target=task2,args=(mutex,))
    p3 = Process(target=task3,args=(mutex,))

    # p1.start()
    # p1.join()
    # p2.start()
    # p2.join()
    # p3.start()
    # p3.join()

    p1.start()
    p2.start()
    p3.start()

    print("over!")

互斥锁案例

     # 什么时候用锁?
    # 当多个进程 同时读写同一份数据 数据很可能就被搞坏了
    # 第一个进程写了一个中文字符的一个字节 cpu被切到另一个进程
    # 另一个进程也写了一个中文字符的一个字节
    # 最后文件解码失败
    # 问题之所以出现 是因为并发 无法控住顺序
    # 目前可以使用join来将所有进程并发改为串行

    # 与join的区别?
    # 多个进程并发的访问了同一个资源  将导致资源竞争(同时读取不会产生问题 同时修改才会出问题)
    # 第一个方案 加上join  但是这样就导致了 不公平  相当于 上厕所得按照颜值来
    # 第二个方案 加锁  谁先抢到资源谁先处理[
    # 相同点: 都变成了串行
    # 不同点:
    # 1.join顺序固定 锁顺序不固定!
    # 2.join使整个进程的任务全部串行  而锁可以指定哪些代码要串行

    # 锁使是什么?
    # 锁本质上就是一个bool类型的标识符  大家(多个进程) 在执行任务之前先判断标识符
    # 互斥锁 两个进程相互排斥

    # 注意 要想锁住资源必须保证 大家拿到锁是同一把

    # 怎么使用?
    # 在需要加锁的地方 lock.acquire() 表示锁定
    # 在代码执行完后 一定要lock.release() 表示释放锁
    # lock.acquire()
    # 放需要竞争资源的代码 (同时写入数据)
    # lock.release()

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或者打印终端是没有问题的,但是带来的是竞争,竞争带来的结果是错乱,如下:多个进程模拟多个人执行抢票任务

#文件db.txt的内容为:{"count":1}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩余票数%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 购票成功\033[0m' %name)

def task(name):
    search(name)
    get(name)

if __name__ == '__main__':
    for i in range(10): #模拟并发10个客户端抢票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,))
        p.start()

并发运行,效率高,但竞争写同一文件,数据写入错乱,只有一张票,卖成功给了10个人


<路人0> 查到剩余票数1
<路人1> 查到剩余票数1
<路人2> 查到剩余票数1
<路人3> 查到剩余票数1
<路人4> 查到剩余票数1
<路人5> 查到剩余票数1
<路人6> 查到剩余票数1
<路人7> 查到剩余票数1
<路人8> 查到剩余票数1
<路人9> 查到剩余票数1
<路人0> 购票成功
<路人4> 购票成功
<路人1> 购票成功
<路人5> 购票成功
<路人3> 购票成功
<路人7> 购票成功
<路人2> 购票成功
<路人6> 购票成功
<路人8> 购票成功
<路人9> 购票成功

运行结果

加锁处理:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

#把文件db.txt的内容重置为:{"count":1}
from multiprocessing import Process,Lock
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩余票数%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 购票成功\033[0m' %name)

def task(name,lock):
    search(name)
    with lock: #相当于lock.acquire(),执行完自代码块自动执行lock.release()
        get(name)

if __name__ == '__main__':
    lock=Lock()
    for i in range(10): #模拟并发10个客户端抢票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,lock))
        p.start()

<路人0> 查到剩余票数1
<路人1> 查到剩余票数1
<路人2> 查到剩余票数1
<路人3> 查到剩余票数1
<路人4> 查到剩余票数1
<路人5> 查到剩余票数1
<路人6> 查到剩余票数1
<路人7> 查到剩余票数1
<路人8> 查到剩余票数1
<路人9> 查到剩余票数1
<路人0> 购票成功

运行结果

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

1、效率低(共享数据基于文件,而文件是硬盘上的数据)

2、需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

五、进程间的通讯

IPC 指的是进程间通讯
之所以开启子进程 肯定需要它帮我们完成任务 很多情况下 需要将数据返回给父进程
然而 进程内存是物理隔离的
解决方案:
1.将共享数据放到文件中 就是慢
2.管道 subprocess中的那个 管道只能单向通讯 必须存在父子关系
3.共享一块内存区域 得操作系统帮你分配 速度快

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子进程xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 开启子进程
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)
    print(dic)

六、僵尸进程和孤儿进程

一个进程任务执行完就死亡了 但是操作系统不会立即将其清理 为的是 开启这个子进程的父进程可以访问到这个子进程的信息
这样的 任务完成的 但是没有被操作系统清理的进程称为僵尸进程 越少越好

孤儿进程 无害!  没有爹的称为孤儿
一个父进程已经死亡 然而他的子孙进程 还在执行着 这时候 操作系统会接管这些孤儿进程

七、队列介绍

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现)

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

参数介绍:

maxsize是队列中允许最大项数,省略则无大小限制。
但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小

主要方法介绍:

q.put方法用以插入数据到队列中。
q.get方法可以从队列读取并且删除一个元素。

队列使用:

from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #满了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
"""
    进程间通讯的另一种方式 使用queue
    queue  队列
    队列的特点:
        先进的先出
        后进后出
        就像扶梯
"""
from multiprocessing import Process,Queue


# 基础操作 必须要掌握的
# 创建一个队列
# q = Queue()
# # 存入数据
# q.put("hello")
# q.put(["1","2","3"])
# q.put(1)
# # 取出数据
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())

# 阻塞操作 必须掌握
# q = Queue(3)
# # # 存入数据
# q.put("hello",block=False)
# q.put(["1","2","3"],block=False)
# q.put(1,block=False)
# 当容量满的时候 再执行put 默认会阻塞直到执行力了get为止
# 如果修改block=False 直接报错 因为没地方放了
# q.put({},block=False)
#
#  取出数据
# print(q.get(block=False))
# print(q.get(block=False))
# print(q.get(block=False))
# # 对于get   当队列中中没有数据时默认是阻塞的  直达执行了put
# # 如果修改block=False 直接报错 因为没数据可取了
# print(q.get(block=False))

八、生产消费者模型:

1.生产者消费者模型
模型 设计模式 三层结构 等等表示的都是一种编程套路
生产者指的是能够产生数据的一类任务
消费者指的是处理数据的一类任务

需求: 文件夹里有十个文本文档 要求你找出文件中包含习大大关键字的文件打开并读取文件数据就是生产者查找关键字的过程就是消费者

生产者消费者模型为什么出现?
生产者的处理能力与消费者的处理能力 不匹配不平衡 导致了一方等待另一方 浪费时间
目前我们通过多进程将生产 和 消费 分开处理
然后将生产者生产的数据通过队列交给消费者

总结一下在生产者消费者模型中 不仅需要生产者消费者 还需要一个共享数据区域
1.将生产方和消费方耦合度降低
2.平衡双方的能力 提高整体效率

上代码 :
搞两个进程 一个负责生产 一个负责消费

from multiprocessing import Process,Queue
# 制作热狗
def make_hotdog(queue,name):
    for i in range(3):
        time.sleep(random.randint(1,2))
        print("%s 制作了一个热狗 %s" % (name,i))
        # 生产得到的数据
        data = "%s生产的热狗%s" % (name,i)
        # 存到队列中
        queue.put(data)
    # 装入一个特别的数据 告诉消费方 没有了
    #queue.put(None)


# 吃热狗
def eat_hotdog(queue,name):
    while True:
        data = queue.get()
        if not data:break
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name,data))

if __name__ == '__main__':
    #创建队列
    q = Queue()
    p1 = Process(target=make_hotdog,args=(q,"邵钻钻的热狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的热狗店"))
    p3 = Process(target=make_hotdog, args=(q, "老王的热狗店"))


    c1 = Process(target=eat_hotdog, args=(q,"思聪"))
    c2 = Process(target=eat_hotdog, args=(q, "李哲"))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 让主进程等三家店全都做完后....
    p1.join()
    p2.join()
    p3.join()

    # 添加结束标志   注意这种方法有几个消费者就加几个None 不太合适 不清楚将来有多少消费者
    q.put(None)
    q.put(None)


    # 现在 需要知道什么时候做完热狗了 生产者不知道  消费者也不知道
    # 只有队列知道

print("主进程over")
# 生产方不生产了 然而消费方不知道 所以已知等待 get函数阻塞 # 三家店都放了一个空表示没热狗了 但是消费者只有两个 他们只要看见None 就认为没有了 # 于是进程也就结束了 造成一些数据没有被处理 # 等待做有店都做完热狗在放None

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #发送结束信号
if __name__ == '__main__':
    q=Queue()
    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,))

    #开始
    p1.start()
    c1.start()
    print('')

模型