Python中的并发

时间:2022-06-20 04:53:14

Python并发

并发三种层次

个人理解,并发是计算机在逻辑上能处理多任务的能力。一般分类三种类型:

  1. 异步,异步本质上是单线程的,因为 IO 操作在很多时候会存在阻塞,异步就是在这种阻塞的时候,通过控制权的交换来实现多任务的。即异步本质上是运行过程中的控制权的交换。最典型的例子就是生产者消费者模型。
    • 异步这个概念在不同的地方有不同的说法,比如 python 里面叫做协程,内部通过生成器来实现控制权的交换。但是无论怎么称呼,异步这种并发方式都脱离不了控制权的交换这么一个事实。
  2. 多进程,进程是一个程序具体的实例,拥有自己独立的内存单元。
  3. 多线程,线程依附于进程,共享存储空间。
    • 由于 Python 官方的解释器 Cython 对多线程有一个全局的锁(GIL),所以 Cython 中的线程局限性会比较大。这里不多解释。

这里还有一个概念需要注意,在使用并发的时候弄清楚需要并发的任务是计算密集还是IO密集

因为异步对于计算密集的任务是无效的。因为异步的本质是 IO 操作过程中阻塞时的控制权交换。在计算密集的任务中是没有这样的阻塞的。

协程

前面说了异步的本质是控制权的交换,这里通过一个生产者消费者模型的例子来体会一下这么个过程。

生成者消费者

def consumer():         # 定义消费者,由于有yeild关键词,此消费者为一个生成器
print("[Consumer] Init Consumer ......")
r = "init ok" # 初始化返回结果,并在启动消费者时,返回给生产者
while True:
n = yield r # 消费者通过yield接收生产者的消息,同时返给其结果
print("[Consumer] conusme n = %s, r = %s" % (n, r))
r = "consume %s OK" % n # 消费者消费结果,下个循环返回给生产者 def produce(c): # 定义生产者,此时的 c 为一个生成器
print("[Producer] Init Producer ......")
r = c.send(None) # 启动消费者生成器,同时第一次接收返回结果
print("[Producer] Start Consumer, return %s" % r)
n = 0
while n < 5:
n += 1
print("[Producer] While, Producing %s ......" % n)
r = c.send(n) # 向消费者发送消息并准备接收结果。此时会切换到消费者执行
print("[Producer] Consumer return: %s" % r)
c.close() # 关闭消费者生成器
print("[Producer] Close Producer ......") produce(consumer())

新关键字

# 异步IO例子:适配Python3.5,使用async和await关键字
async def hello(index): # 通过关键字async定义协程
print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
await asyncio.sleep(1) # 模拟IO任务
print('Hello again! index=%s, thread=%s' % (index, threading.currentThread())) loop = asyncio.get_event_loop() # 得到一个事件循环模型
tasks = [hello(1), hello(2)] # 初始化任务列表
loop.run_until_complete(asyncio.wait(tasks)) # 执行任务
loop.close() # 关闭事件循环列表

网络io

async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(url, resp.status)
print(url, await resp.text()) loop = asyncio.get_event_loop() # 得到一个事件循环模型
tasks = [ # 初始化任务列表
get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
get("http://zhushou.360.cn/detail/index/soft_id/705490")
]
loop.run_until_complete(asyncio.wait(tasks)) # 执行任务
loop.close() # 关闭事件循环列表

线/进程

这里以进程的multiprocessing模块举例,线程可以使用multiprocessing.dummy,所有的API均相同。

例子

import multiprocessing as mp

############## 直接实例化 ############

def func(number):
result = number * 2 p = mp.Process(target=func, args=(3, )) #实例化进程对象
p.start() #运行进程 ############ 类封装 ############# class MyProcess(mp.Process):
def __init__(self, interval):
mp.Process.__init__(self) # 需要重载的函数
def run(self):
print('I'm running) p = MyProcess(1)
p.start() ################################# p.terminal() # 主动结束进程
p.join() #让主进程等待子进程结束 # 一些常用的属性
p.pid #获得进程的id号
p.name #获得进程名
p.is_alive() #判断进程是否还存活
p.daemon = True #设置进程随主进程一起结束 mp.active_children() #获得当前进程的所有子进程
mp.current_process() #返回正在运行的进程
os.getpid() #获得当前进程的pid

线程池

from multiprocessing.dummy import Pool as ThreadPool 

tasks = list()

