first commit

This commit is contained in:
“jiawei” 2025-09-29 15:55:31 +08:00
parent 9ec3f5efea
commit 6ec04089f4

269
track_object.py Normal file
View File

@ -0,0 +1,269 @@
import time
import threading
import re
import numpy as np
import sys
import os
import signal
from openai import OpenAI
from picamera2 import Picamera2
import io
from PIL import Image
import base64
# 配置项目路径(根据你的实际路径修改)
PROJECT_ROOT = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini_Runtime-2"
ONNX_MODEL_PATH = "/home/duckpi/open_duck_mini_ws/OPEN_DUCK_MINI/Open_Duck_Mini-2/BEST_WALK_ONNX_2.onnx"
sys.path.append(PROJECT_ROOT)
# 导入运动控制模块
from v2_rl_walk_mujoco import RLWalk
# API配置替换为你的实际密钥
ARK_API_KEY = "390d517c-129a-41c1-bf3d-458048007b69"
ARK_MODEL_ID = "doubao-seed-1-6-250615"
class SimpleTTS:
"""简化的TTS模块仅用于测试反馈"""
def speak(self, text):
print(f"[语音反馈] {text}")
class MotionController:
"""运动控制封装"""
def __init__(self):
try:
self.rl_walk = RLWalk(
onnx_model_path=ONNX_MODEL_PATH,
cutoff_frequency=40,
pid=[30, 0, 0]
)
self.walk_thread = threading.Thread(target=self.rl_walk.run, daemon=True)
self.walk_thread.start()
time.sleep(1)
print("✅ 运动控制模块初始化成功")
except Exception as e:
print(f"❌ 运动控制初始化失败:{str(e)}")
sys.exit(1)
def execute_motion(self, action_name: str, seconds: float):
"""执行指定动作"""
try:
if action_name == "move_forward":
print(f"🚶 前进{seconds}秒...")
self.rl_walk.last_commands[0] = 0.17
elif action_name == "move_backward":
print(f"🚶 后退{seconds}秒...")
self.rl_walk.last_commands[0] = -0.17
elif action_name == "turn_left":
print(f"🔄 左转{seconds}秒...")
self.rl_walk.last_commands[2] = 1.1
elif action_name == "turn_right":
print(f"🔄 右转{seconds}秒...")
self.rl_walk.last_commands[2] = -1.1
time.sleep(seconds)
self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print("✅ 动作完成")
except Exception as e:
print(f"❌ 动作执行失败:{str(e)}")
self.rl_walk.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
class CameraModule:
"""摄像头模块封装"""
def __init__(self):
try:
self.camera = Picamera2()
cam_config = self.camera.create_still_configuration(main={"size": (320, 240)})
self.camera.configure(cam_config)
self.camera.start()
print("✅ 摄像头模块初始化成功")
except Exception as e:
print(f"❌ 摄像头初始化失败:{str(e)}")
sys.exit(1)
def capture_base64(self):
"""拍摄并返回base64编码图像"""
try:
img_array = self.camera.capture_array()
img_byte = io.BytesIO()
Image.fromarray(img_array).save(img_byte, format="JPEG", quality=80)
return base64.b64encode(img_byte.getvalue()).decode("utf-8")
except Exception as e:
print(f"❌ 拍摄失败:{str(e)}")
return None
class ObjectTracker:
"""物品追踪核心类"""
def __init__(self, target_name="万用表", tts=None, motion=None, camera=None):
self.target_name = target_name
self.tracking_active = False
self.target_lost_count = 0
self.max_lost_count = 5 # 连续丢失次数阈值
self.tts = tts or SimpleTTS()
self.motion = motion or MotionController()
self.camera = camera or CameraModule()
self.client = OpenAI(
base_url="https://ark.cn-beijing.volces.com/api/v3",
api_key=ARK_API_KEY
)
print(f"✅ 追踪器初始化完成,目标:{self.target_name}")
def get_object_position(self, image_base64):
"""获取目标在图像中的位置信息"""
try:
response = self.client.chat.completions.create(
model=ARK_MODEL_ID,
messages=[{
"role": "user",
"content": [
{"type": "image_url", "image_url": f"data:image/jpeg;base64,{image_base64}"},
{"type": "text", "text": (f"请识别图像中的'{self.target_name}',返回其位置信息。"
"格式要求中心X坐标(0-100)中心Y坐标(0-100),宽度占比(0-100),高度占比(0-100)。"
"如果未找到,返回'未找到'")}
]
}]
)
content = response.choices[0].message.content
if "未找到" in content:
return None
# 提取数字信息
nums = re.findall(r"\d+\.?\d*", content)
if len(nums) >= 4:
return {
"center_x": float(nums[0]),
"center_y": float(nums[1]),
"width": float(nums[2]),
"height": float(nums[3])
}
return None
except Exception as e:
print(f"❌ 目标识别出错: {str(e)}")
return None
def track_object(self):
"""追踪主循环"""
self.tts.speak(f"开始追踪{self.target_name}")
self.tracking_active = True
self.target_lost_count = 0
try:
while self.tracking_active:
# 1. 采集图像
image_base64 = self.camera.capture_base64()
if not image_base64:
time.sleep(1)
continue
# 2. 识别目标位置
pos = self.get_object_position(image_base64)
if not pos:
self.target_lost_count += 1
print(f"⚠️ 未找到{self.target_name},已连续丢失{self.target_lost_count}")
if self.target_lost_count >= self.max_lost_count:
self.tts.speak(f"已丢失{self.target_name},停止追踪")
self.stop_tracking()
time.sleep(1)
continue
# 重置丢失计数
self.target_lost_count = 0
print(f"🎯 发现{self.target_name} - 中心X: {pos['center_x']}, 宽度占比: {pos['width']}")
# 3. 根据位置控制移动
self.control_movement(pos)
time.sleep(1.5) # 控制追踪频率
except Exception as e:
print(f"❌ 追踪过程出错: {str(e)}")
self.stop_tracking()
def control_movement(self, pos):
"""根据目标位置控制移动"""
# 横向位置控制 (左右转向)
if pos["center_x"] < 35: # 目标偏左
self.tts.speak("目标在左边,向左转")
self.motion.execute_motion("turn_left", 0.8)
elif pos["center_x"] > 65: # 目标偏右
self.tts.speak("目标在右边,向右转")
self.motion.execute_motion("turn_right", 0.8)
# 距离控制 (前进后退)
if pos["width"] < 20: # 目标过小,距离过远
self.tts.speak("距离目标较远,前进")
self.motion.execute_motion("move_forward", 1.5)
elif pos["width"] > 40: # 目标过大,距离过近
self.tts.speak("距离目标过近,后退")
self.motion.execute_motion("move_backward", 1)
else:
self.tts.speak("已对准目标,保持位置")
def start_tracking(self):
"""启动追踪线程"""
if not self.tracking_active:
threading.Thread(target=self.track_object, daemon=True).start()
def stop_tracking(self):
"""停止追踪"""
self.tracking_active = False
print(f"🛑 停止追踪{self.target_name}")
def main():
# 初始化组件
tts = SimpleTTS()
motion = MotionController()
camera = CameraModule()
tracker = ObjectTracker(
target_name="万用表", # 可修改为其他目标,如"水杯"、"书本"
tts=tts,
motion=motion,
camera=camera
)
# 信号处理(优雅退出)
def handle_interrupt(signum, frame):
print("\n🛑 收到退出信号,正在停止...")
tracker.stop_tracking()
motion.rl_walk.last_commands = [0.0, 0.0, 0.0] # 停止运动
camera.camera.stop() # 关闭摄像头
print("✅ 所有资源已释放,程序退出")
sys.exit(0)
signal.signal(signal.SIGINT, handle_interrupt)
# 交互提示
print("\n===== 物品追踪测试程序 =====")
print("操作说明:")
print(" s - 开始追踪目标")
print(" t - 停止追踪")
print(" q - 退出程序")
print("===========================")
# 主交互循环
while True:
cmd = input("请输入指令: ").strip().lower()
if cmd == 's':
if not tracker.tracking_active:
print("▶️ 开始追踪...")
tracker.start_tracking()
else:
print("⚠️ 已经在追踪中")
elif cmd == 't':
if tracker.tracking_active:
tracker.stop_tracking()
print("⏹️ 已停止追踪")
else:
print("⚠️ 没有正在进行的追踪")
elif cmd == 'q':
handle_interrupt(None, None)
else:
print("❓ 未知指令,请输入 s/t/q")
if __name__ == "__main__":
main()