# ecu-tests/vendor/mock_babylin_wrapper.py
# (source listing metadata: 117 lines, 4.3 KiB, Python)

"""Mock implementation of the BabyLIN SDK wrapper API used by our adapter.

This module provides create_BabyLIN() returning an object with BLC_* methods,
so the real adapter can be exercised without hardware.

Design notes:
- We simulate a single device with one channel and an RX queue per channel.
- Transmit (BLC_mon_set_xmit) echoes payload into the RX queue to mimic loopback.
- Master request (BLC_sendRawMasterRequest) enqueues a deterministic response so
  tests can validate request/response logic without randomness.
"""
from dataclasses import dataclass
from typing import List
# Success return code, matching the real SDK convention. Every BLC_* call in
# this mock that reports a status returns this value.
BL_OK = 0
@dataclass
class BLC_FRAME:
    """Minimal frame structure to mirror the SDK's BLC_FRAME used by the adapter.

    Field names are camelCase rather than PEP 8 snake_case, presumably to match
    the SDK structure the adapter expects -- do not rename them.
    """

    # Frame identifier, as passed to the transmit/request calls.
    frameId: int
    # Number of payload bytes; the mock always sets this to len(frameData).
    lenOfData: int
    # Raw payload bytes carried by the frame.
    frameData: bytes
class _MockChannel:
    """Represents a BabyLIN channel with a simple RX queue.

    The channel object carries no state other than ``rx``; frames are appended
    at the tail and consumed from the head (index 0).
    """

    def __init__(self) -> None:
        # FIFO for received frames. A fresh list per instance, so channels
        # never share queued frames.
        self.rx: List[BLC_FRAME] = []
class _MockBL:
    """BabyLIN mock exposing the subset of BLC_* APIs our adapter calls.

    The mock models one device ("MOCK_PORT") with a single channel. Each
    channel object doubles as its own handle and owns a FIFO RX queue that the
    transmit/request methods fill and BLC_getNextFrameTimeout drains.
    """

    def __init__(self):
        self.BL_OK = BL_OK  # Mirror the module-level success code on the instance
        self._ports = ["MOCK_PORT"]  # Simulate one discoverable device
        self._handle = object()  # Opaque handle placeholder
        self._channels = [_MockChannel()]  # Single channel system

    # -----------------------------
    # Discovery/open/close
    # -----------------------------
    def BLC_getBabyLinPorts(self, timeout_ms: int) -> List[str]:
        """Return a list of mock ports; timeout not used in mock.

        A copy is returned so callers cannot mutate the mock's port list.
        """
        return list(self._ports)

    def BLC_openPort(self, port: str):
        """Return an opaque handle for the given port name.

        The port name is not validated; every call yields the same handle.
        """
        return self._handle

    def BLC_closeAll(self) -> int:
        """Pretend to close; always succeeds (returns BL_OK).

        Queued RX frames are left untouched.
        """
        return BL_OK

    # -----------------------------
    # SDF and channel handling
    # -----------------------------
    def BLC_loadSDF(self, handle, sdf_path: str, download: int) -> int:
        """No-op in mock; assume success (returns BL_OK)."""
        return BL_OK

    def BLC_getChannelCount(self, handle) -> int:
        """Report number of channels (1 in mock)."""
        return len(self._channels)

    def BLC_getChannelHandle(self, handle, idx: int) -> _MockChannel:
        """Return the channel object acting as its own handle.

        Raises:
            IndexError: if ``idx`` is out of range. Plain list indexing is
                used, so negative indexes count from the end.
        """
        return self._channels[idx]

    def BLC_sendCommand(self, channel, command: str) -> int:
        """Accept any command (e.g., start schedule); always succeed."""
        return BL_OK

    # -----------------------------
    # Transmit/Receive primitives
    # -----------------------------
    def BLC_mon_set_xmit(self, channel: _MockChannel, frame_id: int, data: bytes, slot_time: int) -> int:
        """Echo transmitted payload back to RX to simulate a bus loopback.

        ``data`` may be anything ``bytes()`` accepts that also supports
        ``len()``; it is copied into an immutable ``bytes`` object.
        ``slot_time`` is ignored by the mock.
        """
        channel.rx.append(BLC_FRAME(frameId=frame_id, lenOfData=len(data), frameData=bytes(data)))
        return BL_OK

    def BLC_getNextFrameTimeout(self, channel: _MockChannel, timeout_ms: int):
        """Pop next frame from RX queue; return None on timeout (empty queue).

        ``timeout_ms`` is ignored: the mock never blocks.
        """
        if channel.rx:
            return channel.rx.pop(0)
        # Simulate timeout -> real wrapper may raise; we return None for simplicity
        return None

    def BLC_sendRawMasterRequest(self, channel: _MockChannel, frame_id: int, payload_or_length) -> int:
        """Simulate a slave response for a master request.

        Supports two call forms to mirror SDK variations:
        - (channel, frame_id, payload): use the payload as the response. Any
          byte container is accepted (bytes, bytearray, memoryview, or a
          list/tuple of ints in 0..255), consistent with BLC_mon_set_xmit.
        - (channel, frame_id, length): synthesize payload with a deterministic
          pattern; the length is clamped to the range 0..8.

        Returns:
            BL_OK, after the response frame has been appended to ``channel.rx``.

        Raises:
            TypeError/ValueError: if the third argument is neither a byte
                container nor convertible with ``int()``.
        """
        if isinstance(payload_or_length, (bytes, bytearray, memoryview, list, tuple)):
            # Explicit payload: copy it so later mutation by the caller cannot
            # alter the queued frame.
            data = bytes(payload_or_length)
        else:
            length = int(payload_or_length)
            # Deterministic pattern: response[i] = (frame_id + i) & 0xFF.
            # Synthesized payloads are capped at 8 bytes and never negative.
            data = bytes((frame_id + i) & 0xFF for i in range(max(0, min(8, length))))
        # Enqueue the response frame as if the slave published it on the bus
        channel.rx.append(BLC_FRAME(frameId=frame_id, lenOfData=len(data), frameData=data))
        return BL_OK

    def BLC_getDetailedErrorString(self, rc: int) -> str:
        """Provide a friendly error string for non-OK return codes."""
        return f"Mock error rc={rc}"
def create_BabyLIN() -> "_MockBL":
    """Factory method matching the real SDK to construct the mock instance.

    Each call builds a new, independent mock with its own empty RX queue.
    """
    return _MockBL()