基础10 多进程、协程(multiprocessing、greenlet、gevent、gevent.monkey、select、selector)

时间:2022-04-24 09:14:50

1.多进程实现方式(类似于多线程)

 import multiprocessing
 import time,threading

 def thread_run():#定义一个线程函数
     print("我是子线程%s" %threading.get_ident())  #threading.get_ident()函数获取当前线程的id
 def run(name):#定义一个进程函数
     time.sleep(1)
     print("hello,我是进程%s" %name)
     t = threading.Thread(target=thread_run(),)#在进程下运行子线程
     t.start()
 if __name__ == '__main__':  #注意:启多进程必须要加这个判断,否则会有意想不到的错误
     for i in range(3):
         p = multiprocessing.Process(target=run,args=('bob %s' %i,))
         p.start()
         p.join()

 输出:
 hello,我是进程bob 0
 我是子线程28176
 hello,我是进程bob 1
 我是子线程27016
 hello,我是进程bob 2
 我是子线程27516

2.进程都是由父进程创建和启动的

 from multiprocessing import Process
 import os

 def info(title):
     print(title)
     print('module name:', __name__)
     print('当前进程父进程id:', os.getppid())
     print('当前进程 id:', os.getpid())
     print("\n\n")

 def f(name):
     info('\033[31;1mcalled from child process function f\033[0m')
     print('hello', name)

 if __name__ == '__main__':
     info('\033[32;1mmain process line\033[0m')
     p = Process(target=f, args=('bob',)) #
     p.start()

 输出:
 main process line
 module name: __main__
 当前进程父进程id: 12468    #这是pycharm的进程号
 当前进程 id: 25692           #这是第一个info函数的进程号

 called from child process function f
 module name: __mp_main__
 当前进程父进程id: 25692 #这是第一个info函数的进程号
 当前进程 id: 27252        #这是f函数内调用的info函数的进程号

 hello bob

3.进程间是不共享内存的(不同于线程),即进程之间是不能直接交换信息的
可以用以下方式实现进程间通信,用Queue队列(线程中用queue)

 import threading
 from multiprocessing import Process ,Queue

 def f(zi_q):#定义一个子进程函数,它往Queue里放值
     zi_q.put([42,None,'hello'])

 if __name__ == '__main__':
     fu_q = Queue()                        #实例化父进程的一个队列Queue
     zi_p = Process(target=f, args=(fu_q,))#将父进程的Queue作为参数传给子进程zi_p,这里父进程的Queue和子进程的Queue
                                           #不是同一个Queue,这里fu_q作为参数传给子进程的时候,是将fu_q
                                           #复制了一份传给了子进程,子进程将值赋给子进程的Queue后,函数内部
                                           #又通过pickle形式将值传给了父进程的Queue(fu_q)
     zi_p.start()
     print(fu_q.get())  #父进程获取Queue内的值
     zi_p.join()

 输出:
 [42, None, 'hello']

4.实现不同进程间通信的另外一种方法

(1)管道(Pipe()),类似于socket

 from multiprocessing import Process, Pipe
 def f(conn):
     conn.send([42, 'hello'])#类似于socket,这里子进程发了两次,下边父进程就需要收两次
     conn.send([32, 'yyyy'])
     print(conn.recv())        #子进程也可以接收父进程发过来的信息
     conn.close()

 if __name__ == '__main__':
     parent_conn, child_conn = Pipe()
     p = Process(target=f, args=(child_conn,))
     p.start()
     print(parent_conn.recv())#收两次
     print(parent_conn.recv())
     parent_conn.send('我是父进程A') #父进程可以给子进程发信息
     p.join()

 输出:
 [42, 'hello']
 [32, 'yyyy']
 我是父进程A

