python编写webRTC推拉流脚本,推自定义音频文件,获取音频流写入文件

时间:2024-10-21 22:08:21
import asyncio
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer


async def send_sdp(e_sdp):
    url = "https://xxxxx:xxxx/rtc/v1/whip/?app=live&stream=livestream"

    async with aiohttp.ClientSession() as session:
        async with session.post(
                url,
                data=e_sdp.sdp.encode(),  # 将 SDP 字符串编码为字节
                headers={
                    "Content-Type": "application/sdp",
                    "Content-Length": str(len(e_sdp.sdp))
                },
                ssl=False  # 忽略 SSL 证书验证(不推荐在生产环境中使用)
        ) as response:
            response_data = await response.text()
            print("对方的SDP:", response_data)
            return RTCSessionDescription(sdp=response_data, type='answer')


async def send_candidate(candidate):
    if candidate:
        print("收集到的候选:", candidate)  # 处理候选,例如打印候选信息


async def run():
    pc = RTCPeerConnection()

    # 添加本地媒体
    player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
    # player = MediaPlayer('D:\\ceshi\\guoqing.mp4')
    pc.addTrack(player.video)
    pc.addTrack(player.audio)  # 确保使用 audio

    # 监听 ICE 候选
    pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))

    # 创建 offer
    offer = await pc.createOffer()
    print("本地生成的SDP:", offer.sdp)  # 打印本地 SDP
    await pc.setLocalDescription(offer)

    # 发送 offer 并接收 answer
    answer = await send_sdp(offer)

    # 设置远程描述
    await pc.setRemoteDescription(answer)

    # 监听 ICE 连接状态变化
    def on_connection_state_change():
        print("连接状态:", pc.connectionState)
        print("ICE 连接状态:", pc.iceConnectionState)

        if pc.connectionState == "connected":
            print("连接已建立!")
        elif pc.connectionState == "disconnected":
            print("连接已断开!")
        elif pc.connectionState == "failed":
            print("连接失败!")
        elif pc.connectionState == "closed":
            print("连接已关闭!")

    pc.onconnectionstatechange = on_connection_state_change

    # 保持连接活跃
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(run())

上面为推流,下面为拉流

import asyncio
import wave

import aiohttp
import numpy as np
import pyaudio
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack

from webRTC import send_candidate


# 自定义音频接收器
class AudioReceiver(MediaStreamTrack):
    kind = "audio"

    def __init__(self):
        super().__init__()
        self.sample_rate = 16000  # 设置音频采样率
        self.chunk_size = 1024
        self.channels = 1
        self.pyaudio = pyaudio.PyAudio()
        self.stream = self.pyaudio.open(
            format=pyaudio.paInt16,
            channels=self.channels,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            input_device_index=1  # 根据需要调整输入设备索引
        )
        self.framecount = 0  # 初始化时间戳

        # 创建 WAV 文件
        self.wav_file = wave.open("output.wav", "wb")
        self.wav_file.setnchannels(self.channels)
        self.wav_file.setsampwidth(2)  # 16位音频
        self.wav_file.setframerate(self.sample_rate)

    async def next_timestamp(self):
        self.framecount += 1
        return self.framecount, 160 / self.sample_rate

    def create_audio_frame(self, data, sample_rate, channels, pts, time_base):
        if not isinstance(data, bytes):
            raise ValueError("Data must be a bytes object")

        # 将一维字节数组转换为 NumPy 数组
        samples_per_channel = len(data) // (channels * 2)  # 每个样本 2 字节
        samples = np.frombuffer(data, dtype=np.int16).reshape(samples_per_channel, channels)

        # 这里我们不再创建 AudioFrame,只需返回样本数据
        return samples

    async def recv(self):
        print("recv执行")
        audio_chunk = self.stream.read(self.chunk_size)

        # 获取帧时间戳
        pts, time_base = await self.next_timestamp()

        # 创建音频帧
        audio_frame = self.create_audio_frame(
            data=audio_chunk,
            sample_rate=self.sample_rate,
            channels=self.channels,
            pts=pts,
            time_base=time_base
        )

        # 写入 WAV 文件
        self.wav_file.writeframes(audio_chunk)  # 写入原始字节数据

        return audio_chunk  # 返回字节数据

    def onmute(self):
        print("音频流静音")

    def onunmute(self):
        print("音频流取消静音")
        asyncio.create_task(self.recv())  # 在这里调用 recv 方法

    def close(self):
        # 关闭流和文件
        self.stream.stop_stream()
        self.stream.close()
        self.pyaudio.terminate()
        self.wav_file.close()


async def send_sdp(e_sdp):
    url = "https://xxxxx:xxxx/rtc/v1/whip-play/?app=live&stream=livestream01"

    async with aiohttp.ClientSession() as session:
        async with session.post(
                url,
                data=e_sdp.sdp.encode(),  # 将 SDP 字符串编码为字节
                headers={
                    "Content-Type": "application/sdp",
                    "Content-Length": str(len(e_sdp.sdp))
                },
                ssl=False  # 忽略 SSL 证书验证(不推荐在生产环境中使用)
        ) as response:
            response_data = await response.text()
            print("对方的SDP:成功")
            return RTCSessionDescription(sdp=response_data, type='answer')


async def run():
    pc = RTCPeerConnection()

    # 添加音频接收器
    audio_receiver = AudioReceiver()
    pc.addTrack(audio_receiver)
    # 监听 ICE 候选
    pc.onicecandidate = lambda candidate: asyncio.create_task(send_candidate(candidate))
    # 创建 offer
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)

    # 发送 offer 并接收 answer
    answer = await send_sdp(offer)

    if answer is not None:
        # 确保当前信令状态允许设置远端描述
        if pc.signalingState == "have-local-offer":
            await pc.setRemoteDescription(answer)
        else:
            print(f"错误:当前信令状态为 {pc.signalingState},无法处理答案")
    else:
        print("错误:未能获取有效的 SDP 答复")
    # 监听连接状态变化
    pc.onconnectionstatechange = lambda: print("连接状态:", pc.connectionState)

    # 保持连接活跃
    while True:
        try:
            audio_data = await audio_receiver.recv()
        except Exception as e:
            print("接收音频数据时出错:", e)
        await asyncio.sleep(0.1)


if __name__ == "__main__":
    asyncio.run(run())