diff --git a/ecu_framework/power/__init__.py b/ecu_framework/power/__init__.py index 1ab28d9..fb1756e 100644 --- a/ecu_framework/power/__init__.py +++ b/ecu_framework/power/__init__.py @@ -1,13 +1,30 @@ -"""Power control helpers for ECU tests. - -Currently includes Owon PSU serial SCPI controller. -""" - -from .owon_psu import SerialParams, OwonPSU, scan_ports, auto_detect - -__all__ = [ - "SerialParams", - "OwonPSU", - "scan_ports", - "auto_detect", -] +"""Power control helpers for ECU tests. + +Currently includes Owon PSU serial SCPI controller plus a cross- +platform port resolver so the same bench config works on Windows, +Linux, and WSL. +""" + +from .owon_psu import ( + SerialParams, + OwonPSU, + scan_ports, + auto_detect, + try_idn_on_port, + candidate_ports, + resolve_port, + windows_com_to_linux, + linux_serial_to_windows, +) + +__all__ = [ + "SerialParams", + "OwonPSU", + "scan_ports", + "auto_detect", + "try_idn_on_port", + "candidate_ports", + "resolve_port", + "windows_com_to_linux", + "linux_serial_to_windows", +] diff --git a/ecu_framework/power/owon_psu.py b/ecu_framework/power/owon_psu.py index 2161dec..27e8c25 100644 --- a/ecu_framework/power/owon_psu.py +++ b/ecu_framework/power/owon_psu.py @@ -1,193 +1,592 @@ -"""Owon PSU SCPI control over raw serial (pyserial). - -This module provides a small, programmatic API suitable for tests: - -- OwonPSU: context-manageable controller class -- scan_ports(): find devices responding to *IDN? -- auto_detect(): select the first matching device by IDN substring - -Behavior follows the working quick demo example (serial): -- Both commands and queries are terminated with a newline ("\n" by default). -- Queries use readline() to fetch a single-line response. -- Command set uses: 'output 0/1', 'output?', 'SOUR:VOLT ', 'SOUR:CURR ', 'MEAS:VOLT?', 'MEAS:CURR?', '*IDN?' -""" -from __future__ import annotations - -from dataclasses import dataclass -from time import sleep -from typing import Iterable, Optional - -import serial -from serial import Serial -from serial.tools import list_ports - - -@dataclass -class SerialParams: - baudrate: int = 115200 - timeout: float = 1.0 # seconds - bytesize: int = serial.EIGHTBITS - parity: str = serial.PARITY_NONE - stopbits: float = serial.STOPBITS_ONE - xonxoff: bool = False - rtscts: bool = False - dsrdtr: bool = False - write_timeout: float = 1.0 # seconds - - -class OwonPSU: - def __init__(self, port: str, params: SerialParams | None = None, eol: str = "\n") -> None: - self.port = port - self.params = params or SerialParams() - self.eol = eol - self._ser: Optional[Serial] = None - - def open(self) -> None: - if self._ser and self._ser.is_open: - return - ser = Serial() - ser.port = self.port - ser.baudrate = self.params.baudrate - ser.bytesize = self.params.bytesize - ser.parity = self.params.parity - ser.stopbits = self.params.stopbits - ser.xonxoff = self.params.xonxoff - ser.rtscts = self.params.rtscts - ser.dsrdtr = self.params.dsrdtr - ser.timeout = self.params.timeout - ser.write_timeout = self.params.write_timeout - ser.open() - self._ser = ser - - def close(self) -> None: - if self._ser and self._ser.is_open: - try: - self._ser.close() - finally: - self._ser = None - - def __enter__(self) -> "OwonPSU": - self.open() - return self - - def __exit__(self, exc_type, exc, tb) -> None: - self.close() - - @property - def is_open(self) -> bool: - return bool(self._ser and self._ser.is_open) - - # ---- low-level ops ---- - def write(self, cmd: str) -> None: - """Write a SCPI command (append eol).""" - if not self._ser: - raise RuntimeError("Port is not open") - data = (cmd + self.eol).encode("ascii", errors="ignore") - self._ser.write(data) - self._ser.flush() - - def query(self, q: str) -> str: - """Send a query with terminator and return a single-line response using readline().""" - if not self._ser: - raise RuntimeError("Port is not open") - # clear buffers to avoid stale data - try: - self._ser.reset_input_buffer() - self._ser.reset_output_buffer() - except Exception: - pass - self._ser.write((q + self.eol).encode("ascii", errors="ignore")) - self._ser.flush() - line = self._ser.readline().strip() - return line.decode("ascii", errors="ignore") - - # ---- high-level ops ---- - def idn(self) -> str: - return self.query("*IDN?") - - def set_voltage(self, channel: int, volts: float) -> None: - # Using SOUR:VOLT per working example - self.write(f"SOUR:VOLT {volts:.3f}") - - def set_current(self, channel: int, amps: float) -> None: - # Using SOUR:CURR per working example - self.write(f"SOUR:CURR {amps:.3f}") - - def set_output(self, on: bool) -> None: - # Using 'output 1/0' per working example - self.write("output 1" if on else "output 0") - - def output_status(self) -> str: - return self.query("output?") - - def measure_voltage(self) -> str: - return self.query("MEAS:VOLT?") - - def measure_current(self) -> str: - return self.query("MEAS:CURR?") - - -# ------- discovery helpers ------- - -def try_idn_on_port(port: str, params: SerialParams) -> str: - dev: Optional[Serial] = None - try: - dev = Serial() - dev.port = port - dev.baudrate = params.baudrate - dev.bytesize = params.bytesize - dev.parity = params.parity - dev.stopbits = params.stopbits - dev.xonxoff = params.xonxoff - dev.rtscts = params.rtscts - dev.dsrdtr = params.dsrdtr - dev.timeout = params.timeout - dev.write_timeout = params.write_timeout - dev.open() - # Query with newline terminator and read a single line - dev.reset_input_buffer(); dev.reset_output_buffer() - dev.write(b"*IDN?\n"); dev.flush() - line = dev.readline().strip() - return line.decode("ascii", errors="ignore") - except Exception: - return "" - finally: - if dev and dev.is_open: - try: - dev.close() - except Exception: - pass - - -def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]: - """Return [(port, idn_response), ...] for ports that responded.""" - params = params or SerialParams() - results: list[tuple[str, str]] = [] - for p in list_ports.comports(): - dev = p.device - resp = try_idn_on_port(dev, params) - if resp: - results.append((dev, resp)) - return results - - -def auto_detect(params: SerialParams | None = None, idn_substr: str | None = None) -> Optional[str]: - """Return the first port whose *IDN? contains idn_substr (case-insensitive), else first responder.""" - params = params or SerialParams() - matches = scan_ports(params) - if not matches: - return None - if idn_substr: - isub = idn_substr.lower() - for port, idn in matches: - if isub in idn.lower(): - return port - return matches[0][0] - - -__all__ = [ - "SerialParams", - "OwonPSU", - "scan_ports", - "auto_detect", -] +"""Owon PSU SCPI control over a raw serial link (pyserial). + +WHAT THIS MODULE GIVES YOU +-------------------------- +- :class:`SerialParams` — a small dataclass for the pyserial settings. + Construct directly, or use :meth:`SerialParams.from_config` to build + one from the project's central PSU configuration. +- :class:`OwonPSU` — context-managed controller. Wraps a serial + port and exposes the PSU's SCPI dialect as Python methods. +- :func:`scan_ports` — probe every serial port on the host for a + device that answers ``*IDN?``. +- :func:`auto_detect` — pick a port by IDN substring, or fall back + to the first responder. + +THE SCPI DIALECT THIS PSU EXPECTS +--------------------------------- +Owon's PSU firmware speaks a near-SCPI dialect over a plain newline- +terminated serial link. The commands this module uses (matching the +working bench example): + + *IDN? → identification string + output 1 / output 0 → enable / disable the output (lowercase, NOT + the standard ``OUTP ON`` / ``OUTP OFF``) + output? → output state (returns 'ON'/'OFF' or '1'/'0') + SOUR:VOLT → set the voltage setpoint, volts + SOUR:CURR → set the current limit, amps + MEAS:VOLT? → read measured voltage (string, may include 'V') + MEAS:CURR? → read measured current (string, may include 'A') + +Both commands and queries are terminated with ``\\n`` (configurable via +the ``eol`` argument). Queries use ``readline()`` to fetch a single- +line response. + +SAFETY: ``OwonPSU`` defaults to ``safe_off_on_close=True``, which sends +``output 0`` before closing the port. That way an aborted test or an +exception cannot leave the bench powered on after the controller is +disposed. +""" +from __future__ import annotations + +import glob +import os +import platform +import re +from dataclasses import dataclass +from typing import Optional + +import serial +from serial import Serial +from serial.tools import list_ports + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ Mappings: human-friendly config strings → pyserial constants ║ +# ╚══════════════════════════════════════════════════════════════════════╝ +# The project's YAML uses 'N'/'E'/'O' for parity and 1/2 (numeric) for +# stopbits. pyserial wants its own constants, so :meth:`from_config` +# translates here. + +_PARITY_MAP = { + "N": serial.PARITY_NONE, + "E": serial.PARITY_EVEN, + "O": serial.PARITY_ODD, +} + +_STOPBITS_MAP = { + 1.0: serial.STOPBITS_ONE, + 1.5: serial.STOPBITS_ONE_POINT_FIVE, + 2.0: serial.STOPBITS_TWO, +} + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ Numeric parsing ║ +# ╚══════════════════════════════════════════════════════════════════════╝ + +# Matches the first signed real number in a string. Used to extract +# floats from MEAS:VOLT? / MEAS:CURR? responses, which may include a +# trailing unit ('V' / 'A') depending on the firmware build. +_NUM_RE = re.compile(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?") + + +def _parse_float(s: str) -> Optional[float]: + """Return the first signed float found in ``s``, or ``None`` if absent. + + Robust against trailing units, surrounding whitespace, or empty + responses — all common on the bench. + """ + if not s: + return None + m = _NUM_RE.search(s) + return float(m.group()) if m else None + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ SerialParams ║ +# ╚══════════════════════════════════════════════════════════════════════╝ + + +@dataclass +class SerialParams: + """Plain serial-port settings consumed by :class:`OwonPSU`. + + Defaults match the typical Owon PSU configuration: 8N1 framing at + 115200 baud with no flow control. Override only what your bench + needs. + """ + + baudrate: int = 115200 # bits per second + timeout: float = 1.0 # read timeout (seconds) + bytesize: int = serial.EIGHTBITS + parity: str = serial.PARITY_NONE + stopbits: float = serial.STOPBITS_ONE + xonxoff: bool = False # software flow control (XON/XOFF) + rtscts: bool = False # hardware flow control (RTS/CTS) + dsrdtr: bool = False # hardware flow control (DSR/DTR) + write_timeout: float = 1.0 # write timeout (seconds) + + @classmethod + def from_config(cls, cfg) -> "SerialParams": + """Build a :class:`SerialParams` from a ``PowerSupplyConfig`` dataclass. + + ``cfg`` is the same ``EcuTestConfig.power_supply`` block tests + already use. This method translates its human-friendly strings + ('N', '1') into the pyserial constants and casts numeric fields + to the expected types — saving every test author from rewriting + the same parity/stopbits dictionary lookup. + """ + parity = _PARITY_MAP.get(str(cfg.parity).upper(), serial.PARITY_NONE) + stopbits = _STOPBITS_MAP.get(float(cfg.stopbits), serial.STOPBITS_ONE) + return cls( + baudrate=int(cfg.baudrate), + timeout=float(cfg.timeout), + parity=parity, + stopbits=stopbits, + xonxoff=bool(cfg.xonxoff), + rtscts=bool(cfg.rtscts), + dsrdtr=bool(cfg.dsrdtr), + ) + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ OwonPSU controller ║ +# ╚══════════════════════════════════════════════════════════════════════╝ + + +class OwonPSU: + """Programmatic Owon-style PSU controller over serial SCPI. + + Construct, then either: + + 1. Use as a context manager — opens on ``__enter__``, closes on + ``__exit__`` (and turns the output OFF first if + ``safe_off_on_close`` is True):: + + with OwonPSU(port, params) as psu: + idn = psu.idn() + psu.set_voltage(1, 5.0) + + 2. Or call :meth:`open` / :meth:`close` manually if you need + finer control of the lifecycle. + + See module docstring for the SCPI dialect this class targets. + """ + + def __init__( + self, + port: str, + params: SerialParams | None = None, + eol: str = "\n", + *, + safe_off_on_close: bool = True, + ) -> None: + # Note: keyword-only ``safe_off_on_close`` keeps the historical + # positional signature ``OwonPSU(port, params, eol)`` stable for + # existing callers (e.g. vendor/Owon/owon_psu_quick_demo.py). + self.port = port + self.params = params or SerialParams() + self.eol = eol + self._safe_off = safe_off_on_close + self._ser: Optional[Serial] = None + + @classmethod + def from_config(cls, cfg, *, safe_off_on_close: bool = True) -> "OwonPSU": + """Construct (without opening) from ``EcuTestConfig.power_supply``. + + Equivalent to:: + + OwonPSU( + port=cfg.port, + params=SerialParams.from_config(cfg), + eol=cfg.eol or "\\n", + safe_off_on_close=safe_off_on_close, + ) + + Use as a context manager once constructed:: + + with OwonPSU.from_config(config.power_supply) as psu: + ... + """ + return cls( + port=str(cfg.port).strip(), + params=SerialParams.from_config(cfg), + eol=cfg.eol or "\n", + safe_off_on_close=safe_off_on_close, + ) + + # ---- lifecycle -------------------------------------------------------- + + def open(self) -> None: + """Open the serial port. Idempotent — no-op if already open. + + We assemble the ``Serial`` object field-by-field instead of + passing everything to its constructor so that ``open()`` only + runs once at the end. This makes failures easier to diagnose + because the port name is set before the open call. + """ + if self._ser and self._ser.is_open: + return + ser = Serial() + ser.port = self.port + ser.baudrate = self.params.baudrate + ser.bytesize = self.params.bytesize + ser.parity = self.params.parity + ser.stopbits = self.params.stopbits + ser.xonxoff = self.params.xonxoff + ser.rtscts = self.params.rtscts + ser.dsrdtr = self.params.dsrdtr + ser.timeout = self.params.timeout + ser.write_timeout = self.params.write_timeout + ser.open() + self._ser = ser + + def close(self) -> None: + """Close the serial port. Optionally turns the PSU output OFF first. + + With ``safe_off_on_close=True`` (the default), this attempts to + send ``output 0`` before closing — protecting against leaving + the bench powered on after an aborted test. Errors during the + safe-off step are swallowed so the close still completes. + """ + if self._ser and self._ser.is_open: + if self._safe_off: + try: + self.set_output(False) + except Exception: + # Swallow: the close itself is more important than the + # cosmetic safe-off attempt. Whatever caused the failure + # will surface elsewhere if it matters. + pass + try: + self._ser.close() + finally: + self._ser = None + + def __enter__(self) -> "OwonPSU": + self.open() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + @property + def is_open(self) -> bool: + """``True`` iff the underlying serial port is currently open.""" + return bool(self._ser and self._ser.is_open) + + # ---- low-level serial I/O -------------------------------------------- + + def write(self, cmd: str) -> None: + """Send a SCPI command. The terminator (``eol``) is appended. + + SCPI commands don't return data — use :meth:`query` for any + command ending in ``?`` (which is how the Owon dialect signals + "this is a query, please respond on a single line"). + """ + if not self._ser: + raise RuntimeError("Port is not open") + data = (cmd + self.eol).encode("ascii", errors="ignore") + self._ser.write(data) + self._ser.flush() + + def query(self, q: str) -> str: + """Send a query and read a single-line response with ``readline()``. + + Both buffers are flushed first to discard any stale bytes left + over from previous commands or from the kernel's serial driver. + The trailing ``\\r\\n`` / ``\\n`` is stripped from the return + value so callers see clean strings. + """ + if not self._ser: + raise RuntimeError("Port is not open") + try: + self._ser.reset_input_buffer() + self._ser.reset_output_buffer() + except Exception: + # Some platforms / drivers don't implement these. Best-effort. + pass + self._ser.write((q + self.eol).encode("ascii", errors="ignore")) + self._ser.flush() + line = self._ser.readline().strip() + return line.decode("ascii", errors="ignore") + + # ---- high-level operations: raw string responses --------------------- + + def idn(self) -> str: + """Return the device identification string (``*IDN?``).""" + return self.query("*IDN?") + + def set_voltage(self, channel: int, volts: float) -> None: + """Set the voltage setpoint via ``SOUR:VOLT ``. + + ``channel`` is currently **ignored** (this firmware exposes a + single channel). The parameter is kept in the signature for + forward compatibility with multi-channel PSUs that prefix the + command with ``INST:NSEL `` or use ``SOUR:VOLT``. + """ + self.write(f"SOUR:VOLT {volts:.3f}") + + def set_current(self, channel: int, amps: float) -> None: + """Set the current limit via ``SOUR:CURR `` (channel ignored).""" + self.write(f"SOUR:CURR {amps:.3f}") + + def set_output(self, on: bool) -> None: + """Enable or disable the output (``output 1`` / ``output 0``). + + Note: this dialect uses lowercase ``output 1/0``, NOT the more + common ``OUTP ON``/``OUTP OFF`` from the SCPI standard. The + Owon firmware does not accept the standard form. + """ + self.write("output 1" if on else "output 0") + + def output_status(self) -> str: + """Raw ``output?`` response (e.g. ``'ON'``, ``'OFF'``, ``'1'``, ``'0'``).""" + return self.query("output?") + + def measure_voltage(self) -> str: + """Raw ``MEAS:VOLT?`` response (string; may include a ``V`` suffix).""" + return self.query("MEAS:VOLT?") + + def measure_current(self) -> str: + """Raw ``MEAS:CURR?`` response (string; may include an ``A`` suffix).""" + return self.query("MEAS:CURR?") + + # ---- high-level operations: parsed numerics -------------------------- + # + # These wrap the raw queries above and return Python floats / bools, + # so tests can write ``assert 4.9 < psu.measure_voltage_v() < 5.1`` + # instead of parsing strings themselves. + + def measure_voltage_v(self) -> Optional[float]: + """Measured voltage as a ``float`` (V), or ``None`` if unparseable.""" + return _parse_float(self.measure_voltage()) + + def measure_current_a(self) -> Optional[float]: + """Measured current as a ``float`` (A), or ``None`` if unparseable.""" + return _parse_float(self.measure_current()) + + def output_is_on(self) -> Optional[bool]: + """Decoded output state. Returns ``None`` if the response is unknown. + + Accepts ``'ON'``/``'OFF'`` (case-insensitive) and ``'1'``/``'0'`` + — the two conventions Owon firmware versions are known to use. + """ + s = self.output_status().strip().upper() + if s in ("ON", "1", "TRUE"): + return True + if s in ("OFF", "0", "FALSE"): + return False + return None + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ Discovery helpers ║ +# ╚══════════════════════════════════════════════════════════════════════╝ + + +def try_idn_on_port(port: str, params: SerialParams) -> str: + """Open ``port`` briefly, send ``*IDN?``, return the response (or ``""``). + + This is the primitive used by :func:`scan_ports`. It uses + :class:`OwonPSU` internally with ``safe_off_on_close=False`` (we're + only probing, not driving the output), and any exception during + open / query is swallowed and reported as an empty string so the + scanner can simply skip non-responding ports. + """ + try: + with OwonPSU(port, params, safe_off_on_close=False) as psu: + return psu.idn() + except Exception: + return "" + + +def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]: + """Probe every serial port and collect ``(port, idn_response)`` pairs. + + Returns only ports that produced a non-empty IDN. Useful when you + don't know which COM/tty the PSU is on. + """ + params = params or SerialParams() + results: list[tuple[str, str]] = [] + for p in list_ports.comports(): + resp = try_idn_on_port(p.device, params) + if resp: + results.append((p.device, resp)) + return results + + +def auto_detect( + params: SerialParams | None = None, + idn_substr: str | None = None, +) -> Optional[str]: + """Find the first port whose IDN matches ``idn_substr``, else first responder. + + Pass ``idn_substr="OWON"`` (or similar) to reject other SCPI + devices on the same machine. Match is case-insensitive substring. + """ + params = params or SerialParams() + matches = scan_ports(params) + if not matches: + return None + if idn_substr: + isub = idn_substr.lower() + for port, idn in matches: + if isub in idn.lower(): + return port + return matches[0][0] + + +# ╔══════════════════════════════════════════════════════════════════════╗ +# ║ Cross-platform port resolution ║ +# ╚══════════════════════════════════════════════════════════════════════╝ +# +# A bench config typically names the PSU port the way Windows sees it +# (``COM7``). When the same config is run from Linux or WSL, that name +# is meaningless and the test fails to open the port. +# +# The helpers below let one config work on every platform: +# +# 1. ``windows_com_to_linux`` / ``linux_serial_to_windows`` translate +# between the two naming conventions for the same physical UART. +# WSL1 exposes Windows COMx as /dev/ttyS(x-1). +# +# 2. ``candidate_ports`` builds an ordered list of ports worth trying +# for a given configured value, including platform translations and +# common USB-serial device files on Linux. +# +# 3. ``resolve_port`` walks the candidate list, opens each briefly to +# send ``*IDN?``, and returns the first match (filtered by +# ``idn_substr`` if provided). Ultimate fallback: a full +# :func:`scan_ports`. + + +def windows_com_to_linux(com_name: str) -> Optional[str]: + """Map a Windows COM name to its WSL/Linux device file. + + ``COM1 → /dev/ttyS0``, ``COM2 → /dev/ttyS1``, … + Returns ``None`` if ``com_name`` doesn't look like a COM port. + """ + if not com_name: + return None + s = com_name.strip().upper() + if not s.startswith("COM"): + return None + try: + n = int(s[3:]) + except ValueError: + return None + if n < 1: + return None + return f"/dev/ttyS{n - 1}" + + +def linux_serial_to_windows(dev_name: str) -> Optional[str]: + """Map a Linux ``/dev/ttySn`` to a Windows COM name (``COMn+1``).""" + if not dev_name: + return None + prefix = "/dev/ttyS" + s = dev_name.strip() + if not s.startswith(prefix): + return None + try: + n = int(s[len(prefix):]) + except ValueError: + return None + return f"COM{n + 1}" + + +def _is_linux_like() -> bool: + """True for Linux and WSL hosts (anywhere /dev/tty* lives).""" + return platform.system() == "Linux" + + +def _is_windows() -> bool: + return platform.system() == "Windows" + + +def candidate_ports(configured: Optional[str]) -> list[str]: + """Return an ordered list of ports worth trying for ``configured``. + + Order, with duplicates removed: + + 1. The configured port itself (if any). Always honored first. + 2. Its cross-platform translation (e.g. ``COM7`` → ``/dev/ttyS6`` + on Linux/WSL). Lets a single bench config work on either side. + 3. On Linux/WSL only: ``/dev/ttyUSB*`` and ``/dev/ttyACM*`` — + common USB-serial adapter device files. These often surface + under WSL2 via ``usbipd-win`` and won't be reachable through + the COMx → ttySn mapping. + """ + seen: list[str] = [] + + def _add(p: Optional[str]) -> None: + if p and p not in seen: + seen.append(p) + + # 1. configured port verbatim + _add(configured) + + # 2. platform-aware translation of the configured port + if configured: + if _is_linux_like(): + _add(windows_com_to_linux(configured)) + elif _is_windows(): + _add(linux_serial_to_windows(configured)) + + # 3. USB-serial fallbacks on Linux/WSL + if _is_linux_like(): + for pattern in ("/dev/ttyUSB*", "/dev/ttyACM*"): + for p in sorted(glob.glob(pattern)): + _add(p) + + return seen + + +def resolve_port( + configured: Optional[str], + *, + idn_substr: Optional[str] = None, + params: Optional[SerialParams] = None, +) -> Optional[tuple[str, str]]: + """Find a working PSU port and return ``(port, idn_response)``. + + Strategy: + + 1. Try every port from :func:`candidate_ports` (configured port + + cross-platform translations + Linux USB-serial paths). + 2. If none matched, do a full :func:`scan_ports` of every serial + port on the host as a last resort. + + If ``idn_substr`` is set, only ports whose IDN contains it (case- + insensitively) are accepted — this guards against picking up a + different SCPI device that happens to be plugged in. If + ``idn_substr`` is ``None``, the first responding port wins. + + Returns ``None`` if nothing responded. + """ + params = params or SerialParams() + + def _matches(idn: str) -> bool: + if not idn: + return False + return idn_substr is None or idn_substr.lower() in idn.lower() + + # Phase 1: candidate list (cheap, targeted) + for port in candidate_ports(configured): + # Skip Linux device files that don't exist (avoids ENOENT noise) + if port.startswith("/dev/") and not os.path.exists(port): + continue + idn = try_idn_on_port(port, params) + if _matches(idn): + return port, idn + + # Phase 2: scan everything pyserial knows about (broad fallback) + for port, idn in scan_ports(params): + if _matches(idn): + return port, idn + + return None + + +__all__ = [ + "SerialParams", + "OwonPSU", + "scan_ports", + "auto_detect", + "try_idn_on_port", + "windows_com_to_linux", + "linux_serial_to_windows", + "candidate_ports", + "resolve_port", +]