| ubuntu 24.04 Desktop ros2 jazzy |
| CARLA Simulator에서 오는 Lidar 토픽을 받아 ROS2 패키지로 open3d를 사용하여 visualization할 것임 |
topic명
패키지 작성
| 가상환경 활성화 상태로 pip install open3d numpy cd ros2_ws/src ros2 pkg create --build-type ament_python open3d_ros2_node --dependencies rclpy sensor_msgs |
/open3d_ros2_node/open3d_ros2_node/__init__.py
/open3d_ros2_node/open3d_ros2_node/subscriber_member_function.py
토픽 명만 맞춰 주면됨 ->/carla/hero/lidar
| import rclpy from rclpy.node import Node from sensor_msgs.msg import PointCloud2, LaserScan import sensor_msgs_py.point_cloud2 as pc2 import open3d as o3d import numpy as np class LidarVisualizer(Node): def __init__(self): super().__init__('lidar_visualizer') self.subscription = self.create_subscription( PointCloud2, '/carla/hero/lidar', #topic name self.listener_callback, 10) # 2. Open3D Visualizer 설정 self.vis = o3d.visualization.Visualizer() self.vis.create_window(window_name='Open3D ROS2 LiDAR', width=800, height=600) opt = self.vis.get_render_option() opt.point_size = 1.5 # 점 크기를 작게 조절 (네모 느낌 감소) #opt.background_color = np.array([0.0, 0.0, 0.0]) #opt.light_on = False self.pcd = o3d.geometry.PointCloud() self.is_first_msg = True def listener_callback(self, msg): #3D # 1. 포인트 데이터 변환 (기존과 동일) points = [[p[0], p[1], p[2]] for p in pc2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True)] if not points: return self.pcd.points = o3d.utility.Vector3dVector(np.array(points)) if self.is_first_msg: self.vis.add_geometry(self.pcd) # 2. 시점 제어기 가져오기 ctr = self.vis.get_view_control() # --------------------------------------------------------- # A. 탑뷰 (위에서 아래로) # ctr.set_front([0, 0, -1]) # 카메라가 바라보는 방향 (Z축 아래로) # ctr.set_up([0, -1, 0]) # 카메라의 위쪽 방향 설정 # ctr.set_lookat([0, 0, 0]) # 카메라가 집중할 초점 # ctr.set_zoom(0.5) # 숫자가 작을수록 멀리 보임 # --------------------------------------------------------- # B. 쿼터뷰/사선뷰 (대각선 위에서) ctr.set_front([-1.0, 0.0, 1.0]) ctr.set_up([0, 0, 1]) ctr.set_lookat([0, 0, 0]) ctr.set_zoom(0.3) # --------------------------------------------------------- self.is_first_msg = False # 3. 화면 갱신 self.vis.update_geometry(self.pcd) self.vis.poll_events() self.vis.update_renderer() def main(args=None): rclpy.init(args=args) node = LidarVisualizer() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.vis.destroy_window() node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() |
setup.cfg
| [develop] script_dir=$base/lib/open3d_ros2_node [install] install_scripts=$base/lib/open3d_ros2_node |
setup.py
| from setuptools import find_packages, setup package_name = 'open3d_ros2_node' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='airlab', maintainer_email='airlab@todo.todo', description='TODO: Package description', license='TODO: License declaration', extras_require={ 'test': [ 'pytest', ], }, entry_points={ 'console_scripts': [ 'visualizer = open3d_ros2_node.subscriber_member_function:main', ], }, ) |
추가
/carla/PythonAPI/examples/ros2/ros2_native_watch.py
차량이 어디 있는지 매번 찾기 힘들기 때문에
움직이는 차량을 따라가는 시점인 carla 창을 하나더 생성
| #!/usr/bin/env python import argparse import json import logging import carla import pygame import numpy as np # --- 시각화를 위한 클래스 및 콜백 --- class RenderObject: def __init__(self): self.surface = None def pygame_callback(data, render_obj): """CARLA 이미지를 Pygame Surface로 변환""" array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8")) array = np.reshape(array, (data.height, data.width, 4)) array = array[:, :, :3] # BGRA -> BGR array = array[:, :, ::-1] # BGR -> RGB render_obj.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) # --- 기존 함수 유지 --- def _setup_vehicle(world, config): logging.debug("Spawning vehicle: {}".format(config.get("type"))) bp_library = world.get_blueprint_library() map_ = world.get_map() bp = bp_library.filter(config.get("type"))[0] bp.set_attribute("role_name", config.get("id")) bp.set_attribute("ros_name", config.get("id")) return world.spawn_actor( bp, map_.get_spawn_points()[0], attach_to=None) def _setup_sensors(world, vehicle, sensors_config): bp_library = world.get_blueprint_library() sensors = [] for sensor in sensors_config: logging.debug("Spawning sensor: {}".format(sensor)) bp = bp_library.filter(sensor.get("type"))[0] bp.set_attribute("ros_name", sensor.get("id")) bp.set_attribute("role_name", sensor.get("id")) for key, value in sensor.get("attributes", {}).items(): bp.set_attribute(str(key), str(value)) wp = carla.Transform( location=carla.Location(x=sensor["spawn_point"]["x"], y=-sensor["spawn_point"]["y"], z=sensor["spawn_point"]["z"]), rotation=carla.Rotation(roll=sensor["spawn_point"]["roll"], pitch=-sensor["spawn_point"]["pitch"], yaw=-sensor["spawn_point"]["yaw"]) ) s = world.spawn_actor(bp, wp, attach_to=vehicle) sensors.append(s) s.enable_for_ros() return sensors # --- 메인 실행 루프 --- def main(args): world = None vehicle = None sensors = [] view_camera = None original_settings = None # 1. Pygame 초기화 (창 띄우기) pygame.init() display = pygame.display.set_mode((800, 600), pygame.HWSURFACE | pygame.DOUBLEBUF) pygame.display.set_caption("CARLA ROS2 Visualizer") render_obj = RenderObject() try: client = carla.Client(args.host, args.port) client.set_timeout(60.0) world = client.get_world() # 2. 세계 설정 (동기화 모드) original_settings = world.get_settings() settings = world.get_settings() settings.synchronous_mode = True settings.fixed_delta_seconds = 0.05 world.apply_settings(settings) traffic_manager = client.get_trafficmanager() traffic_manager.set_synchronous_mode(True) # 3. 차량 및 설정 센서 스폰 (JSON 기반) with open(args.file) as f: config = json.load(f) vehicle = _setup_vehicle(world, config) sensors = _setup_sensors(world, vehicle, config.get("sensors", [])) # 4. 시각화 전용 카메라 추가 (차량 뒤쪽 3인칭 뷰) bp_library = world.get_blueprint_library() view_bp = bp_library.find('sensor.camera.rgb') view_bp.set_attribute('image_size_x', '800') view_bp.set_attribute('image_size_y', '600') view_transform = carla.Transform(carla.Location(x=-6, z=2.5), carla.Rotation(pitch=-15)) view_camera = world.spawn_actor(view_bp, view_transform, attach_to=vehicle) # 카메라 콜백 연결 view_camera.listen(lambda image: pygame_callback(image, render_obj)) # 초기 틱 world.tick() vehicle.set_autopilot(True) # 5. 메인 루프 clock = pygame.time.Clock() while True: world.tick() # Pygame 화면에 카메라 영상 출력 if render_obj.surface is not None: display.blit(render_obj.surface, (0, 0)) pygame.display.flip() # 종료 이벤트 처리 (창 닫기 혹은 ESC) for event in pygame.event.get(): if event.type == pygame.QUIT: return elif event.type == pygame.KEYDOWN: if event.key == pygame.K_ESCAPE: return clock.tick(20) # 루프 속도 조절 except KeyboardInterrupt: print('\nCancelled by user. Bye!') finally: # 정리 작업 if original_settings: world.apply_settings(original_settings) if view_camera: view_camera.destroy() for sensor in sensors: sensor.destroy() if vehicle: vehicle.destroy() pygame.quit() if __name__ == '__main__': argparser = argparse.ArgumentParser(description='CARLA ROS2 native with Visualization') argparser.add_argument('--host', metavar='H', default='localhost', help='IP of the host CARLA Simulator') argparser.add_argument('--port', metavar='P', default=2000, type=int, help='TCP port') argparser.add_argument('-f', '--file', required=True, help='JSON config file') argparser.add_argument('-v', '--verbose', action='store_true', dest='debug', help='print debug info') args = argparser.parse_args() log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) main(args) |
실행 영상
3D point cloud
64ch
128ch
| XDG_SESSION_TYPE=x11 GDK_BACKEND=x11 ros2 run open3d_ros2_node visualizer |
2D point cloud
| sensor_msgs/msg/LaserScan /scan topic 을 publish하기 위해서 sudo apt install ros-jazzy-pointcloud-to-laserscan ros2 run pointcloud_to_laserscan pointcloud_to_laserscan_node --ros-args -p target_frame:=lidar -p min_height:=-2.0 -p max_height:=2.0 -r cloud_in:=/carla/ego_vehicle/lidar -r scan:=/scan |
패키지 형식으로 만듬
| cd ros2_ws/src ros2 pkg create --build-type ament_cmake lidar_cv_visualizer --dependencies rclcpp sensor_msgs opencv cd ~/ros2_ws colcon build --packages-select lidar_cv_visualizer source install/setup.bash ros2 run lidar_cv_visualizer cv_visualizer |
/lidar_cv_visualizer/src/lidar_cv_node.cpp
| #include "rclcpp/rclcpp.hpp" #include "sensor_msgs/msg/laser_scan.hpp" #include "opencv2/opencv.hpp" #include <cmath> #define WIDTH 600 #define HEIGHT 600 #define MAX_RANGE 40.0 // 시각화할 최대 거리 (미터) #define SCALE ((WIDTH / 2.0) / MAX_RANGE) class LidarCvNode : public rclcpp::Node { public: LidarCvNode() : Node("lidar_cv_visualizer") { // 1. QoS 설정 (SensorDataQoS는 보통 Best Effort 방식) auto qos_profile = rclcpp::SensorDataQoS(); // 2. Subscriber 생성 subscription_ = this->create_subscription<sensor_msgs::msg::LaserScan>( "/scan", qos_profile, std::bind(&LidarCvNode::scan_callback, this, std::placeholders::_1)); // 3. OpenCV 창 및 비디오 저상 설정 cv::namedWindow("Lidar_point", cv::WINDOW_AUTOSIZE); std::string filename = "lidar_record.mp4"; int fcc = cv::VideoWriter::fourcc('m','p','4','v'); video_writer_.open(filename, fcc, 20.0, cv::Size(WIDTH, HEIGHT)); RCLCPP_INFO(this->get_logger(), "LiDAR OpenCV Node Started. Max Range: %.1fm", MAX_RANGE); } ~LidarCvNode() { if (video_writer_.isOpened()) video_writer_.release(); cv::destroyAllWindows(); } private: void scan_callback(const sensor_msgs::msg::LaserScan::SharedPtr msg) { // 배경을 흰색으로 초기화 cv::Mat img = cv::Mat(HEIGHT, WIDTH, CV_8UC3, cv::Scalar(255, 255, 255)); int center_x = WIDTH / 2; int center_y = HEIGHT / 2; // 기준점(차량 위치) 그리기 cv::circle(img, cv::Point(center_x, center_y), 5, cv::Scalar(255, 0, 0), -1); // LiDAR 데이터 처리 (단일 루프로 최적화) for (size_t i = 0; i < msg->ranges.size(); i++) { float r = msg->ranges[i]; // 유효하지 않은 거리값 제외 if (r < msg->range_min || r > msg->range_max || std::isinf(r) || std::isnan(r)) { continue; } float angle = msg->angle_min + msg->angle_increment * i; // 좌표 변환 (ROS 표준: x 앞, y 왼쪽 -> OpenCV: x 오른쪽, y 아래) // ROS x축(정면)은 OpenCV의 위쪽 방향이 되어야 함 float x = r * std::cos(angle); float y = r * std::sin(angle); // 이미지 좌표계 매핑 (단위: 픽셀) int px = center_x - (int)(y * SCALE); // y를 왼쪽/오른쪽으로 반전 int py = center_y - (int)(x * SCALE); // x를 위/아래로 반전 // 화면 범위 내에 있는 경우만 그리기 if (px >= 0 && px < WIDTH && py >= 0 && py < HEIGHT) { cv::circle(img, cv::Point(px, py), 2, cv::Scalar(0, 0, 255), -1); } } // 결과 화면 표시 cv::imshow("Lidar_point", img); if (video_writer_.isOpened()) { video_writer_.write(img); } cv::waitKey(1); // GUI 갱신을 위해 필수 } rclcpp::Subscription<sensor_msgs::msg::LaserScan>::SharedPtr subscription_; cv::VideoWriter video_writer_; }; int main(int argc, char **argv) { rclcpp::init(argc, argv); auto node = std::make_shared<LidarCvNode>(); rclcpp::spin(node); rclcpp::shutdown(); return 0; } |
lidar_cv_visualizer/CMakeLists.txt
| cmake_minimum_required(VERSION 3.8) project(lidar_cv_visualizer) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() # 의존성 찾기 find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(sensor_msgs REQUIRED) find_package(OpenCV REQUIRED) # 실행 파일 생성 add_executable(cv_visualizer src/lidar_cv_node.cpp) # 헤더 경로 및 라이브러리 연결 target_include_directories(cv_visualizer PRIVATE ${OpenCV_INCLUDE_DIRS}) ament_target_dependencies(cv_visualizer rclcpp sensor_msgs) target_link_libraries(cv_visualizer ${OpenCV_LIBRARIES}) # 설치 경로 설정 install(TARGETS cv_visualizer DESTINATION lib/${PROJECT_NAME}) ament_package() |
다음검색