一、共享数据
进程间通信应该尽量避免使用本节所讲的共享数据方式
进程之间操作共享的数据
from multiprocessing import Manager,Process,Lock
def work(dic, mutex):
    """Decrement the ``'count'`` entry of ``dic`` by one while holding ``mutex``.

    ``dic['count'] -= 1`` is a read followed by a write, so the lock is held
    around both steps to keep concurrent callers from overwriting each
    other's update.

    Args:
        dic: Mutable mapping that stores a number under the key ``'count'``.
        mutex: Lock-like object usable as a context manager.
    """
    with mutex:
        dic['count'] -= 1
if __name__ == '__main__':
    # One lock shared by every worker guards the read-modify-write on the
    # shared counter.
    mutex = Lock()
    # Using the manager as a context manager shuts it down explicitly once
    # the workers are finished instead of leaving it running until exit.
    with Manager() as m:
        share_dic = m.dict({'count': 50})
        p_l = []
        for _ in range(50):
            p = Process(target=work, args=(share_dic, mutex))
            p_l.append(p)
            p.start()
        # Wait for all 50 workers before reading the final value.
        for p in p_l:
            p.join()
        # The proxy must be read while the manager is still alive.
        print(share_dic)
二、进程池
apply是阻塞的,apply_async是非阻塞的
close() : 禁止往进程池内再添加任务
join() 主进程阻塞,等待子进程退出
进程池1
from multiprocessing import Pool
import os
import time
def task(n, delay=2):
    """Simulate a slow job and return the square of ``n``.

    Prints the current process id before and after sleeping so the caller
    can see which process executed the job.

    Args:
        n: Number to square.
        delay: Seconds to sleep before finishing. Defaults to 2, the value
            that used to be hard-coded.

    Returns:
        ``n ** 2``.
    """
    print('<%s> is running' % os.getpid())
    time.sleep(delay)
    print('<%s> is done' % os.getpid())
    return n ** 2
if __name__ == '__main__':
    # Pool() without an argument sizes itself from the machine's CPU count.
    p = Pool()
    try:
        for i in range(1, 7):
            # apply() blocks until the job has finished, so the six jobs run
            # one after another.
            res = p.apply(task, args=(i,))
            print('本次任务的结果 :%s' % res)
    finally:
        # Release the worker processes even if a job raised.
        p.close()
        p.join()
    print('主')
进程池改进版
from multiprocessing import Pool
import os
import time
import random
def task(n, min_delay=1, max_delay=3):
    """Simulate a job of random duration and return the square of ``n``.

    Prints the current process id, then sleeps for a whole number of seconds
    drawn uniformly from ``min_delay``..``max_delay`` (both inclusive).

    Args:
        n: Number to square.
        min_delay: Lower bound of the sleep in seconds (default 1).
        max_delay: Upper bound of the sleep in seconds (default 3).

    Returns:
        ``n ** 2``.
    """
    print('<%s> is running' % os.getpid())
    time.sleep(random.randint(min_delay, max_delay))
    return n ** 2
if __name__ == '__main__':
    p = Pool(4)
    # apply_async() returns immediately with a result handle, so all 20 jobs
    # are queued up front and the 4 workers process them concurrently.
    obj_l = [p.apply_async(task, args=(i,)) for i in range(1, 21)]
    # No more jobs will be submitted; wait for the workers to finish.
    p.close()
    p.join()
    print('主')
    # Every job is done by now, so get() returns without waiting.
    for obj in obj_l:
        print(obj.get())
进程池的应用-服务端
from socket import *
from multiprocessing import Pool
# Module-level listening TCP socket for the echo server; the accept loop
# further down takes connections from it.
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # this is the key line: the option has to be set before bind()
# Listen on the loopback interface only, port 8090.
s.bind(('127.0.0.1',8090))
# Backlog of 5 pending connections.
s.listen(5)
def talk(conn, addr):
    """Echo everything received on ``conn`` back in upper case.

    Runs until the peer closes its side (``recv`` returns empty bytes) or a
    socket error occurs, then closes ``conn``.

    Args:
        conn: Connected socket to serve.
        addr: Peer address as returned by ``accept()``; not used here.
    """
    try:
        while True:  # communication loop
            data = conn.recv(1024)
            if not data:
                break
            # sendall() keeps writing until the whole reply is out; a plain
            # send() is allowed to transmit only part of the buffer.
            conn.sendall(data.upper())
    except OSError:
        # The peer reset or aborted the connection: end this conversation.
        pass
    finally:
        # Always release the socket, whichever way the loop ended.
        conn.close()
