"""Generic LDF-driven frame I/O for tests.

``FrameIO`` is a thin layer over ``ecu_framework.lin.base.LinInterface``
that knows about an LDF database. It is **domain-agnostic** — it does not
care whether the frame is ALM-related, BSM-related, or anything else.

Three access levels are exposed so a tester can pick the abstraction they
need:

1. **High** — work in terms of frame and signal names::

       fio.send("ALM_Req_A", AmbLightColourRed=255, ...)
       decoded = fio.receive("ALM_Status")
       nad = fio.read_signal("ALM_Status", "ALMNadNo")

2. **Mid** — convert between signal kwargs and bytes without I/O::

       data = fio.pack("ConfigFrame", ConfigFrame_Calibration=0, ...)
       decoded = fio.unpack("PWM_Frame", raw_bytes)

3. **Low** — bypass the LDF entirely and push/pull raw bytes::

       fio.send_raw(0x12, b"\\x00" * 8)
       rx = fio.receive_raw(0x11, timeout=0.5)

The introspection helpers (:meth:`frame`, :meth:`frame_id`,
:meth:`frame_length`) are useful for tests that mix layers (e.g. pack with
the LDF, hand-edit a byte, then ``send_raw``).
"""

from __future__ import annotations

from typing import Any, Optional

from ecu_framework.lin.base import LinFrame, LinInterface


class FrameIO:
    """LDF-driven frame I/O over a LIN interface.

    Frame lookups are cached per ``FrameIO`` instance, so repeated calls to
    :meth:`send`, :meth:`receive`, or :meth:`frame` don't re-walk the LDF.
    """

    def __init__(self, lin: LinInterface, ldf: Any) -> None:
        """Bind the helper to a LIN interface and an LDF database.

        Args:
            lin: Interface used for every transmit and receive call.
            ldf: LDF database object; only its ``frame(name)`` lookup is
                used by this class.
        """
        self._lin = lin
        self._ldf = ldf
        # Frame name -> LDF frame object, filled lazily by :meth:`frame`.
        self._frames: dict[str, Any] = {}

    # --- properties --------------------------------------------------------

    @property
    def lin(self) -> LinInterface:
        """The LIN interface this instance sends and receives on."""
        return self._lin

    @property
    def ldf(self) -> Any:
        """The LDF database object supplied at construction."""
        return self._ldf

    # --- introspection -----------------------------------------------------

    def frame(self, name: str) -> Any:
        """Return the LDF Frame object for ``name``; cached after first lookup.

        Args:
            name: Frame name as known to the LDF.

        Returns:
            Whatever ``ldf.frame(name)`` returns. A ``None`` result is not
            treated as cached, so it is looked up again on the next call.
        """
        cached = self._frames.get(name)
        if cached is None:
            cached = self._ldf.frame(name)
            self._frames[name] = cached
        return cached

    def frame_id(self, name: str) -> int:
        """Return the frame's ``id`` attribute coerced to ``int``."""
        return int(self.frame(name).id)

    def frame_length(self, name: str) -> int:
        """Return the frame's ``length`` attribute coerced to ``int``."""
        return int(self.frame(name).length)

    # --- high level: by name ----------------------------------------------

    def send(self, frame_name: str, **signals) -> None:
        """Pack the named frame from ``**signals`` and transmit it.

        ``signals`` must cover every signal in the frame (ldfparser raises
        if one is missing). Use :meth:`receive` first to capture a current
        snapshot if you only want to change one signal.

        Args:
            frame_name: Frame name as known to the LDF.
            **signals: Signal values forwarded to the frame's ``pack``.
        """
        ldf_frame = self.frame(frame_name)
        # Normalise exactly as frame_id(), pack() and send_raw() do, so the
        # interface gets an int id and immutable bytes on every send path.
        payload = bytes(ldf_frame.pack(**signals))
        self._lin.send(LinFrame(id=int(ldf_frame.id), data=payload))

    def receive(self, frame_name: str, timeout: float = 1.0) -> Optional[dict]:
        """Receive ``frame_name`` and return its decoded signals.

        Args:
            frame_name: Frame name as known to the LDF.
            timeout: Passed through to the interface's ``receive``.

        Returns:
            The decoded signals as a dict, or ``None`` if the slave didn't
            respond within ``timeout``.
        """
        ldf_frame = self.frame(frame_name)
        # int() keeps the id type identical to what receive_raw() passes.
        rx = self._lin.receive(id=int(ldf_frame.id), timeout=timeout)
        if rx is None:
            return None
        return ldf_frame.unpack(bytes(rx.data))

    def read_signal(
        self,
        frame_name: str,
        signal_name: str,
        *,
        timeout: float = 1.0,
        default: Any = None,
    ) -> Any:
        """Read a single signal value from a frame.

        Args:
            frame_name: Frame name as known to the LDF.
            signal_name: Key to look up in the decoded payload.
            timeout: Passed through to :meth:`receive`.
            default: Value returned when nothing could be read.

        Returns:
            The signal value, or ``default`` if the frame timed out or the
            signal isn't present in the decoded payload.
        """
        decoded = self.receive(frame_name, timeout=timeout)
        if decoded is None:
            return default
        return decoded.get(signal_name, default)

    # --- mid level: pack/unpack without I/O --------------------------------

    def pack(self, frame_name: str, **signals) -> bytes:
        """Pack ``signals`` into raw bytes per the LDF, no transmission."""
        return bytes(self.frame(frame_name).pack(**signals))

    def unpack(self, frame_name: str, data: bytes) -> dict:
        """Decode ``data`` against the named frame's LDF layout."""
        return self.frame(frame_name).unpack(bytes(data))

    # --- low level: raw bus ------------------------------------------------

    def send_raw(self, frame_id: int, data: bytes) -> None:
        """Send arbitrary bytes on a frame ID. Bypasses the LDF entirely."""
        self._lin.send(LinFrame(id=int(frame_id), data=bytes(data)))

    def receive_raw(self, frame_id: int, timeout: float = 1.0) -> Optional[LinFrame]:
        """Receive a frame by ID and return the raw ``LinFrame`` (or None).

        Use this when you don't have an LDF entry for the frame, or when
        you want to inspect the raw payload before decoding.
        """
        return self._lin.receive(id=int(frame_id), timeout=timeout)