Open_Duck_Mini_Interact/voice_recog_module.py

226 lines
10 KiB
Python
Raw Permalink Normal View History

2025-09-29 09:19:40 +08:00
import sounddevice as sd
import pvporcupine
import struct
import websocket
import threading
import hmac
import hashlib
import base64
import json
import time
import urllib.parse
import uuid
import queue
import sys
# From the original code: section 9 (audio capture and WebSocket) and section 10 (wake-word listening), complete logic
class VoiceRecogController:
    """Wake-word listener plus streaming speech-recognition session driver.

    The controller first blocks in :meth:`wakeup_listener` until the Porcupine
    engine reports the wake word, then :meth:`start_websocket` streams
    microphone audio over a WebSocket and hands recognised commands to a
    caller-supplied callback.

    Shared state between the caller and the WebSocket callbacks
    (``current_text``, ``final_result``, ``last_audio_time``,
    ``is_processing``, ``last_command_time``) is passed as one-element lists
    so the callbacks can mutate the caller's values in place.
    """

    def __init__(self, access_key, wakeup_word_path, model_path, appid,
                 access_key_id, access_key_secret, tts_controller,
                 feedback_text):
        """Store the configuration handed over by the dispatcher script.

        Args:
            access_key: Access key passed to ``pvporcupine.create``.
            wakeup_word_path: Path of the wake-word keyword file.
            model_path: Path of the Porcupine model file.
            appid: ``appId`` query parameter of the recognition service.
            access_key_id: ``accessKeyId`` query parameter.
            access_key_secret: Secret used to HMAC-sign the query string.
            tts_controller: Object exposing ``speak(text)``.
            feedback_text: Mapping of feedback phrases; ``"wakeup"`` is read
                with ``[]`` and ``"wakeup_timeout"`` with ``.get``.
        """
        self.ACCESS_KEY = access_key
        self.WAKEUP_WORD_PATH = wakeup_word_path
        self.MODEL_PATH = model_path
        self.APPID = appid
        self.ACCESS_KEY_ID = access_key_id
        self.ACCESS_KEY_SECRET = access_key_secret
        self.tts_controller = tts_controller
        self.FEEDBACK_TEXT = feedback_text
        self.SAMPLE_RATE = 16000
        self.CHANNELS = 1
        self.SAMPLE_FORMAT = "int16"
        self.INTERACTION_TIMEOUT = 30  # seconds without a command
        self.audio_q = queue.Queue()
        self.stream = None  # microphone stream, created per session

    def wakeup_listener(self):
        """Block until the wake word is heard, then speak the wake-up reply.

        Returns:
            True once the wake word has been detected.

        Raises:
            SystemExit: with code 1 when the engine or microphone fails.
        """
        porcupine = None
        try:
            porcupine = pvporcupine.create(
                access_key=self.ACCESS_KEY,
                keyword_paths=[self.WAKEUP_WORD_PATH],
                model_path=self.MODEL_PATH,
            )
            print(f"\n🎯 唤醒词引擎就绪(采样率:{porcupine.sample_rate}")
            wakeup_mic = sd.RawInputStream(
                samplerate=porcupine.sample_rate,
                blocksize=porcupine.frame_length,
                dtype="int16",
                channels=1,
            )
            print("📢 等待唤醒词「小黄鸭」按Ctrl+C退出")
            # The struct format is loop-invariant: one int16 per sample.
            frame_format = "h" * porcupine.frame_length
            with wakeup_mic:
                while True:
                    pcm_data, _ = wakeup_mic.read(porcupine.frame_length)
                    pcm_unpacked = struct.unpack_from(frame_format, pcm_data)
                    if porcupine.process(pcm_unpacked) >= 0:
                        print("🚀 检测到唤醒词「小黄鸭」!")
                        # Play the wake-up feedback synchronously.
                        self.tts_controller.speak(self.FEEDBACK_TEXT["wakeup"])
                        return True
        except Exception as e:
            print(f"\n❌ 唤醒词监听失败:{str(e)}")
            print(" 排查1. 唤醒词文件路径 2. 麦克风连接 3. PicoVoice Key有效性")
            sys.exit(1)
        finally:
            # Release the engine on every exit path (success, error, Ctrl+C);
            # previously it was only released after a successful detection.
            if porcupine is not None:
                porcupine.delete()

    def _audio_callback(self, indata, frames, t, status):
        """Microphone stream callback: queue each captured chunk as bytes."""
        if status:
            print(f"⚠️ 音频异常:{status}")
        self.audio_q.put(bytes(indata))

    def _drain_audio_queue(self):
        """Discard every chunk currently waiting in the audio queue."""
        while True:
            try:
                self.audio_q.get_nowait()
            except queue.Empty:
                return

    def _create_ws_url(self):
        """Build the signed WebSocket URL for one recognition session.

        The query parameters are sorted by key, URL-encoded, signed with
        HMAC-SHA1 using the access-key secret, and the base64 signature is
        appended as ``signature``.

        Returns:
            ``(url, session_uuid)`` on success, ``(None, None)`` on failure.
        """
        try:
            host = "office-api-ast-dx.iflyaisol.com"
            path = "/ast/communicate/v1"
            utc = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + "+0000"
            session_uuid = str(uuid.uuid4())
            params = {
                "accessKeyId": self.ACCESS_KEY_ID,
                "appId": self.APPID,
                "samplerate": self.SAMPLE_RATE,
                "audio_encode": "pcm_s16le",
                "lang": "autodialect",
                "uuid": session_uuid,
                "utc": utc,
            }
            sorted_params = sorted(params.items(), key=lambda x: x[0])
            base_string = "&".join(
                f"{urllib.parse.quote_plus(str(k))}={urllib.parse.quote_plus(str(v))}"
                for k, v in sorted_params
            )
            signature = hmac.new(
                self.ACCESS_KEY_SECRET.encode("utf-8"),
                base_string.encode("utf-8"),
                hashlib.sha1,
            ).digest()
            signature = base64.b64encode(signature).decode("utf-8")
            query = base_string + "&signature=" + urllib.parse.quote_plus(signature)
            return f"wss://{host}{path}?{query}", session_uuid
        except Exception as e:
            print(f"❌ WebSocket URL生成失败{str(e)}")
            return None, None

    def _on_message(self, ws, message, current_text, last_audio_time):
        """Handle one recognition message and update the shared text state.

        Args:
            ws: The WebSocket the message arrived on (unused).
            message: JSON text sent by the recognition service.
            current_text: One-element list; slot 0 receives the joined words.
            last_audio_time: One-element list; slot 0 receives ``time.time()``
                whenever words were recognised.
        """
        try:
            data = json.loads(message)
            if data.get("msg_type") == "result" and "cn" in data.get("data", {}):
                words = [
                    cw.get("w", "")
                    for rt in data["data"]["cn"].get("st", {}).get("rt", [])
                    for ws_item in rt.get("ws", [])
                    for cw in ws_item.get("cw", [])
                ]
                if words:
                    # Lists are used as mutable references to caller state.
                    current_text[0] = "".join(words)
                    last_audio_time[0] = time.time()
                    print(f"🎧 识别中:{current_text[0]}", end="\r")
        except Exception as e:
            print(f"\n❌ 语音识别消息处理错误:{str(e)}")

    def _on_error(self, ws, error, is_processing):
        """Report a WebSocket error unless a command is being processed."""
        if not is_processing[0]:
            print(f"\n❌ WebSocket连接错误{str(error)}")

    def _on_close(self, ws, close_status_code, close_msg, current_text, final_result, stream):
        """Stop the microphone stream and reset the text state on close."""
        print(f"\n🔌 WebSocket连接关闭 | 状态码:{close_status_code}")
        if stream and stream.active:
            stream.stop()
        current_text[0] = ""
        final_result[0] = ""

    def _close_if_idle(self, ws, last_command_time):
        """Announce and request shutdown when the session has been idle.

        Returns:
            True when the idle timeout elapsed and the ``"close"`` text frame
            was sent; False when the session is still within the timeout.
        """
        if time.time() - last_command_time[0] <= self.INTERACTION_TIMEOUT:
            return False
        print(f"\n{self.INTERACTION_TIMEOUT}秒无操作,关闭连接")
        self.tts_controller.speak(
            self.FEEDBACK_TEXT.get("wakeup_timeout", "长时间没操作,我先休息啦")
        )
        time.sleep(1)
        ws.send("close", websocket.ABNF.OPCODE_TEXT)
        return True

    def _on_open(self, ws, stream, current_text, final_result, last_audio_time, is_processing, last_command_time, execute_callback):
        """Start the background thread that streams audio and runs commands.

        Args:
            ws: The opened WebSocket.
            stream: Microphone stream to start for this session.
            current_text: One-element list holding the text recognised so far.
            final_result: One-element list holding the command being executed.
            last_audio_time: One-element list with the time of the last
                recognised words.
            is_processing: Accepted for interface compatibility; not used here.
            last_command_time: One-element list with the time of the last
                executed command (drives the idle timeout).
            execute_callback: Called with the final command string.
        """
        def send_audio_and_handle():
            print("\n🎤 指令已就绪!支持:")
            print(" - 运动前进3秒、左转2秒 | - 图像识别:这是什么")
            print(" - 闲聊:今天天气怎么样 | - 音量:增大音量、减小音量\n")
            # Chunks left over from a previous session must not be sent to
            # this one.
            self._drain_audio_queue()
            stream.start()
            current_text[0] = ""
            final_result[0] = ""
            last_command_time[0] = time.time()
            while True:
                try:
                    # 1. Drop backlog so the recogniser stays near real time.
                    while self.audio_q.qsize() > 5:
                        self.audio_q.get_nowait()
                    # 2. Wait briefly for the next chunk.
                    try:
                        audio_data = self.audio_q.get(timeout=0.5)
                    except queue.Empty:
                        audio_data = None
                    if audio_data is not None:
                        ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                        # 3. Execute once text exists and no new words have
                        #    arrived for more than 2 seconds.
                        if current_text[0] and (time.time() - last_audio_time[0]) > 2:
                            final_result[0] = current_text[0].strip()
                            if len(final_result[0]) > 0:
                                print(f"\n⏹ 最终指令:{final_result[0]}")
                                execute_callback(final_result[0])
                                last_command_time[0] = time.time()
                            # Clear after execution to avoid re-triggering.
                            current_text[0] = ""
                            final_result[0] = ""
                            time.sleep(1)  # give the command time to finish
                    # 4. Idle timeout, checked whether or not audio arrived.
                    if self._close_if_idle(ws, last_command_time):
                        break
                except Exception as e:
                    print(f"\n❌ 音频发送错误:{str(e)}")
                    # Nobody will feed this socket any more; close it so
                    # run_forever() returns instead of idling.
                    ws.close()
                    break

        audio_thread = threading.Thread(target=send_audio_and_handle, daemon=True)
        audio_thread.start()

    def start_websocket(self, current_text, final_result, last_audio_time, is_processing, last_command_time, execute_callback):
        """Run one recognition session; returns when the socket has closed.

        Args:
            current_text, final_result, last_audio_time, is_processing,
            last_command_time: One-element lists shared with the callbacks.
            execute_callback: Called with each final command string.
        """
        # Build the URL first so no microphone stream is opened (and leaked)
        # when the session cannot start.
        ws_url, session_id = self._create_ws_url()
        if not ws_url:
            print("⚠️ 无法生成语音识别连接3秒后重新监听...")
            time.sleep(3)
            return
        stream = sd.RawInputStream(
            samplerate=self.SAMPLE_RATE,
            channels=self.CHANNELS,
            dtype=self.SAMPLE_FORMAT,
            callback=self._audio_callback,
        )
        self.stream = stream
        try:
            print(f"🔄 连接语音识别服务会话ID{session_id[:8]}...")
            ws = websocket.WebSocketApp(
                ws_url,
                on_open=lambda ws: self._on_open(ws, stream, current_text, final_result, last_audio_time, is_processing, last_command_time, execute_callback),
                on_message=lambda ws, msg: self._on_message(ws, msg, current_text, last_audio_time),
                on_error=lambda ws, err: self._on_error(ws, err, is_processing),
                on_close=lambda ws, status, msg: self._on_close(ws, status, msg, current_text, final_result, stream),
            )
            ws.run_forever(ping_interval=10, ping_timeout=5)
        except Exception as e:
            print(f"❌ 语音识别连接失败:{str(e)}")
        finally:
            # A new stream is opened for every session, so release this one;
            # stopping it in _on_close alone leaves the device handle open.
            stream.close()
        print("⚠️ 3秒后重新监听唤醒词...")
        time.sleep(3)