| 
									
										
										
										
											2025-09-29 09:19:40 +08:00
										 |  |  |  | import pyaudio | 
					
						
							|  |  |  |  | import wave | 
					
						
							|  |  |  |  | import tempfile | 
					
						
							|  |  |  |  | import os | 
					
						
							|  |  |  |  | import requests | 
					
						
							|  |  |  |  | import time | 
					
						
							|  |  |  |  | import sys | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | # 原代码3. 百度在线TTS模块完整逻辑 | 
					
						
							|  |  |  |  | class BaiduOnlineTTS: | 
					
						
							|  |  |  |  |     def __init__(self, api_key, secret_key): | 
					
						
							|  |  |  |  |         """初始化百度在线TTS""" | 
					
						
							|  |  |  |  |         self.api_key = api_key | 
					
						
							|  |  |  |  |         self.secret_key = secret_key | 
					
						
							|  |  |  |  |         self.access_token = None | 
					
						
							|  |  |  |  |         self.token_expires = 0 | 
					
						
							|  |  |  |  |          | 
					
						
							|  |  |  |  |         # 初始化音频播放器 | 
					
						
							|  |  |  |  |         self.audio_player = pyaudio.PyAudio() | 
					
						
							|  |  |  |  |          | 
					
						
							|  |  |  |  |         # TTS配置参数 | 
					
						
							|  |  |  |  |         self.default_options = { | 
					
						
							|  |  |  |  |             'vol': 5,    # 音量(0-15) | 
					
						
							|  |  |  |  |             'spd': 5,    # 语速(0-9) | 
					
						
							|  |  |  |  |             'pit': 5,    # 音调(0-9) | 
					
						
							|  |  |  |  |             'per': 0     # 发音人(0:女,1:男,3:情感女,4:情感男) | 
					
						
							|  |  |  |  |         } | 
					
						
							|  |  |  |  |          | 
					
						
							|  |  |  |  |         # 获取初始访问令牌 | 
					
						
							|  |  |  |  |         if not self._get_access_token(): | 
					
						
							|  |  |  |  |             raise Exception("无法获取百度API访问令牌,请检查密钥是否正确") | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     def _get_access_token(self): | 
					
						
							|  |  |  |  |         """获取百度API访问令牌""" | 
					
						
							|  |  |  |  |         # 检查令牌是否仍然有效 | 
					
						
							|  |  |  |  |         if self.access_token and time.time() < self.token_expires - 300: | 
					
						
							|  |  |  |  |             return True | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         try: | 
					
						
							|  |  |  |  |             url = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={self.api_key}&client_secret={self.secret_key}" | 
					
						
							|  |  |  |  |             response = requests.get(url) | 
					
						
							|  |  |  |  |             result = response.json() | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             if "access_token" in result: | 
					
						
							|  |  |  |  |                 self.access_token = result["access_token"] | 
					
						
							|  |  |  |  |                 self.token_expires = time.time() + result["expires_in"] | 
					
						
							|  |  |  |  |                 print("✅ 成功获取百度API访问令牌") | 
					
						
							|  |  |  |  |                 return True | 
					
						
							|  |  |  |  |             else: | 
					
						
							|  |  |  |  |                 print(f"❌ 获取令牌失败: {result}") | 
					
						
							|  |  |  |  |                 return False | 
					
						
							|  |  |  |  |         except Exception as e: | 
					
						
							|  |  |  |  |             print(f"❌ 获取令牌时发生错误: {str(e)}") | 
					
						
							|  |  |  |  |             return False | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     def text_to_speech(self, text, options=None, save_path=None): | 
					
						
							|  |  |  |  |         """将文本转换为语音""" | 
					
						
							|  |  |  |  |         # 确保令牌有效 | 
					
						
							|  |  |  |  |         if not self._get_access_token(): | 
					
						
							|  |  |  |  |             return None | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         # 合并配置参数 | 
					
						
							|  |  |  |  |         params = self.default_options.copy() | 
					
						
							|  |  |  |  |         if options: | 
					
						
							|  |  |  |  |             params.update(options) | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         try: | 
					
						
							|  |  |  |  |             # 对文本进行URL编码 | 
					
						
							|  |  |  |  |             encoded_text = requests.utils.quote(text) | 
					
						
							|  |  |  |  |             url = f"https://tsn.baidu.com/text2audio?tex={encoded_text}&lan=zh&cuid=baidu-tts-python&ctp=1&tok={self.access_token}" | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 添加合成参数 | 
					
						
							|  |  |  |  |             for key, value in params.items(): | 
					
						
							|  |  |  |  |                 url += f"&{key}={value}" | 
					
						
							|  |  |  |  |                  | 
					
						
							|  |  |  |  |             # 发送请求 | 
					
						
							|  |  |  |  |             response = requests.get(url) | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 检查响应是否为音频数据 | 
					
						
							|  |  |  |  |             if response.headers.get("Content-Type", "").startswith("audio/"): | 
					
						
							|  |  |  |  |                 # 保存文件(如果需要) | 
					
						
							|  |  |  |  |                 if save_path: | 
					
						
							|  |  |  |  |                     with open(save_path, "wb") as f: | 
					
						
							|  |  |  |  |                         f.write(response.content) | 
					
						
							|  |  |  |  |                     print(f"✅ 音频已保存至: {save_path}") | 
					
						
							|  |  |  |  |                  | 
					
						
							|  |  |  |  |                 return response.content | 
					
						
							|  |  |  |  |             else: | 
					
						
							|  |  |  |  |                 # 解析错误信息 | 
					
						
							|  |  |  |  |                 try: | 
					
						
							|  |  |  |  |                     error = response.json() | 
					
						
							|  |  |  |  |                     print(f"❌ 语音合成失败: {error.get('err_msg', '未知错误')}") | 
					
						
							|  |  |  |  |                 except: | 
					
						
							|  |  |  |  |                     print(f"❌ 语音合成失败,响应内容: {response.text}") | 
					
						
							|  |  |  |  |                 return None | 
					
						
							|  |  |  |  |                  | 
					
						
							|  |  |  |  |         except Exception as e: | 
					
						
							|  |  |  |  |             print(f"❌ 语音合成时发生错误: {str(e)}") | 
					
						
							|  |  |  |  |             return None | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     def speak(self, text, options=None): | 
					
						
							|  |  |  |  |         """直接播放文本转换的语音""" | 
					
						
							|  |  |  |  |         # 全局变量由调度脚本传入,此处保留原逻辑调用 | 
					
						
							|  |  |  |  |         from main_scheduler import feedback_playing | 
					
						
							|  |  |  |  |         if feedback_playing: | 
					
						
							|  |  |  |  |             return False | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         feedback_playing = True | 
					
						
							|  |  |  |  |          | 
					
						
							|  |  |  |  |         # 限制文本长度(百度API有长度限制) | 
					
						
							|  |  |  |  |         if len(text) > 1024: | 
					
						
							|  |  |  |  |             print("⚠️ 文本过长,将截断为1024字符") | 
					
						
							|  |  |  |  |             text = text[:1024] | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         # 获取音频数据 | 
					
						
							|  |  |  |  |         audio_data = self.text_to_speech(text, options) | 
					
						
							|  |  |  |  |         if not audio_data: | 
					
						
							|  |  |  |  |             feedback_playing = False | 
					
						
							|  |  |  |  |             return False | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         try: | 
					
						
							|  |  |  |  |             # 创建临时MP3文件 | 
					
						
							|  |  |  |  |             with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file: | 
					
						
							|  |  |  |  |                 temp_file.write(audio_data) | 
					
						
							|  |  |  |  |                 temp_filename = temp_file.name | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 转换为WAV格式(适配pyaudio) | 
					
						
							|  |  |  |  |             from pydub import AudioSegment | 
					
						
							|  |  |  |  |             audio = AudioSegment.from_mp3(temp_filename) | 
					
						
							|  |  |  |  |             with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file: | 
					
						
							|  |  |  |  |                 audio.export(wav_file.name, format="wav") | 
					
						
							|  |  |  |  |                 wav_filename = wav_file.name | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 播放WAV文件 | 
					
						
							|  |  |  |  |             wf = wave.open(wav_filename, 'rb') | 
					
						
							|  |  |  |  |             stream = self.audio_player.open( | 
					
						
							|  |  |  |  |                 format=self.audio_player.get_format_from_width(wf.getsampwidth()), | 
					
						
							|  |  |  |  |                 channels=wf.getnchannels(), | 
					
						
							|  |  |  |  |                 rate=wf.getframerate(), | 
					
						
							|  |  |  |  |                 output=True | 
					
						
							|  |  |  |  |             ) | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 播放音频 | 
					
						
							|  |  |  |  |             chunk = 1024 | 
					
						
							|  |  |  |  |             data = wf.readframes(chunk) | 
					
						
							|  |  |  |  |             while data and feedback_playing: | 
					
						
							|  |  |  |  |                 stream.write(data) | 
					
						
							|  |  |  |  |                 data = wf.readframes(chunk) | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             # 清理资源 | 
					
						
							|  |  |  |  |             stream.stop_stream() | 
					
						
							|  |  |  |  |             stream.close() | 
					
						
							|  |  |  |  |             wf.close() | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |             print(f"✅ 语音播放完成: {text[:20]}...") | 
					
						
							|  |  |  |  |             return True | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         except Exception as e: | 
					
						
							|  |  |  |  |             print(f"❌ 播放语音时发生错误: {str(e)}") | 
					
						
							|  |  |  |  |             return False | 
					
						
							|  |  |  |  |              | 
					
						
							|  |  |  |  |         finally: | 
					
						
							|  |  |  |  |             # 删除临时文件 | 
					
						
							|  |  |  |  |             if 'temp_filename' in locals() and os.path.exists(temp_filename): | 
					
						
							|  |  |  |  |                 os.remove(temp_filename) | 
					
						
							|  |  |  |  |             if 'wav_filename' in locals() and os.path.exists(wav_filename): | 
					
						
							|  |  |  |  |                 os.remove(wav_filename) | 
					
						
							|  |  |  |  |             feedback_playing = False | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     def close(self): | 
					
						
							|  |  |  |  |         """释放资源""" | 
					
						
							|  |  |  |  |         self.audio_player.terminate() | 
					
						
							|  |  |  |  |         print("✅ TTS资源已释放") |