Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create somersault_modified.pdx dynamically #282

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions examples/mksomersaultmodifiedpdx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
#! /usr/bin/python3
#
# SPDX-License-Identifier: MIT
#
# This script demonstrates how an existing ODX database can be
# modified programatically. Note that this is pretty hacky...
import argparse
from copy import deepcopy
from typing import List, TypeVar

import odxtools
import odxtools.uds as uds
from examples import somersaultecu
from odxtools.diaglayer import DiagLayer
from odxtools.diagservice import DiagService
from odxtools.nameditemlist import NamedItemList
from odxtools.odxlink import OdxLinkId, OdxLinkRef
from odxtools.parameters.codedconstparameter import CodedConstParameter
from odxtools.parameters.valueparameter import ValueParameter
from odxtools.request import Request
from odxtools.response import Response, ResponseType
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved

T = TypeVar("T")


def find_named_object(item_list: List[T], name: str) -> T:
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved
for x in item_list:
if getattr(x, "short_name", None) == name:
return x

raise KeyError(str(name))


FLIC_FLAC_SID = 0xBE

db = somersaultecu.database

dlc = db.diag_layer_containers[0]

# modify the positive response of the tester_present service for all ECUs
somersault_dlr = dlc.base_variants.somersault.diag_layer_raw

# rename the "session_start" and "session_stop" services to
# "start_session" and "stop_session"
start_service = find_named_object(somersault_dlr.diag_comms, "session_start")
assert isinstance(start_service, DiagService)
start_service.short_name = "start_session"

stop_service = find_named_object(somersault_dlr.diag_comms, "session_stop")
assert isinstance(stop_service, DiagService)
stop_service.short_name = "stop_session"

tester_present_service = find_named_object(somersault_dlr.diag_comms, "tester_present")
assert isinstance(tester_present_service, DiagService)
tester_present_pr = tester_present_service.positive_responses[0]
param = tester_present_pr.parameters.status
assert isinstance(param, CodedConstParameter)
param.diag_coded_type = somersaultecu.somersault_diagcodedtypes["uint16"]
param.coded_value = 0x1234

# change the DOP used by the "can_do_backward_flips" parameter of the
# positive response to the "session" service from "bool" to "uint8"
cdbf_param = start_service.positive_responses[0].parameters.can_do_backward_flips
doc_frags = start_service.odx_id.doc_fragments
assert isinstance(cdbf_param, ValueParameter)
cdbf_param.dop_ref = OdxLinkRef("somersault.DOP.uint8", doc_frags)

# change the byte and bit positions of the "num_flips_done" parameter
# of the "grudging_backward" response for all variants variant
gb_response = find_named_object(somersault_dlr.positive_responses, "grudging_backward")
param = gb_response.parameters.num_flips_done
param.byte_position = 2
param.bit_position = 4

# add a new "somersault_young" variant which can do flic-flacs and
# does not take any instructions
somersault_young = deepcopy(dlc.ecu_variants.somersault_lazy)
somersault_young_dlr = dlc.ecu_variants.somersault_lazy.diag_layer_raw
somersault_young_dlr.short_name = "somersault_young"
somersault_young_dlr.odx_id = OdxLinkId("ECU.somersault_young",
somersault_young_dlr.odx_id.doc_fragments)
somersault_young_dlr.description = \
"""<p>A young version of the somersault ECU

It is as grumpy as the lazy variant, but it is more agile, so it can do flic-flacs.

On the flipside, it is unwilling to take any instructions, so no
operational parameters can be set. Finally, it is unwilling to compete
(i.e. it does not time its somersaults).</p>"""

# remove the "sault_time" parameter from the positive response of the
# "do_forward_flips" service.
do_forward_flips_service = somersault_young.services.do_forward_flips
pr = do_forward_flips_service.positive_responses[0]
new_params = [x for x in pr.parameters if getattr(x, "short_name", None) != "sault_time"]
pr.parameters = NamedItemList(new_params)

# add a "flic-flac" service
flic_flac_request = Request(
odx_id=OdxLinkId("somersault.RQ.flic_flac", doc_frags),
short_name="RQ_flic_flac",
long_name=None,
description=None,
admin_data=None,
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved
sdgs=[],
parameters=NamedItemList([
CodedConstParameter(
short_name="sid",
long_name=None,
semantic=None,
description=None,
diag_coded_type=somersaultecu.somersault_diagcodedtypes["uint8"],
byte_position=0,
coded_value=FLIC_FLAC_SID,
bit_position=None,
sdgs=[],
)
]),
byte_size=None,
)
somersault_young_dlr.requests.append(flic_flac_request)

