Splits hardware-test concerns into two reusable modules and rebuilds
test_mum_alm_animation.py on top of them.
- frame_io.py — generic LDF-driven I/O class. Knows nothing about
ALM. Three access levels:
high: send/receive/read_signal by frame and signal name
mid: pack/unpack — bytes ↔ signals without I/O
low: send_raw/receive_raw — bypass the LDF entirely
Plus introspection: frame, frame_id, frame_length. Frame lookups
are cached per FrameIO instance.
- alm_helpers.py — ALM_Node domain helpers built on FrameIO.
AlmTester class bound to (fio, nad) exposes:
force_off, read_led_state, wait_for_state,
measure_animating_window, assert_pwm_matches_rgb,
assert_pwm_wo_comp_matches_rgb
Plus pure utilities (ntc_kelvin_to_celsius, pwm_within_tol) and
the LED-state / pacing / PWM-tolerance constants. PWM assertions
use vendor/rgb_to_pwm.py (compute_pwm) at the runtime
Tj_Frame_NTC temperature.
- test_mum_alm_animation.py rewritten:
* fio + alm fixtures replace the previous dict-based _ctx
* SETUP / PROCEDURE / ASSERT / TEARDOWN section markers
* test_mode1_fade now wraps its ConfigFrame change in
try/finally so EnableCompensation is restored even on
assertion failure (was leaking state into later tests)
* test_disable_compensation_pwm_wo_comp uses the four-phase
pattern explicitly
Sibling imports work because pytest's default rootdir mode puts the
test file's directory on sys.path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
278 lines
11 KiB
Python
278 lines
11 KiB
Python
"""ALM_Node domain helpers built on :class:`frame_io.FrameIO`.
|
|
|
|
This module is intentionally narrow: it knows about the ALM_Node frames
|
|
defined in the project's LDF (``ALM_Req_A``, ``ALM_Status``, ``Tj_Frame``,
|
|
``PWM_Frame``, ``PWM_wo_Comp``, ``ConfigFrame``) and how the test suite
|
|
wants to interact with them. Generic LDF-driven I/O lives in
|
|
:mod:`frame_io` so it can be reused across other ECUs.
|
|
|
|
Public surface:
|
|
|
|
- Module-level constants (LED_STATE_*, polling cadences, PWM tolerances)
|
|
- :class:`AlmTester` — bound to a ``FrameIO`` and a ``NAD``; encodes the
|
|
test patterns (force off, wait for state, measure ANIMATING, assert
|
|
PWM matches the rgb_to_pwm calculator)
|
|
- Pure utilities (:func:`ntc_kelvin_to_celsius`, :func:`pwm_within_tol`)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Optional
|
|
|
|
from frame_io import FrameIO
|
|
from vendor.rgb_to_pwm import compute_pwm
|
|
|
|
|
|
# --- ALMLEDState values (from LDF Signal_encoding_types: LED_State) --------
|
|
|
|
LED_STATE_OFF = 0
|
|
LED_STATE_ANIMATING = 1
|
|
LED_STATE_ON = 2
|
|
|
|
|
|
# --- Test pacing -----------------------------------------------------------
|
|
# The LIN bus runs at 10 ms frame periodicity, so polling faster than that
|
|
# returns the same buffered slave data. We poll every 50 ms (5 LIN periods)
|
|
# which keeps the loop responsive without hammering the bus, and we let the
|
|
# slave settle for 100 ms (10 LIN periods) before reading PWM_Frame /
|
|
# PWM_wo_Comp so the firmware has time to populate the TX buffer with fresh
|
|
# values.
|
|
STATE_POLL_INTERVAL = 0.05 # 50 ms — 5 LIN frame periods
|
|
STATE_RECEIVE_TIMEOUT = 0.2 # Per-poll receive timeout; keeps the loop iterating
|
|
STATE_TIMEOUT_DEFAULT = 1.0
|
|
PWM_SETTLE_SECONDS = 0.1 # 100 ms — wait for slave to refresh PWM_Frame TX buffer
|
|
DURATION_LSB_SECONDS = 0.2 # AmbLightDuration scaling per the ECU spec (1 step = 200 ms)
|
|
FORCE_OFF_SETTLE_SECONDS = 0.4 # Pause after the OFF command before yielding to the test
|
|
|
|
|
|
# --- PWM tolerances --------------------------------------------------------
|
|
# Tj_Frame_NTC reports the junction temperature in Kelvin; we convert to °C
|
|
# at runtime and feed compute_pwm() so the temperature compensation matches
|
|
# what the ECU is applying.
|
|
KELVIN_TO_CELSIUS_OFFSET = 273.15
|
|
PWM_ABS_TOL = 3277 # ±5% of 16-bit full scale (65535 * 0.05)
|
|
PWM_REL_TOL = 0.05 # ±5% of expected, whichever is larger
|
|
|
|
|
|
# --- Pure utilities --------------------------------------------------------
|
|
|
|
|
|
def ntc_kelvin_to_celsius(ntc_raw: int) -> float:
|
|
"""Convert a Tj_Frame_NTC reading (Kelvin) to °C for compute_pwm()."""
|
|
return float(ntc_raw) - KELVIN_TO_CELSIUS_OFFSET
|
|
|
|
|
|
def pwm_within_tol(actual: int, expected: int) -> bool:
|
|
"""True iff ``actual`` is within ``max(PWM_ABS_TOL, expected * PWM_REL_TOL)`` of ``expected``."""
|
|
return abs(actual - expected) <= max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL)
|
|
|
|
|
|
def _band(expected: int) -> int:
|
|
"""The numeric tolerance band used in PWM assertion error messages."""
|
|
return max(PWM_ABS_TOL, int(abs(expected) * PWM_REL_TOL))
|
|
|
|
|
|
# --- AlmTester -------------------------------------------------------------
|
|
|
|
|
|
class AlmTester:
|
|
"""ALM_Node helpers bound to a :class:`FrameIO` and a node NAD.
|
|
|
|
All test-side patterns for driving ALM_Req_A, polling ALM_Status, and
|
|
validating PWM frames live here. Internally everything goes through
|
|
``FrameIO`` — there is no direct frame-ref handling.
|
|
|
|
Typical fixture usage::
|
|
|
|
@pytest.fixture(scope="module")
|
|
def fio(lin, ldf): return FrameIO(lin, ldf)
|
|
|
|
@pytest.fixture(scope="module")
|
|
def alm(fio):
|
|
nad = fio.read_signal("ALM_Status", "ALMNadNo")
|
|
if nad is None:
|
|
pytest.skip("ECU not responding on ALM_Status")
|
|
return AlmTester(fio, int(nad))
|
|
"""
|
|
|
|
def __init__(self, fio: FrameIO, nad: int) -> None:
|
|
self._fio = fio
|
|
self._nad = int(nad)
|
|
|
|
# --- properties --------------------------------------------------------
|
|
|
|
@property
|
|
def fio(self) -> FrameIO:
|
|
return self._fio
|
|
|
|
@property
|
|
def nad(self) -> int:
|
|
return self._nad
|
|
|
|
# --- ALM_Status polling ------------------------------------------------
|
|
|
|
def read_led_state(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> int:
|
|
"""Read ALMLEDState; -1 if the read timed out.
|
|
|
|
Uses a short receive timeout so that polling loops don't stall for
|
|
a full second on a single missed frame.
|
|
"""
|
|
decoded = self._fio.receive("ALM_Status", timeout=timeout)
|
|
if decoded is None:
|
|
return -1
|
|
return int(decoded.get("ALMLEDState", -1))
|
|
|
|
def wait_for_state(
|
|
self, target: int, timeout: float
|
|
) -> tuple[bool, float, list[int]]:
|
|
"""Poll ALMLEDState until it equals ``target``, or until ``timeout``.
|
|
|
|
Returns ``(reached, elapsed_seconds, observed_state_history)``.
|
|
"""
|
|
seen: list[int] = []
|
|
deadline = time.monotonic() + timeout
|
|
start = time.monotonic()
|
|
while time.monotonic() < deadline:
|
|
st = self.read_led_state()
|
|
if not seen or seen[-1] != st:
|
|
seen.append(st)
|
|
if st == target:
|
|
return True, time.monotonic() - start, seen
|
|
time.sleep(STATE_POLL_INTERVAL)
|
|
return False, time.monotonic() - start, seen
|
|
|
|
def measure_animating_window(
|
|
self, max_wait: float
|
|
) -> tuple[Optional[float], list[int]]:
|
|
"""Wait for ANIMATING to start, then for it to leave ANIMATING.
|
|
|
|
Returns ``(animating_seconds, state_history)``. If ANIMATING is
|
|
never observed within ``max_wait``, returns ``(None, history)``.
|
|
"""
|
|
seen: list[int] = []
|
|
started_at: Optional[float] = None
|
|
deadline = time.monotonic() + max_wait
|
|
while time.monotonic() < deadline:
|
|
st = self.read_led_state()
|
|
if not seen or seen[-1] != st:
|
|
seen.append(st)
|
|
if started_at is None and st == LED_STATE_ANIMATING:
|
|
started_at = time.monotonic()
|
|
elif started_at is not None and st != LED_STATE_ANIMATING:
|
|
return time.monotonic() - started_at, seen
|
|
time.sleep(STATE_POLL_INTERVAL)
|
|
return None, seen
|
|
|
|
# --- LED control ------------------------------------------------------
|
|
|
|
def force_off(self) -> None:
|
|
"""Drive the LED to OFF (mode=0, intensity=0) and pause briefly."""
|
|
self._fio.send(
|
|
"ALM_Req_A",
|
|
AmbLightColourRed=0, AmbLightColourGreen=0, AmbLightColourBlue=0,
|
|
AmbLightIntensity=0,
|
|
AmbLightUpdate=0, AmbLightMode=0, AmbLightDuration=0,
|
|
AmbLightLIDFrom=self._nad, AmbLightLIDTo=self._nad,
|
|
)
|
|
time.sleep(FORCE_OFF_SETTLE_SECONDS)
|
|
|
|
# --- PWM assertions ---------------------------------------------------
|
|
|
|
def assert_pwm_matches_rgb(
|
|
self, rp, r: int, g: int, b: int, *, label: str = ""
|
|
) -> None:
|
|
"""Assert PWM_Frame matches ``compute_pwm(r,g,b,temp_c=Tj_NTC-273.15).pwm_comp``.
|
|
|
|
Reads Tj_Frame_NTC (Kelvin), converts to °C, and feeds that
|
|
temperature into ``compute_pwm`` so the temperature compensation
|
|
matches what the ECU is applying. Both ``PWM_Frame_Blue1`` and
|
|
``PWM_Frame_Blue2`` are asserted equal to the expected blue PWM.
|
|
"""
|
|
suffix = f"_{label}" if label else ""
|
|
|
|
ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
|
|
assert ntc_raw is not None, "Tj_Frame not received within timeout"
|
|
temp_c = ntc_kelvin_to_celsius(int(ntc_raw))
|
|
rp(f"ntc_raw_kelvin{suffix}", int(ntc_raw))
|
|
rp(f"temp_c_used{suffix}", round(temp_c, 2))
|
|
|
|
expected = compute_pwm(r, g, b, temp_c=temp_c).pwm_comp
|
|
exp_r, exp_g, exp_b = expected
|
|
rp(f"expected_pwm{suffix}", {
|
|
"red": exp_r, "green": exp_g, "blue": exp_b,
|
|
"rgb_in": (r, g, b), "temp_c_used": round(temp_c, 2),
|
|
})
|
|
|
|
# Let the firmware refresh PWM_Frame's TX buffer with the new values.
|
|
time.sleep(PWM_SETTLE_SECONDS)
|
|
decoded = self._fio.receive("PWM_Frame")
|
|
assert decoded is not None, "PWM_Frame not received within timeout"
|
|
actual_r = int(decoded["PWM_Frame_Red"])
|
|
actual_g = int(decoded["PWM_Frame_Green"])
|
|
actual_b1 = int(decoded["PWM_Frame_Blue1"])
|
|
actual_b2 = int(decoded["PWM_Frame_Blue2"])
|
|
rp(f"actual_pwm{suffix}", {
|
|
"red": actual_r, "green": actual_g,
|
|
"blue1": actual_b1, "blue2": actual_b2,
|
|
})
|
|
|
|
assert pwm_within_tol(actual_r, exp_r), (
|
|
f"PWM_Frame_Red {actual_r} differs from expected {exp_r} "
|
|
f"by more than ±{_band(exp_r)} (rgb_in={(r, g, b)})"
|
|
)
|
|
assert pwm_within_tol(actual_g, exp_g), (
|
|
f"PWM_Frame_Green {actual_g} differs from expected {exp_g} "
|
|
f"by more than ±{_band(exp_g)} (rgb_in={(r, g, b)})"
|
|
)
|
|
assert pwm_within_tol(actual_b1, exp_b), (
|
|
f"PWM_Frame_Blue1 {actual_b1} differs from expected {exp_b} "
|
|
f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})"
|
|
)
|
|
assert pwm_within_tol(actual_b2, exp_b), (
|
|
f"PWM_Frame_Blue2 {actual_b2} differs from expected {exp_b} "
|
|
f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})"
|
|
)
|
|
|
|
def assert_pwm_wo_comp_matches_rgb(
|
|
self, rp, r: int, g: int, b: int, *, label: str = ""
|
|
) -> None:
|
|
"""Assert PWM_wo_Comp matches ``compute_pwm(r,g,b).pwm_no_comp``.
|
|
|
|
``PWM_wo_Comp`` carries the non-compensated PWM values, so the
|
|
expected output is temperature-independent. NTC is still logged
|
|
for visibility.
|
|
"""
|
|
suffix = f"_{label}" if label else ""
|
|
expected = compute_pwm(r, g, b).pwm_no_comp # temp_c is unused for pwm_no_comp
|
|
exp_r, exp_g, exp_b = expected
|
|
rp(f"expected_pwm_wo_comp{suffix}", {
|
|
"red": exp_r, "green": exp_g, "blue": exp_b, "rgb_in": (r, g, b),
|
|
})
|
|
|
|
ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
|
|
rp(f"ntc_raw_kelvin{suffix}", ntc_raw)
|
|
|
|
# Let the firmware refresh PWM_wo_Comp's TX buffer before sampling it.
|
|
time.sleep(PWM_SETTLE_SECONDS)
|
|
decoded = self._fio.receive("PWM_wo_Comp")
|
|
assert decoded is not None, "PWM_wo_Comp not received within timeout"
|
|
actual_r = int(decoded["PWM_wo_Comp_Red"])
|
|
actual_g = int(decoded["PWM_wo_Comp_Green"])
|
|
actual_b = int(decoded["PWM_wo_Comp_Blue"])
|
|
rp(f"actual_pwm_wo_comp{suffix}", {
|
|
"red": actual_r, "green": actual_g, "blue": actual_b,
|
|
})
|
|
|
|
assert pwm_within_tol(actual_r, exp_r), (
|
|
f"PWM_wo_Comp_Red {actual_r} differs from expected {exp_r} "
|
|
f"by more than ±{_band(exp_r)} (rgb_in={(r, g, b)})"
|
|
)
|
|
assert pwm_within_tol(actual_g, exp_g), (
|
|
f"PWM_wo_Comp_Green {actual_g} differs from expected {exp_g} "
|
|
f"by more than ±{_band(exp_g)} (rgb_in={(r, g, b)})"
|
|
)
|
|
assert pwm_within_tol(actual_b, exp_b), (
|
|
f"PWM_wo_Comp_Blue {actual_b} differs from expected {exp_b} "
|
|
f"by more than ±{_band(exp_b)} (rgb_in={(r, g, b)})"
|
|
)
|