def do_task(item):
return item pool = ThreadPool(3) ################ 原始操作 ####################### for item in items:
pool.apply_async(do_task, (item,)) #添加进程,非阻塞,返回执行结果
pool.apply(do_task, (item,)) #阻塞 ############## map操作 #####################3 results = pool.map(do_task, items) ################################ pool.close() #关闭进程池后不会有新的进程被创建
pool.join() #等到结束,必须在close后使用

进程通信

# Lock(锁)
# 限制对资源的访问 def func(lock): #使用with
with lock:
print('I got lock')
def func(lock): #不使用with
lock.acquire() #请求锁
try:
print('I got lock')
finally:
lock.release() #释放锁 lock = mp.Lock() #申请锁
p = mp.Process(target=func, args=(lock,))
p.start() ############################################ # Semaphore(信号量)
# 限制资源的最大连接数 def func(s):
s.aquire() #请求连接
s.release() #断开连接 s = mp.Semaphore(2) #定义信号量的最大连接数
for i in range(5):
p = mp.Process(target=func, arg=(s))
p.start ############################################ # Event(事件)
# 进程间同步 def func(e):
e.wait() #定义等待时间,默认等待到e.set()为止,阻塞
e.is_set() #判断消息是否被发出
print('got') e = mp.Event()
p = mp.Process(target=func, args=(e,))
p.start()
e.set() #发出消息 ############################################ # Queue(队列)
# 多进程之间的数据传递 import Queue Queue.Queue(maxsize = 0) # 先进先出, maxsize小于等于则不限大小
Queue.LifoQueue(maxsize = 0) # 后进先出
Queue.PriorityQueue(maxsize = 0) # 构造一个优先级队列 #异常
Queue.Empty #当调用非阻塞的get()获取空队列的元素时, 引发异常
Queue.Full #当调用非阻塞的put()向满队列中添加元素时, 引发异常 # 生存者消费者模型 def produce(q):
try:
data = q.put(data, block=, timeout=)
# 若block为False且队列已满,则立即抛出Queue.Full
# 若block为True进程会阻塞timeout指定时间,直到队列有空间,否则抛出Queue.Full
except: def cosume(q):
try:
q.get(block=, timeout=) #与上同理
except: q = mp.Queue()
pro = mp.Process(target=produce, args=(q, ))
cos = mp.Process(target=cosume, args=(q, ))
pro.start()
cos.start()
pro.join()
cos.join() ############################################ # Pipe(管道)
# 多进程之间的数据传递 def func1(pipe):
while True:
pipe.send(1)
def func2(pipe):
while True:
pipe.recv() #如果管道内无消息可接受,则会阻塞
pipe = mp.Pipe(duplex=) #参数默认为True即管道的两边均可收发
# 返回(conn1, conn2),当参数为False时conn1只能收信息,conn2只能发消息
p1 = mp.Process(target=func1, args=(pipe[0], ))
p2 = mp.Process(target=func2, args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()

并发池

新的并发池模块concurrent.futures再次封装了并发操作,可以用于量大但简单并发操作。

进程线程通用关键字换一下就行。

future对象

from concurrent.futures import ThreadPoolExecutor
import time def working(message):
time.sleep(2)
return message pool = ThreadPoolExecutor(max_workers=2) # 创建一个最大可容纳2个task的线程池 worker1 = pool.submit(working, ("hello")) # 往线程池里面加入一个task
worker2 = pool.submit(working, ("world")) # 往线程池里面加入一个task # submit 返回了一个future对象,即未完成的操作,我们可以通过调用函数来查看其状态 worker1.done() # 判断task1是否结束 worker1.result() # 查看task1返回的结果
worker2.result() # 查看task2返回的结果

executor对象

import concurrent.futures

items = list() # 任务对象

def do_task(item): # 处理函数
return item #################### submit ######################### with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(do_task, item): item for item in items}
for future in concurrent.futures.as_completed(futures):
item = futures[future]
result = future.result()
print(item, result) #################### map ######################### # map跟submit的区别在于submit是无序的,而map是有序的 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for item, result in zip(items, executor.map(do_task, items)):
print(item, result) #################### wait ######################### # wait返回一个元组,包含已完成任务的集合和未完成任务的集合。 pool = ThreadPoolExecutor(5)
futures = []
for item in items:
futures.append(pool.submit(do_task, item)) concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')

return_when参数可选FIRST_COMPLETED, FIRST_EXCEPTIONALL_COMPLETE

ALL_COMPLETE 会阻塞

参考

Python 并行任务技巧

Python并发编程之线程池/进程池

Python中的并发的更多相关文章

  1. Python中的并发编程

    简介 我们将一个正在运行的程序称为进程.每个进程都有它自己的系统状态,包含内存状态.打开文件列表.追踪指令执行情况的程序指针以及一个保存局部变量的调用栈.通常情况下,一个进程依照一个单序列控制流顺序执 ...

  2. python中实现并发的手段之 协程

    几种实现并发的手段 进程 启动多个进程 进程之间是由操作系统负责调用线程 启动多个线程 真正被CPU执行的最小单位实际是线程 开启一个线程 创建一个线程 寄存器 堆栈 关闭一个线程协程 本质上是一个线 ...

  3. python中的并发执行

    一. Gevent实例 import gevent import requests from gevent import monkey # socket发送请求以后就会进入等待状态,gevent更改了 ...

  4. 用greenlet实现Python中的并发

    from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1. ...

  5. Python中实现异步并发查询数据库

    这周又填了一个以前挖下的坑. 这个博客系统使用Psycopy库实现与PostgreSQL数据库的通信.前期,只是泛泛地了解了一下SQL语言,然后就胡乱拼凑出这么一个简易博客系统. 10月份找到工作以后 ...

  6. python中并发编程基础1

    并发编程基础概念 1.进程. 什么是进程? 正在运行的程序就是进程.程序只是代码. 什么是多道? 多道技术: 1.空间上的复用(内存).将内存分为几个部分,每个部分放入一个程序,这样同一时间在内存中就 ...

  7. python web中的并发请求

    python web可以选择django,也可以选择flask,它们的原理差不多.flask比较轻量,下面写一段flask程序来说明python web对并发请求的处理. app.py import ...

  8. 并发编程---线程 &semi;python中各种锁

    一,概念 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程 --车间负责把资源整合到 ...

  9. 操作系统&sol;应用程序、操作中的&OpenCurlyDoubleQuote;并发”、线程和进程,python中线程和进程(GIL锁),python线程编写&plus;锁

    并发编程前言: 1.网络应用 1)爬虫 直接应用并发编程: 2)网络框架 django flask tornado 源码-并发编程 3)socketserver 源码-并发编程 2.运维领域 1)自动 ...

