2019-04-16-day033-锁与队列

时间:2025-01-09 22:36:26

内容回顾

几个问题

  1. 概念多,练习少
  2. 不问问题
    • 概念?代码?

Process类

  1. 并发并行
    • 并发 是同一时间段内多个任务交替使用同一个cpu
    • 并行 是在同一个时刻多个任务在不同的cpu上同时执行
  2. 同步异步
    • 同步 发布一个任务,要等待这个任务结束之后才能继续
    • 异步 发布一个任务,不等待这个任务的结束就可以继续执行当前的内容
  3. 阻塞非阻塞
    • 阻塞 : 在当前任务中cpu不工作
    • 非阻塞 : cpu还在继续为当前程序在执行
  4. start terminate join
    • start\terminate 异步非阻塞
    • join 同步阻塞
  5. io操作
    • i :输入到内存
    • o :从内存向外(网络 硬盘)输出
join
import time
import random
from multiprocessing import Process
def done(name):
    num = random.uniform(1, 3)
    print('start buy %s'%name,num)
    time.sleep(num)
    print('end buy %s' % name)

if __name__ == '__main__':
    l = []
    p1 = Process(target=done,args=('橘子',))
    p1.start()
    l = [p1]
    p2 = Process(target=done, args=('苹果',))
    p2.start()
    l = [p1,p2]
    p3 = Process(target=done, args=('榴莲',))
    p3.start()
    l = [p1, p2,p3]
    p1.join()   * 等买橘子的那个人回来 只关心p1这个子进程是否执行完
    print('买橘子的那个人回来了')
    p2.join()   * 等买苹果的人回来
    print('买苹果的那个人回来了')
    p3.join()   * 等待榴莲的人回来
    print('买榴莲的那个人回来了')
import time
import random
from multiprocessing import Process
def done(name):
    num = random.uniform(1, 3)
    print('start buy %s'%name,num)
    time.sleep(num)
    print('end buy %s' % name)

if __name__ == '__main__':
    l = ['橘子','苹果','榴莲']
    p_l = []
    for fruit in l:
        p = Process(target=done,args=(fruit,))
        p.start()
        p_l.append(p)
    print(p_l)
    for p in p_l:
        p.join()   # 阻塞 等橘子回来;阻塞 等苹果回来;阻塞 等榴莲回来

join Process模块提供给我们的 对子进程同步管理的方法

import time
import random
from multiprocessing import Process
def done(name):
    num = random.uniform(1, 3)
    print('start buy %s'%name,num)
    time.sleep(num)
    print('end buy %s' % name)

if __name__ == '__main__':
    l = ['橘子','苹果','榴莲']
    for fruit in l:
        p = Process(target=done,args=(fruit,))
        p.start()
        p.join()

开启进程 Process类

实例化的时候引用的参数 : target = 函数名,args=(参数1,)

方法 : start 开启进程 terminate 结束进程 join等待子进程结束

属性 :
  • name 进程名 pid进程id
  • daemon 一定要在start之前设置,设置
  • 个子进程为守护进程,守护进程在主进程的代码结束之后结束

使用类的方式开启子进程

from multiprocessing import Process
class 类名(Process):
    def __init__(self,参数1,参数2):
        super().__init__()
        self.参数1 = 参数1
        self.参数2 = 参数2
    def run(self):
        '''要写在子进程中的代码
        可以使用self中的所有参数'''
        pass
if __name__ == '__main__':
    p = 类名('参数1','参数2')
    p.start()

from multiprocessing import Process
def func():pass

if __name__ == '__main__':
    Process(target=func).start()
    a = 1
print(a)

import json
import time
from multiprocessing import Process,Lock
def search_ticket(name):
   with open('ticket',encoding='utf-8') as f:
       dic = json.load(f)
       print('%s查询余票为%s'%(name,dic['count']))

def buy_ticket(name):
   with open('ticket',encoding='utf-8') as f:
       dic = json.load(f)
   time.sleep(2)
   if dic['count'] >= 1:
       print('%s买到票了'%name)
       dic['count'] -= 1
       time.sleep(2)
       with open('ticket', mode='w',encoding='utf-8') as f:
           json.dump(dic,f)
   else:
       print('余票为0,%s没买到票' % name)

