포스트

9월 9일 - wild scene 실험 결과

9월 9일 - wild scene 실험 결과

ROS bag 분석

  • left_compressed image와 /depth , /pose 를 추출하여 디렉토리에 저장
convert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import cv2
import numpy as np
from collections import deque

import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from sensor_msgs.msg import CompressedImage, Image
from geometry_msgs.msg import PoseWithCovarianceStamped
from cv_bridge import CvBridge


def stamp_to_float(stamp):
    return stamp.sec + stamp.nanosec * 1e-9


class ManualSyncSaver(Node):
    def __init__(self):
        super().__init__('manual_synced_saver')

        # === 출력 경로 ===
        self.output_dir = '/home/loe/workspace/rosbag/wild_output2'
        self.rgb_dir = os.path.join(self.output_dir, 'rgb')
        self.depth_dir = os.path.join(self.output_dir, 'depth')
        os.makedirs(self.rgb_dir, exist_ok=True)
        os.makedirs(self.depth_dir, exist_ok=True)

        # === 상태 ===
        self.bridge = CvBridge()
        self.total_seen = 0          # 동기 성공 시도 카운트 (매 5번째 저장)
        self.frame_id = 1            # 저장 프레임 번호 (1부터)
        self.slop = 0.05             # 동기 허용 오차 (초)
        self.require_depth = True    # True면 RGB+POSE+DEPTH 모두 있어야 저장

        # === 버퍼 ===
        self.rgb_buffer = deque()    # CompressedImage
        self.pose_buffer = deque()   # PoseWithCovarianceStamped
        self.depth_buffer = deque()  # Image

        # === 구독 ===
        self.create_subscription(
            CompressedImage,
            '/zed_front/front/left/image_rect_color/compressed',
            self.rgb_cb,
            qos_profile_sensor_data
        )
        self.create_subscription(
            PoseWithCovarianceStamped,
            '/zed_front/front/pose_with_covariance',
            self.pose_cb,
            qos_profile_sensor_data
        )
        self.create_subscription(
            Image,
            '/zed_front/front/depth/depth_registered',
            self.depth_cb,
            qos_profile_sensor_data
        )

        # === Pose 텍스트 파일 ===
        self.pose_txt_path = os.path.join(self.output_dir, 'pose.txt')
        self.pose_txt = open(self.pose_txt_path, 'w')
        self.get_logger().info(
            f"[ManualSyncSaver] Save every 5th triple → {self.rgb_dir}, {self.depth_dir}, pose.txt"
        )

    # ---------- 콜백 ----------
    def rgb_cb(self, msg: CompressedImage):
        self.rgb_buffer.append(msg)
        self.try_sync()

    def pose_cb(self, msg: PoseWithCovarianceStamped):
        self.pose_buffer.append(msg)
        self.try_sync()

    def depth_cb(self, msg: Image):
        self.depth_buffer.append(msg)
        self.try_sync()

    # ---------- 유틸 ----------
    def _nearest(self, buf, t_anchor):
        if not buf:
            return None, float('inf')
        best = None
        best_dt = float('inf')
        for m in buf:
            dt = abs(stamp_to_float(m.header.stamp) - t_anchor)
            if dt < best_dt:
                best_dt = dt
                best = m
        return best, best_dt

    def _drop_stale(self, buf, t_anchor):
        while buf:
            t0 = stamp_to_float(buf[0].header.stamp)
            if t_anchor - t0 > self.slop:
                buf.popleft()
            else:
                break

    def try_sync(self):
        # 앵커: 가장 오래된 RGB 기준으로 동기 시도
        while self.rgb_buffer:
            rgb_msg = self.rgb_buffer[0]
            t = stamp_to_float(rgb_msg.header.stamp)

            # 과거 값 정리
            self._drop_stale(self.pose_buffer, t)
            self._drop_stale(self.depth_buffer, t)

            pose_msg, dt_pose = self._nearest(self.pose_buffer, t)
            depth_msg, dt_depth = self._nearest(self.depth_buffer, t)

            ok_pose = (pose_msg is not None) and (dt_pose <= self.slop)
            ok_depth = (depth_msg is not None) and (dt_depth <= self.slop)

            if ok_pose and (ok_depth or not self.require_depth):
                # 저장
                self.save_triplet(rgb_msg, pose_msg, depth_msg if ok_depth else None)
                # 사용한 메시지 제거
                self.rgb_buffer.popleft()
                self.pose_buffer.remove(pose_msg)
                if ok_depth:
                    self.depth_buffer.remove(depth_msg)
            else:
                # 누적 방지: 너무 과거인 pose/depth를 버리거나, 아니면 rgb를 버림
                dropped = False
                if self.pose_buffer:
                    if stamp_to_float(self.pose_buffer[0].header.stamp) < t - self.slop:
                        self.pose_buffer.popleft()
                        dropped = True
                if self.depth_buffer and not dropped:
                    if stamp_to_float(self.depth_buffer[0].header.stamp) < t - self.slop:
                        self.depth_buffer.popleft()
                        dropped = True
                if not dropped:
                    self.rgb_buffer.popleft()

    # ---------- 저장 ----------
    def save_triplet(self, rgb_msg: CompressedImage, pose_msg: PoseWithCovarianceStamped, depth_msg: Image = None):
        # 매 5번째만 저장
        if self.total_seen % 5 != 0:
            self.total_seen += 1
            return

        frame_name = f"frame_{self.frame_id:05d}"

        # 1) RGB 저장 → wild_output/rgb/
        try:
            np_arr = np.frombuffer(rgb_msg.data, np.uint8)
            rgb_image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
            cv2.imwrite(os.path.join(self.rgb_dir, f"{frame_name}.png"), rgb_image)
        except Exception as e:
            self.get_logger().error(f"[RGB] save failed: {e}")

        # 2) Pose 저장 → wild_output/pose.txt  (idx x y z qx qy qz qw)
        try:
            p = pose_msg.pose.pose.position
            o = pose_msg.pose.pose.orientation
            line = f"{self.frame_id} {p.x:.6f} {p.y:.6f} {p.z:.6f} {o.x:.6f} {o.y:.6f} {o.z:.6f} {o.w:.6f}\n"
            self.pose_txt.write(line)
            self.pose_txt.flush()
        except Exception as e:
            self.get_logger().error(f"[POSE] save failed: {e}")

        # 3) Depth 저장 → wild_output/depth/
        if depth_msg is not None:
            try:
                depth = self.bridge.imgmsg_to_cv2(depth_msg, desired_encoding='passthrough')
                enc = depth_msg.encoding.upper()
                if enc in ('16UC1', 'MONO16'):
                    # 16-bit PNG로 그대로 저장 (단위가 mm일 가능성 높음)
                    cv2.imwrite(os.path.join(self.depth_dir, f"{frame_name}.png"), depth)
                elif enc == '32FC1':
                    # 부동소수 포맷은 .npy로 원본 보존
                    np.save(os.path.join(self.depth_dir, f"{frame_name}.npy"), depth)
                else:
                    # 알 수 없는 인코딩은 안전하게 .npy
                    np.save(os.path.join(self.depth_dir, f"{frame_name}.npy"), depth)
            except Exception as e:
                self.get_logger().error(f"[DEPTH] save failed: {e}")

        self.get_logger().info(f"✅ Saved synced frame {self.frame_id}")
        self.total_seen += 1
        self.frame_id += 1

    def destroy_node(self):
        try:
            self.pose_txt.close()
        except Exception:
            pass
        super().destroy_node()