5.实现进程间内存的共享(所谓共享即所有进程操作同一份列表、字典等等,其实也不真的是同一份数据,只不过是程序内部通过复制原始列表、字典给进程,进程运行以后修改列表、字典,最后合为一个列表、字典,让人们以为是所有进程共享了相同的内存)
manager模块可实现这一功能

 from multiprocessing import Process, Manager
 import os
 def f(d, li):
     d[os.getpid()] = os.getpid()  #生成进程号字典
     li.append(os.getpid())         #生成进程号列表
     print(li)

 if __name__ == '__main__':
     with Manager() as manager:  #等同于manager = Manager()
         d = manager.dict()   #创建一个字典,可在多个进程间共享和传递
         li = manager.list()#创建一个列表,可在多个进程间共享和传递
         p_list = []
         for i  in range(5):#生成5个进程
             p = Process(target=f, args=(d, li))
             p.start()
             p_list.append(p) #将这5个进程放进一个列表里
         for res in p_list: #让每个进程都执行完
             res.join()
         print(d)

 输出:
 [53812]
 [53812, 51612]
 [53812, 51612, 53520]
 [53812, 51612, 53520, 53696]
 [53812, 51612, 53520, 53696, 53608]
 {53520: 53520, 51612: 51612, 53608: 53608, 53812: 53812, 53696: 53696}

6.进程锁,防止在屏幕上或日志里打印乱了
  如这种乱:hello worhello world 8

 from multiprocessing import Process, Lock

 def f(l, i):
     l.acquire()#获得一把锁
     print('hello world', i)
     l.release()#释放一把锁
 if __name__ == '__main__':
     lock = Lock()
     for num in range(10):
         Process(target=f, args=(lock, num)).start()

 输出:
 hello world 2
 hello world 1
 hello world 4
 hello world 3
 hello world 8
 hello world 0
 hello world 7
 hello world 5
 hello world 6
 hello world 9

7.很重要  进程池    防止进程启动太多,消耗太多资源,一般是几个cpu就启几个进程

 from multiprocessing import Process, Pool
 import time
 import os

 def foo(i):
     time.sleep(1)
     print("正在执行子进程", os.getpid())
     return os.getpid()      #返回的是子进程的id号

 def bar(arg):    #定义回调函数,参数arg是foo函数的返回值
     print("子进程%s执行完毕,主进程%s调用的我"% (arg, os.getpid()))

 if __name__ == '__main__':
     pool = Pool(5)  #允许进程池同时放入5个进程
     print("主进程", os.getpid())
     for i in range(10):
         pool.apply_async(func=foo, args=(i,), callback=bar)#异步,异步最好,callback后跟回调函数,回调函数是由主进程执行的
                                                         #回调函数的参数不用写,默认就是前边的func
        # pool.apply(func=foo, args=(i,))  #同步,是串行的,看不出是5个5个的执行
     print("主进程:end")
     pool.close()        #注意,这里要先close,再join,否则会有错误
     pool.join()

输出:

 主进程 57784
 主进程:end
 正在执行子进程 58060
 子进程58060执行完毕,主进程57784调用的我
 正在执行子进程 55428
 子进程55428执行完毕,主进程57784调用的我
 正在执行子进程 58356
 子进程58356执行完毕,主进程57784调用的我
 正在执行子进程 56716
 子进程56716执行完毕,主进程57784调用的我
 正在执行子进程 57380
 子进程57380执行完毕,主进程57784调用的我
 正在执行子进程 58060
 子进程58060执行完毕,主进程57784调用的我
 正在执行子进程 55428
 子进程55428执行完毕,主进程57784调用的我
 正在执行子进程 58356
 子进程58356执行完毕,主进程57784调用的我
 正在执行子进程 56716
 子进程56716执行完毕,主进程57784调用的我
 正在执行子进程 57380
 子进程57380执行完毕,主进程57784调用的我

8.协程

又称微线程,是一种用户态的轻量级线程,协程拥有自己的寄存器和栈,协程调度切换时,将寄存器上下文和栈保存在其他地方,当切回来的时候,恢复先前保存的寄存器上下文和栈,因此,每一次协程恢复时,

都能在它离开中断的地方继续。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

