import asyncio
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
async def send_sdp(e_sdp):
url = "https://xxxxx:xxxx/rtc/v1/whip/?app=live&stream=livestream"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
data=e_sdp.sdp.encode(), # 将 SDP 字符串编码为字节
headers={
"Content-Type": "application/sdp",
"Content-Length": str(len(e_sdp.sdp))
},
ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)
) as response:
response_data = await response.text()
print("对方的SDP:", response_data)
return RTCSessionDescription(sdp=response_data, type='answer')
async def send_candidate(candidate):
if candidate:
print("收集到的候选:", candidate) # 处理候选,例如打印候选信息
async def run():
pc = RTCPeerConnection()
# 添加本地媒体
player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
# player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
pc.addTrack(player.video)
pc.addTrack(player.audio) # 确保使用 audio
# 监听 ICE 候选
pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))
# 创建 offer
offer = await pc.createOffer()
print("本地生成的SDP:", offer.sdp) # 打印本地 SDP
await pc.setLocalDescription(offer)
# 发送 offer 并接收 answer
answer = await send_sdp(offer)
# 设置远程描述
await pc.setRemoteDescription(answer)
# 监听 ICE 连接状态变化
def on_connection_state_change():
print("连接状态:", pc.connectionState)
print("ICE 连接状态:", pc.iceConnectionState)
if pc.connectionState == "connected":
print("连接已建立!")
elif pc.connectionState == "disconnected":
print("连接已断开!")
elif pc.connectionState == "failed":
print("连接失败!")
elif pc.connectionState == "closed":
print("连接已关闭!")
pc.onconnectionstatechange = on_connection_state_change
# 保持连接活跃
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(run())
上面为推流,下面为拉流
import asyncio
import wave
import aiohttp
import numpy as np
import pyaudio
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack
from webRTC import send_candidate
# 自定义音频接收器
class AudioReceiver(MediaStreamTrack):
kind = "audio"
def __init__(self):
super().__init__()
self.sample_rate = 16000 # 设置音频采样率
self.chunk_size = 1024
self.channels = 1
self.pyaudio = pyaudio.PyAudio()
self.stream = self.pyaudio.open(
format=pyaudio.paInt16,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size,
input_device_index=1 # 根据需要调整输入设备索引
)
self.framecount = 0 # 初始化时间戳
# 创建 WAV 文件
self.wav_file = wave.open("output.wav", "wb")
self.wav_file.setnchannels(self.channels)
self.wav_file.setsampwidth(2) # 16位音频
self.wav_file.setframerate(self.sample_rate)
async def next_timestamp(self):
self.framecount += 1
return self.framecount, 160 / self.sample_rate
def create_audio_frame(self, data, sample_rate, channels, pts, time_base):
if not isinstance(data, bytes):
raise ValueError("Data must be a bytes object")
# 将一维字节数组转换为 NumPy 数组
samples_per_channel = len(data) // (channels * 2) # 每个样本 2 字节
samples = np.frombuffer(data, dtype=np.int16).reshape(samples_per_channel, channels)
# 这里我们不再创建 AudioFrame,只需返回样本数据
return samples
async def recv(self):
print("recv执行")
audio_chunk = self.stream.read(self.chunk_size)
# 获取帧时间戳
pts, time_base = await self.next_timestamp()
# 创建音频帧
audio_frame = self.create_audio_frame(
data=audio_chunk,
sample_rate=self.sample_rate,
channels=self.channels,
pts=pts,
time_base=time_base
)
# 写入 WAV 文件
self.wav_file.writeframes(audio_chunk) # 写入原始字节数据
return audio_chunk # 返回字节数据
def onmute(self):
print("音频流静音")
def onunmute(self):
print("音频流取消静音")
asyncio.create_task(self.recv()) # 在这里调用 recv 方法
def close(self):
# 关闭流和文件
self.stream.stop_stream()
self.stream.close()
self.pyaudio.terminate()
self.wav_file.close()
async def send_sdp(e_sdp):
url = "https://xxxxx:xxxx/rtc/v1/whip-play/?app=live&stream=livestream01"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
data=e_sdp.sdp.encode(), # 将 SDP 字符串编码为字节
headers={
"Content-Type": "application/sdp",
"Content-Length": str(len(e_sdp.sdp))
},
ssl=False # 忽略 SSL 证书验证(不推荐在生产环境中使用)
) as response:
response_data = await response.text()
print("对方的SDP:成功")
return RTCSessionDescription(sdp=response_data, type='answer')
async def run():
pc = RTCPeerConnection()
# 添加音频接收器
audio_receiver = AudioReceiver()
pc.addTrack(audio_receiver)
# 监听 ICE 候选
pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))
# 创建 offer
offer = await pc.createOffer()
await pc.setLocalDescription(offer)
# 发送 offer 并接收 answer
answer = await send_sdp(offer)
if answer is not None:
# 确保当前信令状态允许设置远端描述
if pc.signalingState == "have-local-offer":
await pc.setRemoteDescription(answer)
else:
print(f"错误:当前信令状态为 {pc.signalingState},无法处理答案")
else:
print("错误:未能获取有效的 SDP 答复")
# 监听连接状态变化
pc.onconnectionstatechange = lambda: print("连接状态:", pc.connectionState)
# 保持连接活跃
while True:
try:
audio_data = await audio_receiver.recv()
except Exception as e:
print("接收音频数据时出错:", e)
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(run())