diff --git a/track_object.py b/track_object.py new file mode 100644 index 0000000..7f17707 --- /dev/null +++ b/track_object.py @@ -0,0 +1,269 @@ +import time +import threading +import re +import numpy as np +import sys +import os +import signal +from openai import OpenAI +from picamera2 import Picamera2 +import io +from PIL import Image +import base64 + +# 配置项目路径(根据你的实际路径修改) +PROJECT_ROOT = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini_Runtime-2" +ONNX_MODEL_PATH = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini-2/BEST_WALK_ONNX_2.onnx" +sys.path.append(PROJECT_ROOT) + +# 导入运动控制模块 +from v2_rl_walk_mujoco import RLWalk + +# API配置(替换为你的实际密钥) +ARK_API_KEY = "390d517c-129a-41c1-bf3d-458048007b69" +ARK_MODEL_ID = "doubao-seed-1-6-250615" + +class SimpleTTS: + """简化的TTS模块,仅用于测试反馈""" + def speak(self, text): + print(f"[语音反馈] {text}") + +class MotionController: + """运动控制封装""" + def __init__(self): + try: + self.rl_walk = RLWalk( + onnx_model_path=ONNX_MODEL_PATH, + cutoff_frequency=40, + pid=[30, 0, 0] + ) + self.walk_thread = threading.Thread(target=self.rl_walk.run, daemon=True) + self.walk_thread.start() + time.sleep(1) + print("✅ 运动控制模块初始化成功") + except Exception as e: + print(f"❌ 运动控制初始化失败:{str(e)}") + sys.exit(1) + + def execute_motion(self, action_name: str, seconds: float): + """执行指定动作""" + try: + if action_name == "move_forward": + print(f"🚶 前进{seconds}秒...") + self.rl_walk.last_commands[0] = 0.17 + elif action_name == "move_backward": + print(f"🚶 后退{seconds}秒...") + self.rl_walk.last_commands[0] = -0.17 + elif action_name == "turn_left": + print(f"🔄 左转{seconds}秒...") + self.rl_walk.last_commands[2] = 1.1 + elif action_name == "turn_right": + print(f"🔄 右转{seconds}秒...") + self.rl_walk.last_commands[2] = -1.1 + + time.sleep(seconds) + self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] + print("✅ 动作完成") + except Exception as e: + print(f"❌ 动作执行失败:{str(e)}") + self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] + +class CameraModule: + """摄像头模块封装""" + def __init__(self): + try: + self.camera = Picamera2() + cam_config = self.camera.create_still_configuration(main={"size": (320, 240)}) + self.camera.configure(cam_config) + self.camera.start() + print("✅ 摄像头模块初始化成功") + except Exception as e: + print(f"❌ 摄像头初始化失败:{str(e)}") + sys.exit(1) + + def capture_base64(self): + """拍摄并返回base64编码图像""" + try: + img_array = self.camera.capture_array() + img_byte = io.BytesIO() + Image.fromarray(img_array).save(img_byte, format="JPEG", quality=80) + return base64.b64encode(img_byte.getvalue()).decode("utf-8") + except Exception as e: + print(f"❌ 拍摄失败:{str(e)}") + return None + +class ObjectTracker: + """物品追踪核心类""" + def __init__(self, target_name="万用表", tts=None, motion=None, camera=None): + self.target_name = target_name + self.tracking_active = False + self.target_lost_count = 0 + self.max_lost_count = 5 # 连续丢失次数阈值 + self.tts = tts or SimpleTTS() + self.motion = motion or MotionController() + self.camera = camera or CameraModule() + self.client = OpenAI( + base_url="https://ark.cn-beijing.volces.com/api/v3", + api_key=ARK_API_KEY + ) + print(f"✅ 追踪器初始化完成,目标:{self.target_name}") + + def get_object_position(self, image_base64): + """获取目标在图像中的位置信息""" + try: + response = self.client.chat.completions.create( + model=ARK_MODEL_ID, + messages=[{ + "role": "user", + "content": [ + {"type": "image_url", "image_url": f"data:image/jpeg;base64,{image_base64}"}, + {"type": "text", "text": (f"请识别图像中的'{self.target_name}',返回其位置信息。" + "格式要求:中心X坐标(0-100),中心Y坐标(0-100),宽度占比(0-100),高度占比(0-100)。" + "如果未找到,返回'未找到'")} + ] + }] + ) + + content = response.choices[0].message.content + if "未找到" in content: + return None + + # 提取数字信息 + nums = re.findall(r"\d+\.?\d*", content) + if len(nums) >= 4: + return { + "center_x": float(nums[0]), + "center_y": float(nums[1]), + "width": float(nums[2]), + "height": float(nums[3]) + } + return None + + except Exception as e: + print(f"❌ 目标识别出错: {str(e)}") + return None + + def track_object(self): + """追踪主循环""" + self.tts.speak(f"开始追踪{self.target_name}") + self.tracking_active = True + self.target_lost_count = 0 + + try: + while self.tracking_active: + # 1. 采集图像 + image_base64 = self.camera.capture_base64() + if not image_base64: + time.sleep(1) + continue + + # 2. 识别目标位置 + pos = self.get_object_position(image_base64) + if not pos: + self.target_lost_count += 1 + print(f"⚠️ 未找到{self.target_name},已连续丢失{self.target_lost_count}次") + + if self.target_lost_count >= self.max_lost_count: + self.tts.speak(f"已丢失{self.target_name},停止追踪") + self.stop_tracking() + time.sleep(1) + continue + + # 重置丢失计数 + self.target_lost_count = 0 + print(f"🎯 发现{self.target_name} - 中心X: {pos['center_x']}, 宽度占比: {pos['width']}") + + # 3. 根据位置控制移动 + self.control_movement(pos) + time.sleep(1.5) # 控制追踪频率 + + except Exception as e: + print(f"❌ 追踪过程出错: {str(e)}") + self.stop_tracking() + + def control_movement(self, pos): + """根据目标位置控制移动""" + # 横向位置控制 (左右转向) + if pos["center_x"] < 35: # 目标偏左 + self.tts.speak("目标在左边,向左转") + self.motion.execute_motion("turn_left", 0.8) + elif pos["center_x"] > 65: # 目标偏右 + self.tts.speak("目标在右边,向右转") + self.motion.execute_motion("turn_right", 0.8) + + # 距离控制 (前进后退) + if pos["width"] < 20: # 目标过小,距离过远 + self.tts.speak("距离目标较远,前进") + self.motion.execute_motion("move_forward", 1.5) + elif pos["width"] > 40: # 目标过大,距离过近 + self.tts.speak("距离目标过近,后退") + self.motion.execute_motion("move_backward", 1) + else: + self.tts.speak("已对准目标,保持位置") + + def start_tracking(self): + """启动追踪线程""" + if not self.tracking_active: + threading.Thread(target=self.track_object, daemon=True).start() + + def stop_tracking(self): + """停止追踪""" + self.tracking_active = False + print(f"🛑 停止追踪{self.target_name}") + + +def main(): + # 初始化组件 + tts = SimpleTTS() + motion = MotionController() + camera = CameraModule() + tracker = ObjectTracker( + target_name="万用表", # 可修改为其他目标,如"水杯"、"书本" + tts=tts, + motion=motion, + camera=camera + ) + + # 信号处理(优雅退出) + def handle_interrupt(signum, frame): + print("\n🛑 收到退出信号,正在停止...") + tracker.stop_tracking() + motion.rl_walk.last_commands = [0.0, 0.0, 0.0] # 停止运动 + camera.camera.stop() # 关闭摄像头 + print("✅ 所有资源已释放,程序退出") + sys.exit(0) + + signal.signal(signal.SIGINT, handle_interrupt) + + # 交互提示 + print("\n===== 物品追踪测试程序 =====") + print("操作说明:") + print(" s - 开始追踪目标") + print(" t - 停止追踪") + print(" q - 退出程序") + print("===========================") + + # 主交互循环 + while True: + cmd = input("请输入指令: ").strip().lower() + if cmd == 's': + if not tracker.tracking_active: + print("▶️ 开始追踪...") + tracker.start_tracking() + else: + print("⚠️ 已经在追踪中") + elif cmd == 't': + if tracker.tracking_active: + tracker.stop_tracking() + print("⏹️ 已停止追踪") + else: + print("⚠️ 没有正在进行的追踪") + elif cmd == 'q': + handle_interrupt(None, None) + else: + print("❓ 未知指令,请输入 s/t/q") + + +if __name__ == "__main__": + + main() \ No newline at end of file