用greenlet模块实现协程(手动):

 from greenlet import greenlet
 def test1():
     print("test1")      #4.打印
     gr2.switch()        #5.切换到test2,test2开始执行
     print("test1...")   #8.接上次离开的地方,开始执行,打印
     gr2.switch()        #9.切换到test2,test2开始执行

 def test2():
     print("test2")     #6.打印
     gr1.switch()       #7.切换到tets1,test1开始执行
     print("test2...")  #10.接上次离开的地方,开始执行,打印,程序结束

 gr1 = greenlet(test1)   #1.启动一个协程
 gr2 = greenlet(test2)   #2.启动一个协程
 gr1.switch()            #3.切换到test1,test1开始执行

 输出:
 test1
 test2
 test1...
 test2...

重要:用gevent模块实现协程(自动切换)。遇到IO(程序睡觉或卡主)便自动切换到下一个协程

 import gevent

 def t1():
     print("t1 运行")       #4.t1开始运行,打印
     gevent.sleep(2)       #5.切换到下一个协程,按生成的顺序,切换到t2
                           #10.程序运行到这里,只花费了不到0.1秒钟,这时t1还在睡觉,又触发了切换操作,切换到t2
                           #13.程序运行到这里,只花费了不到0.1秒钟,这时t1还在睡觉,又触发了切换操作,切换到t2
     print("再次回到t1")    #16.接上次离开的地方,执行打印操作

 def t2():
     print("t2 运行")       #6.t2开始运行,打印
     gevent.sleep(1)       #7.切换到下一个协程,按生成的顺序,切换到t3
                           #11.程序运行到这里,只花费了不到0.1秒钟,这时t2还在睡觉,又触发了切换操作,切换到t3
                           #14.程序运行到这里,只花费了不到0.1秒钟,这时t2还在睡觉,又触发了切换操作,但是t3运行
                           #    完毕,无法切换到t3,只能切换到t1,t1还在睡觉,又切换到t2,互相切换过了1秒后,t2睡觉
                           #    完毕,这时切换到t2
     print("再次回到t2")    #15.接上次离开的地方,执行打印操作,切换到t1

 def t3():
     print("t3运行")        #8.t3开始运行,打印
     gevent.sleep(0)       #9.虽然睡觉0秒,但是会触发切换操作,切换到下一个协程,按生成的顺序,切换到t1
     print("再次回到t3")    #12.由于t3没有睡觉,执行打印操作。t3运行完毕,自动切换到t1

 gevent.joinall([
     gevent.spawn(t1),      #1.生成协程1
     gevent.spawn(t2),      #2.生成协程2
     gevent.spawn(t3),      #3.生成协程3
 ])

 输出:   t1 运行   t2 运行   t3运行   再次回到t3   再次回到t2   再次回到t1

9.爬虫,简单下载网页,使用gevent和gevent.monkey模块(需python3.5以上,gevent在linux完美支持,在Windows可能会有问题)

 from urllib import request
 import gevent,time
 from gevent import monkey
 monkey.patch_all() #把当前程序的所有的io操作给gevent单独的做上标记,使gevent能辨识出io操作,从而达到协程遇到io就切换的效果,不加monkey是不能识别出io的

 def f(url):
     print('GET: %s' % url)
     resp = request.urlopen(url)
     data = resp.read()
     print('%d bytes received from %s.' % (len(data), url))

 urls = ['https://www.python.org/',
         'https://www.yahoo.com/',
         'https://github.com/' ]
 time_start = time.time()
 for url in urls:
     f(url)
 print("同步cost",time.time() - time_start)
 async_time_start = time.time()
 gevent.joinall([
     gevent.spawn(f, 'https://www.python.org/'),
     gevent.spawn(f, 'https://www.yahoo.com/'),
     gevent.spawn(f, 'https://github.com/'),
 ])
 print("异步cost",time.time() - async_time_start)

输出:

 GET: https://www.python.org/
 47403 bytes received from https://www.python.org/.
 GET: https://www.yahoo.com/
 460629 bytes received from https://www.yahoo.com/.
 GET: https://github.com/
 25734 bytes received from https://github.com/.
 同步cost 5.380241632461548
 GET: https://www.python.org/
 GET: https://www.yahoo.com/
 GET: https://github.com/
 47403 bytes received from https://www.python.org/.
 430893 bytes received from https://www.yahoo.com/.
 25734 bytes received from https://github.com/.
 异步cost 1.2949371337890625

