多进程测试mlflow 服务

时间:2024-12-23 10:31:32
  1. 导入必要的模块:你需要确保导入了 multiprocessing 模块中的 Process 类以及其他任何所需的依赖项。
  2. 定义目标函数post_server 函数必须被正确定义,并且能够接受传入的参数。
  3. 启动进程:在创建了 Process 实例之后,需要调用 .start() 方法来启动它们;否则,这些进程不会执行任何任务。
  4. 保护入口点:为了防止子进程中再次尝试创建新的子进程(这可能会导致递归地创建更多不必要的进程),应该将所有与多进程相关的代码放在 if __name__ == '__main__': 下面。
  5. 处理可能的异常情况:考虑加入异常处理机制,以便更好地管理可能出现的问题。
  6. 优化资源利用:如果你计划并发地发起大量的请求,考虑使用进程池(Pool)来限制同时运行的进程数量,从而避免系统资源耗尽。

以下是经过修正后的代码示例:

import multiprocessing as mp
from multiprocessing import Process
import json

def post_server(json_data):
    try:
        # 假设这里有一个函数可以处理传入的json_data并返回结果
        print(f"Processing data in process {mp.current_process().name}")
        # 模拟网络请求或其他耗时操作
        # response = requests.post(url, headers=headers, data=json.dumps(json_data))
        # print(response.status_code)
    except Exception as e:
        print(f"Error occurred in process {mp.current_process().name}: {e}")

if __name__ == '__main__':
    # 示例 JSON 数据
    json_data = {"key": "value"}

    p_list = []
    for i in range(10):
        p = Process(target=post_server, args=(json_data,))
        p_list.append(p)
        p.start()  # 启动进程

    for p in p_list:
        p.join()  # 等待所有进程完成

    print("All processes have finished.")

关键改进点说明

  • 启动进程:添加了 p.start() 调用来启动每个进程。如果不显式调用此方法,则进程对象仅会被创建但不会开始执行。
  • 保护入口点:将多进程逻辑包裹在 if __name__ == '__main__': 条件语句中,以确保当脚本作为主程序运行时才创建和启动子进程,而不会在导入该模块时意外触发。
  • 错误处理:增加了对潜在异常的基本捕获,这样即使某个进程中出现了问题,其他进程仍然可以继续工作。
  • 日志输出:为 post_server 函数添加了一些简单的打印语句,用于调试目的,显示当前正在处理数据的进程名称。

此外,如果你希望更高效地管理和控制并发任务的数量,可以考虑使用 multiprocessing.Pool 或者 concurrent.futures.ProcessPoolExecutor 来代替直接管理多个 Process 对象。例如:

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    json_data = {"key": "value"}
    
    with ProcessPoolExecutor(max_workers=5) as executor:  # 控制最大并发数为5
        futures = [executor.submit(post_server, json_data) for _ in range(10)]
        
        for future in futures:
            try:
                future.result()  # 获取结果或等待完成
            except Exception as exc:
                print(f'Generated an exception: {exc}')
    
    print("All processes have finished.")

这种方法不仅简化了代码结构,而且还能自动处理一些常见的边界条件,比如工作进程的回收、超时管理等。通过这种方式,你可以更容易地实现并行任务的同时,也减少了手动编写复杂同步逻辑的需求。