利用python实现生产者消费者的并发模型

时间:2022-08-02 15:14:24

一、使用多线程实现生产者与消费者模型

1、  锁模型:


import random
import time
import threading

product = [] 
lock = threading.Condition()
 
class Producer(threading.Thread):    
    speed=1;
    def __init__(self, lock,speed):
        self._lock = lock       
        self.speed=speed;
        threading.Thread.__init__(self)
 
    def run(self):
        global product
        while True:
            if self._lock.acquire():
                if len(product)+self.speed > 5:
                    self._lock.wait()
                else:
                    ProductThing='';
                    for i in range(self.speed): 
                        num=random.random()                     
                        ProductThing+=str(num)+' '                                        
                        product.append(str(num))
                    print ("product "+str(self.speed)+", count=" + str(len(product)))                  
                    print ('         product things:'+ProductThing)
                    self._lock.notify()
                self._lock.release()
                time.sleep(1)
 
class Consumer(threading.Thread):
    def __init__(self, lock):
        self._lock = lock
        threading.Thread.__init__(self)

    def run(self):
        global product      
        time_start=time.clock()            
        count=0
        while True:
            if self._lock.acquire():
                if len(product) <= 0:
                    self._lock.wait()
                else:                
                    speed=random.randint(1, len(product))                                                        
                    consumeThing='';                  
                    for i in range(speed):               
                        consumeThing+=str(product[0])+' '                        
                        del product[0]                                                                       
                        count=count+1                       
                        if(count==20):                            
                            time_end=time.clock()                         
                            print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))                                              
                    print('consume '+str(speed)+', count=' + str(len(product)))                   
                    print('         consume things:'+consumeThing)                   
                    self._lock.notify()
                self._lock.release()
                time.sleep(1) 
 
def test():  
    p1 = Producer(lock,1)
    p1.start()   
    p2 = Producer(lock,2)
    p2.start() 
    s = Consumer(lock)
    s.start()
 
if __name__ == '__main__':
    test()


2、Condition模型

可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁,但是notify and notifyall本身是不会释放占有的Condition内部的锁,所以随后需要condition.release()来显示的释放锁。

Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。

 

import threading
import time,random
 
product=[];
cond = threading.Condition()

class Producer(threading.Thread):
    def __init__(self,speed):
        threading.Thread.__init__(self)
        self.speed = speed
    def run(self):
        while True:
            for i in range(self.speed): 
                cond.acquire()
                while len(product)>=5:
                    cond.wait();
                num=random.random();
                product.append(num)
                print("生产者"+str(self.speed)+"生产了:"+str(num))
                cond.notifyAll()
                cond.release()
            time.sleep(1)
 
class Consumer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        count=0
        time_start=time.clock() 
        while True:         
            speed=random.randint(1,5);
            for i in range(speed):
                cond.acquire()
                while len(product)<=0:
                    cond.wait()
                num=product[0];
                del product[0]
                print("消费者,消费了"+str(num))
                count=count+1       
                if(count==20):                            
                    time_end=time.clock()        
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
                cond.notify()  
                cond.release()
            time.sleep(1)
 

 
Producer(1).start();
Producer(2).start();
Consumer().start();

 

3、Queue模型


1.创建一个 Queue.Queue() 的实例,来表示缓冲池。

2.每次从使用生产者线程对队列中的数据进行填充,使用消费者线程取出队列中的数据。


import threading
import queue
import time
import random
 
 
class Producer(threading.Thread):
    def __init__(self,speed,queue):
        threading.Thread.__init__(self)
        self.speed = speed
        self.queue = queue
 
    def run(self):
        while True:
            for i in range(self.speed):
                item = random.random()
                self.queue.put(item)
                print("生产者",self.speed,"生产了:",str(item))
            time.sleep(1)
            
