Creating custom msg and srv files
Tasks
Create a new package
.msg, .srv를 만들어 각각의 패키지에 생성할 것이다. 패키지는 같은 workspace에 있어야 한다.
pub/sub, service/client 패키지를 생성할 것이기 때문에 이또한 같은 패키지에 생성해야 한다.
ros2 pkg create --build-type ament_cmake --license Apache-2.0 tutorial_interfaces
tutorial_interface는 CMake기반의 패키지이지만 message, service를 사용할 수 있는 패키지 유형에 제한은 없다.
CMake패키지에서 custom interface를 만들고 Python node를 사용할 수 있다.
.msg, .srv파일은 msg, srv directory에 있어야 한다.
# ros2_ws/src/tutorial_interfaces
mkdir msg srv
Create custom definition
tutorial_interfaces/msg에 Num.msg를 만든 후 아래처럼 data를 정의한다.
int64를 가지는 'num' 변수를 지정하는 것이다.
int64 num
Sphere.msg를 만들어 아래와 같이 기입한다.
geometry_msgs/Point message로부터 center 변수를 받는다.
geometry_msgs/Point center
float64 radius
srv경로에 AddThreeInts.srv를 만들어 request와 response를 정의한다.
int64 a
int64 b
int64 c
---
int64 sum
interface를 python 코드로 변환하여 사용하기 위해 CMakeLists.txt에 아래 내용을 추가한다.
# ros2_ws/src/tutorial_interfaces
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Num.msg"
"msg/Sphere.msg"
"srv/AddThreeInts.srv"
DEPENDENCIES geometry_msgs # Add packages that above messages depend on, in this case geometry_msgs for Sphere.msg
)
ament_package() 위에 작성해야 한다.
interface는 rosidl(ROS Interface Definition Language)_default_generators에 의존하기 때문에, dependency를 정의해줘야 한다.
rosidl_default_runtime은 나중에 interface 사용에 필요한 runtime이나 execution stage dependency이다.
rosidl_interface_packages는 package(tutorial_interfaces의 패키지)의 dependency group이름으로, 이를 통해 msg와 srv를 관리한다.
아래의 내용을 package.xml에 추가한다.
# ros2_ws/src/tutorial_interfaces
<depend>geometry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
Build the tutorial_interfaces package
이제 package를 build 해보자.
workspace(~/ros2_ws)에서 아래 command를 실행하면 된다.
colcon build --packages-select tutorial_interfaces
build하고 난 후에는 꼭 sourcing 하기!
source install/setup.bash
Confirm msg and srv creation
새로운 터미널창을 열어 msg, srv가 잘 생성됐는지 확인할 수 있다.
ros2 interface show tutorial_interfaces/msg/Num
ros2 interface show tutorial_interfaces/msg/Sphere
ros2 interface show tutorial_interfaces/srv/AddThreeInts
Testing Num.msg with pub/sub
이제 Num.msg를 사용할 수 있는 publisher/subscriber를 만들어주자.
# ros2_ws/src/py_pubsub/py_pubsub/publisher_member_function.py
import rclpy
from rclpy.node import Node
from tutorial_interfaces.msg import Num # CHANGE
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(Num, 'topic', 10) # CHANGE
timer_period = 0.5
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = Num() # CHANGE
msg.num = self.i # CHANGE
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%d"' % msg.num) # CHANGE
self.i += 1
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# ros2_ws/src/py_pubsub/py_pubsub/subscriber_member_function.py
import rclpy
from rclpy.node import Node
from tutorial_interfaces.msg import Num # CHANGE
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
Num, # CHANGE
'topic',
self.listener_callback,
10)
self.subscription
def listener_callback(self, msg):
self.get_logger().info('I heard: "%d"' % msg.num) # CHANGE
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
package.xml도 수정
# ros2_ws/src/py_pubsub
<exec_depend>tutorial_interfaces</exec_depend>
build 해주고
colcon build --packages-select py_pubsub
source 해주고
source install/setup.bash
각각의 terminal에 실행 한다.
ros2 run py_pubsub talker
ros2 run py_pubsub listener
Testing AddThreeInts.srv with service/client
이제 service/client를 통해 AddThreeInts.srv를 실행해보자.
# ros2_ws/src/py_srvcli/py_srvcli/service_member_function.py
from tutorial_interfaces.srv import AddThreeInts # CHANGE
import rclpy
from rclpy.node import Node
class MinimalService(Node):
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback) # CHANGE
def add_three_ints_callback(self, request, response):
response.sum = request.a + request.b + request.c # CHANGE
self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c)) # CHANGE
return response
def main(args=None):
rclpy.init(args=args)
minimal_service = MinimalService()
rclpy.spin(minimal_service)
rclpy.shutdown()
if __name__ == '__main__':
main()
# ros2_ws/src/py_srvcli/py_srvcli/client_member_function.py
from tutorial_interfaces.srv import AddThreeInts # CHANGE
import sys
import rclpy
from rclpy.node import Node
class MinimalClientAsync(Node):
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddThreeInts, 'add_three_ints') # CHANGE
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddThreeInts.Request() # CHANGE
def send_request(self):
self.req.a = int(sys.argv[1])
self.req.b = int(sys.argv[2])
self.req.c = int(sys.argv[3]) # CHANGE
self.future = self.cli.call_async(self.req)
def main(args=None):
rclpy.init(args=args)
minimal_client = MinimalClientAsync()
minimal_client.send_request()
while rclpy.ok():
rclpy.spin_once(minimal_client)
if minimal_client.future.done():
try:
response = minimal_client.future.result()
except Exception as e:
minimal_client.get_logger().info(
'Service call failed %r' % (e,))
else:
minimal_client.get_logger().info(
'Result of add_three_ints: for %d + %d + %d = %d' % # CHANGE
(minimal_client.req.a, minimal_client.req.b, minimal_client.req.c, response.sum)) # CHANGE
break
minimal_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
package.xml도 수정하고
# ros2_ws.src.py_srvcli
<exec_depend>tutorial_interfaces</exec_depend>
build, source 하면 적용 완료
colcon build --packages-select py_srvcli
source install/setup.bash
이제 각각의 터미널에 service/client를 실행
ros2 run py_srvcli service
ros2 run py_srvcli client 2 3 1
완성된 결과를 볼 수 있다!
ROS2_Humble Documentation : https://docs.ros.org/en/humble/Tutorials/Beginner-Client-Libraries.html