74 lines
2.4 KiB
Python

from __future__ import annotations
import queue
import threading
import time
from typing import Optional
from .base import LinInterface, LinFrame
class MockBabyLinInterface(LinInterface):
"""A mock LIN interface that echoes frames and synthesizes responses.
Useful for local development without hardware. Thread-safe.
"""
def __init__(self, bitrate: int = 19200, channel: int = 1) -> None:
self.bitrate = bitrate
self.channel = channel
self._rx: "queue.Queue[LinFrame]" = queue.Queue()
self._lock = threading.RLock()
self._connected = False
def connect(self) -> None:
with self._lock:
self._connected = True
def disconnect(self) -> None:
with self._lock:
self._connected = False
# drain queue
try:
while True:
self._rx.get_nowait()
except queue.Empty:
pass
def send(self, frame: LinFrame) -> None:
if not self._connected:
raise RuntimeError("Mock interface not connected")
# echo back the frame as a received event
self._rx.put(frame)
def receive(self, id: Optional[int] = None, timeout: float = 1.0) -> Optional[LinFrame]:
if not self._connected:
raise RuntimeError("Mock interface not connected")
deadline = time.time() + max(0.0, timeout)
while time.time() < deadline:
try:
frm = self._rx.get(timeout=max(0.0, deadline - time.time()))
if id is None or frm.id == id:
return frm
# not matching, requeue tail-safe
self._rx.put(frm)
except queue.Empty:
break
return None
def request(self, id: int, length: int, timeout: float = 1.0) -> Optional[LinFrame]:
if not self._connected:
raise RuntimeError("Mock interface not connected")
# synthesize a deterministic response payload of requested length
payload = bytes((id + i) & 0xFF for i in range(max(0, min(8, length))))
frm = LinFrame(id=id, data=payload)
self._rx.put(frm)
return self.receive(id=id, timeout=timeout)
def flush(self) -> None:
while not self._rx.empty():
try:
self._rx.get_nowait()
except queue.Empty: # pragma: no cover - race guard
break