将获取pose 服务拆分为两个服务-同时并行

时间:2024-07-14 07:25:23

要让get_pose函数同时异步请求多个get_pose_data函数,你可以使用asyncio.gather来并发运行多个协程。以下是如何修改get_pose函数以并发执行多个get_pose_data请求的示例:

from fastapi import FastAPI, HTTPException
import cv2
import numpy as np
import asyncio
import httpx
app = FastAPI()
# 初始化摄像头
cap = cv2.VideoCapture("q888.mp4")
# 异步函数,用于获取姿态数据
async def get_pose_data(image_data: bytes) -> Any:
    async with httpx.AsyncClient() as client:
        response = await client.post("http://poseestimationservice:8002/pose", json={"image_data": image_data})
        if response.status_code == 200:
            return response.json()
        else:
            raise HTTPException(status_code=500, detail="Failed to get pose data from PoseEstimationService")
@app.get("/poses", response_model=list)
async def get_poses():
    # 获取多个图像数据
    image_data_list = []
    for _ in range(5):  # 假设我们同时处理5个图像
        image_data = get_image()
        image_data_list.append(image_data)
    # 使用 asyncio.gather 并发执行多个 get_pose_data 协程
    pose_data_tasks = [get_pose_data(image_data) for image_data in image_data_list]
    pose_data_list = await asyncio.gather(*pose_data_tasks)
    return pose_data_list
def get_image():
    # 从摄像头读取一帧
    success, image = cap.read()
    if not success:
        raise HTTPException(status_code=500, detail="Failed to capture image")
    # 将图像转换为JPEG格式以进行传输
    ret, buffer = cv2.imencode('.jpg', image)
    image_data = buffer.tobytes()
    return image_data
if __name__ == "__main__":
    # 运行服务,这里只是示例,实际部署时需要根据实际情况配置
    uvicorn.run(app, host="0.0.0.0", port=8001)

在这个示例中,get_poses函数现在获取多个图像数据(这里假设5个),并为每个图像数据创建一个get_pose_data协程。然后,使用asyncio.gather来并发执行这些协程。asyncio.gather将等待所有协程完成,并返回它们的返回值列表。
这样,get_poses函数将同时异步地请求多个get_pose_data,并且当所有请求都完成后,将返回一个包含所有结果的列表。
请注意,get_image函数是同步的,因此在这个例子中,我们连续调用它5次来获取5个图像。在实际应用中,你可能希望以某种异步方式获取图像,以避免阻塞事件循环。