"""ALM_Node domain helpers built on :class:`frame_io.FrameIO`. This module is intentionally narrow: it knows about the ALM_Node frames defined in the project's LDF (``ALM_Req_A``, ``ALM_Status``, ``Tj_Frame``, ``PWM_Frame``, ``PWM_wo_Comp``, ``ConfigFrame``) and how the test suite wants to interact with them. Generic LDF-driven I/O lives in :mod:`frame_io` so it can be reused across other ECUs. Public surface: - Module-level constants (LED_STATE_*, polling cadences, PWM tolerances) - :class:`AlmTester` — bound to a ``FrameIO`` and a ``NAD``; encodes the test patterns (force off, wait for state, measure ANIMATING, assert PWM matches the rgb_to_pwm calculator) - Pure utilities (:func:`ntc_kelvin_to_celsius`, :func:`pwm_within_tol`) """ from __future__ import annotations import time from typing import Optional from frame_io import FrameIO from vendor.rgb_to_pwm import compute_pwm # --- ALMLEDState values (from LDF Signal_encoding_types: LED_State) -------- LED_STATE_OFF = 0 LED_STATE_ANIMATING = 1 LED_STATE_ON = 2 # --- Test pacing ----------------------------------------------------------- # The LIN bus runs at 10 ms frame periodicity, so polling faster than that # returns the same buffered slave data. We poll every 50 ms (5 LIN periods) # which keeps the loop responsive without hammering the bus, and we let the # slave settle for 100 ms (10 LIN periods) before reading PWM_Frame / # PWM_wo_Comp so the firmware has time to populate the TX buffer with fresh # values. STATE_POLL_INTERVAL = 0.05 # 50 ms — 5 LIN frame periods STATE_RECEIVE_TIMEOUT = 0.2 # Per-poll receive timeout; keeps the loop iterating STATE_TIMEOUT_DEFAULT = 1.0 PWM_SETTLE_SECONDS = 0.1 # 100 ms — wait for slave to refresh PWM_Frame TX buffer DURATION_LSB_SECONDS = 0.2 # AmbLightDuration scaling per the ECU spec (1 step = 200 ms) FORCE_OFF_SETTLE_SECONDS = 0.4 # Pause after the OFF command before yielding to the test # --- PWM tolerances -------------------------------------------------------- # Tj_Frame_NTC reports the junction temperature in Kelvin; we convert to °C # at runtime and feed compute_pwm() so the temperature compensation matches # what the ECU is applying. KELVIN_TO_CELSIUS_OFFSET = 273.15 PWM_ABS_TOL = 3277 # ±5% of 16-bit full scale (65535 * 0.05) PWM_REL_TOL = 0.05 # ±5% of expected, whichever is larger # --- Pure utilities -------------------------------------------------------- def ntc_kelvin_to_celsius(ntc_raw: int) -> float: """Convert a Tj_Frame_NTC reading (Kelvin) to °C for compute_pwm().""" return float(ntc_raw) - KELVIN_TO_CELSIUS_OFFSET def pwm_within_tol(actual: int, expected: int) -> bool: """True iff ``actual`` is within ``max(PWM_ABS_TOL, expected * PWM_REL_TOL)`` of ``expected``.""" return abs(actual - expected) <= max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL) def _band(expected: int) -> int: """The numeric tolerance band used in PWM assertion error messages.""" return max(PWM_ABS_TOL, int(abs(expected) * PWM_REL_TOL)) # --- AlmTester ------------------------------------------------------------- class AlmTester: """ALM_Node helpers bound to a :class:`FrameIO` and a node NAD. All test-side patterns for driving ALM_Req_A, polling ALM_Status, and validating PWM frames live here. Internally everything goes through ``FrameIO`` — there is no direct frame-ref handling. Typical fixture usage:: @pytest.fixture(scope="module") def fio(lin, ldf): return FrameIO(lin, ldf) @pytest.fixture(scope="module") def alm(fio): nad = fio.read_signal("ALM_Status", "ALMNadNo") if nad is None: pytest.skip("ECU not responding on ALM_Status") return AlmTester(fio, int(nad)) """ def __init__(self, fio: FrameIO, nad: int) -> None: self._fio = fio self._nad = int(nad) # --- properties -------------------------------------------------------- @property def fio(self) -> FrameIO: return self._fio @property def nad(self) -> int: return self._nad # --- ALM_Status polling ------------------------------------------------ def read_led_state(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> int: """Read ALMLEDState; -1 if the read timed out. Uses a short receive timeout so that polling loops don't stall for a full second on a single missed frame. """ decoded = self._fio.receive("ALM_Status", timeout=timeout) if decoded is None: return -1 return int(decoded.get("ALMLEDState", -1)) def wait_for_state( self, target: int, timeout: float ) -> tuple[bool, float, list[int]]: """Poll ALMLEDState until it equals ``target``, or until ``timeout``. Returns ``(reached, elapsed_seconds, observed_state_history)``. """ seen: list[int] = [] deadline = time.monotonic() + timeout start = time.monotonic() while time.monotonic() < deadline: st = self.read_led_state() if not seen or seen[-1] != st: seen.append(st) if st == target: return True, time.monotonic() - start, seen time.sleep(STATE_POLL_INTERVAL) return False, time.monotonic() - start, seen def measure_animating_window( self, max_wait: float ) -> tuple[Optional[float], list[int]]: """Wait for ANIMATING to start, then for it to leave ANIMATING. Returns ``(animating_seconds, state_history)``. If ANIMATING is never observed within ``max_wait``, returns ``(None, history)``. """ seen: list[int] = [] started_at: Optional[float] = None deadline = time.monotonic() + max_wait while time.monotonic() < deadline: st = self.read_led_state() if not seen or seen[-1] != st: seen.append(st) if started_at is None and st == LED_STATE_ANIMATING: started_at = time.monotonic() elif started_at is not None and st != LED_STATE_ANIMATING: return time.monotonic() - started_at, seen time.sleep(STATE_POLL_INTERVAL) return None, seen # --- LED control ------------------------------------------------------ def force_off(self) -> None: """Drive the LED to OFF (mode=0, intensity=0) and pause briefly.""" self._fio.send( "ALM_Req_A", AmbLightColourRed=0, AmbLightColourGreen=0, AmbLightColourBlue=0, AmbLightIntensity=0, AmbLightUpdate=0, AmbLightMode=0, AmbLightDuration=0, AmbLightLIDFrom=self._nad, AmbLightLIDTo=self._nad, ) time.sleep(FORCE_OFF_SETTLE_SECONDS) # --- PWM assertions --------------------------------------------------- def assert_pwm_matches_rgb( self, rp, r: int, g: int, b: int, *, label: str = "" ) -> None: """Assert PWM_Frame matches ``compute_pwm(r,g,b,temp_c=Tj_NTC-273.15).pwm_comp``. Reads Tj_Frame_NTC (Kelvin), converts to °C, and feeds that temperature into ``compute_pwm`` so the temperature compensation matches what the ECU is applying. Both ``PWM_Frame_Blue1`` and ``PWM_Frame_Blue2`` are asserted equal to the expected blue PWM. """ suffix = f"_{label}" if label else "" ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC") assert ntc_raw is not None, "Tj_Frame not received within timeout" temp_c = ntc_kelvin_to_celsius(int(ntc_raw)) rp(f"ntc_raw_kelvin{suffix}", int(ntc_raw)) rp(f"temp_c_used{suffix}", round(temp_c, 2)) expected = compute_pwm(r, g, b, temp_c=temp_c).pwm_comp exp_r, exp_g, exp_b = expected rp(f"expected_pwm{suffix}", { "red": exp_r, "green": exp_g, "blue": exp_b, "rgb_in": (r, g, b), "temp_c_used": round(temp_c, 2), }) # Let the firmware refresh PWM_Frame's TX buffer with the new values. time.sleep(PWM_SETTLE_SECONDS) decoded = self._fio.receive("PWM_Frame") assert decoded is not None, "PWM_Frame not received within timeout" actual_r = int(decoded["PWM_Frame_Red"]) actual_g = int(decoded["PWM_Frame_Green"]) actual_b1 = int(decoded["PWM_Frame_Blue1"]) actual_b2 = int(decoded["PWM_Frame_Blue2"]) rp(f"actual_pwm{suffix}", { "red": actual_r, "green": actual_g, "blue1": actual_b1, "blue2": actual_b2, }) assert pwm_within_tol(actual_r, exp_r), ( f"PWM_Frame_Red {actual_r} differs from expected {exp_r} " f"by more than ±{_band(exp_r)} (rgb_in={(r, g, b)})" ) assert pwm_within_tol(actual_g, exp_g), ( f"PWM_Frame_Green {actual_g} differs from expected {exp_g} " f"by more than ±{_band(exp_g)} (rgb_in={(r, g, b)})" ) assert pwm_within_tol(actual_b1, exp_b), ( f"PWM_Frame_Blue1 {actual_b1} differs from expected {exp_b} " f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})" ) assert pwm_within_tol(actual_b2, exp_b), ( f"PWM_Frame_Blue2 {actual_b2} differs from expected {exp_b} " f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})" ) def assert_pwm_wo_comp_matches_rgb( self, rp, r: int, g: int, b: int, *, label: str = "" ) -> None: """Assert PWM_wo_Comp matches ``compute_pwm(r,g,b).pwm_no_comp``. ``PWM_wo_Comp`` carries the non-compensated PWM values, so the expected output is temperature-independent. NTC is still logged for visibility. """ suffix = f"_{label}" if label else "" expected = compute_pwm(r, g, b).pwm_no_comp # temp_c is unused for pwm_no_comp exp_r, exp_g, exp_b = expected rp(f"expected_pwm_wo_comp{suffix}", { "red": exp_r, "green": exp_g, "blue": exp_b, "rgb_in": (r, g, b), }) ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC") rp(f"ntc_raw_kelvin{suffix}", ntc_raw) # Let the firmware refresh PWM_wo_Comp's TX buffer before sampling it. time.sleep(PWM_SETTLE_SECONDS) decoded = self._fio.receive("PWM_wo_Comp") assert decoded is not None, "PWM_wo_Comp not received within timeout" actual_r = int(decoded["PWM_wo_Comp_Red"]) actual_g = int(decoded["PWM_wo_Comp_Green"]) actual_b = int(decoded["PWM_wo_Comp_Blue"]) rp(f"actual_pwm_wo_comp{suffix}", { "red": actual_r, "green": actual_g, "blue": actual_b, }) assert pwm_within_tol(actual_r, exp_r), ( f"PWM_wo_Comp_Red {actual_r} differs from expected {exp_r} " f"by more than ±{_band(exp_r)} (rgb_in={(r, g, b)})" ) assert pwm_within_tol(actual_g, exp_g), ( f"PWM_wo_Comp_Green {actual_g} differs from expected {exp_g} " f"by more than ±{_band(exp_g)} (rgb_in={(r, g, b)})" ) assert pwm_within_tol(actual_b, exp_b), ( f"PWM_wo_Comp_Blue {actual_b} differs from expected {exp_b} " f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})" )