Skip to content

Commit

Permalink
brping: pingmessage: Move to sphinx documentaiton style
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick José Pereira <[email protected]>
  • Loading branch information
patrickelectric committed May 3, 2024
1 parent ff91a86 commit dce3e69
Showing 1 changed file with 61 additions and 28 deletions.
89 changes: 61 additions & 28 deletions brping/pingmessage.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
#!/usr/bin/env python3

# PingMessage.py
# Python implementation of the Blue Robotics 'Ping' binary message protocol
"""
Python implementation of the Blue Robotics 'Ping' binary message protocol.
This module encapsulates the structure and functionality required to create, parse, and manage messages
used in the Blue Robotics 'Ping' protocol, specifically designed for communication with underwater sonar devices.
"""

import struct
from brping import definitions
payload_dict = definitions.payload_dict_all
asciiMsgs = [definitions.COMMON_NACK, definitions.COMMON_ASCII_TEXT]
variable_msgs = [definitions.PING1D_PROFILE, definitions.PING360_DEVICE_DATA, ]

variable_msgs = [definitions.PING1D_PROFILE, definitions.PING360_DEVICE_DATA]

class PingMessage(object):
"""
Represents a message protocol for the Blue Robotics 'Ping' devices.
This class provides methods to pack and unpack message data into/from binary formats, handle checksums,
and facilitate the communication between client applications and the device.
"""

## header start byte 1
start_1 = ord("B")

Expand Down Expand Up @@ -68,6 +78,12 @@ class PingMessage(object):
# length_mm = m.length_mm
# @endcode
def __init__(self, msg_id=0, msg_data=None):
"""
Initializes a new PingMessage object.
:param msg_id: The message ID for this message.
:param msg_data: Binary data buffer for unpacking into a message object. If None, an empty message is created.
"""
## The message id
self.message_id = msg_id

Expand Down Expand Up @@ -122,9 +138,12 @@ def __init__(self, msg_id=0, msg_data=None):
print("message id not recognized: %d" % self.message_id, msg_data)
raise e

## Pack object attributes into self.msg_data (bytearray)
# @return self.msg_data
def pack_msg_data(self):
"""
Packs object attributes into self.msg_data as a bytearray.
:return: The packed message data.
"""
# necessary for variable length payloads
# update using current contents for the variable length field
self.update_payload_length()
Expand Down Expand Up @@ -152,9 +171,13 @@ def pack_msg_data(self):

return self.msg_data

## Attempts to unpack a bytearray into object attributes
# @Returns True if successful, False otherwise
def unpack_msg_data(self, msg_data):
"""
Unpacks a bytearray into object attributes.
:param msg_data: The binary data to unpack.
:return: True if successful, False otherwise.
"""
self.msg_data = msg_data

# Extract header
Expand Down Expand Up @@ -199,31 +222,49 @@ def unpack_msg_data(self, msg_data):
self.checksum = struct.unpack(PingMessage.endianess + PingMessage.checksum_format, self.msg_data[PingMessage.headerLength + self.payload_length: PingMessage.headerLength + self.payload_length + PingMessage.checksumLength])[0]
return True

## Calculate the checksum from the internal bytearray self.msg_data
def calculate_checksum(self):
"""
Calculates the checksum for the current message data.
:return: Calculated checksum value.
"""
return sum(self.msg_data[0:PingMessage.headerLength + self.payload_length]) & 0xffff

## Update the object checksum value
# @return the object checksum value
def update_checksum(self):
"""
Updates the checksum value of the object based on its message data.
:return: Updated checksum value.
"""
self.checksum = self.calculate_checksum()
return self.checksum

## Verify that the object checksum attribute is equal to the checksum calculated according to the internal bytearray self.msg_data
def verify_checksum(self):
"""
Verifies that the object's checksum attribute matches the checksum calculated from its message data.
:return: True if checksums match, False otherwise.
"""
return self.checksum == self.calculate_checksum()

## Update the payload_length attribute with the **current** payload length, including dynamic length fields (if present)
def update_payload_length(self):
def get_payload_format(self):
"""
Retrieves the struct formatting string for the message payload based on the message ID and dynamic content.
:return: Struct format string for the payload.
"""
if self.message_id in variable_msgs or self.message_id in asciiMsgs:
# The last field self.payload_field_names[-1] is always the single dynamic-length field
self.payload_length = payload_dict[self.message_id]["payload_length"] + len(getattr(self, self.payload_field_names[-1]))
else:
self.payload_length = payload_dict[self.message_id]["payload_length"]

## Get the python struct formatting string for the message payload
# @return the payload struct format string
def get_payload_format(self):
"""
Retrieves the struct formatting string for the message payload based on the message ID and dynamic content.
:return: Struct format string for the payload.
"""
# messages with variable length fields
if self.message_id in variable_msgs or self.message_id in asciiMsgs:
var_length = self.payload_length - payload_dict[self.message_id]["payload_length"] # Subtract static length portion from payload length
Expand All @@ -234,9 +275,11 @@ def get_payload_format(self):
else: # messages with a static (constant) length
return payload_dict[self.message_id]["format"]

## Dump object into string representation
# @return string representation of the object
def __repr__(self):
"""
Returns a string representation of the Ping
:return: String representation of the PingMessage object.
"""
header_string = "Header:"
for attr in PingMessage.header_field_names:
header_string += " " + attr + ": " + str(getattr(self, attr))
Expand Down Expand Up @@ -381,16 +424,6 @@ def wait_checksum_h(self, msg_byte):
return self.state

def parse_byte(self, msg_byte):
""" Returns the current parse state after feeding the parser a single byte.
'msg_byte' is the byte to parse.
If it completes a valid message, returns PingParser.NEW_MESSAGE.
The decoded PingMessage will be available in the self.rx_msg attribute
until a new message is decoded.
"""
# Apply the relevant parsing method for the current state.
# (offset by 1 because NEW_MESSAGE isn't processed - start at WAIT_START)
result = self._PARSE_BYTE[self.state - 1](self, msg_byte)

return self.state if result is None else result
Expand Down

0 comments on commit dce3e69

Please sign in to comment.