import pyaudio import wave import tempfile import os import requests import time import sys # 原代码3. 百度在线TTS模块完整逻辑 class BaiduOnlineTTS: def __init__(self, api_key, secret_key): """初始化百度在线TTS""" self.api_key = api_key self.secret_key = secret_key self.access_token = None self.token_expires = 0 # 初始化音频播放器 self.audio_player = pyaudio.PyAudio() # TTS配置参数 self.default_options = { 'vol': 5, # 音量(0-15) 'spd': 5, # 语速(0-9) 'pit': 5, # 音调(0-9) 'per': 0 # 发音人(0:女,1:男,3:情感女,4:情感男) } # 获取初始访问令牌 if not self._get_access_token(): raise Exception("无法获取百度API访问令牌,请检查密钥是否正确") def _get_access_token(self): """获取百度API访问令牌""" # 检查令牌是否仍然有效 if self.access_token and time.time() < self.token_expires - 300: return True try: url = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={self.api_key}&client_secret={self.secret_key}" response = requests.get(url) result = response.json() if "access_token" in result: self.access_token = result["access_token"] self.token_expires = time.time() + result["expires_in"] print("✅ 成功获取百度API访问令牌") return True else: print(f"❌ 获取令牌失败: {result}") return False except Exception as e: print(f"❌ 获取令牌时发生错误: {str(e)}") return False def text_to_speech(self, text, options=None, save_path=None): """将文本转换为语音""" # 确保令牌有效 if not self._get_access_token(): return None # 合并配置参数 params = self.default_options.copy() if options: params.update(options) try: # 对文本进行URL编码 encoded_text = requests.utils.quote(text) url = f"https://tsn.baidu.com/text2audio?tex={encoded_text}&lan=zh&cuid=baidu-tts-python&ctp=1&tok={self.access_token}" # 添加合成参数 for key, value in params.items(): url += f"&{key}={value}" # 发送请求 response = requests.get(url) # 检查响应是否为音频数据 if response.headers.get("Content-Type", "").startswith("audio/"): # 保存文件(如果需要) if save_path: with open(save_path, "wb") as f: f.write(response.content) print(f"✅ 音频已保存至: {save_path}") return response.content else: # 解析错误信息 try: error = response.json() print(f"❌ 语音合成失败: {error.get('err_msg', '未知错误')}") except: print(f"❌ 语音合成失败,响应内容: {response.text}") return None except Exception as e: print(f"❌ 语音合成时发生错误: {str(e)}") return None def speak(self, text, options=None): """直接播放文本转换的语音""" # 全局变量由调度脚本传入,此处保留原逻辑调用 from main_scheduler import feedback_playing if feedback_playing: return False feedback_playing = True # 限制文本长度(百度API有长度限制) if len(text) > 1024: print("⚠️ 文本过长,将截断为1024字符") text = text[:1024] # 获取音频数据 audio_data = self.text_to_speech(text, options) if not audio_data: feedback_playing = False return False try: # 创建临时MP3文件 with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file: temp_file.write(audio_data) temp_filename = temp_file.name # 转换为WAV格式(适配pyaudio) from pydub import AudioSegment audio = AudioSegment.from_mp3(temp_filename) with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file: audio.export(wav_file.name, format="wav") wav_filename = wav_file.name # 播放WAV文件 wf = wave.open(wav_filename, 'rb') stream = self.audio_player.open( format=self.audio_player.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True ) # 播放音频 chunk = 1024 data = wf.readframes(chunk) while data and feedback_playing: stream.write(data) data = wf.readframes(chunk) # 清理资源 stream.stop_stream() stream.close() wf.close() print(f"✅ 语音播放完成: {text[:20]}...") return True except Exception as e: print(f"❌ 播放语音时发生错误: {str(e)}") return False finally: # 删除临时文件 if 'temp_filename' in locals() and os.path.exists(temp_filename): os.remove(temp_filename) if 'wav_filename' in locals() and os.path.exists(wav_filename): os.remove(wav_filename) feedback_playing = False def close(self): """释放资源""" self.audio_player.terminate() print("✅ TTS资源已释放")