You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Since the solution is simpler, I did not create any design docs with diagram, instead directly came up with a working PoC.
I have implemented it as in the code below. I will raise a PR once the draft is approved in terms of the design. Renaming of class name, variables and so on can be handled as part of the code review.
importasyncioimportrclpyfromrclpy.nodeimportNodefromlaunchimportActionimportloggingimportsignalclassWaitForTopic(Action):
"""Custom Action that waits for a topic to become available."""def__init__(self, topic_name: str):
super().__init__()
self.topic_name=topic_nameself.logger=logging.getLogger('WaitForTopic')
self.logger.setLevel(logging.INFO)
defexecute(self, context):
"""Wait for the topic to be available before continuing."""self.logger.info(f"Starting to wait for topic '{self.topic_name}'...")
rclpy.init(args=None)
self.node=rclpy.create_node("topic_waiter")
# Create a completion futureself.future=asyncio.get_event_loop().create_future()
# Set up signal handler for graceful shutdownsignal.signal(signal.SIGINT, self._shutdown_handler)
defcheck_topic():
topics= [t[0] fortinself.node.get_topic_names_and_types()]
ifself.topic_nameintopics:
self.logger.info(f"Topic '{self.topic_name}' is available.")
ifnotself.future.done():
self.future.set_result(True)
else:
self.logger.info(f"Waiting for topic '{self.topic_name}'...")
# Periodically check for the topicself.node.create_timer(0.5, check_topic)
# Run the asyncio event loop until future completesasyncio.ensure_future(self._wait_for_future())
context.add_completion_future(self.future)
return []
asyncdef_wait_for_future(self):
"""Wait until the topic is available and clean up resources."""try:
awaitself.futureself.logger.info(f"Topic '{self.topic_name}' found, proceeding...")
finally:
self._cleanup()
def_shutdown_handler(self, signum, frame):
"""Handle SIGINT for graceful shutdown."""self.logger.info("Received SIGINT, shutting down...")
ifnotself.future.done():
self.future.set_exception(asyncio.CancelledError())
def_cleanup(self):
"""Shutdown rclpy properly and destroy the node."""self.node.destroy_node()
rclpy.shutdown()
Tests
I have tested it by developing the below launch file and running demo nodes. The same will provided in the examples directory for reference. If there is any other tests to be done, kindly feel free to let me know.
importosfromlaunchimportLaunchDescriptionfromlaunch_ros.actionsimportNodefromlaunch_ros.actionsimportWaitForTopicdefgenerate_launch_description():
# Nodes for the camera and processingtalker=Node(
package='demo_nodes_cpp',
executable='talker',
name='talker',
)
listener=Node(
package='demo_nodes_cpp',
executable='listener',
name='listener',
)
# Wait for the topic '/chatter' before launching the processing nodewait_for_a_topic=WaitForTopic('/chatter')
returnLaunchDescription([
talker,
wait_for_a_topic , # Action that waits for the topiclistener# Node that will be launched after topic availability
])
Issue Assignment
This issue can be assigned to me.
The text was updated successfully, but these errors were encountered:
RobotBramhana
changed the title
Feature request: Launch action to make a node wait for its predecessor to publish a topic
Feature request: Launch action to make a node wait for its predecessor to publish a topic before starting dependent nodes
Jan 27, 2025
Background
This reference to the issue opened here
Solution
Since the solution is simpler, I did not create any design docs with diagram, instead directly came up with a working PoC.
I have implemented it as in the code below. I will raise a PR once the draft is approved in terms of the design. Renaming of class name, variables and so on can be handled as part of the code review.
Tests
I have tested it by developing the below launch file and running demo nodes. The same will provided in the examples directory for reference. If there is any other tests to be done, kindly feel free to let me know.
Issue Assignment
This issue can be assigned to me.
The text was updated successfully, but these errors were encountered: