Python并发编程实例教程

时间:2022-10-04 00:32:48

有关Python中的并发编程实例,主要是对Threading模块的应用,文中自定义了一个Threading类库。

一、简介
  我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程。在任何给定的时刻,一个程序只做一件事情。
  一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen())。然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线程。因为进程之间是相互独立的,因此它们同原有的进程并发执行。这是指原进程可以在创建子进程后去执行其它工作。
  虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递,可以将其简单地理解为一个纯字节的缓冲区,而send()或recv()操作原语可以通过诸如管道(pipe)或是网络套接字(network socket)等I/O通道传输或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块),通过内存映射,进程可以在内存中创建共享区域,而对这些区域的修改对所有的进程可见。
  多进程能够被用于需要同时执行多个任务的场景,由不同的进程负责任务的不同部分。然而,另一种将工作细分到任务的方法是使用线程。同进程类似,线程也有其自己的控制流以及执行栈,但线程在创建它的进程之内运行,分享其父进程的所有数据和系统资源。当应用需要完成并发任务的时候线程是很有用的,但是潜在的问题是任务间必须分享大量的系统状态。
  当使用多进程或多线程时,操作系统负责调度。这是通过给每个进程(或线程)一个很小的时间片并且在所有活动任务之间快速循环切换来实现的,这个过程将CPU时间分割为小片段分给各个任务。例如,如果你的系统中有10个活跃的进程正在执行,操作系统将会适当的将十分之一的CPU时间分配给每个进程并且循环地在十个进程之间切换。当系统不止有一个CPU核时,操作系统能够将进程调度到不同的CPU核上,保持系统负载平均以实现并行执行。
  利用并发执行机制写的程序需要考虑一些复杂的问题。复杂性的主要来源是关于同步和共享数据的问题。通常情况下,多个任务同时试图更新同一个数据结构会造成脏数据和程序状态不一致的问题(正式的说法是资源竞争的问题)。为了解决这个问题,需要使用互斥锁或是其他相似的同步原语来标识并保护程序中的关键部分。举个例子,如果多个不同的线程正在试图同时向同一个文件写入数据,那么你需要一个互斥锁使这些写操作依次执行,当一个线程在写入时,其他线程必须等待直到当前线程释放这个资源。
Python中的并发编程
  Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器(generator function)的并发实现。
  Python在大部分系统上同时支持消息传递和基于线程的并发编程机制。虽然大部分程序员对线程接口更为熟悉,但是Python的线程机制却有着诸多的限制。Python使用了内部全局解释器锁(GIL)来保证线程安全,GIL同时只允许一个线程执行。这使得Python程序就算在多核系统上也只能在单个处理器上运行。Python界关于GIL的争论尽管很多,但在可预见的未来却没有将其移除的可能。
  Python提供了一些很精巧的工具用于管理基于线程和进程的并发操作。即使是简单地程序也能够使用这些工具使得任务并发进行从而加快运行速度。subprocess模块为子进程的创建和通信提供了API。这特别适合运行与文本相关的程序,因为这些API支持通过新进程的标准输入输出通道传送数据。signal模块将UNIX系统的信号量机制暴露给用户,用以在进程之间传递事件信息。信号是异步处理的,通常有信号到来时会中断程序当前的工作。信号机制能够实现粗粒度的消息传递系统,但是有其他更可靠的进程内通讯技术能够传递更复杂的消息。threading模块为并发操作提供了一系列高级的,面向对象的API。Thread对象们在一个进程内并发地运行,分享内存资源。(www.jbxue.com)使用线程能够更好地扩展I/O密集型的任务。multiprocessing模块同threading模块类似,不过它提供了对于进程的操作。每个进程类是真实的操作系统进程,并且没有共享内存资源,但multiprocessing模块提供了进程间共享数据以及传递消息的机制。通常情况下,将基于线程的程序改为基于进程的很简单,只需要修改一些import声明即可。
Threading模块示例
  以threading模块为例,思考这样一个简单的问题:如何使用分段并行的方式完成一个大数的累加。
import threading

class SummingThread(threading.Thread):
def __init__(self, low, high):
super(SummingThread, self).__init__()
self.low = low
self.high = high
self.total = 0

