"""E2E hardware test: power the ECU, run schedule 'CCO', activate RGB.

Powers the ECU on via the Owon PSU, switches to the 'CCO' schedule, and
publishes an RGB activation frame on ALM_Req_A (ID 0x0A).

Frame layout (from vendor/4SEVEN_color_lib_test.ldf, ALM_Req_A @ ID 0x0A, 8B):

    byte 0  AmbLightColourRed    (0..255)
    byte 1  AmbLightColourGreen  (0..255)
    byte 2  AmbLightColourBlue   (0..255)
    byte 3  AmbLightIntensity    (0..255)
    byte 4  AmbLightUpdate (bits 0-1) | AmbLightMode (bits 2-7)
    byte 5  AmbLightDuration
    byte 6  AmbLightLIDFrom
    byte 7  AmbLightLIDTo

Schedule 'CCO' polls ALM_Req_A every 10 ms (LDF lines 252-263). Updating the
master-published frame data via BLC_mon_set_xmit makes the next CCO slot
publish the new RGB values. The slave answers with ALM_Status (ID 0x11),
which we use as evidence that the bus is alive.
"""

from __future__ import annotations

import time

import pytest
import serial

from ecu_framework.config import EcuTestConfig
from ecu_framework.lin.base import LinFrame, LinInterface
from ecu_framework.power import OwonPSU, SerialParams

pytestmark = [pytest.mark.hardware, pytest.mark.babylin]

# Frame IDs from the LDF
ALM_REQ_A_ID = 0x0A   # master-published RGB control frame
ALM_STATUS_ID = 0x11  # slave-published status frame

# Default RGB activation: full white at full intensity, immediate setpoint.
DEFAULT_RGB = (0xFF, 0xFF, 0xFF)  # (red, green, blue)
DEFAULT_INTENSITY = 0xFF

# Single-letter parity codes from the PSU config -> pyserial constants.
_PARITY_MAP = {
    "N": serial.PARITY_NONE,
    "E": serial.PARITY_EVEN,
    "O": serial.PARITY_ODD,
}

# Stop-bit counts from the PSU config -> pyserial constants.
_STOPBITS_MAP = {
    1: serial.STOPBITS_ONE,
    2: serial.STOPBITS_TWO,
}


def _build_serial_params(psu_cfg) -> SerialParams:
    """Translate the power-supply config section into ``SerialParams``.

    Args:
        psu_cfg: Config object exposing ``baudrate``, ``timeout``, ``parity``,
            ``stopbits``, ``xonxoff``, ``rtscts`` and ``dsrdtr``.

    Returns:
        A ``SerialParams`` instance built from the converted values. A missing
        or unrecognised parity falls back to ``serial.PARITY_NONE``; a missing
        or unrecognised stop-bit count falls back to ``serial.STOPBITS_ONE``.
    """
    parity_key = str(psu_cfg.parity or "N").upper()
    # Going through float() first lets values such as "1.0" be accepted.
    stopbits_key = int(float(psu_cfg.stopbits or 1))
    return SerialParams(
        baudrate=int(psu_cfg.baudrate),
        timeout=float(psu_cfg.timeout),
        parity=_PARITY_MAP.get(parity_key, serial.PARITY_NONE),
        stopbits=_STOPBITS_MAP.get(stopbits_key, serial.STOPBITS_ONE),
        xonxoff=bool(psu_cfg.xonxoff),
        rtscts=bool(psu_cfg.rtscts),
        dsrdtr=bool(psu_cfg.dsrdtr),
    )


def _build_alm_req_a_payload(
    r: int,
    g: int,
    b: int,
    intensity: int = DEFAULT_INTENSITY,
    update: int = 0,  # 0 = Immediate color update
    mode: int = 0,  # 0 = Immediate Setpoint
    duration: int = 0,
    lid_from: int = 0,
    lid_to: int = 0,
) -> bytes:
    """Pack RGB-activation signals into the 8-byte ALM_Req_A payload.

    Every field is masked to its bit width (8 bits, except ``update`` at
    2 bits and ``mode`` at 6 bits), so out-of-range values wrap rather than
    raise.

    Returns:
        Eight bytes ordered red, green, blue, intensity, update/mode,
        duration, lid_from, lid_to.
    """
    # byte 4 packs Update (2 bits, LSB) and Mode (6 bits) per the LDF offsets.
    byte4 = (update & 0x03) | ((mode & 0x3F) << 2)
    return bytes([
        r & 0xFF,
        g & 0xFF,
        b & 0xFF,
        intensity & 0xFF,
        byte4 & 0xFF,
        duration & 0xFF,
        lid_from & 0xFF,
        lid_to & 0xFF,
    ])


