Automatic Listening + Speech Recognition

Published 2021-08-19


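Background

Why does this module exist? It started with optimizing a friend's network.

A friend of Lin Yi's is a live streamer who often has to compete for a place in the mic queue, so they asked me to optimize their network. This module grew out of that work.

Prerequisites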

  • VS Code or Sublime Text
  • Python 3.8
  • pyaudio
  • threading
  • wave
  • numpy
  • baidu-aip
  • json
  • pynput
  • On Python 3: urlopen and Request from urllib.request, URLError from urllib.error, urlencode from urllib.parse
  • On Python 2 only: urlopen, Request and URLError from urllib2, urlencode from urllib

Some useful links:

pyaudio is the key piece. Its documentation:

PyAudio · PyPI and PyAudio Documentation — PyAudio 0.2.11 documentation (mit.edu)

Baidu API site: Speech Recognition, Baidu AI Open Platform (baidu.com)

Baidu documentation: Speech Technology (baidu.com)

Official parameters

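Property | Type | Default | Required | Description
duration | number | 60000 | | The Baidu speech REST API accepts at most 60 s, so this value must not exceed 60000
sampleRate | number | 16000 | | Can be 16000 or 8000
numberOfChannels | number | 1 | | For example 1, meaning mono
encodeBitRate | number | 48000 | | The default is fine; 48000 is recommended, range 24000 to 96000. Larger values produce larger files
format | string | aac | | The default is fine; only aac is supported, mp3 is not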
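
The limits in the table above (at most 60 s, 16000 or 8000 Hz, mono) can be checked before a file is uploaded. The following is a minimal sketch using only the standard library; `check_wav` and `make_silent_wav` are hypothetical helper names, not part of any Baidu SDK, and the check covers WAV files only.

```python
import io
import wave

MAX_DURATION_MS = 60000        # upper limit quoted in the table above
ALLOWED_RATES = (16000, 8000)  # sample rates quoted in the table above


def check_wav(fileobj):
    """Return a list of problems found in a WAV file (empty list means OK)."""
    problems = []
    with wave.open(fileobj, 'rb') as wf:
        rate = wf.getframerate()
        channels = wf.getnchannels()
        duration_ms = wf.getnframes() * 1000 // rate
    if rate not in ALLOWED_RATES:
        problems.append('sample rate %d not in %s' % (rate, ALLOWED_RATES))
    if channels != 1:
        problems.append('expected mono, got %d channels' % channels)
    if duration_ms > MAX_DURATION_MS:
        problems.append('duration %d ms exceeds %d ms' % (duration_ms, MAX_DURATION_MS))
    return problems


def make_silent_wav(rate, channels, seconds):
    """Build an in-memory WAV of silence, used here only to exercise check_wav."""
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)  # 16-bit samples
        wf.setframerate(rate)
        wf.writeframes(b'\x00\x00' * channels * rate * seconds)
    buf.seek(0)
    return buf


print(check_wav(make_silent_wav(16000, 1, 1)))
print(check_wav(make_silent_wav(44100, 2, 1)))
```

`check_wav` also accepts a file path, since `wave.open` takes either a path or a file object.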

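Field | Type | Required | Description
format | string | required | Audio file format: pcm/wav/amr/m4a. Case-insensitive. pcm is recommended
rate | int | required | Sample rate, a fixed value of 16000 or 8000
channel | int | required | Number of channels. Only mono is supported; use the fixed value 1
cuid | string | required | Unique user identifier, used to tell users apart and to compute UV. A value that identifies the machine, such as the MAC address or IMEI, is recommended; at most 60 characters
token | string | required | The developer access_token obtained from the open platform (see "Get Access Token")
dev_pid | int | optional | If omitted, the lan parameter takes effect; if both are omitted, the default is 1537 (Mandarin, input-method model). See the dev_pid table
lm_id | int | optional | Model id from the self-training platform; takes effect when dev_pid = 8001 or 8002
lan | string | optional, deprecated | Legacy compatibility parameter, no longer used
speech | string | required | Binary audio data of the local file, base64-encoded. Used together with the len parameter
len | int | required | Size of the local audio file, in bytes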
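
To show how the fields in this table fit together, here is a rough sketch that assembles a JSON request body from raw audio bytes. `build_json_payload` is a hypothetical helper name, and the default values are taken from the tables in this post; the scripts later in this post send the audio as a raw body instead, so treat this as an illustration of the field layout rather than tested upload code.

```python
import base64
import json


def build_json_payload(audio_bytes, token, cuid, fmt='pcm', rate=16000, dev_pid=1537):
    """Assemble a JSON request body from the fields listed in the table."""
    return json.dumps({
        'format': fmt,
        'rate': rate,
        'channel': 1,             # only mono is supported
        'cuid': cuid,
        'token': token,
        'dev_pid': dev_pid,
        # speech carries the audio as base64 text
        'speech': base64.b64encode(audio_bytes).decode('ascii'),
        # len is the size of the original audio in bytes, before base64 encoding
        'len': len(audio_bytes),
    })


body = build_json_payload(b'\x01\x02\x03', token='your token', cuid='123456PYTHON')
print(body)
```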
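Field | Required | Description
cuid | required | Unique user identifier, used to tell users apart and to compute UV. A value that identifies the machine, such as the MAC address or IMEI, is recommended; at most 60 characters
token | required | The developer access_token obtained from the open platform (see "Get Access Token")
dev_pid | optional | If omitted, the lan parameter takes effect; if both are omitted, the default is 1537 (Mandarin, input-method model). See the dev_pid table
lm_id | optional | Model id from the self-training platform; takes effect when dev_pid = 8001 or 8002
lan | optional, deprecated | Legacy compatibility parameter, no longer used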
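dev_pid | Language | Model | Punctuation | Notes
1537 | Mandarin (Chinese only) | Input-method model | yes | Supports a custom vocabulary
1737 | English | English model | no | No custom vocabulary
1637 | Cantonese | Cantonese model | yes | No custom vocabulary
1837 | Sichuanese | Sichuanese model | yes | No custom vocabulary
1936 | Mandarin, far-field | Far-field model | yes | No custom vocabulary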
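
The scripts in this post pass cuid, token and dev_pid as URL query parameters. A small sketch of that step, with the dev_pid values copied from the table above; `DEV_PIDS` and `build_asr_url` are hypothetical names chosen for this example.

```python
from urllib.parse import urlencode

# dev_pid values copied from the table above
DEV_PIDS = {
    'mandarin': 1537,
    'english': 1737,
    'cantonese': 1637,
    'sichuanese': 1837,
    'mandarin_far_field': 1936,
}


def build_asr_url(base_url, token, cuid, language='mandarin'):
    """Append cuid, token and dev_pid to the recognition endpoint as a query string."""
    params = {'cuid': cuid, 'token': token, 'dev_pid': DEV_PIDS[language]}
    return base_url + '?' + urlencode(params)


print(build_asr_url('http://vop.baidu.com/server_api', 'your token', '123456PYTHON', 'english'))
```

An unknown language name raises `KeyError`, which is usually preferable to silently falling back to a model that was not intended.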

Upload example

Steps

Apply for access to the Baidu speech API and obtain the credentials (AppID, API Key, Secret Key).


Log in to Baidu Cloud, create an application, and get the token.
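
The token response printed by the scripts below includes an `expires_in` value, so a token can be reused until shortly before it expires instead of being requested again for every recognition. The following is only a sketch of that idea: `TokenCache` is a hypothetical class, and the `fetch` argument stands for any function that returns a `(token, expires_in_seconds)` pair.

```python
import time


class TokenCache:
    """Reuse an access token until shortly before it expires."""

    def __init__(self, fetch, clock=time.time, margin=60):
        self._fetch = fetch    # callable returning (token, expires_in_seconds)
        self._clock = clock    # injectable clock, handy for testing
        self._margin = margin  # refresh this many seconds before expiry
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token


# Usage with a stand-in fetch function (no network access involved)
cache = TokenCache(lambda: ('example-token', 3600))
print(cache.get())
print(cache.get())  # second call is served from the cache
```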


Code

# coding=utf-8

import sys
import json
import time

IS_PY3 = sys.version_info.major == 3

if IS_PY3:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.error import URLError
    from urllib.parse import urlencode

    timer = time.perf_counter
else:
    import urllib2
    from urllib2 import urlopen
    from urllib2 import Request
    from urllib2 import URLError
    from urllib import urlencode

    if sys.platform == "win32":
        timer = time.clock
    else:
        # On most other platforms the best timer is time.time()
        timer = time.time

API_KEY = 'your api_key'
SECRET_KEY = 'your_secret_key'

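# File to recognize
AUDIO_FILE = './audio/1111.wav'  # only pcm/wav/amr are supported; the express edition also supports m4a
# File format
FORMAT = AUDIO_FILE[-3:]  # the extension must be pcm/wav/amr; the express edition also supports m4a

CUID = '123456PYTHON'
# Sample rate
RATE = 16000  # fixed value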

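# Standard edition

DEV_PID = 1537  # 1537 = Mandarin with the input-method model; pick the PID for your language and model from the docs
ASR_URL = 'http://vop.baidu.com/server_api'
SCOPE = 'audio_voice_assistant_get'  # this scope means ASR is enabled; if it is missing, tick it in the web console (very old apps may not have it)

# To test the self-training platform, enable the lines below. Once your model is online, the platform shows in
# step 2 "get dedicated model parameters pid:8001, modelid:1234"; from that, dev_pid=8001 and lm_id=1234
# DEV_PID = 8001
# LM_ID = 1234

# Express edition: if you uncomment these lines, fill in your own API key and secret key
# and enable the express edition in the web console (it may be billed once enabled)

# DEV_PID = 80001
# ASR_URL = 'http://vop.baidu.com/pro_api'
# SCOPE = 'brain_enhanced_asr'  # this scope means ASR is enabled; if it is missing, enable the express edition in the web console

# Skip the scope check (very old apps may have no scope)
# SCOPE = False


# Express edition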

class DemoError(Exception):
    pass


"""  TOKEN start """

TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'


def fetch_token():
    params = {'grant_type': 'client_credentials',
              'client_id': API_KEY,
              'client_secret': SECRET_KEY}
    post_data = urlencode(params)
    if (IS_PY3):
        post_data = post_data.encode('utf-8')
    req = Request(TOKEN_URL, post_data)
    try:
        f = urlopen(req)
        result_str = f.read()
    except URLError as err:
        print('token http response http code : ' + str(err.code))
        result_str = err.read()
    if (IS_PY3):
        result_str = result_str.decode()

    print(result_str)
    result = json.loads(result_str)
    print(result)
    if ('access_token' in result.keys() and 'scope' in result.keys()):
        if SCOPE and (SCOPE not in result['scope'].split(' ')):  # SCOPE = False skips this check
            raise DemoError('scope is not correct')
        print('SUCCESS WITH TOKEN: %s ; EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in']))
        return result['access_token']
    else:
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')


"""  TOKEN end """

if __name__ == '__main__':
    token = fetch_token()

    """
    httpHandler = urllib2.HTTPHandler(debuglevel=1)
    opener = urllib2.build_opener(httpHandler)
    urllib2.install_opener(opener)
    """

    speech_data = []
    with open(AUDIO_FILE, 'rb') as speech_file:
        speech_data = speech_file.read()
    length = len(speech_data)
    if length == 0:
        raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

    params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID}
    # To test the self-training platform, use the line below instead
    # params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID, 'lm_id': LM_ID}
    params_query = urlencode(params)

    headers = {
        'Content-Type': 'audio/' + FORMAT + '; rate=' + str(RATE),
        'Content-Length': length
    }

    url = ASR_URL + "?" + params_query
    print("url is", url)
    print("header is", headers)
    # print post_data
    req = Request(url, speech_data, headers)
    try:
        begin = timer()
        f = urlopen(req)
        result_str = f.read()
        print("Request time cost %f" % (timer() - begin))
    except  URLError as err:
        print('asr http response http code : ' + str(err.code))
        result_str = err.read()

    if (IS_PY3):
        result_str = str(result_str, 'utf-8')
    print(result_str)
    with open("result.txt", "w") as of:
        of.write(result_str)
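
The script above writes the raw response to result.txt without interpreting it. A minimal sketch of pulling the transcript out of that response follows. The `result` list is what the test script later in this post reads; the `err_no` and `err_msg` field names are an assumption based on my reading of the Baidu documentation, so check them against a real response. `extract_text` is a hypothetical helper name.

```python
import json


def extract_text(result_str):
    """Return the first recognition candidate, or None if the call failed."""
    data = json.loads(result_str)
    # err_no / err_msg are assumed field names; a missing err_no is treated as success
    if data.get('err_no', 0) != 0:
        print('recognition failed:', data.get('err_no'), data.get('err_msg'))
        return None
    candidates = data.get('result') or []
    return candidates[0] if candidates else None


print(extract_text('{"err_no": 0, "result": ["example transcript"]}'))
```

Returning None instead of raising keeps the caller simple: it can skip the keyboard action when nothing was recognized.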

Test code

# _*_ coding: utf-8 _*_

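# Recorder that captures what the sound card is playing (loopback recording).
# It can also record mouse clicks so that they can be replayed when recording starts.

import os
# Audio capture
import pyaudio
import threading
import wave
import time
from datetime import datetime
# Keyboard and mouse monitoring and control
from pynput import keyboard, mouse
from pynput.keyboard import Key, Controller
import numpy as np

# Baidu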
import sys
import json

IS_PY3 = sys.version_info.major == 3

if IS_PY3:
    from urllib.request import urlopen
    from urllib.request import Request
    from urllib.error import URLError
    from urllib.parse import urlencode

    timer = time.perf_counter
else:
    import urllib2
    from urllib2 import urlopen
    from urllib2 import Request
    from urllib2 import URLError
    from urllib import urlencode

    if sys.platform == "win32":
        timer = time.clock
    else:
        # On most other platforms the best timer is time.time()
        timer = time.time

# Recorder class
class Recorder():
    def __init__(self, chunk=1024, channels=1, rate=16000):
        self.CHUNK = chunk
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = channels
        self.RATE = rate
        self._running = True
        self._frames = []
        # Time at which recording started
        self.recBegin = 0
        # Recording length in seconds
        self.recTime = 0

    # Return the index of the loopback device. Tested on Windows;
    # hostApi == 0 means an MME device.
    def findInternalRecordingDevice(self, p):
        # Keyword to look for in the device name. '立体声混音' is the
        # "Stereo Mix" device on a Chinese-language Windows; adjust it for your system.
        target = '立体声混音'
        # Check each audio device in turn
        for i in range(p.get_device_count()):
            devInfo = p.get_device_info_by_index(i)

            if devInfo['name'].find(target) >= 0 and devInfo['hostApi'] == 0:
                # print('Found loopback device, index', i)
                return i
        print('Loopback device not found!')
        return -1

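    # Start listening: run the trigger loop on a new thread
    def start(self):
        print("Recording...")
        threading.Thread(target=self.srecord, daemon=True).start()

    # Thread function: wait for a loud sample, then hand over to __record
    def srecord(self):
        self.recBegin = time.time()
        self._running = True
        self._frames = []
        print("a")
        p = pyaudio.PyAudio()

        # Find the loopback device
        dev_idx = self.findInternalRecordingDevice(p)

        if dev_idx < 0:
            p.terminate()
            return
        # Open the input stream on that device
        stream = p.open(input_device_index=dev_idx,
                        format=self.FORMAT,
                        channels=self.CHANNELS,
                        rate=self.RATE,
                        input=True,
                        frames_per_buffer=self.CHUNK)
        triggered = False
        # Read the input stream until a sample exceeds the threshold
        while self._running:
            data = stream.read(self.CHUNK)
            self._frames.append(data)
            # Interpret the raw bytes as signed 16-bit samples
            audio_data = np.frombuffer(data, dtype=np.short)
            temp = np.max(audio_data)
            if temp > 800:
                print("Signal detected")
                print('Peak sample value:', temp)
                triggered = True
                break

        # Stop and close the input stream, then shut down pyaudio
        stream.stop_stream()
        stream.close()
        p.terminate()
        # Start the recording and stop-timer threads only once this stream
        # is closed, so the device is never opened twice at the same time
        if triggered:
            threading.Thread(target=self.__record, daemon=True).start()
            threading.Thread(target=self.stop_time, daemon=True).start()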

    # Thread function: record until stop() clears the running flag
    def __record(self):
        print('b')
        self.recBegin = time.time()
        self._running = True
        self._frames = []

        p = pyaudio.PyAudio()
        # Find the loopback device
        dev_idx = self.findInternalRecordingDevice(p)

        if dev_idx < 0:
            p.terminate()
            return
        # Open the input stream on that device
        stream = p.open(input_device_index=dev_idx,
                        format=self.FORMAT,
                        channels=self.CHANNELS,
                        rate=self.RATE,
                        input=True,
                        frames_per_buffer=self.CHUNK)
        # Read the input stream in a loop
        while self._running:
            data = stream.read(self.CHUNK)
            self._frames.append(data)

        # Stop and close the input stream, then shut down pyaudio
        stream.stop_stream()
        stream.close()
        p.terminate()

    # Stop the recording after a fixed 4.4 seconds
    def stop_time(self):
        time.sleep(4.4)
        self.stop()
    # Stop recording
    def stop(self):
        self._running = False
        self.recTime = time.time() - self.recBegin
        print("Recording stopped")
        print('Recording length: %ds' % self.recTime)
        # Save to a fixed file name, overwritten on every recording
        self.save("record/" + "test.wav")
        print('---save---')
        get_BD_result()

    # Save the captured frames to a WAV file
    def save(self, fileName):
        # Create a PyAudio object (used to look up the sample width)
        p = pyaudio.PyAudio()
        # Open the output file
        wf = wave.open(fileName, 'wb')
        # Set the audio parameters
        wf.setnchannels(self.CHANNELS)
        wf.setsampwidth(p.get_sample_size(self.FORMAT))
        wf.setframerate(self.RATE)
        # Write the data
        wf.writeframes(b''.join(self._frames))
        # Close the file
        wf.close()
        # Shut down pyaudio
        p.terminate()

# Mouse macro; currently only click events are recorded and replayed
class MouseMacro():
    def __init__(self):
        # Whether mouse events are being recorded
        self.enabled = False
        # Controller object used to simulate the mouse
        self.mouseCtrl = mouse.Controller()
        # List of recorded click positions
        self.mouseMacroList = []

    # Start recording the mouse macro
    def beginMouseMacro(self):
        print('Mouse macro recording started')
        self.mouseMacroList = []
        self.enabled = True

    # Record one mouse event
    def recordMouse(self, event):
        print('Recorded mouse event', event)
        self.mouseMacroList.append(event)

    # Stop recording the mouse macro
    def endMouseMacro(self):
        self.enabled = False
        print('Mouse macro recording stopped!')

    # Replay the recorded mouse macro
    def playMouseMacro(self):
        if len(self.mouseMacroList) > 0:
            print('Replaying mouse macro:', self.mouseMacroList)
        for pos in self.mouseMacroList:
            self.mouseCtrl.position = pos
            self.mouseCtrl.click(mouse.Button.left, 1)

# Keyboard handler
def on_keyPress(key):
    try:
        # print('key {0} pressed'.format(key.char))

        # Start listening for audio
        if key.char == 'a':
            recorder.start()
        # Stop recording
        if key.char == 's':
            recorder.stop()

        # Quit the program
        if key.char == 'x':
            # Returning False stops the keyboard listener
            return False

    except Exception as e:
        # Special keys (Shift, Enter, ...) have no .char attribute and end up here
        print(e)


# Mouse handler (this script never registers it with a mouse listener)
def on_click(x, y, button, pressed):
    # print('{0} at {1}'.format('Pressed' if pressed else 'Released', (x, y)))

    # While a mouse macro is being recorded, store the click position
    if pressed and mouseMacro.enabled:
        mouseMacro.recordMouse((x, y))
    return True

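############### Baidu

API_KEY = 'your key'
SECRET_KEY = 'your key'

AUDIO_FILE = './record/test.wav'  # only pcm/wav/amr are supported; the express edition also supports m4a
# File format
FORMAT = AUDIO_FILE[-3:]  # the extension must be pcm/wav/amr; the express edition also supports m4a
CUID = '123456PYTHON'
# Sample rate
RATE = 16000  # fixed value

DEV_PID = 1537  # 1537 = Mandarin with the input-method model; pick the PID for your language and model from the docs
ASR_URL = 'http://vop.baidu.com/server_api'
SCOPE = 'audio_voice_assistant_get'  # this scope means ASR is enabled; if it is missing, tick it in the web console (very old apps may not have it)


# Raised by fetch_token() and get_BD_result() when a request cannot be completed
class DemoError(Exception):
    pass


TOKEN_URL = 'http://openapi.baidu.com/oauth/2.0/token'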
def fetch_token():
    params = {'grant_type': 'client_credentials',
              'client_id': API_KEY,
              'client_secret': SECRET_KEY}
    post_data = urlencode(params)
    if (IS_PY3):
        post_data = post_data.encode('utf-8')
    req = Request(TOKEN_URL, post_data)
    try:
        f = urlopen(req)
        result_str = f.read()
    except URLError as err:
        print('token http response http code : ' + str(err.code))
        result_str = err.read()
    if (IS_PY3):
        result_str = result_str.decode()

    print(result_str)
    result = json.loads(result_str)
    print(result)
    if ('access_token' in result.keys() and 'scope' in result.keys()):
        if SCOPE and (SCOPE not in result['scope'].split(' ')):  # SCOPE = False skips this check
            raise DemoError('scope is not correct')
        print('SUCCESS WITH TOKEN: %s ; EXPIRES IN SECONDS: %s' % (result['access_token'], result['expires_in']))
        return result['access_token']
    else:
        raise DemoError('MAYBE API_KEY or SECRET_KEY not correct: access_token or scope not found in token response')


"""  TOKEN end """

def get_BD_result():
    token = fetch_token()

    """
    httpHandler = urllib2.HTTPHandler(debuglevel=1)
    opener = urllib2.build_opener(httpHandler)
    urllib2.install_opener(opener)
    """

    speech_data = []
    with open(AUDIO_FILE, 'rb') as speech_file:
        speech_data = speech_file.read()
    length = len(speech_data)
    if length == 0:
        raise DemoError('file %s length read 0 bytes' % AUDIO_FILE)

    params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID}
    # To test the self-training platform, use the line below instead
    # params = {'cuid': CUID, 'token': token, 'dev_pid': DEV_PID, 'lm_id': LM_ID}
    params_query = urlencode(params)

    headers = {
        'Content-Type': 'audio/' + FORMAT + '; rate=' + str(RATE),
        'Content-Length': length
    }

    url = ASR_URL + "?" + params_query
    print("url is", url)
    print("header is", headers)
    # print post_data
    req = Request(url, speech_data, headers)
    try:
        begin = timer()
        f = urlopen(req)
        result_str = f.read()
        print("Request time cost %f" % (timer() - begin))
    except  URLError as err:
        print('asr http response http code : ' + str(err.code))
        result_str = err.read()

    if (IS_PY3):
        result_str = str(result_str, 'utf-8')
    result = json.loads(result_str)
    print(result.get('result'))

    with open("result.txt", "w") as of:
        of.write(result_str)

    # Act on the transcript only when the response contains one
    if result.get('result'):
        put_key(result['result'][0])
    else:
        print('no recognition result:', result_str)

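# Type "6" and press Enter when the transcript contains one of the trigger words
def put_key(result_str):
    print('put_key_ok')
    kb = keyboard.Controller()
    # Each keyword needs its own "in" test; '去' or ('六' in s) would always be true
    if '去' in result_str or '六' in result_str:
        print('result:6')
        kb.type('6')
        kb.press(keyboard.Key.enter)
        kb.release(keyboard.Key.enter)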

   
if __name__ == "__main__":

    # Create the record subdirectory if it does not exist yet
    if not os.path.exists('record'):
        os.makedirs('record')

    print("\nPython recorder ....\n")
    print("----------------- Help -------------------------------------------\n")
    print("Press a to start recording,  s to stop recording,  x to quit ")
    print("-----------------------------------------------------------------\n")

    # Create the recorder object
    recorder = Recorder()

    # Start listening to the keyboard
    keyboard_listener = keyboard.Listener(on_press=on_keyPress)
    lst = [keyboard_listener]
    for t in lst:
        t.start()
    for t in lst:
        t.join()
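
The two decisions the test script makes, whether a chunk of audio is loud enough to start recording and whether the transcript contains a trigger word, can be tried out without a sound card or network access. The sketch below uses the standard-library array module in place of numpy so that it runs anywhere; `is_loud` and `contains_keyword` are hypothetical names, while the threshold of 800 and the two keywords are the values used in the script.

```python
import array


def is_loud(chunk, threshold=800):
    """True if any signed 16-bit sample in the chunk exceeds the threshold."""
    samples = array.array('h')  # 'h' = signed 16-bit, native byte order
    samples.frombytes(chunk)
    return len(samples) > 0 and max(samples) > threshold


def contains_keyword(text, keywords=('去', '六')):
    """True if the transcript contains at least one of the keywords."""
    return any(k in text for k in keywords)


quiet = array.array('h', [0, 100, -200]).tobytes()
loud = array.array('h', [0, 900, -50]).tobytes()
print(is_loud(quiet), is_loud(loud))
print(contains_keyword('六六六'), contains_keyword('你好'))
```

Like the script, `is_loud` looks only at the positive peak. Comparing absolute values would also catch loud negative samples, at the cost of changing when recording is triggered.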