手把手教你使用Python第三方库PyAudio打造一款录音工具

时间:2022-09-04 16:01:49

手把手教你使用Python第三方库PyAudio打造一款录音工具

大家好,我是【(这是月亮的背面)】。今天给大家分享Python使用PyAudio制作录音工具:

最近有在使用屏幕录制软件录制桌面,在用的过程中突发奇想,使用python能不能做屏幕录制工具,也锻炼下自己的动手能力。接下准备写使用python如何做屏幕录制工具的系列文章:

  • 录制屏幕制作视频
  • 录制音频
  • 合成视频,音频
  • 基于Pyqt5制作可视化窗口

大概上述四个部分,希望自己能够尽快完善,上一篇文章利用opencv制作了屏幕录制部分,接下继续更新系列,使用python录制音频。

应用平台

  • windows 10
  • python 3.7

音频录制部分

音频录制与视频录制相似,也是以数据帧的方式录制保存,这次使用强大的第三方包PyAudio和内置的wave模块编写主要部分代码:pip install PyAudio

如果出现安装失败,可点击去此处下载对应.whl文件,cp37代表python3.7环境,64代表64位操作系统。假如不是下载对应的whl包会导致安装失败,下载完成后,cmd窗口下进入whl的所在目录,使用pip install PyAudio-xx.whl即可完成安装。

手把手教你使用Python第三方库PyAudio打造一款录音工具

音频录制主要代码:

  1. from pyaudio import PyAudio, paInt16, paContinue, paComplete
  2.  
  3. # 设置固定参数
  4. chunk = 1024 # 每个缓冲区的帧数
  5. format_sample = paInt16 # 采样位数
  6. channels = 2 # 声道:1,单声道;2,双声道
  7. fps = 44100 # 采样频率
  8.  
  9. # 这里采用回调的方式录制音频
  10. def callback(in_data, frame_count, time_info, status):
  11. """录制回调函数"""
  12. wf.writeframes(in_data)
  13. if xx: # 当某某条件满足时
  14. return in_data, paContinue
  15. else:
  16. return in_data, paComplete
  17.  
  18. # 实例化PyAudio
  19. p = PyAudio()
  20. stream = p.open(format=format_sample,
  21. channels=channels,
  22. rate=fps,
  23. frames_per_buffer=chunk,
  24. input=True,
  25. input_device_index=None, # 输入设备索引, None为默认设备
  26. stream_callback=callback # 回调函数
  27. )
  28. # 开始流录制
  29. stream.start_stream()
  30. # 判断流是否活跃
  31. while stream.is_active():
  32. time.sleep(0.1) # 0.1为灵敏度
  33. # 录制完成,关闭流及实例
  34. stream.stop_stream()
  35. stream.close()
  36. p.terminate()

采取流式并用回调函数录制,需要先定义保存音频文件,用wave新建音频二进制文件:

  1. import wave
  2. wf = wave.open('test.wav', 'wb')
  3. wf.setnchannels(channels)
  4. wf.setsampwidth(p.get_sample_size(format_sample))
  5. wf.setframerate(fps)

为了后续代码可以很好的与之结合复用,将上面的代码包装成类

  1. from pyaudio import PyAudio
  2.  
  3. class AudioRecord(PyAudio):
  4.  
  5. def __init__(self,):

源码于文末补充。

音频播放部分

播放部分代码与录制部分代码相差不大,核心部分:

  1. wf = wave.open('test.wav', 'rb')
  2. def callback(in_data, frame_count, time_info, status):
  3. data = wf.readframes(frame_count)
  4. return data, paContinue
  5.  
  6. stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
  7. channels=wf.getnchannels(),
  8. rate=wf.getframerate(),
  9. output=True,
  10. output_device_index=output_device_index, # 输入设备索引
  11. stream_callback=callback # 输出用回调函数
  12. )
  13. stream.start_stream()
  14. while stream.is_active():
  15. time.sleep(0.1)

目前暂时测试了.wav和.mp3格式可以正常录制及播放,其它类型格式音频可以自行调用代码进行测试。

