9월 9일 - wild scene 실험 결과
9월 9일 - wild scene 실험 결과
ROS bag 분석
- left_compressed image와 /depth , /pose 를 추출하여 디렉토리에 저장
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import cv2
import numpy as np
from collections import deque
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from sensor_msgs.msg import CompressedImage, Image
from geometry_msgs.msg import PoseWithCovarianceStamped
from cv_bridge import CvBridge
def stamp_to_float(stamp):
return stamp.sec + stamp.nanosec * 1e-9
class ManualSyncSaver(Node):
def __init__(self):
super().__init__('manual_synced_saver')
# === 출력 경로 ===
self.output_dir = '/home/loe/workspace/rosbag/wild_output2'
self.rgb_dir = os.path.join(self.output_dir, 'rgb')
self.depth_dir = os.path.join(self.output_dir, 'depth')
os.makedirs(self.rgb_dir, exist_ok=True)
os.makedirs(self.depth_dir, exist_ok=True)
# === 상태 ===
self.bridge = CvBridge()
self.total_seen = 0 # 동기 성공 시도 카운트 (매 5번째 저장)
self.frame_id = 1 # 저장 프레임 번호 (1부터)
self.slop = 0.05 # 동기 허용 오차 (초)
self.require_depth = True # True면 RGB+POSE+DEPTH 모두 있어야 저장
# === 버퍼 ===
self.rgb_buffer = deque() # CompressedImage
self.pose_buffer = deque() # PoseWithCovarianceStamped
self.depth_buffer = deque() # Image
# === 구독 ===
self.create_subscription(
CompressedImage,
'/zed_front/front/left/image_rect_color/compressed',
self.rgb_cb,
qos_profile_sensor_data
)
self.create_subscription(
PoseWithCovarianceStamped,
'/zed_front/front/pose_with_covariance',
self.pose_cb,
qos_profile_sensor_data
)
self.create_subscription(
Image,
'/zed_front/front/depth/depth_registered',
self.depth_cb,
qos_profile_sensor_data
)
# === Pose 텍스트 파일 ===
self.pose_txt_path = os.path.join(self.output_dir, 'pose.txt')
self.pose_txt = open(self.pose_txt_path, 'w')
self.get_logger().info(
f"[ManualSyncSaver] Save every 5th triple → {self.rgb_dir}, {self.depth_dir}, pose.txt"
)
# ---------- 콜백 ----------
def rgb_cb(self, msg: CompressedImage):
self.rgb_buffer.append(msg)
self.try_sync()
def pose_cb(self, msg: PoseWithCovarianceStamped):
self.pose_buffer.append(msg)
self.try_sync()
def depth_cb(self, msg: Image):
self.depth_buffer.append(msg)
self.try_sync()
# ---------- 유틸 ----------
def _nearest(self, buf, t_anchor):
if not buf:
return None, float('inf')
best = None
best_dt = float('inf')
for m in buf:
dt = abs(stamp_to_float(m.header.stamp) - t_anchor)
if dt < best_dt:
best_dt = dt
best = m
return best, best_dt
def _drop_stale(self, buf, t_anchor):
while buf:
t0 = stamp_to_float(buf[0].header.stamp)
if t_anchor - t0 > self.slop:
buf.popleft()
else:
break
def try_sync(self):
# 앵커: 가장 오래된 RGB 기준으로 동기 시도
while self.rgb_buffer:
rgb_msg = self.rgb_buffer[0]
t = stamp_to_float(rgb_msg.header.stamp)
# 과거 값 정리
self._drop_stale(self.pose_buffer, t)
self._drop_stale(self.depth_buffer, t)
pose_msg, dt_pose = self._nearest(self.pose_buffer, t)
depth_msg, dt_depth = self._nearest(self.depth_buffer, t)
ok_pose = (pose_msg is not None) and (dt_pose <= self.slop)
ok_depth = (depth_msg is not None) and (dt_depth <= self.slop)
if ok_pose and (ok_depth or not self.require_depth):
# 저장
self.save_triplet(rgb_msg, pose_msg, depth_msg if ok_depth else None)
# 사용한 메시지 제거
self.rgb_buffer.popleft()
self.pose_buffer.remove(pose_msg)
if ok_depth:
self.depth_buffer.remove(depth_msg)
else:
# 누적 방지: 너무 과거인 pose/depth를 버리거나, 아니면 rgb를 버림
dropped = False
if self.pose_buffer:
if stamp_to_float(self.pose_buffer[0].header.stamp) < t - self.slop:
self.pose_buffer.popleft()
dropped = True
if self.depth_buffer and not dropped:
if stamp_to_float(self.depth_buffer[0].header.stamp) < t - self.slop:
self.depth_buffer.popleft()
dropped = True
if not dropped:
self.rgb_buffer.popleft()
# ---------- 저장 ----------
def save_triplet(self, rgb_msg: CompressedImage, pose_msg: PoseWithCovarianceStamped, depth_msg: Image = None):
# 매 5번째만 저장
if self.total_seen % 5 != 0:
self.total_seen += 1
return
frame_name = f"frame_{self.frame_id:05d}"
# 1) RGB 저장 → wild_output/rgb/
try:
np_arr = np.frombuffer(rgb_msg.data, np.uint8)
rgb_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
cv2.imwrite(os.path.join(self.rgb_dir, f"{frame_name}.png"), rgb_image)
except Exception as e:
self.get_logger().error(f"[RGB] save failed: {e}")
# 2) Pose 저장 → wild_output/pose.txt (idx x y z qx qy qz qw)
try:
p = pose_msg.pose.pose.position
o = pose_msg.pose.pose.orientation
line = f"{self.frame_id} {p.x:.6f} {p.y:.6f} {p.z:.6f} {o.x:.6f} {o.y:.6f} {o.z:.6f} {o.w:.6f}\n"
self.pose_txt.write(line)
self.pose_txt.flush()
except Exception as e:
self.get_logger().error(f"[POSE] save failed: {e}")
# 3) Depth 저장 → wild_output/depth/
if depth_msg is not None:
try:
depth = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding='passthrough')
enc = depth_msg.encoding.upper()
if enc in ('16UC1', 'MONO16'):
# 16-bit PNG로 그대로 저장 (단위가 mm일 가능성 높음)
cv2.imwrite(os.path.join(self.depth_dir, f"{frame_name}.png"), depth)
elif enc == '32FC1':
# 부동소수 포맷은 .npy로 원본 보존
np.save(os.path.join(self.depth_dir, f"{frame_name}.npy"), depth)
else:
# 알 수 없는 인코딩은 안전하게 .npy
np.save(os.path.join(self.depth_dir, f"{frame_name}.npy"), depth)
except Exception as e:
self.get_logger().error(f"[DEPTH] save failed: {e}")
self.get_logger().info(f"✅ Saved synced frame {self.frame_id}")
self.total_seen += 1
self.frame_id += 1
def destroy_node(self):
try:
self.pose_txt.close()
except Exception:
pass
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = ManualSyncSaver()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('KeyboardInterrupt, shutting down...')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
wild gs slam 실행
- 확실히 실내 주행데이터에 비해 가우시안이 원활하게 학습되지 않는 것을 확인할 수 있었다.
실험결과 분석
- threshold distance : 1.0m
- threshold rotation : 10 도 야외환경에서 약 63%의 query가 성공하였고 이는 실내에서의 실험 수치 88% 보다 약 25% 하락한 결과이다.
Best retrieval pose
Worst retrieval pose
Translation error
Rotation error
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.



















