import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.time import Time
from typing import List, Tuple, Optional
import numpy as np
class ClosestMessageSubscriber(Node):
    """특정 토픽의 메시지를 수신하여 버퍼에 저장하고, 요청된 타임스탬프에 가장 가까운 메시지를 검색하는 노드.
    Attributes:
        subscription (Subscription): ROS 2 subscription 객체.
        buffer (List[Tuple[Time, String]]): 수신된 메시지와 타임스탬프를 저장하는 버퍼.
        buffer_size (int): 버퍼의 최대 크기.
    """
    def __init__(self):
        """노드 초기화 및 구독자 설정."""
        super().__init__('closest_message_subscriber')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10
        )
        self.buffer: List[Tuple[Time, String]] = []
        self.buffer_size: int = 100  # 버퍼 사이즈를 100개 메시지로 설정
    def listener_callback(self, msg: String) -> None:
        """토픽의 메시지를 받아 버퍼에 저장하는 콜백 함수.
        Args:
            msg (String): 수신된 메시지.
        """
        # 버퍼 관리: 오래된 메시지 제거
        if len(self.buffer) >= self.buffer_size:
            self.buffer.pop(0)
        # 메시지 저장
        self.buffer.append((self.get_clock().now(), msg))
    def get_closest_message(self, target_time: Time, time_gap_threshold: float) -> Optional[String]:
        """요청된 타임스탬프와 가장 근접한 메시지를 검색합니다.
        Args:
            target_time (Time): 찾고자 하는 타임스탬프.
            time_gap_threshold (float): 허용되는 최대 시간 차이 (나노초 단위).
        Returns:
            Optional[String]: 가장 근접한 타임스탬프의 메시지. 조건을 만족하는 메시지가 없으면 None을 반환합니다.
        """
        if not self.buffer:
            return None
        # 시간 차이 계산
        timestamps = np.array([t[0].nanoseconds for t in self.buffer])
        target_ns = target_time.nanoseconds
        time_diffs = np.abs(timestamps - target_ns)
        # 조건을 만족하는 최소 시간 차이 찾기
        min_diff_index = np.argmin(time_diffs)
        min_diff = time_diffs[min_diff_index]
        if min_diff <= time_gap_threshold:
            return self.buffer[min_diff_index][1]
        else:
            return None
def main(args=None) -> None:
    """메인 함수: 노드를 초기화하고 실행합니다."""
    rclpy.init(args=args)
    node = ClosestMessageSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
if __name__ == '__main__':
    main()
Time Source와 같은 메커니즘을 제공합니다. message_filters 라이브러리를 사용하여 여러 토픽의 메시지를 시간에 따라 동기화할 수 있습니다. message_filters 라이브러리를 사용하여 ROS 2에서 여러 토픽의 메시지를 시간에 따라 동기화하는 방법을 설명하겠습니다. message_filters 사용하기message_filters를 사용하는 기본적인 단계는 다음과 같습니다:
아래는 두 개의 토픽 (/camera/image와 /lidar/scan)에서 메시지를 동기화하는 예제 코드입니다.
import rclpy
from rclpy.node import Node
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image
from sensor_msgs.msg import LaserScan
class SensorFusionNode(Node):
    def __init__(self):
        super().__init__('sensor_fusion_node')
        # 메시지 필터 생성
        self.image_sub = Subscriber(self, Image, '/camera/image')
        self.lidar_sub = Subscriber(self, LaserScan, '/lidar/scan')
        # 시간 동기화 설정
        self.ats = ApproximateTimeSynchronizer([self.image_sub, self.lidar_sub], queue_size=10, slop=0.1)
        self.ats.registerCallback(self.callback)
    def callback(self, image, scan):
        # 동기화된 메시지 처리
        self.get_logger().info(f"Received synchronized messages: {image.header.stamp} {scan.header.stamp}")
def main(args=None):
    rclpy.init(args=args)
    node = SensorFusionNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
if __name__ == '__main__':
    main()
Subscriber 객체를 생성합니다.ApproximateTimeSynchronizer를 사용하여 두 토픽의 메시지를 동기화queue_size는 동기화를 위해 버퍼링할 메시지의 수를 정의하고, slop는 메시지 간의 최대 허용 시간 차이를 초 단위로 설정message_filters는 ApproximateTimeSynchronizer 외에도 TimeSynchronizer (엄격한 동기화가 필요할 때) 등 다양한 동기화 옵션을 제공합니다.tf2_ros 패키지는 변환 데이터를 버퍼링하고, 이 데이터에 대한 조회를 허용TF2를 사용하면 특정 시간에 대한 변환 데이터를 요청할 수 있음tf2_ros를 사용하여 ROS 2에서 특정 시간에 대한 변환 데이터를 요청하는 방법을 설명tf2_ros는 좌표 변환을 위한 시스템Buffer와 TransformListener를 사용하여 변환을 저장하고, 이러한 변환에 접근하여 필요한 계산을 수행할 수 있음tf2_ros.Buffer 초기화tf2_ros.Buffer 객체를 초기화합니다. Buffer 객체는 변환 데이터를 저장tf2_ros.TransformListener 사용
TransformListener는 Buffer에 데이터를 자동으로 저장변환 요청
Buffer 객체의 lookup_transform 메소드를 사용import rclpy
from rclpy.node import Node
from tf2_ros import Buffer, TransformListener
from tf2_ros import LookupException, ConnectivityException, ExtrapolationException
from geometry_msgs.msg import TransformStamped
class TransformListenerNode(Node):
    def __init__(self):
        super().__init__('transform_listener_node')
        self.tf_buffer = Buffer()
        self.listener = TransformListener(self.tf_buffer, self)
    def get_transform(self, target_frame: str, source_frame: str, time):
        try:
            # 시간을 Time 형식으로 변환 (rclpy.time.Time)
            when = rclpy.time.Time(seconds=time)
            # 특정 시간에 대한 변환 요청
            transform = self.tf_buffer.lookup_transform(target_frame, source_frame, when)
            return transform
        except (LookupException, ConnectivityException, ExtrapolationException) as e:
            self.get_logger().error(f'Error getting transform: {str(e)}')
            return None
def main(args=None):
    rclpy.init(args=args)
    node = TransformListenerNode()
    try:
        # 예를 들어, 'base_link'에서 'camera_link'로의 변환을 요청
        transform = node.get_transform('base_link', 'camera_link', 1620303662.0)
        if transform:
            node.get_logger().info(f'Transform: {transform}')
    finally:
        node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    main()
Buffer에 저장된 시간 범위 내에 있어야 합니다.