diff --git a/rosapi/package.xml b/rosapi/package.xml index 932baeaa..33db71c9 100644 --- a/rosapi/package.xml +++ b/rosapi/package.xml @@ -24,14 +24,8 @@ rcl_interfaces rosbridge_library ros2node - ros2param - ros2pkg ros2service ros2topic - ament_cmake_mypy ament_cmake_pytest diff --git a/rosapi/scripts/rosapi_node b/rosapi/scripts/rosapi_node index 3ef910b7..e61f8253 100755 --- a/rosapi/scripts/rosapi_node +++ b/rosapi/scripts/rosapi_node @@ -33,10 +33,13 @@ import os import sys +from json import dumps, loads import rclpy +from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.clock import Clock, ClockType from rclpy.node import Node +from rclpy.qos import qos_profile_parameters from rosapi import glob_helper, objectutils, params, proxy from rosapi_msgs.msg import TypeDef @@ -85,13 +88,9 @@ class Rosapi(Node): # Initialises the ROS node def register_services(self): proxy.init(self) - if self.get_namespace() == "/": - full_name = self.get_namespace() + self.get_name() - else: - full_name = self.get_namespace() + "/" + self.get_name() timeout_sec = self.get_parameter("params_timeout").value - params.init(full_name, timeout_sec) + params.init(self, timeout_sec) self.create_service(Topics, "~/topics", self.get_topics) self.create_service(Interfaces, "~/interfaces", self.get_interfaces) @@ -131,11 +130,41 @@ class Rosapi(Node): "~/service_response_details", self.get_service_response_details, ) - self.create_service(SetParam, "~/set_param", self.set_param) - self.create_service(GetParam, "~/get_param", self.get_param) - self.create_service(HasParam, "~/has_param", self.has_param) - self.create_service(DeleteParam, "~/delete_param", self.delete_param) - self.create_service(GetParamNames, "~/get_param_names", self.get_param_names) + self.create_service( + SetParam, + "~/set_param", + self.set_param, + callback_group=ReentrantCallbackGroup(), + qos_profile=qos_profile_parameters, + ) + self.create_service( + GetParam, + "~/get_param", + self.get_param, + callback_group=ReentrantCallbackGroup(), + qos_profile=qos_profile_parameters, + ) + self.create_service( + HasParam, + "~/has_param", + self.has_param, + callback_group=ReentrantCallbackGroup(), + qos_profile=qos_profile_parameters, + ) + self.create_service( + DeleteParam, + "~/delete_param", + self.delete_param, + callback_group=ReentrantCallbackGroup(), + qos_profile=qos_profile_parameters, + ) + self.create_service( + GetParamNames, + "~/get_param_names", + self.get_param_names, + callback_group=ReentrantCallbackGroup(), + qos_profile=qos_profile_parameters, + ) self.create_service(GetTime, "~/get_time", self.get_time) self.create_service(GetROSVersion, "~/get_ros_version", self.get_ros_version) @@ -262,38 +291,77 @@ class Rosapi(Node): ] return response - def set_param(self, request, response): + async def set_param(self, request, response): try: node_name, param_name = self._get_node_and_param_name(request.name) - params.set_param(node_name, param_name, request.value, self.globs.params) except ValueError: self._print_malformed_param_name_warning(request.name) + + try: + await params.set_param(node_name, param_name, request.value, self.globs.params) + except Exception as e: + response.successful = False + response.reason = str(e) + else: + response.successful = True + return response - def get_param(self, request, response): + async def get_param(self, request, response): try: node_name, param_name = self._get_node_and_param_name(request.name) - response.value = params.get_param( - node_name, param_name, request.default_value, self.globs.params - ) except ValueError: self._print_malformed_param_name_warning(request.name) + + try: + response.value = await params.get_param(node_name, param_name, self.globs.params) + except Exception as e: + response.successful = False + response.reason = str(e) + + default = "" + if request.default_value != "": + try: + default = loads(request.default_value) + except ValueError: + self.get_logger().error( + "Failed to parse default value: {}".format(request.default_value) + ) + + response.value = dumps(default) + else: + response.successful = True + return response - def has_param(self, request, response): + async def has_param(self, request, response): try: node_name, param_name = self._get_node_and_param_name(request.name) - response.exists = params.has_param(node_name, param_name, self.globs.params) except ValueError: self._print_malformed_param_name_warning(request.name) + + response.exists = await params.has_param(node_name, param_name, self.globs.params) + return response - def delete_param(self, request, response): - params.delete_param(request.node_name, request.name, self.globs.params) + async def delete_param(self, request, response): + try: + node_name, param_name = self._get_node_and_param_name(request.name) + except ValueError: + self._print_malformed_param_name_warning(request.name) + + try: + await params.delete_param(node_name, param_name, self.globs.params) + except Exception as e: + response.successful = False + response.reason = str(e) + else: + response.successful = True + return response - def get_param_names(self, request, response): - response.names = params.get_param_names(self.globs.params) + async def get_param_names(self, request, response): + response.names = await params.get_param_names(self.globs.params) return response def get_time(self, request, response): diff --git a/rosapi/src/rosapi/async_helper.py b/rosapi/src/rosapi/async_helper.py new file mode 100644 index 00000000..53596dfc --- /dev/null +++ b/rosapi/src/rosapi/async_helper.py @@ -0,0 +1,71 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2025, Fictionlab sp. z o.o. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from rclpy.node import Node +from rclpy.task import Future + + +async def futures_wait_for(node: Node, futures: list[Future], timeout_sec: float): + """Await a list of futures with a timeout.""" + first_done_future: Future = Future() + + def timeout_callback(): + first_done_future.set_result(None) + + timer = node.create_timer(timeout_sec, timeout_callback) + + def future_done_callback(arg): + if all(future.done() for future in futures): + first_done_future.set_result(None) + + for future in futures: + future.add_done_callback(future_done_callback) + + await first_done_future + + timer.cancel() + timer.destroy() + + +async def async_sleep(node: Node, delay_sec: float): + """Block the coroutine for a given time.""" + sleep_future: Future = Future() + + def timeout_callback(): + sleep_future.set_result(None) + + timer = node.create_timer(delay_sec, timeout_callback) + + await sleep_future + + timer.cancel() + timer.destroy() diff --git a/rosapi/src/rosapi/params.py b/rosapi/src/rosapi/params.py index 51513f14..443e60b1 100644 --- a/rosapi/src/rosapi/params.py +++ b/rosapi/src/rosapi/params.py @@ -1,6 +1,7 @@ # Software License Agreement (BSD License) # # Copyright (c) 2012, Willow Garage, Inc. +# Copyright (c) 2025, Fictionlab sp. z o.o. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -31,15 +32,16 @@ # POSSIBILITY OF SUCH DAMAGE. import fnmatch -import threading from json import dumps, loads -import rclpy from rcl_interfaces.msg import Parameter, ParameterType, ParameterValue -from rcl_interfaces.srv import ListParameters +from rcl_interfaces.srv import GetParameters, ListParameters, SetParameters +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup +from rclpy.node import Node from rclpy.parameter import get_parameter_value +from rclpy.task import Future from ros2node.api import get_absolute_node_name -from ros2param.api import call_get_parameters, call_set_parameters +from rosapi.async_helper import futures_wait_for from rosapi.proxy import get_nodes """ Methods to interact with the param server. Values have to be passed @@ -48,10 +50,7 @@ # Constants DEFAULT_PARAM_TIMEOUT_SEC = 5.0 -# Ensure thread safety for setting / getting parameters. -param_server_lock = threading.RLock() _node = None -_parent_node_name = "" _timeout_sec = DEFAULT_PARAM_TIMEOUT_SEC _parameter_type_mapping = [ @@ -68,29 +67,20 @@ ] -def init(parent_node_name, timeout_sec=DEFAULT_PARAM_TIMEOUT_SEC): +def init(node: Node, timeout_sec: float | int = DEFAULT_PARAM_TIMEOUT_SEC): """ Initializes params module with a rclpy.node.Node for further use. This function has to be called before any other for the module to work. """ - global _node, _parent_node_name, _timeout_sec - # TODO(@jubeira): remove this node; use rosapi node with MultiThreadedExecutor or - # async / await to prevent the service calls from blocking. - parent_node_basename = parent_node_name.split("/")[-1] - param_node_name = f"{parent_node_basename}_params" - _node = rclpy.create_node( - param_node_name, - cli_args=["--ros-args", "-r", f"__node:={param_node_name}"], - start_parameter_services=False, - ) - _parent_node_name = get_absolute_node_name(parent_node_name) + global _node, _timeout_sec + _node = node if not isinstance(timeout_sec, (int, float)) or timeout_sec <= 0: raise ValueError("Parameter timeout must be a positive number") _timeout_sec = timeout_sec -def set_param(node_name, name, value, params_glob): +async def set_param(node_name: str, name: str, value: str, params_glob: list[str]): """Sets a parameter in a given node""" if params_glob and not any(fnmatch.fnmatch(str(name), glob) for glob in params_glob): @@ -109,11 +99,10 @@ def set_param(node_name, name, value, params_glob): ) node_name = get_absolute_node_name(node_name) - with param_server_lock: - _set_param(node_name, name, value) + await _set_param(node_name, name, value) -def _set_param(node_name, name, value, parameter_type=None): +async def _set_param(node_name: str, name: str, value: str, parameter_type=None): """ Internal helper function for set_param. Attempts to set the given parameter in the target node with the desired value, @@ -128,47 +117,93 @@ def _set_param(node_name, name, value, parameter_type=None): parameter.value = ParameterValue() parameter.value.type = parameter_type if parameter_type != ParameterType.PARAMETER_NOT_SET: - setattr(parameter.value, _parameter_type_mapping[parameter_type]) + setattr(parameter.value, _parameter_type_mapping[parameter_type], loads(value)) - try: - # call_get_parameters will fail if node does not exist. - call_set_parameters(node=_node, node_name=node_name, parameters=[parameter]) - except Exception: - pass + assert _node is not None + client = _node.create_client( + SetParameters, + f"{node_name}/set_parameters", + callback_group=MutuallyExclusiveCallbackGroup(), + ) + + if not client.service_is_ready(): + _node.destroy_client(client) + raise Exception(f"Service {client.srv_name} is not available") + + request = SetParameters.Request() + request.parameters = [parameter] + + future = client.call_async(request) + + await futures_wait_for(_node, [future], _timeout_sec) + + _node.destroy_client(client) + if not future.done(): + future.cancel() + raise Exception("Timeout occurred") -def get_param(node_name, name, default, params_glob): + result = future.result() + + assert result is not None + if not result.results[0].successful: + raise Exception(result.results[0].reason) + + +async def get_param(node_name: str, name: str, params_glob: str) -> str: """Gets a parameter from a given node""" if params_glob and not any(fnmatch.fnmatch(str(name), glob) for glob in params_glob): # If the glob list is not empty and there are no glob matches, # stop the attempt to get the parameter. - return + raise Exception(f"Parameter {name} does not match any of the glob strings") # If the glob list is empty (i.e. false) or the parameter matches # one of the glob strings, continue to get the parameter. - if default != "": - try: - default = loads(default) - except ValueError: - pass # Keep default without modifications. node_name = get_absolute_node_name(node_name) - with param_server_lock: - try: - # call_get_parameters will fail if node does not exist. - response = call_get_parameters(node=_node, node_name=node_name, parameter_names=[name]) - pvalue = response.values[0] - # if type is 0 (parameter not set), the next line will raise an exception - # and return value shall go to default. - value = getattr(pvalue, _parameter_type_mapping[pvalue.type]) - except Exception: - # If either the node or the parameter does not exist, return default. - value = default + pvalue = await _get_param(node_name, name) + value = getattr(pvalue, _parameter_type_mapping[pvalue.type]) return dumps(value) -def has_param(node_name, name, params_glob): +async def _get_param(node_name: str, name: str) -> ParameterValue: + """Internal helper function for get_param""" + + assert _node is not None + client = _node.create_client( + GetParameters, + f"{node_name}/get_parameters", + callback_group=MutuallyExclusiveCallbackGroup(), + ) + + if not client.service_is_ready(): + _node.destroy_client(client) + raise Exception(f"Service {client.srv_name} is not available") + + request = GetParameters.Request() + request.names = [name] + + future = client.call_async(request) + + await futures_wait_for(_node, [future], _timeout_sec) + + _node.destroy_client(client) + + if not future.done(): + future.cancel() + raise Exception("Timeout occurred") + + result = future.result() + + assert result is not None + if len(result.values) == 0: + raise Exception(f"Parameter {name} not found") + + return result.values[0] + + +async def has_param(node_name: str, name: str, params_glob: str) -> bool: """Checks whether a given node has a parameter or not""" if params_glob and not any(fnmatch.fnmatch(str(name), glob) for glob in params_glob): @@ -178,16 +213,15 @@ def has_param(node_name, name, params_glob): # If the glob list is empty (i.e. false) or the parameter matches # one of the glob strings, check whether the parameter exists. node_name = get_absolute_node_name(node_name) - with param_server_lock: - try: - response = call_get_parameters(node=_node, node_name=node_name, parameter_names=[name]) - except Exception: - return False + try: + pvalue = await _get_param(node_name, name) + except Exception: + return False - return response.values[0].type > 0 and response.values[0].type < len(_parameter_type_mapping) + return 0 < pvalue.type < len(_parameter_type_mapping) -def delete_param(node_name, name, params_glob): +async def delete_param(node_name, name, params_glob): """Deletes a parameter in a given node""" if params_glob and not any(fnmatch.fnmatch(str(name), glob) for glob in params_glob): @@ -197,60 +231,55 @@ def delete_param(node_name, name, params_glob): # If the glob list is empty (i.e. false) or the parameter matches # one of the glob strings, continue to delete the parameter. node_name = get_absolute_node_name(node_name) - if has_param(node_name, name, params_glob): - with param_server_lock: - _set_param(node_name, name, None, ParameterType.PARAMETER_NOT_SET) + if await has_param(node_name, name, params_glob): + await _set_param(node_name, name, None, ParameterType.PARAMETER_NOT_SET) -def get_param_names(params_glob): - params = [] - nodes = get_nodes() - - for node in nodes: - params.extend(get_node_param_names(node, params_glob)) - - return params +async def get_param_names(params_glob: str | None) -> list[str]: + assert _node is not None + nodes = [get_absolute_node_name(node) for node in get_nodes()] -def get_node_param_names(node_name, params_glob): - """Gets list of parameter names for a given node""" - node_name = get_absolute_node_name(node_name) + futures: list[tuple[str, Future]] = [] + clients = [] + for node_name in nodes: + if node_name == _node.get_fully_qualified_name(): + continue - with param_server_lock: - if params_glob: - # If there is a parameter glob, filter by it. - return list( - filter( - lambda x: any(fnmatch.fnmatch(str(x), glob) for glob in params_glob), - _get_param_names(node_name), - ) - ) + client = _node.create_client( + ListParameters, + f"{node_name}/list_parameters", + callback_group=MutuallyExclusiveCallbackGroup(), + ) + if client.service_is_ready(): + future = client.call_async(ListParameters.Request()) + futures.append((node_name, future)) + clients.append(client) else: - # If there is no parameter glob, don't filter. - return _get_param_names(node_name) + _node.destroy_client(client) + params = [] -def _get_param_names(node_name): - # This method is called in a service callback; calling a service of the same node - # will cause a deadlock. - global _parent_node_name - if node_name == _parent_node_name or node_name == _node.get_fully_qualified_name(): - return [] + await futures_wait_for(_node, [future for _, future in futures], _timeout_sec) - client = _node.create_client(ListParameters, f"{node_name}/list_parameters") + for client in clients: + _node.destroy_client(client) - if not client.service_is_ready(): - return [] + for node_name, future in futures: + if not future.done(): + future.cancel() + continue - request = ListParameters.Request() - future = client.call_async(request) - if _node.executor: - _node.executor.spin_until_future_complete(future, timeout_sec=_timeout_sec) - else: - rclpy.spin_until_future_complete(_node, future, timeout_sec=_timeout_sec) - response = future.result() + if future.exception() is not None: + continue - if response is not None: - return [f"{node_name}:{param_name}" for param_name in response.result.names] + result = future.result() + if result is not None: + params.extend([f"{node_name}:{param_name}" for param_name in result.result.names]) + + if params_glob: + return list( + filter(lambda x: any(fnmatch.fnmatch(str(x), glob) for glob in params_glob), params) + ) else: - return [] + return params diff --git a/rosapi_msgs/srv/DeleteParam.srv b/rosapi_msgs/srv/DeleteParam.srv index 22a97d59..11719846 100644 --- a/rosapi_msgs/srv/DeleteParam.srv +++ b/rosapi_msgs/srv/DeleteParam.srv @@ -1,2 +1,4 @@ string name --- +bool successful +string reason diff --git a/rosapi_msgs/srv/GetParam.srv b/rosapi_msgs/srv/GetParam.srv index cdbf3fd1..83f432b0 100644 --- a/rosapi_msgs/srv/GetParam.srv +++ b/rosapi_msgs/srv/GetParam.srv @@ -2,3 +2,5 @@ string name string default_value --- string value +bool successful +string reason diff --git a/rosapi_msgs/srv/SetParam.srv b/rosapi_msgs/srv/SetParam.srv index 426e15ce..2aeadc61 100644 --- a/rosapi_msgs/srv/SetParam.srv +++ b/rosapi_msgs/srv/SetParam.srv @@ -1,3 +1,5 @@ string name string value --- +bool successful +string reason