Open_Duck_Mini_Interact/tts_module.py
2025-09-29 09:19:40 +08:00

173 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pyaudio
import wave
import tempfile
import os
import requests
import time
import sys
# Original code, part 3: complete logic of the Baidu online TTS module
class BaiduOnlineTTS:
    """Client for Baidu's online text-to-speech HTTP API with local playback.

    An OAuth access token is requested on construction and refreshed on
    demand. Synthesized audio can be returned as bytes, saved to a file
    (:meth:`text_to_speech`) or played through PyAudio (:meth:`speak`).
    Failures are reported with ``print`` and signalled through return values
    rather than exceptions, except in ``__init__``.
    """

    # Seconds to wait on each HTTP request so that a stalled connection
    # cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 10
    # A cached token is refreshed this many seconds before its reported expiry.
    TOKEN_REFRESH_MARGIN = 300
    # Number of frames read from the WAV file per write to the output stream.
    PLAYBACK_CHUNK_FRAMES = 1024

    def __init__(self, api_key, secret_key):
        """Store the credentials, open PyAudio and fetch the first token.

        Args:
            api_key: Baidu API key, sent as ``client_id``.
            secret_key: Baidu secret key, sent as ``client_secret``.

        Raises:
            RuntimeError: if no access token could be obtained.
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.access_token = None
        self.token_expires = 0
        # Audio player used by speak().
        self.audio_player = pyaudio.PyAudio()
        # Default synthesis parameters; callers may override them per call.
        self.default_options = {
            'vol': 5,  # volume (0-15)
            'spd': 5,  # speed (0-9)
            'pit': 5,  # pitch (0-9)
            'per': 0,  # voice (0: female, 1: male, 3: emotional female, 4: emotional male)
        }
        # Fetch the initial access token; without it the object is unusable.
        if not self._get_access_token():
            # Release PyAudio before failing so the half-built object does
            # not leak it.
            self.audio_player.terminate()
            raise RuntimeError("无法获取百度API访问令牌请检查密钥是否正确")

    def _get_access_token(self):
        """Ensure ``self.access_token`` holds a usable token.

        A cached token is reused until ``TOKEN_REFRESH_MARGIN`` seconds before
        ``self.token_expires``; otherwise a new one is requested.

        Returns:
            True if a token is available, False if the request failed or the
            response did not contain one (the reason is printed).
        """
        # Reuse the cached token while it is still comfortably valid.
        if self.access_token and time.time() < self.token_expires - self.TOKEN_REFRESH_MARGIN:
            return True
        try:
            url = (
                "https://aip.baidubce.com/oauth/2.0/token"
                f"?grant_type=client_credentials&client_id={self.api_key}"
                f"&client_secret={self.secret_key}"
            )
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            result = response.json()
            if "access_token" in result:
                self.access_token = result["access_token"]
                self.token_expires = time.time() + result["expires_in"]
                print("✅ 成功获取百度API访问令牌")
                return True
            print(f"❌ 获取令牌失败: {result}")
            return False
        except Exception as e:
            # Best-effort boundary: network, JSON and schema errors all mean
            # "no token", which callers handle through the return value.
            print(f"❌ 获取令牌时发生错误: {str(e)}")
            return False

    def text_to_speech(self, text, options=None, save_path=None):
        """Synthesize ``text`` and return the audio bytes.

        Args:
            text: Text to synthesize; it is URL-encoded before sending.
            options: Optional dict merged over ``self.default_options``
                (keys such as ``vol``, ``spd``, ``pit``, ``per``).
            save_path: If given, the audio bytes are also written to this path.

        Returns:
            The response body as ``bytes`` when the server answers with an
            ``audio/*`` content type, otherwise None (the reason is printed).
        """
        # Make sure the token is valid before calling the API.
        if not self._get_access_token():
            return None
        # Merge per-call options over the defaults.
        params = self.default_options.copy()
        if options:
            params.update(options)
        try:
            encoded_text = requests.utils.quote(text)
            url = (
                f"https://tsn.baidu.com/text2audio?tex={encoded_text}"
                f"&lan=zh&cuid=baidu-tts-python&ctp=1&tok={self.access_token}"
            )
            # Append the synthesis parameters.
            url += "".join(f"&{key}={value}" for key, value in params.items())
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            # Anything that is not audio is treated as an error payload.
            if response.headers.get("Content-Type", "").startswith("audio/"):
                if save_path:
                    with open(save_path, "wb") as f:
                        f.write(response.content)
                    print(f"✅ 音频已保存至: {save_path}")
                return response.content
            try:
                error = response.json()
                print(f"❌ 语音合成失败: {error.get('err_msg', '未知错误')}")
            except (ValueError, AttributeError):
                # Body is not JSON, or not a JSON object: show it verbatim.
                print(f"❌ 语音合成失败,响应内容: {response.text}")
            return None
        except Exception as e:
            print(f"❌ 语音合成时发生错误: {str(e)}")
            return None

    def speak(self, text, options=None):
        """Synthesize ``text`` and play it, unless playback is already active.

        ``main_scheduler.feedback_playing`` is used as a shared busy flag: it
        is set for the duration of the call, always cleared afterwards, and
        polled between chunks so that clearing it elsewhere stops playback.
        NOTE(review): assumes ``feedback_playing`` is a module-level boolean
        in ``main_scheduler`` -- confirm against the scheduler.

        Args:
            text: Text to speak; truncated to 1024 characters.
            options: Optional synthesis options, see :meth:`text_to_speech`.

        Returns:
            True if playback ran (even if it was stopped early), False if
            playback was already in progress, synthesis failed, or playing
            raised an error.
        """
        # Imported at call time, as the original code did. The flag must be
        # read and written through the module object: ``from ... import``
        # would only bind a local copy and never update the scheduler.
        import main_scheduler

        if main_scheduler.feedback_playing:
            return False
        main_scheduler.feedback_playing = True
        try:
            # Limit the text length (the API has a length limit).
            if len(text) > 1024:
                print("⚠️ 文本过长将截断为1024字符")
                text = text[:1024]
            audio_data = self.text_to_speech(text, options)
            if not audio_data:
                return False
            played = self._play_audio(
                audio_data, lambda: main_scheduler.feedback_playing
            )
            if played:
                print(f"✅ 语音播放完成: {text[:20]}...")
            return played
        finally:
            # Always release the busy flag, whatever happened above.
            main_scheduler.feedback_playing = False

    def _play_audio(self, audio_data, keep_playing):
        """Convert MP3 bytes to a temporary WAV file and play it.

        Args:
            audio_data: MP3-encoded audio bytes.
            keep_playing: Zero-argument callable; playback stops once it
                returns a false value.

        Returns:
            True if playback ran without error, False otherwise (the error
            is printed). Temporary files are removed in both cases.
        """
        mp3_filename = None
        wav_filename = None
        try:
            # Record each temp path before anything can fail, so that the
            # cleanup below always knows about it.
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as mp3_file:
                mp3_filename = mp3_file.name
                mp3_file.write(audio_data)
            # Convert to WAV, which the wave module can read for PyAudio.
            from pydub import AudioSegment
            audio = AudioSegment.from_mp3(mp3_filename)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
                wav_filename = wav_file.name
            # Export only after our own handle is closed.
            audio.export(wav_filename, format="wav")
            self._play_wav(wav_filename, keep_playing)
            return True
        except Exception as e:
            print(f"❌ 播放语音时发生错误: {str(e)}")
            return False
        finally:
            # Delete the temporary files.
            for path in (mp3_filename, wav_filename):
                if path and os.path.exists(path):
                    os.remove(path)

    def _play_wav(self, wav_filename, keep_playing):
        """Stream a WAV file to an output stream opened on ``audio_player``.

        The wave reader and the stream are closed even if writing fails.
        """
        with wave.open(wav_filename, 'rb') as wf:
            stream = self.audio_player.open(
                format=self.audio_player.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )
            try:
                data = wf.readframes(self.PLAYBACK_CHUNK_FRAMES)
                while data and keep_playing():
                    stream.write(data)
                    data = wf.readframes(self.PLAYBACK_CHUNK_FRAMES)
            finally:
                stream.stop_stream()
                stream.close()

    def close(self):
        """Release the PyAudio instance held by this object."""
        self.audio_player.terminate()
        print("✅ TTS资源已释放")