如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
import threading
import time
import inspect
import ctypes
def _async_raise(tid, exctype):
    """Schedule ``exctype`` to be raised in the thread identified by ``tid``.

    This is a thin wrapper around the CPython C API function
    ``PyThreadState_SetAsyncExc``. The exception is only *scheduled*: the
    target thread sees it the next time it runs Python bytecode.

    Args:
        tid: Identifier of the target thread (``threading.Thread.ident``).
        exctype: Exception class to raise. If an exception instance is
            passed instead, its class is used.

    Raises:
        ValueError: No thread state matches ``tid``.
        SystemError: The call reported more than one modified thread state;
            the pending exception is cleared again before raising.
    """
    # The C signature takes the id as ``unsigned long``; wrap it explicitly
    # so ctypes does not fall back to its default ``int`` conversion.
    thread_id = ctypes.c_ulong(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        thread_id, ctypes.py_object(exctype)
    )
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread state was touched: call again with a NULL
        # exception to revert the effect before reporting the failure.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
    """Ask ``thread`` to terminate by raising ``SystemExit`` inside it.

    The request is asynchronous: this function returns as soon as the
    exception has been scheduled, not when the thread has actually exited.
    Call ``thread.join()`` afterwards to wait for it.

    Args:
        thread: A started ``threading.Thread`` instance.

    Raises:
        ValueError: ``thread`` was never started (it has no ident yet), or
            its ident does not match any running thread.
        SystemError: Propagated from ``_async_raise`` when the underlying
            C call fails.
    """
    # ``ident`` is None until start() has been called; fail with a clear
    # message instead of an opaque ctypes conversion error.
    if thread.ident is None:
        raise ValueError("thread has not been started")
    _async_raise(thread.ident, SystemExit)
class TestThread(threading.Thread):
    """Demo worker thread that loops forever until stopped from outside."""

    def run(self):
        """Print ``begin`` and then sleep in 0.1-second steps indefinitely."""
        # The original had ``print`` and ``"begin"`` on separate lines, which
        # prints nothing under Python 3; call print() explicitly.
        print("begin")
        while True:
            time.sleep(0.1)
        # NOTE(review): source indentation was lost; this line is assumed to
        # follow the loop, so it is never reached while the loop is
        # unconditional -- confirm against the original article.
        print('end')
if __name__ == "__main__":
    # Demo: start the looping worker, let it run for a second, then force
    # it to stop from the main thread.
    t = TestThread()
    t.start()
    time.sleep(1)
    stop_thread(t)
    # stop_thread() only schedules the exception; wait until the worker has
    # really exited before reporting that it stopped.
    t.join()
    print('stoped')
|
以上这篇在 Python 中实现强制关闭线程的示例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/tian544556/article/details/79047183