一、使用多线程实现生产者与消费者模型
1、 锁模型:
import random
import time
import threading

# Shared buffer of produced items (stored as strings) and the condition
# variable that guards every access to it.
product = []
lock = threading.Condition()


class Producer(threading.Thread):
    """Thread that appends ``speed`` random items to ``product`` per round.

    Args:
        lock: Condition-like object (usable in ``with``, offering ``wait`` and
            ``notify_all``) that guards the shared ``product`` list.
        speed: Number of items produced in each round.
    """

    speed = 1

    def __init__(self, lock, speed):
        threading.Thread.__init__(self)
        self._lock = lock
        self.speed = speed

    def run(self):
        """Produce forever, never letting the buffer exceed 5 items."""
        while True:
            with self._lock:
                # Re-check the predicate after every wake-up: another thread
                # may have changed the buffer before we got the lock back.
                while len(product) + self.speed > 5:
                    self._lock.wait()
                made = [str(random.random()) for _ in range(self.speed)]
                product.extend(made)
                ProductThing = ''.join(text + ' ' for text in made)
                print("product " + str(self.speed) + ", count=" + str(len(product)))
                print(' product things:' + ProductThing)
                # Producers and the consumer wait on the same condition, so
                # wake everyone and let each re-check its own predicate.
                self._lock.notify_all()
            time.sleep(1)


class Consumer(threading.Thread):
    """Thread that removes a random number of items from ``product`` per round.

    Args:
        lock: Condition-like object guarding the shared ``product`` list.
    """

    def __init__(self, lock):
        threading.Thread.__init__(self)
        self._lock = lock

    def run(self):
        """Consume forever; report the elapsed time once 20 items are taken."""
        # time.clock() was removed in Python 3.8; perf_counter() is the
        # documented replacement for measuring elapsed time.
        time_start = time.perf_counter()
        count = 0
        while True:
            with self._lock:
                while not product:
                    self._lock.wait()
                speed = random.randint(1, len(product))
                consumeThing = ''
                for _ in range(speed):
                    consumeThing += str(product.pop(0)) + ' '
                    count = count + 1
                    if count == 20:
                        time_end = time.perf_counter()
                        print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
                print('consume ' + str(speed) + ', count=' + str(len(product)))
                print(' consume things:' + consumeThing)
                self._lock.notify_all()
            time.sleep(1)


def test():
    """Start two producers (speeds 1 and 2) and one consumer."""
    p1 = Producer(lock, 1)
    p1.start()
    p2 = Producer(lock, 2)
    p2.start()
    s = Consumer(lock)
    s.start()


if __name__ == '__main__':
    test()
2、Condition模型
可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁,但是notify和notify_all本身不会释放所占有的Condition内部的锁,所以随后需要调用condition.release()来显式地释放锁。
Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。
import threading
import time
import random

# Shared buffer and the condition variable that guards it.
product = []
cond = threading.Condition()


class Producer(threading.Thread):
    """Thread that produces ``speed`` random numbers per round into ``product``.

    Args:
        speed: Number of items produced in each round.
    """

    def __init__(self, speed):
        threading.Thread.__init__(self)
        self.speed = speed

    def run(self):
        """Produce forever, blocking while the buffer already holds 5 items."""
        while True:
            for _ in range(self.speed):
                # ``with`` releases the condition's lock even if the body raises.
                with cond:
                    while len(product) >= 5:
                        cond.wait()
                    num = random.random()
                    product.append(num)
                    print("生产者" + str(self.speed) + "生产了:" + str(num))
                    # notifyAll() is a deprecated alias of notify_all().
                    cond.notify_all()
            time.sleep(1)


class Consumer(threading.Thread):
    """Thread that consumes between 1 and 5 items per round from ``product``."""

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        """Consume forever; report the elapsed time once 20 items are taken."""
        count = 0
        # time.clock() was removed in Python 3.8; use perf_counter() instead.
        time_start = time.perf_counter()
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                with cond:
                    while len(product) <= 0:
                        cond.wait()
                    num = product.pop(0)
                    print("消费者,消费了" + str(num))
                    count = count + 1
                    if count == 20:
                        time_end = time.perf_counter()
                        print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
                    # Wake every waiter; each one re-checks its own predicate.
                    cond.notify_all()
            time.sleep(1)


