【原创】同声传译:LLM、TTS、ASR 全部流式输出

时间:2024-10-14 20:15:24

基于 SambaNova API 实现流式同声传译功能

效果演示:基于大模型 TTS 流式输出实现同声传译(哔哩哔哩 bilibili 视频)

import asyncio
import io
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import websockets
from pydub import AudioSegment
from pydub.playback import play

# Configure root logging for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# NOTE(review): not referenced anywhere in this script; presumably a TTS
# voice/speaker ID -- confirm before relying on it.
style_id = "ICL_7****296"
# SambaNova chat-completions endpoint used by get_sambanova_response.
api_endpoint = 'https://api.sambanova.ai/v1/chat/completions'
# Read the key from the environment so real credentials need not be committed
# to source; the original placeholder remains the fallback.
api_key = os.environ.get('SAMBANOVA_API_KEY', '****************')
model = 'Meta-Llama-3.2-3B-Instruct'

# Call the SambaNova chat-completions API and stream back the translation.
async def get_sambanova_response(prompt):
    """Translate *prompt* (Chinese <-> English) via SambaNova's streaming chat API.

    Each streamed delta is echoed to stdout as it arrives (no trailing
    newline) and all deltas are concatenated into the return value.

    Args:
        prompt: Source text, sent as the user message.

    Returns:
        The translated text. If the request fails (HTTP error status,
        transport error, malformed stream), the error is logged rather than
        raised and whatever text was received so far is returned -- possibly "".
    """
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    data = {
        'model': model,
        'messages': [
            {"role": "system", "content": "你是一个同声传译的翻译官,请将中文翻译成英文,将英文翻译成中文。不要输出其他信息"},
            {"role": "user", "content": prompt}
        ],
        # The chat-completions request carries the text in 'messages' only; the
        # former top-level 'prompt' key was redundant and not part of that API.
        'stream': True
    }
    parts = []  # collected deltas; joined once at the end (avoids quadratic +=)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(api_endpoint, headers=headers, json=data) as response:
                if response.status >= 400:
                    # Error bodies are plain JSON, not SSE; without this check
                    # the failure would silently produce an empty translation.
                    body = await response.text()
                    logging.error(f"Sambanova API returned HTTP {response.status}: {body}")
                    return ""
                # Iterating the StreamReader yields one raw line per iteration.
                async for raw_line in response.content:
                    line = raw_line.decode('utf-8', errors='replace').strip()
                    if not line.startswith("data: "):
                        continue  # blank separators / non-data SSE lines
                    payload = line[len("data: "):]
                    if payload == "[DONE]":
                        break
                    try:
                        chunk = json.loads(payload)
                    except json.JSONDecodeError:
                        logging.error(f"Failed to decode JSON: {payload}")
                        continue
                    choices = chunk.get("choices") if isinstance(chunk, dict) else None
                    if not choices:
                        continue  # e.g. an empty choices list; previously an IndexError
                    delta = choices[0].get("delta") or {}
                    part = delta.get("content")
                    if part:  # content may be null/empty in some chunks
                        print(part, end="", flush=True)
                        parts.append(part)
    except Exception as e:
        logging.error(f"Error while processing response: {e}")
    return "".join(parts)

# Endpoint of the TTS WebSocket service.
wss_url = "wss://wss.*****.com/audio/tts"

# Extra handshake headers passed to websockets.connect by listen_and_receive.
# They appear to imitate the browser extension the service normally talks to
# (see the User-Agent and chrome-extension Origin).
# NOTE(review): the websockets library generates its own Sec-WebSocket-Key and
# negotiates permessage-deflate itself, so these two entries may end up
# duplicated in the request -- confirm the server tolerates that.
headers = {}
headers["User-Agent"] = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/121.0.0.0 Safari/537.36 SamanthaDoubao/1.25.3"
)
headers["Origin"] = "chrome-extension://obkci***********cldeggd"
headers["Sec-WebSocket-Key"] = "V1******************4w=="
headers["Sec-WebSocket-Extensions"] = "permessage-deflate; client_max_window_bits"
headers["Cookie"] = "***************************"

# Play one AAC clip (synchronous / blocking).
def play_audio_stream(aac_data):
    """Decode an in-memory AAC clip with pydub and play it.

    This call is synchronous; play_audio_async runs it on an executor so the
    event loop is not blocked.

    Args:
        aac_data: Raw AAC bytes (any bytes-like object) for one clip.
    """
    buffer = io.BytesIO(aac_data)
    segment = AudioSegment.from_file(buffer, format="aac")
    play(segment)

# Run blocking playback on a worker thread so the event loop stays responsive.
async def play_audio_async(aac_data, executor):
    """Play *aac_data* via play_audio_stream on *executor* and await completion.

    Args:
        aac_data: Raw AAC bytes for one clip.
        executor: concurrent.futures executor that runs the blocking playback
            call (``None`` selects the loop's default executor).

    Exceptions raised by play_audio_stream propagate to the awaiting caller.
    """
    # get_running_loop() is the supported way to obtain the loop from inside a
    # coroutine; calling get_event_loop() here is discouraged in modern asyncio.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, play_audio_stream, aac_data)

