271 lines
11 KiB
Python

from __future__ import annotations # Postponed annotations for forward references and speed
import os # For environment variables and filesystem checks
import pathlib # Path handling across platforms
from dataclasses import dataclass, field # Lightweight typed containers
from typing import Any, Dict, Optional # Type hints for clarity
import yaml # Safe YAML parsing for configuration files
@dataclass
class FlashConfig:
    """Settings that control firmware flashing.

    Attributes:
        enabled: When true, flashing is triggered at session start.
        hex_path: Location of the firmware HEX file, or ``None`` when no
            file has been configured.
    """

    enabled: bool = False  # Flashing is opt-in
    hex_path: Optional[str] = None  # No firmware image unless configured
@dataclass
class InterfaceConfig:
    """Settings describing the LIN adapter used by the tests.

    Attributes:
        type: Adapter kind: "mock" (simulated), "babylin" (legacy BabyLIN
            SDK) or "mum" (Melexis Universal Master).
        channel: Channel index to use (0-based in most SDKs);
            BabyLIN-specific.
        bitrate: Effective LIN bitrate. The MUM uses it directly, whereas the
            BabyLIN SDF may override it.
        dll_path: Legacy/optional pointer to vendor DLLs for ctypes use (not
            used by the SDK wrapper).
        node_name: Optional friendly name for display/logging.
        func_names: Legacy mapping of ctypes function names; ignored by the
            SDK wrapper.
        sdf_path: SDF file to load on connect (BabyLIN only).
        schedule_nr: Schedule index to start after connect (BabyLIN only);
            -1 means skip.
        host: IP address of the MUM (MUM only).
        lin_device: LIN device name on the MUM (MUM only).
        power_device: Power-control device name on the MUM (MUM only).
        boot_settle_seconds: Delay after MUM power-up before the first frame
            is sent.
        frame_lengths: Optional map of frame id -> data length used by the
            MUM adapter when receiving slave-published frames.
        ldf_path: Optional LDF path. When set, tests/fixtures can load an
            LdfDatabase and the MUM adapter auto-merges the LDF's frame
            lengths into its map.
    """

    # --- Common settings ---
    type: str = "mock"  # One of "mock", "babylin", "mum"
    channel: int = 1
    bitrate: int = 19200
    dll_path: Optional[str] = None
    node_name: Optional[str] = None
    func_names: Dict[str, str] = field(default_factory=dict)

    # --- BabyLIN only ---
    sdf_path: Optional[str] = None
    schedule_nr: int = 0

    # --- MUM only ---
    host: Optional[str] = None
    lin_device: str = "lin0"
    power_device: str = "power_out0"
    boot_settle_seconds: float = 0.5
    frame_lengths: Dict[int, int] = field(default_factory=dict)

    # --- Optional LDF (source of additional frame lengths for the MUM) ---
    ldf_path: Optional[str] = None
@dataclass
class EcuTestConfig:
    """Root configuration object handed to the test framework.

    Attributes:
        interface: LIN communication settings (mock, BabyLIN or MUM adapter).
        flash: Firmware flashing behaviour.
        power_supply: Serial power supply (e.g. Owon) settings; test code can
            rely on these values to interact with the PSU when it is enabled.
    """

    interface: InterfaceConfig = field(default_factory=InterfaceConfig)
    flash: FlashConfig = field(default_factory=FlashConfig)
    # PowerSupplyConfig is declared further down in this module, so the
    # factory resolves the name lazily (at instantiation) through a lambda.
    power_supply: "PowerSupplyConfig" = field(
        default_factory=lambda: PowerSupplyConfig()
    )
@dataclass
class PowerSupplyConfig:
    """Settings for a serial-attached power supply (e.g. an Owon PSU).

    Attributes:
        enabled: Whether PSU tests/features should be active.
        port: Serial device, e.g. COM4 on Windows or /dev/ttyUSB0 on Linux.
        baudrate: Serial line speed.
        timeout: Serial timeout setting.
        eol: End-of-line sequence; often LF or CRLF.
        parity: One of "N", "E", "O".
        stopbits: 1 or 2.
        xonxoff: Flow control flag.
        rtscts: Flow control flag.
        dsrdtr: Flow control flag.
        idn_substr: Optional substring to assert in *IDN? responses.
        do_set: Optional demo/test action switch.
        set_voltage: Voltage value for the optional demo/test action.
        set_current: Current value for the optional demo/test action.
    """

    # --- Activation and port ---
    enabled: bool = False
    port: Optional[str] = None

    # --- Basic line settings ---
    baudrate: int = 115200
    timeout: float = 1.0
    eol: str = "\n"
    parity: str = "N"
    stopbits: float = 1.0

    # --- Flow control ---
    xonxoff: bool = False
    rtscts: bool = False
    dsrdtr: bool = False

    # --- Identification check and optional demo/test actions ---
    idn_substr: Optional[str] = None
    do_set: bool = False
    set_voltage: float = 1.0
    set_current: float = 0.1