# Guarded like the other examples so importing this module starts no threads.
if __name__ == '__main__':
    Producer(1).start()
    Producer(2).start()
    Consumer().start()
3、Queue模型
1.创建一个 queue.Queue() 的实例,来表示缓冲池。
2.每次使用生产者线程向队列中填充数据,使用消费者线程取出队列中的数据。
import threading
import queue
import time
import random


class Producer(threading.Thread):
    """Thread that puts ``speed`` random numbers per round into a queue.

    Args:
        speed: Number of items produced in each round.
        queue: Queue-like object with a blocking ``put`` method.
    """

    def __init__(self, speed, queue):
        threading.Thread.__init__(self)
        self.speed = speed
        self.queue = queue

    def run(self):
        """Produce forever; ``put`` is relied on to block when the queue is full."""
        while True:
            for _ in range(self.speed):
                item = random.random()
                self.queue.put(item)
                print("生产者", self.speed, "生产了:", str(item))
            time.sleep(1)


class Consumer(threading.Thread):
    """Thread that takes between 1 and 5 items per round from a queue.

    Args:
        queue: Queue-like object with a blocking ``get`` method.
    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Consume forever; report the elapsed time once 20 items are taken."""
        # time.clock() was removed in Python 3.8; use perf_counter() instead.
        time_start = time.perf_counter()
        count = 0
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                item = self.queue.get()
                print("消费者", "消费:", item)
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
            time.sleep(1)


# Bounded queue acting as the 5-slot buffer pool.
q = queue.Queue(maxsize=5)

if __name__ == '__main__':
    Producer(1, q).start()
    Producer(2, q).start()
    Consumer(q).start()
4、信号量模型:
import sys
import time
import random
from threading import Thread, Semaphore

# Shared buffer plus the classic three semaphores:
#   mutex - mutual exclusion on ``product``
#   full  - counts filled slots (consumer waits on it)
#   empty - counts free slots, 5 in total (producers wait on it)
product = []
mutex = Semaphore(1)
full = Semaphore(0)
empty = Semaphore(5)


class Producer(Thread):
    """Thread that produces ``speed`` items per round.

    Args:
        speed: Number of items produced in each round.
    """

    def __init__(self, speed):
        Thread.__init__(self)
        self.speed = speed

    def run(self):
        """Produce forever, taking one free slot per item."""
        while True:
            for _ in range(self.speed):
                empty.acquire()
                # ``with`` guarantees the mutex is released even on error.
                with mutex:
                    num = random.random()
                    ProductThing = str(num) + ' '
                    product.append(str(num))
                    print('%s: count=%d' % ("producer" + str(self.speed), len(product)))
                    print(' product things:' + ProductThing)
                full.release()
            time.sleep(1)


class Consumer(Thread):
    """Thread that consumes between 1 and 5 items per round."""

    def __init__(self):
        Thread.__init__(self)

    def run(self):
        """Consume forever; report the elapsed time once 20 items are taken."""
        count = 0
        # time.clock() was removed in Python 3.8; use perf_counter() instead.
        time_start = time.perf_counter()
        while True:
            speed = random.randint(1, 5)
            for _ in range(speed):
                full.acquire()
                with mutex:
                    consumeThing = str(product.pop(0)) + ' '
                    print('%s: count=%d' % ("consumer", len(product)))
                    print(' consume things:' + consumeThing)
                    count = count + 1
                    if count == 20:
                        time_end = time.perf_counter()
                        print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
                empty.release()
            time.sleep(1)


if __name__ == '__main__':
    Producer(1).start()
    Producer(2).start()
    Consumer().start()
5、Event模型
threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
import threading
import random
import time


def produce(speed, e_p1, e_p2, e_c, product):
    """Produce ``speed`` items per round, one item per "go" signal.

    Args:
        speed: Items per round; also selects the gate event (1 -> ``e_p1``,
            2 -> ``e_p2``). Any other value runs without a gate, as before.
        e_p1: Event that lets the speed-1 producer produce one item.
        e_p2: Event that lets the speed-2 producer produce one item.
        e_c: Event set to tell the consumer that an item is available.
        product: Shared list used as the buffer.
    """
    my_turn = {1: e_p1, 2: e_p2}.get(speed)
    while True:
        for _ in range(speed):
            if my_turn is not None:
                my_turn.wait()
                # Clear our own gate BEFORE signalling the consumer. Clearing
                # it afterwards could erase a fresh "go" that the consumer
                # sets in between, leaving every thread blocked forever.
                my_turn.clear()
            num = random.random()
            product.append(num)
            print("生产者" + str(speed) + ",生产了" + str(num))
            e_c.set()
        time.sleep(1)