# Consume queued clips and play them one after another.
async def audio_player(queue: asyncio.Queue, executor: ThreadPoolExecutor):
    """Play clips from *queue* in order until a ``None`` sentinel is received.

    Each clip is passed to play_audio_async with *executor*. A failure while
    playing one clip is logged and the loop moves on to the next item. Items
    queued after the sentinel are left untouched.
    """
    while True:
        clip = await queue.get()
        if clip is None:
            return  # sentinel: producer is done
        try:
            logging.info("播放音频数据...")
            await play_audio_async(clip, executor)
        except Exception as exc:
            logging.error(f"音频播放过程中发生错误: {exc}")

# Connect to the TTS WebSocket, send the text, and queue the returned audio.
async def listen_and_receive(queue: asyncio.Queue, translated_text):
    """Synthesize *translated_text* over the TTS WebSocket and queue the audio.

    Protocol as used here: send one ``text`` event and one ``finish`` event,
    then read frames until the server sends ``finish`` or closes the socket.
    Binary frames are appended to the current sentence buffer. A
    ``sentence_start`` event starts a fresh buffer. A ``sentence_end`` event
    puts the buffered audio on *queue* as one ``bytes`` item.

    Args:
        queue: Receives one ``bytes`` clip per sentence (consumed by
            audio_player). This function does not enqueue the ``None`` sentinel.
        translated_text: Text to synthesize; if empty, nothing is sent.

    Errors are logged rather than raised.
    """
    if not translated_text:
        logging.warning("没有需要合成的文本,跳过 TTS")
        return

    text_data = {
        "event": "text",
        "text": translated_text
    }
    finish_data = {
        "event": "finish"
    }

    try:
        # NOTE(review): 'extra_headers' is the legacy websockets client keyword;
        # websockets >= 14 uses 'additional_headers' -- pin/confirm the version.
        async with websockets.connect(wss_url, extra_headers=headers) as websocket:
            await websocket.send(json.dumps(text_data))
            await websocket.send(json.dumps(finish_data))

            current_audio_data = bytearray()

            while True:
                try:
                    message = await websocket.recv()
                except websockets.ConnectionClosedOK:
                    logging.info("WebSocket 连接正常关闭")
                    break
                except websockets.ConnectionClosedError as e:
                    logging.error(f"WebSocket 连接意外关闭: {e}")
                    break

                if isinstance(message, bytes):
                    current_audio_data.extend(message)
                    continue

                try:
                    message_json = json.loads(message)
                except json.JSONDecodeError:
                    continue
                logging.info(f'Received message: {message_json}')
                if not isinstance(message_json, dict):
                    continue  # unexpected payload shape; ignore instead of aborting

                event = message_json.get("event")
                if event == "sentence_start":
                    result = message_json.get("sentence_start_result")
                    readable = result.get("readable_text", "") if isinstance(result, dict) else ""
                    logging.info(f"开始新的一段: {readable}")
                    current_audio_data = bytearray()
                elif event == "sentence_end":
                    if current_audio_data:
                        # Queue an immutable snapshot and start a new buffer so
                        # later frames can never mutate a clip already queued.
                        await queue.put(bytes(current_audio_data))
                    current_audio_data = bytearray()
                elif event == "finish":
                    logging.info("所有段落接收完毕")
                    break

            # Audio received without a closing sentence_end would otherwise be lost.
            if current_audio_data:
                await queue.put(bytes(current_audio_data))
            # "finish" was already sent above; sending it again here could hit a
            # closed socket and be logged as a spurious error.

    except Exception as e:
        logging.error(f"发生错误: {e}")

async def main():
    """Run the pipeline: translate a sample prompt, synthesize it, play it.

    Audio clips flow from listen_and_receive to audio_player through an
    asyncio.Queue terminated by a ``None`` sentinel. The executor is closed
    and the player drained even if synthesis fails.
    """
    prompt = """October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center.October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center。"""
    translated_text = await get_sambanova_response(prompt)
    # The streamed deltas were already echoed without a newline; this prints
    # the complete translation again on its own line.
    print(translated_text)
    if not translated_text:
        logging.warning("翻译结果为空,跳过语音合成")
        return

    audio_queue = asyncio.Queue()
    # The context manager guarantees executor shutdown on every exit path.
    with ThreadPoolExecutor() as executor:
        player_task = asyncio.create_task(audio_player(audio_queue, executor))
        try:
            await listen_and_receive(audio_queue, translated_text)
        finally:
            # Always deliver the sentinel so the player finishes and is awaited.
            await audio_queue.put(None)
            await player_task

# Run the full pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())