61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
from __future__ import annotations
|
|
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
|
|
@dataclass
class LinFrame:
    """Represents a LIN frame.

    id: Frame identifier (0x00 - 0x3F typical for classic LIN IDs).
    data: Up to 8 bytes payload. ``bytes`` and ``bytearray`` values are
        stored as given; any other value (for example a list of ints) is
        converted with ``bytes()`` during initialisation.

    Raises:
        ValueError: If ``id`` is outside 0x00-0x3F or ``data`` holds more
            than 8 bytes.
        TypeError: If ``data`` is an integer or cannot be converted to
            bytes.
    """

    id: int
    data: bytes

    def __post_init__(self) -> None:
        """Validate the identifier and normalise the payload to bytes."""
        if not (0 <= self.id <= 0x3F):
            raise ValueError(f"LIN ID out of range: {self.id}")

        if not isinstance(self.data, (bytes, bytearray)):
            # bytes(n) with an integer n does not fail: it builds n zero
            # bytes. Reject integers up front so a scalar passed by mistake
            # is reported instead of silently becoming a zero-filled payload.
            if isinstance(self.data, int):
                raise TypeError("data must be bytes-like")
            # allow list of ints (or any other iterable / buffer bytes() accepts)
            try:
                self.data = bytes(self.data)  # type: ignore[arg-type]
            except Exception as e:  # pragma: no cover - defensive
                # Any conversion failure (wrong element type, value out of
                # 0-255, non-iterable, ...) is reported uniformly.
                raise TypeError("data must be bytes-like") from e

        if len(self.data) > 8:
            raise ValueError("LIN data length must be <= 8")
|
|
|
|
|
|
class LinInterface(ABC):
    """Abstract base class describing a LIN communication interface.

    Concrete subclasses must implement ``connect``, ``disconnect``, ``send``
    and ``receive``. ``request`` and ``flush`` come with default
    implementations that may be overridden.
    """

    @abstractmethod
    def connect(self) -> None:
        """Open the connection to the interface."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection to the interface."""

    @abstractmethod
    def send(self, frame: LinFrame) -> None:
        """Transmit the given LIN frame."""

    @abstractmethod
    def receive(self, id: Optional[int] = None, timeout: float = 1.0) -> Optional[LinFrame]:
        """Receive a LIN frame, optionally filtered by ID.

        Returns None on timeout.
        """

    def request(self, id: int, length: int, timeout: float = 1.0) -> Optional[LinFrame]:
        """Request a frame with the given ID.

        This default implementation does not transmit anything itself: it
        delegates to ``receive`` with the same ``id`` and ``timeout`` and
        returns whatever ``receive`` returns. ``length`` is accepted for
        interface compatibility but is not used here.

        Override in a concrete implementation if different behavior is
        needed (for example, actually sending a header first).
        """
        # Default behaviour: wait for any frame carrying this ID.
        response = self.receive(id=id, timeout=timeout)
        return response

    def flush(self) -> None:
        """Optional hook to flush RX buffers; does nothing by default."""
        return None
|