随机推荐

  1. Java 内存分配全面浅析

    本文将由浅入深详细介绍Java内存分配的原理,以帮助新手更轻松的学习Java.这类文章网上有很多,但大多比较零碎.本文从认知过程角度出发,将带给读者一个系统的介绍. 进入正题前首先要知道的是Java程 ...

  2. c&num; 只允许一个实例运行

    1.单件模式,Singleton,应用程序只能允许一个实例在运行.这是最好的解决方法2.查询系统进程里是不是已经运行.private void Form1_Load(object sender, Ev ...

  3. JQ 如何设置单选按钮问题

    <input type="radio" name="db_12" value="2" checked="checked/&g ...

  4. update语句的执行步骤及commit语句的执行顺序

    update语句的执行步骤和其他DML语句的执行步骤是一样的包含insert .delete语句等,执行步骤如下: 一.如果数据和回滚数据不在数据库高速缓存区中,则oracle服务器进程将把他们从数据 ...

  5. NTP服务器时间同步

    CentOS 配置服务器NTP同步 1 查看是否安装 rpm -aq | grep ntp 2 安装 yum -y install ntp 3 配置 /etc/ntp.conf restrict 访问 ...

  6. 在linux中安装git,并将代码发布到github

    楼主Git小白,今天刚刚学习了git,虽然在工作中也许用不到,但是在学习的时候肯定会用到的,毕竟一个程序员首先就要整理自己的知识点,将美丽的代码分享与大家. 楼主是将Git安装在阿里云的centos7 ...

  7. 爬微信好友签名和QQ好友签名

    先说如何爬微信好友签名,主要使用itchat,这个库提供直接的api来获取好友信息,只要用正则过滤出就行了.说一下步骤,就不贴代码了.# 登陆# 获取好友列表# 提取签名# jieba分词# word ...

  8. NPOI2&period;2&period;0&period;0实例详解&lpar;九&rpar;—设置EXCEL单元格【时间格式】

    原文:http://blog.csdn.net/xxs77ch/article/details/50245391 using System; using System.Collections.Gene ...

  9. 深入浅出的webpack构建工具---devTool中SourceMap模式详解&lpar;四&rpar;

    阅读目录 一:什么是SourceMap? 二:理解webpack中的SourceMap的eval,inline,sourceMap,cheap,module 三:开发环境和线上环境如何选择source ...

  10. noip2017d1t3

    其实是参考洛谷上某篇题解的思路: 先求出两个dis数组表示从1走和从n走的最短路: 转移方程:dp[v][dis1[u]-dis1[v]+w+j]+=dp[u][j]; 转移顺序要注意一下呢,肯定是先 ...