Skip to content

Commit

Permalink
add data recorder debugger and corresponding data parser
Browse files Browse the repository at this point in the history
  • Loading branch information
justagist committed Aug 8, 2024
1 parent 23fda83 commit ee4f6eb
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyrcf/components/ctrl_loop_debuggers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
can be used in the control loop."""

from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase, DummyDebugger
from .data_recorder_debugger import ComponentDataRecorderDebugger
104 changes: 104 additions & 0 deletions pyrcf/components/ctrl_loop_debuggers/data_recorder_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from typing import List, Tuple, Callable, Any
import pickle

from ...core.types import LocalMotionPlan, GlobalMotionPlan, RobotCmd, RobotState
from ...utils.time_utils import ClockBase, PythonPerfClock
from .ctrl_loop_debugger_base import CtrlLoopDebuggerBase
from ...core.logging import logging


class ComponentDataRecorderDebugger(CtrlLoopDebuggerBase):
    """A data recording debugger that records all the control loop data directly to a file.

    Use `ComponentDataRecorderDataParser` from `pyrcf.utils.data_io_utils`
    to parse the recorded data file.

    NOTE: if your computer has enough RAM and compute power, this debugger is better (in terms
    of keeping desired control loop rate) than `ComponentDataPublisherDebugger`.
    """

    def __init__(
        self,
        file_name: str,
        rate: float = None,
        clock: ClockBase = None,
        extra_data_callables: Callable[[], Any] | List[Callable[[], Any]] = None,
        buffer_size: int = 50,
    ):
        """A data recording debugger that records all the control loop data directly to a file.

        Use `ComponentDataRecorderDataParser` from `pyrcf.utils.data_io_utils`
        to parse the recorded data file.

        NOTE: if your computer has enough RAM and compute power, this debugger is better (in terms
        of keeping desired control loop rate) than `ComponentDataPublisherDebugger`.

        Args:
            file_name (str): Path to the file to write to.
            rate (float, optional): Rate at which data recording should happen. Defaults to None
                (i.e. use control loop rate).
            clock (ClockBase, optional): The clock to use for timer. Defaults to None
                (i.e. a new `PythonPerfClock` instance is created).
            extra_data_callables (Callable[[], Any] | List[Callable[[], Any]], optional):
                Additional function handles that return data that needs to be recorded. Defaults to
                None.
            buffer_size (int, optional): Buffer size to be used to write to file in batches.
                Defaults to 50.

        Raises:
            ValueError: if invalid value in `extra_data_callables`.
        """
        # NOTE: a `PythonPerfClock()` default argument would be created once at import
        # time and shared by every instance (mutable-default pitfall); create a fresh
        # clock per instance instead.
        if clock is None:
            clock = PythonPerfClock()
        super().__init__(rate, clock)

        self._file = open(file_name, "wb")

        self._additional_handles = []
        if extra_data_callables is not None:
            if callable(extra_data_callables):
                # allow passing a single callable directly
                extra_data_callables = [extra_data_callables]
            if not isinstance(extra_data_callables, list):
                raise ValueError(
                    f"{self.__class__.__name__}: Argument to this function should"
                    " be a callable or a list of callables."
                )
            # raise ValueError (as documented) instead of using `assert`, which is
            # stripped when python runs with -O
            if not all(callable(ele) for ele in extra_data_callables):
                raise ValueError(
                    "All elements in `extra_data_callables` should be a function handle."
                )
            for handle in extra_data_callables:
                # avoid recording the same handle's data twice
                if handle not in self._additional_handles:
                    self._additional_handles.append(handle)
        self._buffer = []
        self._buffer_size = buffer_size

    def _run_once_impl(
        self,
        t: float,
        dt: float,
        robot_state: RobotState,
        global_plan: GlobalMotionPlan,
        agent_outputs: List[Tuple[LocalMotionPlan, RobotCmd]],
        robot_cmd: RobotCmd,
    ):
        # append one snapshot of the control loop data; flush to disk in batches
        # to reduce per-iteration I/O overhead
        self._buffer.append(
            {
                "t": t,
                "dt": dt,
                "robot_state": robot_state,
                "global_plan": global_plan,
                "agent_outputs": agent_outputs,
                "robot_cmd": robot_cmd,
                "debug_data": [_get_data() for _get_data in self._additional_handles],
            }
        )
        if len(self._buffer) >= self._buffer_size:
            pickle.dump(
                self._buffer,
                self._file,
            )
            self._buffer.clear()

    def shutdown(self):
        """Flush any remaining buffered data to the file and close it.

        Safe to call multiple times; calls after the first are no-ops.
        """
        if self._file.closed:
            return
        if len(self._buffer) > 0:
            pickle.dump(
                self._buffer,
                self._file,
            )
            # clear so a repeated shutdown cannot write the same batch twice
            self._buffer.clear()
        logging.info(f"Saved data to file: {self._file.name}")
        self._file.close()
1 change: 1 addition & 0 deletions pyrcf/utils/data_io_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
DEFAULT_ZMQ_PUBLISH_PORT,
)
from .pyrcf_subscriber import PyRCFSubscriberBase, PyRCFSubscriberZMQ
from .recorded_data_parser import ComponentDataRecorderDataParser
206 changes: 206 additions & 0 deletions pyrcf/utils/data_io_utils/recorded_data_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
from typing import List, Mapping, Any, Literal
import functools
import pickle
import numpy as np
from numbers import Number

from ...core.logging import logging


def rgetattr(obj, attr, *args):
    """Recursive version of python's `getattr` method to get nested sub-object attributes.

    Also tries to deal with `__getitem__` calls (e.g. dictionary keys, or indices)!
    Also handles [:] situations. E.g. "state_estimates.end_effector_states.ee_poses[0][:].position"
    will return positions of first end-effector (parsing `position` attribute of `Pose3D` across
    all time stamps).

    Args:
        obj: Root object to start attribute resolution from.
        attr (str): Dot-separated attribute path; each dotted component may carry
            bracket suffixes with an integer index, a key, or ":" (e.g. "a.b[0].c[:]").
        *args: Optional single default value forwarded to `getattr` (same contract as
            the builtin `getattr`).
    """

    def _getattr(obj, attr):
        # recursive getattr that also tries to deal with __getitem__ calls
        split_at_bracket_end = attr.split("]")
        if len(split_at_bracket_end) > 1:
            # UGLY: should probably switch to regex
            # `remaining` is everything after the first "]", e.g. "a[0][1]" -> "[1]"
            remaining = "]".join(split_at_bracket_end[1:])
            spl = attr.split("[")
            str_attr = spl[0]  # attribute name before the first "[" (may be empty)
            val = spl[1].split("]")[0]  # the key/index string between the brackets
            if val.isnumeric():
                # numeric strings become integer indices; anything else is used as a key
                val = int(val)
            # empty str_attr means the bracket applies to `obj` itself (chained brackets)
            n_obj = obj if str_attr == "" else getattr(obj, str_attr, *args)
            if val != ":":
                item = n_obj[val]
                if remaining == "":
                    return item
                return _getattr(item, remaining)

            if remaining == "":
                # NOTE(review): this condition inspects `obj` but indexes `n_obj`;
                # when `str_attr` is non-empty these are different objects, so the
                # single-element unwrap only triggers for a direct "[:]" on a
                # sequence — confirm this asymmetry is intentional.
                if isinstance(obj, (list, tuple, np.ndarray)) and len(obj) == 1:
                    return n_obj[0]
                return n_obj
            # "[:]" with a trailing path: map the remaining path over every element
            # (single-element sequences are unwrapped to a scalar result)
            return (
                [_getattr(i, remaining) for i in n_obj]
                if len(n_obj) > 1
                else _getattr(n_obj[0], remaining)
            )

        else:
            # plain attribute access; sequences are mapped element-wise, with
            # single-element sequences unwrapped to the element's attribute
            if isinstance(obj, (list, tuple)):
                return (
                    [getattr(o, attr, *args) for o in obj]
                    if len(obj) > 1
                    else getattr(obj[0], attr, *args)
                )
            if isinstance(obj, np.ndarray):
                return np.array(
                    [getattr(o, attr, *args) for o in obj]
                    if len(obj) > 1
                    else getattr(obj[0], attr, *args)
                )
            return getattr(obj, attr, *args)

    # fold `_getattr` over the dot-separated components of the path
    return functools.reduce(_getattr, [obj] + attr.split("."))


class ComponentDataRecorderDataParser:
"""A data parser for reading file created using `ComponentDataRecorderDebugger`."""

def __init__(self, file_name: str, load_on_init: bool = True):
"""A data parser for reading file created using `ComponentDataRecorderDebugger`.
Args:
file_name (str): Path to the file to read.
load_on_init (bool, optional): If True, will read file and load all data to
memory during init; otherwise only when data is queried. Defaults to True.
"""

self._filename = file_name
self._data = None
self._keys = None
self._data_length = None
if load_on_init:
self.load_data()

def load_data(self):
self._data = {
"t": [],
"dt": [],
"robot_state": [],
"global_plan": [],
"agent_outputs": [],
"robot_cmd": [],
"debug_data": [],
}
self._keys = list(self._data.keys())
self._data_length = 0
logging.info("Parsing data...")
with open(self._filename, "rb") as f:
while True:
try:
data_buffer = pickle.load(f)
for data in data_buffer:
self._data["t"].append(data["t"])
self._data["dt"].append(data["dt"])
self._data["robot_state"].append(data["robot_state"])
self._data["global_plan"].append(data["global_plan"])
self._data["agent_outputs"].append(data["agent_outputs"])
self._data["robot_cmd"].append(data["robot_cmd"])
self._data["debug_data"].append(data["debug_data"])
self._data_length += 1
except EOFError:
break
logging.info(f"Data parsing complete. Loaded data of length {self._data_length}.")

@property
def num_datapoints(self):
"""Length of data points in the loaded data."""
return self._data_length

@property
def key_names(self):
return self._keys

def get_all_data(self) -> Mapping[str, List[Any]]:
"""Get all data as dictionary of lists.
Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs", "robot_cmd",
"debug_data"]
e.g. `data['t']` will be a list of timesteps from all the control loop iterations that were
recorded.
Returns:
Mapping[str, List[Any]]: Output data.
Available keys: ["t", "dt", "robot_state", "global_plan", "agent_outputs",
"robot_cmd", "debug_data"]
"""
if self._data is None:
logging.info("Data was not loaded from file. Loading now...")
self.load_data()
return self._data

def get_all_data_for_key(
self,
key_name: Literal[
"t",
"dt",
"robot_state",
"global_plan",
"agent_outputs",
"robot_cmd",
"debug_data",
],
field_name: str = None,
as_ndarray_if_possible: bool = True,
) -> List[Any] | np.ndarray:
"""Get all the data for a specified field for all the objects of a key in the data
dictionary.
E.g. get_all_data_for_key("robot_state","state_estimates.pose.position") will return a
numpy array of all the state_estimate.pose.position values from the data. If objects are not
numbers or numpy arrays, they are returned as a list. So,
`get_all_data_for_key("robot_state", "state_estimates.pose")` will return a list of Pose3D
objects.
Also allows index and key access for valid attributes.
e.g.: get_all_data_for_key("robot_state",
"state_estimates.end_effector_states.ee_poses[0].position[0]") is a valid call to get the x
values of the end-effector pose of the first end-effector in the end-effector state
object's `ee_poses` attribute.
Also handles [:] situations. E.g.
"state_estiamates.end_effector_states.ee_poses[0][:].position" will return positions of
first end-effector (parsing `position` attribute of `Pose3D` across all time stamps).
Args:
key_name (Literal[ "t", "dt", "robot_state", "global_plan", "agent_outputs",
"robot_cmd", "debug_data"]): The key to look for in the dictionary.
field_name (str, optional): Nested attribute string to retrieve for the data in the
specified key value in the dictionary. Defaults to None.
as_ndarray_if_possible (bool, optional): If the retrieved objects is a number or numpy
array, this option will allow returning a numpy array of the combined values.
Defaults to True.
Returns:
List[Any] | np.ndarray: List of retrieved attributes from all objects of the specified
key from the loaded data.
"""
data = self._data[key_name]

if field_name is None:
output_list = data
else:
output_list = []
for obj in data:
try:
output_list.append(rgetattr(obj, field_name))
except Exception as exc:
raise AttributeError(
f"Error trying to retrive field {field_name} for key: {key_name}"
) from exc

if as_ndarray_if_possible and (
isinstance(output_list[0], np.ndarray) or isinstance(output_list[0], Number)
):
output_list = np.array(output_list)
return output_list

0 comments on commit ee4f6eb

Please sign in to comment.