class Consumer(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
 
    def run(self):
        time_start=time.clock()       
        count=0
        while True:
            speed=random.randint(1, 5)
            for i in range(speed):
                item = self.queue.get()
                print("消费者","消费:",item)
                count=count+1       
                if(count==20):                            
                    time_end=time.clock()        
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            time.sleep(1)
 
q = queue.Queue(maxsize = 5)


if __name__ == '__main__':
    
    Producer(1,q).start()
    
    Producer(2,q).start()
    
    Consumer(q).start()


4、信号量模型:


import sys, time
import random
from threading import Thread, Semaphore
 
product = [] 
 
mutex = Semaphore(1)
full = Semaphore(0)
empty = Semaphore(5)
 
class Producer(Thread):
    def __init__(self, speed):
        Thread.__init__(self);
        self.speed=speed;
 
    def run(self):
        while True:
            for i in range(self.speed):
                ProductThing=''; 
                empty.acquire()
                mutex.acquire()
                num=random.random()                       
                ProductThing+=str(num)+' '                                        
                product.append(str(num))
                print ('%s: count=%d' % ("producer"+str(self.speed), len(product)))
                print ('         product things:'+ProductThing)  
                mutex.release()
                full.release()
            time.sleep(1)
 
 
class Consumer(Thread):
    def __init__(self):
        Thread.__init__(self);
 
    def run(self):
        count=0
        time_start=time.clock() 
        while True:
            speed=random.randint(1, 5)
            for i in range(speed):
                consumeThing=""     
                full.acquire()
                mutex.acquire()
                consumeThing+=str(product[0])+' '
                del product[0]     
                print('%s: count=%d' % ("consumer", len(product)))
                print('         consume things:'+consumeThing)
                count=count+1       
                if(count==20):                            
                    time_end=time.clock()        
                    print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))                                                                                       
                mutex.release()
                empty.release()
            time.sleep(1)
 
 
if __name__ == '__main__':
 
    Producer(1).start()
    
    Producer(2).start()
 
    Consumer().start()


5、Event模型

threading.Event机制类似于一个线程向其它多个线程发号施令的模式,其它线程都会持有一个threading.Event的对象,这些线程都会等待这个事件的“发生”,如果此事件一直不发生,那么这些线程将会阻塞,直至事件的“发生”。生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。

 

import threading 
import random
import time

def produce(speed,e_p1,e_p2,e_c,product):
    while True:
        for i in range(speed):
            if(speed==1):
                e_p1.wait();
            if(speed==2):
                e_p2.wait(); 
            num=random.random();
            product.append(num)
            print("生产者"+str(speed)+",生产了"+str(num))
            e_c.set();
            if(speed==1):
                e_p1.clear();
            if(speed==2):
                e_p2.clear(); 
        time.sleep(1)
 
def consume(e_p1,e_p2,e_c,product):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1,5);
        for i in range(speed):
            e_c.wait();
            num=product[0];
            del product[0]
            print("消费者,消费了"+str(num))
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
            r=random.randint(1,2)
            if(r==1):
                e_p1.set()
            if(r==2):
                e_p2.set() 
            e_c.clear()
        time.sleep(1)
 
if __name__ == '__main__':
    
    e_p1= threading.Event();
    
    e_p2= threading.Event();
    
    e_c= threading.Event();
    
    e_p1.set()
    
    product=[]
    
    p1=threading.Thread(target=produce, args=(1,e_p1,e_p2,e_c,product))
    
    p1.start()  

    p2=threading.Thread(target=produce, args=(2,e_p1,e_p2,e_c,product))
    
    p2.start()
    
    c1=threading.Thread(target=consume, args=(e_p1,e_p2,e_c,product))
    
    c1.start()

一、多进程实现生产者与消费者模型

 

1、  信号量和共享内存


'''
@author: jqy
'''

from multiprocessing import Process
import time
import random
import multiprocessing 
           
def produce(speed,mutex,full,empty,product,pindex,cindex):
    while True:
        for i in range(speed):
            ProductThing=''; 
            empty.acquire()
            mutex.acquire()
            num=random.random()                       
            ProductThing+=str(num)+' '                                        
            product[pindex.value]=num
            pindex.value=(pindex.value+1)%len(product)
            print ('%s: product things:%s' % ("producer"+str(speed), ProductThing))
            mutex.release()
            full.release()
        time.sleep(1)
 
 
def consume(mutex,full,empty,product,pindex,cindex):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1, 5)
        for i in range(speed):
            consumeThing=""    
            full.acquire()
            mutex.acquire()
            consumeThing+=str(product[cindex.value])+' '
            cindex.value=(cindex.value+1)%len(product)     
            print('%s:  consume things:%s' % ("consumer", consumeThing))
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))                                                                                      
            mutex.release()
            empty.release()
        time.sleep(1)
 
 
