前言
虽然CPython因为GIL的存在导致多线程的并发计算能力大打折扣,但是在i/o密集型的场景时,使用多线程还是能带来效率上的飞跃。近期在使用多线程时遇到了主线程无法捕获子线程抛出的异常问题,这里记录一下解决的办法。
需求
将某一指定目录下所有的文件(包含子目录下的文件)中所有被$[]$
字符包裹起来的变量替换成指定的值,这是一个典型的io密集的场景,因此考虑使用多线程提升效率
原demo
def main():
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
conf_files=['/etc/abc/', '/var/abc']
thpool = ThreadPoolExecutor(5)
for file in conf_files:
thpool.submit(replace_config, file, conf_map)
thpool.shutdown(wait=True)
def replace_config(file, conf_map, tmp_conf_path):
with open(file, 'r') as f:
content = f.read()
# 需替换的变量的样式为: $[PASSWORD]$
wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
var_list = wrapper_pattern.findall(content)
for var in var_list:
try:
value = conf_map[var]
wrapper = "$[%s]$" % var
content = content.replace(wrapper, value)
except KeyError:
print('key error')
os.system("mv {} /tmp".format(tmp_conf_path))
raise Exception('[%s]文件中存在未知的key: %s' % (file, var))
with open(file, 'w') as f:
f.write(content)
demo内直接使用python3.2版本以后引入的ThreadPoolExecutor库使用多线程,在子线程无异常时是正常运行的,但是在子线程出现异常时(比如子线程的内部逻辑里发现了不存在key时, except KeyError会捕获到异常),但是你会发现子线程异常终止了,主线程但是却没有异常抛出。
经过一番搜索,在python官方手册中了解到了原因:
参考官方文档链接:
https://docs.python.org/zh-cn/3/library/_thread.html
原因即为:
使用start()方法启动子线程时,会为子线程建立独立的堆栈空间,当线程异常中止时,会自行退出而不会将此异常raise到主线程。那么得知了原因,就可以找到解决的办法了。思路是继承标准库的Thread类,进行一些小的改写封装。
修改后的demo
class ReplaceThread(Thread):
def __init__(self, file, conf_map):
super(ReplaceThread, self).__init__()
self.file = file
self.conf_map = conf_map
self.exitcode = 0
self.exception = None
def run(self):
try:
self._run()
except Exception as e:
# 如果捕获到异常,返回值改为不等于0
self.exitcode = 1
self.exception = e
def _run(self):
with open(self.file, 'r') as f:
content = f.read()
# 需替换的变量的样式为: $[PASSWORD]$
wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
var_list = wrapper_pattern.findall(content)
for var in var_list:
try:
value = self.conf_map[var]
wrapper = "$[%s]$" % var
content = content.replace(wrapper, value)
except KeyError:
raise Exception('[%s]文件中存在未知的key: %s‘ % (self.file, var))
with open(self.file, 'w') as f:
f.write(content)
# 主线程无法直接捕获子线程内的异常,因此自定义了Thread类,在子线程内定义其出现异常时的返回值,在主线程内根据返回值
# 来判断是否出现异常,并进行下一步操作
def main():
conf_files=['/etc/abc/', '/var/abc']
conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
t_objs = []
for file in conf_files:
t = ReplaceThread(file=file, conf_map=conf_map)
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
if t.exitcode != 0:
os.system("mv {} /tmp".format(tmp_conf_path))
raise Exception(t.exception)
如此这般,在主线程里通过自定义的子线程返回值来判断子线程是否有异常,如果子线程有异常则主线程接替抛出子线程里的异常。这里另外还要注意,子线程的join()操作要放到start()操作全部完成了之后再进行,避免主线程被子线程阻塞,这样就变成了串行执行多线程就失去了意义了。
总结
子线程异常处理问题由此就得以解决。遇到问题多查官方文档