from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class LinFrame:
    """Represents a LIN frame.

    Attributes:
        id: Frame identifier (0x00 - 0x3F typical for classic LIN IDs).
        data: Up to 8 bytes payload. Any bytes-like object or iterable of
            ints in range(256) is accepted and stored as immutable ``bytes``.

    Raises:
        ValueError: If ``id`` is outside 0x00-0x3F or ``data`` is longer
            than 8 bytes.
        TypeError: If ``data`` cannot be interpreted as a byte sequence.
    """

    id: int
    data: bytes

    def __post_init__(self) -> None:
        """Validate the ID range and normalize/validate the payload."""
        if not (0 <= self.id <= 0x3F):
            raise ValueError(f"LIN ID out of range: {self.id}")

        # bytes(n) for an int n yields n zero bytes rather than failing, so a
        # bare integer payload must be rejected before the conversion below.
        if isinstance(self.data, int):
            raise TypeError("data must be bytes-like")

        if not isinstance(self.data, bytes):
            # Covers bytearray/memoryview as well as lists of ints. Copying
            # into bytes decouples the frame from the caller's mutable buffer,
            # so the length check below cannot be invalidated afterwards.
            try:
                self.data = bytes(self.data)  # type: ignore[arg-type]
            except Exception as e:  # pragma: no cover - defensive
                raise TypeError("data must be bytes-like") from e

        if len(self.data) > 8:
            raise ValueError("LIN data length must be <= 8")


class LinInterface(ABC):
    """Abstract interface for LIN communication."""

    @abstractmethod
    def connect(self) -> None:
        """Open the interface connection."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the interface connection."""

    @abstractmethod
    def send(self, frame: LinFrame) -> None:
        """Send a LIN frame."""

    @abstractmethod
    def receive(
        self, id: Optional[int] = None, timeout: float = 1.0
    ) -> Optional[LinFrame]:
        """Receive a LIN frame, optionally filtered by ID.

        Returns None on timeout.
        """

    def request(
        self, id: int, length: int, timeout: float = 1.0
    ) -> Optional[LinFrame]:
        """Request a frame with the given ID.

        This default implementation does not transmit anything itself: it
        only delegates to ``receive`` and waits for a frame carrying ``id``.
        ``length`` is accepted for the benefit of overriding implementations
        and is not used here.

        Override in a concrete implementation if different behavior is
        needed (for example, sending a header before waiting).

        Returns:
            Whatever ``receive(id=id, timeout=timeout)`` returns.
        """
        return self.receive(id=id, timeout=timeout)

    def flush(self) -> None:
        """Optional: flush RX buffers. The default does nothing."""