def test_e2e_power_on_then_cco_rgb_activate(
    config: EcuTestConfig, lin: LinInterface, rp
):
    """
    Title: E2E - Power ECU, Switch to CCO Schedule, Activate RGB

    Description:
        Powers the ECU via the Owon PSU, switches the BabyLIN master to the
        'CCO' schedule (which polls ALM_Req_A every 10 ms per the LDF), and
        publishes an RGB activation payload on ALM_Req_A (ID 0x0A). Captures
        bus traffic for a short window to confirm activity (typically the
        slave-published ALM_Status at ID 0x11 will appear).

    Requirements: REQ-E2E-CCO-RGB

    Test Steps:
        1. Skip unless interface.type == 'babylin'
        2. Skip unless power_supply is enabled and a port is configured
        3. Open the PSU, IDN check, set V/I, enable output
        4. Wait for ECU boot (boot_settle_seconds, default 1.5 s)
        5. Stop any current schedule and start schedule 'CCO'
        6. Build the ALM_Req_A payload from RGB+intensity+mode+update
        7. Publish the payload via lin.send(LinFrame(0x0A, ...))
        8. Drain RX briefly and collect frames seen during the activation window
        9. Assert at least one frame was observed; report IDs/lengths
        10. Disable PSU output (always)

    Expected Result:
        - PSU comes up, ECU boots, CCO starts without SDK errors
        - At least one LIN frame is observed on the bus during the window
        - PSU output is disabled at end of test
    """
    # Step 1 / 2: gate on hardware availability
    if config.interface.type != "babylin":
        pytest.skip("interface.type must be 'babylin' for this E2E test")

    psu_cfg = config.power_supply
    if not psu_cfg.enabled:
        pytest.skip("Power supply disabled in config.power_supply.enabled")
    if not psu_cfg.port:
        pytest.skip("No power supply 'port' configured (config.power_supply.port)")

    set_v = float(psu_cfg.set_voltage)
    set_i = float(psu_cfg.set_current)
    eol = psu_cfg.eol or "\n"
    port = str(psu_cfg.port).strip()
    boot_settle_s = float(getattr(psu_cfg, "boot_settle_seconds", 1.5))
    activation_window_s = float(getattr(psu_cfg, "activation_window", 1.0))

    # The adapter is hardware-only here; the test is gated on
    # interface.type == 'babylin'.
    send_command = getattr(lin, "send_command", None)
    start_schedule = getattr(lin, "start_schedule", None)
    if send_command is None or start_schedule is None:
        pytest.skip(
            "LIN adapter does not expose send_command/start_schedule "
            "(need BabyLinInterface)"
        )

    rgb = DEFAULT_RGB

    rp("interface_type", config.interface.type)
    rp("psu_port", port)
    rp("set_voltage", set_v)
    rp("set_current", set_i)
    rp("schedule", "CCO")
    rp("rgb", list(rgb))
    rp("intensity", DEFAULT_INTENSITY)

    sparams = _build_serial_params(psu_cfg)

    with OwonPSU(port, sparams, eol=eol) as psu:
        # Step 3: bring up PSU
        idn = psu.idn()
        rp("psu_idn", idn)
        assert isinstance(idn, str) and idn != "", "PSU *IDN? returned empty"
        if psu_cfg.idn_substr:
            assert str(psu_cfg.idn_substr).lower() in idn.lower(), (
                f"PSU IDN does not contain expected substring "
                f"{psu_cfg.idn_substr!r}; got {idn!r}"
            )

        psu.set_voltage(1, set_v)
        psu.set_current(1, set_i)

        try:
            psu.set_output(True)

            # Step 4: let ECU boot
            time.sleep(boot_settle_s)

            # Measurements are informational only; a failure is reported
            # instead of failing the test.
            try:
                rp("measured_voltage", psu.measure_voltage())
                rp("measured_current", psu.measure_current())
            except Exception as meas_err:
                rp("measure_error", repr(meas_err))

            # Step 5: switch to schedule CCO. The BabyLIN firmware only
            # accepts a schedule index in its 'start schedule' command, so
            # the name is resolved to its SDF index via
            # BLC_SDF_getScheduleNr (handled inside start_schedule).
            try:
                send_command("stop;")
            except Exception as e:
                rp("stop_error", repr(e))
            cco_idx = start_schedule("CCO")
            rp("schedule_index", cco_idx)

            # Step 6 + 7: build and publish the RGB activation frame.
            payload = _build_alm_req_a_payload(*rgb, intensity=DEFAULT_INTENSITY)
            rp("tx_id", f"0x{ALM_REQ_A_ID:02X}")
            rp("tx_data_hex", payload.hex())
            lin.send(LinFrame(id=ALM_REQ_A_ID, data=payload))

            # Step 8: collect frames over the activation window. CCO publishes
            # ALM_Req_A (0x0A) and ALM_Status (0x11) every ~10 ms each.
            # The flush is best-effort: a failure is reported, not fatal.
            try:
                lin.flush()
            except Exception as flush_err:
                rp("flush_error", repr(flush_err))

            seen = []
            deadline = time.monotonic() + activation_window_s
            while time.monotonic() < deadline:
                rx = lin.receive(timeout=0.1)
                if rx is None:
                    continue
                seen.append((rx.id, bytes(rx.data)))

            ids = sorted({fid for fid, _ in seen})
            rp("rx_count", len(seen))
            rp("rx_ids", [f"0x{i:02X}" for i in ids])
            if seen:
                last_id, last_data = seen[-1]
                rp("rx_last_id", f"0x{last_id:02X}")
                rp("rx_last_data_hex", last_data.hex())

            # Not asserted, but logged (before the liveness assert, so a
            # failing run still carries it) so the report shows it clearly.
            rp("alm_status_seen", ALM_STATUS_ID in ids)

            # Step 9: minimal liveness assertion. We don't require ALM_Status
            # specifically because absence-of-slave is a separate failure mode
            # to diagnose; we just want to know the bus moved at all.
            assert seen, (
                f"No LIN frames observed during {activation_window_s:.2f}s on schedule CCO. "
                f"Check wiring, SDF, and that 'CCO' exists in the loaded SDF."
            )
        finally:
            # Step 10: always cut power
            try:
                psu.set_output(False)
            except Exception as off_err:
                rp("set_output_off_error", repr(off_err))