Skip to content

Commit

Permalink
new cli tool 'compare'
Browse files Browse the repository at this point in the history
- new cli tool 'compare'
- new example pdx to test compare tool
- added unittest for cli tool 'compare'
- enhanced output for cli tools 'compare' and 'list'

Signed-off-by: Katja Köhler <[email protected]>
Signed-off-by: Andreas Lauser <[email protected]>
  • Loading branch information
kakoeh committed Dec 15, 2023
1 parent 6419637 commit 644a61b
Show file tree
Hide file tree
Showing 12 changed files with 1,457 additions and 112 deletions.
426 changes: 382 additions & 44 deletions README.md

Large diffs are not rendered by default.

Binary file added examples/somersault_01.01.02.pdx
Binary file not shown.
10 changes: 7 additions & 3 deletions odxtools/basicstructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,10 @@ def param_sort_key(param: Parameter) -> Tuple[int, int, int]:
else:
return []

def print_message_format(self, indent: int = 5, allow_unknown_lengths: bool = False) -> None:
def print_message_format(self,
indent: int = 5,
allow_unknown_lengths: bool = False,
print_objects: bool = True) -> None:
"""
Print a description of the message format to `stdout`.
"""
Expand All @@ -470,5 +473,6 @@ def print_message_format(self, indent: int = 5, allow_unknown_lengths: bool = Fa
print(f"{indent * ' '}" + f"\n{indent * ' '}".join(message_as_lines))
else:
print("Sorry, couldn't pretty print message layout. :(")
for p in self.parameters:
print(indent * " " + str(p).replace("\n", f"\n{indent * ' '}"))
if print_objects:
for p in self.parameters:
print(indent * " " + str(p).replace("\n", f"\n{indent * ' '}"))
245 changes: 221 additions & 24 deletions odxtools/cli/_print_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
# SPDX-License-Identifier: MIT
import re
from typing import List, Optional, Union

import markdownify
import pandas as pd
from tabulate import tabulate

from ..diaglayer import DiagLayer
from ..diagservice import DiagService
from ..nameditemlist import NamedItemList
from ..odxtypes import AtomicOdxType
from ..parameters.codedconstparameter import CodedConstParameter
from ..parameters.nrcconstparameter import NrcConstParameter
from ..parameters.parameter import Parameter
from ..parameters.physicalconstantparameter import PhysicalConstantParameter
from ..parameters.systemparameter import SystemParameter
from ..parameters.valueparameter import ValueParameter
from ..singleecujob import SingleEcuJob


class Style:
# class with variables and functions to display the result of the comparison

## text style variables
bold = '\033[1m'
underline = '\033[4m'
italic = '033[3m' # doesn't work with Windows CMD and PowerShell
end = '\033[0m'


def format_desc(desc: str, ident: int = 0) -> str:
Expand All @@ -19,14 +42,13 @@ def format_desc(desc: str, ident: int = 0) -> str:
return desc


def print_diagnostic_service(
service: DiagService,
print_params: bool = False,
print_pre_condition_states: bool = False,
print_state_transitions: bool = False,
print_audiences: bool = False,
allow_unknown_bit_lengths: bool = False,
) -> None:
def print_diagnostic_service(service: DiagService,
print_params: bool = False,
print_pre_condition_states: bool = False,
print_state_transitions: bool = False,
print_audiences: bool = False,
allow_unknown_bit_lengths: bool = False,
print_objects: bool = False) -> None:
print(f" {service.short_name} <ID: {service.odx_id}>")

if service.description:
Expand All @@ -53,31 +75,206 @@ def print_diagnostic_service(
print(f" Enabled-Audiences: {', '.join(enabled_audiences_short_names)}")

if print_params:
assert service.request is not None
assert service.positive_responses is not None
assert service.negative_responses is not None
print_service_parameters(
service,
allow_unknown_bit_lengths=allow_unknown_bit_lengths,
print_objects=print_objects)

print(f" Message format of a request:")
service.request.print_message_format(
indent=3, allow_unknown_lengths=allow_unknown_bit_lengths)

print(f" Number of positive responses: {len(service.positive_responses)}")
if len(service.positive_responses) == 1:
resp = service.positive_responses[0]
def print_service_parameters(service: DiagService,
allow_unknown_bit_lengths: bool = False,
print_objects: bool = False) -> None:
# prints parameter details of request, posivite response and negative response of diagnostic service

print(f" Message format of a positive response:")
resp.print_message_format(indent=3, allow_unknown_lengths=allow_unknown_bit_lengths)
assert service.request is not None
assert service.positive_responses is not None
assert service.negative_responses is not None

print(f" Number of negative responses: {len(service.negative_responses)}")
if len(service.negative_responses) == 1:
resp = service.negative_responses[0]
# Request
print("\n")
print(f" Request Properties:")
print(f" Request Name: {service.request.short_name}")
try:
if len(service()) > 24:
print(
f" Byte-Array: {service()[:24]!r}...\tHex-String: 0x{service().hex().upper()[:24]}..."
)
else:
print(f" Byte-Array: {service()!r}\tHex-String: 0x{service().hex().upper()}")
except:
print(f" Byte-Array: ---\tHex-String: 0x---")
print(f" Service Parameters: {service.request.parameters}\n")
table = generatetable('parameter-details', list(service.request.parameters))
print(tabulate(table, headers='keys', tablefmt='presto'))
print("\n")
print(f" Message format of the request:")
service.request.print_message_format(
indent=0, allow_unknown_lengths=allow_unknown_bit_lengths, print_objects=print_objects)

print(f" Message format of a negative response:")
resp.print_message_format(indent=3, allow_unknown_lengths=allow_unknown_bit_lengths)
# Positive Response
print("\n")
print(f" Positive Response Properties:")
print(f" Number of Positive Responses: {len(service.positive_responses)}")
print(f" Positive Responses: {service.positive_responses}")
if len(service.positive_responses) == 1:
resp = service.positive_responses[0]
print(f" Service Parameters: {resp.parameters}\n")
table = generatetable('parameter-details', list(resp.parameters))
print(tabulate(table, headers='keys', tablefmt='presto'))
print("\n")
print(f" Message format of the positive response:")
resp.print_message_format(
indent=0, allow_unknown_lengths=allow_unknown_bit_lengths, print_objects=print_objects)

# Negative Response
print("\n")
print(f" Negative Response Properties:")
print(f" Number of Negative Responses: {len(service.negative_responses)}")
print(f" Negative Responses: {service.negative_responses}")
if len(service.negative_responses) == 1:
resp = service.negative_responses[0]
print(f" Service Parameters: {resp.parameters}\n")
table = generatetable('parameter-details', list(resp.parameters))
print(tabulate(table, headers='keys', tablefmt='presto'))
print("\n")
print(f" Message format of a negative response:")
resp.print_message_format(
indent=0, allow_unknown_lengths=allow_unknown_bit_lengths, print_objects=print_objects)

if (service.positive_responses and
len(service.positive_responses) > 1) or (service.negative_responses and
len(service.negative_responses) > 1):
# Does this ever happen?
raise NotImplementedError(
f"The diagnostic service {service.odx_id} offers more than one response!")


def generatetable(style: str, list_of_items: Union[List[DiagService],
List[Parameter]]) -> pd.DataFrame:
# generates table for different properties of diagnostic service or parameter of diagnostic service
# style = {'service-details', 'parameter-details'}

if style == 'service-details':
name = []
semantic = []
request: List[Optional[str]] = []

for item in list_of_items:
assert isinstance(item, DiagService)
name.append(item.short_name)
semantic.append(item.semantic)

if getattr(item, "missing_params", None) is None:
if len(item().hex().upper()) > 24:
request.append('0x' + item().hex().upper()[:24] + '...')
else:
request.append('0x' + item().hex().upper())
else:
request.append(None)
return pd.DataFrame({'Name': name, 'Semantic': semantic, 'Hex-Request': request})

elif style == 'parameter-details':

name = []
byte = []
bit_length: List[Optional[int]] = []
semantic = []
param_type = []
value: List[Optional[str]] = []
value_type: List[Optional[str]] = []
data_type: List[Optional[str]] = []
dop: List[Optional[str]] = []

for item in list_of_items:
assert isinstance(item, Parameter)
name.append(item.short_name)
byte.append(item.byte_position)
semantic.append(item.semantic)
param_type.append(item.parameter_type)
if item.get_static_bit_length() is not None:
bit_length.append(item.get_static_bit_length())
length = (item.get_static_bit_length() or 0) // 4
else:
bit_length.append(None)
if isinstance(item, CodedConstParameter):

if isinstance(item.coded_value, int):
value.append("0x" + f"{item.coded_value:0{length}X}")
else:
value.append(f"{item.coded_value!r}")
data_type.append(item.diag_coded_type.base_data_type.name)
value_type.append('coded value')
dop.append(None)
elif isinstance(item, NrcConstParameter):
data_type.append(item.diag_coded_type.base_data_type.name)
value.append(str(item.coded_values))
value_type.append('coded values')
dop.append(None)
elif isinstance(item, (PhysicalConstantParameter, SystemParameter, ValueParameter)):

dop.append(item.dop.short_name)
if (tmp := getattr(item, "physical_type", None)) is not None:
data_type.append(tmp.base_data_type.name)
else:
data_type.append(None)
if isinstance(item, PhysicalConstantParameter):
if isinstance(item.physical_constant_value, int):
value.append("0x" + f"{int(item.physical_constant_value):0{length}X}")
else:
value.append(f"{item.physical_constant_value!r}")
value_type.append('constant value')

elif isinstance(item, ValueParameter) and item.physical_default_value is not None:
if isinstance(item.physical_default_value, int):
value.append("0x" + f"{int(item.physical_default_value):0{length}X}")
else:
value.append(f"{item.physical_default_value!r}")
value_type.append('default value')

else:
value.append(None)
value_type.append(None)
else:
value.append(None)
data_type.append(None)
value_type.append(None)
dop.append(None)

return pd.DataFrame({
'Name': name,
'Byte Position': byte,
'Bit Length': bit_length,
'Semantic': semantic,
'Parameter Type': param_type,
'Data Type': data_type,
'Value': value,
'Value Description': value_type,
'Linked DOP': dop
})


def print_ecu_variant_metrics(variants: List[DiagLayer]) -> None:

name = []
type = []
num_services = []
num_dops = []
num_comparams = []
for variant in variants:
assert isinstance(variant, DiagLayer)
all_services: List[Union[DiagService, SingleEcuJob]] = sorted(
variant.services, key=lambda x: x.short_name)
name.append(variant.short_name)
type.append(variant.variant_type.value)
num_services.append(len(all_services))
num_dops.append(len(variant.diag_data_dictionary_spec.data_object_props))
num_comparams.append(len(variant.comparams))

table = pd.DataFrame({
'Name': name,
'Variant Type': type,
'Number of Services': num_services,
'Number of DOPs': num_dops,
'Number of communication parameters': num_comparams
})
print(tabulate(table, headers='keys', tablefmt='presto'))
Loading

0 comments on commit 644a61b

Please sign in to comment.