ROS2

[ROS2] Tutorial Beginner : Creating custom msg and srv files

씨주 2024. 10. 28. 09:36

Creating custom msg and srv files

Tasks

Create a new package

.msg, .srv를 만들어 각각의 패키지에 생성할 것이다. 패키지는 같은 workspace에 있어야 한다.

pub/sub, service/client 패키지를 생성할 것이기 때문에 이또한 같은 패키지에 생성해야 한다.

ros2 pkg create --build-type ament_cmake --license Apache-2.0 tutorial_interfaces

 

tutorial_interface는 CMake기반의 패키지이지만 message, service를 사용할 수 있는 패키지 유형에 제한은 없다.

CMake패키지에서 custom interface를 만들고 Python node를 사용할 수 있다.

.msg, .srv파일은 msg, srv directory에 있어야 한다.

# ros2_ws/src/tutorial_interfaces
mkdir msg srv

 

Create custom definition

tutorial_interfaces/msg에 Num.msg를 만든 후 아래처럼 data를 정의한다.

int64를 가지는 'num' 변수를 지정하는 것이다.

int64 num

 

Sphere.msg를 만들어 아래와 같이 기입한다.

geometry_msgs/Point message로부터 center 변수를 받는다.

geometry_msgs/Point center
float64 radius

 

srv경로에 AddThreeInts.srv를 만들어 request와 response를 정의한다.

int64 a
int64 b
int64 c
---
int64 sum

 

interface를 python 코드로 변환하여 사용하기 위해 CMakeLists.txt에 아래 내용을 추가한다.

# ros2_ws/src/tutorial_interfaces
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Num.msg"
  "msg/Sphere.msg"
  "srv/AddThreeInts.srv"
  DEPENDENCIES geometry_msgs # Add packages that above messages depend on, in this case geometry_msgs for Sphere.msg
)

 

ament_package() 위에 작성해야 한다.

 

interface는 rosidl(ROS Interface Definition Language)_default_generators에 의존하기 때문에, dependency를 정의해줘야 한다.

rosidl_default_runtime은 나중에 interface 사용에 필요한 runtime이나 execution stage dependency이다.

rosidl_interface_packages는 package(tutorial_interfaces의 패키지)의 dependency group이름으로, 이를 통해 msg와 srv를 관리한다.

아래의 내용을 package.xml에 추가한다. 

# ros2_ws/src/tutorial_interfaces

<depend>geometry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

 

Build the tutorial_interfaces package

이제 package를 build 해보자.

workspace(~/ros2_ws)에서 아래 command를 실행하면 된다.

colcon build --packages-select tutorial_interfaces

 

build하고 난 후에는 꼭 sourcing 하기!

source install/setup.bash

 

Confirm msg and srv creation

새로운 터미널창을 열어 msg, srv가 잘 생성됐는지 확인할 수 있다.

ros2 interface show tutorial_interfaces/msg/Num

 

ros2 interface show tutorial_interfaces/msg/Sphere

 

ros2 interface show tutorial_interfaces/srv/AddThreeInts

 

Testing Num.msg with pub/sub

이제 Num.msg를 사용할 수 있는 publisher/subscriber를 만들어주자.

# ros2_ws/src/py_pubsub/py_pubsub/publisher_member_function.py

import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num                            # CHANGE


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(Num, 'topic', 10)  # CHANGE
        timer_period = 0.5
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = Num()                                                # CHANGE
        msg.num = self.i                                           # CHANGE
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%d"' % msg.num)       # CHANGE
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
# ros2_ws/src/py_pubsub/py_pubsub/subscriber_member_function.py

import rclpy
from rclpy.node import Node

from tutorial_interfaces.msg import Num                        # CHANGE


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            Num,                                               # CHANGE
            'topic',
            self.listener_callback,
            10)
        self.subscription

    def listener_callback(self, msg):
            self.get_logger().info('I heard: "%d"' % msg.num)  # CHANGE


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

package.xml도 수정

# ros2_ws/src/py_pubsub

<exec_depend>tutorial_interfaces</exec_depend>

 

build 해주고

colcon build --packages-select py_pubsub

 

source 해주고

source install/setup.bash

 

각각의 terminal에 실행 한다.

ros2 run py_pubsub talker

ros2 run py_pubsub listener

 

Testing AddThreeInts.srv with service/client

이제 service/client를 통해 AddThreeInts.srv를 실행해보자.

# ros2_ws/src/py_srvcli/py_srvcli/service_member_function.py

from tutorial_interfaces.srv import AddThreeInts                                                           # CHANGE

import rclpy
from rclpy.node import Node


class MinimalService(Node):

    def __init__(self):
        super().__init__('minimal_service')
        self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)       # CHANGE

    def add_three_ints_callback(self, request, response):
        response.sum = request.a + request.b + request.c                                                   # CHANGE
        self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))  # CHANGE

        return response

def main(args=None):
    rclpy.init(args=args)

    minimal_service = MinimalService()

    rclpy.spin(minimal_service)

    rclpy.shutdown()

if __name__ == '__main__':
    main()
# ros2_ws/src/py_srvcli/py_srvcli/client_member_function.py

from tutorial_interfaces.srv import AddThreeInts                            # CHANGE
import sys
import rclpy
from rclpy.node import Node


class MinimalClientAsync(Node):

    def __init__(self):
        super().__init__('minimal_client_async')
        self.cli = self.create_client(AddThreeInts, 'add_three_ints')       # CHANGE
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')
        self.req = AddThreeInts.Request()                                   # CHANGE

    def send_request(self):
        self.req.a = int(sys.argv[1])
        self.req.b = int(sys.argv[2])
        self.req.c = int(sys.argv[3])                                       # CHANGE
        self.future = self.cli.call_async(self.req)


def main(args=None):
    rclpy.init(args=args)

    minimal_client = MinimalClientAsync()
    minimal_client.send_request()

    while rclpy.ok():
        rclpy.spin_once(minimal_client)
        if minimal_client.future.done():
            try:
                response = minimal_client.future.result()
            except Exception as e:
                minimal_client.get_logger().info(
                    'Service call failed %r' % (e,))
            else:
                minimal_client.get_logger().info(
                    'Result of add_three_ints: for %d + %d + %d = %d' %                                # CHANGE
                    (minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum))  # CHANGE
            break

    minimal_client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

package.xml도 수정하고

# ros2_ws.src.py_srvcli

<exec_depend>tutorial_interfaces</exec_depend>

 

build, source 하면 적용 완료

colcon build --packages-select py_srvcli

 

source install/setup.bash

 

이제 각각의 터미널에 service/client를 실행

ros2 run py_srvcli service

ros2 run py_srvcli client 2 3 1

 

완성된 결과를 볼 수 있다!

 

 

 

ROS2_Humble Documentation : https://docs.ros.org/en/humble/Tutorials/Beginner-Client-Libraries.html