def use(name,lock):
   search_ticket(name)
   print('%s在等待'%name)
   * lock.acquire()
   * print('%s开始执行了'%name)
   * buy_ticket(name)
   * lock.release()
   with lock:
       print('%s开始执行了'%name)
       buy_ticket(name)

if __name__ == '__main__':
   lock = Lock()
   l = ['alex','wusir','baoyuan','taibai']
   for name in l:
       Process(target=use,args=(name,lock)).start()
  1. 牺牲了效率 保证了数据的安全
  2. 用户就会觉得很慢 体验很差

锁的应用场景,当多个进程需要操作同一个文件/数据库的时候 ,

会产生数据不安全,我们应该使用锁来避免多个进程同时修改一个文件

队列

  1. 多个进程之间的数据是隔离的
  2. 进程之间的数据交互
  3. 是可以通过网络/文件来实现的
  4. socket来实现

IPC - inter process communication

  • 通过python的模块实现的
    • 基于原生socket
    • 基于进程队列的 *****
  • 第三方的软件/工具来实现 : 基于网络的
    • memcache redis rabbitMQ kafka - 软件名
from multiprocessing import Queue # 可以完成进程之间通信的特殊的队列
from queue import Queue    #不能完成进程之间的通信

q = Queue()
q.put(1)
q.put(2)
print(q.get())
print(q.get())

from multiprocessing import Queue,Process

def son(q):
    print('-->',q.get())

if __name__ == '__main__':
    q = Queue()
    Process(target=son,args=(q,)).start()
    q.put('wahaha')
  • 生产者消费者模型
  • 获得数据 生产者
  • 处理数据 消费者
  • 调节生产者的个数或者消费者的个数来让程序的效率达到最平衡和最大化
  • 解耦思想
import time
import random
from multiprocessing import Process,Queue
def producer(q):
    for i in range(10):
        time.sleep(random.random())
        food = '泔水%s'%i
        print('%s生产了%s'%('taibai',food))
        q.put(food)

def consumer(q,name):
    while True:
        food = q.get()   * food = 食物/None
        if not food : break
        time.sleep(random.uniform(1,2))
        print('%s 吃了 %s'%(name,food))

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p1.start()
    c1 = Process(target=consumer,args=(q,'alex'))
    c1.start()
    c2 = Process(target=consumer,args=(q,'wusir'))
    c2.start()
    p1.join()
    q.put(None)
    q.put(None)

如何结束整个程序

import time
import random
from multiprocessing import JoinableQueue,Process

def consumer(jq,name):
    while True:
        food = jq.get()
        time.sleep(random.uniform(1,2))
        print('%s吃完%s'%(name,food))
        jq.task_done()

def producer(jq):
    for i in range(10):
        time.sleep(random.random())
        food = '泔水%s'%i
        print('%s生产了%s'%('taibai',food))
        jq.put(food)
    jq.join()

if __name__ == '__main__':
    jq = JoinableQueue(5)
    c1 = Process(target=consumer,args=(jq,'alex'))
    p1 = Process(target=producer,args=(jq,))
    c1.daemon = True
    c1.start()
    p1.start()
    p1.join()

锁 同一时刻同一段代码,只能有一个进程来执行这段代码

  • 保证数据的安全
  • 多进程中,只有去操作一些 进程之间可以共享的数据资源的时候才需要进行加锁
  • lock = Lock()
  • acquire release
  • with lock:
  • Lock 互斥锁

IPC

  • 队列
    • PUT
    • GET
  • 生产者消费者模型 基于队列把生产数据和消费数据的过程分开了
  • 补充
    • 队列 是进程安全的 自带了锁
    • 队列基于什么实现的 文件家族的socket服务
    • 基于文件家族的socket服务实现的ipc机制不止队列一个,管道Pipe
    • 队列 = 管道 + 锁
    • 管道 是基于文件家族的socket服务实现

from multiprocessing import Pipe