Extends ``tests/hardware/alm_helpers.py`` into the full surface that
hardware tests use, so contributors write intent (``alm.send_color``,
``alm.read_led_state``, ``alm.wait_for_led_on``) and never touch
``fio.send("ALM_Req_A", AmbLight…=…)`` or LDF schema details.
What landed:
- AlmTester gains ~16 methods:
read_nad, read_voltage_status, read_thermal_status, read_nvm_status,
read_sig_comm_err, read_ntc_kelvin, read_ntc_celsius, read_pwm,
read_pwm_wo_comp, send_color, send_color_broadcast, save_color,
apply_saved_color, discard_saved_color, send_config, plus
wait_for_led_on / wait_for_led_off / wait_for_animating wrappers.
- The six IntEnum classes that ALM tests need (LedState, Mode, Update,
NVMStatus, VoltageStatus, ThermalStatus) are defined directly in
alm_helpers.py — tests get them via `from alm_helpers import …`.
- All ALM test files migrated:
test_mum_alm_animation.py, test_mum_alm_cases.py, test_overvolt.py,
swe5/test_anm_management.py, and swe5/test_com_management.py
now go through AlmTester for every common pattern.
- swe6/test_com_management.py: stays on `fio` (these tests probe
schema features not in the current production LDF and skip when
the LDF doesn't declare them) — change limited to LedState enum.
- test_mum_alm_animation_generated.py deleted — its "no-AlmTester"
demonstration loses its point now that AlmTester is the
recommended path.
- docs/19_frame_io_and_alm_helpers.md reframed: AlmTester is the
contributor surface; FrameIO is implementation detail. New API
reference + Cookbook examples + a note that the maintenance pact
is "LDF changes → AlmTester updates".
Verified: pytest --collect-only collects 87 tests cleanly; 40 unit
+ mock smoke tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
624 lines
24 KiB
Python
624 lines
24 KiB
Python
"""ALM_Node domain helpers — the single contributor-facing API for ALM tests.
|
||
|
||
This module is the **only thing test bodies should import** for ALM
|
||
hardware tests. It defines:
|
||
|
||
- Typed enums (:class:`LedState`, :class:`Mode`, :class:`Update`,
|
||
:class:`NVMStatus`, :class:`VoltageStatus`, :class:`ThermalStatus`)
|
||
that mirror the LDF's ``Signal_encoding_types`` blocks. These used
|
||
to be auto-generated by ``deprecated/gen_lin_api.py``; that path is now
|
||
retired and the generator + last-emitted file live under ``deprecated/``
|
||
for historical reference. Update these enums by hand when the LDF
|
||
gains new logical encodings.
|
||
- Module-level constants (LED_STATE_*, polling cadences, PWM tolerances).
|
||
- :class:`AlmTester` — bound to a ``FrameIO`` and a node NAD; the
|
||
per-signal read / per-action send surface plus the cross-frame
|
||
patterns (wait_for_state, measure ANIMATING, assert PWM matches the
|
||
rgb_to_pwm calculator).
|
||
- Pure utilities (:func:`ntc_kelvin_to_celsius`, :func:`pwm_within_tol`).
|
||
|
||
Test bodies should reach for ``AlmTester`` methods (``send_color``,
|
||
``read_led_state``, ``wait_for_led_on``, ``assert_pwm_matches_rgb``, …)
|
||
rather than calling ``fio.send("ALM_Req_A", AmbLightColourRed=…)``
|
||
directly. Strings flow through this module to ``FrameIO`` so tests never
|
||
need to know the LDF schema.
|
||
|
||
Maintenance pact: when the LDF gains a signal or a frame that tests
|
||
should use, the corresponding ``read_*`` / ``send_*`` method (and, if
|
||
needed, a new IntEnum) goes here. Tests never reach past this module.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
from enum import IntEnum
|
||
from typing import Optional, Union
|
||
|
||
from frame_io import FrameIO
|
||
from vendor.rgb_to_pwm import compute_pwm
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Typed enums (mirroring the LDF's Signal_encoding_types blocks for the ALM
|
||
# frames). Originally generated by ``deprecated/gen_lin_api.py`` from the LDF;
|
||
# inlined here when AlmTester became the single contributor-facing surface,
|
||
# so tests don't need to import a separate generated module at all. The
|
||
# generator and its previously-emitted output are kept under ``deprecated/``
|
||
# for historical reference. When the LDF gains a new logical encoding,
|
||
# update the matching IntEnum below by hand.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class Update(IntEnum):
    """AmbLightUpdate values, mirroring the LDF ``Update`` signal encoding.

    Hand-maintained: edit this enum when the LDF encoding changes.
    """

    IMMEDIATE_COLOR_UPDATE = 0
    COLOR_MEMORIZATION = 1
    APPLY_MEMORIZED_COLOR = 2
    DISCARD_MEMORIZED_COLOR = 3
|
||
|
||
|
||
class Mode(IntEnum):
    """AmbLightMode values, mirroring the LDF ``Mode`` signal encoding.

    Only the logical values are given names here.
    """

    IMMEDIATE_SETPOINT = 0
    FADING_EFFECT_1 = 1
    FADING_EFFECT_2 = 2
    TBD_0X03 = 3
    TBD_0X04 = 4
    # The LDF additionally declares a physical range 5..63 (scale=1.0,
    # offset=0.0, unit='Not Used'). Those values have no member here —
    # pass the plain int directly.
|
||
|
||
|
||
class LedState(IntEnum):
    """ALMLEDState values, mirroring the LDF ``LED_State`` signal encoding."""

    LED_OFF = 0
    LED_ANIMATING = 1
    LED_ON = 2
    RESERVED = 3
|
||
|
||
|
||
class VoltageStatus(IntEnum):
    """ALMVoltageStatus values, mirroring the LDF ``VoltageStatus`` encoding."""

    # Named states.
    NORMAL_VOLTAGE = 0
    POWER_UNDERVOLTAGE = 1
    POWER_OVERVOLTAGE = 2

    # Remaining values up to 0x0F are declared as reserved.
    RESERVED_0X03 = 0x3
    RESERVED_0X04 = 0x4
    RESERVED_0X05 = 0x5
    RESERVED_0X06 = 0x6
    RESERVED_0X07 = 0x7
    RESERVED_0X08 = 0x8
    RESERVED_0X09 = 0x9
    RESERVED_0X0A = 0xA
    RESERVED_0X0B = 0xB
    RESERVED_0X0C = 0xC
    RESERVED_0X0D = 0xD
    RESERVED_0X0E = 0xE
    RESERVED_0X0F = 0xF
|
||
|
||
|
||
class ThermalStatus(IntEnum):
    """ALMThermalStatus values, mirroring the LDF ``ThermalStatus`` encoding."""

    # Named states.
    NORMAL_TEMPERATURE = 0
    THERMAL_DERATING = 1
    THERMAL_SHUTDOWN = 2

    # Remaining values up to 0x0F are declared as reserved.
    RESERVED_0X03 = 0x3
    RESERVED_0X04 = 0x4
    RESERVED_0X05 = 0x5
    RESERVED_0X06 = 0x6
    RESERVED_0X07 = 0x7
    RESERVED_0X08 = 0x8
    RESERVED_0X09 = 0x9
    RESERVED_0X0A = 0xA
    RESERVED_0X0B = 0xB
    RESERVED_0X0C = 0xC
    RESERVED_0X0D = 0xD
    RESERVED_0X0E = 0xE
    RESERVED_0X0F = 0xF
|
||
|
||
|
||
class NVMStatus(IntEnum):
    """ALMNVMStatus values, mirroring the LDF ``NVMStatus`` encoding."""

    # Named states.
    NVM_OK = 0
    NVM_NOK = 1

    # Remaining values up to 0x0F are declared as reserved.
    RESERVED_0X02 = 0x2
    RESERVED_0X03 = 0x3
    RESERVED_0X04 = 0x4
    RESERVED_0X05 = 0x5
    RESERVED_0X06 = 0x6
    RESERVED_0X07 = 0x7
    RESERVED_0X08 = 0x8
    RESERVED_0X09 = 0x9
    RESERVED_0X0A = 0xA
    RESERVED_0X0B = 0xB
    RESERVED_0X0C = 0xC
    RESERVED_0X0D = 0xD
    RESERVED_0X0E = 0xE
    RESERVED_0X0F = 0xF
|
||
|
||
|
||
# --- ALMLEDState values (from LDF Signal_encoding_types: LED_State) --------
# Plain-int spellings of the OFF / ANIMATING / ON states of ALMLEDState.

LED_STATE_OFF, LED_STATE_ANIMATING, LED_STATE_ON = 0, 1, 2
|
||
|
||
|
||
# --- Test pacing -----------------------------------------------------------
# The LIN bus runs at 10 ms frame periodicity; polling faster than that only
# re-reads the same buffered slave data. All values below are in seconds.

# Gap between two ALMLEDState polls: 50 ms (5 LIN frame periods) — responsive
# without hammering the bus.
STATE_POLL_INTERVAL = 0.05

# Receive timeout for a single poll; kept short so the polling loop keeps
# iterating instead of stalling on one missed frame.
STATE_RECEIVE_TIMEOUT = 0.2

# Default overall budget for the wait_for_* helpers.
STATE_TIMEOUT_DEFAULT = 1.0

# Settle time before sampling PWM_Frame / PWM_wo_Comp: 100 ms (10 LIN
# periods) so the firmware can refresh its TX buffer with fresh values.
PWM_SETTLE_SECONDS = 0.1

# AmbLightDuration scaling per the ECU spec: 1 step = 200 ms.
DURATION_LSB_SECONDS = 0.2

# Pause after the OFF command before yielding to the test.
FORCE_OFF_SETTLE_SECONDS = 0.4
|
||
|
||
|
||
# --- PWM tolerances --------------------------------------------------------
# Tj_Frame_NTC reports the junction temperature in Kelvin. It is converted to
# °C at runtime and passed to compute_pwm() so the expected temperature
# compensation matches what the ECU applies.
KELVIN_TO_CELSIUS_OFFSET = 273.15

# A PWM reading passes when it is within the LARGER of these two bands:
#   absolute: ±5% of 16-bit full scale (65535 * 0.05, rounded)
#   relative: ±5% of the expected value
PWM_ABS_TOL = 3277
PWM_REL_TOL = 0.05
|
||
|
||
|
||
# --- Pure utilities --------------------------------------------------------
|
||
|
||
|
||
def ntc_kelvin_to_celsius(ntc_raw: int) -> float:
    """Return the °C equivalent of a Tj_Frame_NTC reading given in Kelvin.

    The result is what callers feed to ``compute_pwm()`` as ``temp_c``.
    """
    kelvin = float(ntc_raw)
    return kelvin - KELVIN_TO_CELSIUS_OFFSET
|
||
|
||
|
||
def pwm_within_tol(actual: int, expected: int) -> bool:
    """Tell whether ``actual`` is close enough to ``expected``.

    The allowed deviation is ``max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL)``,
    i.e. the wider of the absolute and the relative band; the bound itself
    is inclusive.
    """
    allowed = max(PWM_ABS_TOL, abs(expected) * PWM_REL_TOL)
    deviation = abs(actual - expected)
    return deviation <= allowed
|
||
|
||
|
||
def _band(expected: int) -> int:
    """Return the tolerance band quoted in PWM assertion error messages.

    Same rule as :func:`pwm_within_tol` (wider of absolute and relative
    band), with the relative part truncated to a whole number.
    """
    relative_band = int(abs(expected) * PWM_REL_TOL)
    return max(PWM_ABS_TOL, relative_band)
|
||
|
||
|
||
# --- AlmTester -------------------------------------------------------------
|
||
|
||
|
||
class AlmTester:
    """ALM_Node helpers bound to a :class:`FrameIO` and a node NAD.

    All test-side patterns for driving ALM_Req_A, polling ALM_Status, and
    validating PWM frames live here. Internally everything goes through
    ``FrameIO`` — there is no direct frame-ref handling.

    Typical fixture usage::

        @pytest.fixture(scope="module")
        def fio(lin, ldf): return FrameIO(lin, ldf)

        @pytest.fixture(scope="module")
        def alm(fio):
            nad = fio.read_signal("ALM_Status", "ALMNadNo")
            if nad is None:
                pytest.skip("ECU not responding on ALM_Status")
            return AlmTester(fio, int(nad))
    """

    # Value ``read_led_state`` reports when no ALMLEDState could be read
    # (receive timeout, or the signal missing from the decoded frame).
    _LED_STATE_UNKNOWN = -1

    def __init__(self, fio: FrameIO, nad: int) -> None:
        """Bind the tester to ``fio`` and to the node address ``nad``."""
        self._fio = fio
        self._nad = int(nad)

    # --- properties --------------------------------------------------------

    @property
    def fio(self) -> FrameIO:
        """The ``FrameIO`` this tester sends and receives through."""
        return self._fio

    @property
    def nad(self) -> int:
        """The node address used as the default LID target."""
        return self._nad

    # --- ALM_Status polling ------------------------------------------------

    def read_led_state(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> int:
        """Read ALMLEDState; ``-1`` if the read timed out.

        ``-1`` is also returned when the decoded frame carries no
        ``ALMLEDState`` entry. The default receive timeout is short so
        that polling loops don't stall for long on a single missed frame.
        """
        decoded = self._fio.receive("ALM_Status", timeout=timeout)
        if decoded is None:
            return self._LED_STATE_UNKNOWN
        return int(decoded.get("ALMLEDState", self._LED_STATE_UNKNOWN))

    def wait_for_state(
        self, target: int, timeout: float
    ) -> tuple[bool, float, list[int]]:
        """Poll ALMLEDState until it equals ``target``, or until ``timeout``.

        Returns ``(reached, elapsed_seconds, observed_state_history)``.
        The history lists each distinct consecutive reading once and may
        contain ``-1`` entries for polls that produced no state. With a
        non-positive ``timeout`` no poll is made at all.
        """
        seen: list[int] = []
        start = time.monotonic()
        deadline = start + timeout
        while time.monotonic() < deadline:
            st = self.read_led_state()
            # Record transitions only, so the history stays short.
            if not seen or seen[-1] != st:
                seen.append(st)
            if st == target:
                return True, time.monotonic() - start, seen
            time.sleep(STATE_POLL_INTERVAL)
        return False, time.monotonic() - start, seen

    def measure_animating_window(
        self, max_wait: float
    ) -> tuple[Optional[float], list[int]]:
        """Wait for ANIMATING to start, then for it to leave ANIMATING.

        Returns ``(animating_seconds, state_history)``. Returns
        ``(None, history)`` when ANIMATING is never observed within
        ``max_wait`` — and also when it starts but has not ended by then;
        inspect the history to tell the two apart.

        Polls that yield no state (``-1``, e.g. a missed ALM_Status frame)
        are recorded in the history but neither start nor end the window,
        so a single dropped frame cannot truncate the measurement.
        """
        seen: list[int] = []
        started_at: Optional[float] = None
        deadline = time.monotonic() + max_wait
        while time.monotonic() < deadline:
            st = self.read_led_state()
            if not seen or seen[-1] != st:
                seen.append(st)
            if st == self._LED_STATE_UNKNOWN:
                # No information in this sample: keep waiting.
                pass
            elif started_at is None:
                if st == LedState.LED_ANIMATING:
                    started_at = time.monotonic()
            elif st != LedState.LED_ANIMATING:
                return time.monotonic() - started_at, seen
            time.sleep(STATE_POLL_INTERVAL)
        return None, seen

    # --- ALM_Status per-signal readers ------------------------------------
    #
    # These mirror the signals carried by ALM_Status (the slave-published
    # status frame). Every call performs its own ``fio.receive``, so two
    # consecutive readers may see two different frames.

    def _read_status_signal(self, signal: str, timeout: float) -> Optional[int]:
        """Receive one ALM_Status frame and return ``signal`` as an int.

        Returns ``None`` when no frame arrives within ``timeout``; raises
        ``KeyError`` if the decoded frame lacks ``signal``.
        """
        decoded = self._fio.receive("ALM_Status", timeout=timeout)
        if decoded is None:
            return None
        return int(decoded[signal])

    def read_nad(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMNadNo from ALM_Status; ``None`` on timeout."""
        return self._read_status_signal("ALMNadNo", timeout)

    def read_voltage_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMVoltageStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`VoltageStatus` enum members
        (``NORMAL_VOLTAGE`` / ``POWER_UNDERVOLTAGE`` / ``POWER_OVERVOLTAGE``).
        """
        return self._read_status_signal("ALMVoltageStatus", timeout)

    def read_thermal_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMThermalStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`ThermalStatus` enum members
        (``NORMAL_TEMPERATURE`` / ``THERMAL_DERATING`` / ``THERMAL_SHUTDOWN``).
        """
        return self._read_status_signal("ALMThermalStatus", timeout)

    def read_nvm_status(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read ALMNVMStatus from ALM_Status; ``None`` on timeout.

        Compare against :class:`NVMStatus` enum members.
        """
        return self._read_status_signal("ALMNVMStatus", timeout)

    def read_sig_comm_err(self, timeout: float = STATE_RECEIVE_TIMEOUT) -> Optional[int]:
        """Read SigCommErr from ALM_Status; ``None`` on timeout."""
        return self._read_status_signal("SigCommErr", timeout)

    # --- Tj_Frame readers --------------------------------------------------

    def read_ntc_kelvin(self) -> Optional[int]:
        """Raw NTC reading in Kelvin from Tj_Frame_NTC; ``None`` on timeout."""
        raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
        return None if raw is None else int(raw)

    def read_ntc_celsius(self) -> Optional[float]:
        """NTC reading converted to °C; ``None`` on timeout."""
        raw = self.read_ntc_kelvin()
        return None if raw is None else ntc_kelvin_to_celsius(raw)

    # --- PWM readers ------------------------------------------------------

    def read_pwm(self) -> Optional[tuple[int, int, int, int]]:
        """Read PWM_Frame channels; returns ``(R, G, B1, B2)`` or ``None``.

        These are the temperature-compensated PWM values the ECU drives
        the LED rails with. Compare against
        :func:`compute_pwm(...).pwm_comp` for assertions, or use
        :meth:`assert_pwm_matches_rgb` for the full pattern.
        """
        decoded = self._fio.receive("PWM_Frame")
        if decoded is None:
            return None
        return (
            int(decoded["PWM_Frame_Red"]),
            int(decoded["PWM_Frame_Green"]),
            int(decoded["PWM_Frame_Blue1"]),
            int(decoded["PWM_Frame_Blue2"]),
        )

    def read_pwm_wo_comp(self) -> Optional[tuple[int, int, int]]:
        """Read PWM_wo_Comp channels; returns ``(R, G, B)`` or ``None``.

        These are the non-temperature-compensated PWM values — useful
        when tests want to assert a deterministic mapping from RGB to
        PWM without involving the runtime NTC reading.
        """
        decoded = self._fio.receive("PWM_wo_Comp")
        if decoded is None:
            return None
        return (
            int(decoded["PWM_wo_Comp_Red"]),
            int(decoded["PWM_wo_Comp_Green"]),
            int(decoded["PWM_wo_Comp_Blue"]),
        )

    # --- ALM_Req_A senders (per-action, intent-shaped) --------------------
    #
    # ``send_color`` is the single workhorse. The save/apply/discard
    # convenience methods are thin wrappers that pick the right
    # ``AmbLightUpdate`` value and leave colour/intensity/mode to the
    # caller.

    def send_color(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        update: Union[Update, int] = Update.IMMEDIATE_COLOR_UPDATE,
        duration: int = 0,
        lid_from: Optional[int] = None,
        lid_to: Optional[int] = None,
    ) -> None:
        """Publish ALM_Req_A with the given colour / mode / update.

        ``lid_from`` and ``lid_to`` default to this tester's NAD —
        i.e. unicast to the bound node. Pass them explicitly for
        broadcast or range targeting (or use :meth:`send_color_broadcast`).

        ``mode``, ``update`` accept either :class:`Mode` / :class:`Update`
        enum members or raw ints — both round-trip identically since the
        enums inherit from ``IntEnum``. Every value is coerced with
        ``int()`` before being handed to ``FrameIO``; no range checking
        is done here.
        """
        nad = self._nad
        self._fio.send(
            "ALM_Req_A",
            AmbLightColourRed=int(red),
            AmbLightColourGreen=int(green),
            AmbLightColourBlue=int(blue),
            AmbLightIntensity=int(intensity),
            AmbLightUpdate=int(update),
            AmbLightMode=int(mode),
            AmbLightDuration=int(duration),
            AmbLightLIDFrom=int(lid_from if lid_from is not None else nad),
            AmbLightLIDTo=int(lid_to if lid_to is not None else nad),
        )

    def send_color_broadcast(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        update: Union[Update, int] = Update.IMMEDIATE_COLOR_UPDATE,
        duration: int = 0,
    ) -> None:
        """Broadcast: send the same colour to LID range 0x00–0xFF (every node)."""
        self.send_color(
            red=red, green=green, blue=blue, intensity=intensity,
            mode=mode, update=update, duration=duration,
            lid_from=0x00, lid_to=0xFF,
        )

    def save_color(
        self,
        *,
        red: int,
        green: int,
        blue: int,
        intensity: int = 255,
        mode: Union[Mode, int] = Mode.IMMEDIATE_SETPOINT,
        duration: int = 0,
    ) -> None:
        """Memorize a colour without applying it (Update.COLOR_MEMORIZATION).

        The ECU buffers the request; the LED state does NOT change until
        a later :meth:`apply_saved_color` call. Useful for testing the
        save/apply semantics independently of the immediate-update path.
        """
        self.send_color(
            red=red, green=green, blue=blue, intensity=intensity,
            mode=mode, update=Update.COLOR_MEMORIZATION, duration=duration,
        )

    def apply_saved_color(self) -> None:
        """Apply the previously-saved colour (Update.APPLY_MEMORIZED_COLOR).

        The colour, intensity and duration fields of the request are all
        sent as zero.
        """
        self.send_color(
            red=0, green=0, blue=0, intensity=0,
            mode=Mode.IMMEDIATE_SETPOINT, update=Update.APPLY_MEMORIZED_COLOR,
            duration=0,
        )

    def discard_saved_color(self) -> None:
        """Discard the previously-saved colour (Update.DISCARD_MEMORIZED_COLOR).

        The colour, intensity and duration fields of the request are all
        sent as zero.
        """
        self.send_color(
            red=0, green=0, blue=0, intensity=0,
            mode=Mode.IMMEDIATE_SETPOINT, update=Update.DISCARD_MEMORIZED_COLOR,
            duration=0,
        )

    # --- ConfigFrame sender -----------------------------------------------

    def send_config(
        self,
        *,
        calibration: int = 0,
        enable_derating: int = 1,
        enable_compensation: int = 1,
        max_lm: int = 3840,
    ) -> None:
        """Publish ConfigFrame.

        Defaults match the ECU's nominal config (derating + compensation
        enabled, calibration off, max_lm=3840). Tests that want to toggle
        a single field pass that one kwarg; the rest stay at nominal.
        """
        self._fio.send(
            "ConfigFrame",
            ConfigFrame_Calibration=int(calibration),
            ConfigFrame_EnableDerating=int(enable_derating),
            ConfigFrame_EnableCompensation=int(enable_compensation),
            ConfigFrame_MaxLM=int(max_lm),
        )

    # --- LED control ------------------------------------------------------

    def force_off(self) -> None:
        """Drive the LED to OFF (intensity=0, mode=IMMEDIATE_SETPOINT) and pause briefly."""
        self.send_color(red=0, green=0, blue=0, intensity=0, duration=0)
        time.sleep(FORCE_OFF_SETTLE_SECONDS)

    # --- wait_for_state convenience wrappers ------------------------------

    def wait_for_led_on(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_ON or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_ON, timeout=timeout)
        return reached

    def wait_for_led_off(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_OFF or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_OFF, timeout=timeout)
        return reached

    def wait_for_animating(self, timeout: float = STATE_TIMEOUT_DEFAULT) -> bool:
        """Block until ALMLEDState == LED_ANIMATING or timeout. Returns whether reached."""
        reached, _, _ = self.wait_for_state(LedState.LED_ANIMATING, timeout=timeout)
        return reached

    # --- PWM assertions ---------------------------------------------------

    @staticmethod
    def _assert_channels_within_tol(channels, rgb_in) -> None:
        """Assert each ``(signal_name, actual, expected)`` triple is in tolerance.

        Channels are checked in the order given; the first one outside
        :func:`pwm_within_tol` fails with a message naming the signal,
        both values, the band from :func:`_band`, and ``rgb_in``.
        """
        for signal_name, actual, expected in channels:
            assert pwm_within_tol(actual, expected), (
                f"{signal_name} {actual} differs from expected {expected} "
                f"by more than ±{_band(expected)} (rgb_in={rgb_in})"
            )

    def assert_pwm_matches_rgb(
        self, rp, r: int, g: int, b: int, *, label: str = ""
    ) -> None:
        """Assert PWM_Frame matches ``compute_pwm(r,g,b,temp_c=Tj_NTC-273.15).pwm_comp``.

        Reads Tj_Frame_NTC (Kelvin), converts to °C, and feeds that
        temperature into ``compute_pwm`` so the temperature compensation
        matches what the ECU is applying. Both ``PWM_Frame_Blue1`` and
        ``PWM_Frame_Blue2`` are asserted equal to the expected blue PWM.

        ``rp`` is called as ``rp(name, value)`` to log the NTC reading,
        the expected PWM and the actual PWM; ``label``, when non-empty,
        is appended to every logged name as ``_<label>``. Fails with
        ``AssertionError`` when Tj_Frame or PWM_Frame is not received or
        a channel is out of tolerance.
        """
        suffix = f"_{label}" if label else ""
        rgb_in = (r, g, b)

        ntc_kelvin = self.read_ntc_kelvin()
        assert ntc_kelvin is not None, "Tj_Frame not received within timeout"
        temp_c = ntc_kelvin_to_celsius(ntc_kelvin)
        rp(f"ntc_raw_kelvin{suffix}", ntc_kelvin)
        rp(f"temp_c_used{suffix}", round(temp_c, 2))

        exp_r, exp_g, exp_b = compute_pwm(r, g, b, temp_c=temp_c).pwm_comp
        rp(f"expected_pwm{suffix}", {
            "red": exp_r, "green": exp_g, "blue": exp_b,
            "rgb_in": rgb_in, "temp_c_used": round(temp_c, 2),
        })

        # Let the firmware refresh PWM_Frame's TX buffer with the new values.
        time.sleep(PWM_SETTLE_SECONDS)
        pwm = self.read_pwm()
        assert pwm is not None, "PWM_Frame not received within timeout"
        actual_r, actual_g, actual_b1, actual_b2 = pwm
        rp(f"actual_pwm{suffix}", {
            "red": actual_r, "green": actual_g,
            "blue1": actual_b1, "blue2": actual_b2,
        })

        self._assert_channels_within_tol(
            (
                ("PWM_Frame_Red", actual_r, exp_r),
                ("PWM_Frame_Green", actual_g, exp_g),
                ("PWM_Frame_Blue1", actual_b1, exp_b),
                ("PWM_Frame_Blue2", actual_b2, exp_b),
            ),
            rgb_in,
        )

    def assert_pwm_wo_comp_matches_rgb(
        self, rp, r: int, g: int, b: int, *, label: str = ""
    ) -> None:
        """Assert PWM_wo_Comp matches ``compute_pwm(r,g,b).pwm_no_comp``.

        ``PWM_wo_Comp`` carries the non-compensated PWM values, so the
        expected output is temperature-independent. NTC is still logged
        for visibility (as read, possibly ``None``) and is not asserted on.

        ``rp`` is called as ``rp(name, value)``; ``label``, when non-empty,
        is appended to every logged name as ``_<label>``. Fails with
        ``AssertionError`` when PWM_wo_Comp is not received or a channel
        is out of tolerance.
        """
        suffix = f"_{label}" if label else ""
        rgb_in = (r, g, b)

        # temp_c is unused for pwm_no_comp
        exp_r, exp_g, exp_b = compute_pwm(r, g, b).pwm_no_comp
        rp(f"expected_pwm_wo_comp{suffix}", {
            "red": exp_r, "green": exp_g, "blue": exp_b, "rgb_in": rgb_in,
        })

        ntc_raw = self._fio.read_signal("Tj_Frame", "Tj_Frame_NTC")
        rp(f"ntc_raw_kelvin{suffix}", ntc_raw)

        # Let the firmware refresh PWM_wo_Comp's TX buffer before sampling it.
        time.sleep(PWM_SETTLE_SECONDS)
        pwm = self.read_pwm_wo_comp()
        assert pwm is not None, "PWM_wo_Comp not received within timeout"
        actual_r, actual_g, actual_b = pwm
        rp(f"actual_pwm_wo_comp{suffix}", {
            "red": actual_r, "green": actual_g, "blue": actual_b,
        })

        self._assert_channels_within_tol(
            (
                ("PWM_wo_Comp_Red", actual_r, exp_r),
                ("PWM_wo_Comp_Green", actual_g, exp_g),
                ("PWM_wo_Comp_Blue", actual_b, exp_b),
            ),
            rgb_in,
        )
|