python中使用threading.condition交替打印两个字符的程序。
这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.condition对象就能够完成两个线程之间的这种协调工作。
threading.condition默认情况下会通过持有一个reentrantlock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。
python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。
通过代码和其中的注释,能够非常明白地弄清楚condition的原理是怎样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import threading
import time
import functools
def worker(cond, name):
"""worker running in different thread"""
with cond: # 通过__enter__方法,获取cond对象中的锁,默认是一个reentrantlock对象
print ( '...{}-{}-{}' . format (name, threading.current_thread().getname(), cond._is_owned()))
cond.wait() # 创建一个新的锁newlock,调用acquire将newlock获取,然后将newlock放入等待列表中,\
# 释放cond._lock锁(_release_save),最后再次调用acquire让newlock阻塞
print ( 'wait returned in {}' . format (name))
if __name__ = = '__main__' :
condition = threading.condition()
t1 = threading.thread(target = functools.partial(worker, condition, 't1' ))
t2 = threading.thread(target = functools.partial(worker, condition, 't2' ))
t2.start() # 启动线程2
t1.start() # 启动线程1
time.sleep( 2 )
with condition:
condition.notify( 1 ) # 按照fifo顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除
time.sleep( 2 )
with condition:
condition.notify( 1 ) # 按照fifo顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除
t1.join() # 主线程等待子线程结束
t2.join() # 主线程等待子线程结束
print ( 'all done' )
|
其输出为:
1
2
3
4
5
|
...t2 - thread - 2 - true
...t1 - thread - 1 - true
wait returned in t2
wait returned in t1
all done
|
其中wait方法要求获取到threading.condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(newlock),并获取这个newlock;之后调用_release_save方法释放threading.condition对象中的锁,让其它线程能够获取到;最后再次调用newlock上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个newlock给release掉,当然,这就是notify方法的责任了。
notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。
在理解了threading.condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import threading
import functools
import time
def print_a(state):
while true:
if state.closed:
print ( 'close a' )
return
print ( 'a' )
time.sleep( 2 )
state.set_current_is_a(true)
state.wait_for_b()
def print_b(state):
while true:
if state.closed:
print ( 'close b' )
return
state.wait_for_a()
print ( 'b' )
time.sleep( 2 )
state.set_current_is_a(false)
if __name__ = = '__main__' :
class state( object ):
"""state used to coordinate multiple(two here) threads"""
def __init__( self ):
self .condition = threading.condition()
self .current_is_a = false
self .closed = false
def wait_for_a( self ):
with self .condition:
while not self .current_is_a:
self .condition.wait()
def wait_for_b( self ):
with self .condition:
while self .current_is_a:
self .condition.wait()
def set_current_is_a( self , flag):
self .current_is_a = flag
with self .condition:
self .condition.notify_all()
state = state()
t1 = threading.thread(target = functools.partial(print_a, state))
t2 = threading.thread(target = functools.partial(print_b, state))
try :
t1.start()
t2.start()
t1.join()
t2.join()
except keyboardinterrupt:
state.closed = true
print ( 'closed' )
|
可以看到有两种类型的任务,一个用于打印字符a,一个用于打印字符b,我们的实现种让a先于b打印,所以在print_a中,先打印a,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是false,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符a,而b永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。
考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符a,以后,每隔2秒钟,就会同时输出a和b,而不是交替输出。原因在于,由于current_is_a还是false,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为true,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到b的打印循环,故最终我们看到a和b总是一起打印。
可见对于threading.condition的使用需要多加小心,要注意逻辑上的严谨性。
附一个队列版本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import threading
import functools
import time
from queue import queue
def print_a(q_a, q_b):
while true:
char_a = q_a.get()
if char_a = = 'closed' :
return
print (char_a)
time.sleep( 2 )
q_b.put( 'b' )
def print_b(q_a, q_b):
while true:
char_b = q_b.get()
if char_b = = 'closed' :
return
print (char_b)
time.sleep( 2 )
q_a.put( 'a' )
if __name__ = = '__main__' :
q_a = queue()
q_b = queue()
t1 = threading.thread(target = functools.partial(print_a, q_a, q_b))
t2 = threading.thread(target = functools.partial(print_b, q_a, q_b))
try :
t1.start()
t2.start()
q_a.put( 'a' )
t1.join()
t2.join()
except keyboardinterrupt:
q_a.put( 'closed' )
q_b.put( 'closed' )
print ( 'done' )
|
队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。
附一个协程版本(python 3.5+):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
import time
import asyncio
async def print_a():
while true:
print ( 'a' )
time.sleep( 2 ) # simulate the cpu block time
await asyncio.sleep( 0 ) # release control to event loop
async def print_b():
while true:
print ( 'b' )
time.sleep( 2 ) # simulate the cpu block time
await asyncio.sleep( 0 ) # release control to event loop
async def main():
await asyncio.wait([print_a(), print_b()])
if __name__ = = '__main__' :
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
|
协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过async def将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于io密集型操作,而没有明显的cpu阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。
附一个java版本:
printmain类,用于管理和协调打印a和打印b的两个线程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.cuttyfox.tests. self .version1;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
public class printmain {
private boolean currentisa = false;
public synchronized void waitingforprintinga() throws interruptedexception {
while (this.currentisa = = false) {
wait();
}
}
public synchronized void waitingforprintingb() throws interruptedexception {
while (this.currentisa = = true) {
wait();
}
}
public synchronized void setcurrentisa(boolean flag) {
this.currentisa = flag;
notifyall();
}
public static void main(string[] args) throws exception {
printmain state = new printmain();
executorservice executorservice = executors.newcachedthreadpool();
executorservice.execute(new printb(state));
executorservice.execute(new printa(state));
executorservice.shutdown();
executorservice.awaittermination( 10 , timeunit.seconds);
system.out.println( "done" );
system.exit( 0 );
}
}
|
打印a的线程(首先打印a):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.cuttyfox.tests. self .version1;
import java.util.concurrent.timeunit;
public class printa implements runnable{
private printmain state;
public printa(printmain state) {
this.state = state;
}
public void run() {
try {
while (!thread.interrupted()){
system.out.println( "print a" );
timeunit.seconds.sleep( 1 );
this.state.setcurrentisa(true);
this.state.waitingforprintingb();
}
} catch (interruptedexception e) {
system.out.println( "exit through interrupting." );
}
}
}
|
打印b的线程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.cuttyfox.tests. self .version1;
import java.util.concurrent.timeunit;
public class printb implements runnable{
private printmain state;
public printb(printmain state) {
this.state = state;
}
public void run() {
try {
while (!thread.interrupted()) {
this.state.waitingforprintinga();
system.out.println( "print b" );
timeunit.seconds.sleep( 1 );
this.state.setcurrentisa(false);
}
} catch (interruptedexception e) {
system.out.println( "exit through interrupting." );
}
}
}
|
java对象本身有对象锁,故这里没有像python中那样需要显式通过创建一个condition对象来得到一把锁。
使用python实现交替打印abcdef的过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import threading
import time
import functools
from collections import deque
letters = [ chr (code) for code in range ( 97 , 97 + 6 )]
length = len (letters)
class state( object ):
def __init__( self ):
self .condition = threading.condition()
self .index_value = 0
def set_next_index( self , index):
with self .condition:
self .index_value = index
self .condition.notify_all()
def wait_for( self , index_value):
with self .condition:
while not self .index_value = = index_value:
self .condition.wait()
def print_letter(state: state, wait_ident: int ):
print ( 'got: {}!' . format (wait_ident))
while true:
state.wait_for(wait_ident)
time.sleep( 2 )
print (letters[state.index_value])
print ( 'print: {} and set next: {}' . format (state.index_value,
(state.index_value + 1 ) % length
))
state.set_next_index((state.index_value + 1 ) % length)
state = state()
d = deque()
d.extend( range (length))
d.rotate( 1 )
print (d)
threads = []
for wait_ident in d:
t = threading.thread(target = functools.partial(print_letter, state, wait_ident))
threads.append(t)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
|
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/cnweike/article/details/80939225