使用 Python 可以编写多线程程序,注意,这并不是说程序能在多个 CPU 核上跑。如果你想这么做,可以看看关于 Python 并行计算的,比如官方 Wiki。
Python 线程的主要应用场景是一些包含等待或 I/O 的任务,比如与远程 Web 服务器交互,多线程能够让 Python 在等待时执行其他代码,这提高了系统的交互性。例如下面这个爬虫程序:
import Queue
import threading
import urllib2
# 被每个线程调用
def get_url(q, url):
q.put(urllib2.urlopen(url).read())
urls = ["http://google.com", "http://yahoo.com"]
q = Queue.Queue()
for u in urls:
t = threading.Thread(target=get_url, args = (q,u))
t.daemon = True
t.start()
s = q.get()
print(s)
这个程序会几乎同时发出对 Google 和 Yahoo 的 HTTP 请求,并输出最先获得响应的网页。
该模块的设计基于 Java 的线程模型。但是,在 Java 使锁和条件变量成为每个对象的基本行为的地方,它们是 Python 中的独立对象。Python 的 Thread 类支持 Java Thread 类的行为的子集;目前在 Python 中,没有支持优先级,没有线程组,线程不能被销毁、停止、暂停、恢复或中断。 Java 的 Thread 类的静态方法在被映射到模块级全局函数。
本模块中的所有方法都是原子执行的 (atomically)。
函数
threading
模块定义了以下函数:
active_count()
返回当前处于生存状态 (alive) 的 Thread
对象的数量,它和 enumerate()
所返回的列表的长度相同。
current_thread()
返回当前的 Thread
对象。如果当前线程不是通过 threading 模块创建的,那么会返回一个功能受限的“哑对象” (Dummy object)。
get_ident()
返回当前线程的线程标识符,它是一个非零整数,没有实际意义,但你可以用它来索引一个线程相关数据结构(例如用一个全局列表存储当前程序中的所有线程)。注意:当线程被回收和创建时,这些标识符可能会被回收重用。
enumerate()
返回一个当前 alive 的线程对象的列表。
main_thread()
返回主线程对象,就是 Python 解释器启动时的线程。
settrace(func)
为所有由 threading 模块启动的线程设置一个 trace 函数 func 。在每个线程的 run()
之前,这个 func 会被传给 sys.settrace()
为每个线程设置 trace 函数。
setprofile(func)
为所有由 threading 模块启动的线程设置一个 profile 函数 func 。在每个线程的 run()
之前,这个 func 会被传给 sys.setprofile()
为每个线程设置 profile 函数。
stack_size([size])
设置或返回之后创建新线程时的 stack 大小。size 可以是 0 (使用系统默认值)或大于等于 32,768 (32 KiB 是 Python 解释器能够施展拳脚的最小栈空间).
常量
TIMEOUT_MAX
阻塞函数的 timeout 参数的允许最大值。阻塞函数包括 Lock.acquire()
, RLock.acquire()
, Condition.wait()
等。
类
Thread-Local 数据
class local()
Thread-local 数据是专属于某个线程的局部数据。它在 global scope 被定义,但如果在某个线程里访问它,它是局部特有的。示例查看 _threading_local 模块。
Thread
Thread 类表示在单独的控制线程中运行的活动。有两种方法来指定这种活动:通过将 callable 对象传递给构造函数,或者通过覆盖子类中的 run()
方法。换句话说,你只能覆盖这个类的 __init__()
和 run()
方法。
创建 Thread 对象后,通过调用 start()
方法来启动这个线程,它会自动调用 run()
方法。
当这个线程启动后,它的状态转为 alive。当它的 run()
终止或出现了没有处理的异常时,它的状态转为非 alive。可以用 is_alive()
函数来查看一个线程的状态。
其他线程可以调用某个线程的 join()
方法,这会阻塞调用方的线程,直到被调用方的线程终止。
一个线程可以有名字,你可以读/写 name
属性。
一个线程可以被标记为 daemon 线程,这种线程在主线程退出后就会自动退出,所以你不需要手动关闭它们。
有可能会出现“哑线程对象”,或者叫做“外星线程”,指代那些不是由 threading 模块创建的线程,比如从 C 代码创建的。它们无法被 join()
,因为无法探测它们何时终止。
class Thread(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)
group 必须是
None
,这是为以后 Python 实现线程组做准备的。target 是一个可调用对象,被
run()
调用。name 是线程名,默认会被设置为 "Thread-N",N 为整数。
args 是传入 target 的参数,是一个元组,默认为
()
。kwargs 是传入 target 的关键字参数,是一个字典,默认为
{}
。daemon 设置此线程是否为守护线程。默认
None
意为从本线程继承。
start()
启动线程。最多被一个线程调用一次。如果被多次调用,会引发 RuntimeError
。
run()
代表线程活动的函数。
join(timeout=None)
等待一个线程的终止。由于这个函数总是返回 None
,所以如果你传入了 timeout,那么之后还需要用 is_alive()
来判断这个线程是正常终止还是超时了。
如果 join()
被用在了本线程上,就会引发 RuntimeError
,因为这会引起死锁。把它用在一个还没有启动的线程上同样会引发这个异常。
name
线程名字,没有实际语义。旧的 API 有 getName()
和 setName
,但没必要使用了。
ident
线程描述符,通过 get_ident()
来访问。
is_alive()
返回线程是否还在运行。
daemon
线程是否为守护线程。旧的 API 有 isDaemon()
和 setDaemon()
,也没必要使用了。
CPython 实现细节:在 CPython 中,由于 GIL 的存在,在同一时刻仅有一个线程能运行。因此 Python threading 模块的主要应用场景是同时运行多个 I/O 密集型的任务。如果你想利用多核计算资源,可以使用多进程模块 multiprocessing 或 concurrent.futures.ProcessPoolExecutor 。
Lock
原始锁是一种同步原语,在锁定时不属于特定线程。在 Python 中,它是目前可用的最低级别同步原语,由 _thread 扩展模块直接实现。
一个原始锁有两个主要的方法:acquire()
和 release()
,分别用来上锁和解锁。如果尝试 release 一个没有锁上的锁,会引发 RuntimeError
。原始锁支持 “上下文管理协议(Context Management Protocol)”。
“上下文管理协议”:使用
with
语句来获得一个锁、条件变量或信号量,相当于调用acquire()
;离开with
块后,会自动调用release()
。例如:
with lock: # 如果无法获取则会阻塞在这里
# 在这里锁已经被获得
# 在外面锁被释放
如果有多个线程在等待同一个锁,当这个锁被释放时,哪一个进程会获得锁是不确定的,这取决于实现。
class Lock
返回一个原始锁。此类的方法有:
acquire(blocking=True, timeout=-1)
获得一个锁,可以是阻塞或非阻塞的。默认为阻塞。也可以设置 timeout 来设置阻塞的最长时间。
返回 True
或 False
告诉用户是否成功获得锁。
release()
释放一个锁。没有返回值。
RLock
可重入锁 (Reetrant Lock) 是一种同步原语,与原始锁的唯一区别是可以由同一线程多次获取。在内部,除了原始锁使用的锁定/解锁状态之外,它还使用“拥有线程”和“递归级别”的概念。当一个线程 acquire()
了一个锁后,递归等级会设为 1,此时其他线程无法获取此锁,但本线程还可以获取,使得递归等级加 1。本线程释放一次锁,就使得递归等级减 1。直到减为 0,此时锁被释放。
可重入锁的方法 acquire()
和 release()
请参看原始锁,它也支持 “上下文管理协议”。
Condition
Condition 对象就是条件变量,它总是与某种锁相关联,可以是外部传入的锁或是系统默认创建的锁。当几个条件变量共享一个锁时,你就应该自己传入一个锁。这个锁不需要你操心,Condition 类会管理它。
acquire()
和 release()
可以操控这个相关联的锁。其他的方法都必须在这个锁被锁上的情况下使用。wait()
会释放这个锁,阻塞本线程直到其他线程通过 notify()
或 notify_all()
来唤醒它。一旦被唤醒,这个锁又被 wait()
锁上。
经典的 consumer/producer 问题的代码示例为:
import threading
cv = threading.Condition()
# Consumer
with cv:
while not an_item_is_available():
cv.wait()
get_an_available_item()
# Producer
with cv:
make_an_item_available()
cv.notify(
还可以用 wait_for()
来替换 while
循环消费者对系统状态的判断:
# Consumer
with cv:
cv.wait_for(an_item_is_available)
get_an_available_item()
class Condition(lock=None)
返回一个条件变量。lock 必须是一个 Lock 或 RLock,如果使用默认值 None,系统会自动创建一个 RLock 作为锁。这个类的方法签名列表(用法基本已经介绍清楚了):
- acquire(*args)
- release()
- wait(timeout=None)
- wait_for(predicate, timeout=None)
- notify(n=1)
- notify_all()
Semaphore
这是计算机历史上最古老的同步原语:信号量。信号量管理着内部的一个计数器,acquire()
使得计数器减 1,release()
使得计数器加 1。当 acquire()
发现计数器为 0 时,函数会阻塞直到某个线程调用了这个信号量的 release()
。
class Semaphore(value=1)
它的方法只有 acquire()
和 release()
,方法签名和 Lock 的相同。
class BoundedSemaphore(value=1)
实现有界信号量对象的类。有界信号量是指它的计数器永远不会超过初始值 valve 。
Semaphore 例子
信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定的任何情况下,你应该使用有界信号量。在生成任何工作线程之前,主线程应该初始化信号量:
maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)
一旦产生,工作线程在需要连接到服务器时调用信号量的获取和释放方法(这里使用了 with
语句):
with pool_sema:
conn = connectdb()
try:
# ... use connection ...
finally:
conn.close()
Event
这是线程之间通信的最简单机制之一:一个线程发出事件信号,其他线程等待它。
class Event
一个事件对象管理着一个标志,set()
将标志置为 True
,clear()
将标志置为 False
。wait()
阻塞本线程直到标志为 True
。最初标志为 False
。此类的方法签名列表为:
- is_set() -> bool
- set()
- clear()
- wait(timeout=None)
Timer
Timer 表示仅在经过一定时间后才应运行的操作。它是 Thread 的子类。
class Timer(interval, function, args=None, kwargs=None)
方法只有一个 cancel()
,用来取消计时器,停止计时器的运行。
例子
def hello():
print("hello, world")
t = Timer(30.0, hello)
t.start() # 30 秒后 "hello, world" 会被打印出来
Barrier
此类提供了一个简单的同步原语,供需要相互等待的固定数量的线程使用。每个线程都试图通过调用 wait()
方法来跨越屏障,并将阻塞直到所有线程都进行了 wait()
调用。此时,这些线程同时释放。
class Barrier(parties, action=None, timeout=None)
wait(timeout=None)
返回值为 parties - 1,每个线程都不相同,可以这样用:
i = barrier.wait()
if i == 0:
# 只有一个线程需要打印
print("passed the barrier")
reset()
将 barrier 重置为初始状态。原本等待的线程将会引发 BrokenBarrierError
异常。
abort()
将一个 barrier 设为破损状态。当前和之后对这个 barrier 的 wait()
都会引发 BrokenBarrierError
。如果一个线程要退出,为了防止死锁,应当使用这个方法。
parties
这一组线程的数量。
n_waiting
目前处于等待状态的线程的数量。
broken
一个布尔值,代表这个 barrier 是否破损。
exception BrokenBarrierError
这个异常是 RuntimeError
的子类,当一个 Barrier 被重置或破损时,它会被引发。
参考
https://docs.python.org/3/library/threading.html#threading.Timer.cancel