Python子线程异常捕获

时间:2024-03-16 13:58:14

前言

虽然CPython因为GIL的存在导致多线程的并发计算能力大打折扣,但是在i/o密集型的场景时,使用多线程还是能带来效率上的飞跃。近期在使用多线程时遇到了主线程无法捕获子线程抛出的异常问题,这里记录一下解决的办法。

需求
将某一指定目录下所有的文件(包含子目录下的文件)中所有被$[]$字符包裹起来的变量替换成指定的值,这是一个典型的io密集的场景,因此考虑使用多线程提升效率

原demo

def main():
      conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
      conf_files=['/etc/abc/', '/var/abc']
      thpool = ThreadPoolExecutor(5)
      for file in conf_files:
          thpool.submit(replace_config, file, conf_map)
      thpool.shutdown(wait=True)

def replace_config(file, conf_map, tmp_conf_path):
    with open(file, 'r') as f:
        content = f.read()

    # 需替换的变量的样式为: $[PASSWORD]$
    wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
    var_list = wrapper_pattern.findall(content)

    for var in var_list:
        try:
            value = conf_map[var]
            wrapper = "$[%s]$" % var
            content = content.replace(wrapper, value)
        except KeyError:
            print('key error')
            os.system("mv {} /tmp".format(tmp_conf_path))
            raise Exception('[%s]文件中存在未知的key: %s' % (file, var))

    with open(file, 'w') as f:
        f.write(content)    

demo内直接使用python3.2版本以后引入的ThreadPoolExecutor库使用多线程,在子线程无异常时是正常运行的,但是在子线程出现异常时(比如子线程的内部逻辑里发现了不存在key时, except KeyError会捕获到异常),但是你会发现子线程异常终止了,主线程但是却没有异常抛出。

经过一番搜索,在python官方手册中了解到了原因:
Python子线程异常捕获

参考官方文档链接:
https://docs.python.org/zh-cn/3/library/_thread.html

原因即为:

使用start()方法启动子线程时,会为子线程建立独立的堆栈空间,当线程异常中止时,会自行退出而不会将此异常raise到主线程。那么得知了原因,就可以找到解决的办法了。思路是继承标准库的Thread类,进行一些小的改写封装。

修改后的demo

class ReplaceThread(Thread):
    def __init__(self, file, conf_map):
        super(ReplaceThread, self).__init__()
        self.file = file
        self.conf_map = conf_map

        self.exitcode = 0
        self.exception = None

    def run(self):
        try:
            self._run()
        except Exception as e:
            # 如果捕获到异常,返回值改为不等于0
            self.exitcode = 1
            self.exception = e

    def _run(self):
        with open(self.file, 'r') as f:
            content = f.read()

        # 需替换的变量的样式为: $[PASSWORD]$
        wrapper_pattern = re.compile('\$\[([\w-]+)\]\$')
        var_list = wrapper_pattern.findall(content)

        for var in var_list:
            try:
                value = self.conf_map[var]
                wrapper = "$[%s]$" % var
                content = content.replace(wrapper, value)
            except KeyError:
                raise Exception('[%s]文件中存在未知的key: %s‘ % (self.file, var))

        with open(self.file, 'w') as f:
            f.write(content)
            
# 主线程无法直接捕获子线程内的异常,因此自定义了Thread类,在子线程内定义其出现异常时的返回值,在主线程内根据返回值
# 来判断是否出现异常,并进行下一步操作
 def main():
    conf_files=['/etc/abc/', '/var/abc']
    conf_map = {'DB_HOST': "X.X.X.X",'DB_USER': "root",'DB_PASSWD': 'abcd1234'}
    t_objs = []
    for file in conf_files:
        t = ReplaceThread(file=file, conf_map=conf_map)
        t.start()
        t_objs.append(t)
	
    for t in t_objs:
        t.join()
        if t.exitcode != 0:
            os.system("mv {} /tmp".format(tmp_conf_path))
            raise Exception(t.exception)  

如此这般,在主线程里通过自定义的子线程返回值来判断子线程是否有异常,如果子线程有异常则主线程接替抛出子线程里的异常。这里另外还要注意,子线程的join()操作要放到start()操作全部完成了之后再进行,避免主线程被子线程阻塞,这样就变成了串行执行多线程就失去了意义了。

总结

子线程异常处理问题由此就得以解决。遇到问题多查官方文档