def run(self):
for i in range(self.low, self.high):
self.total += i

thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

自定义Threading类库
  我写了一个易于使用threads的小型Python类库,包含了一些有用的类和函数。
关键参数:
  * do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)
  * ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列
  * start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)
  * stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)
import threading
import logging

def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
""" Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).

Parameters:
- num_threads Default: len(work_items) --- Number of threads to use process items in work_items.
- per_sync_timeout Default: 1 --- Each synchronized operation can optionally timeout.
- preserve_result_ordering Default: True --- Reorders result_item to match original work_items ordering.

Return:
--- list of results from applying work_func to each work_item. Order is optionally preserved.

Example:

def process_url(url):
# TODO: Do some work with the url
return url

urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

# process urls in parallel
result_items = do_threaded_work(urls_to_process, process_url)

# print(results)
print(repr(result_items))
"""
global wrapped_work_func
if not num_threads:
num_threads = len(work_items)

work_queue = Queue.Queue()
result_queue = Queue.Queue()

index = 0
for work_item in work_items:
if preserve_result_ordering:
work_queue.put((index, work_item))
else:
work_queue.put(work_item)
index += 1

if preserve_result_ordering:
wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))

start_logging_with_thread_info()

#spawn a pool of threads, and pass them queue instance
for _ in range(num_threads):
if preserve_result_ordering:
t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
else:
t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
t.setDaemon(True)
t.start()

work_queue.join()
stop_logging_with_thread_info()

logging.info('work_queue joined')

result_items = []
while not result_queue.empty():
result = result_queue.get(timeout=per_sync_timeout)
logging.info('found result[:500]: ' + repr(result)[:500])
if result:
result_items.append(result)

if preserve_result_ordering:
result_items = [work_item for index, work_item in result_items]

return result_items

class ThreadedWorker(threading.Thread):
""" Generic Threaded Worker
Input to work_func: item from work_queue

Example usage:

import Queue

urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

work_queue = Queue.Queue()
result_queue = Queue.Queue()

def process_url(url):
# TODO: Do some work with the url
return url

def main():
# spawn a pool of threads, and pass them queue instance
for i in range(3):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()

# populate queue with data
for url in urls_to_process:
work_queue.put(url)

# wait on the queue until everything has been processed
work_queue.join()

# print results
print repr(result_queue)

main()
"""

def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
threading.Thread.__init__(self)
self.work_queue = work_queue
self.result_queue = result_queue
self.work_func = work_func
self.stop_when_work_queue_empty = stop_when_work_queue_empty
self.queue_timeout = queue_timeout

def should_continue_running(self):
if self.stop_when_work_queue_empty:
return not self.work_queue.empty()
else:
return True

def run(self):
while self.should_continue_running():
try:
# grabs item from work_queue
work_item = self.work_queue.get(timeout=self.queue_timeout)

# works on item
work_result = self.work_func(work_item)

#place work_result into result_queue
self.result_queue.put(work_result, timeout=self.queue_timeout)

except Queue.Empty:
logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')

except Queue.Full:
logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')

except:
logging.exception('Error in ThreadedWorker')

finally:
#signals to work_queue that item is done
self.work_queue.task_done()

def start_logging_with_thread_info():
try:
formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to start logging with thread info')

def stop_logging_with_thread_info():
try:
formatter = logging.Formatter('%(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to stop logging with thread info')

 使用示例
from test import ThreadedWorker
from queue import Queue

urls_to_process = ["http://facebook.com", "http://pypix.com"]

work_queue = Queue()
result_queue = Queue()

def process_url(url):
# TODO: Do some work with the url
return url

def main():
# spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()

# populate queue with data
for url in urls_to_process:
work_queue.put(url)

# wait on the queue until everything has been processed
work_queue.join()

# print results
print(repr(result_queue))

main()

