1-1. 代码讲解
下面的代码展示了如何在 Python 中使用多线程来处理数据库查询,同时加入了超时处理机制。代码的主要作用是在给定的时间内尝试执行 SQL 查询,并处理可能出现的超时或异常情况。
-
检查连接:首先检查数据库连接
conn
是否存在。如果连接有效,那么继续执行后续操作。 -
创建结果队列:使用
queue.Queue()
创建一个队列result_queue
,用于存放查询结果。这是因为多线程环境下,线程之间通信需要通过线程安全的队列进行。 -
启动查询线程:创建并启动一个线程
query_thread
,该线程运行run_query
函数执行 SQL 查询,并将结果放入result_queue
中。传递给线程的参数包括 SQL 查询字符串sql
和结果队列result_queue
。 -
设置超时时间:设定一个超时时间
timeout
,此例中为 10 秒。这意味着如果查询在 10 秒内没有返回结果,将会处理为超时。 -
尝试获取查询结果:
- 使用
result_queue.get(timeout=timeout)
尝试在超时时间内获取查询结果。 - 如果在超时时间内成功获取到结果,则进一步检查结果是否为异常。如果是异常(比如查询出错),则打印错误信息并返回异常信息;如果不是异常,则直接返回查询结果。
- 使用
-
处理超时情况:如果在指定的超时时间内没有获取到结果(
queue.Empty
异常被触发),则打印并返回“查询超时”信息。此时,你可以选择取消查询或采取其他措施。需要注意的是,停止正在运行的线程或查询可能很复杂,需要根据应用程序的具体需求来特别处理。 -
等待线程结束:无论查询是否超时,使用
query_thread.join()
确保查询线程正确结束,这是良好的线程同步做法。 -
关闭数据库连接:最后,不要忘记在所有操作完成后关闭数据库连接
conn.close()
,以释放资源。
1-2. 示例代码
def run_sql_oracle(sql: str) -> Union[pd.DataFrame, str, None]:
if conn:
# 在这里创建一个队列用来存储查询结果
result_queue = queue.Queue()
query_thread = threading.Thread(target=run_query, args=(sql, result_queue))
query_thread.start()
# 为查询设置一个超时限制
timeout = 15 # 15秒
try:
# 尝试在超时时间内获取结果
results = result_queue.get(timeout=timeout)
# 检查结果是否为异常,如果是则进行相应处理
if isinstance(results, Exception):
print(f"查询失败: {results}")
return f"{results}"
else:
return results
except queue.Empty:
print("查询超时。")
return "查询超时。"
# 在这一点上,你可以尝试取消查询或采取其他行动。
# 注意:停止一个正在运行的线程或查询可能比较复杂,可能需要根据你的应用需求进行特定的处理。
finally:
query_thread.join()
# 不要忘记关闭你的连接
conn.close()