Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SW-957] make decorator for trigger services #578

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions spot_driver/spot_driver/ros_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from cv_bridge import CvBridge
from geometry_msgs.msg import TransformStamped
from google.protobuf.timestamp_pb2 import Timestamp
from rclpy.callback_groups import CallbackGroup
from rclpy.node import Node
from sensor_msgs.msg import CameraInfo, CompressedImage, Image
from std_srvs.srv import Trigger
from tf2_msgs.msg import TFMessage

from spot_wrapper.wrapper import SpotWrapper
Expand Down Expand Up @@ -328,3 +330,46 @@ def lookup_a_tform_b(
raise e
time.sleep(0.01)
return None


class TriggerServiceWrapper: # TODO: caching of spot_ros might be funky with multiple instances up
"""A descriptor for calling callbacks for trigger services"""

service_func: Callable[[SpotWrapper], Trigger.Response]
service_name: str
args: Optional[tuple] = ()
kwargs: Optional[dict | None] = None

def __init__(
self, service_func: Callable, service_name: str, *args: Optional[Any], **kwargs: Optional[Any | None]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, service_func: Callable, service_name: str, *args: Optional[Any], **kwargs: Optional[Any | None]
self, service_func: Callable, service_name: str, *args: Optional[Any], **kwargs: Optional[Any]

nit: does Any cover the None type?

) -> None:
self.service_func = service_func
self.service_name = service_name
self.args = args
self.kwargs = kwargs or {}

def create_service(self, obj: Any, group: CallbackGroup) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obj: Any is pretty vague. Based on the naming as well it seems like this obj type is limited to a set of Spot objects?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya its supposed to be SpotRos object but that would cause a circular dependency. Was unsure best way around that and keep mypy happy

self.spot_ros = obj
obj.create_service(Trigger, self.service_name, self.callback, callback_group=group)

def callback(self, request: Trigger.Request, response: Trigger.Response) -> Trigger.Response:
if self.spot_ros.mock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this callback explcitly depends on SpotRos object, then does it make sense that TriggerServiceWrapper is its own class at all? Could the code duplication also be solved by moving these function to member functions of SpotRos? (and the member variables to this class can be stored as a dictionary, or wrapped in a function wrapper tool?)

The circular dependency comment suggests that there is a different way to implement this... and I'm thinking if this class specifically only works on SpotRos object, then they are functions/properties that belong to SpotRos class

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya it makes sense to put it in spotROS. I just wanted to reduce the amount of code in it since its already m a s s i v e

response.success = True
response.message = "Mock spot success"
return response

if self.spot_ros.spot_wrapper is None:
response.success = False
response.message = "Spot wrapper is undefined"
return response

try:
# Call the function with predefined arguments
response.success, response.message = self.service_func(
self.spot_ros.spot_wrapper, *self.args, **self.kwargs
) # type: ignore
except Exception as e:
response.success = False
response.message = f"Error executing {self.service_func}: {str(e)}"

return response
Loading
Loading