Writing an action server and client(Python)
Background
action은 ROS2에서 비동기식 커뮤니케이션 형태이다. client는 request를 server에게 보내고, server는 feedback과 result를 client에게 보낸다.
Tasks
Writing an action server
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
Node의 하위클래스인 FibonacciActionServer를 정의한다.
Node를 호출하여 fibonacci_action_server로 지정하여 초기화된다.
super().__init__('fibonacci_action_server')
또한 새로운 action server를 정의한다.
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
해당 action server는 4개의 argument가 필요하다.
1. self : action client를 추가하는 ROS2 노드
2. Fibonacci : action type
3. fibonacci : action name
4. self.execute_callback : callbackfunction, 이는 result message를 return해야 한다.
execute_callback를 정의하고, 이는 목표를 실행하기 위해 호출된다.
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
result = Fibonacci.Result()
return result
이제 실행해보자.
python3 fibonacci_action_server.py
다른 터미널을 실행해 command line interface를 통해 목표를 전달할 수 있다.
ros2 action send_goal fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
action server가 실행중인 터미널에서 'Executing goal...' message와 함께 목표상태가 설정되지 않았다는 경고가 표시된다.
기본적으로 목표가 callback에서 목표핸들 상태가 설정되지 않은 경우 aborted state로 가정한다.
succeed()를 사용하여 목표가 성공적으로 끝났음을 나타낼 수 있다.
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
goal_handle.succeed()
result = Fibonacci.Result()
return result
action server를 재시작하거나 다른 목표를 보낸다면, 목표가 끝났음을 알리는 상태 SUCCEEDED를 봐야 한다.
이제 실제로 계산하고 피보나치수열 계산값을 return 해보자.
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
sequence = [0, 1]
for i in range(1, goal_handle.request.order):
sequence.append(sequence[i] + sequence[i-1])
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
return result
계산이 끝난 후 return하기 전에 결과 메세지를 배정한다.
다시, action server를 재시작하고 다른 목표를 계산해보자. 적절한 결과로 목표가 끝났음을 볼 수 있다.
Publishing feedback
실행되는 동안 action client에게 ㄹeedback을 보내는 것은 좋은 방법이 될 수 있다. publish_feedback방법을 호출함으로써 feedback을 줄 수 있다.
sequence 변수를 대체하여 feedback메세지를 줄 수 있다. 매번 feedback message가 업데이트 된 후 feedback message가 게시하고 sleep한다.
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i-1])
self.get_logger().info('Feedback: {0}'.format(feedback_msg.partial_sequence))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
if __name__ == '__main__':
main()
action server 재시작 후 command line tool의 --feedback옵션을 통해 feedback을 확인 할 수 있다.
ros2 action send_goal --feedback fibonacci action_tutorials_interfaces/action/Fibonacci "{order: 5}"
Writing an action client
action client도 단독파일로 만들 수 있는데 새로운 파일(fibonacci_action_client.py)에 아래 내용을 넣자.
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
future = action_client.send_goal(10)
rclpy.spin_until_future_complete(action_client, future)
if __name__ == '__main__':
main()
Node의 하위클래스인 FibonacciActionClient를 정의하고, 이는 Node를 호출함으로써 fibonacci_action_client로 정의하여 초기화한다.
super().__init__('fibonacci_action_client')
이전에 정의했던 action을 사용하여 action client를 만든다.
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
ActionClient는 3가지 argument가 필요하다.
1. self : action client를 추가하는 ROS2 노드
2. Fibonacci : action type
3. fibonacci : action name
action client는 같은 이름, 타입의 action server와 communication 한다.
send_goal을 정의하는데 이는 action server가 사용가능할 때까지 기다린다. 그 후 목표를 전송하며, 기다린 후 return 가능하다.
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
return self._action_client.send_goal_async(goal_msg)
그 후 main()을 초기화하고, FibonacciActionClient 노드를 생성한다. 이는 목표를 전송하고 목표가 완료되기를 기다린다.
마지막에 main()을 호출한다.
이제 확인해보자.
python3 fibonacci_action_server.py
python3 fibonacci_action_client.py
Getting a result
목표가 완료되는 것을 확인할 수 있는 방법을 알아보자.
우선 goal handle을 얻어야 한다. 그래야 결과를 요청하기 위한 handle이 가능해진다.
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
ActionClient.send_goal_async()는 goal handle에 future를 return할 수 있다.
우선 future이 완료되면 callback을 등록한다.
self._send_goal_future.add_done_callback(self.goal_response_callback)
action server는 승인하거나 거절하는 request를 보내면 future는 완료된다.
goal_response_callback은 목표가 거절되면 결과가 없으니 일찌감치 알려준다.
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
get_result_async()를 통해 요청한 결과를 사용할 수 있다. 유사하게 목표를 전송하면, 결과가 준비됐을 때 future를 얻을 수 있다.
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
callback에서 결과를 log하고, ROS2를 종료한다.
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
받아들인 목표에 대한 최종 결과가 기록된 메세지들을 확인할 수 있다.
python3 fibonacci_action_client.py
Getting feedback
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
callback함수를 보면, feedback portion을 얻고, partial_sequence를 출력한다.
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))
callback을 action client에 등록하고 목표를 보낼 때 callback을 action client에 보냄으로써 성취할 수 있다.
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
ROS2_Humble Documentation : https://docs.ros.org/en/humble/Tutorials/Intermediate.html