In the previous two installments, we synthesized training data and performed imitation learning for Manipulation models, based on a simulation environment and a world model respectively. Let's recap the overall process:
Beyond Manipulation, Navigation is another very important class of control model in embodied-AI scenarios. In this installment, we walk through the full process of training a navigation model in a simulation environment.
As with manipulation models, training a navigation model involves the stages of human demonstration, data expansion, data augmentation, imitation learning, and model evaluation.
Compared with manipulation models, however, navigation training has the following characteristics:
- Human demonstration is relatively simple: you only steer the robot's direction of motion on a 2D plane
- Data expansion is relatively simple, since motion on a 2D plane has only 2 degrees of freedom
- The control objective is more complex: instead of completing a predefined motion, the robot must reach a randomly assigned coordinate
- Because of the previous point, evaluating a navigation model is also more involved, requiring in-the-loop validation that combines the simulation environment with model inference
PAI's Notebook Gallery ships with a built-in best practice that is a concrete example of this process:
https://gallery.pai-ml.com/#/preview/deepLearning/cv/isaac_sim_wf3
Let's walk through this example in detail.
Using the Isaac Asset Public Dataset
Training a navigation model requires a fairly complex 3D scene. PAI's public datasets include a built-in dataset that contains several 3D scenes:
When launching the DSW instance used for human demonstration, you can select this public dataset in addition to the resource type and any custom datasets, which simplifies the scene setup later on.
Once DSW starts, the public dataset is visible:
Small-Scale Human Demonstration
Loading the Scene
Next, inside this DSW instance, we launch the Isaac Sim environment via livestream:
export ACCEPT_EULA=Y
PUBLIC_IP=$(curl -s ifconfig.me) && /isaac-sim/runheadless.sh --/persistent/isaac/asset_root/default="/mnt/data/isaac_tmp/isaac_asset/Assets/Isaac/5.0" --/app/livestream/publicEndpointAddress=$PUBLIC_IP --/app/livestream/port=49100
We can then use the Isaac Asset public dataset in Isaac Sim to load the scene in which the navigation model will run:
(Scene path: /mnt/isaac_assets/5.0/Isaac/Environments/Simple_Warehouse/warehouse_multiple_shelves.usd)
Creating the Occupancy Map
Next, we create an occupancy map for this scene so that MobilityGen knows where the robot can move.
- Select Tools > Robotics > Occupancy Map to open the occupancy map extension
- In the occupancy map window, set the parameters:
  Origin: X=2.0, Y=0.0, Z=0.0
  Upper Bound: X=10.0, Y=20.0, Z=2.0
  Lower Bound: X=-14.0, Y=-18.0, Z=0.1
- Click Calculate to generate the occupancy map
- Click Visualize Image to view it
- In the visualization window, select Rotate Image: 180
- Select Coordinate Type: ROS Occupancy Map Parameters File YAML
- Click Regenerate Image
- Copy the generated YAML text
- Create the file ~/MobilityGenData/maps/warehouse_multiple_shelves/map.yaml, paste in the YAML content, change image: warehouse_multiple_shelves.png to image: map.png, and save the file (the format is sketched after this list)
- In the visualization window, click Save Image and save the image as map.png under /root/MobilityGenData/maps/warehouse_multiple_shelves
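For reference, the map.yaml created here follows the standard ROS occupancy-map parameter format. The sketch below is illustrative only; keep the values the extension generated for your scene and change only the image entry:

image: map.png
resolution: 0.05
origin: [-14.0, -18.0, 0.0]
negate: 0
occupied_thresh: 0.65
free_thresh: 0.196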
Launching the MobilityGen Extension
Launch the MobilityGen extension as follows:
- Navigate to Window > Extensions
- Search for MobilityGen UI
- Click the toggle to enable the extension
- You should see two windows: the MobilityGen UI and the occupancy map visualization window
Set the following parameters in the MobilityGen window:
- Stage: paste the warehouse USD path /mnt/isaac_assets/5.0/Isaac/Environments/Simple_Warehouse/warehouse_multiple_shelves.usd
- Occupancy Map: enter the path of the map.yaml created earlier, ~/MobilityGenData/maps/warehouse_multiple_shelves/map.yaml
- Robot: select CarterRobot
- Scenario: select KeyboardTeleoperationScenario (optional: select RandomPathFollowingScenario if you want data to be generated automatically)
- Click Build
Recording Trajectories
- Click Start recording to begin recording
- Move the robot around (if you chose KeyboardTeleoperationScenario)
- Click Stop recording to stop recording
Video demo >>
The data is now recorded under the ~/MobilityGenData/recordings directory.
Replaying to Generate Videos
Next, replay the scenes with the replay_directory.py Python script provided by Isaac Sim:
cd /isaac-sim && \
/isaac-sim/python.sh standalone_examples/replicator/mobility_gen/replay_directory.py --render_interval 10 --enable isaacsim.replicator.mobility_gen.examples
Once the script finishes, the re-rendered images and sensor data are saved under ~/MobilityGenData/replays.
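Before augmenting the data, it can be worth sanity-checking the replay output. Below is a minimal sketch that counts the rendered frames per camera, assuming the state/rgb/... directory layout that the augmentation script in the next section also relies on:

from pathlib import Path

REPLAYS = Path("/root/MobilityGenData/replays")
CAMERAS = [
    "state/rgb/robot.front_camera.left.rgb_image",
    "state/rgb/robot.front_camera.right.rgb_image",
]

# Count the rendered RGB frames for each replay and camera
for replay in sorted(d for d in REPLAYS.iterdir() if d.is_dir()):
    for cam in CAMERAS:
        frames = list((replay / cam).glob("*.jpg"))
        print(f"{replay.name} / {cam}: {len(frames)} frames")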
Data Augmentation
We use the Cosmos-Transfer1-7B model to augment the captured image data. PAI-ModelGallery already provides a deployment solution for this model:
Once the model is deployed, the following script augments the image data:
import cv2
import json
from pathlib import Path
import shutil
import requests
import gradio_client.client as gradio_client
import gradio_client.utils as gradio_utils
# Cosmos service URL and token (used below when creating the client)
COSMOS_SERVICE_URL = "http://xxxxxx"  # replace with the actual service URL
EAS_TOKEN = "your_eas_token"  # replace with the actual EAS token
RGB_TARGETS = [
"state/rgb/robot.front_camera.left.rgb_image",
"state/rgb/robot.front_camera.right.rgb_image",
]
# --- Module 1: image sequence to video ---
def convert_sequence_to_video(input_dir: Path, output_path: Path, fps: int, image_format: str) -> bool:
print(f" - Converting to video: {input_dir.name} (Format: {image_format})")
image_files = sorted(list(input_dir.glob(f'*.{image_format}')))
if not image_files:
print(f" -> No '{image_format}' images found. Skipping.")
return False
try:
first_img = cv2.imread(str(image_files[0]))
if first_img is None: raise IOError("Cannot read the first image.")
height, width = first_img.shape[:2]
except Exception as e:
print(f" -> Error reading first image: {e}. Skipping.")
return False
output_path.parent.mkdir(parents=True, exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
for image_file in image_files:
frame = cv2.imread(str(image_file))
if frame is not None:
out.write(frame)
out.release()
print(f" -> Video created: {output_path}")
return True
# --- New module: video back to image sequence ---
def split_video_to_frames(video_path: Path, output_dir: Path, original_image_dir: Path, image_format: str):
"""
將視頻文件拆分為一幀幀的圖片,並使用原始圖片的文件名進行命名。
"""
print(f" - Splitting video back to frames: {video_path.name}")
    # 1. Use the original filenames as a naming template
original_image_files = sorted(list(original_image_dir.glob(f'*.{image_format}')))
original_filenames = [p.name for p in original_image_files]
if not original_filenames:
print(f" -> Warning: Could not find original images in {original_image_dir} to use for naming. Skipping frame splitting.")
return
    # 2. Prepare the output directory
output_dir.mkdir(parents=True, exist_ok=True)
    # 3. Open the video file and read it frame by frame
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
print(f" -> Error: Could not open video file {video_path}. Skipping.")
return
frame_index = 0
while True:
ret, frame = cap.read()
if not ret:
            break  # end of video
if frame_index < len(original_filenames):
            # Save the new frame under the original filename
output_filepath = output_dir / original_filenames[frame_index]
cv2.imwrite(str(output_filepath), frame)
else:
            # Stop if the video has more frames than the original images, to avoid naming collisions
print(f" -> Warning: Video contains more frames than original image sequence. Stopping at frame {frame_index}.")
break
frame_index += 1
cap.release()
print(f" -> Success! {frame_index} frames saved to: {output_dir}")
# --- Module 2: calling the Cosmos service ---
def cosmos_sync_with_upload(client, rgb_video_path, seg_video_path, output_dir, original_rgb_dir):
"""上傳視頻,調用API,下載結果,並觸發視頻到幀的轉換。"""
def upload_file(filepath: Path):
if not filepath or not filepath.exists(): return None
print(f" - Uploading: {filepath.name}")
file_desc = gradio_utils.handle_file(str(filepath))
result_str = client.predict(file_desc, api_name="/upload_file")
return json.loads(result_str).get("path")
remote_rgb_path = upload_file(rgb_video_path)
remote_seg_path = upload_file(seg_video_path)
if not remote_rgb_path or not remote_seg_path:
        return False, "Video upload failed"
request_dict = create_cosmos_request(remote_rgb_path, remote_seg_path)
print(" - Sending generation request to Cosmos service...")
result = client.predict(json.dumps(request_dict), api_name="/generate_video")
if isinstance(result, tuple) and len(result) >= 2 and isinstance(result[0], dict):
video_path = result[0].get("video")
if not video_path:
return False, f"API did not return a video path. Message: {result[1]}"
output_file = Path(output_dir) / f"{rgb_video_path.stem}_cosmos_enhanced.mp4"
        # Handle both remote download and local copy of the result
success = False
if video_path.startswith(("http://", "https://")):
try:
resp = requests.get(video_path, stream=True, timeout=300)
resp.raise_for_status()
with open(output_file, "wb") as f: shutil.copyfileobj(resp.raw, f)
success = True
except requests.exceptions.RequestException as e:
return False, f"Failed to download video: {e}"
else:
source_path = Path(video_path)
if source_path.exists():
shutil.copy2(source_path, output_file)
success = True
else:
return False, f"API returned a local path that does not exist: {video_path}"
if success:
print(f" -> Augmented video saved to: {output_file}")
            # Output directory for the new frames, e.g. .../robot.front_camera.left.rgb_image_cosmos
new_frames_output_dir = original_rgb_dir.parent / f"{original_rgb_dir.name}_cosmos"
split_video_to_frames(
video_path=output_file,
output_dir=new_frames_output_dir,
original_image_dir=original_rgb_dir,
image_format="jpg" # RGB圖像的原始格式
)
return True, str(output_file)
else:
return False, "Failed to retrieve the generated video file."
else:
return False, f"Unexpected API response format: {result}"
def create_cosmos_request(remote_rgb_path, remote_seg_path):
"""動態創建Cosmos請求,包含主視頻和分割視頻的遠程路徑。"""
return {
"prompt": "A realistic warehouse environment with consistent lighting, perspective, and camera motion. Preserve the original structure, object positions, and layout from the input video. Ensure the output exactly matches the segmentation video frame-by-frame in timing and content. Camera movement must follow the original path precisely.",
"negative_prompt": "The video captures a game playing, with bad crappy graphics and cartoonish frames. It represents a recording of old outdated games. The images are very pixelated and of poor CG quality. There are many subtitles in the footage. Overall, the video is unrealistic and appears cg. Plane background.",
"sigma_max": 80,
"guidance": 7,
"input_video_path": remote_rgb_path, # 主視頻路徑
"blur_strength": "low",
"canny_threshold": "low",
"edge": {"control_weight": 0.3},
"seg": {
"control_weight": 1.0,
"input_control": remote_seg_path # 分割視頻路徑
}
}
# --- Module 3: main workflow controller ---
def process_and_augment_replays(output_dir: str, fps: int = 30):
source_root = Path("/root/MobilityGenData/replays")
output_root = Path(output_dir)
if not source_root.is_dir(): return
timestamp_dirs = [d for d in source_root.iterdir() if d.is_dir()]
if not timestamp_dirs: return
client = gradio_client.Client(COSMOS_SERVICE_URL, hf_token=EAS_TOKEN)
for ts_dir in timestamp_dirs:
print(f"\nProcessing replay: {ts_dir.name}")
final_output_dir = output_root / ts_dir.name
final_output_dir.mkdir(exist_ok=True)
for rgb_rel_path_str in RGB_TARGETS:
rgb_image_dir = ts_dir / rgb_rel_path_str
seg_rel_path_str = rgb_rel_path_str.replace("rgb", "segmentation")
seg_image_dir = ts_dir / seg_rel_path_str
if not (rgb_image_dir.is_dir() and seg_image_dir.is_dir()):
continue
rgb_video_path = final_output_dir / f"{rgb_image_dir.name}.mp4"
seg_video_path = final_output_dir / f"{seg_image_dir.name}.mp4"
rgb_ok = convert_sequence_to_video(rgb_image_dir, rgb_video_path, fps, "jpg")
seg_ok = convert_sequence_to_video(seg_image_dir, seg_video_path, fps, "png")
if not (rgb_ok and seg_ok):
continue
cosmos_sync_with_upload(
client,
rgb_video_path,
seg_video_path,
final_output_dir,
original_rgb_dir=rgb_image_dir
)
print("\n" + "="*20 + " 全部處理完成 " + "="*20)
# --- Entry point ---
if __name__ == "__main__":
    # Ensure the output directory exists
    Path("/root/MobilityGenData/cosmos_augmented_videos").mkdir(parents=True, exist_ok=True)
output_directory = "/root/MobilityGenData/cosmos_augmented_videos"
process_and_augment_replays(output_dir=output_directory)
Here are two clips of the video data before and after augmentation, for comparison:
Before augmentation
Video demo >>
After augmentation
Video demo >>
Imitation Learning
Here we use NVlabs' open-source X-Mobility model (https://github.com/NVlabs/X-Mobility) as the base model for imitation learning.
The following script launches a distributed training job on DLC:
import os
import json
import time
from alibabacloud_tea_openapi.models import Config
from alibabacloud_credentials.client import Client as CredClient
from alibabacloud_credentials.models import Config as CredConfig
from alibabacloud_pai_dlc20201203.client import Client as DLCClient
from alibabacloud_pai_dlc20201203.models import (
CreateJobRequest,
GetJobRequest,
)
def wait_for_job_to_terminate(client, job_id):
while True:
job = client.get_job(job_id, GetJobRequest()).body
print('job({}) is {}'.format(job_id, job.status))
if job.status in ('Succeeded', 'Failed', 'Stopped'):
return job.status
time.sleep(5)
def main():
current_time_tuple = time.localtime()
year = current_time_tuple.tm_year
month = current_time_tuple.tm_mon
day = current_time_tuple.tm_mday
hour = current_time_tuple.tm_hour
minute = current_time_tuple.tm_min
    # Make sure your account has authorized DLC and has sufficient permissions.
    display_name = f"train_xmobility_for_isaac_{day}_{hour}-{minute}"  # job name
    region_id = os.environ.get("dsw_region")  # region ID
    workspace_id = os.environ.get('PAI_WORKSPACE_ID')  # set to your own workspace ID
    image_uri = f"dsw-registry.{region_id}.cr.aliyuncs.com/pai-training-algorithm/isaac-sim:x-mobility-v10"  # official image
    ecs_spec = "ecs.gn8v-4x.8xlarge"  # instance type
    # The following IDs are referenced further down; replace the placeholders with your own values.
    dataset_id = "d-xxxxxxxx"  # dataset ID
    vpc_id = "vpc-xxxxxxxx"  # VPC ID
    switch_id = "vsw-xxxxxxxx"  # vSwitch ID
    security_groupid = "sg-xxxxxxxx"  # security group ID
    ######### Training job configuration #############
    # Sample dataset settings
    # data_type = 'pqt'
    # dataset1_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_random_160k/data"
    # dataset2_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_nav2_100k/data"
    # output_dir = "/mnt/data/notebook3/sample_output"
    # Sample Cosmos-augmented dataset settings
    data_type = 'pqt'
    dataset1_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_random_160k_cosmos_to_xmob_resized/afm_isaac_sim_random_160k/data"
    dataset2_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_nav2_100k_cosmos_to_xmob_resized/afm_isaac_sim_nav2_100k/data"
    output_dir = "/mnt/data/notebook3/sample_cosmos_output"
    # MobilityGen dataset settings
    # data_type = 'lerobot'
    # dataset1_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_mobilitygen"
    # dataset2_dir = "/mnt/data/notebook3/x_mobility_isaac_sim_mobilitygen"
    # output_dir = "/mnt/data/notebook3/mobilitygen_output"
    ######### End of training job configuration #############
    # This example authenticates by letting the Credentials SDK read the AccessKey from environment variables.
    credentialsConfig = CredConfig(
        type='credentials_uri'  # Optional. If no other default credential chain is configured, the Credentials SDK obtains temporary credentials via URI.
    )
cred = CredClient(credentialsConfig)
# 1. create client;
dlc_client = DLCClient(
config=Config(
credential=cred,
region_id=region_id,
endpoint='pai-dlc.{}.aliyuncs.com'.format(region_id),
)
)
print('-------- Create Job ----------')
    # Create the DLC job.
create_job_resp = dlc_client.create_job(CreateJobRequest().from_map({
'WorkspaceId': workspace_id,
'DisplayName': display_name,
'JobType': 'PyTorchJob',
# 'ResourceId': resource_quota_id,
'JobSpecs': [
{
"Type": "Master",
"Image": image_uri,
"PodCount": 1,
"EcsSpec": ecs_spec,
},
],
'DataSources': [
{
"DataSourceId": dataset_id,
},
],
'UserVpc': {
"VpcId": vpc_id, # 替換為實際 VPC ID
"SwitchId": switch_id, # 替換為實際交換機 ID
"SecurityGroupId": security_groupid # 替換為實際安全組 ID
},
"UserCommand": f" export WANDB_MODE=offline && \
export NCCL_NVLS_ENABLE=0 && \
cd /workspace && \
python3 /train_pai/train_wrapper.py \
-d {dataset1_dir} \
-o {output_dir} \
--type {data_type} \
--stage 1 && \
python3 /train_pai/train_wrapper.py \
-d {dataset2_dir} \
-o {output_dir} \
--type {data_type} \
--stage 2 && \
sleep 30",
}))
job_id = create_job_resp.body.job_id
wait_for_job_to_terminate(dlc_client, job_id)
if __name__ == '__main__':
main()
Software-in-the-Loop Validation
Model Format Conversion
Convert the fine-tuned X-Mobility model to ONNX format with the following commands:
%cd /X-MOBILITY
# Using the sample training result as an example; replace with your own training output path
!python3 onnx_conversion.py -p /mnt/data/notebook3/nav2_output/checkpoints/last.ckpt -o /tmp/x_mobility.onnx
# Do not change the TensorRT engine path; it is the X-Mobility default
!python3 trt_conversion.py -o /tmp/x_mobility.onnx -t /tmp/x_mobility.engine
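Before deploying, it can help to verify that the exported ONNX graph is well-formed. A minimal sketch using the onnx Python package, with the same output path as above:

import onnx

# Load the exported model and run the structural checker
model = onnx.load("/tmp/x_mobility.onnx")
onnx.checker.check_model(model)

# Print the input and output tensor names of the graph
print("inputs: ", [i.name for i in model.graph.input])
print("outputs:", [o.name for o in model.graph.output])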
Deploying ROS2
Deploy the ROS2 environment with the following script so that the X-Mobility model can be loaded:
# Create the ros2 workspace directory
!mkdir -p ~/ros2_ws/src
# Symlink the x_mobility_navigator ROS2 package into the workspace
!ln -s /X-MOBILITY/ros2_deployment/x_mobility_navigator ~/ros2_ws/src/x_mobility_navigator
# Build the ROS2 workspace
!cd ~/ros2_ws && colcon build --symlink-install
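As a quick sanity check of the build, you can confirm in a freshly sourced shell that the package is visible:

# Source the newly built workspace and look for the package
source /opt/ros/humble/setup.bash
source ~/ros2_ws/install/setup.bash
ros2 pkg list | grep x_mobility_navigator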
Starting VNC
Software-in-the-loop validation of X-Mobility uses the Isaac Sim + ROS2 combination, which needs VNC to drive the graphical interface. Start VNC in DSW with the following command:
/opt/TurboVNC/bin/vncserver :0 -geometry 4000x3000
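To confirm the session is up, list the running TurboVNC sessions (by VNC convention, display :0 is reachable on TCP port 5900):

/opt/TurboVNC/bin/vncserver -list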
Enabling ROS2 in Isaac Sim
Enable the ROS2 bridge in Isaac Sim with the following steps:
- Pre-configure ROS2:
  source /opt/ros/humble/setup.bash
  cd ~/ros2_ws && source install/setup.bash
  source ~/.bashrc
- Launch Isaac Sim:
  ACCEPT_EULA=Y /isaac-sim/runapp.sh --/persistent/isaac/asset_root/default="/mnt/isaac_assets/5.0"
- Load the Carter navigation example via Robotics Examples > ROS2 > Navigation > Carter Navigation > Load Sample Scene
- Click the Play icon in the left toolbar
You can open a new terminal to check that the ROS2 connection is working:
# Set up the ROS2 environment
source /opt/ros/humble/setup.bash
cd ~/ros2_ws && source install/setup.bash
# List the available topics
ros2 topic list
If you see the following topics:
/back_stereo_imu/imu
/chassis/imu
/chassis/odom
/clock
/cmd_vel
/front_3d_lidar/lidar_points
/front_stereo_camera/left/camera_info
/front_stereo_camera/left/image_raw
/front_stereo_camera/left/image_raw/nitros_bridge
/front_stereo_imu/imu
/left_stereo_imu/imu
/parameter_events
/right_stereo_imu/imu
/rosout
/tf
then the ROS2 bridge is working correctly.
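You can go one step further and check that data is actually flowing on these topics while the simulation is playing, for example:

# Print a single odometry message, then measure the clock rate
ros2 topic echo /chassis/odom --once
ros2 topic hz /clock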
Launching the X-Mobility Model
Start the X-Mobility Navigator from a terminal with the following commands:
# Source the built workspace
cd ~/ros2_ws && source install/setup.bash
# Launch the X-Mobility navigator
ros2 launch x_mobility_navigator x_mobility_navigator.launch.py
Set a goal pose: select 2D Goal Pose, then click on the map to set the position and orientation.
If everything is working, you should see the robot move toward the goal position in the simulation.
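As an alternative to clicking in RViz, a goal can also be published from the command line. This sketch assumes the navigator subscribes to the standard /goal_pose topic that RViz's 2D Goal Pose tool publishes to:

# Publish a single goal pose, 2 m ahead of the map origin
ros2 topic pub --once /goal_pose geometry_msgs/msg/PoseStamped \
  "{header: {frame_id: map}, pose: {position: {x: 2.0, y: 0.0, z: 0.0}, orientation: {w: 1.0}}}"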
Summary
In this notebook, building on the capabilities of the Alibaba Cloud PAI platform, we demonstrated a complete general-purpose navigation and motion-control workflow with Isaac Sim and X-Mobility, covering the end-to-end pipeline from data generation and visual augmentation to model training and simulated deployment.
The pipeline consists of:
- Human demonstration: generating navigation demonstration data at scale with Isaac Sim 5.0 and MobilityGen.
- Data expansion (optional): quickly expanding the demonstration data with MobilityGen's random trajectory generation.
- Data augmentation: style-transferring the simulated data with Cosmos-Transfer1 to improve its realism and diversity.
- Imitation learning: training X-Mobility, a world-model-based general navigation and motion-control policy, on the generated data.
- Software-in-the-loop validation: integrating the trained model into Isaac Sim via ROS2 for end-to-end closed-loop navigation validation.
The X-Mobility model retrained on Cosmos-augmented data shows strong potential in generalization and robustness. This workflow provides a complete technical recipe for general robot navigation, markedly improves generalization in visually complex environments, and lays a solid foundation for successful Sim2Real transfer.