From ee4f6eb9da913de228e0c58b27697b11d6f86ce1 Mon Sep 17 00:00:00 2001
From: JustaGist
Date: Thu, 8 Aug 2024 09:05:12 +0100
Subject: [PATCH] add data recorder debugger and corresponding data parser
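
Usage sketch (illustrative): the recorder is constructed with a target file and attached to a
control loop like any other `CtrlLoopDebuggerBase`. How the debugger is registered with the loop
depends on the control-loop class used by the application, so the `control_loop` object and its
methods below are placeholders and not part of this patch; only `ComponentDataRecorderDebugger`
and its constructor arguments come from the code added here.

    from pyrcf.components.ctrl_loop_debuggers import ComponentDataRecorderDebugger

    # record every control-loop iteration's data to a pickle file, flushing to disk in
    # batches of 100 records; rate=None means "record at the control loop's own rate"
    recorder = ComponentDataRecorderDebugger(
        file_name="/tmp/run_data.pkl",
        rate=None,
        buffer_size=100,
        extra_data_callables=lambda: {"note": "any extra per-iteration data"},
    )

    control_loop.add_debugger(recorder)  # placeholder registration call, API is assumed
    control_loop.run()                   # placeholder
    recorder.shutdown()  # flushes any remaining buffered records and closes the file
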
+ """ + super().__init__(rate, clock) + + self._file = open(file_name, "wb") + + self._additional_handles = [] + if extra_data_callables is not None: + if callable(extra_data_callables): + extra_data_callables = [extra_data_callables] + if not isinstance(extra_data_callables, list): + raise ValueError( + f"{self.__class__.__name__}: Argument to this function should" + " be a callable or a list of callables." + ) + assert all( + callable(ele) for ele in extra_data_callables + ), "All elements in `extra_data_callables` should be a function handle." + for handle in extra_data_callables: + if handle not in self._additional_handles: + self._additional_handles.append(handle) + self._buffer = [] + self._buffer_size = buffer_size + + def _run_once_impl( + self, + t: float, + dt: float, + robot_state: RobotState, + global_plan: GlobalMotionPlan, + agent_outputs: List[Tuple[LocalMotionPlan, RobotCmd]], + robot_cmd: RobotCmd, + ): + self._buffer.append( + { + "t": t, + "dt": dt, + "robot_state": robot_state, + "global_plan": global_plan, + "agent_outputs": agent_outputs, + "robot_cmd": robot_cmd, + "debug_data": [_get_data() for _get_data in self._additional_handles], + } + ) + if len(self._buffer) >= self._buffer_size: + pickle.dump( + self._buffer, + self._file, + ) + self._buffer.clear() + + def shutdown(self): + if len(self._buffer) > 0: + pickle.dump( + self._buffer, + self._file, + ) + logging.info(f"Saved data to file: {self._file.name}") + self._file.close() diff --git a/pyrcf/utils/data_io_utils/__init__.py b/pyrcf/utils/data_io_utils/__init__.py index 68c495f..2135610 100644 --- a/pyrcf/utils/data_io_utils/__init__.py +++ b/pyrcf/utils/data_io_utils/__init__.py @@ -5,3 +5,4 @@ DEFAULT_ZMQ_PUBLISH_PORT, ) from .pyrcf_subscriber import PyRCFSubscriberBase, PyRCFSubscriberZMQ +from .recorded_data_parser import ComponentDataRecorderDataParser diff --git a/pyrcf/utils/data_io_utils/recorded_data_parser.py b/pyrcf/utils/data_io_utils/recorded_data_parser.py new file mode 100644 index 0000000..fb3ef79 --- /dev/null +++ b/pyrcf/utils/data_io_utils/recorded_data_parser.py @@ -0,0 +1,206 @@ +from typing import List, Mapping, Any, Literal +import functools +import pickle +import numpy as np +from numbers import Number + +from ...core.logging import logging + + +def rgetattr(obj, attr, *args): + """Recursive version of python's `getattr` method to get nested sub-object attributes. + Also tries to deal with `__getitem__` calls (e.g. dictionary keys, or indices)! + + Also handles [:] situations. E.g. "state_estiamates.end_effector_states.ee_poses[0][:].position" + will return positions of first end-effector (parsing `position` attribute of `Pose3D` across + all time stamps). 
+ """ + + def _getattr(obj, attr): + # recursive getattr that also tries to deal with __getitem__ calls + split_at_bracket_end = attr.split("]") + if len(split_at_bracket_end) > 1: + # UGLY: should probably switch to regex + remaining = "]".join(split_at_bracket_end[1:]) + spl = attr.split("[") + str_attr = spl[0] + val = spl[1].split("]")[0] + if val.isnumeric(): + val = int(val) + n_obj = obj if str_attr == "" else getattr(obj, str_attr, *args) + if val != ":": + item = n_obj[val] + if remaining == "": + return item + return _getattr(item, remaining) + + if remaining == "": + if isinstance(obj, (list, tuple, np.ndarray)) and len(obj) == 1: + return n_obj[0] + return n_obj + return ( + [_getattr(i, remaining) for i in n_obj] + if len(n_obj) > 1 + else _getattr(n_obj[0], remaining) + ) + + else: + if isinstance(obj, (list, tuple)): + return ( + [getattr(o, attr, *args) for o in obj] + if len(obj) > 1 + else getattr(obj[0], attr, *args) + ) + if isinstance(obj, np.ndarray): + return np.array( + [getattr(o, attr, *args) for o in obj] + if len(obj) > 1 + else getattr(obj[0], attr, *args) + ) + return getattr(obj, attr, *args) + + return functools.reduce(_getattr, [obj] + attr.split(".")) + + +class ComponentDataRecorderDataParser: + """A data parser for reading file created using `ComponentDataRecorderDebugger`.""" + + def __init__(self, file_name: str, load_on_init: bool = True): + """A data parser for reading file created using `ComponentDataRecorderDebugger`. + + Args: + file_name (str): Path to the file to read. + load_on_init (bool, optional): If True, will read file and load all data to + memory during init; otherwise only when data is queried. Defaults to True. + """ + + self._filename = file_name + self._data = None + self._keys = None + self._data_length = None + if load_on_init: + self.load_data() + + def load_data(self): + self._data = { + "t": [], + "dt": [], + "robot_state": [], + "global_plan": [], + "agent_outputs": [], + "robot_cmd": [], + "debug_data": [], + } + self._keys = list(self._data.keys()) + self._data_length = 0 + logging.info("Parsing data...") + with open(self._filename, "rb") as f: + while True: + try: + data_buffer = pickle.load(f) + for data in data_buffer: + self._data["t"].append(data["t"]) + self._data["dt"].append(data["dt"]) + self._data["robot_state"].append(data["robot_state"]) + self._data["global_plan"].append(data["global_plan"]) + self._data["agent_outputs"].append(data["agent_outputs"]) + self._data["robot_cmd"].append(data["robot_cmd"]) + self._data["debug_data"].append(data["debug_data"]) + self._data_length += 1 + except EOFError: + break + logging.info(f"Data parsing complete. Loaded data of length {self._data_length}.") + + @property + def num_datapoints(self): + """Length of data points in the loaded data.""" + return self._data_length + + @property + def key_names(self): + return self._keys + + def get_all_data(self) -> Mapping[str, List[Any]]: + """Get all data as dictionary of lists. + + Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs", "robot_cmd", + "debug_data"] + + e.g. `data['t']` will be a list of timesteps from all the control loop iterations that were + recorded. + + Returns: + Mapping[str, List[Any]]: Output data. + Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs", + "robot_cmd", "debug_data"] + """ + if self._data is None: + logging.info("Data was not loaded from file. 
Loading now...") + self.load_data() + return self._data + + def get_all_data_for_key( + self, + key_name: Literal[ + "t", + "dt", + "robot_state", + "global_plan", + "agent_outputs", + "robot_cmd", + "debug_data", + ], + field_name: str = None, + as_ndarray_if_possible: bool = True, + ) -> List[Any] | np.ndarray: + """Get all the data for a specified field for all the objects of a key in the data + dictionary. + + E.g. get_all_data_for_key("robot_state","state_estimates.pose.position") will return a + numpy array of all the state_estimate.pose.position values from the data. If objects are not + numbers or numpy arrays, they are returned as a list. So, + `get_all_data_for_key("robot_state", "state_estimates.pose")` will return a list of Pose3D + objects. + + Also allows index and key access for valid attributes. + e.g.: get_all_data_for_key("robot_state", + "state_estimates.end_effector_states.ee_poses[0].position[0]") is a valid call to get the x + values of the end-effector pose of the first end-effector in the end-effector state + object's `ee_poses` attribute. + + Also handles [:] situations. E.g. + "state_estiamates.end_effector_states.ee_poses[0][:].position" will return positions of + first end-effector (parsing `position` attribute of `Pose3D` across all time stamps). + + Args: + key_name (Literal[ "t", "dt", "robot_state", "global_plan", "agent_outputs", + "robot_cmd", "debug_data"]): The key to look for in the dictionary. + field_name (str, optional): Nested attribute string to retrieve for the data in the + specified key value in the dictionary. Defaults to None. + as_ndarray_if_possible (bool, optional): If the retrieved objects is a number or numpy + array, this option will allow returning a numpy array of the combined values. + Defaults to True. + + Returns: + List[Any] | np.ndarray: List of retrieved attributes from all objects of the specified + key from the loaded data. + """ + data = self._data[key_name] + + if field_name is None: + output_list = data + else: + output_list = [] + for obj in data: + try: + output_list.append(rgetattr(obj, field_name)) + except Exception as exc: + raise AttributeError( + f"Error trying to retrive field {field_name} for key: {key_name}" + ) from exc + + if as_ndarray_if_possible and ( + isinstance(output_list[0], np.ndarray) or isinstance(output_list[0], Number) + ): + output_list = np.array(output_list) + return output_list