def consume(e_p1, e_p2, e_c, product):
    """Consume between 1 and 5 items per round, one item per signal.

    After each item a randomly chosen producer is allowed to produce the next
    one. The elapsed time is printed once 20 items have been consumed.

    Args:
        e_p1: Event that lets the speed-1 producer produce one item.
        e_p2: Event that lets the speed-2 producer produce one item.
        e_c: Event the consumer waits on for an available item.
        product: Shared list used as the buffer.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            e_c.wait()
            # Clear before waking a producer so its next e_c.set() is not lost.
            e_c.clear()
            num = product.pop(0)
            print("消费者,消费了" + str(num))
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 " + str(count) + "个商品所用的时间: %f s" % (time_end - time_start))
            r = random.randint(1, 2)
            if r == 1:
                e_p1.set()
            if r == 2:
                e_p2.set()
        time.sleep(1)


if __name__ == '__main__':
    e_p1 = threading.Event()
    e_p2 = threading.Event()
    e_c = threading.Event()
    # Producer 1 gets the first turn.
    e_p1.set()
    product = []
    p1 = threading.Thread(target=produce, args=(1, e_p1, e_p2, e_c, product))
    p1.start()
    p2 = threading.Thread(target=produce, args=(2, e_p1, e_p2, e_c, product))
    p2.start()
    c1 = threading.Thread(target=consume, args=(e_p1, e_p2, e_c, product))
    c1.start()
二、多进程实现生产者与消费者模型
1、 信号量和共享内存
''' @author: jqy '''
from multiprocessing import Process
import time
import random
import multiprocessing


def produce(speed, mutex, full, empty, product, pindex, cindex):
    """Write ``speed`` random numbers per round into the shared ring buffer.

    Args:
        speed: Number of items produced in each round.
        mutex: Semaphore used for mutual exclusion on the buffer and indices.
        full: Semaphore counting filled slots; released once per item.
        empty: Semaphore counting free slots; acquired once per item.
        product: Shared indexable buffer used as a ring.
        pindex: Shared object whose ``value`` is the next write position.
        cindex: Shared read position (not used by the producer).
    """
    while True:
        for _ in range(speed):
            empty.acquire()
            # ``with`` guarantees the mutex is released even on error.
            with mutex:
                num = random.random()
                ProductThing = str(num) + ' '
                product[pindex.value] = num
                pindex.value = (pindex.value + 1) % len(product)
                print('%s: product things:%s' % ("producer" + str(speed), ProductThing))
            full.release()
        time.sleep(1)


def consume(mutex, full, empty, product, pindex, cindex):
    """Read between 1 and 5 items per round from the shared ring buffer.

    Prints the elapsed time once 20 items have been consumed.

    Args:
        mutex: Semaphore used for mutual exclusion on the buffer and indices.
        full: Semaphore counting filled slots; acquired once per item.
        empty: Semaphore counting free slots; released once per item.
        product: Shared indexable buffer used as a ring.
        pindex: Shared write position (not used by the consumer).
        cindex: Shared object whose ``value`` is the next read position.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            full.acquire()
            with mutex:
                consumeThing = str(product[cindex.value]) + ' '
                cindex.value = (cindex.value + 1) % len(product)
                print('%s: consume things:%s' % ("consumer", consumeThing))
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
            empty.release()
        time.sleep(1)


if __name__ == '__main__':
    product = multiprocessing.Array('d', range(5))
    pindex = multiprocessing.Value('i', 0)
    cindex = multiprocessing.Value('i', 0)
    mutex = multiprocessing.Semaphore(1)
    full = multiprocessing.Semaphore(0)
    empty = multiprocessing.Semaphore(5)
    Process(target=produce, args=(1, mutex, full, empty, product, pindex, cindex)).start()
    Process(target=produce, args=(2, mutex, full, empty, product, pindex, cindex)).start()
    Process(target=consume, args=(mutex, full, empty, product, pindex, cindex)).start()
2、 Condition模型
import multiprocessing
import random
import time


