174 lines
6.2 KiB
Python

"""Thin wrapper over `ldfparser` for use in tests.
Loads an LDF (LIN Description File) and exposes per-frame `pack()` /
`unpack()` helpers plus a `frame_lengths()` map suitable for plugging
into the MUM adapter's `frame_lengths` argument.
Typical usage:
from ecu_framework.lin.ldf import LdfDatabase
db = LdfDatabase("./vendor/4SEVEN_color_lib_test.ldf")
frame = db.frame("ALM_Req_A")
payload = frame.pack(
AmbLightColourRed=0xFF,
AmbLightColourGreen=0xFF,
AmbLightColourBlue=0xFF,
AmbLightIntensity=0xFF,
AmbLightLIDFrom=0x01,
AmbLightLIDTo=0x01,
)
# → bytes(8); unspecified signals fall back to their LDF init_value.
decoded = db.frame("ALM_Status").unpack(b"\\x07\\x00\\x00\\x00")
# → {'ALMNadNo': 7, 'ALMVoltageStatus': 0, ...}
The wrapper uses `encode_raw` / `decode_raw` rather than `encode` / `decode`
so signal *encoding types* (logical/physical conversions) are bypassed —
tests work with raw integer values, which is what `LinFrame.data` carries.
If you need encoding-type interpretation, use `Frame.encode()` /
`Frame.decode()` (which delegate to the underlying ldfparser methods).
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union
class FrameNotFound(KeyError):
"""Raised when a frame name or ID isn't present in the loaded LDF."""
class Frame:
"""Lightweight wrapper around an `ldfparser` frame object.
Exposes the attributes tests actually need (`id`, `name`, `length`,
`signal_layout`) and `pack`/`unpack` helpers that work on raw bytes.
"""
__slots__ = ("_raw",)
def __init__(self, raw_frame: Any) -> None:
self._raw = raw_frame
@property
def name(self) -> str:
return str(self._raw.name)
@property
def id(self) -> int:
return int(self._raw.frame_id)
@property
def length(self) -> int:
return int(self._raw.length)
def signal_layout(self) -> List[Tuple[int, str, int]]:
"""Return [(start_bit, signal_name, width_in_bits), ...]."""
return [(int(off), s.name, int(s.width)) for off, s in self._raw.signal_map]
def signal_names(self) -> List[str]:
return [s.name for _, s in self._raw.signal_map]
# ---- raw (integer) packing ------------------------------------------
def pack(self, *args: Dict[str, int], **kwargs: int) -> bytes:
"""Encode signal values into the raw payload for this frame.
Accepts either a single dict positional argument or keyword args:
frame.pack(AmbLightColourRed=255, AmbLightColourGreen=128)
frame.pack({"AmbLightColourRed": 255, "AmbLightColourGreen": 128})
Signals not provided fall back to the `init_value` declared in the
LDF (handled by ldfparser's `encode_raw`). Returns bytes of length
`self.length`.
"""
if args and kwargs:
raise TypeError("pack() takes either a positional dict or kwargs, not both")
if args:
if len(args) != 1 or not isinstance(args[0], dict):
raise TypeError("pack() positional argument must be a dict")
values = dict(args[0])
else:
values = dict(kwargs)
encoded = self._raw.encode_raw(values)
return bytes(encoded)
def unpack(self, data: Union[bytes, bytearray, list]) -> Dict[str, int]:
"""Decode raw bytes into a `{signal_name: int}` dict."""
return dict(self._raw.decode_raw(bytes(data)))
# ---- encoding-aware (logical/physical values) -----------------------
def encode(self, values: Dict[str, Any]) -> bytes:
"""Encode using LDF encoding types (logical → numeric, physical scaling).
Useful when you want to write 'Immediate color Update' instead of `0`.
Falls back to ldfparser's `encode`.
"""
encoded = self._raw.encode(values)
return bytes(encoded)
def decode(self, data: Union[bytes, bytearray, list]) -> Dict[str, Any]:
"""Decode using LDF encoding types (numeric → logical/physical)."""
return dict(self._raw.decode(bytes(data)))
def __repr__(self) -> str:
return f"Frame(name={self.name!r}, id=0x{self.id:02X}, length={self.length})"
class LdfDatabase:
"""Load an LDF file and expose its frames in a test-friendly form."""
def __init__(self, path: Union[str, Path]) -> None:
# Lazy import keeps the framework importable on machines without ldfparser
# — only `LdfDatabase` instantiation requires it.
try:
from ldfparser import parse_ldf # type: ignore
except Exception as e:
raise RuntimeError(
"ldfparser is not installed. Install it with: pip install ldfparser"
) from e
self.path = Path(path)
if not self.path.is_file():
raise FileNotFoundError(f"LDF not found: {self.path}")
self._raw = parse_ldf(str(self.path))
@property
def baudrate(self) -> int:
return int(self._raw.baudrate)
@property
def protocol_version(self) -> str:
return str(self._raw.protocol_version)
def frame(self, key: Union[str, int]) -> Frame:
"""Look up a frame by name (str) or by frame_id (int)."""
try:
raw = self._raw.get_frame(key)
except LookupError as e:
raise FrameNotFound(f"Frame {key!r} not found in {self.path.name}") from e
return Frame(raw)
def frames(self) -> List[Frame]:
"""Return all unconditional frames (excludes diagnostic/event-triggered)."""
return [Frame(rf) for rf in self._raw.frames]
def frame_lengths(self) -> Dict[int, int]:
"""`{frame_id: length}` map suitable for `MumLinInterface(frame_lengths=...)`."""
return {int(rf.frame_id): int(rf.length) for rf in self._raw.frames}
def signal_names(self, frame_key: Union[str, int]) -> List[str]:
"""Convenience: list signal names for a given frame."""
return self.frame(frame_key).signal_names()
def __repr__(self) -> str:
try:
n = sum(1 for _ in self._raw.frames)
except Exception:
n = "?"
return f"LdfDatabase(path={self.path!s}, frames={n})"
__all__ = ["LdfDatabase", "Frame", "FrameNotFound"]