10.基于协程的异步多并发

socket server

 #!usr/bin/env/python
 # -*- coding:utf-8 -*-
 # from  wangteng
 #通过协程实现并发
 import sys
 import socket
 import time
 import gevent

 from gevent import socket,monkey
 monkey.patch_all()

 def server(port):
     s = socket.socket()
     s.bind(('0.0.0.0',port))
     s.listen(500)#最大连接数为500
     while True:
         cli, addr = s.accept()
         gevent.spawn(handle_request, cli)#  创建协程,这里做了一个操作,将cli作为参数传给handle_request,  handle_request(cli)

 def handle_request(conn):
     try:
         while True:
             data = conn.recv(1024)#接收client的请求

             with open("bbb.txt",'ab') as f:#接收文件内容,并写入文件
                 f.write(data)
             print("recv:", data)
             # post = bytes('get your request: ',encoding='utf-8')
             # post = post + data  #post为要向client发送的数据,必须为bytes类型
             conn.send(data)  #向client发送数据
             if not data:
                 conn.shutdown(socket.SHUT_WR)

     except Exception as ex:
         print(ex)

     finally:
         conn.close()

 if __name__ == "__main__":
     server(9000)

client

 #!usr/bin/env/python
 # -*- coding:utf-8 -*-
 # from  wangteng
 #本客户端可以跟异步socket_server多并发、select socket server一起用
 import socket

 host = 'localhost'
 port = 9000
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.connect((host, port))
 while True:
     msg = bytes(input(">>:"), encoding="utf-8")  #输入要发送的内容,必须是bytes类型
     s.sendall(msg)   #发送数据
     # filename = input("filename:>>")   #注释的内容是可以发送文件的
     # with open(filename,'rb') as f:#发送文件
     #     for line in f:
     #         s.sendall(line)
     #         data = s.recv(1024)
     #         print('recv:',data)
     data = s.recv(1024)#接收数据
     print('recv:', data.decode())#解码可以显示中文
 s.close()

11.io多路复用,不是异步   基于select
server

 #io多路复用实现并发,并不是异步  select

 import select
 import socket
 import queue

 server = socket.socket()
 server.bind(('localhost',9000))
 server.listen(1000)

 server.setblocking(False) #不阻塞

 msg_dic = {}

 inputs = [server, ]#新建链接的集合列表,实际情况是随着连接的增多,列表中的conn会越来越多,而server只有一个
 #inputs = [server,conn] #[conn,]
 #inputs = [server,conn,conn2,conn3...] #实际情况是随着连接的增多,列表中的conn会越来越多,而server只有一个
 outputs = [] #要返回数据的客户端连接的集合列表,比如conn1需要server给conn1返回请求数据,就将conn1放进outputs里
 #outputs = [r1,] #
 while True:
     readable, writeable, exceptional = select.select(inputs, outputs, inputs )#select监测新建立的连接,readble是inputs列表中
                                                  #的元素集合,writeable是outputs列表中的元素集合,exceptional是异常连接的集合,它包含在inputs中
     print(readable, writeable, exceptional)
     for r in readable:
         if r is server: #代表来了一个新连接
             conn,addr = server.accept()#建立连接
             print("来了个新连接",addr)
             inputs.append(conn) #是因为这个新建立的连接还没发数据过来,现在就接收的话程序就报错了,
             #所以要想实现这个客户端发数据来时server端能知道,就需要让select再监测这个conn,所以将新连接放到inputs里
             msg_dic[conn] = queue.Queue() #初始化一个队列,用来存放要返回给这个客户端的数据
         else: #conn2  这里不是新来了一个新连接,而是之前已经建立好的连接,是继续要发送数据的客户端连接
             data = r.recv(1024)
             print("收到数据",data.decode())
             msg_dic[r].put(data)#将要返回给客户端的数据存放到队列里
             outputs.append(r) #将连接r 放入要返回数据的连接队列里,如果r不是需要返回数据的连接,就可以不将r放进outputs中

     for w in writeable: #要返回给客户端的连接列表
         data_to_client = msg_dic[w].get()#从队列里取出要返回给客户端的数据
         w.send(data_to_client) #给客户端返回数据
         outputs.remove(w) #确保下次循环writeable的时候,不返回这个已经处理完的连接了

     for e in exceptional:#有异常的客户端连接列表,称之为坏连接
         if e in outputs: #如果坏连接在返回数据列表里,就将其去掉,否则会有错误
             outputs.remove(e)

         inputs.remove(e) #坏连接一定在inputs中,所以不需要判断,因为所有连接都是最先存放在inputs中

         del msg_dic[e]  #同时,将队列中的坏连接也去掉(如果队列中存在的话)