if __name__ == '__main__':

    product = multiprocessing.Array('d',range(5))

    pindex=multiprocessing.Value('i',0)

    cindex=multiprocessing.Value('i',0)
    
    mutex = multiprocessing.Semaphore(1)
    
    full = multiprocessing.Semaphore(0)

    empty = multiprocessing.Semaphore(5)
 
    Process(target=produce, args=(1,mutex,full,empty,product,pindex,cindex)).start()  

    Process(target=produce, args=(2,mutex,full,empty,product,pindex,cindex)).start()
    
    Process(target=consume, args=(mutex,full,empty,product,pindex,cindex)).start()

 

2、  Condition模型

 

import multiprocessing 
import random
import time

def produce(speed,cond,product):
    while True:
        for i in range(speed): 
            cond.acquire()
            while len(product)>=5:
                cond.wait();
            num=random.random();
            product.append(num)
            print("生产者"+str(speed)+",生产了:"+str(num))
            cond.notify()
            cond.release()
        time.sleep(1)
 
def consume(cond,product):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1,5);
        for i in range(speed):
            cond.acquire()
            while len(product)<=0:
                cond.wait()
            num=product[0];
            del product[0]
            print("消费者,消费了"+str(num))
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start)) 
            cond.notify()
            cond.release()
        time.sleep(1)
 
if __name__ == '__main__':
    
    c= multiprocessing.Condition();
    
    product=multiprocessing.Manager().list()
       
    p1=multiprocessing.Process(target=produce, args=(1,c,product))
    
    p1.start()  

    p2=multiprocessing.Process(target=produce, args=(2,c,product))
    
    p2.start()
    
    c1=multiprocessing.Process(target=consume, args=(c,product))
    
    c1.start()
    
    p1.join()
    
    p2.join()
    
    c1.join()


3、  消息队列模型

 

'''
@author: jqy
'''
from multiprocessing import Process
import random
import time
import multiprocessing

def produce(speed,q):
    while True:
        for i in range(speed):
            item = random.random()
            q.put(item)
            print("生产者",speed,"生产了",str(item))
        time.sleep(1)

def consume(num,q):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1, 5)
        for i in range(speed):
            item = q.get()        
            print("消费者","消费了",item)
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start)) 
        time.sleep(1)

if __name__ == '__main__':
    
    q =  multiprocessing.Queue(maxsize = 5)
       
    Process(target=produce, args=(1,q)).start()  

    Process(target=produce, args=(2,q)).start()
    
    Process(target=consume, args=(1,q)).start()


4、  管道模型:

 

'''
Created on 2016/11/11

@author: jqy
'''
from multiprocessing import Process
import random
import time
import multiprocessing

def produce(speed,empty,p):
    while True:
        for i in range(speed):
            empty.acquire()
            item = random.random()
            p.send(item)
            print("生产者",speed,",生产了",str(item))
        time.sleep(1)

def consume(num,empty,p):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1,5)
        for i in range(speed):
            item = p.recv()        
            print("消费者"," 消费了",item)
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            empty.release()
        time.sleep(1)

if __name__ == '__main__':
    
    parent_conn, child_conn  =  multiprocessing.Pipe()

    empty = multiprocessing.Semaphore(5)
       
    Process(target=produce, args=(1,empty,child_conn)).start()  

    Process(target=produce, args=(2,empty,child_conn)).start()
    
    Process(target=consume, args=(1,empty,parent_conn)).start()

 

5、  Event模型

生产者生产完商品会立即通知消费者去消费,消费者消费完商品后会立即通知生产者去生产,适用于产品池数目为一的情况。


import multiprocessing 
import random
import time

def produce(speed,e_p1,e_p2,e_c,product):
    while True:
        for i in range(speed):
            if(speed==1):
                e_p1.wait();
            if(speed==2):
                e_p2.wait(); 
            num=random.random();
            product.append(num)
            print("生产者"+str(speed)+",生产了"+str(num))
            e_c.set();
            if(speed==1):
                e_p1.clear();
            if(speed==2):
                e_p2.clear(); 
        time.sleep(1)
 
def consume(e_p1,e_p2,e_c,product):
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1,5);
        for i in range(speed):
            e_c.wait();
            num=product[0];
            del product[0]
            print("消费者,消费了"+str(num))
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费 "+str(count)+"个商品所用时间: %f s" % (time_end - time_start))
            r=random.randint(1,2)
            if(r==1):
                e_p1.set()
            if(r==2):
                e_p2.set() 
            e_c.clear()
        time.sleep(1)
 