if __name__ == '__main__':
    # Each talk() call occupies a worker until its client disconnects, so at
    # most 4 clients are served at the same time; further connections wait.
    p = Pool(4)
    try:
        while True:  # accept loop
            conn, addr = s.accept()
            p.apply_async(talk, args=(conn, addr))
    finally:
        # The loop only ends through an exception (for example Ctrl-C), so
        # the cleanup has to live in a finally block to be reachable.
        s.close()
        p.terminate()
        p.join()
客户端
from socket import *
# Interactive echo client: sends each non-empty input line to the server on
# 127.0.0.1:8090 and prints the reply.
c = socket(AF_INET, SOCK_STREAM)
try:
    c.connect(('127.0.0.1', 8090))
    while True:
        msg = input('>>: ').strip()
        if not msg:
            # Nothing to send; sending an empty message would leave both
            # sides waiting on recv().
            continue
        # sendall() guarantees the whole message is written.
        c.sendall(msg.encode('utf-8'))
        data = c.recv(1024)
        if not data:
            # Empty bytes mean the server closed the connection.
            break
        print(data.decode('utf-8'))
finally:
    # Reached on server disconnect, Ctrl-C, EOF on input or a socket error.
    c.close()
三、回调函数
就是由别人的函数运行期间来回调你实现的函数。
爬虫案例
from multiprocessing import Pool
import requests
import os
def get_page(url, timeout=None):
    """Download ``url`` and return its address together with the body text.

    Prints the current process id and the URL before fetching.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Passed straight to ``requests.get``. The default ``None``
            keeps the previous behaviour of waiting without a limit; pass a
            number of seconds to bound the wait.

    Returns:
        dict: ``{'url': url, 'text': <response body as text>}``.
    """
    print('<%s> get [%s]' % (os.getpid(), url))
    response = requests.get(url, timeout=timeout)
    return {'url': url, 'text': response.text}
def parse_page(res):
    """Append a one-line summary of a downloaded page to ``db.text``.

    Prints the current process id and the page URL, then appends
    ``url:<url> size:<number of characters in the text>`` to the file
    ``db.text`` in the current working directory.

    Args:
        res: Mapping with the keys ``'url'`` and ``'text'``, as returned by
            ``get_page``.
    """
    print('<%s> parse [%s]' % (os.getpid(), res['url']))
    # Named ``record`` so the local no longer shadows the function itself.
    record = 'url:%s size:%s\n' % (res['url'], len(res['text']))
    with open('db.text', 'a', encoding='utf-8') as f:
        f.write(record)
if __name__ == '__main__':
    p = Pool(4)
    urls = [
        'https://www.baidu.com',
        'http://www.openstack.org',
        'https://www.python.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
    ]
    for url in urls:
        p.apply_async(
            get_page,
            args=(url,),
            # parse_page receives whatever get_page returned.
            callback=parse_page,
            # Without an error callback a failed download would disappear
            # silently, because the result handles are never inspected.
            error_callback=lambda exc, url=url: print(
                '<%s> failed [%s]: %r' % (os.getpid(), url, exc)
            ),
        )
    # No more jobs will be submitted; wait until every download is handled.
    p.close()
    p.join()
    print('主', os.getpid())
四、开启线程
(1)创建线程的开销比创建进程的开销小,因而创建线程的速度快
from multiprocessing import Process
from threading import Thread
import os
import time
def work(delay=2):
    """Print the current process id, sleep, then print it again.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 2, the
            value that used to be hard-coded.
    """
    print('<%s> is running' % os.getpid())
    time.sleep(delay)
    print('<%s> is done' % os.getpid())
if __name__ == '__main__':
    # Run work() in a thread of this process. To compare against a child
    # process, swap this line for: t = Process(target=work)
    t = Thread(target=work)
    t.start()
    # Printed right after start() without waiting for work() to finish.
    print('主', os.getpid())
(2)同一进程下的多个线程共享该进程的资源,而多个进程之间的内存空间是隔离的
from multiprocessing import Process
from threading import Thread
import os
import time
# Module-level counter that work() modifies.
n = 100


def work():
    """Subtract 100 from the module-level counter ``n``."""
    global n
    n -= 100
if __name__ == '__main__':
    # A thread runs work() inside this process, so the change to n is
    # visible here once the thread has been joined. To see the isolated
    # behaviour of a child process, swap this line for:
    # p = Process(target=work)
    p = Thread(target=work)
    p.start()
    p.join()
    print('主', n)