Voice Recognition Control (Software & Hardware) - 2. Complete Code

Date: 2024-09-30 16:23:41
import sqlite3
import time
import wave  # the wave module reads and writes .wav audio files
from funasr import AutoModel
import sounddevice as sd
import numpy as np
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from pypinyin import lazy_pinyin
import pyaudio  # pyaudio handles recording, playback, and writing wav files
# Streaming-model parameters
chunk_size = [0, 10, 5]  # FunASR streaming config: 600 ms chunks with 300 ms look-ahead
encoder_chunk_look_back = 7  # number of chunks of history for encoder self-attention
decoder_chunk_look_back = 5  # number of encoder chunks the decoder cross-attends back to
is_task_running = True
model = AutoModel(model=r"D:\SpeechRecognize\speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch")

# The model expects audio sampled at 16000 Hz
fs = 16000
duration = 3  # seconds recorded per wake-word listening pass
chunk_stride = chunk_size[1] * 960  # 600 ms of samples per streaming chunk at 16 kHz
cache = {}
window_size = 3  # moving-average window for the simple noise filter

# Connect to the SQLite database; the file is created if it does not exist yet
conn = sqlite3.connect('speech_recognition.db')
cursor = conn.cursor()

# Create the logging table
cursor.execute('''
    CREATE TABLE IF NOT EXISTS speech_data
    (text TEXT, time_stamp TEXT, batch TEXT)
''')

def record(seconds):  # record the given number of seconds from the microphone to a wav file
    # Stream parameters
    CHUNK = 1024  # frames read per buffer (1024 is the usual default)
    FORMAT = pyaudio.paInt16  # 16-bit samples, the standard format for wav files
    CHANNELS = 1  # number of channels (mono)
    RATE = 16000  # sampling rate (samples per second)
    RECORD_SECONDS = seconds  # recording length in seconds
    WAVE_OUTPUT_FILENAME = "./output.wav"  # where the audio is saved
    p = pyaudio.PyAudio()  # create the PyAudio object
    stream = p.open(format=FORMAT,  # sample format for the wav file
                    channels=CHANNELS,  # number of channels
                    rate=RATE,  # sampling rate
                    input=True,  # True marks this as an input (recording) stream
                    frames_per_buffer=CHUNK)  # frames per buffer
    print("* recording started")
    frames = []  # collected audio buffers
    # reads needed = sampling rate / frames per read * recording time
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)  # read CHUNK frames at a time
        frames.append(data)  # keep the raw bytes
    print("* recording finished")
    stream.stop_stream()  # stop the input stream
    stream.close()  # close the input stream
    p.terminate()  # shut down PyAudio
    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')  # open the output file in binary-write mode
    wf.setnchannels(CHANNELS)  # channel count
    wf.setsampwidth(p.get_sample_size(FORMAT))  # sample width, must match FORMAT
    wf.setframerate(RATE)  # sampling rate, must match RATE
    wf.writeframes(b''.join(frames))  # write the audio data
    wf.close()  # close the file once everything is written

# Main loop: listen in short passes until the wake word is heard
while is_task_running:
    start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    myrecording = sd.rec(int(fs * duration), samplerate=fs, channels=1)
    sd.wait()
    speech_chunk = myrecording.flatten()
    # Noise handling: a moving-average filter over window_size samples
    filtered_chunk = np.convolve(speech_chunk, np.ones(window_size) / window_size, mode='same')
    speech_chunk = filtered_chunk
    is_final = False
    res = model.generate(input=speech_chunk, cache=cache, is_final=is_final, chunk_size=chunk_size,
                         encoder_chunk_look_back=encoder_chunk_look_back,
                         decoder_chunk_look_back=decoder_chunk_look_back)
    # Compare in pinyin so homophone mis-recognitions still match
    text_result = ''.join(lazy_pinyin(str(res[0]['text']))).replace(" ", "")
    # Wake word: "小爱"
    s1 = ''.join(lazy_pinyin("小爱"))
    if s1 in text_result:
        # Suspend wake-word listening while the command is handled
        is_task_running = False
        print("Wake word detected, starting recording")
        record(5)  # recording length, in seconds
        inference_pipeline = pipeline(
            task=Tasks.auto_speech_recognition,
            model='D:/SpeechRecognize/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
            model_revision="v2.0.4")
        rec_result = inference_pipeline('./output.wav', hotword='')
        same = ''.join(lazy_pinyin(rec_result[0]["text"].replace(" ", "")))
        print("Speech to text: " + same)
        # Match command keywords
        # Keyword 1 ...
        g1 = ''.join(lazy_pinyin("打开空调"))  # "turn on the air conditioner"
        if g1 in same:
            # Send the command to the device; five hardware communication options
            # will be provided: MQTT, Socket, Modbus TCP/IP, serial port, and HTTP
            print("Sent to device")
        is_task_running = True  # resume wake-word listening
    cursor.execute("INSERT INTO speech_data (text, time_stamp, batch) VALUES (?,?,?)",
                   (text_result, start_time, 'eerr'))
    conn.commit()
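
The "noise handling" step above is just a moving average: np.convolve with a length-3 box kernel replaces every sample with the mean of itself and its two neighbors, which damps sample-to-sample jitter at the cost of slightly muffling the speech. A tiny standalone demonstration:

import numpy as np

window_size = 3
x = np.array([0.0, 1.0, 0.0, 1.0, 0.0])  # toy signal with heavy sample-to-sample jitter
smoothed = np.convolve(x, np.ones(window_size) / window_size, mode='same')
print(smoothed)  # approximately [0.33, 0.33, 0.67, 0.33, 0.33]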
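A note on the pinyin trick: both the wake word and the command keywords are compared as pinyin strings rather than Chinese characters, so a homophone mis-recognition (for example "小艾" instead of "小爱") still matches. A minimal check of that behavior:

from pypinyin import lazy_pinyin

wake = ''.join(lazy_pinyin("小爱"))  # lazy_pinyin returns tone-free syllables: ['xiao', 'ai'] -> 'xiaoai'
for heard in ("你好小爱", "你好小艾", "打开空调"):
    text = ''.join(lazy_pinyin(heard))
    print(heard, '->', text, '| wake word matched:', wake in text)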
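The print("Sent to device") line is only a placeholder for the actual device communication; the five options mentioned in the comment (MQTT, Socket, Modbus TCP/IP, serial port, HTTP) are to be provided separately. Purely as an illustration of the HTTP option, a minimal sketch could look like the following; the gateway address, path, and payload shape here are made-up placeholders, not part of this project:

import requests  # third-party: pip install requests

def send_command_http(command):
    # Hypothetical device-gateway address and payload, for illustration only
    try:
        resp = requests.post("http://192.168.1.50:8080/api/command",
                             json={"cmd": command}, timeout=2)
        return resp.status_code == 200
    except requests.RequestException as exc:
        print("Device unreachable:", exc)
        return False

# e.g. inside the keyword branch: send_command_http("turn_on_ac")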
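Every listening pass is also logged to the speech_data table, so it is easy to check afterwards what the recognizer actually heard. Reading the log back needs nothing beyond the standard sqlite3 module:

import sqlite3

conn = sqlite3.connect('speech_recognition.db')
for text, ts, batch in conn.execute(
        "SELECT text, time_stamp, batch FROM speech_data ORDER BY time_stamp DESC LIMIT 10"):
    print(ts, batch, text)
conn.close()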