if __name__ == '__main__':
    
    e_p1= multiprocessing.Event();
    
    e_p2= multiprocessing.Event();
    
    e_c= multiprocessing.Event();
    
    e_p1.set()
    
    product=multiprocessing.Manager().list()
       
    p1=multiprocessing.Process(target=produce, args=(1,e_p1,e_p2,e_c,product))
    
    p1.start()  

    p2=multiprocessing.Process(target=produce, args=(2,e_p1,e_p2,e_c,product))
    
    p2.start()
    
    c1=multiprocessing.Process(target=consume, args=(e_p1,e_p2,e_c,product))
    
    c1.start()
    
    p1.join()
    
    p2.join()
    
    c1.join()


三、不同主机上生产者消费者模型:

 

1、  socketTCP模型:


from multiprocessing import Process
import queue
import threading
import socket
import random
import time

MaxSize=5

def produce(speed,host,port):
    A=(host,port)
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,MaxSize*3)
    s.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
    s.connect(A)
    item=1; 
    while True:
        for i in range(speed):
            if(speed==1):
                print("生产者",speed,",生产了","{0:0=3}".format(item))
                s.send(bytes("{0:0=3}".format(item), encoding = "utf8"))
            else:
                print("生产者",speed,",生产了","{0:x=3}".format(item))
                s.send(bytes("{0:x=3}".format(item), encoding = "utf8"))
            item=item+1
        time.sleep(1)
    s.close()

def consume(host1,port1,host2,port2):
    q=queue.Queue(maxsize=MaxSize); 
    threading.Thread(target=getMessage, args=(q,host1,port1)).start()
    threading.Thread(target=getMessage, args=(q,host2,port2)).start()   
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1,MaxSize)
        for i in range(speed):
            item=q.get()
            print("消费者"," 消费了",item)
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费"+str(count)+"个商品所用的时间: %f s" % (time_end - time_start))
        time.sleep(1)

def getMessage(q,host,port):    
    A=(host,port)
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF,MaxSize*3)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_SNDBUF,MaxSize*3) 
    sock.bind(A)
    sock.listen(0)
    tcpClientSock, addr=sock.accept()  
    while True:
        try:                  
            data=tcpClientSock.recv(3)
            q.put(str(data,encoding = "utf8"))
            print("count="+str(q.qsize()))        
        except:
            print("exception")
    tcpClientSock.close()
    sock.close()
    
    
if __name__ == '__main__':
       
    Process(target=produce, args=(1,'localhost',8080)).start()  

    Process(target=produce, args=(2,'localhost',8090)).start()
    
    Process(target=consume, args=('localhost',8080,'localhost',8090)).start()


2、  远程调用模型:

先在主进程中注册获取产品的方法,消费者在取用商品时调用取用商品的远程方法来获取。取用商品有一定的延迟,使得程序的整个运行速度比较慢。


from multiprocessing import Process
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer
import random
import time
import multiprocessing

q =  multiprocessing.Queue(maxsize = 5)

def produce(speed,q):
    while True:
        for i in range(speed):
            item = random.random()
            q.put(item)
            print("生产者",speed,"生产了",str(item))
        time.sleep(1)

def getAProduct():
    global q
    return q.get()

def consume(host,port):
    server = ServerProxy("http://"+host+":"+str(port))
    count=0
    time_start=time.clock() 
    while True:
        speed=random.randint(1, 5)
        for i in range(speed):
            print("开始远程调用")
            item = server.getAProduct()        
            print("消费者","消费了",item)
            count=count+1       
            if(count==20):                            
                time_end=time.clock()        
                print("消费"+str(count)+"个资源所需要的时间: %f s" % (time_end - time_start)) 
        time.sleep(1)

if __name__ == '__main__':
    
    s = SimpleXMLRPCServer(('localhost', 8000))
    
    s.register_function(getAProduct)
    
    print('注册获取产品方法完成')
            
    Process(target=produce, args=(1,q)).start()  

    Process(target=produce, args=(2,q)).start()
    
    Process(target=consume, args=('localhost',8000)).start()
    
    s.serve_forever()