# 117 lines · 4.3 KiB · Python
"""Mock implementation of the BabyLIN SDK wrapper API used by our adapter.

This module provides create_BabyLIN(), which returns an object with BLC_* methods
so the real adapter can be exercised without hardware.

Design notes:
- We simulate a single device with one channel and an RX queue per channel.
- Transmit (BLC_mon_set_xmit) echoes the payload into the RX queue to mimic loopback.
- Master request (BLC_sendRawMasterRequest) enqueues a deterministic response so
  tests can validate request/response logic without randomness.
"""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import List
|
|
|
|
# Status code the mock returns from every call that reports success.
# The value 0 is chosen to match the real SDK's convention for "OK".
BL_OK = 0
|
|
|
|
|
|
@dataclass
class BLC_FRAME:
    """Frame record holding the fields of the SDK's BLC_FRAME that the adapter uses.

    Instances are built by the mock when it queues a frame for reception.
    """

    # Identifier of the frame.
    frameId: int
    # Byte count of the payload, as supplied by whoever builds the frame.
    lenOfData: int
    # Raw payload bytes.
    frameData: bytes
|
|
|
|
|
|
class _MockChannel:
    """One simulated BabyLIN channel; its only state is a receive queue."""

    def __init__(self):
        # Frames waiting to be read, oldest first. Kept as a plain list
        # because consumers take frames from the front by index.
        self.rx: List[BLC_FRAME] = list()
|
|
|
|
|
|
class _MockBL:
    """Stand-in for the BabyLIN SDK object, covering only the BLC_* calls the adapter makes.

    The mock holds one port name, one opaque device handle and one
    ``_MockChannel``. Calls that report a status always return ``BL_OK``;
    nothing here touches real hardware.
    """

    def __init__(self):
        # Exactly one channel; the channel object doubles as its own handle.
        self._channels = [_MockChannel()]
        # The device handle carries no data, it only has to be some object.
        self._handle = object()
        # A single fake port name for discovery to report.
        self._ports = ["MOCK_PORT"]
        # Expose the success code as an instance attribute as well.
        self.BL_OK = BL_OK

    # ------------------------------------------------------------------
    # Discovery / open / close
    # ------------------------------------------------------------------
    def BLC_getBabyLinPorts(self, timeout_ms: int):
        """List the mock's port names.

        ``timeout_ms`` is accepted but not used. A new list is built on every
        call, so mutating the result does not affect the mock.
        """
        return [*self._ports]

    def BLC_openPort(self, port: str):
        """Return the mock's opaque device handle; ``port`` is not inspected."""
        return self._handle

    def BLC_closeAll(self):
        """Report success without changing any state."""
        return BL_OK

    # ------------------------------------------------------------------
    # SDF and channel handling
    # ------------------------------------------------------------------
    def BLC_loadSDF(self, handle, sdf_path: str, download: int):
        """Report success; none of the arguments are read."""
        return BL_OK

    def BLC_getChannelCount(self, handle):
        """Return how many channels the mock holds; ``handle`` is ignored."""
        return len(self._channels)

    def BLC_getChannelHandle(self, handle, idx: int):
        """Return the channel stored at position ``idx``.

        The channel object itself serves as the channel handle. Ordinary list
        indexing applies, so an out-of-range ``idx`` raises ``IndexError``.
        """
        return self._channels[idx]

    def BLC_sendCommand(self, channel, command: str):
        """Report success for any command; the command text is not examined."""
        return BL_OK

    # ------------------------------------------------------------------
    # Transmit / receive primitives
    # ------------------------------------------------------------------
    def BLC_mon_set_xmit(self, channel: _MockChannel, frame_id: int, data: bytes, slot_time: int):
        """Queue the transmitted payload on the channel's RX queue (loopback).

        The queued frame records ``len(data)`` as its length and a ``bytes``
        copy of ``data`` as its payload. ``slot_time`` is not used.
        Returns ``BL_OK``.
        """
        rx_queue = channel.rx
        size = len(data)
        payload = bytes(data)
        rx_queue.append(BLC_FRAME(frameId=frame_id, lenOfData=size, frameData=payload))
        return BL_OK

    def BLC_getNextFrameTimeout(self, channel: _MockChannel, timeout_ms: int):
        """Remove and return the oldest queued frame, or ``None`` if none is queued.

        ``timeout_ms`` is not used: an empty queue is reported immediately as
        ``None`` rather than by waiting or raising.
        """
        if not channel.rx:
            # Empty queue stands in for a timeout.
            return None
        return channel.rx.pop(0)

    def BLC_sendRawMasterRequest(self, channel: _MockChannel, frame_id: int, payload_or_length):
        """Queue a response frame for a master request and return ``BL_OK``.

        The third argument selects how the response payload is produced:

        - ``bytes`` / ``bytearray``: a ``bytes`` copy of it is the payload.
        - anything else: converted with ``int()`` and treated as a length,
          limited to the range 0..8; byte ``i`` of the payload is
          ``(frame_id + i) & 0xFF``.
        """
        if not isinstance(payload_or_length, (bytes, bytearray)):
            # Length form: clamp to 0..8 bytes and fill with a fixed pattern
            # so repeated calls produce the same response.
            count = max(0, min(8, int(payload_or_length)))
            response = bytes((frame_id + offset) & 0xFF for offset in range(count))
        else:
            response = bytes(payload_or_length)
        reply = BLC_FRAME(frameId=frame_id, lenOfData=len(response), frameData=response)
        channel.rx.append(reply)
        return BL_OK

    def BLC_getDetailedErrorString(self, rc: int):
        """Return a short text that embeds the given return code."""
        message = f"Mock error rc={rc}"
        return message
|
|
|
|
|
|
def create_BabyLIN():
    """Build and return a new ``_MockBL`` instance.

    Intended as the mock counterpart of the real SDK's factory of the same
    name; each call yields an independent mock.
    """
    mock_sdk = _MockBL()
    return mock_sdk
|