Python程序中的线程操作(线程池)-concurrent模块
一、Python标准模块——concurrent.futures
官方文档:https://docs.python.org/dev/library/concurrent.futures.html
二、介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.
三、基本方法
submit(fn, *args, **kwargs)
:异步提交任务
map(func, *iterables, timeout=None, chunksize=1)
:取代for循环submit的操作
shutdown(wait=True)
:相当于进程池的pool.close()+pool.join()
操作
- wait=True,等待池内所有任务执行完毕回收完资源后才继续
- wait=False,立即返回,并不会等待池内的任务执行完毕
- 但不管wait参数为何值,整个程序都会等到所有任务执行完毕
- submit和map必须在shutdown之前
result(timeout=None)
:取得结果
add_done_callback(fn)
:回调函数
done()
:判断某一个线程是否完成
cancle()
:取消某个任务
四、ProcessPoolExecutor
介绍
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os, time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ProcessPoolExecutor(max_workers=3)
futures=[]
for i in range(11):
future=executor.submit(task,i)
futures.append(future)
executor.shutdown(True)
print('+++>')
for future in futures:
print(future.result())
五、ThreadPoolExecutor
介绍
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
#用法
与ProcessPoolExecutor相同
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time
def task(i):
# print(f'{currentThread().name} 在执行任务{i}')
# 进程
print(f'进程 {current_process().name} 在执行任务 {i}')
time.sleep(2)
return i * 2
if __name__ == '__main__':
# 池子里只有四个线程
# pool = ThreadPoolExecutor(4) # 池子里面有4个线程
# 池子里有四个进程
pool = ProcessPoolExecutor(4)
fu_list = []
for i in range(20):
# task任务要做20次, 4个进程负责做这个事
future = pool.submit(task, i) # task任务要做20次,4个进程负责做这个事情
fu_list.append(future)
# 关闭池的入口, 会等待所有的任务执行完,结束阻塞
pool.shutdown()
for fu in fu_list:
print(fu.result())
六、map的用法
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random
def task(n):
print('%s is runing' % os.getpid())
time.sleep(random.randint(1, 3))
return n ** 2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=3)
# for i in range(11):
# future=executor.submit(task,i)
res = executor.map(task, range(1, 12)) # map取代了for+submit
executor.shutdown()
for r in res:
print(r)
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
54480 is runing
1
4
9
16
25
36
49
64
81
100
121
七、回调函数
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print('<进程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def parse_page(res):
res=res.result()
print('<进程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
# p=Pool(3)
# for url in urls:
# p.apply_async(get_page,args=(url,),callback=pasrse_page)
# p.close()
# p.join()
p=ProcessPoolExecutor(3)
for url in urls:
p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
Python程序中的线程操作(线程池)-concurrent模块的更多相关文章
-
Python程序中的进程操作-进程池(multiprocess.Pool)
目录 一.进程池 二.概念介绍--multiprocess.Pool 三.参数用法 四.主要方法 五.其他方法(了解) 六.代码实例--multiprocess.Pool 6.1 同步 6.2 异步 ...
-
在Python程序中的进程操作,multiprocess.Process模块
在python程序中的进程操作 之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创建的.因此,运行起 ...
-
python 全栈开发,Day38(在python程序中的进程操作,multiprocess.Process模块)
昨日内容回顾 操作系统纸带打孔计算机批处理 —— 磁带 联机 脱机多道操作系统 —— 极大的提高了CPU的利用率 在计算机中 可以有超过一个进程 进程遇到IO的时候 切换给另外的进程使用CPU 数据隔 ...
-
Python程序中的进程操作--—--开启多进程
Python程序中的进程操作-----开启多进程 之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创 ...
-
Python程序中的进程操作-开启多进程(multiprocess.process)
目录 一.multiprocess模块 二.multiprocess.process模块 三.process模块介绍 3.1 方法介绍 3.2 属性介绍 3.3 在windows中使用process模 ...
-
29、Python程序中的进程操作(multiprocess.process)
一.multiprocess模块 multiprocess不是一个模块而是python中一个操作.管理进程的包. 子模块分为四个部分: 创建进程部分 进程同步部分 进程池部分 进程之间数据共享 二.m ...
-
Python程序中的进程操作
之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程.所有的进程都是通过它的父进程来创建的.因此,运行起来的python程序也是一个进程 ...
-
Python程序中的进程操作-进程间通信(multiprocess.Queue)
目录 一.进程间通信 二.队列 2.1 概念介绍--multiprocess.Queue 2.1.1 方法介绍 2.1.2 其他方法(了解) 三.代码实例--multiprocess.Queue 3. ...
-
在python程序中的进程操作
multiprocess模块 multiprocess不是一个模块而是python中一个操作.管理进程的包. 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所 ...
-
Python程序中的进程操作-进程间数据共享(multiprocess.Manager)
目录 一.进程之间的数据共享 1.1 Manager模块介绍 1.2 Manager例子 一.进程之间的数据共享 展望未来,基于消息传递的并发编程是大势所趋 即便是使用线程,推荐做法也是将程序设计为大 ...
随机推荐
-
SDUT 2603:Rescue The Princess
Rescue The Princess Time Limit: 1000ms Memory limit: 65536K 有疑问?点这里^_^ 题目描述 Several days ago, a b ...
-
IOS 学习笔记 2015-03-24 OC-API-常用结构体
一 标题 常用结构体 二 API 1 NSRange 表示一个范围 A 实例化 NSRange rg={3,5};//第一参数是起始位置第二个参数是长度 B 实例化 NSRange rg2=NSMak ...
-
哈希(1) hash的基本知识回顾
好久没看数据结构了,现在也打不起精神来,翻了一下书,严蔚敏那本书.,以下是书的第9章,发现自己很多时候对知识的认识无法结构化和系统化,都是零散的,模糊的混乱的记忆,以后要有体系, 第9章 查找 ...
-
那两年炼就的Android内功修养
http://blog.csdn.net/luoshengyang/article/details/8923485 http://iconsparadise.com/ http://blog.csdn ...
-
Android 发送多个不同的快捷方式(shortcut)到桌面并向其启动的Activity传参
需求: 对于创建快捷方式到桌面,网上能查到不少资料,但一般都是针对应用程序本身的. 前阵子在做项目时,遇到了一个类似于百度贴吧里面的一个需求:对于每个具体的贴吧,都可以将其发送到桌面(HomeScre ...
-
基本类型(2):oracle中有4个大对象(lobs)类型可用,分别是blob,clob,bfile,nclob。
1)blob:二进制lob,为二进制数据,最长可达4GB,存贮在数据库中. 2)clob:字符lob,字符数据,最长可以达到4GB,存贮在数据库中. 3)bfile:二进制文件;存贮在数据库之外的只读 ...
-
Java 避免创建不必要的对象
最好能重用对象而不是在每次需要的时候就创建一个相同功能的新对象.如果对象是不可变的,它就始终可以被重用. String s = new String("stringette"); ...
-
FreeBsd网络性能优化方案sysctl
以下是阿盛的配置 sysctl net.inet.tcp.msl= sysctl net.inet.tcp.mssdflt= sysctl net.inet.tcp.minmss= sysctl ne ...
-
BZOJ 1002 - 轮状病毒 - [基尔霍夫矩阵(待补)+高精度]
题目链接:https://www.lydsy.com/JudgeOnline/problem.php?id=1002 Description 轮状病毒有很多变种,所有轮状病毒的变种都是从一个轮状基产生 ...
-
[技术选型] dubbo
分布式服务架构 - 阿里开源项目 简介 Dubbo架构设计详解 Dubbo与Zookeeper.SpringMVC整合和使用(负载均衡.容错)