python并发编程之多进程

时间:2022-04-12 03:25:57

并发编程之多进程

  1. 获取进程以及父进程的pid

    进程在内存中开启多个,操作系统如何区分这些进程?每个进程都有一个唯一标识,

    1. 在终端查看进程的pid.

      命令行输入: tasklist

    2. 在终端查看执行的进程pid

      命令行输入: tasklist| findstr 进程名

    3. 通过代码查看pid

      import os
      import time
      print(f'子进程:{os.getpid()}')
      print(f'父进程:{os.getppid()}')
      time.sleep(5000)
  2. 验证进程之间的数据隔离

    # 主进程一定要在子进程运行之后才执行.
    from multiprocessing import Process
    import time
    x = 1000
    
    def task(name):
        print(name)
        global x
        x = 2
    
    if __name__ == '__main__':
        p1 = Process(target=task,args=('太白',))
        p1.start()
        time.sleep(3)
        print(f'主进程:{x}')
    # 数字-5~256 主进程子进程是沿用同一个.
  3. join方法

    # 单个进程
    from multiprocessing import Process
    import time
    def task(name):
        time.sleep(1)
        print(f'{name} is running')
    
    if __name__ == '__main__':
        p = Process(target=task,args=('李业',))
        p.start()
        p.join()  # 告知主进程,p进程结束之后,主进程再运行.
        # join也是一种阻塞
        print('===主进程')
    
    # 多个进程
    from multiprocessing import Process
    import time
    def task(sec):
        time.sleep(sec)
        print(f'{sec}: is running')
    if __name__ == '__main__':
        start_time = time.time()
        p_l = []
        for i in range(1,4):
            p = Process(target=task, args=(i,))
            p.start()
            p_l.append(p)
        for i in p_l:
            i.join()
    
        print(f'===主进程:{time.time() - start_time}之后,执行')
    
  4. 进程对象的其他属性

    from multiprocessing import Process
    import time
    def task(name):
        print(f'{name} is running')
        time.sleep(3)
        print(f'{name} is done')
    if __name__ == '__main__':
        p = Process(target=task,args=('怼哥',) ,name='任务1')  # name给进程对象设置name属性
        p.start()
        print(p.pid)  # 获取进程pid号
        print(p.name) # 查看name属性
        p.terminate() # 终止(结束)子进程
        # terminate 与 start一样的工作原理: 都是通知操作系统终止或者开启一个子进程,内存中终止或者开启(耗费时间)
         print(p.is_alive())  # 判断子进程是否存活 # 只是查看内存中p子进程是否运行.
         print('===主进程')
  5. 僵尸进程与孤儿进程

    # linux(mac)环境下才强调的两个概念,windows下没有.
    # 面试官会问到这两个概念.
    from multiprocessing import Process
    import time
    import os
    def task(name):
        print(f'{name} is running')
        print(f'子进程开始了:{os.getpid()}')
        time.sleep(50)
    if __name__ == '__main__':
        for i in range(100000):
            p = Process(target=task,args=('怼哥',))
            p.start()
        print(f'主进程开始了:{os.getpid()}')
    
    # 主进程是子进程的发起者. 主进程要等到所有的子进程运行结束之后,主进程再结束.
    # 而在这个过程中,有些子进程已经运行完毕,但是父进程还在运行,并没有对这些子进程进行回收
    # 这些子进程依然占据着pid,消耗着内存,这样的子进程就被称为:僵尸进程.
    # 内存中会保留这些子进程的信息,只有当主进程调用wait 或waitpid 获取进程的状态信息,这些信息才会消失
    
    # 孤儿进程: 此时如果主进程由于各种原因,提前消失了,它下面的所有的子进程都成为孤儿进程了.
    # 谁给孤儿进程收尸?  一段时间之后, init就会对孤儿进程进行回收.
    
    # 孤儿进程无害:如果僵尸进程挂了,init会对孤儿进程进行回收.
    # 僵尸进程有害:父进程进行一个死循环,无限的开启子进程,递归的开启,子进程越来越多,僵尸进程越来越多,占用大量的pid,
    # 如果父进程不调用wait/waitpid的话,那么保留的信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵尸进程,将因为没有可用的进程号而导致系统不能产生新的进程,这就是僵尸进程的危害,应当避免
  6. 守护进程

    # 守护进程:  子进程对父进程可以进行守护.
    from multiprocessing import Process
    import time
    import os
    def task(name):
        print(f'{name} is running')
        print(f'子进程开始了:{os.getpid()}')
        time.sleep(50)
    if __name__ == '__main__':
        p = Process(target=task,args=('怼哥',))
        p.daemon = True  # 将p子进程设置成守护进程,守护主进程,只要主进程结束,子进程无论执行与否,都马上结束.
        p.start()
        time.sleep(2)
        print(f'主进程开始了:{os.getpid()}')
  7. 互斥锁

    # 当3个进程,同一时刻共抢一个资源: 输出平台时.
    # 多个进程共抢一个资源,你要做到结果第一位,效率第二位.
    # 你应该牺牲效率,保求结果.利用串行.
    
    from multiprocessing import Process
    from multiprocessing import Lock  # 导入互斥锁
    import time
    import random
    
    def task1(lock):
        print('task1')  # 验证cpu遇到io切换了
        lock.acquire()  # 当主程序运行到这里,发现存在互斥锁,会切换到另一个进程
        print('task1: 开始打印')
        time.sleep(random.randint(1, 3)) # 模拟延迟
        print('task1: 打印完成')
        lock.release() # 开锁
    
    def task2(lock):
        print('task2')  # 验证cpu遇到io切换了
        lock.acquire() # 发现这里也有锁
        print('task2: 开始打印')
        time.sleep(random.randint(1, 3))
        print('task2: 打印完成')
        lock.release() # 开锁
    
    def task3(lock):
        print('task3') # 验证cpu遇到io切换了
        lock.acquire() # 这也有锁
        print('task3: 开始打印')
        time.sleep(random.randint(1, 3))
        print('task3: 打印完成')
        lock.release() # 开锁
    
    if __name__ == '__main__':
        lock = Lock()
        p1 = Process(target=task1, args=(lock,))
        p2 = Process(target=task2, args=(lock,))
        p3 = Process(target=task3, args=(lock,))
        p1.start()
        p2.start()
        p3.start()
    
    # 上锁:
    # 一定要是同一把锁: 只能按照这个规律:上锁一次,解锁一次.
    
    # 互斥锁与join区别共同点? (面试题)
    # 共同点: 都是完成了进程之间的串行.
    # 区别: join是人为控制的进程串行,互斥锁是随机的抢占资源.保证了公平性
  8. 进程之间的通信: 队列.

    上一节: 多个进程之间的通信:基于文件以及加锁的方式.

    1. 操作文件效率低.
    2. 自己加锁很麻烦. 很容易出现死锁,递归锁.

    进程之间的通信最好的方式是基于队列.

    什么是队列?

    队列就是存在于内存中的一个容器,最大的一个特点; 队列的特性就是FIFO.完全支持先进先出的原则.

    from multiprocessing import Queue # 导入队列
    q = Queue(3)  # 括号里的参数是可以设置队列里的最大元素个数
    
    def func():
        print('in func')
    q.put('alex')
    q.put({'count': 1})
    q.put(func)
    # q.put(666)  # 当队列数据已经达到上限,再插入数据的时候,程序就会夯住.
    # print(111)
    
    print(q.get())
    print(q.get())
    ret = q.get()
    ret()
    print(q.get()) # 当你将数据取完后再继续取值时,默认也会阻塞

    其他参数:

    1. maxsize() q = Queue(3) 队列中的数据量不易过大.存放精简的重要的数据.

    2. put block 默认为True 当你插入的数据超过最大限度,默认阻塞

      q = Queue(3)  # 可以设置元素个数
      q.put('alex')
      q.put({'count': 1})
      q.put(22)  
      q.put(333,block=False)  # 改成False 数据超过最大限度,不阻塞了直接报错.
    3. put timeout() 参数 :延时报错,超过设定时间再put不进数据,就会报错.

      get里面的参数:block,timeout一样.

      q = Queue(3)  # 可以设置元素个数
      q.put('alex')
      q.put({'count': 1})
      q.put(22) 
      q.put(333,timeout=3)  # 延时报错,超过三秒再put不进数据,就会报错.
      
      print(q.get())
      print(q.get())
      print(q.get())
      print(q.get(block=False)) # get里的参数效果与put里的参数效果相同  timeout
  9. 进程之间的通信实例.

    # 小米:抢手环4.预期发售10个.
    # 有100个人去抢.
    import os
    from multiprocessing import Queue
    from multiprocessing import Process
    
    def task(q):
        try:
            q.put(f'{os.getpid()}',block=False)
        except Exception:
            return
    
    if __name__ == '__main__':
        q = Queue(10)
        for i in range(100):
            p = Process(target=task,args=(q,))
            p.start()
        for i in range(1,11):
            print(f'排名第{i}的用户: {q.get()}',)

    利用队列进行进程之间通信: 简单,方便,不用自己手动加锁.队列自带阻塞,可持续化获取数据.

  10. 生产者消费者模型

    模型, 设计模式,归一化设计, 理论等等,教给你一个编程思路.如果以后遇到类似的情况,直接套用即可.

    生产者: 生产数据进程.

    消费者: 对生产者生产出来的数据做进一步处理进程.

    吃饭: 吃包子. 厨师生产出包子,不可能直接给到你嘴里. 放在一个盆中,消费者从盆中取出包子食用.三个主体 : (生产者)厨师, (容器队列)盆, (消费者)人.

    为什么夹杂这个容器?

    如果没有容器, 生产者与消费者强耦合性.不合理.所以我们要有一个容器,缓冲区.平衡了生产力与消费力.

    生产者消费者多应用于并发.

    from multiprocessing import Process
    from multiprocessing import Queue
    import time
    import random
    
    def producer(name,q):
        for i in range(1,6):
            time.sleep(random.randint(1,3))
            res = f'{i}号包子'
            q.put(res)
            print(f'\033[0;32m 生产者{name}: 生产了{res}\033[0m')
    
    def consumer(name,q):
        while 1:
            try:
                time.sleep(random.randint(1,3))
                ret = q.get(timeout=5)
                print(f'消费者{name}: 吃了{ret}')
            except Exception:
                return
    
    if __name__ == '__main__':
        q = Queue()
        p1 = Process(target=producer, args=('太白',q))
        p2 = Process(target=consumer, args=('MC骚强',q))
        p1.start()
        p2.start()

    生产者消费者模型:

    合理的去调控多个进程去生成数据以及提取数据,中间有个必不可少的环节容器队列.