# Source listing metadata: 221 lines, 11 KiB, Python.
from __future__ import annotations  # Enable postponed evaluation of annotations (PEP 563/649 style)

import logging  # Diagnostics for best-effort paths that deliberately swallow SDK errors
import time  # Monotonic deadline tracking while waiting for a matching frame
from typing import Optional  # For optional type hints

from .base import LinInterface, LinFrame  # Base abstraction and frame dataclass used by all LIN adapters


class BabyLinInterface(LinInterface):
    """LIN adapter that uses the vendor's BabyLIN Python SDK wrapper.

    - Avoids manual ctypes; relies on BabyLIN_library.py BLC_* functions.
    - Keeps the same LinInterface contract for send/receive/request/flush.

    Lifecycle: construct, call :meth:`connect`, use :meth:`send` /
    :meth:`receive` / :meth:`request` / :meth:`flush`, then :meth:`disconnect`.
    """

    # Module names tried, in order, when no wrapper module is injected.
    _WRAPPER_MODULE_NAMES = ("BabyLIN_library", "vendor.BabyLIN_library")

    # Zero-filled payloads built by request() are clamped to this many bytes.
    _MAX_PAYLOAD_LEN = 8

    # Logger for best-effort paths that intentionally do not raise.
    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        dll_path: Optional[str] = None,  # Not used by SDK wrapper (auto-selects platform libs)
        bitrate: int = 19200,  # Informational; typically defined by SDF/schedule
        channel: int = 0,  # Channel index used with BLC_getChannelHandle (0-based)
        node_name: Optional[str] = None,  # Optional friendly name (not used by SDK calls)
        func_names: Optional[dict] = None,  # Legacy (ctypes) compatibility; unused here
        sdf_path: Optional[str] = None,  # Optional SDF file to load after open
        schedule_nr: int = 0,  # Schedule number to start after connect
        wrapper_module: Optional[object] = None,  # Inject a wrapper (e.g., mock) for tests
    ) -> None:
        """Store configuration and create the SDK instance (no device I/O yet).

        Args:
            dll_path: Accepted for signature compatibility; not read here.
            bitrate: Stored on the instance; not passed to any SDK call here.
            channel: Channel index resolved to a channel handle in connect().
            node_name: Stored on the instance; defaults to "ECU_TEST_NODE".
            func_names: Accepted for signature compatibility; not read here.
            sdf_path: If truthy, connect() loads this SDF after opening the port.
            schedule_nr: If not None, connect() starts this schedule.
            wrapper_module: Object exposing ``create_BabyLIN()``. When None,
                the real SDK wrapper module is imported dynamically.

        Raises:
            RuntimeError: If no wrapper is injected and BabyLIN_library
                cannot be imported.
        """
        self.bitrate = bitrate  # Store configured (informational) bitrate
        self.channel_index = channel  # Desired channel index
        self.node_name = node_name or "ECU_TEST_NODE"  # Default node name if not provided
        self.sdf_path = sdf_path  # SDF to load (if provided)
        self.schedule_nr = schedule_nr  # Schedule to start on connect

        # Use the injected wrapper (unit tests with a mock) when given;
        # otherwise resolve the real SDK wrapper (BabyLIN_library.py).
        if wrapper_module is not None:
            _bl = wrapper_module
        else:
            _bl = self._import_sdk_wrapper()

        # Create the BabyLIN SDK instance (module exposes create_BabyLIN())
        self._BabyLIN = _bl.create_BabyLIN()

        self._handle = None  # Device handle returned by BLC_openPort
        self._channel_handle = None  # Per-channel handle returned by BLC_getChannelHandle
        self._connected = False  # Internal connection state flag

    @classmethod
    def _import_sdk_wrapper(cls):
        """Import and return the BabyLIN_library module, or raise RuntimeError."""
        # Local imports to avoid a global dependency during unit tests.
        import importlib
        import os
        import sys

        import_errors = []  # (module name, error text) pairs for diagnostics

        for modname in cls._WRAPPER_MODULE_NAMES:
            try:
                return importlib.import_module(modname)
            except Exception as e:  # pragma: no cover
                import_errors.append((modname, str(e)))

        # Try adding the common 'vendor' folder (two levels above this file)
        # to sys.path, then retry the import.
        repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
        vendor_dir = os.path.join(repo_root, "vendor")
        if os.path.isdir(vendor_dir) and vendor_dir not in sys.path:
            sys.path.insert(0, vendor_dir)
        try:
            return importlib.import_module("BabyLIN_library")
        except Exception as e:  # pragma: no cover
            import_errors.append(("BabyLIN_library", str(e)))

        # Raise a helpful error listing every attempted import.
        details = "; ".join([f"{m}: {err}" for m, err in import_errors]) or "not found"
        raise RuntimeError(
            "Failed to import BabyLIN_library. Ensure the SDK's BabyLIN_library.py is present in the project (e.g., vendor/BabyLIN_library.py). Details: "
            + details
        )

    def _bl_call(self, name: str, *args, **kwargs):
        """Call the SDK function *name* on the BabyLIN instance and return its result."""
        return getattr(self._BabyLIN, name)(*args, **kwargs)

    def _err(self, rc: int) -> None:
        """Raise a RuntimeError with a readable SDK error message for rc != BL_OK.

        Returns silently when *rc* equals the SDK's ``BL_OK``.
        """
        if rc == self._BabyLIN.BL_OK:
            return
        # Prefer a human-friendly error string if the SDK provides it; any
        # failure while fetching it falls back to the bare return code.
        try:
            get_str = getattr(self._BabyLIN, 'BLC_getDetailedErrorString', None)
            msg = get_str(rc) if get_str else f"rc={rc}"
            if not isinstance(msg, str):
                msg = str(msg)
        except Exception:
            msg = f"rc={rc}"
        raise RuntimeError(f"BabyLIN error: {msg}")

    def connect(self) -> None:
        """Open device, optionally load SDF, select channel, and start schedule.

        Raises:
            RuntimeError: If no device is found, the port cannot be opened,
                the device reports no channels, or an SDK call returns a
                code other than BL_OK. If a failure happens after the port
                was opened, the connection is closed again before the
                exception propagates.
        """
        # Discover BabyLIN devices.
        # NOTE(review): 100 is presumably the max number of ports to list — confirm against the SDK.
        ports = self._bl_call('BLC_getBabyLinPorts', 100)
        if not ports:
            raise RuntimeError("No BabyLIN devices found")

        # Open the first available device port (could be extended to select by config)
        self._handle = self._bl_call('BLC_openPort', ports[0])
        if not self._handle:
            raise RuntimeError("Failed to open BabyLIN port")

        try:
            # Load SDF onto the device, if configured.
            # NOTE(review): the 3rd arg '1' presumably means 'download' — confirm against the SDK.
            if self.sdf_path:
                self._err(self._bl_call('BLC_loadSDF', self._handle, self.sdf_path, 1))

            # Get channel count and pick the configured channel index (default 0)
            ch_count = self._bl_call('BLC_getChannelCount', self._handle)
            if ch_count <= 0:
                raise RuntimeError("No channels reported by device")
            ch_idx = int(self.channel_index)
            if ch_idx < 0 or ch_idx >= ch_count:
                # Out-of-range index falls back to channel 0; make that visible.
                self._logger.warning(
                    "BabyLIN channel index %s out of range (device reports %s channels); using channel 0",
                    ch_idx,
                    ch_count,
                )
                ch_idx = 0

            # Resolve a channel handle used for all subsequent Tx/Rx commands
            self._channel_handle = self._bl_call('BLC_getChannelHandle', self._handle, ch_idx)

            # Start a schedule if configured
            if self.schedule_nr is not None:
                cmd = f"start schedule {int(self.schedule_nr)};"
                self._err(self._bl_call('BLC_sendCommand', self._channel_handle, cmd))
        except Exception:
            # Do not leave the port open (or stale handles on self) when
            # setup fails part-way through.
            self.disconnect()
            raise

        self._connected = True  # Mark interface as connected

    def disconnect(self) -> None:
        """Close device handles and reset internal state (best-effort).

        Never raises: an SDK exception during close is logged at debug level.
        """
        try:
            self._bl_call('BLC_closeAll')  # Close all device connections via SDK
        except Exception:
            # Shutdown is best-effort; record the failure instead of hiding it.
            self._logger.debug("BLC_closeAll failed during disconnect", exc_info=True)
        self._connected = False
        self._handle = None
        self._channel_handle = None

    def send(self, frame: LinFrame) -> None:
        """Transmit a LIN frame using BLC_mon_set_xmit.

        Raises:
            RuntimeError: If not connected, or the SDK returns a code other
                than BL_OK.
        """
        if not self._connected or not self._channel_handle:
            raise RuntimeError("BabyLIN not connected")
        # NOTE(review): the trailing 0 is the slot time; presumably 0 selects
        # the default timing from the schedule/SDF — confirm against the SDK.
        rc = self._bl_call('BLC_mon_set_xmit', self._channel_handle, int(frame.id), bytes(frame.data), 0)
        self._err(rc)

    def receive(self, id: Optional[int] = None, timeout: float = 1.0) -> Optional[LinFrame]:
        """Receive a LIN frame with optional ID filter and timeout (seconds).

        Frames whose ID does not match *id* are discarded and reading
        continues until a matching frame arrives or *timeout* has elapsed.
        At least one read is always attempted, even with ``timeout <= 0``.

        Args:
            id: Frame ID to wait for, or None to accept the next frame.
            timeout: Overall time budget in seconds.

        Returns:
            The received frame as a LinFrame, or None if nothing (matching)
            arrived in time.

        Raises:
            RuntimeError: If not connected.
        """
        if not self._connected or not self._channel_handle:
            raise RuntimeError("BabyLIN not connected")

        wait_ms = max(0, int(timeout * 1000))  # SDK call takes milliseconds
        deadline = time.monotonic() + wait_ms / 1000.0

        while True:
            try:
                frame = self._bl_call('BLC_getNextFrameTimeout', self._channel_handle, wait_ms)
            except Exception:
                # Many wrappers raise on timeout; unify as 'no data'
                return None
            if not frame:
                return None

            # Mask to the classic 6-bit LIN ID range before comparing.
            fid = int(frame.frameId & 0x3F)
            if id is None or fid == id:
                data = bytes(list(frame.frameData)[: int(frame.lenOfData)])
                return LinFrame(id=fid, data=data)

            # Different ID than requested: keep waiting with the time left.
            wait_ms = int((deadline - time.monotonic()) * 1000)
            if wait_ms <= 0:
                return None

    def flush(self) -> None:
        """Flush RX buffers if the SDK exposes such a function (optional).

        No-op when not connected or when the SDK has no ``BLC_flush``.
        Never raises: an SDK exception is logged at debug level.
        """
        if not self._connected or not self._channel_handle:
            return
        try:
            # Some SDKs may not expose flush; no-op if missing
            flush = getattr(self._BabyLIN, 'BLC_flush', None)
            if flush:
                flush(self._channel_handle)
        except Exception:
            self._logger.debug("BLC_flush failed", exc_info=True)

    @classmethod
    def _zero_payload(cls, length: int) -> bytes:
        """Return a zero-filled payload of *length* bytes, clamped to 0.._MAX_PAYLOAD_LEN."""
        return bytes(max(0, min(cls._MAX_PAYLOAD_LEN, int(length))))

    def _try_raw_master_request(self, raw_req, id: int, length: int) -> bool:
        """Issue a raw master request; return True only if the SDK reported BL_OK.

        Tries the (channel, frameId, length) call form first. If that raises
        TypeError, retries with (channel, frameId, dataBytes) using a
        zero-filled payload. Any other exception or non-OK return code
        yields False so the caller can fall back to a plain transmit.
        """
        try:
            rc = raw_req(self._channel_handle, int(id), int(length))
            if rc == self._BabyLIN.BL_OK:
                return True
            self._logger.debug("BLC_sendRawMasterRequest returned rc=%s", rc)
            return False
        except TypeError:
            pass  # Fall through to the (channel, frameId, dataBytes) form
        except Exception:
            self._logger.debug("BLC_sendRawMasterRequest failed", exc_info=True)
            return False

        try:
            rc = raw_req(self._channel_handle, int(id), self._zero_payload(length))
            if rc == self._BabyLIN.BL_OK:
                return True
            self._logger.debug("BLC_sendRawMasterRequest (bytes form) returned rc=%s", rc)
        except Exception:
            self._logger.debug("BLC_sendRawMasterRequest (bytes form) failed", exc_info=True)
        return False

    def request(self, id: int, length: int, timeout: float = 1.0) -> Optional[LinFrame]:
        """Perform a LIN master request and wait for response.

        Strategy:
        - Prefer SDK method `BLC_sendRawMasterRequest` if present (bytes or length variants).
        - Fallback: transmit a header with zeroed payload; then wait for response.
        - Always attempt to receive a frame with matching ID within 'timeout'.

        Args:
            id: Frame ID to request.
            length: Expected payload length; the zeroed fallback payload is
                clamped to 0..8 bytes.
            timeout: Seconds to wait for the response frame.

        Returns:
            The response as a LinFrame, or None on timeout.

        Raises:
            RuntimeError: If not connected, or the fallback transmit fails.
        """
        if not self._connected or not self._channel_handle:
            raise RuntimeError("BabyLIN not connected")

        sent = False  # Track whether a request command was successfully issued
        # The (channel, frameId, length) form is tried first because the mock
        # wrapper synthesizes a deterministic payload for it
        # (see vendor/mock_babylin_wrapper.py).
        raw_req = getattr(self._BabyLIN, 'BLC_sendRawMasterRequest', None)
        if raw_req:
            sent = self._try_raw_master_request(raw_req, id, length)

        if not sent:
            # Fallback: issue a transmit; many stacks will respond on the bus
            self.send(LinFrame(id=id, data=self._zero_payload(length)))

        # Wait for the response frame with matching ID (or None on timeout)
        return self.receive(id=id, timeout=timeout)