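# Streaming simultaneous interpretation built on the SambaNova API.
# Demo video (bilibili): "基于大模型 TTS流式输出实现同声传译"
# (simultaneous interpretation using an LLM with streaming TTS output).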
import asyncio
import aiohttp
import websockets
import json
import io
from pydub import AudioSegment
from pydub.playback import play
import logging
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger: timestamp, level and message on every record.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# NOTE(review): presumably a TTS voice/style id; it is not referenced by the
# code visible in this file — confirm whether it is still needed.
style_id = "ICL_7****296"

# SambaNova chat-completions endpoint, credential and model name.
api_endpoint = "https://api.sambanova.ai/v1/chat/completions"
api_key = "****************"
model = "Meta-Llama-3.2-3B-Instruct"
def _extract_stream_text(payload):
    """Return the text fragment carried by one streamed JSON payload.

    Returns an empty string when the chunk has no usable
    ``choices[0].delta.content`` (missing keys, an empty ``choices`` list,
    or a ``null`` content value).

    Raises:
        json.JSONDecodeError: If *payload* is not valid JSON.
    """
    chunk = json.loads(payload)
    choices = chunk.get("choices") if isinstance(chunk, dict) else None
    first = choices[0] if isinstance(choices, list) and choices else None
    delta = first.get("delta") if isinstance(first, dict) else None
    content = delta.get("content") if isinstance(delta, dict) else None
    return content if isinstance(content, str) else ""


# Call the SambaNova API and collect its streamed response.
async def get_sambanova_response(prompt):
    """Translate *prompt* through the chat-completions endpoint, streaming.

    Each received text fragment is echoed to stdout as it arrives and the
    fragments are concatenated into the return value.

    Args:
        prompt: Text to translate; sent as the user message.

    Returns:
        The concatenated translated text. If the request or the stream fails,
        the error is logged and whatever was received so far (possibly an
        empty string) is returned.
    """
    # Named request_headers so it does not shadow the module-level ``headers``
    # used for the WebSocket connection.
    request_headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    data = {
        'model': model,
        'messages': [
            {"role": "system", "content": "你是一个同声传译的翻译官,请将中文翻译成英文,将英文翻译成中文。不要输出其他信息"},
            {"role": "user", "content": prompt},
        ],
        # NOTE(review): duplicates the user message above; kept as in the
        # original request — confirm whether the endpoint needs this field.
        'prompt': prompt,
        'stream': True,
    }
    parts = []
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(api_endpoint, headers=request_headers, json=data) as response:
                # Surface HTTP errors (bad key, rate limit, ...) in the log
                # instead of silently returning an empty translation.
                response.raise_for_status()
                async for raw_line in response.content:
                    line = raw_line.decode('utf-8').strip()
                    # Only "data:" lines carry payloads; blank separators and
                    # other stream fields are skipped.
                    if not line.startswith("data:"):
                        continue
                    payload = line[len("data:"):].strip()
                    if payload == "[DONE]":
                        break
                    try:
                        part = _extract_stream_text(payload)
                    except json.JSONDecodeError:
                        logging.error("Failed to decode JSON: %s", payload)
                        continue
                    if part:
                        print(part, end="", flush=True)
                        parts.append(part)
    except Exception as e:
        # Boundary handler: callers expect a string back, never an exception.
        logging.error("Error while processing response: %s", e)
    return "".join(parts)
# WebSocket server address.
wss_url = "wss://wss.*****.com/audio/tts"

# Custom request headers.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/121.0.0.0 Safari/537.36 SamanthaDoubao/1.25.3"
    ),
    "Origin": "chrome-extension://obkci***********cldeggd",
    "Sec-WebSocket-Key": "V1******************4w==",
    "Sec-WebSocket-Extensions": "permessage-deflate; client_max_window_bits",
    "Cookie": "***************************",
}
# Play an AAC audio stream (synchronous function).
def play_audio_stream(aac_data):
    """Decode in-memory AAC data with pydub and play it.

    Args:
        aac_data: Bytes-like AAC audio, wrapped in a ``BytesIO`` for decoding.
    """
    buffer = io.BytesIO(aac_data)
    segment = AudioSegment.from_file(buffer, format="aac")
    play(segment)
# Play audio through a thread pool so the event loop is not blocked.
async def play_audio_async(aac_data, executor):
    """Run the blocking ``play_audio_stream`` in *executor* and await it.

    Args:
        aac_data: Audio data forwarded unchanged to ``play_audio_stream``.
        executor: Executor passed to ``loop.run_in_executor``.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is preferred over get_event_loop() here.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, play_audio_stream, aac_data)
# Consume queued audio and play it, keeping the wait between items short.
async def audio_player(queue: asyncio.Queue, executor: ThreadPoolExecutor):
    """Play items from *queue* one after another until ``None`` is received.

    Args:
        queue: Source of audio data; a ``None`` item stops the player.
        executor: Executor forwarded to ``play_audio_async``.

    A playback failure is logged and the player moves on to the next item.
    """
    chunk = await queue.get()
    while chunk is not None:
        try:
            logging.info("播放音频数据...")
            await play_audio_async(chunk, executor)
        except Exception as exc:
            logging.error(f"音频播放过程中发生错误: {exc}")
        chunk = await queue.get()
# Connect to the TTS service and receive audio data.
async def listen_and_receive(queue: asyncio.Queue, translated_text):
    """Send *translated_text* to the TTS WebSocket and queue the audio it returns.

    Binary frames are accumulated into a per-sentence buffer. A
    ``sentence_start`` event begins a fresh buffer, ``sentence_end`` puts the
    buffered audio on *queue*, and ``finish`` ends reception.

    Args:
        queue: Receives one ``bytes`` object per completed sentence.
        translated_text: Text sent to the service in the ``text`` event.

    Errors are logged, not raised; the function returns ``None``.
    """
    text_data = {
        "event": "text",
        "text": translated_text,
    }
    finish_data = {
        "event": "finish",
    }
    try:
        # NOTE(review): newer ``websockets`` releases appear to rename this
        # keyword to ``additional_headers`` — confirm against the installed version.
        async with websockets.connect(wss_url, extra_headers=headers) as websocket:
            await websocket.send(json.dumps(text_data))
            await websocket.send(json.dumps(finish_data))

            current_audio_data = bytearray()
            connection_open = True
            while True:
                try:
                    message = await websocket.recv()
                except websockets.ConnectionClosedOK:
                    logging.info("WebSocket 连接正常关闭")
                    connection_open = False
                    break
                except websockets.ConnectionClosedError as e:
                    logging.error("WebSocket 连接意外关闭: %s", e)
                    connection_open = False
                    break

                if isinstance(message, bytes):
                    current_audio_data.extend(message)
                    continue
                if not isinstance(message, str):
                    continue

                try:
                    message_json = json.loads(message)
                except json.JSONDecodeError:
                    continue
                logging.info("Received message: %s", message_json)
                if not isinstance(message_json, dict):
                    # Valid JSON but not an event object; nothing to dispatch.
                    continue

                event = message_json.get("event")
                if event == "sentence_start":
                    # Look the text up defensively: a missing key in a log
                    # line must not abort the whole reception.
                    details = message_json.get("sentence_start_result")
                    readable = details.get("readable_text", "") if isinstance(details, dict) else ""
                    logging.info("开始新的一段: %s", readable)
                    current_audio_data = bytearray()
                elif event == "sentence_end":
                    if current_audio_data:
                        # Queue an immutable snapshot and start a new buffer so
                        # later frames cannot mutate audio already handed to
                        # the player.
                        await queue.put(bytes(current_audio_data))
                        current_audio_data = bytearray()
                elif event == "finish":
                    logging.info("所有段落接收完毕")
                    break

            # Repeat the finish event as the original did, but only while the
            # connection is still open; sending on a closed socket would just
            # raise and be logged as an error.
            if connection_open:
                await websocket.send(json.dumps(finish_data))
    except Exception as e:
        # Boundary handler: connection and protocol failures are reported only.
        logging.error("发生错误: %s", e)
async def main():
    """Translate a sample prompt, synthesize it via TTS and play the audio.

    Runs the translation first, then starts the audio player task and feeds
    it with audio received from the TTS WebSocket. The player is always sent
    its ``None`` stop item and the thread pool is always shut down, even if
    reception fails or is cancelled.
    """
    prompt = """October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center.October 12, a city railway guard road joint defense and stability and railway line safety environment governance conference was held at the city administrative conference center。"""
    translated_text = await get_sambanova_response(prompt)
    print(translated_text)

    # An empty result means the translation request failed (the error has
    # already been logged); there is nothing to synthesize.
    if not translated_text.strip():
        logging.warning("Translation is empty; skipping speech synthesis")
        return

    audio_queue = asyncio.Queue()
    # The context manager shuts the pool down on every exit path.
    with ThreadPoolExecutor() as executor:
        player_task = asyncio.create_task(audio_player(audio_queue, executor))
        try:
            await listen_and_receive(audio_queue, translated_text)
        finally:
            # Always stop the player, otherwise it would wait on the queue forever.
            await audio_queue.put(None)
            await player_task
# Run the client only when executed as a script, so importing this module
# does not trigger network requests or audio playback.
if __name__ == "__main__":
    asyncio.run(main())