GUI窗口所需属性值代码部分

考虑到GUI窗口能较为人性化的输出及输入值,编写该部分代码,内容含音频时长及获取输入设备及输出设备。

  1. # 音频时长
  2. duration = wf.getnframes() / wf.getframerate()
  1. # 获取系统目前已安装的输入输出设备
  2. dev_info = self.get_device_info_by_index(i)
  3. default_rate = int(dev_info['defaultSampleRate'])
  4. if not dev_info['hostApi'] and default_rate == fps and '映射器' not in dev_info['name']:
  5. if dev_info['maxInputChannels']:
  6. print('输入设备:', dev_info['name'])
  7. elif dev_info['maxOutputChannels']:
  8. print('输出设备:', dev_info['name'])

pynput监听键盘

在这部分代码也暂时使用pynput监听键盘来对录音做中断处理。可以调用上一篇文章中的键盘监听代码。

  1. def hotkey(self):
  2. """热键监听"""
  3. with keyboard.Listener(on_press=self.on_press) as listener:
  4. listener.join()
  5.  
  6. def on_press(self, key):
  7. try:
  8. if key.char == 't': # t键,录制结束,保存音频
  9. self.flag = True
  10. elif key.char == 'k': # k键,录制中止,删除文件
  11. self.flag = True
  12. self.kill = True
  13. except Exception as e:
  14. print(e)

功能与上一篇类似,不再赘述。

总结

大家好,我是【(这是月亮的背面)】。以上就是使用PyAudio调用windows的音频设备进行录制及播放的内容了,这篇文章带大家整体学习了使用类及其继承相关知识,用法在这只是展示了冰山一角,还有更多的知识等待着我们一起去探索!

