#!/usr/bin/env python3 """ LIN Auto-Addressing Test - Matching babylin behavior This test replicates the exact babylin sequence that successfully changed NAD. Key observations from babylin log: 1. Uses FreeFormat frame (ID 0x3C) 2. Frame structure: [NAD, PCI, SID, SupID_LSB, SupID_MSB, Subf, Param1, Param2] 3. Uses LIN 1.x Classic checksum 4. Loops the auto-addressing schedule multiple times (6+ iterations in babylin log) 5. NAD change happens after several iterations """ import argparse import logging import time from pylin import LinBusManager, LinDevice22 from pymumclient import MelexisUniversalMaster from config import * logging.basicConfig( level=logging.INFO, format='%(asctime)-15s %(levelname)-8s %(message)s' ) logger = logging.getLogger(__name__) def read_status(lin_dev): """Read ALM_Status frame""" try: response = lin_dev.send_message( master_to_slave=False, frame_id=LIN_FRAME_ID_ALM_STATUS, data_length=4, data=None ) if response and len(response) > 0: return response[0], response return None, None except Exception as e: logger.error(f"Failed to read status: {e}") return None, None def send_bsm_frame(transport_layer, subfunction, param1, param2): """ Send BSM-SNPD diagnostic frame with Classic checksum. Uses ld_put_raw() which sends with LIN 1.x Classic checksum (like babylin). send_message() uses Enhanced checksum which causes firmware to reject frames. """ try: data = bytearray([ BSM_NAD_BROADCAST, BSM_PCI, BSM_SID, BSM_SUPPLIER_ID_LSB, BSM_SUPPLIER_ID_MSB, subfunction, param1, param2 ]) transport_layer.ld_put_raw(data=data, baudrate=LIN_BAUDRATE) return True except Exception as e: logger.error(f"Failed to send BSM frame: {e}") return False def poll_status_frames(lin_dev, duration_seconds=AUTOADDRESSING_POLL_DURATION): """ Poll status frames for a specified duration, matching babylin's Pub_serv schedule. This acts as a keepalive and gives the ECU time to process. """ start_time = time.time() poll_count = 0 while (time.time() - start_time) < duration_seconds: try: lin_dev.send_message( master_to_slave=False, frame_id=LIN_FRAME_ID_ALM_STATUS, data_length=4, data=[] ) poll_count += 1 time.sleep(AUTOADDRESSING_STATUS_POLL_INTERVAL) except Exception: # Ignore timeout errors during polling time.sleep(AUTOADDRESSING_STATUS_POLL_INTERVAL) logger.debug(f" Polled status {poll_count} times over {duration_seconds:.1f}s") def run_auto_addressing_sequence(transport_layer, target_nad): """ Run one complete auto-addressing sequence matching babylin. Babylin sequence: 1. INIT (subf 0x01) 2. Wait 50ms 3. 16x NAD assignments (subf 0x02) with 20ms delays 4. STORE (subf 0x03) 5. FINALIZE (subf 0x04) Args: transport_layer: LIN transport layer for sending frames target_nad: Target NAD to assign (will be placed first in sequence) """ # Step 1: Initialize auto-addressing mode logger.debug(" INIT (0x01)") if not send_bsm_frame(transport_layer, BSM_SUBF_INIT, 0x02, 0xFF): return False time.sleep(BSM_INIT_DELAY) # Step 2: Send 16 NAD assignment frames # Put target NAD first in sequence to ensure it gets assigned nad_sequence = list(VALID_NAD_RANGE) # Move target_nad to the front of the sequence if target_nad in nad_sequence: nad_sequence.remove(target_nad) nad_sequence.insert(0, target_nad) for nad in nad_sequence: logger.debug(f" ASSIGN NAD 0x{nad:02X} (0x02)") if not send_bsm_frame(transport_layer, BSM_SUBF_ASSIGN, 0x02, nad): return False time.sleep(BSM_FRAME_DELAY) # Step 3: Store configuration logger.debug(" STORE (0x03)") if not send_bsm_frame(transport_layer, BSM_SUBF_STORE, 0x02, 0xFF): return False time.sleep(BSM_FRAME_DELAY) # Step 4: Finalize logger.debug(" FINALIZE (0x04)") if not send_bsm_frame(transport_layer, BSM_SUBF_FINALIZE, 0x02, 0xFF): return False time.sleep(BSM_FRAME_DELAY) return True def main(): parser = argparse.ArgumentParser(description='LIN Auto-Addressing Test') parser.add_argument('--host', default=MUM_HOST, help=f'MUM IP address (default: {MUM_HOST})') parser.add_argument('--iterations', type=int, default=AUTOADDRESSING_DEFAULT_ITERATIONS, help=f'Number of auto-addressing iterations (default: {AUTOADDRESSING_DEFAULT_ITERATIONS})') parser.add_argument('--check-interval', type=int, default=0, help='Check status every N iterations (0=only at end)') args = parser.parse_args() try: logger.info(f"Connecting to MUM at {args.host}...") # Initialize MUM mum = MelexisUniversalMaster() mum.open_all(args.host) power_control = mum.get_device(MUM_POWER_DEVICE) linmaster = mum.get_device(MUM_LIN_DEVICE) linmaster.setup() # Initialize LIN lin_bus = LinBusManager(linmaster) lin_dev = LinDevice22(lin_bus) lin_dev.baudrate = LIN_BAUDRATE # Get transport layer for sending with Classic checksum transport_layer = lin_dev.get_device("bus/transport_layer") # Power up power_control.power_up() time.sleep(0.5) logger.info("=" * 70) logger.info("MUM connected, LIN bus ready") # Read initial status initial_nad, _ = read_status(lin_dev) if initial_nad: logger.info(f"Initial NAD: 0x{initial_nad:02X}") # Calculate target NAD (different from initial NAD) valid_nads = list(VALID_NAD_RANGE) if initial_nad and initial_nad in valid_nads: valid_nads.remove(initial_nad) target_nad = valid_nads[0] # Pick the first available NAD logger.info(f"Target NAD: 0x{target_nad:02X}") logger.info("=" * 70) logger.info(f"Running {args.iterations} auto-addressing iterations...") logger.info("(Like babylin: iterate multiple times, then check result)") logger.info("=" * 70) # Run iterations with status polling (like babylin's schedule switching) for iteration in range(1, args.iterations + 1): logger.info(f"Iteration {iteration}/{args.iterations}") # Run BSM sequence (like babylin's LIN_AA schedule) if not run_auto_addressing_sequence(transport_layer, target_nad): logger.error("Auto-addressing sequence failed") break # Poll status frames between iterations (like babylin's Pub_serv schedule) # This gives ECU time to process and keeps communication alive logger.debug(f" Status polling between iterations...") poll_status_frames(lin_dev, duration_seconds=2.0) # Check status at intervals if requested if args.check_interval > 0 and iteration % args.check_interval == 0: nad, _ = read_status(lin_dev) if nad: logger.info(f" After iteration {iteration}: NAD = 0x{nad:02X}") if initial_nad and nad != initial_nad: logger.info("=" * 70) logger.info(f"SUCCESS! NAD changed from 0x{initial_nad:02X} to 0x{nad:02X}") logger.info(f"Change occurred after {iteration} iterations") logger.info("=" * 70) break # Final status check logger.info("=" * 70) logger.info("Checking final status...") time.sleep(1.0) final_nad, final_data = read_status(lin_dev) if final_nad: data_hex = ' '.join(f'{b:02X}' for b in final_data) logger.info(f"Final NAD: 0x{final_nad:02X}, Data: {data_hex}") if initial_nad and final_nad != initial_nad: logger.info("=" * 70) logger.info(f"SUCCESS! NAD changed from 0x{initial_nad:02X} to 0x{final_nad:02X}") logger.info("=" * 70) else: logger.info(f"NAD unchanged (still 0x{final_nad:02X})") logger.info("=" * 70) linmaster.teardown() logger.info("Done") except KeyboardInterrupt: logger.info("\nInterrupted by user") try: linmaster.teardown() except: pass except Exception as e: logger.error(f"Error: {e}", exc_info=True) if __name__ == "__main__": main()