173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
import pyaudio
|
||
import wave
|
||
import tempfile
|
||
import os
|
||
import requests
|
||
import time
|
||
import sys
|
||
|
||
# 原代码3. 百度在线TTS模块完整逻辑
|
||
class BaiduOnlineTTS:
|
||
def __init__(self, api_key, secret_key):
|
||
"""初始化百度在线TTS"""
|
||
self.api_key = api_key
|
||
self.secret_key = secret_key
|
||
self.access_token = None
|
||
self.token_expires = 0
|
||
|
||
# 初始化音频播放器
|
||
self.audio_player = pyaudio.PyAudio()
|
||
|
||
# TTS配置参数
|
||
self.default_options = {
|
||
'vol': 5, # 音量(0-15)
|
||
'spd': 5, # 语速(0-9)
|
||
'pit': 5, # 音调(0-9)
|
||
'per': 0 # 发音人(0:女,1:男,3:情感女,4:情感男)
|
||
}
|
||
|
||
# 获取初始访问令牌
|
||
if not self._get_access_token():
|
||
raise Exception("无法获取百度API访问令牌,请检查密钥是否正确")
|
||
|
||
def _get_access_token(self):
|
||
"""获取百度API访问令牌"""
|
||
# 检查令牌是否仍然有效
|
||
if self.access_token and time.time() < self.token_expires - 300:
|
||
return True
|
||
|
||
try:
|
||
url = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={self.api_key}&client_secret={self.secret_key}"
|
||
response = requests.get(url)
|
||
result = response.json()
|
||
|
||
if "access_token" in result:
|
||
self.access_token = result["access_token"]
|
||
self.token_expires = time.time() + result["expires_in"]
|
||
print("✅ 成功获取百度API访问令牌")
|
||
return True
|
||
else:
|
||
print(f"❌ 获取令牌失败: {result}")
|
||
return False
|
||
except Exception as e:
|
||
print(f"❌ 获取令牌时发生错误: {str(e)}")
|
||
return False
|
||
|
||
def text_to_speech(self, text, options=None, save_path=None):
|
||
"""将文本转换为语音"""
|
||
# 确保令牌有效
|
||
if not self._get_access_token():
|
||
return None
|
||
|
||
# 合并配置参数
|
||
params = self.default_options.copy()
|
||
if options:
|
||
params.update(options)
|
||
|
||
try:
|
||
# 对文本进行URL编码
|
||
encoded_text = requests.utils.quote(text)
|
||
url = f"https://tsn.baidu.com/text2audio?tex={encoded_text}&lan=zh&cuid=baidu-tts-python&ctp=1&tok={self.access_token}"
|
||
|
||
# 添加合成参数
|
||
for key, value in params.items():
|
||
url += f"&{key}={value}"
|
||
|
||
# 发送请求
|
||
response = requests.get(url)
|
||
|
||
# 检查响应是否为音频数据
|
||
if response.headers.get("Content-Type", "").startswith("audio/"):
|
||
# 保存文件(如果需要)
|
||
if save_path:
|
||
with open(save_path, "wb") as f:
|
||
f.write(response.content)
|
||
print(f"✅ 音频已保存至: {save_path}")
|
||
|
||
return response.content
|
||
else:
|
||
# 解析错误信息
|
||
try:
|
||
error = response.json()
|
||
print(f"❌ 语音合成失败: {error.get('err_msg', '未知错误')}")
|
||
except:
|
||
print(f"❌ 语音合成失败,响应内容: {response.text}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"❌ 语音合成时发生错误: {str(e)}")
|
||
return None
|
||
|
||
def speak(self, text, options=None):
|
||
"""直接播放文本转换的语音"""
|
||
# 全局变量由调度脚本传入,此处保留原逻辑调用
|
||
from main_scheduler import feedback_playing
|
||
if feedback_playing:
|
||
return False
|
||
|
||
feedback_playing = True
|
||
|
||
# 限制文本长度(百度API有长度限制)
|
||
if len(text) > 1024:
|
||
print("⚠️ 文本过长,将截断为1024字符")
|
||
text = text[:1024]
|
||
|
||
# 获取音频数据
|
||
audio_data = self.text_to_speech(text, options)
|
||
if not audio_data:
|
||
feedback_playing = False
|
||
return False
|
||
|
||
try:
|
||
# 创建临时MP3文件
|
||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
|
||
temp_file.write(audio_data)
|
||
temp_filename = temp_file.name
|
||
|
||
# 转换为WAV格式(适配pyaudio)
|
||
from pydub import AudioSegment
|
||
audio = AudioSegment.from_mp3(temp_filename)
|
||
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
|
||
audio.export(wav_file.name, format="wav")
|
||
wav_filename = wav_file.name
|
||
|
||
# 播放WAV文件
|
||
wf = wave.open(wav_filename, 'rb')
|
||
stream = self.audio_player.open(
|
||
format=self.audio_player.get_format_from_width(wf.getsampwidth()),
|
||
channels=wf.getnchannels(),
|
||
rate=wf.getframerate(),
|
||
output=True
|
||
)
|
||
|
||
# 播放音频
|
||
chunk = 1024
|
||
data = wf.readframes(chunk)
|
||
while data and feedback_playing:
|
||
stream.write(data)
|
||
data = wf.readframes(chunk)
|
||
|
||
# 清理资源
|
||
stream.stop_stream()
|
||
stream.close()
|
||
wf.close()
|
||
|
||
print(f"✅ 语音播放完成: {text[:20]}...")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 播放语音时发生错误: {str(e)}")
|
||
return False
|
||
|
||
finally:
|
||
# 删除临时文件
|
||
if 'temp_filename' in locals() and os.path.exists(temp_filename):
|
||
os.remove(temp_filename)
|
||
if 'wav_filename' in locals() and os.path.exists(wav_filename):
|
||
os.remove(wav_filename)
|
||
feedback_playing = False
|
||
|
||
def close(self):
|
||
"""释放资源"""
|
||
self.audio_player.terminate()
|
||
print("✅ TTS资源已释放") |