"""Unit tests for the MUM LIN adapter using fake pylin/pymumclient modules. These tests don't talk to real hardware — they inject lightweight fakes via the adapter's `mum_module` / `pylin_module` constructor args to validate the adapter's plumbing (connect/disconnect, send, receive, send_raw, power_*). """ from __future__ import annotations import pytest from ecu_framework.lin.base import LinFrame from ecu_framework.lin.mum import MumLinInterface # ---- fakes --------------------------------------------------------------- class _FakePower: def __init__(self): self.up_calls = 0 self.down_calls = 0 def power_up(self): self.up_calls += 1 def power_down(self): self.down_calls += 1 class _FakeTransport: def __init__(self): self.raw_frames = [] def ld_put_raw(self, data, baudrate): self.raw_frames.append((bytes(data), int(baudrate))) class _FakeLinDev: def __init__(self, transport): self.baudrate = 0 self.tx = [] self._transport = transport # Pre-canned slave responses keyed by frame_id self.slave_responses = {0x11: [0x07, 0x00, 0x00, 0x00]} self.fail_on_recv_id = None def get_device(self, name): if name == "bus/transport_layer": return self._transport raise KeyError(name) def send_message(self, master_to_slave, frame_id, data_length, data=None): if master_to_slave: self.tx.append((int(frame_id), int(data_length), list(data or []))) return None # slave-to-master if self.fail_on_recv_id == int(frame_id): raise RuntimeError("simulated rx timeout") return self.slave_responses.get(int(frame_id)) class _FakeLinMaster: def __init__(self): self.setup_calls = 0 self.teardown_calls = 0 def setup(self): self.setup_calls += 1 def teardown(self): self.teardown_calls += 1 class _FakeMUM: """Stand-in for pymumclient.MelexisUniversalMaster().""" def __init__(self): self.opened_with = None self._lin_master = _FakeLinMaster() self._power = _FakePower() self._transport = _FakeTransport() self._lin_dev = _FakeLinDev(self._transport) def open_all(self, host): self.opened_with = host def get_device(self, name): if name == "lin0": return self._lin_master if name == "power_out0": return self._power raise KeyError(name) class _FakeMumModule: def __init__(self): self.last = None def MelexisUniversalMaster(self): # noqa: N802 - matches vendor API self.last = _FakeMUM() return self.last class _FakePylinModule: """Stand-in for pylin: provides LinBusManager and LinDevice22.""" def __init__(self, lin_dev_factory): # lin_dev_factory(lin_bus) returns an object with the .get_device, # .send_message and .baudrate API used by MumLinInterface. self._lin_dev_factory = lin_dev_factory def LinBusManager(self, linmaster): # noqa: N802 return ("bus_for", linmaster) def LinDevice22(self, lin_bus): # noqa: N802 return self._lin_dev_factory(lin_bus) # ---- helpers ------------------------------------------------------------- def _build_iface(boot_settle=0.0): """Construct a MumLinInterface wired to fake modules; return (iface, fakes).""" mum_mod = _FakeMumModule() # Pylin's LinDevice22 should hand back the same FakeLinDev that's # attached to the MUM instance for this test, so assertions can read tx. captured = {} def lin_dev_factory(lin_bus): # The mum module's get_device('lin0') will be called from connect(); # but pylin.LinDevice22(lin_bus) just needs to expose the same API. # We pull the FakeLinDev off the FakeMUM that was constructed. captured["lin_dev"] = mum_mod.last._lin_dev return mum_mod.last._lin_dev pylin_mod = _FakePylinModule(lin_dev_factory) iface = MumLinInterface( host="10.0.0.1", boot_settle_seconds=boot_settle, mum_module=mum_mod, pylin_module=pylin_mod, ) return iface, mum_mod, captured # ---- tests --------------------------------------------------------------- @pytest.mark.unit def test_connect_opens_mum_and_powers_up(): iface, mum_mod, _ = _build_iface() iface.connect() try: assert mum_mod.last.opened_with == "10.0.0.1" assert mum_mod.last._lin_master.setup_calls == 1 assert mum_mod.last._power.up_calls == 1 assert iface._lin_dev.baudrate == 19200 finally: iface.disconnect() @pytest.mark.unit def test_disconnect_powers_down_and_tears_down(): iface, mum_mod, _ = _build_iface() iface.connect() iface.disconnect() assert mum_mod.last._power.down_calls == 1 assert mum_mod.last._lin_master.teardown_calls == 1 @pytest.mark.unit def test_send_publishes_master_frame(): iface, mum_mod, _ = _build_iface() iface.connect() try: iface.send(LinFrame(id=0x0A, data=bytes([1, 2, 3, 4, 5, 6, 7, 8]))) tx = mum_mod.last._lin_dev.tx assert tx == [(0x0A, 8, [1, 2, 3, 4, 5, 6, 7, 8])] finally: iface.disconnect() @pytest.mark.unit def test_receive_uses_frame_lengths_default(): iface, _, _ = _build_iface() iface.connect() try: frame = iface.receive(id=0x11, timeout=0.1) assert frame is not None assert frame.id == 0x11 # Default frame_lengths maps 0x11 -> 4 assert len(frame.data) == 4 assert frame.data[0] == 0x07 finally: iface.disconnect() @pytest.mark.unit def test_receive_returns_none_on_pylin_exception(): iface, mum_mod, _ = _build_iface() iface.connect() try: mum_mod.last._lin_dev.fail_on_recv_id = 0x11 assert iface.receive(id=0x11, timeout=0.1) is None finally: iface.disconnect() @pytest.mark.unit def test_receive_without_id_raises(): iface, _, _ = _build_iface() iface.connect() try: with pytest.raises(NotImplementedError): iface.receive(id=None) finally: iface.disconnect() @pytest.mark.unit def test_send_raw_uses_classic_checksum_path(): iface, mum_mod, _ = _build_iface() iface.connect() try: iface.send_raw(b"\x7f\x06\xb5\xff\x7f\x01\x02\xff") raw = mum_mod.last._transport.raw_frames assert len(raw) == 1 assert raw[0][0] == b"\x7f\x06\xb5\xff\x7f\x01\x02\xff" assert raw[0][1] == 19200 finally: iface.disconnect() @pytest.mark.unit def test_power_cycle_calls_down_then_up(): iface, mum_mod, _ = _build_iface() iface.connect() try: iface.power_cycle(wait=0.0) finally: iface.disconnect() assert mum_mod.last._power.up_calls >= 2 # initial connect + cycle assert mum_mod.last._power.down_calls >= 1