前言
因为我司「七月在线」关于dexcap的复现/优化接近尾声了,故准备把dexcap的源码也分析下。下周则分析下iDP3的源码——为队伍「iDP3人形的复现/优化」助力
最开始,dexcap的源码分析属于此文《DexCap——斯坦福李飞飞团队泡茶机器人:带灵巧手和动作捕捉的数据收集系统(含硬件清单)》的第三部分
然原理讲解、硬件配置、源码解析都放在一篇文章里的话,篇幅会显得特别长,故把源码的部分抽取出来,独立成此文
第一部分 STEP1_collect_data
1.1 calculate_offset_vis_calib.py
这段代码是一个用于校准和保存偏移量、方向偏移量的脚本,用于从指定目录中读取数据,计算偏移量和方向偏移量,并将结果保存到指定目录中,以下是代码的部分解释
- 导入库
import argparse # 用于解析命令行参数 import numpy as np # 用于数值计算 import os # 用于操作系统相关功能 import sys # 用于系统相关功能 from scipy.spatial.transform import Rotation as R # 用于旋转矩阵和欧拉角转换 from transforms3d.euler import euler2mat # 用于将欧拉角转换为旋转矩阵
- 主函数
if __name__ == "__main__": parser = argparse.ArgumentParser(description="calibrate and save in dataset folder") # 创建命令行参数解析器 parser.add_argument("--directory", type=str, default="", help="Directory with saved data") # 添加目录参数 parser.add_argument("--default", type=str, default="default_offset", help="Directory with saved data") # 添加默认目录参数 args = parser.parse_args() # 解析命令行参数 if os.path.exists("{}/calib_offset.txt".format(args.directory)): # 检查目标目录中是否已经存在calib_offset.txt文件 response = ( input( f"calib_offset.txt already exists. Do you want to override? (y/n): " # 提示用户是否覆盖文件 ) .strip() .lower() ) if response != "y": # 如果用户选择不覆盖,退出程序 print("Exiting program without overriding the existing directory.") sys.exit()
- 读取默认偏移量和方向偏移量
default_offset = np.loadtxt(os.path.join(args.default, "calib_offset.txt")) # 读取默认偏移量 default_ori = np.loadtxt(os.path.join(args.default, "calib_ori_offset.txt")) # 读取默认方向偏移量 default_offset_left = np.loadtxt(os.path.join(args.default, "calib_offset_left.txt")) # 读取左手默认偏移量 default_ori_left = np.loadtxt(os.path.join(args.default, "calib_ori_offset_left.txt")) # 读取左手默认方向偏移量 default_ori_matrix = euler2mat(*default_ori) # 将默认方向偏移量转换为旋转矩阵 default_ori_matrix_left = euler2mat(*default_ori_left) # 将左手默认方向偏移量转换为旋转矩阵
- 提取偏移量和方向偏移量
frame_dirs = os.listdir("./test_data") # 列出test_data目录中的所有文件 list = [] # 存储偏移量的列表 list_ori = [] # 存储方向偏移量的列表 list_left = [] # 存储左手偏移量的列表 list_ori_left = [] # 存储左手方向偏移量的列表 for frame_dir in frame_dirs: #
1.2 data_recording.py——配置多个Realsense T265相机和一个L515相机
这段代码是一个用于记录和处理Realsense相机数据的Python脚本
顺带再回顾一下
DexCap团队设计了一个装载摄像机的背包「如图(a)、(b)所示,为方便大家对照,特把上图再贴一下,如下」
- 在正前面,它通过胸部摄像机支架的4个插槽集成了4个相机,最顶部是一台Intel Realsense L515 RGB-D LiDAR摄像机,顶部下面是3个Realsense T265鱼眼SLAM跟踪相机(分别为红、绿、蓝),用于在人类数据收集过程中捕捉观察结果
1.2.1 一系列定义:比如保存帧数据的函数
它可以捕捉颜色图像、深度图像、姿态数据以及手部关节数据,并将这些数据保存到指定的目录中
- 首先,导入库
""" 示例用法 python data_recording.py -s --store_hand -o ./save_data_scenario_1 """ import argparse # 用于解析命令行参数 import copy # 用于复制对象 import numpy as np # 用于数值计算 import open3d as o3d # 用于3D数据处理 import os # 用于操作系统相关功能 import shutil # 用于文件操作 import sys # 用于系统相关功能 import pyrealsense2 as rs # 用于Realsense相机操作 import cv2 # 用于图像处理 from enum import IntEnum # 用于定义枚举类型 from realsense_helper import get_profiles # 导入自定义的Realsense帮助函数 from transforms3d.quaternions import axangle2quat, qmult, quat2mat, mat2quat # 用于四元数操作 import redis # 用于Redis数据库操作 import concurrent.futures # 用于并发执行 from hyperparameters import* # 导入超参数
- 其次,定义一个枚举类型 Preset,用于表示不同的预设配置
class Preset(IntEnum): Custom = 0 Default = 1 Hand = 2 HighAccuracy = 3 HighDensity = 4 MediumDensity = 5
- 然后,保存帧数据的函数
def save_frame( frame_id, out_directory, color_buffer, depth_buffer, pose_buffer, pose2_buffer, pose3_buffer, rightHandJoint_buffer, leftHandJoint_buffer, rightHandJointOri_buffer, leftHandJointOri_buffer, save_hand, ): frame_directory = os.path.join(out_directory, f"frame_{frame_id}") # 创建帧目录 os.makedirs(frame_directory, exist_ok=True) # 如果目录不存在则创建 cv2.imwrite( os.path.join(frame_directory, "color_image.jpg"), color_buffer[frame_id][:, :, ::-1], # 保存颜色图像 ) cv2.imwrite( os.path.join(frame_directory, "depth_image.png"), depth_buffer[frame_id] # 保存深度图像 ) np.savetxt(os.path.join(frame_directory, "pose.txt"), pose_buffer[frame_id]) # 保存姿态数据 np.savetxt(os.path.join(frame_directory, "pose_2.txt"), pose2_buffer[frame_id]) # 保存第二个姿态数据 np.savetxt(os.path.join(frame_directory, "pose_3.txt"), pose3_buffer[frame_id]) # 保存第三个姿态数据 if save_hand: # 如果需要保存手部数据 np.savetxt( os.path.join(frame_directory, "right_hand_joint.txt"), rightHandJoint_buffer[frame_id], # 保存右手关节数据 ) np.savetxt( os.path.join(frame_directory, "left_hand_joint.txt"), leftHandJoint_buffer[frame_id], # 保存左手关节数据 ) np.savetxt( os.path.join(frame_directory, "right_hand_joint_ori.txt"), rightHandJointOri_buffer[frame_id], # 保存右手关节方向数据 ) np.savetxt( os.path.join(frame_directory, "left_hand_joint_ori.txt"), leftHandJointOri_buffer[frame_id], # 保存左手关节方向数据 ) return f"frame {frame_id + 1} saved" # 返回保存帧的消息
1.2.2 RealsenseProcessor 的类:获取多个相机的RGB-D帧和位姿数据
接下来,定义了一个名为 RealsenseProcessor 的类,用于处理Realsense相机的数据。它可以配置多个Realsense T265相机和一个L515相机,获取RGB-D帧和位姿数据,并可视化和存储这些数据
以下是对RealsenseProcessor类的详细解读:
- 类定义和初始化
初始化方法接受多个参数,用于配置T265相机的序列号、总帧数、是否存储帧、输出目录、是否保存手部数据和是否启用可视化
初始化各种缓冲区,用于存储彩色图像、 深度图像、位姿数据,和手部关节数据(包含左右两手的关节位置、关节方向)class RealsesneProcessor: # 定义Realsense处理器类 def __init__( # 初始化方法 self, first_t265_serial, # 第一个T265相机的序列号 second_t265_serial, # 第二个T265相机的序列号 thrid_t265_serial, # 第三个T265相机的序列号 total_frame, # 总帧数 store_frame=False, # 是否存储帧,默认为False out_directory=None, # 输出目录,默认为None save_hand=False, # 是否保存手部数据,默认为False enable_visualization=True, # 是否启用可视化,默认为True ): self.first_t265_serial = first_t265_serial # 初始化第一个T265相机的序列号 self.second_t265_serial = second_t265_serial # 初始化第二个T265相机的序列号 self.thrid_t265_serial = thrid_t265_serial # 初始化第三个T265相机的序列号 self.store_frame = store_frame # 初始化是否存储帧 self.out_directory = out_directory # 初始化输出目录 self.total_frame = total_frame # 初始化总帧数 self.save_hand = save_hand # 初始化是否保存手部数据 self.enable_visualization = enable_visualization # 初始化是否启用可视化 self.rds = None # 初始化Redis连接
self.color_buffer = [] # 初始化彩色图像缓冲区 self.depth_buffer = [] # 初始化深度图像缓冲区 self.pose_buffer = [] # 初始化第一个T265相机的位姿缓冲区 self.pose2_buffer = [] # 初始化第二个T265相机的位姿缓冲区 self.pose3_buffer = [] # 初始化第三个T265相机的位姿缓冲区 self.pose2_image_buffer = [] # 初始化第二个T265相机的图像缓冲区 self.pose3_image_buffer = [] # 初始化第三个T265相机的图像缓冲区 self.rightHandJoint_buffer = [] # 初始化右手关节位置缓冲区 self.leftHandJoint_buffer = [] # 初始化左手关节位置缓冲区 self.rightHandJointOri_buffer = [] # 初始化右手关节方向缓冲区 self.leftHandJointOri_buffer = [] # 初始化左手关节方向缓冲区
- 获取T265相机配置
具体方法是根据T265相机的序列号和管道配置,返回一个配置对象def get_rs_t265_config(self, t265_serial, t265_pipeline): t265_config = rs.config() t265_config.enable_device(t265_serial) t265_config.enable_stream(rs.stream.pose) return t265_config
- 配置流
该方法配置并启动多个Realsense相机的流,包括一个L515相机和三个T265相机
如果启用了手部数据保存功能,则连接到Redis服务器
配置并启动L515相机的深度和彩色流def configure_stream(self): # 配置流的方法 # 连接到Redis服务器 if self.save_hand: # 如果启用了手部数据保存功能 self.rds = redis.Redis(host="localhost", port=6669, db=0) # 连接到本地Redis服务器
配置并启动三个T265相机的位姿流,具体而言# 创建一个管道 self.pipeline = rs.pipeline() # 创建一个Realsense管道 config = rs.config() # 创建一个配置对象 color_profiles, depth_profiles = get_profiles() # 获取彩色和深度流的配置文件 w, h, fps, fmt = depth_profiles[1] # 获取深度流的宽度、高度、帧率和格式 config.enable_stream(rs.stream.depth, w, h, fmt, fps) # 启用深度流 w, h, fps, fmt = color_profiles[18] # 获取彩色流的宽度、高度、帧率和格式 config.enable_stream(rs.stream.color, w, h, fmt, fps) # 启用彩色流
先配置
再启动# 配置第一个T265相机的流 ctx = rs.context() # 创建一个Realsense上下文 self.t265_pipeline = rs.pipeline(ctx) # 创建一个T265管道 t265_config = rs.config() # 创建一个T265配置对象 t265_config.enable_device(self.first_t265_serial) # 启用第一个T265相机 # 配置第二个T265相机的流 ctx_2 = rs.context() # 创建另一个Realsense上下文 self.t265_pipeline_2 = rs.pipeline(ctx_2) # 创建第二个T265管道 t265_config_2 = self.get_rs_t265_config( self.second_t265_serial, self.t265_pipeline_2 ) # 获取第二个T265相机的配置 # 配置第三个T265相机的流 ctx_3 = rs.context() # 创建第三个Realsense上下文 self.t265_pipeline_3 = rs.pipeline(ctx_3) # 创建第三个T265管道 t265_config_3 = self.get_rs_t265_config( self.thrid_t265_serial, self.t265_pipeline_3 ) # 获取第三个T265相机的配置
如果启用了可视化功能,则初始化Open3D可视化器self.t265_pipeline.start(t265_config) # 启动第一个T265管道 self.t265_pipeline_2.start(t265_config_2) # 启动第二个T265管道 self.t265_pipeline_3.start(t265_config_3) # 启动第三个T265管道 pipeline_profile = self.pipeline.start(config) # 启动L515管道并获取配置文件 depth_sensor = pipeline_profile.get_device().first_depth_sensor() # 获取深度传感器 depth_sensor.set_option(rs.option.visual_preset, Preset.HighAccuracy) # 设置深度传感器的选项为高精度 self.depth_scale = depth_sensor.get_depth_scale() # 获取深度比例 align_to = rs.stream.color # 对齐到彩色流 self.align = rs.align(align_to) # 创建对齐对象
self.vis = None # 初始化可视化器 if self.enable_visualization: # 如果启用了可视化功能 self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器 self.vis.create_window() # 创建可视化窗口 self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野
- 获取RGB-D帧——get_rgbd_frame_from_realsense
该方法从Realsense相机获取RGB-D顾,并将深度帧与彩色帧对齐
如果启用了可视化功能,则将深度图像和彩色图像转换为Open3D的RGBD图像对象def get_rgbd_frame_from_realsense(self, enable_visualization=False): # 从Realsense获取RGB-D帧的方法 frames = self.pipeline.wait_for_frames() # 等待获取帧数据 # 将深度帧对齐到彩色帧 aligned_frames = self.align.process(frames) # 对齐帧数据 # 获取对齐后的帧 aligned_depth_frame = aligned_frames.get_depth_frame() # 获取对齐后的深度帧 color_frame = aligned_frames.get_color_frame() # 获取对齐后的彩色帧 depth_image = ( np.asanyarray(aligned_depth_frame.get_data()) // 4 ) # 将深度帧数据转换为numpy数组,并除以4以获得以米为单位的深度值(适用于L515相机) color_image = np.asanyarray(color_frame.get_data()) # 将彩色帧数据转换为numpy数组
rgbd = None # 初始化RGBD图像对象 if enable_visualization: # 如果启用了可视化功能 depth_image_o3d = o3d.geometry.Image(depth_image) # 将深度图像转换为Open3D图像对象 color_image_o3d = o3d.geometry.Image(color_image) # 将彩色图像转换为Open3D图像对象 rgbd = o3d.geometry.RGBDImage.create_from_color_and_depth( color_image_o3d, # 彩色图像 depth_image_o3d, # 深度图像 depth_trunc=4.0, # 深度截断值,超过此值的深度将被忽略 convert_rgb_to_intensity=False, # 是否将RGB图像转换为强度图像 ) # 创建RGBD图像对象 return rgbd, depth_image, color_image # 返回RGBD图像、深度图像和彩色图像
- 通过frame_to_pose_conversion方法,实现帧到位姿的转换
@staticmethod # 静态方法装饰器 def frame_to_pose_conversion(input_t265_frames): # 定义帧到位姿转换的方法 pose_frame = input_t265_frames.get_pose_frame() # 获取位姿帧 pose_data = pose_frame.get_pose_data() # 获取位姿数据 pose_3x3 = quat2mat( # 将四元数转换为3x3旋转矩阵 np.array( [ pose_data.rotation.w, # 四元数的w分量 pose_data.rotation.x, # 四元数的x分量 pose_data.rotation.y, # 四元数的y分量 pose_data.rotation.z, # 四元数的z分量 ] ) ) pose_4x4 = np.eye(4) # 创建4x4单位矩阵 pose_4x4[:3, :3] = pose_3x3 # 将3x3旋转矩阵赋值给4x4矩阵的左上角 pose_4x4[:3, 3] = [ # 将平移向量赋值给4x4矩阵的右上角 pose_data.translation.x, # 位姿的x平移分量 pose_data.translation.y, # 位姿的y平移分量 pose_data.translation.z, # 位姿的z平移分量 ] return pose_4x4 # 返回4x4位姿矩阵
- 处理帧
该方法处理每一帧数据,首先通过get_rgbd_frame_from_realsense获取三个T265相机的RGB-D帧数据,然后通过frame_to_pose_conversion,把帧数据换成位姿数据
如果启用了手部数据保存功能,则从Redis服务器获取手部关节数据def process_frame(self): # 定义处理帧的方法 frame_count = 0 # 初始化帧计数器 first_frame = True # 标记是否为第一帧 try: while frame_count < self.total_frame: # 循环处理每一帧,直到达到总帧数 t265_frames = self.t265_pipeline.wait_for_frames() # 获取第一个T265相机的帧数据 t265_frames_2 = self.t265_pipeline_2.wait_for_frames() # 获取第二个T265相机的帧数据 t265_frames_3 = self.t265_pipeline_3.wait_for_frames() # 获取第三个T265相机的帧数据 rgbd, depth_frame, color_frame = self.get_rgbd_frame_from_realsense() # 获取RGB-D帧数据 # 获取第一个T265相机的位姿数据 pose_4x4 = RealsesneProcessor.frame_to_pose_conversion( input_t265_frames=t265_frames ) # 获取第二个T265相机的位姿数据 pose_4x4_2 = RealsesneProcessor.frame_to_pose_conversion( input_t265_frames=t265_frames_2 ) # 获取第三个T265相机的位姿数据 pose_4x4_3 = RealsesneProcessor.frame_to_pose_conversion( input_t265_frames=t265_frames_3 )
将位姿数据转换为4x4矩阵,并应用校正矩阵if self.save_hand: # 如果启用了手部数据保存功能 # 获取左手关节位置数据 leftHandJointXyz = np.frombuffer( self.rds.get("rawLeftHandJointXyz"), dtype=np.float64 ).reshape(21, 3) # 获取右手关节位置数据 rightHandJointXyz = np.frombuffer( self.rds.get("rawRightHandJointXyz"), dtype=np.float64 ).reshape(21, 3) # 获取左手关节方向数据 leftHandJointOrientation = np.frombuffer( self.rds.get("rawLeftHandJointOrientation"), dtype=np.float64 ).reshape(21, 4) # 获取右手关节方向数据 rightHandJointOrientation = np.frombuffer( self.rds.get("rawRightHandJointOrientation"), dtype=np.float64 ).reshape(21, 4)
且转换为Open3D格式的L515相机内参corrected_pose = pose_4x4 @ between_cam # 应用校正矩阵
如果是第一帧,则初始化Open3D的点云和坐标系,并添加到可视化器中# 转换为Open3D格式的L515相机内参 o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic( 1280, 720, 898.2010498046875, 897.86669921875, 657.4981079101562, 364.30950927734375, )
如果不是第一帧,则更新点云和坐标系的位姿if first_frame: # 如果是第一帧 if self.enable_visualization: # 如果启用了可视化功能 pcd = o3d.geometry.PointCloud.create_from_rgbd_image( rgbd, o3d_depth_intrinsic ) # 创建点云 pcd.transform(corrected_pose) # 应用校正矩阵 rgbd_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame( size=0.3 ) # 创建RGBD坐标系 rgbd_mesh.transform(corrected_pose) # 应用校正矩阵 rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵 chest_mesh = o3d.geometry.TriangleMesh.create_coordinate_frame( size=0.3 ) # 创建胸部坐标系 chest_mesh.transform(pose_4x4) # 应用位姿矩阵 chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵 left_hand_mesh = ( o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3) ) # 创建左手坐标系 left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵 left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵 right_hand_mesh = ( o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.3) ) # 创建右手坐标系 right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵 right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵 self.vis.add_geometry(pcd) # 添加点云到可视化器 self.vis.add_geometry(rgbd_mesh) # 添加RGBD坐标系到可视化器 self.vis.add_geometry(chest_mesh) # 添加胸部坐标系到可视化器 self.vis.add_geometry(left_hand_mesh) # 添加左手坐标系到可视化器 self.vis.add_geometry(right_hand_mesh) # 添加右手坐标系到可视化器 view_params = ( self.vis.get_view_control().convert_to_pinhole_camera_parameters() ) # 获取视图参数 first_frame = False # 标记为非第一帧
再更新可视化器else: if self.enable_visualization: # 如果启用了可视化功能 new_pcd = o3d.geometry.PointCloud.create_from_rgbd_image( rgbd, o3d_depth_intrinsic ) # 创建新的点云 new_pcd.transform(corrected_pose) # 应用校正矩阵 rgbd_mesh.transform(np.linalg.inv(rgbd_previous_pose)) # 逆变换上一个RGBD坐标系 rgbd_mesh.transform(corrected_pose) # 应用校正矩阵 rgbd_previous_pose = copy.deepcopy(corrected_pose) # 复制校正矩阵 chest_mesh.transform(np.linalg.inv(chest_previous_pose)) # 逆变换上一个胸部坐标系 chest_mesh.transform(pose_4x4) # 应用位姿矩阵 chest_previous_pose = copy.deepcopy(pose_4x4) # 复制位姿矩阵 left_hand_mesh.transform(np.linalg.inv(left_hand_previous_pose)) # 逆变换上一个左手坐标系 left_hand_mesh.transform(pose_4x4_2) # 应用位姿矩阵 left_hand_previous_pose = copy.deepcopy(pose_4x4_2) # 复制位姿矩阵 right_hand_mesh.transform( np.linalg.inv(right_hand_previous_pose) ) # 逆变换上一个右手坐标系 right_hand_mesh.transform(pose_4x4_3) # 应用位姿矩阵 right_hand_previous_pose = copy.deepcopy(pose_4x4_3) # 复制位姿矩阵 pcd.points = new_pcd.points # 更新点云的点 pcd.colors = new_pcd.colors # 更新点云的颜色 self.vis.update_geometry(pcd) # 更新点云几何 self.vis.update_geometry(rgbd_mesh) # 更新RGBD坐标系几何 self.vis.update_geometry(chest_mesh) # 更新胸部坐标系几何 self.vis.update_geometry(left_hand_mesh) # 更新左手坐标系几何 self.vis.update_geometry(right_hand_mesh) # 更新右手坐标系几何 self.vis.get_view_control().convert_from_pinhole_camera_parameters( view_params ) # 恢复视图参数
如果启用了帧存储功能,则将深度图像、彩色图像和位姿数据存储到缓冲区中if self.enable_visualization: # 如果启用了可视化功能 self.vis.poll_events() # 处理可视化事件 self.vis.update_renderer() # 更新渲染器
处理完所有帧后,停止所有相机的流,并保存所有帧数据 - 主函数
主函数解析命令行参数,创建 RealsenseProcessor 对象,并配置和处理帧数据
如果输出目录已存在,提示用户是否覆盖目录import concurrent.futures # 导入并发库,用于多线程处理 def main(args): # 定义主函数,接受命令行参数 realsense_processor = RealsesneProcessor( # 创建Realsense处理器对象 first_t265_serial="11622110012", # 第一个T265相机的序列号 second_t265_serial="909212110944", # 第二个T265相机的序列号 thrid_t265_serial="929122111181", # 第三个T265相机的序列号 total_frame=10000, # 总帧数 store_frame=args.store_frame, # 是否存储帧 out_directory=args.out_directory, # 输出目录 save_hand=args.store_hand, # 是否保存手部数据 enable_visualization=args.enable_vis, # 是否启用可视化 ) realsense_processor.configure_stream() # 配置Realsense流 realsense_processor.process_frame() # 处理帧数据 if __name__ == "__main__": # 主程序入口 # 设置命令行参数解析器 parser = argparse.ArgumentParser(description="Process frames and save data.") parser.add_argument( "-s", "--store_frame", action="store_true", help="Flag to indicate whether to store frames", # 是否存储帧的标志 ) parser.add_argument( "--store_hand", action="store_true", help="Flag to indicate whether to store hand joint position and orientation", # 是否保存手部关节位置和方向的标志 ) parser.add_argument( "-v", "--enable_vis", action="store_true", help="Flag to indicate whether to enable open3d visualization", # 是否启用Open3D可视化的标志 ) parser.add_argument( "-o", "--out_directory", type=str, help="Output directory for saved data", # 保存数据的输出目录 default="./saved_data", # 默认输出目录为./saved_data ) args = parser.parse_args() # 解析命令行参数
如果启用了帧存储功能,则创建输出目录# 检查输出目录是否存在 if os.path.exists(args.out_directory): response = ( input( f"{args.out_directory} already exists. Do you want to override? (y/n): " ) .strip() .lower() ) if response != "y": # 如果用户选择不覆盖,退出程序 print("Exiting program without overriding the existing directory.") sys.exit() else: shutil.rmtree(args.out_directory) # 如果用户选择覆盖,删除现有目录
调用 main 函数开始处理帧数据if args.store_frame: os.makedirs(args.out_directory, exist_ok=True) # 创建输出目录
# 如果用户选择覆盖,删除现有目录 main(args) # 调用主函数
1.3 demo_clipping_3d.py——可视化点云数据且选择每个演示的起始帧/结束帧
这段代码是一个用于可视化点云数据(PCD文件),并选择每个演示的起始帧、结束帧的脚本,以下是详细解读
- 导入库
导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等import argparse # 用于解析命令行参数 import os # 用于操作系统相关功能 import copy # 用于复制对象 import zmq # 用于消息传递 import cv2 # 用于图像处理 import sys # 用于系统相关功能 import json # 用于处理JSON数据 import shutil # 用于文件操作 import open3d as o3d # 用于3D数据处理 import numpy as np # 用于数值计算 import platform # 用于获取操作系统信息 from pynput import keyboard # 用于监听键盘事件 from transforms3d.quaternions import qmult, quat2mat # 用于四元数操作 from transforms3d.axangles import axangle2mat # 用于轴角转换 from scipy.spatial.transform import Rotation # 用于旋转矩阵和欧拉角转换 from transforms3d.euler import quat2euler, mat2euler, quat2mat, euler2mat # 用于欧拉角和矩阵转换 from visualizer import * # 导入自定义的可视化模块
- 定义全局变量,用于存储剪辑标记、当前剪辑、是否切换到下一帧或上一帧的标志
clip_marks = [] # 存储剪辑标记 current_clip = {} # 存储当前剪辑 next_frame = False # 标记是否切换到下一帧 previous_frame = False # 标记是否切换到上一帧
- 定义键盘事件处理函数,用于处理不同的键盘输入:
Key.up:保存剪辑标记到 clip_marks. json 文件
Key.down:标记切换到上一帧
Key.page_down :标记切换到下一帧
Key.end:标记当前剪辑的结束帧
Key.insert:标记当前剪辑的起始帧def on_press(key): # 定义键盘按下事件处理函数 global next_frame, previous_frame, delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left, adjust_movement, adjust_right, frame, step, dataset_folder, clip_marks, current_clip # 确定操作系统类型 os_type = platform.system() assert os_type == "Windows" # 仅支持Windows系统 frame_folder = 'frame_{}'.format(frame) # 当前帧文件夹名称 # Windows特定的键绑定在AttributeError部分处理 if key == keyboard.Key.up: # y正方向 with open(os.path.join(dataset_folder, 'clip_marks.json'), 'w') as f: json.dump(clip_marks, f, indent=4) # 保存剪辑标记到JSON文件 elif key == keyboard.Key.down: # y负方向 previous_frame = True # 切换到上一帧 elif key == keyboard.Key.page_down: next_frame = True # 切换到下一帧 elif key == keyboard.Key.end: if 'start' in current_clip.keys(): print("end", frame_folder) current_clip['end'] = frame_folder # 标记当前剪辑的结束帧 clip_marks.append(current_clip) # 添加当前剪辑到剪辑标记列表 current_clip = {} # 重置当前剪辑 elif key == keyboard.Key.insert: print("start", frame_folder) current_clip['start'] = frame_folder # 标记当前剪辑的起始帧 else: print("Key error", key) # 处理其他键的错误
- 数据可视化类
定义数据可祝化类 ReplayDatavisualizer,继承自DataVisualizer.
replay_frames 方法用于可视化单帧数据,并处理键盘事件以切换帧class ReplayDataVisualizer(DataVisualizer): # 定义数据重放可视化类,继承自DataVisualizer def __init__(self, directory): super().__init__(directory) # 调用父类的初始化方法 def replay_frames(self): # 定义重放帧的方法 """ 可视化单帧数据 """ global delta_movement_accu, delta_ori_accu, next_frame, previous_frame, frame if self.R_delta_init is None: self.initialize_canonical_frame() # 初始化标准帧 self._load_frame_data(frame) # 加载当前帧数据 self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器 self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器 self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器 self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器 for joint in self.left_joints + self.right_joints: self.vis.add_geometry(joint) # 添加关节数据到可视化器 for cylinder in self.left_line_set + self.right_line_set: self.vis.add_geometry(cylinder) # 添加连线数据到可视化器 next_frame = True # 初始化为下一帧 try: with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件 while True: if next_frame == True: next_frame = False frame += 10 # 切换到下一帧 if previous_frame == True: previous_frame = False frame -= 10 # 切换到上一帧 self._load_frame_data(frame) # 加载当前帧数据 self.step += 1 # 增加步数 self.vis.update_geometry(self.pcd) # 更新点云数据 self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1 self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2 self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3 for joint in self.left_joints + self.right_joints: self.vis.update_geometry(joint) # 更新关节数据 for cylinder in self.left_line_set + self.right_line_set: self.vis.update_geometry(cylinder) # 更新连线数据 self.vis.poll_events() # 处理可视化事件 self.vis.update_renderer() # 更新渲染器 listener.join() # 等待监听器结束 finally: print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
- 主函数
主函数解析命令行参数,获取数据目录
检查数据目录是否存在,如果存在 clip_marks. json 文件,询问用户是否覆盖if __name__ == "__main__": # 主程序入口 parser = argparse.ArgumentParser(description="Visualize saved frame data.") # 创建命令行参数解析器 parser.add_argument("--directory", type=str, default="./saved_data", help="Directory with saved data") # 添加目录参数 args = parser.parse_args() # 解析命令行参数
初始化 ReplayDatavisualizer 对象,并加载校准偏移量和方向偏移量assert os.path.exists(args.directory), f"given directory: {args.directory} not exists" # 确认目录存在 if os.path.exists(os.path.join(args.directory, 'clip_marks.json')): # 检查剪辑标记文件是否存在 response = ( input( f"clip_marks.json already exists. Do you want to override? (y/n): " ) .strip() .lower() ) if response != "y": print("Exiting program without overriding the existing directory.") # 如果用户选择不覆盖,退出程序 sys.exit()
调用replay_frames 方法开始可视化和处理键盛事件dataset_folder = args.directory # 设置数据集文件夹 visualizer = ReplayDataVisualizer(args.directory) # 创建数据重放可视化器对象 visualizer.right_hand_offset = np.loadtxt("{}/calib_offset.txt".format(args.directory)) # 加载右手偏移量 visualizer.right_hand_ori_offset = np.loadtxt("{}/calib_ori_offset.txt".format(args.directory)) # 加载右手方向偏移量 visualizer.left_hand_offset = np.loadtxt("{}/calib_offset_left.txt".format(args.directory)) # 加载左手偏移量 visualizer.left_hand_ori_offset = np.loadtxt("{}/calib_ori_offset_left.txt".format(args.directory)) # 加载左手方向偏移量
visualizer.replay_frames() # 开始重放帧
1.4 redis_glove_server.py——接收UDP数据包并将手部关节数据存储到Redis数据库
这段代码是一个用于接收UDP数据包并将手部关节数据存储到Redis数据库的Python脚本
所谓UDP (User Datagram Protocol,用户数据报协议),其是一种简单的、面向无连接的传输层协议
与TCP (Transmission Control Protocol, 传输控制协议)不同,UDP不提供可靠性、数据包顺序和流量控制等功能。UDP主要用于 需要快速传输且对数据丢失不敏感的应用场景,例如视频流、在线游戏和实时通信等
以下是代码的详细解读:
- 导入库
import socket # 用于网络通信 import json # 用于处理JSON数据 import redis # 用于连接Redis数据库 import numpy as np # 用于数值计算
- 初始化Redis连接
# 初始化Redis连接 redis_host = "localhost" # Redis服务器主机名 redis_port = 6669 # Redis服务器端口 redis_password = "" # Redis服务器密码,如果没有密码则保持为空字符串 r = redis.StrictRedis( host=redis_host, port=redis_port, password=redis_password, decode_responses=True ) # 创建Redis连接对象
- 定义手部关节名称
# 定义左手和右手的关节名称 left_hand_joint_names = ["leftHand", 'leftThumbProximal', 'leftThumbMedial', 'leftThumbDistal', 'leftThumbTip', 'leftIndexProximal', 'leftIndexMedial', 'leftIndexDistal', 'leftIndexTip', 'leftMiddleProximal', 'leftMiddleMedial', 'leftMiddleDistal', 'leftMiddleTip', 'leftRingProximal', 'leftRingMedial', 'leftRingDistal', 'leftRingTip', 'leftLittleProximal', 'leftLittleMedial', 'leftLittleDistal', 'leftLittleTip'] right_hand_joint_names = ["rightHand", 'rightThumbProximal', 'rightThumbMedial', 'rightThumbDistal', 'rightThumbTip', 'rightIndexProximal', 'rightIndexMedial', 'rightIndexDistal', 'rightIndexTip', 'rightMiddleProximal', 'rightMiddleMedial', 'rightMiddleDistal', 'rightMiddleTip', 'rightRingProximal', 'rightRingMedial', 'rightRingDistal', 'rightRingTip', 'rightLittleProximal', 'rightLittleMedial', 'rightLittleDistal', 'rightLittleTip']
- 归一化函数
def normalize_wrt_middle_proximal(hand_positions, is_left=True): # 定义相对于中指近端关节的归一化函数 middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引 if not is_left: middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引 wrist_position = hand_positions[0] # 获取手腕位置 middle_proximal_position = hand_positions[middle_proximal_idx] # 获取中指近端关节位置 bone_length = np.linalg.norm(wrist_position - middle_proximal_position) # 计算手腕到中指近端关节的骨骼长度 normalized_hand_positions = (middle_proximal_position - hand_positions) / bone_length # 归一化手部关节位置 return normalized_hand_positions # 返回归一化后的手部关节位置
- 启动服务器函数
创建并绑定UDP套接字
无限循环接收UDP数据包def start_server(port): # 定义启动服务器的函数,接受端口号作为参数 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 使用SOCK_DGRAM创建UDP套接字 s.bind(("192.168.0.200", port)) # 绑定套接字到指定的IP地址和端口 print(f"Server started, listening on port {port} for UDP packets...") # 打印服务器启动信息
解析接收到的数据包,提取手部关节位置和方向数据while True: # 无限循环,持续接收数据 data, address = s.recvfrom(64800) # 接收UDP数据包,最大数据包大小为64800字节 decoded_data = data.decode() # 解码数据 # 尝试解析JSON数据 try: received_json = json.loads(decoded_data) # 解析JSON数据 # 初始化数组以存储手部关节位置和方向 left_hand_positions = np.zeros((21, 3)) # 初始化左手关节位置数组,21个关节,每个关节3个坐标 right_hand_positions = np.zeros((21, 3)) # 初始化右手关节位置数组,21个关节,每个关节3个坐标 left_hand_orientations = np.zeros((21, 4)) # 初始化左手关节方向数组,21个关节,每个关节4个方向分量 right_hand_orientations = np.zeros((21, 4)) # 初始化右手关节方向数组,21个关节,每个关节4个方向分量
计算相对于中指近端关节的相对距离,并进行归一化# 遍历JSON数据以提取手部关节位置和方向 for joint_name in left_hand_joint_names: # 遍历左手关节名称列表 joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据 joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置 joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向 left_hand_positions[left_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置 left_hand_orientations[left_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向 for joint_name in right_hand_joint_names: # 遍历右手关节名称列表 joint_data = received_json["scene"]["actors"][0]["body"][joint_name] # 获取关节数据 joint_position = np.array(list(joint_data["position"].values())) # 获取关节位置 joint_rotation = np.array(list(joint_data["rotation"].values())) # 获取关节方向 right_hand_positions[right_hand_joint_names.index(joint_name)] = joint_position # 存储关节位置 right_hand_orientations[right_hand_joint_names.index(joint_name)] = joint_rotation # 存储关节方向
将原始和归一化后的手部关节数据存储到Redis数据库中# 计算相对于中指近端关节的相对距离,并归一化 left_middle_proximal_idx = left_hand_joint_names.index('leftMiddleProximal') # 获取左手中指近端关节的索引 right_middle_proximal_idx = right_hand_joint_names.index('rightMiddleProximal') # 获取右手中指近端关节的索引 left_wrist_position = left_hand_positions[0] # 获取左手手腕位置 right_wrist_position = right_hand_positions[0] # 获取右手手腕位置 left_middle_proximal_position = left_hand_positions[left_middle_proximal_idx] # 获取左手中指近端关节位置 right_middle_proximal_position = right_hand_positions[right_middle_proximal_idx] # 获取右手中指近端关节位置 left_bone_length = np.linalg.norm(left_wrist_position - left_middle_proximal_position) # 计算左手骨骼长度 right_bone_length = np.linalg.norm(right_wrist_position - right_middle_proximal_position) # 计算右手骨骼长度 normalized_left_hand_positions = (left_middle_proximal_position - left_hand_positions) / left_bone_length # 归一化左手关节位置 normalized_right_hand_positions = (right_middle_proximal_position - right_hand_positions) / right_bone_length # 归一化右手关节位置
打印接收到的手部关节位置数据r.set("leftHandJointXyz", np.array(normalized_left_hand_positions).astype(np.float64).tobytes()) # 将归一化后的左手关节位置存储到Redis r.set("rightHandJointXyz", np.array(normalized_right_hand_positions).astype(np.float64).tobytes()) # 将归一化后的右手关节位置存储到Redis r.set("rawLeftHandJointXyz", np.array(left_hand_positions).astype(np.float64).tobytes()) # 将原始左手关节位置存储到Redis r.set("rawRightHandJointXyz", np.array(right_hand_positions).astype(np.float64).tobytes()) # 将原始右手关节位置存储到Redis r.set("rawLeftHandJointOrientation", np.array(left_hand_orientations).astype(np.float64).tobytes()) # 将原始左手关节方向存储到Redis r.set("rawRightHandJointOrientation", np.array(right_hand_orientations).astype(np.float64).tobytes()) # 将原始右手关节方向存储到Redis
print("\n\n") print("=" * 50) print(np.round(left_hand_positions, 3)) # 打印左手关节位置 print("-"*50) print(np.round(right_hand_positions, 3)) # 打印右手关节位置 except json.JSONDecodeError: # 捕获JSON解析错误 print("Invalid JSON received:") # 打印错误信息
- 主程序入口
if __name__ == "__main__": # 主程序入口 start_server(14551) # 启动服务器,监听端口14551
1.5 replay_human_traj_vis.py——可视化保存点云数据和位姿(可用于查看所有帧和校准)
1.6 transform_to_robot_table.py——将可视化点云数据放置在机器人桌面上
1.7 visualizer.py——可视化手部关节数据
1.7.1 一系列定义:比如五个手指等
- 导入各种库和模块,用于处理命令行参数、文件操作、点云数据处理、键盘事件监听等
- 定义手部关节之间的连接线,用于可视化手部骨架
lines = np.array([ # 拇指 [1, 2], [2, 3], [3, 4], # 食指 [5, 6], [6, 7], [7, 8], # 中指 [9, 10], [10, 11], [11, 12], # 无名指 [13, 14], [14, 15], [15, 16], # 小指 [17, 18], [18, 19], [19, 20], # 连接近端关节 [1, 5], [5, 9], [9, 13], [13, 17], # 连接手掌 [0, 1], [17, 0] ])
- 定义一系列全局变量,用于存储累积的校正、当前帧、步长等信息
delta_movement_accu = np.array([0.0, 0.0, 0.0]) # 累积的位移校正 delta_ori_accu = np.array([0.0, 0.0, 0.0]) # 累积的方向校正 delta_movement_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的位移校正 delta_ori_accu_left = np.array([0.0, 0.0, 0.0]) # 左手累积的方向校正 adjust_movement = True # 是否调整位移 adjust_right = True # 是否调整右手 next_frame = False # 是否切换到下一帧 frame = 0 # 当前帧 step = 0.01 # 步长 fixed_transform = np.array([0.0, 0.0, 0.0]) # 固定变换
- 一些辅助函数
将手腕位置平移到原点
应用位姿矩阵def translate_wrist_to_origin(joint_positions): # 将手腕位置平移到原点 wrist_position = joint_positions[0] # 获取手腕位置 updated_positions = joint_positions - wrist_position # 平移所有关节位置 return updated_positions # 返回平移后的关节位置
创建或更新圆柱体def apply_pose_matrix(joint_positions, pose_matrix): # 应用位姿矩阵 homogeneous_joint_positions = np.hstack([joint_positions, np.ones((joint_positions.shape[0], 1))]) # 将关节位置转换为齐次坐标 transformed_positions = np.dot(homogeneous_joint_positions, pose_matrix.T) # 应用位姿矩阵 transformed_positions_3d = transformed_positions[:, :3] # 提取3D坐标 return transformed_positions_3d # 返回变换后的关节位置
def create_or_update_cylinder(start, end, radius=0.003, cylinder_list=None, cylinder_idx=-1): # 创建或更新圆柱体 cyl_length = np.linalg.norm(end - start) # 计算圆柱体的长度 new_cylinder = o3d.geometry.TriangleMesh.create_cylinder(radius=radius, height=cyl_length, resolution=20, split=4) # 创建新的圆柱体 new_cylinder.paint_uniform_color([1, 0, 0]) # 将圆柱体涂成红色 new_cylinder.translate(np.array([0, 0, cyl_length / 2])) # 平移圆柱体 direction = end - start # 计算方向向量 direction /= np.linalg.norm(direction) # 归一化方向向量 up = np.array([0, 0, 1]) # 圆柱体的默认向上向量 rotation_axis = np.cross(up, direction) # 计算旋转轴 rotation_angle = np.arccos(np.dot(up, direction)) # 计算旋转角度 if np.linalg.norm(rotation_axis) != 0: # 如果旋转轴不为零 rotation_axis /= np.linalg.norm(rotation_axis) # 归一化旋转轴 rotation_matrix = o3d.geometry.get_rotation_matrix_from_axis_angle(rotation_axis * rotation_angle) # 计算旋转矩阵 new_cylinder.rotate(rotation_matrix, center=np.array([0, 0, 0])) # 旋转圆柱体 new_cylinder.translate(start) # 平移圆柱体到起始位置 if cylinder_list[cylinder_idx] is not None: # 如果圆柱体列表中已有圆柱体 cylinder_list[cylinder_idx].vertices = new_cylinder.vertices # 更新顶点 cylinder_list[cylinder_idx].triangles = new_cylinder.triangles # 更新三角形 cylinder_list[cylinder_idx].vertex_normals = new_cylinder.vertex_normals # 更新顶点法线 cylinder_list[cylinder_idx].vertex_colors = new_cylinder.vertex_colors # 更新顶点颜色 else: cylinder_list[cylinder_idx] = new_cylinder # 添加新的圆柱体到列表中
1.7.2 DataVisualizer类:用于可视化点云数据和手部关节数据
接下来,定义了一个名为DataVisualizer的类,用于可视化点云数据和手部关节数据
- 类定义和初始化
class DataVisualizer: # 定义DataVisualizer类 def __init__(self, directory): # 初始化方法,接受数据目录作为参数 self.directory = directory # 初始化数据目录 self.base_pcd = None # 初始化基础点云对象 self.pcd = None # 初始化点云对象 self.img_backproj = None # 初始化图像反投影对象 self.coord_frame_1 = None # 初始化坐标系1 self.coord_frame_2 = None # 初始化坐标系2 self.coord_frame_3 = None # 初始化坐标系3 self.right_hand_offset = None # 初始化右手偏移量 self.right_hand_ori_offset = None # 初始化右手方向偏移量 self.left_hand_offset = None # 初始化左手偏移量 self.left_hand_ori_offset = None # 初始化左手方向偏移量 self.pose1_prev = np.eye(4) # 初始化第一个位姿矩阵 self.pose2_prev = np.eye(4) # 初始化第二个位姿矩阵 self.pose3_prev = np.eye(4) # 初始化第三个位姿矩阵 self.vis = o3d.visualization.Visualizer() # 创建Open3D可视化器 self.vis.create_window() # 创建可视化窗口 self.vis.get_view_control().change_field_of_view(step=1.0) # 改变视野 self.between_cam = np.eye(4) # 初始化相机之间的变换矩阵 self.between_cam[:3, :3] = np.array([[1.0, 0.0, 0.0], [0.0, -1.0, 0.0], [0.0, 0.0, -1.0]]) self.between_cam[:3, 3] = np.array([0.0, 0.076, 0.0]) # 设置平移部分 self.between_cam_2 = np.eye(4) # 初始化第二个相机之间的变换矩阵 self.between_cam_2[:3, :3] = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]) self.between_cam_2[:3, 3] = np.array([0.0, -0.032, 0.0]) # 设置平移部分 self.between_cam_3 = np.eye(4) # 初始化第三个相机之间的变换矩阵 self.between_cam_3[:3, :3] = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]) self.between_cam_3[:3, 3] = np.array([0.0, -0.064, 0.0]) # 设置平移部分 self.canonical_t265_ori = None # 初始化标准T265方向 # 可视化左手21个关节 self.left_joints = [] # 初始化左手关节列表 self.right_joints = [] # 初始化右手关节列表 self.left_line_set = [None for _ in lines] # 初始化左手连线列表 self.right_line_set = [None for _ in lines] # 初始化右手连线列表 for i in range(21): # 遍历21个关节 for joint in [self.left_joints, self.right_joints]: # 遍历左手和右手关节列表 # 为指尖和手腕创建较大的球体,为其他关节创建较小的球体 radius = 0.011 if i in [0, 4, 8, 12, 16, 20] else 0.007 sphere = o3d.geometry.TriangleMesh.create_sphere(radius=radius) # 创建球体 # 将手腕和近端关节涂成绿色 if i in [0, 1, 5, 9, 13, 17]: sphere.paint_uniform_color([0, 1, 0]) # 将指尖涂成红色 elif i in [4, 8, 12, 16, 20]: sphere.paint_uniform_color([1, 0, 0]) # 将其他关节涂成蓝色 else: sphere.paint_uniform_color([0, 0, 1]) # 将拇指涂成粉色 if i in [1, 2, 3, 4]: sphere.paint_uniform_color([1, 0, 1]) joint.append(sphere) # 将球体添加到关节列表中 self.vis.add_geometry(sphere) # 将球体添加到可视化器中 self.step = 0 # 初始化步数 self.distance_buffer = [] # 初始化距离缓冲区 self.R_delta_init = None # 初始化旋转矩阵 self.cumulative_correction = np.array([0.0, 0.0, 0.0]) # 初始化累计修正值
- 初始化标准帧的方法
def initialize_canonical_frame(self): # 初始化标准帧的方法 if self.R_delta_init is None: # 如果旋转矩阵未初始化 self._load_frame_data(0) # 加载第0帧数据 pose_ori_matirx = self.pose3_prev[:3, :3] # 获取第三个位姿的旋转矩阵 pose_ori_correction_matrix = np.dot(np.array([[0, -1, 0], [0, 0, 1], [1, 0, 0]]), euler2mat(0, 0, 0)) # 计算修正矩阵 pose_ori_matirx = np.dot(pose_ori_matirx, pose_ori_correction_matrix) # 应用修正矩阵 self.canonical_t265_ori = np.array([[1, 0, 0], [0, -1, 0], [0, 0, -1]]) # 初始化标准T265方向 x_angle, y_angle, z_angle = mat2euler(self.pose3_prev[:3, :3]) # 将旋转矩阵转换为欧拉角 self.canonical_t265_ori = np.dot(self.canonical_t265_ori, euler2mat(-z_angle, x_angle + 0.3, y_angle)) # 应用欧拉角修正 self.R_delta_init = np.dot(self.canonical_t265_ori, pose_ori_matirx.T) # 计算初始旋转矩阵
- 重放关键帧校准的方法
def replay_keyframes_calibration(self): """ 可视化单帧 """ global delta_movement_accu, delta_ori_accu, next_frame, frame if self.R_delta_init is None: self.initialize_canonical_frame() # 初始化标准帧 self._load_frame_data(frame) # 加载当前帧数据 self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器 self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器 self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器 self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器 for joint in self.left_joints + self.right_joints: self.vis.add_geometry(joint) # 添加关节数据到可视化器 for cylinder in self.left_line_set + self.right_line_set: self.vis.add_geometry(cylinder) # 添加连线数据到可视化器 next_frame = True # 初始化为下一帧 try: with keyboard.Listener(on_press=on_press) as listener: # 监听键盘事件 while True: if next_frame == True: next_frame = False frame += 10 # 切换到下一帧 self._load_frame_data(frame) # 加载当前帧数据 self.step += 1 # 增加步数 self.vis.update_geometry(self.pcd) # 更新点云数据 self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1 self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2 self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3 for joint in self.left_joints + self.right_joints: self.vis.update_geometry(joint) # 更新关节数据 for cylinder in self.left_line_set + self.right_line_set: self.vis.update_geometry(cylinder) # 更新连线数据 self.vis.poll_events() # 处理可视化事件 self.vis.update_renderer() # 更新渲染器 listener.join() # 等待监听器结束 finally: print("cumulative_correction ", self.cumulative_correction) # 打印累计修正值
- 重放所有帧的方法
def replay_all_frames(self): """ 连续可视化所有帧 """ try: if self.R_delta_init is None: self.initialize_canonical_frame() # 初始化标准帧 frame = 0 # 初始化帧计数器 first_frame = True # 标记是否为第一帧 while True: if not self._load_frame_data(frame): # 加载当前帧数据 break # 如果无法加载数据,退出循环 if first_frame: self.vis.add_geometry(self.pcd) # 添加点云数据到可视化器 self.vis.add_geometry(self.coord_frame_1) # 添加坐标系1到可视化器 self.vis.add_geometry(self.coord_frame_2) # 添加坐标系2到可视化器 self.vis.add_geometry(self.coord_frame_3) # 添加坐标系3到可视化器 for joint in self.left_joints + self.right_joints: self.vis.add_geometry(joint) # 添加关节数据到可视化器 for cylinder in self.left_line_set + self.right_line_set: self.vis.add_geometry(cylinder) # 添加连线数据到可视化器 else: self.vis.update_geometry(self.pcd) # 更新点云数据 self.vis.update_geometry(self.coord_frame_1) # 更新坐标系1 self.vis.update_geometry(self.coord_frame_2) # 更新坐标系2 self.vis.update_geometry(self.coord_frame_3) # 更新坐标系3 for joint in self.left_joints + self.right_joints: self.vis.update_geometry(joint) # 更新关节数据 for cylinder in self.left_line_set + self.right_line_set: self.vis.update_geometry(cylinder) # 更新连线数据 self.vis.poll_events() # 处理可视化事件 self.vis.update_renderer() # 更新渲染器 if first_frame: view_params = self.vis.get_view_control().convert_to_pinhole_camera_parameters() # 获取视图参数 else: self.vis.get_view_control().convert_from_pinhole_camera_parameters(view_params) # 恢复视图参数 self.step += 1 # 增加步数 frame += 5 # 增加帧计数器 if first_frame: first_frame = False # 标记为非第一帧 finally: self.vis.destroy_window() # 销毁可视化窗口
- 反投影点的方法
def _back_project_point(self, point, intrinsics): """ 将单个点从3D反投影到2D图像空间 """ x, y, z = point fx, fy = intrinsics[0, 0], intrinsics[1, 1] cx, cy = intrinsics[0, 2], intrinsics[1, 2] u = (x * fx / z) + cx v = (y * fy / z) + cy return int(u), int(v)
1.7.3 _load_frame_data:加载给定帧的点云和位姿数据,并进行可视化处理
最后是加载帧数据的方法,即_load_frame_data这个方法,用于加载给定帧的点云和位姿数据,并进行可视化处理
- 方法定义和参数说明
def _load_frame_data(self, frame, vis_2d=False, load_table_points=False): """ Load point cloud and poses for a given frame @param frame: frame count in integer @return whether we can successfully load all data from frame subdirectory """ global delta_movement_accu, delta_ori_accu, delta_movement_accu_left, delta_ori_accu_left # 全局变量,用于存储累积的平移和旋转校正 print(f"frame {frame}") # 打印当前帧编号 if adjust_movement: # 如果启用了平移校正 print("adjusting translation") # 打印平移校正信息 else: print("adjusting rotation") # 打印旋转校正信息
- 初始化最顶部的L515相机内参
# L515: o3d_depth_intrinsic = o3d.camera.PinholeCameraIntrinsic( 1280, 720, 898.2010498046875, 897.86669921875, 657.4981079101562, 364.30950927734375) # 初始化L515相机的内参
- 处理桌面点云数据
if load_table_points: # 如果需要加载桌面点云数据 table_color_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "color_image.jpg")) # 读取桌面彩色图像 table_depth_image_o3d = o3d.io.read_image(os.path.join(self.table_frame, "frame_0", "depth_