源码:

  1. import wave
  2. import time
  3. from pathlib import Path
  4. from threading import Thread
  5. from pyaudio import PyAudio, paInt16, paContinue, paComplete
  6. from pynput import keyboard # pip install pynput
  7.  
  8.  
  9. class AudioRecord(PyAudio):
  10.  
  11. def __init__(self, channels=2):
  12. super().__init__()
  13. self.chunk = 1024 # 每个缓冲区的帧数
  14. self.format_sample = paInt16 # 采样位数
  15. self.channels = channels # 声道:1,单声道;2,双声道
  16. self.fps = 44100 # 采样频率
  17. self.input_dict = None
  18. self.output_dict = None
  19. self.stream = None
  20. self.filename = '~test.wav'
  21. self.duration = 0 # 音频时长
  22. self.flag = False
  23. self.kill = False
  24.  
  25. def __call__(self, filename):
  26. """重载文件名"""
  27. self.filename = filename
  28.  
  29. def callback_input(self, in_data, frame_count, time_info, status):
  30. """录制回调函数"""
  31. self.wf.writeframes(in_data)
  32. if not self.flag:
  33. return in_data, paContinue
  34. else:
  35. return in_data, paComplete
  36.  
  37. def callback_output(self, in_data, frame_count, time_info, status):
  38. """播放回调函数"""
  39. data = self.wf.readframes(frame_count)
  40. return data, paContinue
  41.  
  42. def open_stream(self, name):
  43. """打开录制流"""
  44. input_device_index = self.get_device_index(name, True) if name else None
  45. return self.open(format=self.format_sample,
  46. channels=self.channels,
  47. rate=self.fps,
  48. frames_per_buffer=self.chunk,
  49. input=True,
  50. input_device_index=input_device_index, # 输入设备索引
  51. stream_callback=self.callback_input
  52. )
  53.  
  54. def audio_record_run(self, name=None):
  55. """音频录制"""
  56. self.wf = self.save_audio_file(self.filename)
  57. self.stream = self.open_stream(name)
  58. self.stream.start_stream()
  59. while self.stream.is_active():
  60. time.sleep(0.1)
  61. self.wf.close()
  62. if self.kill:
  63. Path(self.filename).unlink()
  64. self.duration = self.get_duration(self.wf)
  65. print(self.duration)
  66. self.terminate_run()
  67.  
  68. def run(self, filename=None, name=None, record=True):
  69. """音频录制线程"""
  70. thread_1 = Thread(target=self.hotkey, daemon=True)
  71. if record:
  72. # 录制
  73. if filename:
  74. self.filename = filename
  75. thread_2 = Thread(target=self.audio_record_run, args=(name,))
  76. else:
  77. # 播放
  78. if not filename:
  79. raise Exception('未输入音频文件名,不能播放,请输入后再试!')
  80. thread_2 = Thread(target=self.read_audio, args=(filename, name,))
  81. thread_1.start()
  82. thread_2.start()
  83.  
  84. def read_audio(self, filename, name=None):
  85. """音频播放"""
  86. output_device_index = self.get_device_index(name, False) if name else None
  87. with wave.open(filename, 'rb') as self.wf:
  88. self.duration = self.get_duration(self.wf)
  89. self.stream = self.open(format=self.get_format_from_width(self.wf.getsampwidth()),
  90. channels=self.wf.getnchannels(),
  91. rate=self.wf.getframerate(),
  92. output=True,
  93. output_device_index=output_device_index, # 输出设备索引
  94. stream_callback=self.callback_output
  95. )
  96. self.stream.start_stream()
  97. while self.stream.is_active():
  98. time.sleep(0.1)
  99. print(self.duration)
  100. self.terminate_run()
  101.  
  102. @staticmethod
  103. def get_duration(wf):
  104. """获取音频时长"""
  105. return round(wf.getnframes() / wf.getframerate(), 2)
  106.  
  107. def get_in_out_devices(self):
  108. """获取系统输入输出设备"""
  109. self.input_dict = {}
  110. self.output_dict = {}
  111. for i in range(self.get_device_count()):
  112. dev_info = self.get_device_info_by_index(i)
  113. default_rate = int(dev_info['defaultSampleRate'])
  114. if not dev_info['hostApi'] and default_rate == self.fps and '映射器' not in dev_info['name']:
  115. if dev_info['maxInputChannels']:
  116. self.input_dict[dev_info['name']] = i
  117. elif dev_info['maxOutputChannels']:
  118. self.output_dict[dev_info['name']] = i
  119.  
  120. def get_device_index(self, name, input_in=True):
  121. """获取选定设备索引"""
  122. if input_in and self.input_dict:
  123. return self.input_dict.get(name, -1)
  124. elif not input_in and self.output_dict:
  125. return self.output_dict.get(name, -1)
  126.  
  127. def save_audio_file(self, filename):
  128. """音频文件保存"""
  129. wf = wave.open(filename, 'wb')
  130. wf.setnchannels(self.channels)
  131. wf.setsampwidth(self.get_sample_size(self.format_sample))
  132. wf.setframerate(self.fps)
  133. return wf
  134.  
  135. def terminate_run(self):
  136. """结束流录制或流播放"""
  137. if self.stream:
  138. self.stream.stop_stream()
  139. self.stream.close()
  140. self.terminate()
  141.  
  142. def hotkey(self):
  143. """热键监听"""
  144. with keyboard.Listener(on_press=self.on_press) as listener:
  145. listener.join()
  146.  
  147. def on_press(self, key):
  148. try:
  149. if key.char == 't': # t键,录制结束,保存音频
  150. self.flag = True
  151. elif key.char == 'k': # k键,录制中止,删除文件
  152. self.flag = True
  153. self.kill = True
  154. except Exception as e:
  155. print(e)
  156.  
  157.  
  158. if __name__ == '__main__':
  159. audio_record = AudioRecord()
  160. audio_record.get_in_out_devices()
  161. # 录制
  162. print(audio_record.input_dict)
  163. audio_record.run('test.mp3')
  164. # 播放
  165. print(audio_record.output_dict)
  166. audio_record.run('test.mp3', record=False)

小伙伴们,快快用实践一下吧!

原文链接:https://mp.weixin.qq.com/s/5GW1mcBNEm1hE5NY6nH7_w