"""ALM_Node domain helpers — the single contributor-facing API for ALM tests. This module is the **only thing test bodies should import** for ALM hardware tests. It defines: - Typed enums (:class:`LedState`, :class:`Mode`, :class:`Update`, :class:`NVMStatus`, :class:`VoltageStatus`, :class:`ThermalStatus`) that mirror the LDF's ``Signal_encoding_types`` blocks. These used to be auto-generated by ``deprecated/gen_lin_api.py``; that path is now retired and the generator + last-emitted file live under ``deprecated/`` for historical reference. Update these enums by hand when the LDF gains new logical encodings. - Module-level constants (LED_STATE_*, polling cadences, PWM tolerances). - :class:`AlmTester` — bound to a ``FrameIO`` and a node NAD; the per-signal read / per-action send surface plus the cross-frame patterns (wait_for_state, measure ANIMATING, assert PWM matches the rgb_to_pwm calculator). - Pure utilities (:func:`ntc_kelvin_to_celsius`, :func:`pwm_within_tol`). Test bodies should reach for ``AlmTester`` methods (``send_color``, ``read_led_state``, ``wait_for_led_on``, ``assert_pwm_matches_rgb``, …) rather than calling ``fio.send("ALM_Req_A", AmbLightColourRed=…)`` directly. Strings flow through this module to ``FrameIO`` so tests never need to know the LDF schema. Maintenance pact: when the LDF gains a signal or a frame that tests should use, the corresponding ``read_*`` / ``send_*`` method (and, if needed, a new IntEnum) goes here. Tests never reach past this module. """ from __future__ import annotations import time from enum import IntEnum from typing import Optional, Union from frame_io import FrameIO from vendor.rgb_to_pwm import compute_pwm # --------------------------------------------------------------------------- # Typed enums (mirroring the LDF's Signal_encoding_types blocks for the ALM # frames). 
# Originally generated by ``deprecated/gen_lin_api.py`` from the LDF;
# inlined here when AlmTester became the single contributor-facing surface,
# so tests don't need to import a separate generated module at all. The
# generator and its previously-emitted output are kept under ``deprecated/``
# for historical reference. When the LDF gains a new logical encoding,
# update the matching IntEnum below by hand.
# ---------------------------------------------------------------------------


class Update(IntEnum):
    """LDF Signal_encoding_types.Update — AmbLightUpdate values."""

    IMMEDIATE_COLOR_UPDATE = 0x00
    COLOR_MEMORIZATION = 0x01
    APPLY_MEMORIZED_COLOR = 0x02
    DISCARD_MEMORIZED_COLOR = 0x03


class Mode(IntEnum):
    """LDF Signal_encoding_types.Mode — AmbLightMode values (logical + physical)."""

    IMMEDIATE_SETPOINT = 0x00
    FADING_EFFECT_1 = 0x01
    FADING_EFFECT_2 = 0x02
    TBD_0X03 = 0x03
    TBD_0X04 = 0x04
    # physical_value 5..63 scale=1.0 offset=0.0 unit='Not Used' — pass int directly


class LedState(IntEnum):
    """LDF Signal_encoding_types.LED_State — ALMLEDState values."""

    LED_OFF = 0x00
    LED_ANIMATING = 0x01
    LED_ON = 0x02
    RESERVED = 0x03


class VoltageStatus(IntEnum):
    """LDF Signal_encoding_types.VoltageStatus — ALMVoltageStatus values."""

    NORMAL_VOLTAGE = 0x00
    POWER_UNDERVOLTAGE = 0x01
    POWER_OVERVOLTAGE = 0x02
    RESERVED_0X03 = 0x03
    RESERVED_0X04 = 0x04
    RESERVED_0X05 = 0x05
    RESERVED_0X06 = 0x06
    RESERVED_0X07 = 0x07
    RESERVED_0X08 = 0x08
    RESERVED_0X09 = 0x09
    RESERVED_0X0A = 0x0A
    RESERVED_0X0B = 0x0B
    RESERVED_0X0C = 0x0C
    RESERVED_0X0D = 0x0D
    RESERVED_0X0E = 0x0E
    RESERVED_0X0F = 0x0F