def main(args=None):
    rclpy.init(args=args)
    node = ManualSyncSaver()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('KeyboardInterrupt, shutting down...')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

wild gs slam 실행

  • 확실히 실내 주행데이터에 비해 가우시안이 원활하게 학습되지 않는 것을 확인할 수 있었다.
    • 예상원인
      • 가우시안 맵 생성 시에 하늘에 대한 prior가 없을 수 있다. (실제로 wild gs 공식 실험 데이터에서도 야외 실험 영상은 x)
      • 풀숲, 나무등이 전부 동적 객체로 분류되어 해당 부분의 가우시안이 잘 학습되지 않았다. (metric depth와 gaussian rendered depth를 비교해볼때에 나무 기둥과 같은 부분이 동적으로 분류되어 애초에 맵에서 표현이 안된다) 실내에서는 동적인 객체가 가우시안 맵에서 잘 제거되었고 이로인해 두 depth image의 diff로 uncertainty가 잘 학습되었다 (이런식으로 이 slam환경에서 동적 & 정적이 어떤 것인지 파악이 되어야 전체적인 맵생성에서 퀄리티가 향상된다)

        초반에는 위와같이 하늘, 땅을 정적 객체로 판단하고 진행하였으나

        여러 나무 그림자와 나뭇잎을 지나면서 바닥과 나무가 동적이라고 학습하기 시작한다. → uncertainty를 모든 픽셀에 적용하므로 가우시안 맵 학습이 안된다.

        이후에 다시 하늘과 바닥을 다시 정적으로 판단하기는 하지만 이미 맵에는 잘못된 결과가 들어가게 된다.

실험결과 분석

  • threshold distance : 1.0m
  • threshold rotation : 10 도 야외환경에서 약 63%의 query가 성공하였고 이는 실내에서의 실험 수치 88% 보다 약 25% 하락한 결과이다.

Best retrieval pose

Worst retrieval pose

  • slam 후반부에 retrieval성능이 떨어지는 것을 관측할 수 있엇다.
    • frame_350 이후 동적 객체인 사람이 등장하고 로봇이 회전하는 동작을 확인하였다.

Translation error

Rotation error

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.