Python并发编程实例教程的更多相关文章

  1. Python并发编程二(多线程、协程、IO模型)

    1.python并发编程之多线程(理论) 1.1线程概念 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于 ...

  2. Python并发编程一(多进程)

    1.背景知识(进程.多道技术) 顾名思义,进程即正在执行的一个过程.进程是对正在运行程序的一个抽象. 进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一 ...

  3. Python并发编程系列之多进程(multiprocessing)

    1 引言 本篇博文主要对Python中并发编程中的多进程相关内容展开详细介绍,Python进程主要在multiprocessing模块中,本博文以multiprocessing种Process类为中心 ...

  4. python并发编程&多线程(二)

    前导理论知识见:python并发编程&多线程(一) 一 threading模块介绍 multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性 官网链 ...

  5. python并发编程&多进程(二)

    前导理论知识见:python并发编程&多进程(一) 一 multiprocessing模块介绍 python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_cou ...

  6. Python进阶(4)_进程与线程 (python并发编程之多进程)

    一.python并发编程之多进程 1.1 multiprocessing模块介绍 由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大 ...

  7. 快速了解Python并发编程的工程实现(上)

    关于我 一个有思想的程序猿,终身学习实践者,目前在一个创业团队任team lead,技术栈涉及Android.Python.Java和Go,这个也是我们团队的主要技术栈. Github:https:/ ...

  8. Python并发编程-concurrent包

    Python并发编程-concurrent包 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.concurrent.futures包概述 3.2版本引入的模块. 异步并行任务编程 ...

  9. Python并发编程-线程同步(线程安全)

    Python并发编程-线程同步(线程安全) 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 线程同步,线程间协调,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直 ...

随机推荐

  1. Java 常用排序算法/程序员必须掌握的 8大排序算法

    Java 常用排序算法/程序员必须掌握的 8大排序算法 分类: 1)插入排序(直接插入排序.希尔排序) 2)交换排序(冒泡排序.快速排序) 3)选择排序(直接选择排序.堆排序) 4)归并排序 5)分配 ...

  2. 超级账本fabric原理之gossip详解

    Goosip协议 去中心化.容错和最终一致性的算法 信息达到同步的最优时间:log(N). 功能: 节点发现 数据广播 gossip中有三种基本的操作: push - A节点将数据(key,value ...

  3. 通过Xtrabackup实现MySQL实例的全库备份与按需单库恢复

    在实际的生产环境中,为了管理方便,我们一般是通过 Xtrabackup实现实例的全库备份,即将实例上的所有数据库备份. 但是,考虑到快速恢复 我们常常面临的需求是快速还原单个数据库.针对初学者来说,网 ...

  4. LeetCode 104. Maximum Depth of Binary Tree二叉树的最大深度 C++/Java

    Given a binary tree, find its maximum depth. The maximum depth is the number of nodes along the long ...

  5. RAD Studio 10.3 来了

    官方原版下载链接:HTTP FTP 官方更新说明:http://docwiki.embarcadero.com/RADStudio/Rio/en/What's_New [官方更新说明简译]1.Delp ...

  6. 解决VS2017引用报错问题

    1.打开VS2017下的Developer Command Prompt for VS 2017 2.然后在CMD窗口输入 CD CD C:\Program Files\Microsoft Visua ...

  7. Eigen学习之Array类

    Eigen 不仅提供了Matrix和Vector结构,还提供了Array结构.区别如下,Matrix和Vector就是线性代数中定义的矩阵和向量,所有的数学运算都和数学上一致.但是存在一个问题是数学上 ...

  8. [洛谷P1709] [USACO5.5]隐藏口令Hidden Password

    洛谷题目链接:[USACO5.5]隐藏口令Hidden Password 题目描述 有时候程序员有很奇怪的方法来隐藏他们的口令.Binny会选择一个字符串S(由N个小写字母组成,5<=N< ...

  9. 【转】servlet&sol;filter&sol;listener&sol;interceptor区别与联系

    原文:https://www.cnblogs.com/doit8791/p/4209442.html 一.概念: 1.servlet:servlet是一种运行服务器端的java应用程序,具有独立于平台 ...

  10. CentOS 6&period;9使用Setup配置网络(解决dhcp模式插入网线不自动获取IP的问题)

    说明:dhcp模式插入网线不自动获取IP是因为网卡没有激活,造成这种原因的,应该是安装系统时没有插入网线造成的. 解决方法: 修改网卡配置文件 vim /etc/sysconfig/network-s ...