class ThermalStatus(IntEnum):
    """LDF Signal_encoding_types.ThermalStatus — ALMThermalStatus values."""

    NORMAL_TEMPERATURE = 0x00
    THERMAL_DERATING = 0x01
    THERMAL_SHUTDOWN = 0x02
    RESERVED_0X03 = 0x03
    RESERVED_0X04 = 0x04
    RESERVED_0X05 = 0x05
    RESERVED_0X06 = 0x06
    RESERVED_0X07 = 0x07
    RESERVED_0X08 = 0x08
    RESERVED_0X09 = 0x09
    RESERVED_0X0A = 0x0A
    RESERVED_0X0B = 0x0B
    RESERVED_0X0C = 0x0C
    RESERVED_0X0D = 0x0D
    RESERVED_0X0E = 0x0E
    RESERVED_0X0F = 0x0F


class NVMStatus(IntEnum):
    """LDF Signal_encoding_types.NVMStatus — ALMNVMStatus values."""

    NVM_OK = 0x00
    NVM_NOK = 0x01
    RESERVED_0X02 = 0x02
    RESERVED_0X03 = 0x03
    RESERVED_0X04 = 0x04
    RESERVED_0X05 = 0x05
    RESERVED_0X06 = 0x06
    RESERVED_0X07 = 0x07
    RESERVED_0X08 = 0x08
    RESERVED_0X09 = 0x09
    RESERVED_0X0A = 0x0A
    RESERVED_0X0B = 0x0B
    RESERVED_0X0C = 0x0C
    RESERVED_0X0D = 0x0D
    RESERVED_0X0E = 0x0E
    RESERVED_0X0F = 0x0F


# --- ALMLEDState values (from LDF Signal_encoding_types: LED_State) --------
# Plain-int aliases of the LedState members. They are derived from the enum
# (rather than re-typed by hand) so the two spellings can never drift apart;
# int() keeps them ordinary ints, exactly as before.
LED_STATE_OFF = int(LedState.LED_OFF)
LED_STATE_ANIMATING = int(LedState.LED_ANIMATING)
LED_STATE_ON = int(LedState.LED_ON)

# --- Test pacing -----------------------------------------------------------
# The LIN bus runs at 10 ms frame periodicity, so polling faster than that
# returns the same buffered slave data. We poll every 50 ms (5 LIN periods)
# which keeps the loop responsive without hammering the bus, and we let the
# slave settle for 100 ms (10 LIN periods) before reading PWM_Frame /
# PWM_wo_Comp so the firmware has time to populate the TX buffer with fresh
# values.
STATE_POLL_INTERVAL = 0.05       # 50 ms — 5 LIN frame periods
STATE_RECEIVE_TIMEOUT = 0.2      # Per-poll receive timeout; keeps the loop iterating
STATE_TIMEOUT_DEFAULT = 1.0      # Default timeout (s) of the wait_for_led_* wrappers
PWM_SETTLE_SECONDS = 0.1         # 100 ms — wait for slave to refresh PWM_Frame TX buffer
DURATION_LSB_SECONDS = 0.2       # AmbLightDuration scaling per the ECU spec (1 step = 200 ms)
FORCE_OFF_SETTLE_SECONDS = 0.4   # Pause after the OFF command before yielding to the test

# --- PWM tolerances --------------------------------------------------------
# Tj_Frame_NTC reports the junction temperature in Kelvin; we convert to °C
# at runtime and feed compute_pwm() so the temperature compensation matches
# what the ECU is applying.
KELVIN_TO_CELSIUS_OFFSET = 273.15
PWM_ABS_TOL = 3277     # ±5% of 16-bit full scale (65535 * 0.05)
PWM_REL_TOL = 0.05     # ±5% of expected, whichever is larger

# Sentinel returned by AlmTester.read_led_state() when ALM_Status could not be
# read (receive timed out, or the signal was missing from the decoded frame).
# It is deliberately outside the LedState value range.
LED_STATE_READ_TIMEOUT = -1


# --- Pure utilities --------------------------------------------------------

def ntc_kelvin_to_celsius(ntc_raw: int) -> float:
    """Convert a Tj_Frame_NTC reading (Kelvin) to °C for compute_pwm()."""
    return float(ntc_raw) - KELVIN_TO_CELSIUS_OFFSET


def pwm_within_tol(actual: int, expected: int) -> bool:
    """True iff ``actual`` is within ``max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL)`` of ``expected``."""
    return abs(actual - expected) <= max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL)


def _band(expected: int) -> int:
    """The numeric tolerance band used in PWM assertion error messages."""
    return max(PWM_ABS_TOL, int(abs(expected) * PWM_REL_TOL))


