import base64
import hashlib
import hmac
import json
import queue
import struct
import sys
import threading
import time
import urllib.parse
import uuid

import pvporcupine
import sounddevice as sd
import websocket


# Originally sections 9 (audio capture + WebSocket streaming) and
# 10 (complete wake-word listening logic) of the monolithic script.
class VoiceRecogController:
    """Wake-word detection plus streaming speech recognition over a WebSocket.

    Session state owned by the caller (current text, final result, timestamps,
    flags) is passed in as single-element lists so that callbacks running on
    other threads can update the caller's values in place via ``x[0] = ...``.
    """

    def __init__(self, access_key, wakeup_word_path, model_path, appid,
                 access_key_id, access_key_secret, tts_controller, feedback_text):
        """Store credentials, model paths and collaborators supplied by the orchestrating script.

        Args:
            access_key: PicoVoice (Porcupine) access key.
            wakeup_word_path: Path to the Porcupine keyword file.
            model_path: Path to the Porcupine model file.
            appid: App id for the speech-recognition service.
            access_key_id: Access key id for the speech-recognition service.
            access_key_secret: Secret used to HMAC-SHA1 sign the WebSocket URL.
            tts_controller: Object exposing ``speak(text)``, used for spoken feedback.
            feedback_text: Mapping of feedback phrases. ``"wakeup"`` is required;
                ``"wakeup_timeout"`` is optional (a default phrase is used).
        """
        self.ACCESS_KEY = access_key
        self.WAKEUP_WORD_PATH = wakeup_word_path
        self.MODEL_PATH = model_path
        self.APPID = appid
        self.ACCESS_KEY_ID = access_key_id
        self.ACCESS_KEY_SECRET = access_key_secret
        self.tts_controller = tts_controller
        self.FEEDBACK_TEXT = feedback_text

        self.SAMPLE_RATE = 16000
        self.CHANNELS = 1
        self.SAMPLE_FORMAT = "int16"
        self.INTERACTION_TIMEOUT = 30  # seconds without a command before the session ends
        self.audio_q = queue.Queue()   # raw PCM chunks produced by _audio_callback
        self.stream = None             # microphone stream, created per session in start_websocket

    def wakeup_listener(self):
        """Block until the wake word is detected, then play the wake-up feedback.

        Returns:
            True once the wake word has been heard.

        On any error (bad keyword/model path, microphone, invalid key) a hint is
        printed and the process exits with status 1. The Porcupine handle is
        released in every case, including errors and Ctrl+C.
        """
        porcupine = None
        try:
            porcupine = pvporcupine.create(
                access_key=self.ACCESS_KEY,
                keyword_paths=[self.WAKEUP_WORD_PATH],
                model_path=self.MODEL_PATH,
            )
            print(f"\n🎯 唤醒词引擎就绪(采样率:{porcupine.sample_rate})")

            frame_length = porcupine.frame_length
            pcm_format = "h" * frame_length  # hoisted: one int16 per sample
            wakeup_mic = sd.RawInputStream(
                samplerate=porcupine.sample_rate,
                blocksize=frame_length,
                dtype="int16",
                channels=1,
            )
            print("📢 等待唤醒词「小黄鸭」(按Ctrl+C退出)")
            with wakeup_mic:
                while True:
                    pcm_data, _ = wakeup_mic.read(frame_length)
                    pcm_unpacked = struct.unpack_from(pcm_format, pcm_data)
                    if porcupine.process(pcm_unpacked) >= 0:
                        print("🚀 检测到唤醒词「小黄鸭」!")
                        # Play the wake-up feedback synchronously before returning.
                        self.tts_controller.speak(self.FEEDBACK_TEXT["wakeup"])
                        return True
        except Exception as e:
            print(f"\n❌ 唤醒词监听失败:{str(e)}")
            print(" 排查:1. 唤醒词文件路径 2. 麦克风连接 3. PicoVoice Key有效性")
            sys.exit(1)
        finally:
            # Runs on success, on the sys.exit path and on KeyboardInterrupt alike.
            if porcupine is not None:
                porcupine.delete()

    def _audio_callback(self, indata, frames, t, status):
        """sounddevice input callback: copy each captured block into ``self.audio_q``."""
        if status:
            print(f"⚠️ 音频异常:{status}")
        # Copy to bytes; the buffer handed to the callback is not guaranteed to outlive it.
        self.audio_q.put(bytes(indata))

    def _create_ws_url(self):
        """Build the signed WebSocket URL for a new recognition session.

        Parameters are sorted by key, URL-encoded with ``quote_plus`` and signed
        with HMAC-SHA1 using ``ACCESS_KEY_SECRET``; the base64 signature is
        appended as the ``signature`` query parameter.

        Returns:
            ``(url, session_uuid)`` on success, ``(None, None)`` on failure.
        """
        try:
            host = "office-api-ast-dx.iflyaisol.com"
            path = "/ast/communicate/v1"
            utc = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + "+0000"
            session_uuid = str(uuid.uuid4())
            params = {
                "accessKeyId": self.ACCESS_KEY_ID,
                "appId": self.APPID,
                "samplerate": self.SAMPLE_RATE,
                "audio_encode": "pcm_s16le",
                "lang": "autodialect",
                "uuid": session_uuid,
                "utc": utc,
            }
            # urlencode uses quote_plus with safe='' by default, i.e. the same
            # encoding the signature base string requires.
            base_string = urllib.parse.urlencode(sorted(params.items()))
            digest = hmac.new(
                self.ACCESS_KEY_SECRET.encode("utf-8"),
                base_string.encode("utf-8"),
                hashlib.sha1,
            ).digest()
            signature = base64.b64encode(digest).decode("utf-8")
            query = base_string + "&signature=" + urllib.parse.quote_plus(signature)
            return f"wss://{host}{path}?{query}", session_uuid
        except Exception as e:
            print(f"❌ WebSocket URL生成失败:{str(e)}")
            return None, None

    def _on_message(self, ws, message, current_text, last_audio_time):
        """Handle a recognition message: update the shared partial transcript.

        Only ``msg_type == "result"`` messages carrying a ``data.cn`` section are
        used; the words under ``cn.st.rt[].ws[].cw[].w`` are concatenated into
        ``current_text[0]`` and ``last_audio_time[0]`` is stamped with now.
        """
        try:
            data = json.loads(message)
            if data.get("msg_type") == "result" and "cn" in data.get("data", {}):
                words = [
                    cw.get("w", "")
                    for rt in data["data"]["cn"].get("st", {}).get("rt", [])
                    for ws_item in rt.get("ws", [])
                    for cw in ws_item.get("cw", [])
                ]
                if words:
                    # Mutate through the one-element lists so the caller sees the update.
                    current_text[0] = "".join(words)
                    last_audio_time[0] = time.time()
                    print(f"🎧 识别中:{current_text[0]}", end="\r")
        except Exception as e:
            print(f"\n❌ 语音识别消息处理错误:{str(e)}")

    def _on_error(self, ws, error, is_processing):
        """Report a WebSocket error unless the caller has flagged that processing is underway."""
        if not is_processing[0]:
            print(f"\n❌ WebSocket连接错误:{str(error)}")

    def _on_close(self, ws, close_status_code, close_msg, current_text, final_result, stream):
        """Stop the microphone and reset the shared transcript when the socket closes."""
        print(f"\n🔌 WebSocket连接关闭 | 状态码:{close_status_code}")
        if stream and stream.active:
            stream.stop()
        current_text[0] = ""
        final_result[0] = ""

    def _drain_audio_queue(self):
        """Discard every chunk currently buffered in ``self.audio_q``."""
        while True:
            try:
                self.audio_q.get_nowait()
            except queue.Empty:
                return

    def _on_open(self, ws, stream, current_text, final_result, last_audio_time,
                 is_processing, last_command_time, execute_callback):
        """Start the daemon thread that streams mic audio and dispatches recognized commands.

        Args:
            ws: The connected ``websocket.WebSocketApp``.
            stream: Microphone stream to start; its callback fills ``self.audio_q``.
            current_text, final_result, last_audio_time, last_command_time:
                One-element lists shared with the other callbacks.
            is_processing: One-element flag list (not used here; kept for a uniform signature).
            execute_callback: Called with the final command text once speech settles.
        """

        def timed_out():
            return time.time() - last_command_time[0] > self.INTERACTION_TIMEOUT

        def say_goodbye_and_close():
            print(f"\n⌛ {self.INTERACTION_TIMEOUT}秒无操作,关闭连接")
            self.tts_controller.speak(
                self.FEEDBACK_TEXT.get("wakeup_timeout", "长时间没操作,我先休息啦"))
            time.sleep(1)
            ws.send("close", websocket.ABNF.OPCODE_TEXT)

        def send_audio_and_handle():
            print("\n🎤 指令已就绪!支持:")
            print(" - 运动:前进3秒、左转2秒 | - 图像识别:这是什么")
            print(" - 闲聊:今天天气怎么样 | - 音量:增大音量、减小音量\n")
            # Drop audio left over from a previous session so it is not sent to this one.
            self._drain_audio_queue()
            stream.start()
            current_text[0] = ""
            final_result[0] = ""
            last_command_time[0] = time.time()

            try:
                # keep_running turns False once run_forever tears the connection down,
                # so this thread does not outlive its session and steal the next
                # session's audio from the shared queue.
                while ws.keep_running:
                    try:
                        # 1. Drop the oldest chunks so no backlog builds up.
                        while self.audio_q.qsize() > 5:
                            self.audio_q.get_nowait()
                        # 2. Forward the next chunk; the 0.5 s wait keeps timeouts responsive.
                        audio_data = self.audio_q.get(timeout=0.5)
                        ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)

                        # 3. Execute once there is text and no new result for 2 seconds.
                        if current_text[0] and (time.time() - last_audio_time[0]) > 2:
                            final_result[0] = current_text[0].strip()
                            if final_result[0]:
                                print(f"\n⏹ 最终指令:{final_result[0]}")
                                execute_callback(final_result[0])
                                last_command_time[0] = time.time()
                                time.sleep(1)  # give the command time to finish
                            # Clear even whitespace-only text so it is not re-checked forever.
                            current_text[0] = ""
                            final_result[0] = ""
                    except queue.Empty:
                        pass  # no audio this round; still check the timeout below

                    # 4. End the session after INTERACTION_TIMEOUT seconds without a command.
                    if timed_out():
                        say_goodbye_and_close()
                        break
            except Exception as e:
                print(f"\n❌ 音频发送错误:{str(e)}")
            finally:
                # However the loop ended (timeout or error), close the socket so
                # run_forever() in start_websocket returns instead of idling on pings
                # while the microphone keeps filling an unconsumed queue.
                ws.close()

        audio_thread = threading.Thread(target=send_audio_and_handle, daemon=True)
        audio_thread.start()

    def start_websocket(self, current_text, final_result, last_audio_time,
                        is_processing, last_command_time, execute_callback):
        """Run one recognition session and block until its WebSocket closes.

        A fresh microphone stream is created for the session and always closed
        before returning. On URL-generation or connection failure a message is
        printed and the method sleeps 3 seconds before returning.

        Args:
            current_text, final_result, last_audio_time, is_processing,
            last_command_time: One-element lists shared with the callbacks.
            execute_callback: Called with each final recognized command.
        """
        self.stream = sd.RawInputStream(
            samplerate=self.SAMPLE_RATE,
            channels=self.CHANNELS,
            dtype=self.SAMPLE_FORMAT,
            callback=self._audio_callback,
        )
        stream = self.stream
        try:
            ws_url, session_id = self._create_ws_url()
            if not ws_url:
                print("⚠️ 无法生成语音识别连接,3秒后重新监听...")
                time.sleep(3)
                return

            try:
                print(f"🔄 连接语音识别服务(会话ID:{session_id[:8]}...)")
                ws = websocket.WebSocketApp(
                    ws_url,
                    on_open=lambda ws: self._on_open(
                        ws, stream, current_text, final_result, last_audio_time,
                        is_processing, last_command_time, execute_callback),
                    on_message=lambda ws, msg: self._on_message(
                        ws, msg, current_text, last_audio_time),
                    on_error=lambda ws, err: self._on_error(ws, err, is_processing),
                    on_close=lambda ws, status, msg: self._on_close(
                        ws, status, msg, current_text, final_result, stream),
                )
                ws.run_forever(ping_interval=10, ping_timeout=5)
            except Exception as e:
                print(f"❌ 语音识别连接失败:{str(e)}")
                print("⚠️ 3秒后重新监听唤醒词...")
                time.sleep(3)
        finally:
            # Each session opens a new PortAudio stream; release it so handles
            # do not accumulate across sessions.
            stream.close()