import sqlite3
import time
import wave # 使用wave库可读、写wav类型的音频文件
from funasr import AutoModel
import sounddevice as sd
import numpy as np
from modelscope import pipeline, Tasks
from pypinyin import lazy_pinyin
import pyaudio # 使用pyaudio库可以进行录音,播放,生成wav文件
# 模型参数设置
chunk_size = [0, 10, 5]
encoder_chunk_look_back = 7
decoder_chunk_look_back = 5
is_task_running= True
model = AutoModel(model="D:\SpeechRecognize\speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch")
# 假设模型要求的采样率为 16000
fs = 16000
duration = 3 #时间
chunk_stride = chunk_size[1] * 960
cache = {}
window_size = 3
# 连接到 SQLite 数据库,如果不存在则会创建新的数据库文件
conn = sqlite3.connect('speech_recognition.db')
cursor = conn.cursor()
# 创建表格
cursor.execute('''
CREATE TABLE IF NOT EXISTS speech_data
(text TEXT, time_stamp TEXT, batch TEXT)
''')
def record(time): # 录音程序
# 定义数据流块
CHUNK = 1024 # 音频帧率(也就是每次读取的数据是多少,默认1024)
FORMAT = pyaudio.paInt16 # 采样时生成wav文件正常格式
CHANNELS = 1 # 音轨数(每条音轨定义了该条音轨的属性,如音轨的音色、音色库、通道数、输入/输出端口、音量等。可以多个音轨,不唯一)
RATE = 16000 # 采样率(即每秒采样多少数据)
RECORD_SECONDS = time # 录音时间
WAVE_OUTPUT_FILENAME = "./output.wav" # 保存音频路径
p = pyaudio.PyAudio() # 创建PyAudio对象
stream = p.open(format=FORMAT, # 采样生成wav文件的正常格式
channels=CHANNELS, # 音轨数
rate=RATE, # 采样率
input=True, # Ture代表这是一条输入流,False代表这不是输入流
frames_per_buffer=CHUNK) # 每个缓冲多少帧
print("* 开始录音") # 开始录音标志
frames = [] # 定义frames为一个空列表
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): # 计算要读多少次,每秒的采样率/每次读多少数据*录音时间=需要读多少次
data = stream.read(CHUNK) # 每次读chunk个数据
frames.append(data) # 将读出的数据保存到列表中
print("* 结束语音") # 结束录音标志
stream.stop_stream() # 停止输入流
stream.close() # 关闭输入流
p.terminate() # 终止pyaudio
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') # 以'wb‘二进制流写的方式打开一个文件
wf.setnchannels(CHANNELS) # 设置音轨数
wf.setsampwidth(p.get_sample_size(FORMAT)) # 设置采样点数据的格式,和FOMART保持一致
wf.setframerate(RATE) # 设置采样率与RATE要一致
wf.writeframes(b''.join(frames)) # 将声音数据写入文件
wf.close() # 数据流保存完,关闭文件
while is_task_running:
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
myrecording = sd.rec(int(fs * duration), samplerate=fs, channels=1)
sd.wait()
speech_chunk = myrecording.flatten()
# 噪声处理
filtered_chunk = np.convolve(speech_chunk, np.ones(window_size) / window_size, mode='same')
speech_chunk = filtered_chunk
is_final = False
res = model.generate(input=speech_chunk, cache=cache, is_final=is_final, chunk_size=chunk_size,
encoder_chunk_look_back=encoder_chunk_look_back,
decoder_chunk_look_back=decoder_chunk_look_back)
text_result=''.join(lazy_pinyin(str(res[0]['text']))).replace(" ", "")
# 唤醒词
s1=''.join(lazy_pinyin(str("小爱")))
if s1 in text_result:
#关闭循环
is_task_running ==False
print("已唤醒,开始录音")
record(5) # 定义录音时间,单位/s
inference_pipeline = pipeline(
task=Tasks.auto_speech_recognition,
model='D:/SpeechRecognize/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
model_revision="v2.0.4")
rec_result = inference_pipeline('./output.wav', hotword='')
same = ''.join(lazy_pinyin(rec_result[0]["text"].replace(" ", "")))
print("语音转文字" + same)
#匹配字符关键词
#关键词1 、、、、
g1 = ''.join(lazy_pinyin(str("打开空调")))
if g1 in same:
#通讯发送消息,我会提供五种硬件通讯方式 MTTT、Socket、ModBusTcpIP、串口、HTTP请求
print("发送给设备")
is_task_running == True
cursor.execute("INSERT INTO speech_data (text, time_stamp, batch) VALUES (?,?,?)",
(text_result, start_time, 'eerr'))
conn.commit()