前言:
不要试图用强制方法杀掉一个python线程,这从服务设计上就存在不合理性。 多线程本用来任务的协作并发,如果你使用强制手段干掉线程,那么很大几率出现意想不到的bug。 请记住一点,锁资源不会因为线程退出而释放锁资源 !
我们可以举出两个常见的例子:
1. 有个A线程拿到了锁,因为他是被强制干掉的,没能及时的release()释放锁资源,那么导致所有的线程获取资源是都被阻塞下去,这就是典型的死锁场景。
2.在常见的生产消费者的场景下,消费者从任务队列获取任务,但是被干掉后没有把正在做的任务丢回队列中,那么这就造成了数据丢失。
下面是java和python终止线程的方法:
java有三种方法可以使终止线程:
1. 使用退出标志,使线程正常退出,也就是当run方法完成后线程终止。
2. 使用stop方法强行终止线程(不推荐使用,因为stop和suspend、resume一样,也可能发生不可预料的结果)。
3. 使用interrupt方法中断线程。
python可以有两种方法:
1. 退出标记
2. 使用ctypes强行杀掉线程
不管是python还是java环境下,理想的停止退出线程方法是 让线程自个自杀,所谓的线程自杀就是 你给他一个标志位,他退出线程。
下面我们会采用多种方法来测试 停止python线程的异常情况。我们查看一个进程所有的执行线程, 进程是用过掌控资源,线程是用作调度单元,进程要被调度执行必须要有一个线程,默认的线程和进程的pid一样的。
1
2
3
4
5
6
|
ps - mp 31449 - o THREAD,tid
USER % CPU PRI SCNT WCHAN USER SYSTEM TID
root 0.0 - - - - - -
root 0.0 19 - poll_s - - 31449
root 0.0 19 - poll_s - - 31450
|
获取到了进程所有的线程后,通过strace得知 31450 是需要我们kill的线程id,当我们kill的时候,会出现整个进程都崩溃的情况。 在多线程环境下,产生的信号是传递给整个进程的,一般而言,所有线程都有机会收到这个信号,进程在收到信号的的线程上下文执行信号处理函数,具体是哪个线程执行的难以获知。也就是说,信号会随机发个该进程的一个线程。
1
2
3
4
5
6
7
|
strace - p <span style = "font-size:14px;line-height:21px;" > 31450 < / span> Process <span style = "font-size:14px;line-height:21px;" > 31450 < / span> attached - interrupt to quit
select( 0 , NULL, NULL, NULL, { 0 , 320326 }) = 0 (Timeout)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = 0 (Timeout)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = 0 (Timeout)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = ? ERESTARTNOHAND (To be restarted)
- - - SIGTERM (Terminated) @ 0 ( 0 ) - - -
Process <span style = "font-size:14px;line-height:21px;" > 31450 < / span> detached
|
上面出现的问题其实跟pthread的说明是一致的。当我们在python代码里加入 signal 信号处理函数后,回调函数可以防止整个进程的退出,那么问题来了,通过信号函数不能识别你要干掉哪一个线程,也就是说,不能精准的干掉某个线程。你虽然把信号发给31450线程id,但是信号受理人是所属进程的任何一个,另外传给信号处理函数的参数只有信号数和信号stack而已,可有可无的。
加了信号处理后,不会退出进程
1
2
3
4
5
6
|
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = 0 (Timeout)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = ? ERESTARTNOHAND (To be restarted)
- - - SIGTERM (Terminated) @ 0 ( 0 ) - - -
rt_sigreturn( 0xffffffff ) = - 1 EINTR (Interrupted system call)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = 0 (Timeout)
select( 0 , NULL, NULL, NULL, { 1 , 0 }) = 0 (Timeout)
|
如果想从外部通知杀掉某个线程,那么可以构建使用rpc服务,或者别的方式通信,signal信号不可以,因为无法无法传递更多的信息。
python的线程不是模拟的,是真实的内核线程,内核调用pthread方法,但Python上层没有提供关闭线程的方法,这就需要我们自己把握了。强烈推荐使用 event 或者 自定义标志位的方法, 如果非要强制杀掉线程,那么可以用python ctypes PyThreadState SetAsyncExc 方法强制退出,这样对于运行的python服务没有什么影响。
该函数的实现原理比较简单,其实也是在python虚拟机里做个标示位,然后由虚拟机运行一个异常来取消线程,虚拟机会帮你做好try cache。 切记不要在外部杀掉python的某个线程,虽然你能通过ctypes找到线程id,但是你直接kill会干掉整个进程的。
下面的代码是 用ctypes 杀掉线程的样例,不推荐使用,因为太粗暴了.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import ctypes
def terminate_thread(thread):
if not thread.isAlive():
return
exc = ctypes.py_object(SystemExit)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), exc)
if res = = 0 :
raise ValueError( "nonexistent thread id" )
elif res > 1 :
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None )
raise SystemError( "PyThreadState_SetAsyncExc failed" )
|
咱们简单look一下PyThreadState源代码,总而言之触发线程的异常模式。 有兴趣的人可以阅读 python pystate.c 的设计,配合着youtube的一些视频分享。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
int
PyThreadState_SetAsyncExc( long id , PyObject * exc) {
PyInterpreterState * interp = GET_INTERP_STATE();
...
HEAD_LOCK();
for (p = interp - >tstate_head; p ! = NULL; p = p - > next ) {
if (p - >thread_id = = id ) {
从链表里找到线程的 id ,避免死锁,我们需要释放head_mutex。
PyObject * old_exc = p - >async_exc;
Py_XINCREF(exc); #增加该对象的引用数
p - >async_exc = exc; # 更为exc模式
HEAD_UNLOCK();
Py_XDECREF(old_exc); # 因为要取消,当然也就递减引用
...
return 1 ; #销毁线程成功
}
}
HEAD_UNLOCK();
return 0 ;
}
|
原生posix pthread 可以使用 ptread_cancel(tid) 在主线程中结束子线程。但是 Python 的线程库不支持这样做,理由是我们不应该强制地结束一个线程,这样会带来很多隐患,应该让该线程自己结束自己。所以在 Python 中,推荐的方法是在子线程中循环判断一个标志位,在主线程中改变该标志位,子线程读到标志位改变,就结束自己。
类似这个逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
def consumer_threading():
t1_stop = threading.Event()
t1 = threading.Thread(target = thread1, args = ( 1 , t1_stop))
t2_stop = threading.Event()
t2 = threading.Thread(target = thread2, args = ( 2 , t2_stop))
time.sleep(duration)
#stop the thread2
t2_stop. set ()
def thread1(arg1, stop_event):
while ( not stop_event.is_set()):
#similar to time.sleep()
stop_event.wait(time)
pass
def thread2(arg1, stop_event):
while ( not stop_event.is_set()):
stop_event.wait(time)
pass
|
简单的总结,虽然我们可以用ctypes里的pystats来控制线程,但这种粗暴中断线程的方法是不合理的。 请选用 自杀模式 !如果你的线程正在发生io阻塞,而不能判断事件怎么办? 你的程序需要做优化了,最少在网络io层需要有主动的timeout,避免一直的阻塞下去。