# Default location of the YAML config, expressed relative to the repo root.
DEFAULT_CONFIG_RELATIVE = pathlib.Path("config", "test_config.yaml")
# Name of the environment variable that overrides the config file location.
ENV_CONFIG_PATH = "ECU_TESTS_CONFIG"
def _deep_update(base: Dict[str, Any], updates: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively merge dict 'updates' into dict 'base'.
- Nested dicts are merged by key
- Scalars/collections at any level are replaced entirely
- Mutation occurs in-place on 'base' and the same object is returned
"""
for k, v in updates.items(): # Iterate all update keys
if isinstance(v, dict) and isinstance(base.get(k), dict): # Both sides dict → recurse
base[k] = _deep_update(base[k], v)
else: # Otherwise replace
base[k] = v
return base # Return the mutated base for chaining
def _to_dataclass(cfg: Dict[str, Any]) -> EcuTestConfig:
    """Convert a merged plain-dict config into the typed dataclasses.

    Values are cast defensively so the result has the right types even when
    the YAML (or an override) supplies strings:

    - A section that is missing or null (e.g. ``interface:`` with no value)
      is treated as empty.
    - A key that is missing or explicitly null falls back to its default
      instead of failing in ``int()``/``float()`` or turning into the text
      ``"None"``.
    - Booleans given as text are interpreted by meaning: "", "0", "false",
      "no", "off", "n" and "f" (any case, surrounding whitespace ignored)
      are false. Plain ``bool()`` would report every non-empty string,
      including "false", as true.
    - ``frame_lengths`` keys may be ints or strings such as "0x0A"; entries
      whose key or value cannot be converted to int are skipped.

    Args:
        cfg: Merged configuration with optional "interface", "flash" and
            "power_supply" sections.

    Returns:
        The populated EcuTestConfig.

    Raises:
        TypeError, ValueError: If a non-null numeric setting cannot be
            converted by ``int()``/``float()``.
    """
    # Textual spellings of "off"; any other non-empty string counts as true.
    false_words = {"", "0", "false", "no", "off", "n", "f"}

    def section(name: str) -> Dict[str, Any]:
        # "name:" with no value loads from YAML as None; treat it as empty.
        return cfg.get(name) or {}

    def pick(source: Dict[str, Any], key: str, default: Any) -> Any:
        # An explicit null means "not set", so it behaves like a missing key.
        value = source.get(key)
        return default if value is None else value

    def as_bool(source: Dict[str, Any], key: str) -> bool:
        value = pick(source, key, False)
        if isinstance(value, str):
            return value.strip().lower() not in false_words
        return bool(value)

    iface = section("interface")
    flash = section("flash")
    psu = section("power_supply")

    # Coerce frame_lengths keys to int. YAML usually yields int keys already,
    # but string keys such as "0x0A" are accepted too (base auto-detected).
    frame_lengths: Dict[int, int] = {}
    raw_fl = pick(iface, "frame_lengths", {})
    if isinstance(raw_fl, dict):
        for raw_id, raw_len in raw_fl.items():
            try:
                frame_id = int(raw_id, 0) if isinstance(raw_id, str) else int(raw_id)
                frame_lengths[frame_id] = int(raw_len)
            except (TypeError, ValueError):
                # Deliberately best-effort: one malformed entry must not
                # invalidate the rest of the configuration.
                continue

    interface_cfg = InterfaceConfig(
        type=str(pick(iface, "type", "mock")).lower(),
        channel=int(pick(iface, "channel", 1)),
        bitrate=int(pick(iface, "bitrate", 19200)),
        dll_path=iface.get("dll_path"),
        node_name=iface.get("node_name"),
        func_names=dict(iface.get("func_names", {}) or {}),
        sdf_path=iface.get("sdf_path"),
        schedule_nr=int(pick(iface, "schedule_nr", 0)),
        host=iface.get("host"),
        lin_device=str(pick(iface, "lin_device", "lin0")),
        power_device=str(pick(iface, "power_device", "power_out0")),
        boot_settle_seconds=float(pick(iface, "boot_settle_seconds", 0.5)),
        frame_lengths=frame_lengths,
        ldf_path=iface.get("ldf_path"),
    )
    flash_cfg = FlashConfig(
        enabled=as_bool(flash, "enabled"),
        hex_path=flash.get("hex_path"),  # Optional; stays None when unset
    )
    psu_cfg = PowerSupplyConfig(
        enabled=as_bool(psu, "enabled"),
        port=psu.get("port"),
        baudrate=int(pick(psu, "baudrate", 115200)),
        timeout=float(pick(psu, "timeout", 1.0)),
        eol=str(pick(psu, "eol", "\n")),
        parity=str(pick(psu, "parity", "N")),
        stopbits=float(pick(psu, "stopbits", 1.0)),
        xonxoff=as_bool(psu, "xonxoff"),
        rtscts=as_bool(psu, "rtscts"),
        dsrdtr=as_bool(psu, "dsrdtr"),
        idn_substr=psu.get("idn_substr"),
        do_set=as_bool(psu, "do_set"),
        set_voltage=float(pick(psu, "set_voltage", 1.0)),
        set_current=float(pick(psu, "set_current", 0.1)),
    )
    return EcuTestConfig(
        interface=interface_cfg,
        flash=flash_cfg,
        power_supply=psu_cfg,
    )
def _first_existing_file(*candidates: Optional[pathlib.Path]) -> Optional[pathlib.Path]:
    """Return the first candidate that is an existing file, else None."""
    for candidate in candidates:
        if candidate is not None and candidate.is_file():
            return candidate
    return None


def _read_yaml_mapping(path: pathlib.Path) -> Dict[str, Any]:
    """Parse a YAML file; return {} when it is empty or its root is not a mapping."""
    with open(path, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f) or {}  # Safe loader; empty file -> {}
    return loaded if isinstance(loaded, dict) else {}


def load_config(workspace_root: Optional[str] = None, overrides: Optional[Dict[str, Any]] = None) -> EcuTestConfig:
    """Load configuration from YAML files, environment, overrides and defaults.

    Precedence (highest to lowest):
    1. in-memory ``overrides`` dict
    2. dedicated PSU YAML (affects the "power_supply" section only): the file
       named by env var OWON_PSU_CONFIG, else
       <workspace_root>/config/owon_psu.yaml. Its top-level keys are merged
       directly into the "power_supply" section.
    3. main YAML: the file named by env var ECU_TESTS_CONFIG, else
       <workspace_root>/config/test_config.yaml
    4. built-in defaults in this function

    For both YAML layers, an environment variable that is set but does not
    point to an existing file is ignored and the workspace default is tried
    instead.

    Args:
        workspace_root: Directory that contains the "config" folder. When
            omitted, only the environment variables can locate YAML files.
        overrides: Optional nested dict merged last, on top of everything.

    Returns:
        The merged configuration as typed dataclasses.
    """
    # 4) Built-in defaults: a minimal, safe baseline
    base: Dict[str, Any] = {
        "interface": {
            "type": "mock",  # mock by default for developer friendliness
            "channel": 1,
            "bitrate": 19200,
        },
        "flash": {
            "enabled": False,
            "hex_path": None,
        },
        "power_supply": {
            "enabled": False,
            "port": None,
            "baudrate": 115200,
            "timeout": 1.0,
            "eol": "\n",
            "parity": "N",
            "stopbits": 1.0,
            "xonxoff": False,
            "rtscts": False,
            "dsrdtr": False,
            "idn_substr": None,
            "do_set": False,
            "set_voltage": 1.0,
            "set_current": 0.1,
        },
    }
    root = pathlib.Path(workspace_root) if workspace_root else None

    # 3) Main YAML: the env var wins, the workspace default is the fallback
    env_path = os.getenv(ENV_CONFIG_PATH)
    cfg_path = _first_existing_file(
        pathlib.Path(env_path) if env_path else None,
        root / DEFAULT_CONFIG_RELATIVE if root else None,
    )
    if cfg_path is not None:
        _deep_update(base, _read_yaml_mapping(cfg_path))

    # 2) Dedicated PSU YAML, so machine-specific serial settings can live in
    #    their own file. It is resolved like the main YAML: an env path that
    #    is not an existing file must not hide the workspace default.
    psu_env = os.getenv("OWON_PSU_CONFIG")
    psu_path = _first_existing_file(
        pathlib.Path(psu_env) if psu_env else None,
        root / "config" / "owon_psu.yaml" if root else None,
    )
    if psu_path is not None:
        psu_cfg = _read_yaml_mapping(psu_path)
        if psu_cfg:
            # The main YAML may have replaced the section with a non-dict
            # (e.g. "power_supply:" with no value); start from an empty
            # section then, so the merge has a dict to work on.
            if not isinstance(base.get("power_supply"), dict):
                base["power_supply"] = {}
            _deep_update(base["power_supply"], psu_cfg)

    # 1) In-memory overrides always win
    if overrides:
        _deep_update(base, overrides)

    # Convert to typed dataclasses for ergonomic downstream usage
    return _to_dataclass(base)