def produce(speed, cond, product):
    """Append ``speed`` random numbers per round to the shared list.

    Args:
        speed: Number of items produced in each round.
        cond: Condition guarding ``product`` (usable in ``with``, offering
            ``wait`` and ``notify_all``).
        product: Shared list used as the buffer; capped at 5 items.
    """
    while True:
        for _ in range(speed):
            # ``with`` releases the condition's lock even if the body raises.
            with cond:
                while len(product) >= 5:
                    cond.wait()
                num = random.random()
                product.append(num)
                print("生产者" + str(speed) + ",生产了:" + str(num))
                # Producers and the consumer share one condition, so wake all
                # waiters and let each re-check its own predicate.
                cond.notify_all()
        time.sleep(1)


def consume(cond, product):
    """Remove between 1 and 5 items per round from the shared list.

    Prints the elapsed time once 20 items have been consumed.

    Args:
        cond: Condition guarding ``product``.
        product: Shared list used as the buffer.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            with cond:
                while len(product) <= 0:
                    cond.wait()
                num = product.pop(0)
                print("消费者,消费了" + str(num))
                count = count + 1
                if count == 20:
                    time_end = time.perf_counter()
                    print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
                cond.notify_all()
        time.sleep(1)


if __name__ == '__main__':
    c = multiprocessing.Condition()
    product = multiprocessing.Manager().list()
    p1 = multiprocessing.Process(target=produce, args=(1, c, product))
    p1.start()
    p2 = multiprocessing.Process(target=produce, args=(2, c, product))
    p2.start()
    c1 = multiprocessing.Process(target=consume, args=(c, product))
    c1.start()
    p1.join()
    p2.join()
    c1.join()
3、 消息队列模型
''' @author: jqy '''
from multiprocessing import Process
import random
import time
import multiprocessing


def produce(speed, q):
    """Put ``speed`` random numbers per round onto the queue.

    Args:
        speed: Number of items produced in each round.
        q: Queue-like object with a blocking ``put`` method.
    """
    while True:
        for _ in range(speed):
            item = random.random()
            q.put(item)
            print("生产者", speed, "生产了", str(item))
        time.sleep(1)


def consume(num, q):
    """Take between 1 and 5 items per round from the queue.

    Prints the elapsed time once 20 items have been consumed.

    Args:
        num: Unused; kept so existing callers keep working.
        q: Queue-like object with a blocking ``get`` method.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            item = q.get()
            print("消费者", "消费了", item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
        time.sleep(1)


if __name__ == '__main__':
    q = multiprocessing.Queue(maxsize=5)
    Process(target=produce, args=(1, q)).start()
    Process(target=produce, args=(2, q)).start()
    Process(target=consume, args=(1, q)).start()
4、 管道模型:
''' Created on 2016/11/11 @author: jqy '''
from multiprocessing import Process
import random
import time
import multiprocessing


def produce(speed, empty, p, send_lock=None):
    """Send ``speed`` random numbers per round through a pipe connection.

    Args:
        speed: Number of items produced in each round.
        empty: Semaphore counting free slots; acquired once per item.
        p: Connection-like object with a ``send`` method.
        send_lock: Optional lock serialising ``send`` calls. The
            multiprocessing documentation warns that data may be corrupted
            when two processes write to the same end of a pipe at once, so
            pass a shared lock whenever several producers share ``p``.
            ``None`` keeps the original unguarded behaviour.
    """
    while True:
        for _ in range(speed):
            empty.acquire()
            item = random.random()
            if send_lock is None:
                p.send(item)
            else:
                with send_lock:
                    p.send(item)
            print("生产者", speed, ",生产了", str(item))
        time.sleep(1)


def consume(num, empty, p):
    """Receive between 1 and 5 items per round from a pipe connection.

    Prints the elapsed time once 20 items have been consumed.

    Args:
        num: Unused; kept so existing callers keep working.
        empty: Semaphore counting free slots; released once per item.
        p: Connection-like object with a ``recv`` method.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            item = p.recv()
            print("消费者", " 消费了", item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
            empty.release()
        time.sleep(1)


if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    empty = multiprocessing.Semaphore(5)
    # Both producers write to child_conn, so their sends must be serialised.
    send_lock = multiprocessing.Lock()
    Process(target=produce, args=(1, empty, child_conn, send_lock)).start()
    Process(target=produce, args=(2, empty, child_conn, send_lock)).start()
    Process(target=consume, args=(1, empty, parent_conn)).start()
5、 Event模型
生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。
import multiprocessing
import random
import time


def produce(speed, e_p1, e_p2, e_c, product):
    """Produce ``speed`` items per round, one item per "go" signal.

    Args:
        speed: Items per round; also selects the gate event (1 -> ``e_p1``,
            2 -> ``e_p2``). Any other value runs without a gate, as before.
        e_p1: Event that lets the speed-1 producer produce one item.
        e_p2: Event that lets the speed-2 producer produce one item.
        e_c: Event set to tell the consumer that an item is available.
        product: Shared list used as the buffer.
    """
    my_turn = {1: e_p1, 2: e_p2}.get(speed)
    while True:
        for _ in range(speed):
            if my_turn is not None:
                my_turn.wait()
                # Clear our own gate BEFORE signalling the consumer. Clearing
                # it afterwards could erase a fresh "go" that the consumer
                # sets in between, leaving every process blocked forever.
                my_turn.clear()
            num = random.random()
            product.append(num)
            print("生产者" + str(speed) + ",生产了" + str(num))
            e_c.set()
        time.sleep(1)


def consume(e_p1, e_p2, e_c, product):
    """Consume between 1 and 5 items per round, one item per signal.

    After each item a randomly chosen producer is allowed to produce the next
    one. The elapsed time is printed once 20 items have been consumed.

    Args:
        e_p1: Event that lets the speed-1 producer produce one item.
        e_p2: Event that lets the speed-2 producer produce one item.
        e_c: Event the consumer waits on for an available item.
        product: Shared list used as the buffer.
    """
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            e_c.wait()
            # Clear before waking a producer so its next e_c.set() is not lost.
            e_c.clear()
            num = product.pop(0)
            print("消费者,消费了" + str(num))
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费 " + str(count) + "个商品所用时间: %f s" % (time_end - time_start))
            r = random.randint(1, 2)
            if r == 1:
                e_p1.set()
            if r == 2:
                e_p2.set()
        time.sleep(1)


if __name__ == '__main__':
    e_p1 = multiprocessing.Event()
    e_p2 = multiprocessing.Event()
    e_c = multiprocessing.Event()
    # Producer 1 gets the first turn.
    e_p1.set()
    product = multiprocessing.Manager().list()
    p1 = multiprocessing.Process(target=produce, args=(1, e_p1, e_p2, e_c, product))
    p1.start()
    p2 = multiprocessing.Process(target=produce, args=(2, e_p1, e_p2, e_c, product))
    p2.start()
    c1 = multiprocessing.Process(target=consume, args=(e_p1, e_p2, e_c, product))
    c1.start()
    p1.join()
    p2.join()
    c1.join()
三、不同主机上生产者消费者模型:
1、 socketTCP模型:
from multiprocessing import Process
import queue
import threading
import socket
import random
import time

MaxSize = 5


def connect_with_retry(host, port, attempts=50, delay=0.1):
    """Return a TCP socket connected to ``(host, port)``.

    The producers are started before the consumer has bound its listening
    sockets, so a refused connection is retried instead of being fatal. A
    fresh socket is created for every attempt.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.
        attempts: Maximum number of connection attempts.
        delay: Seconds to sleep between attempts.

    Raises:
        ConnectionRefusedError: If every attempt was refused.
    """
    for attempt in range(attempts):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, MaxSize * 3)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, MaxSize * 3)
        try:
            s.connect((host, port))
            return s
        except ConnectionRefusedError:
            s.close()
            if attempt == attempts - 1:
                raise
            time.sleep(delay)
    raise ConnectionRefusedError("no connection attempts were made")


def recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``.

    ``recv`` may return fewer bytes than requested, so keep reading until the
    frame is complete. Returns fewer than ``size`` bytes (possibly ``b""``)
    only when the peer has closed the connection.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def produce(speed, host, port):
    """Send ``speed`` numbered 3-character items per round to the consumer.

    Items from the speed-1 producer are zero-padded ("001"); items from any
    other producer are padded with "x" ("xx1").

    Args:
        speed: Number of items produced in each round.
        host: Host of the consumer's listening socket.
        port: Port of the consumer's listening socket.
    """
    s = connect_with_retry(host, port)
    fmt = "{0:0=3}" if speed == 1 else "{0:x=3}"
    item = 1
    try:
        while True:
            for _ in range(speed):
                text = fmt.format(item)
                print("生产者", speed, ",生产了", text)
                # sendall() keeps sending until the whole frame is written.
                s.sendall(bytes(text, encoding="utf8"))
                item = item + 1
            time.sleep(1)
    finally:
        s.close()


def consume(host1, port1, host2, port2):
    """Consume between 1 and ``MaxSize`` items per round from two receivers.

    Starts one receiver thread per producer connection; both feed a bounded
    queue. Prints the elapsed time once 20 items have been consumed.

    Args:
        host1: Host to listen on for the first producer.
        port1: Port to listen on for the first producer.
        host2: Host to listen on for the second producer.
        port2: Port to listen on for the second producer.
    """
    q = queue.Queue(maxsize=MaxSize)
    threading.Thread(target=getMessage, args=(q, host1, port1)).start()
    threading.Thread(target=getMessage, args=(q, host2, port2)).start()
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, MaxSize)
        for _ in range(speed):
            item = q.get()
            print("消费者", " 消费了", item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费" + str(count) + "个商品所用的时间: %f s" % (time_end - time_start))
        time.sleep(1)


def getMessage(q, host, port):
    """Accept one producer connection and queue every 3-byte item it sends.

    Returns when the peer closes the connection or a socket error occurs;
    both sockets are always closed on the way out.

    Args:
        q: Queue that receives each decoded item.
        host: Host to listen on.
        port: Port to listen on.
    """
    A = (host, port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcpClientSock = None
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, MaxSize * 3)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, MaxSize * 3)
        sock.bind(A)
        sock.listen(0)
        tcpClientSock, addr = sock.accept()
        while True:
            try:
                data = recv_exact(tcpClientSock, 3)
            except OSError:
                # A broken connection cannot recover; stop rather than loop.
                print("exception")
                break
            if len(data) < 3:
                # The peer closed the connection; nothing more will arrive.
                break
            try:
                text = str(data, encoding="utf8")
            except UnicodeDecodeError:
                print("exception")
                continue
            q.put(text)
            print("count=" + str(q.qsize()))
    finally:
        if tcpClientSock is not None:
            tcpClientSock.close()
        sock.close()


if __name__ == '__main__':
    Process(target=produce, args=(1, 'localhost', 8080)).start()
    Process(target=produce, args=(2, 'localhost', 8090)).start()
    Process(target=consume, args=('localhost', 8080, 'localhost', 8090)).start()
2、 远程调用模型:
先在主进程中注册获取产品的方法,消费者在取用商品时调用取用商品的远程方法来获取。取用商品有一定的延迟,使得程序的整个运行速度比较慢。
from multiprocessing import Process
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer
import random
import time
import multiprocessing

# Bounded buffer shared by the producers and the RPC server in the main process.
q = multiprocessing.Queue(maxsize=5)


def produce(speed, q):
    """Put ``speed`` random numbers per round onto the queue.

    Args:
        speed: Number of items produced in each round.
        q: Queue-like object with a blocking ``put`` method.
    """
    while True:
        for _ in range(speed):
            item = random.random()
            q.put(item)
            print("生产者", speed, "生产了", str(item))
        time.sleep(1)


def getAProduct():
    """Remote method: take one item from the module-level queue and return it."""
    global q
    return q.get()


def consume(host, port):
    """Fetch between 1 and 5 items per round through XML-RPC.

    Prints the elapsed time once 20 items have been fetched.

    Args:
        host: Host of the XML-RPC server.
        port: Port of the XML-RPC server.
    """
    server = ServerProxy("http://" + host + ":" + str(port))
    count = 0
    # time.clock() was removed in Python 3.8; use perf_counter() instead.
    time_start = time.perf_counter()
    while True:
        speed = random.randint(1, 5)
        for _ in range(speed):
            print("开始远程调用")
            item = server.getAProduct()
            print("消费者", "消费了", item)
            count = count + 1
            if count == 20:
                time_end = time.perf_counter()
                print("消费" + str(count) + "个资源所需要的时间: %f s" % (time_end - time_start))
        time.sleep(1)


if __name__ == '__main__':
    s = SimpleXMLRPCServer(('localhost', 8000))
    s.register_function(getAProduct)
    print('注册获取产品方法完成')
    Process(target=produce, args=(1, q)).start()
    Process(target=produce, args=(2, q)).start()
    Process(target=consume, args=('localhost', 8000)).start()
    s.serve_forever()