def _assert_pwm_channel(
    signal: str, actual: int, expected: int, rgb_in: tuple[int, int, int]
) -> None:
    """Raise ``AssertionError`` if one PWM channel is outside the tolerance band.

    Raises explicitly instead of using the ``assert`` statement so the check
    still runs when Python is started with ``-O`` (which strips ``assert``).
    """
    if not pwm_within_tol(actual, expected):
        raise AssertionError(
            f"{signal} {actual} differs from expected {expected} "
            f"by more than ±{_band(expected)} (rgb_in={rgb_in})"
        )


# --- AlmTester -------------------------------------------------------------

class AlmTester:
    """ALM_Node helpers bound to a :class:`FrameIO` and a node NAD.

    All test-side patterns for driving ALM_Req_A, polling ALM_Status, and
    validating PWM frames live here. Internally everything goes through
    ``FrameIO`` — there is no direct frame-ref handling.

    Typical fixture usage::

        @pytest.fixture(scope="module")
        def fio(lin, ldf):
            return FrameIO(lin, ldf)

        @pytest.fixture(scope="module")
        def alm(fio):
            nad = fio.read_signal("ALM_Status", "ALMNadNo")
            if nad is None:
                pytest.skip("ECU not responding on ALM_Status")
            return AlmTester(fio, int(nad))
    """

    def __init__(self, fio: FrameIO, nad: int) -> None:
        """Bind the tester to ``fio`` and to the node address ``nad``."""
        self._fio = fio
        self._nad = int(nad)

    # --- properties --------------------------------------------------------

    @property
    def fio(self) -> FrameIO:
        """The ``FrameIO`` this tester sends and receives through."""
        return self._fio

    @property
    def nad(self) -> int:
        """The node address used as the default LID target by :meth:`send_color`."""
        return self._nad

    # --- ALM_Status polling ------------------------------------------------

    def read_led_state(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> int:
        """Read ALMLEDState; -1 if the read timed out.

        Uses a short receive timeout so that polling loops don't stall for a
        full second on a single missed frame. -1 (``LED_STATE_READ_TIMEOUT``)
        is also returned when the decoded frame carries no ``ALMLEDState``.
        """
        decoded = self._fio.receive("ALM_Status", timeout=timeout)
        if decoded is None:
            return LED_STATE_READ_TIMEOUT
        return int(decoded.get("ALMLEDState", LED_STATE_READ_TIMEOUT))

    def wait_for_state(
        self, target: int, timeout: float
    ) -> tuple[bool, float, list[int]]:
        """Poll ALMLEDState until it equals ``target``, or until ``timeout``.

        Returns ``(reached, elapsed_seconds, observed_state_history)``. The
        history lists each state once per change (consecutive duplicates are
        collapsed); read timeouts appear in it as -1.
        """
        seen: list[int] = []
        # One clock sample anchors both the deadline and the elapsed time.
        start = time.monotonic()
        deadline = start + timeout
        while time.monotonic() < deadline:
            st = self.read_led_state()
            if not seen or seen[-1] != st:
                seen.append(st)
            if st == target:
                return True, time.monotonic() - start, seen
            time.sleep(STATE_POLL_INTERVAL)
        return False, time.monotonic() - start, seen

    def measure_animating_window(
        self, max_wait: float
    ) -> tuple[Optional[float], list[int]]:
        """Wait for ANIMATING to start, then for it to leave ANIMATING.

        Returns ``(animating_seconds, state_history)``. Returns
        ``(None, history)`` if ANIMATING is never observed within
        ``max_wait``, or if it starts but does not end before ``max_wait``
        expires.

        A read timeout (-1) is recorded in the history but is neither the
        start nor the end of the window: a single missed ALM_Status frame
        says nothing about the LED state and must not cut the measurement
        short.
        """
        seen: list[int] = []
        started_at: Optional[float] = None
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            st = self.read_led_state()
            if not seen or seen[-1] != st:
                seen.append(st)
            if st != LED_STATE_READ_TIMEOUT:
                if started_at is None:
                    if st == LED_STATE_ANIMATING:
                        started_at = time.monotonic()
                elif st != LED_STATE_ANIMATING:
                    return time.monotonic() - started_at, seen
            time.sleep(STATE_POLL_INTERVAL)
        return None, seen

    # --- ALM_Status per-signal readers ------------------------------------
    #
    # These mirror the signals carried by ALM_Status (the slave-published
    # status frame). Each one does its own ``fio.receive`` so a test that
    # only needs one signal doesn't pay for decoding the whole frame —
    # though in practice ldfparser decodes the full frame either way.

    def _read_status_signal(self, signal: str, timeout: float) -> Optional[int]:
        """Receive ALM_Status once and return ``signal`` as int; ``None`` on timeout."""
        decoded = self._fio.receive("ALM_Status", timeout=timeout)
        if decoded is None:
            return None
        return int(decoded[signal])

    def read_nad(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMNadNo from ALM_Status; ``None`` on timeout."""
        return self._read_status_signal("ALMNadNo", timeout)

    def read_voltage_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMVoltageStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`VoltageStatus` enum members
        (``NORMAL_VOLTAGE`` / ``POWER_UNDERVOLTAGE`` / ``POWER_OVERVOLTAGE``).
        """
        return self._read_status_signal("ALMVoltageStatus", timeout)

    def read_thermal_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMThermalStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`ThermalStatus` enum members
        (``NORMAL_TEMPERATURE`` / ``THERMAL_DERATING`` / ``THERMAL_SHUTDOWN``).
        """
        return self._read_status_signal("ALMThermalStatus", timeout)

    def read_nvm_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMNVMStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`NVMStatus` enum members.
        """
        return self._read_status_signal("ALMNVMStatus", timeout)

    def read_sig_comm_err(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read SigCommErr from ALM_Status; ``None`` on timeout."""
        return self._read_status_signal("SigCommErr", timeout)

    # --- Tj_Frame readers --------------------------------------------------

    def read_ntc_kelvin(self) -> Optional[int]:
        """Raw NTC reading in Kelvin from Tj_Frame_NTC; ``None`` on timeout."""
        raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
        return None if raw is None else int(raw)

    def read_ntc_celsius(self) -> Optional[float]:
        """NTC reading converted to °C; ``None`` on timeout."""
        raw = self.read_ntc_kelvin()
        return None if raw is None else ntc_kelvin_to_celsius(raw)

    # --- PWM readers ------------------------------------------------------

    def read_pwm(self) -> Optional[tuple[int, int, int, int]]:
        """Read PWM_Frame channels; returns ``(R, G, B1, B2)`` or ``None``.

        These are the temperature-compensated PWM values the ECU drives
        the LED rails with. Compare against
        :func:`compute_pwm(...).pwm_comp` for assertions, or use
        :meth:`assert_pwm_matches_rgb` for the full pattern.
        """
        decoded = self._fio.receive("PWM_Frame")
        if decoded is None:
            return None
        return (
            int(decoded["PWM_Frame_Red"]),
            int(decoded["PWM_Frame_Green"]),
            int(decoded["PWM_Frame_Blue1"]),
            int(decoded["PWM_Frame_Blue2"]),
        )

    def read_pwm_wo_comp(self) -> Optional[tuple[int, int, int]]:
        """Read PWM_wo_Comp channels; returns ``(R, G, B)`` or ``None``.

        These are the non-temperature-compensated PWM values — useful
        when tests want to assert a deterministic mapping from RGB to PWM
        without involving the runtime NTC reading.
        """
        decoded = self._fio.receive("PWM_wo_Comp")
        if decoded is None:
            return None
        return (
            int(decoded["PWM_wo_Comp_Red"]),
            int(decoded["PWM_wo_Comp_Green"]),
            int(decoded["PWM_wo_Comp_Blue"]),
        )

    # --- ALM_Req_A senders (per-action, intent-shaped) --------------------
    #
    # ``send_color`` is the single workhorse. The save/apply/discard
    # convenience methods are thin wrappers that pick the right
    # ``AmbLightUpdate`` value and leave colour/intensity/mode to the
    # caller.

    def send_color(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        update: Union[Update, int] = Update.IMMEDIATE_COLOR_UPDATE,
        duration: int = 0,
        lid_from: Optional[int] = None,
        lid_to: Optional[int] = None,
    ) -> None:
        """Publish ALM_Req_A with the given colour / mode / update.

        ``lid_from`` and ``lid_to`` default to this tester's NAD — i.e.
        unicast to the bound node. Pass them explicitly for broadcast or
        range targeting (or use :meth:`send_color_broadcast`).

        ``mode``, ``update`` accept either :class:`Mode` / :class:`Update`
        enum members or raw ints — both round-trip identically since the
        enums inherit from ``IntEnum``.
        """
        nad = self._nad
        self._fio.send(
            "ALM_Req_A",
            AmbLightColourRed=int(red),
            AmbLightColourGreen=int(green),
            AmbLightColourBlue=int(blue),
            AmbLightIntensity=int(intensity),
            AmbLightUpdate=int(update),
            AmbLightMode=int(mode),
            AmbLightDuration=int(duration),
            AmbLightLIDFrom=int(lid_from if lid_from is not None else nad),
            AmbLightLIDTo=int(lid_to if lid_to is not None else nad),
        )

    def send_color_broadcast(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        update: Union[Update, int] = Update.IMMEDIATE_COLOR_UPDATE,
        duration: int = 0,
    ) -> None:
        """Broadcast: send the same colour to LID range 0x00–0xFF (every node)."""
        self.send_color(
            red=red,
            green=green,
            blue=blue,
            intensity=intensity,
            mode=mode,
            update=update,
            duration=duration,
            lid_from=0x00,
            lid_to=0xFF,
        )

    def save_color(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        duration: int = 0,
    ) -> None:
        """Memorize a colour without applying it (Update.COLOR_MEMORIZATION).

        The ECU buffers the request; the LED state does NOT change until
        a later :meth:`apply_saved_color` call. Useful for testing the
        save/apply semantics independently of the immediate-update path.
        """
        self.send_color(
            red=red,
            green=green,
            blue=blue,
            intensity=intensity,
            mode=mode,
            update=Update.COLOR_MEMORIZATION,
            duration=duration,
        )

    def apply_saved_color(self) -> None:
        """Apply the previously-saved colour (Update.APPLY_MEMORIZED_COLOR)."""
        self.send_color(
            red=0,
            green=0,
            blue=0,
            intensity=0,
            mode=Mode.IMMEDIATE_SETPOINT,
            update=Update.APPLY_MEMORIZED_COLOR,
            duration=0,
        )

    def discard_saved_color(self) -> None:
        """Discard the previously-saved colour (Update.DISCARD_MEMORIZED_COLOR)."""
        self.send_color(
            red=0,
            green=0,
            blue=0,
            intensity=0,
            mode=Mode.IMMEDIATE_SETPOINT,
            update=Update.DISCARD_MEMORIZED_COLOR,
            duration=0,
        )

    # --- ConfigFrame sender -----------------------------------------------

    def send_config(
        self,
        *,
        calibration: int = 0,
        enable_derating: int = 1,
        enable_compensation: int = 1,
        max_lm: int = 3840,
    ) -> None:
        """Publish ConfigFrame.

        Defaults match the ECU's nominal config (derating + compensation
        enabled, calibration off, max_lm=3840). Tests that want to toggle
        a single field pass that one kwarg; the rest stay at nominal.
        """
        self._fio.send(
            "ConfigFrame",
            ConfigFrame_Calibration=int(calibration),
            ConfigFrame_EnableDerating=int(enable_derating),
            ConfigFrame_EnableCompensation=int(enable_compensation),
            ConfigFrame_MaxLM=int(max_lm),
        )

    # --- LED control ------------------------------------------------------

    def force_off(self) -> None:
        """Drive the LED to OFF (intensity=0, mode=IMMEDIATE_SETPOINT) and pause briefly."""
        self.send_color(red=0, green=0, blue=0, intensity=0, duration=0)
        time.sleep(FORCE_OFF_SETTLE_SECONDS)

    # --- wait_for_state convenience wrappers ------------------------------

    def wait_for_led_on(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_ON or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_ON, timeout=timeout)
        return reached

    def wait_for_led_off(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_OFF or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_OFF, timeout=timeout)
        return reached

    def wait_for_animating(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_ANIMATING or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_ANIMATING, timeout=timeout)
        return reached

    # --- PWM assertions ---------------------------------------------------

    def assert_pwm_matches_rgb(
        self, rp, r: int, g: int, b: int, *, label: str = ""
    ) -> None:
        """Assert PWM_Frame matches ``compute_pwm(r,g,b,temp_c=Tj_NTC-273.15).pwm_comp``.

        Reads Tj_Frame_NTC (Kelvin), converts to °C, and feeds that
        temperature into ``compute_pwm`` so the temperature compensation
        matches what the ECU is applying. Both ``PWM_Frame_Blue1`` and
        ``PWM_Frame_Blue2`` are asserted equal to the expected blue PWM.

        ``rp`` is called as ``rp(key, value)`` to log the inputs and the
        measured values (presumably pytest's ``record_property`` — confirm
        against the calling fixtures). ``label`` is appended to every key
        as ``_<label>`` so one test can record several measurements.

        Raises ``AssertionError`` when Tj_Frame or PWM_Frame is not
        received, or when any channel is outside the tolerance band.
        """
        suffix = f"_{label}" if label else ""
        rgb_in = (r, g, b)

        ntc_kelvin = self.read_ntc_kelvin()
        if ntc_kelvin is None:
            raise AssertionError("Tj_Frame not received within timeout")
        temp_c = ntc_kelvin_to_celsius(ntc_kelvin)
        rp(f"ntc_raw_kelvin{suffix}", ntc_kelvin)
        rp(f"temp_c_used{suffix}", round(temp_c, 2))

        exp_r, exp_g, exp_b = compute_pwm(r, g, b, temp_c=temp_c).pwm_comp
        rp(f"expected_pwm{suffix}", {
            "red": exp_r,
            "green": exp_g,
            "blue": exp_b,
            "rgb_in": rgb_in,
            "temp_c_used": round(temp_c, 2),
        })

        # Let the firmware refresh PWM_Frame's TX buffer with the new values.
        time.sleep(PWM_SETTLE_SECONDS)
        actual = self.read_pwm()
        if actual is None:
            raise AssertionError("PWM_Frame not received within timeout")
        actual_r, actual_g, actual_b1, actual_b2 = actual
        rp(f"actual_pwm{suffix}", {
            "red": actual_r,
            "green": actual_g,
            "blue1": actual_b1,
            "blue2": actual_b2,
        })

        _assert_pwm_channel("PWM_Frame_Red", actual_r, exp_r, rgb_in)
        _assert_pwm_channel("PWM_Frame_Green", actual_g, exp_g, rgb_in)
        _assert_pwm_channel("PWM_Frame_Blue1", actual_b1, exp_b, rgb_in)
        _assert_pwm_channel("PWM_Frame_Blue2", actual_b2, exp_b, rgb_in)

    def assert_pwm_wo_comp_matches_rgb(
        self, rp, r: int, g: int, b: int, *, label: str = ""
    ) -> None:
        """Assert PWM_wo_Comp matches ``compute_pwm(r,g,b).pwm_no_comp``.

        ``PWM_wo_Comp`` carries the non-compensated PWM values, so the
        expected output is temperature-independent. NTC is still logged
        for visibility (as ``None`` if Tj_Frame was not received — that
        alone does not fail the assertion).

        ``rp`` and ``label`` are used as in :meth:`assert_pwm_matches_rgb`:
        ``rp(key, value)`` logs values, ``label`` suffixes the keys.

        Raises ``AssertionError`` when PWM_wo_Comp is not received, or when
        any channel is outside the tolerance band.
        """
        suffix = f"_{label}" if label else ""
        rgb_in = (r, g, b)

        # temp_c is unused for pwm_no_comp
        exp_r, exp_g, exp_b = compute_pwm(r, g, b).pwm_no_comp
        rp(f"expected_pwm_wo_comp{suffix}", {
            "red": exp_r,
            "green": exp_g,
            "blue": exp_b,
            "rgb_in": rgb_in,
        })

        # Logged as received (no int() conversion), so a timeout shows up as None.
        ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
        rp(f"ntc_raw_kelvin{suffix}", ntc_raw)

        # Let the firmware refresh PWM_wo_Comp's TX buffer before sampling it.
        time.sleep(PWM_SETTLE_SECONDS)
        actual = self.read_pwm_wo_comp()
        if actual is None:
            raise AssertionError("PWM_wo_Comp not received within timeout")
        actual_r, actual_g, actual_b = actual
        rp(f"actual_pwm_wo_comp{suffix}", {
            "red": actual_r,
            "green": actual_g,
            "blue": actual_b,
        })

        _assert_pwm_channel("PWM_wo_Comp_Red", actual_r, exp_r, rgb_in)
        _assert_pwm_channel("PWM_wo_Comp_Green", actual_g, exp_g, rgb_in)
        _assert_pwm_channel("PWM_wo_Comp_Blue", actual_b, exp_b, rgb_in)