client

 #!usr/bin/env/python
 # -*- coding:utf-8 -*-
 # from  wangteng
 #本客户端可以跟异步socket_server多并发、select socket server一起用
 import socket

 host = 'localhost'
 port = 9000
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.connect((host, port))
 while True:
     msg = bytes(input(">>:"), encoding="utf-8")  #输入要发送的内容,必须是bytes类型
     s.sendall(msg)   #发送数据
     # filename = input("filename:>>")   #注释的内容是可以发送文件的
     # with open(filename,'rb') as f:#发送文件
     #     for line in f:
     #         s.sendall(line)
     #         data = s.recv(1024)
     #         print('recv:',data)
     data = s.recv(1024)#接收数据
     print('recv:', data.decode())#解码可以显示中文
 s.close()

12.  重要要会:IO多路复用之  selector
server

 #io多路复用实现并发,并不是异步  selector最好
 import selectors
 import socket

 sel = selectors.DefaultSelector()

 def accept(sock, mask):
     conn, addr = sock.accept()  # 新建一个连接
     print('accepted', conn, 'from', addr,mask)
     conn.setblocking(False)  #设置为非阻塞
     sel.register(conn, selectors.EVENT_READ, read) #将新连接注册到sel实例中,由sel监测新连接conn,read为回调函数,即新连接conn将会调用read函数

 def read(conn, mask):
     data = conn.recv(1024)  # Should be ready
     if data:
         print('echoing', repr(data), 'to', conn)
         conn.send(data)  # Hope it won't block
     else:  #如果没有数据接收进来
         print('closing', conn)
         sel.unregister(conn)   #取消注册该连接conn
         conn.close()

 sock = socket.socket()
 sock.bind(('localhost', 9000))
 sock.listen(100)
 sock.setblocking(False)
 sel.register(sock, selectors.EVENT_READ, accept) #将sock注册到sel实例中,由sel监测sock,accept为回调函数,即sock要活动的话,                                                    #将会调用accept函数去新建一个连接

 while True:
     events = sel.select() #默认阻塞,有活动连接就返回活动的连接列表,events是一个列表,里边存放所有的连接
     for key, mask in events:  #mask是连接的顺序数
         callback = key.data #调用accept函数
         callback(key.fileobj, mask) #key.fileobj= 文件句柄

client

 #!usr/bin/env/python
 # -*- coding:utf-8 -*-
 # from  wangteng
 #本客户端可以跟异步socket_server多并发、select socket server一起用
 import socket

 host = 'localhost'
 port = 9000
 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 s.connect((host, port))
 while True:
     msg = bytes(input(">>:"), encoding="utf-8")  #输入要发送的内容,必须是bytes类型
     s.sendall(msg)   #发送数据
     # filename = input("filename:>>")   #注释的内容是可以发送文件的
     # with open(filename,'rb') as f:#发送文件
     #     for line in f:
     #         s.sendall(line)
     #         data = s.recv(1024)
     #         print('recv:',data)
     data = s.recv(1024)#接收数据
     print('recv:', data.decode())#解码可以显示中文
 s.close()