flic_flac_positive_response = Response(
odx_id=OdxLinkId("somersault.PR.flic_flac", doc_frags),
short_name="PR_flic_flac",
long_name=None,
description=None,
admin_data=None,
sdgs=[],
response_type=ResponseType.POSITIVE,
parameters=NamedItemList([
CodedConstParameter(
short_name="sid",
long_name=None,
semantic=None,
description=None,
diag_coded_type=somersaultecu.somersault_diagcodedtypes["uint8"],
byte_position=0,
coded_value=uds.positive_response_id(FLIC_FLAC_SID),
bit_position=None,
sdgs=[],
),
ValueParameter(
short_name="can_do_backward_flips",
long_name=None,
semantic=None,
description=None,
physical_default_value_raw=None,
byte_position=1,
dop_ref=OdxLinkRef("somersault.DOP.boolean", doc_frags),
dop_snref=None,
bit_position=None,
sdgs=[],
),
]),
byte_size=None,
)
somersault_young_dlr.positive_responses.append(flic_flac_positive_response)

flic_flac_service = DiagService(
odx_id=OdxLinkId("somersault.service.flic_flac", doc_frags),
short_name="flic_flac",
long_name=None,
description=None,
admin_data=None,
protocol_snrefs=[],
related_diag_comm_refs=[],
diagnostic_class=None,
is_mandatory_raw=None,
is_executable_raw=None,
is_final_raw=None,
comparam_refs=[],
is_cyclic_raw=None,
is_multiple_raw=None,
addressing_raw=None,
transmission_mode_raw=None,
audience=None,
functional_class_refs=[],
pre_condition_state_refs=[],
state_transition_refs=[],
semantic="FUNCTION",
request_ref=OdxLinkRef.from_id(flic_flac_request.odx_id),
pos_response_refs=[
OdxLinkRef.from_id(flic_flac_positive_response.odx_id),
],
neg_response_refs=[
OdxLinkRef.from_id(somersaultecu.somersault_negative_responses["general"].odx_id),
],
sdgs=[],
)

# create a new list of diagnostic communications that does not include
# the "set_operation_params" and "compulsory_program" services
ss_young_diag_comms = [
x for x in somersault_young_dlr.diag_comms
if getattr(x, "short_name", None) not in ("set_operation_params", "compulsory_program")
]

# append the flic-flac service
ss_young_diag_comms.append(flic_flac_service)

# change the list of the ECU's diag comms
somersault_young_dlr.diag_comms = ss_young_diag_comms

dlc.ecu_variants.append(DiagLayer(diag_layer_raw=somersault_young_dlr))

# make the database consistent. Note: For just writing to disk this is
# not necessary (but it is useful if the database is going to be used
# for something else later...)
db.refresh()

if __name__ == "__main__":
argparser = argparse.ArgumentParser(
description="\n".join([
"Creates a simple sample PDX file for a modified 'somersault' ECU from scratch.", "",
"The modified PDX file is primarily intended to be used a demo for the ",
"'compare' command line tool."
]),
formatter_class=argparse.RawTextHelpFormatter,
)

argparser.add_argument(
"output_pdx_file",
metavar="OUTPUT_PDX_FILE",
help="Path to the where the resulting .pdx file is written",
)

args = argparser.parse_args()

# write the result
odxtools.write_pdx_file(args.output_pdx_file, db)
Binary file modified examples/somersault.pdx
Binary file not shown.
Binary file modified examples/somersault_modified.pdx
Binary file not shown.
11 changes: 1 addition & 10 deletions examples/somersaultecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ class SomersaultSID(IntEnum):
long_name=None,
semantic=None,
description=None,
request_byte_position=3,
request_byte_position=0,
kayoub5 marked this conversation as resolved.
Show resolved Hide resolved
byte_position=1,
byte_length=1,
bit_position=None,
Expand Down Expand Up @@ -2426,12 +2426,3 @@ class SomersaultSID(IntEnum):

# Create ID mapping and resolve references
database.refresh()

# delete all variables except "database"
for name in dir():
if name not in (
"database",
"SID",
):
del globals()[name]
del globals()["name"]
14 changes: 14 additions & 0 deletions odxtools/nameditemlist.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-License-Identifier: MIT
import abc
from copy import deepcopy
from keyword import iskeyword
from typing import (Any, Collection, Dict, Iterable, List, Optional, Protocol, SupportsIndex, Tuple,
TypeVar, Union, cast, overload, runtime_checkable)
Expand Down Expand Up @@ -175,6 +176,19 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return f"{type(self).__name__}([{', '.join([repr(x) for x in self])}])"

def __copy__(self) -> Any:
return self.__class__(list(self))

def __deepcopy__(self, memo: Dict[int, Any]) -> Any:
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
result._item_dict = {}
for x in self:
result.append(deepcopy(x, memo))

return result


class NamedItemList(ItemAttributeList[T]):

Expand Down
3 changes: 2 additions & 1 deletion tests/test_odxtools.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ def test_encode_response_with_matching_request_param_and_structure(self) -> None
OdxLinkRef("somersault.PR.happy_forward", container_doc_frags))

coded_request = request.encode(forward_soberness_check=0x12, num_flips=12)
self.assertEqual(bytes(coded_request), bytes.fromhex("ba120c"))
coded_response = response.encode(yeha_level=3, coded_request=coded_request)
self.assertEqual(bytes(coded_response), 0xFA0003.to_bytes(3, "big"))
self.assertEqual(bytes(coded_response), bytes.fromhex("faba03"))


class TestNamedItemList(unittest.TestCase):
Expand Down
Loading