269 lines
9.8 KiB
Python
269 lines
9.8 KiB
Python
import time
|
||
import threading
|
||
import re
|
||
import numpy as np
|
||
import sys
|
||
import os
|
||
import signal
|
||
from openai import OpenAI
|
||
from picamera2 import Picamera2
|
||
import io
|
||
from PIL import Image
|
||
import base64
|
||
|
||
# 配置项目路径(根据你的实际路径修改)
|
||
PROJECT_ROOT = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini_Runtime-2"
|
||
ONNX_MODEL_PATH = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini-2/BEST_WALK_ONNX_2.onnx"
|
||
sys.path.append(PROJECT_ROOT)
|
||
|
||
# 导入运动控制模块
|
||
from v2_rl_walk_mujoco import RLWalk
|
||
|
||
# API配置(替换为你的实际密钥)
|
||
ARK_API_KEY = "390d517c-129a-41c1-bf3d-458048007b69"
|
||
ARK_MODEL_ID = "doubao-seed-1-6-250615"
|
||
|
||
class SimpleTTS:
|
||
"""简化的TTS模块,仅用于测试反馈"""
|
||
def speak(self, text):
|
||
print(f"[语音反馈] {text}")
|
||
|
||
class MotionController:
|
||
"""运动控制封装"""
|
||
def __init__(self):
|
||
try:
|
||
self.rl_walk = RLWalk(
|
||
onnx_model_path=ONNX_MODEL_PATH,
|
||
cutoff_frequency=40,
|
||
pid=[30, 0, 0]
|
||
)
|
||
self.walk_thread = threading.Thread(target=self.rl_walk.run, daemon=True)
|
||
self.walk_thread.start()
|
||
time.sleep(1)
|
||
print("✅ 运动控制模块初始化成功")
|
||
except Exception as e:
|
||
print(f"❌ 运动控制初始化失败:{str(e)}")
|
||
sys.exit(1)
|
||
|
||
def execute_motion(self, action_name: str, seconds: float):
|
||
"""执行指定动作"""
|
||
try:
|
||
if action_name == "move_forward":
|
||
print(f"🚶 前进{seconds}秒...")
|
||
self.rl_walk.last_commands[0] = 0.17
|
||
elif action_name == "move_backward":
|
||
print(f"🚶 后退{seconds}秒...")
|
||
self.rl_walk.last_commands[0] = -0.17
|
||
elif action_name == "turn_left":
|
||
print(f"🔄 左转{seconds}秒...")
|
||
self.rl_walk.last_commands[2] = 1.1
|
||
elif action_name == "turn_right":
|
||
print(f"🔄 右转{seconds}秒...")
|
||
self.rl_walk.last_commands[2] = -1.1
|
||
|
||
time.sleep(seconds)
|
||
self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||
print("✅ 动作完成")
|
||
except Exception as e:
|
||
print(f"❌ 动作执行失败:{str(e)}")
|
||
self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
|
||
|
||
class CameraModule:
|
||
"""摄像头模块封装"""
|
||
def __init__(self):
|
||
try:
|
||
self.camera = Picamera2()
|
||
cam_config = self.camera.create_still_configuration(main={"size": (320, 240)})
|
||
self.camera.configure(cam_config)
|
||
self.camera.start()
|
||
print("✅ 摄像头模块初始化成功")
|
||
except Exception as e:
|
||
print(f"❌ 摄像头初始化失败:{str(e)}")
|
||
sys.exit(1)
|
||
|
||
def capture_base64(self):
|
||
"""拍摄并返回base64编码图像"""
|
||
try:
|
||
img_array = self.camera.capture_array()
|
||
img_byte = io.BytesIO()
|
||
Image.fromarray(img_array).save(img_byte, format="JPEG", quality=80)
|
||
return base64.b64encode(img_byte.getvalue()).decode("utf-8")
|
||
except Exception as e:
|
||
print(f"❌ 拍摄失败:{str(e)}")
|
||
return None
|
||
|
||
class ObjectTracker:
|
||
"""物品追踪核心类"""
|
||
def __init__(self, target_name="万用表", tts=None, motion=None, camera=None):
|
||
self.target_name = target_name
|
||
self.tracking_active = False
|
||
self.target_lost_count = 0
|
||
self.max_lost_count = 5 # 连续丢失次数阈值
|
||
self.tts = tts or SimpleTTS()
|
||
self.motion = motion or MotionController()
|
||
self.camera = camera or CameraModule()
|
||
self.client = OpenAI(
|
||
base_url="https://ark.cn-beijing.volces.com/api/v3",
|
||
api_key=ARK_API_KEY
|
||
)
|
||
print(f"✅ 追踪器初始化完成,目标:{self.target_name}")
|
||
|
||
def get_object_position(self, image_base64):
|
||
"""获取目标在图像中的位置信息"""
|
||
try:
|
||
response = self.client.chat.completions.create(
|
||
model=ARK_MODEL_ID,
|
||
messages=[{
|
||
"role": "user",
|
||
"content": [
|
||
{"type": "image_url", "image_url": f"data:image/jpeg;base64,{image_base64}"},
|
||
{"type": "text", "text": (f"请识别图像中的'{self.target_name}',返回其位置信息。"
|
||
"格式要求:中心X坐标(0-100),中心Y坐标(0-100),宽度占比(0-100),高度占比(0-100)。"
|
||
"如果未找到,返回'未找到'")}
|
||
]
|
||
}]
|
||
)
|
||
|
||
content = response.choices[0].message.content
|
||
if "未找到" in content:
|
||
return None
|
||
|
||
# 提取数字信息
|
||
nums = re.findall(r"\d+\.?\d*", content)
|
||
if len(nums) >= 4:
|
||
return {
|
||
"center_x": float(nums[0]),
|
||
"center_y": float(nums[1]),
|
||
"width": float(nums[2]),
|
||
"height": float(nums[3])
|
||
}
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"❌ 目标识别出错: {str(e)}")
|
||
return None
|
||
|
||
def track_object(self):
|
||
"""追踪主循环"""
|
||
self.tts.speak(f"开始追踪{self.target_name}")
|
||
self.tracking_active = True
|
||
self.target_lost_count = 0
|
||
|
||
try:
|
||
while self.tracking_active:
|
||
# 1. 采集图像
|
||
image_base64 = self.camera.capture_base64()
|
||
if not image_base64:
|
||
time.sleep(1)
|
||
continue
|
||
|
||
# 2. 识别目标位置
|
||
pos = self.get_object_position(image_base64)
|
||
if not pos:
|
||
self.target_lost_count += 1
|
||
print(f"⚠️ 未找到{self.target_name},已连续丢失{self.target_lost_count}次")
|
||
|
||
if self.target_lost_count >= self.max_lost_count:
|
||
self.tts.speak(f"已丢失{self.target_name},停止追踪")
|
||
self.stop_tracking()
|
||
time.sleep(1)
|
||
continue
|
||
|
||
# 重置丢失计数
|
||
self.target_lost_count = 0
|
||
print(f"🎯 发现{self.target_name} - 中心X: {pos['center_x']}, 宽度占比: {pos['width']}")
|
||
|
||
# 3. 根据位置控制移动
|
||
self.control_movement(pos)
|
||
time.sleep(1.5) # 控制追踪频率
|
||
|
||
except Exception as e:
|
||
print(f"❌ 追踪过程出错: {str(e)}")
|
||
self.stop_tracking()
|
||
|
||
def control_movement(self, pos):
|
||
"""根据目标位置控制移动"""
|
||
# 横向位置控制 (左右转向)
|
||
if pos["center_x"] < 35: # 目标偏左
|
||
self.tts.speak("目标在左边,向左转")
|
||
self.motion.execute_motion("turn_left", 0.8)
|
||
elif pos["center_x"] > 65: # 目标偏右
|
||
self.tts.speak("目标在右边,向右转")
|
||
self.motion.execute_motion("turn_right", 0.8)
|
||
|
||
# 距离控制 (前进后退)
|
||
if pos["width"] < 20: # 目标过小,距离过远
|
||
self.tts.speak("距离目标较远,前进")
|
||
self.motion.execute_motion("move_forward", 1.5)
|
||
elif pos["width"] > 40: # 目标过大,距离过近
|
||
self.tts.speak("距离目标过近,后退")
|
||
self.motion.execute_motion("move_backward", 1)
|
||
else:
|
||
self.tts.speak("已对准目标,保持位置")
|
||
|
||
def start_tracking(self):
|
||
"""启动追踪线程"""
|
||
if not self.tracking_active:
|
||
threading.Thread(target=self.track_object, daemon=True).start()
|
||
|
||
def stop_tracking(self):
|
||
"""停止追踪"""
|
||
self.tracking_active = False
|
||
print(f"🛑 停止追踪{self.target_name}")
|
||
|
||
|
||
def main():
|
||
# 初始化组件
|
||
tts = SimpleTTS()
|
||
motion = MotionController()
|
||
camera = CameraModule()
|
||
tracker = ObjectTracker(
|
||
target_name="万用表", # 可修改为其他目标,如"水杯"、"书本"
|
||
tts=tts,
|
||
motion=motion,
|
||
camera=camera
|
||
)
|
||
|
||
# 信号处理(优雅退出)
|
||
def handle_interrupt(signum, frame):
|
||
print("\n🛑 收到退出信号,正在停止...")
|
||
tracker.stop_tracking()
|
||
motion.rl_walk.last_commands = [0.0, 0.0, 0.0] # 停止运动
|
||
camera.camera.stop() # 关闭摄像头
|
||
print("✅ 所有资源已释放,程序退出")
|
||
sys.exit(0)
|
||
|
||
signal.signal(signal.SIGINT, handle_interrupt)
|
||
|
||
# 交互提示
|
||
print("\n===== 物品追踪测试程序 =====")
|
||
print("操作说明:")
|
||
print(" s - 开始追踪目标")
|
||
print(" t - 停止追踪")
|
||
print(" q - 退出程序")
|
||
print("===========================")
|
||
|
||
# 主交互循环
|
||
while True:
|
||
cmd = input("请输入指令: ").strip().lower()
|
||
if cmd == 's':
|
||
if not tracker.tracking_active:
|
||
print("▶️ 开始追踪...")
|
||
tracker.start_tracking()
|
||
else:
|
||
print("⚠️ 已经在追踪中")
|
||
elif cmd == 't':
|
||
if tracker.tracking_active:
|
||
tracker.stop_tracking()
|
||
print("⏹️ 已停止追踪")
|
||
else:
|
||
print("⚠️ 没有正在进行的追踪")
|
||
elif cmd == 'q':
|
||
handle_interrupt(None, None)
|
||
else:
|
||
print("❓ 未知指令,请输入 s/t/q")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
|
||
main() |