power: cross-platform PSU port resolver, parsed numerics, safe-off

owon_psu.py upgrades (all backward-compatible):

- SerialParams.from_config() and OwonPSU.from_config() factories that
  translate the YAML power_supply block (parity 'N', stopbits 1.0)
  into pyserial constants — eliminates the boilerplate every test
  was duplicating.

- Parsed-numeric measurement helpers: measure_voltage_v(),
  measure_current_a(), output_is_on(). Tests can now assert on
  floats / bools instead of regex-ing strings.

- safe_off_on_close=True (new ctor kwarg, default on) — close()
  sends 'output 0' before closing the port. Last-ditch protection
  against leaving the bench powered on after an aborted test.
  Keyword-only so the historical positional ctor signature is
  preserved.

- Cross-platform port resolver: windows_com_to_linux,
  linux_serial_to_windows, candidate_ports, resolve_port. The
  resolver tries the configured port verbatim, then its
  cross-platform translation (COM7 ↔ /dev/ttyS6 on WSL1), then
  Linux USB-serial paths (/dev/ttyUSB*, /dev/ttyACM*), then a full
  scan_ports() with optional idn_substr filter. One bench config
  works on Windows, WSL1, WSL2 + usbipd-win, and native Linux.

- try_idn_on_port refactored to use OwonPSU internally, removing
  ~25 lines of duplicated serial-port plumbing.

ecu_framework/power/__init__.py re-exports the new helpers so tests
can do `from ecu_framework.power import resolve_port, ...`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hosam-Eldin Mostafa 2026-05-08 19:00:12 +02:00
parent 079abc9356
commit c6d7669b90
2 changed files with 622 additions and 206 deletions

View File

@ -1,13 +1,30 @@
"""Power control helpers for ECU tests.
Currently includes Owon PSU serial SCPI controller.
Currently includes Owon PSU serial SCPI controller plus a cross-
platform port resolver so the same bench config works on Windows,
Linux, and WSL.
"""
from .owon_psu import SerialParams, OwonPSU, scan_ports, auto_detect
from .owon_psu import (
SerialParams,
OwonPSU,
scan_ports,
auto_detect,
try_idn_on_port,
candidate_ports,
resolve_port,
windows_com_to_linux,
linux_serial_to_windows,
)
__all__ = [
"SerialParams",
"OwonPSU",
"scan_ports",
"auto_detect",
"try_idn_on_port",
"candidate_ports",
"resolve_port",
"windows_com_to_linux",
"linux_serial_to_windows",
]

View File

@ -1,48 +1,220 @@
"""Owon PSU SCPI control over raw serial (pyserial).
"""Owon PSU SCPI control over a raw serial link (pyserial).
This module provides a small, programmatic API suitable for tests:
WHAT THIS MODULE GIVES YOU
--------------------------
- :class:`SerialParams` a small dataclass for the pyserial settings.
Construct directly, or use :meth:`SerialParams.from_config` to build
one from the project's central PSU configuration.
- :class:`OwonPSU` context-managed controller. Wraps a serial
port and exposes the PSU's SCPI dialect as Python methods.
- :func:`scan_ports` probe every serial port on the host for a
device that answers ``*IDN?``.
- :func:`auto_detect` pick a port by IDN substring, or fall back
to the first responder.
- OwonPSU: context-manageable controller class
- scan_ports(): find devices responding to *IDN?
- auto_detect(): select the first matching device by IDN substring
THE SCPI DIALECT THIS PSU EXPECTS
---------------------------------
Owon's PSU firmware speaks a near-SCPI dialect over a plain newline-
terminated serial link. The commands this module uses (matching the
working bench example):
Behavior follows the working quick demo example (serial):
- Both commands and queries are terminated with a newline ("\n" by default).
- Queries use readline() to fetch a single-line response.
- Command set uses: 'output 0/1', 'output?', 'SOUR:VOLT <V>', 'SOUR:CURR <A>', 'MEAS:VOLT?', 'MEAS:CURR?', '*IDN?'
*IDN? identification string
output 1 / output 0 enable / disable the output (lowercase, NOT
the standard ``OUTP ON`` / ``OUTP OFF``)
output? output state (returns 'ON'/'OFF' or '1'/'0')
SOUR:VOLT <V> set the voltage setpoint, volts
SOUR:CURR <A> set the current limit, amps
MEAS:VOLT? read measured voltage (string, may include 'V')
MEAS:CURR? read measured current (string, may include 'A')
Both commands and queries are terminated with ``\\n`` (configurable via
the ``eol`` argument). Queries use ``readline()`` to fetch a single-
line response.
SAFETY: ``OwonPSU`` defaults to ``safe_off_on_close=True``, which sends
``output 0`` before closing the port. That way an aborted test or an
exception cannot leave the bench powered on after the controller is
disposed.
"""
from __future__ import annotations
import glob
import os
import platform
import re
from dataclasses import dataclass
from time import sleep
from typing import Iterable, Optional
from typing import Optional
import serial
from serial import Serial
from serial.tools import list_ports
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ Mappings: human-friendly config strings → pyserial constants ║
# ╚══════════════════════════════════════════════════════════════════════╝
# The project's YAML uses 'N'/'E'/'O' for parity and 1/2 (numeric) for
# stopbits. pyserial wants its own constants, so :meth:`from_config`
# translates here.
_PARITY_MAP = {
"N": serial.PARITY_NONE,
"E": serial.PARITY_EVEN,
"O": serial.PARITY_ODD,
}
_STOPBITS_MAP = {
1.0: serial.STOPBITS_ONE,
1.5: serial.STOPBITS_ONE_POINT_FIVE,
2.0: serial.STOPBITS_TWO,
}
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ Numeric parsing ║
# ╚══════════════════════════════════════════════════════════════════════╝
# Matches the first signed real number in a string. Used to extract
# floats from MEAS:VOLT? / MEAS:CURR? responses, which may include a
# trailing unit ('V' / 'A') depending on the firmware build.
_NUM_RE = re.compile(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?")
def _parse_float(s: str) -> Optional[float]:
"""Return the first signed float found in ``s``, or ``None`` if absent.
Robust against trailing units, surrounding whitespace, or empty
responses all common on the bench.
"""
if not s:
return None
m = _NUM_RE.search(s)
return float(m.group()) if m else None
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ SerialParams ║
# ╚══════════════════════════════════════════════════════════════════════╝
@dataclass
class SerialParams:
baudrate: int = 115200
timeout: float = 1.0 # seconds
"""Plain serial-port settings consumed by :class:`OwonPSU`.
Defaults match the typical Owon PSU configuration: 8N1 framing at
115200 baud with no flow control. Override only what your bench
needs.
"""
baudrate: int = 115200 # bits per second
timeout: float = 1.0 # read timeout (seconds)
bytesize: int = serial.EIGHTBITS
parity: str = serial.PARITY_NONE
stopbits: float = serial.STOPBITS_ONE
xonxoff: bool = False
rtscts: bool = False
dsrdtr: bool = False
write_timeout: float = 1.0 # seconds
xonxoff: bool = False # software flow control (XON/XOFF)
rtscts: bool = False # hardware flow control (RTS/CTS)
dsrdtr: bool = False # hardware flow control (DSR/DTR)
write_timeout: float = 1.0 # write timeout (seconds)
@classmethod
def from_config(cls, cfg) -> "SerialParams":
"""Build a :class:`SerialParams` from a ``PowerSupplyConfig`` dataclass.
``cfg`` is the same ``EcuTestConfig.power_supply`` block tests
already use. This method translates its human-friendly strings
('N', '1') into the pyserial constants and casts numeric fields
to the expected types saving every test author from rewriting
the same parity/stopbits dictionary lookup.
"""
parity = _PARITY_MAP.get(str(cfg.parity).upper(), serial.PARITY_NONE)
stopbits = _STOPBITS_MAP.get(float(cfg.stopbits), serial.STOPBITS_ONE)
return cls(
baudrate=int(cfg.baudrate),
timeout=float(cfg.timeout),
parity=parity,
stopbits=stopbits,
xonxoff=bool(cfg.xonxoff),
rtscts=bool(cfg.rtscts),
dsrdtr=bool(cfg.dsrdtr),
)
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ OwonPSU controller ║
# ╚══════════════════════════════════════════════════════════════════════╝
class OwonPSU:
def __init__(self, port: str, params: SerialParams | None = None, eol: str = "\n") -> None:
"""Programmatic Owon-style PSU controller over serial SCPI.
Construct, then either:
1. Use as a context manager opens on ``__enter__``, closes on
``__exit__`` (and turns the output OFF first if
``safe_off_on_close`` is True)::
with OwonPSU(port, params) as psu:
idn = psu.idn()
psu.set_voltage(1, 5.0)
2. Or call :meth:`open` / :meth:`close` manually if you need
finer control of the lifecycle.
See module docstring for the SCPI dialect this class targets.
"""
def __init__(
self,
port: str,
params: SerialParams | None = None,
eol: str = "\n",
*,
safe_off_on_close: bool = True,
) -> None:
# Note: keyword-only ``safe_off_on_close`` keeps the historical
# positional signature ``OwonPSU(port, params, eol)`` stable for
# existing callers (e.g. vendor/Owon/owon_psu_quick_demo.py).
self.port = port
self.params = params or SerialParams()
self.eol = eol
self._safe_off = safe_off_on_close
self._ser: Optional[Serial] = None
@classmethod
def from_config(cls, cfg, *, safe_off_on_close: bool = True) -> "OwonPSU":
"""Construct (without opening) from ``EcuTestConfig.power_supply``.
Equivalent to::
OwonPSU(
port=cfg.port,
params=SerialParams.from_config(cfg),
eol=cfg.eol or "\\n",
safe_off_on_close=safe_off_on_close,
)
Use as a context manager once constructed::
with OwonPSU.from_config(config.power_supply) as psu:
...
"""
return cls(
port=str(cfg.port).strip(),
params=SerialParams.from_config(cfg),
eol=cfg.eol or "\n",
safe_off_on_close=safe_off_on_close,
)
# ---- lifecycle --------------------------------------------------------
def open(self) -> None:
"""Open the serial port. Idempotent — no-op if already open.
We assemble the ``Serial`` object field-by-field instead of
passing everything to its constructor so that ``open()`` only
runs once at the end. This makes failures easier to diagnose
because the port name is set before the open call.
"""
if self._ser and self._ser.is_open:
return
ser = Serial()
@ -60,7 +232,22 @@ class OwonPSU:
self._ser = ser
def close(self) -> None:
"""Close the serial port. Optionally turns the PSU output OFF first.
With ``safe_off_on_close=True`` (the default), this attempts to
send ``output 0`` before closing protecting against leaving
the bench powered on after an aborted test. Errors during the
safe-off step are swallowed so the close still completes.
"""
if self._ser and self._ser.is_open:
if self._safe_off:
try:
self.set_output(False)
except Exception:
# Swallow: the close itself is more important than the
# cosmetic safe-off attempt. Whatever caused the failure
# will surface elsewhere if it matters.
pass
try:
self._ser.close()
finally:
@ -75,11 +262,18 @@ class OwonPSU:
@property
def is_open(self) -> bool:
"""``True`` iff the underlying serial port is currently open."""
return bool(self._ser and self._ser.is_open)
# ---- low-level ops ----
# ---- low-level serial I/O --------------------------------------------
def write(self, cmd: str) -> None:
"""Write a SCPI command (append eol)."""
"""Send a SCPI command. The terminator (``eol``) is appended.
SCPI commands don't return data — use :meth:`query` for any
command ending in ``?`` (which is how the Owon dialect signals
"this is a query, please respond on a single line").
"""
if not self._ser:
raise RuntimeError("Port is not open")
data = (cmd + self.eol).encode("ascii", errors="ignore")
@ -87,92 +281,140 @@ class OwonPSU:
self._ser.flush()
def query(self, q: str) -> str:
"""Send a query with terminator and return a single-line response using readline()."""
"""Send a query and read a single-line response with ``readline()``.
Both buffers are flushed first to discard any stale bytes left
over from previous commands or from the kernel's serial driver.
The trailing ``\\r\\n`` / ``\\n`` is stripped from the return
value so callers see clean strings.
"""
if not self._ser:
raise RuntimeError("Port is not open")
# clear buffers to avoid stale data
try:
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
except Exception:
# Some platforms / drivers don't implement these. Best-effort.
pass
self._ser.write((q + self.eol).encode("ascii", errors="ignore"))
self._ser.flush()
line = self._ser.readline().strip()
return line.decode("ascii", errors="ignore")
# ---- high-level ops ----
# ---- high-level operations: raw string responses ---------------------
def idn(self) -> str:
"""Return the device identification string (``*IDN?``)."""
return self.query("*IDN?")
def set_voltage(self, channel: int, volts: float) -> None:
# Using SOUR:VOLT <V> per working example
"""Set the voltage setpoint via ``SOUR:VOLT <V>``.
``channel`` is currently **ignored** (this firmware exposes a
single channel). The parameter is kept in the signature for
forward compatibility with multi-channel PSUs that prefix the
command with ``INST:NSEL <n>`` or use ``SOUR<n>:VOLT``.
"""
self.write(f"SOUR:VOLT {volts:.3f}")
def set_current(self, channel: int, amps: float) -> None:
# Using SOUR:CURR <A> per working example
"""Set the current limit via ``SOUR:CURR <A>`` (channel ignored)."""
self.write(f"SOUR:CURR {amps:.3f}")
def set_output(self, on: bool) -> None:
# Using 'output 1/0' per working example
"""Enable or disable the output (``output 1`` / ``output 0``).
Note: this dialect uses lowercase ``output 1/0``, NOT the more
common ``OUTP ON``/``OUTP OFF`` from the SCPI standard. The
Owon firmware does not accept the standard form.
"""
self.write("output 1" if on else "output 0")
def output_status(self) -> str:
"""Raw ``output?`` response (e.g. ``'ON'``, ``'OFF'``, ``'1'``, ``'0'``)."""
return self.query("output?")
def measure_voltage(self) -> str:
"""Raw ``MEAS:VOLT?`` response (string; may include a ``V`` suffix)."""
return self.query("MEAS:VOLT?")
def measure_current(self) -> str:
"""Raw ``MEAS:CURR?`` response (string; may include an ``A`` suffix)."""
return self.query("MEAS:CURR?")
# ---- high-level operations: parsed numerics --------------------------
#
# These wrap the raw queries above and return Python floats / bools,
# so tests can write ``assert 4.9 < psu.measure_voltage_v() < 5.1``
# instead of parsing strings themselves.
def measure_voltage_v(self) -> Optional[float]:
"""Measured voltage as a ``float`` (V), or ``None`` if unparseable."""
return _parse_float(self.measure_voltage())
def measure_current_a(self) -> Optional[float]:
"""Measured current as a ``float`` (A), or ``None`` if unparseable."""
return _parse_float(self.measure_current())
def output_is_on(self) -> Optional[bool]:
"""Decoded output state. Returns ``None`` if the response is unknown.
Accepts ``'ON'``/``'OFF'`` (case-insensitive) and ``'1'``/``'0'``
the two conventions Owon firmware versions are known to use.
"""
s = self.output_status().strip().upper()
if s in ("ON", "1", "TRUE"):
return True
if s in ("OFF", "0", "FALSE"):
return False
return None
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ Discovery helpers ║
# ╚══════════════════════════════════════════════════════════════════════╝
# ------- discovery helpers -------
def try_idn_on_port(port: str, params: SerialParams) -> str:
dev: Optional[Serial] = None
"""Open ``port`` briefly, send ``*IDN?``, return the response (or ``""``).
This is the primitive used by :func:`scan_ports`. It uses
:class:`OwonPSU` internally with ``safe_off_on_close=False`` (we're
only probing, not driving the output), and any exception during
open / query is swallowed and reported as an empty string so the
scanner can simply skip non-responding ports.
"""
try:
dev = Serial()
dev.port = port
dev.baudrate = params.baudrate
dev.bytesize = params.bytesize
dev.parity = params.parity
dev.stopbits = params.stopbits
dev.xonxoff = params.xonxoff
dev.rtscts = params.rtscts
dev.dsrdtr = params.dsrdtr
dev.timeout = params.timeout
dev.write_timeout = params.write_timeout
dev.open()
# Query with newline terminator and read a single line
dev.reset_input_buffer(); dev.reset_output_buffer()
dev.write(b"*IDN?\n"); dev.flush()
line = dev.readline().strip()
return line.decode("ascii", errors="ignore")
with OwonPSU(port, params, safe_off_on_close=False) as psu:
return psu.idn()
except Exception:
return ""
finally:
if dev and dev.is_open:
try:
dev.close()
except Exception:
pass
def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]:
"""Return [(port, idn_response), ...] for ports that responded."""
"""Probe every serial port and collect ``(port, idn_response)`` pairs.
Returns only ports that produced a non-empty IDN. Useful when you
don't know which COM/tty the PSU is on.
"""
params = params or SerialParams()
results: list[tuple[str, str]] = []
for p in list_ports.comports():
dev = p.device
resp = try_idn_on_port(dev, params)
resp = try_idn_on_port(p.device, params)
if resp:
results.append((dev, resp))
results.append((p.device, resp))
return results
def auto_detect(params: SerialParams | None = None, idn_substr: str | None = None) -> Optional[str]:
"""Return the first port whose *IDN? contains idn_substr (case-insensitive), else first responder."""
def auto_detect(
params: SerialParams | None = None,
idn_substr: str | None = None,
) -> Optional[str]:
"""Find the first port whose IDN matches ``idn_substr``, else first responder.
Pass ``idn_substr="OWON"`` (or similar) to reject other SCPI
devices on the same machine. Match is case-insensitive substring.
"""
params = params or SerialParams()
matches = scan_ports(params)
if not matches:
@ -185,9 +427,166 @@ def auto_detect(params: SerialParams | None = None, idn_substr: str | None = Non
return matches[0][0]
# ╔══════════════════════════════════════════════════════════════════════╗
# ║ Cross-platform port resolution ║
# ╚══════════════════════════════════════════════════════════════════════╝
#
# A bench config typically names the PSU port the way Windows sees it
# (``COM7``). When the same config is run from Linux or WSL, that name
# is meaningless and the test fails to open the port.
#
# The helpers below let one config work on every platform:
#
# 1. ``windows_com_to_linux`` / ``linux_serial_to_windows`` translate
# between the two naming conventions for the same physical UART.
# WSL1 exposes Windows COMx as /dev/ttyS(x-1).
#
# 2. ``candidate_ports`` builds an ordered list of ports worth trying
# for a given configured value, including platform translations and
# common USB-serial device files on Linux.
#
# 3. ``resolve_port`` walks the candidate list, opens each briefly to
# send ``*IDN?``, and returns the first match (filtered by
# ``idn_substr`` if provided). Ultimate fallback: a full
# :func:`scan_ports`.
def windows_com_to_linux(com_name: str) -> Optional[str]:
"""Map a Windows COM name to its WSL/Linux device file.
``COM1 /dev/ttyS0``, ``COM2 /dev/ttyS1``,
Returns ``None`` if ``com_name`` doesn't look like a COM port.
"""
if not com_name:
return None
s = com_name.strip().upper()
if not s.startswith("COM"):
return None
try:
n = int(s[3:])
except ValueError:
return None
if n < 1:
return None
return f"/dev/ttyS{n - 1}"
def linux_serial_to_windows(dev_name: str) -> Optional[str]:
"""Map a Linux ``/dev/ttySn`` to a Windows COM name (``COMn+1``)."""
if not dev_name:
return None
prefix = "/dev/ttyS"
s = dev_name.strip()
if not s.startswith(prefix):
return None
try:
n = int(s[len(prefix):])
except ValueError:
return None
return f"COM{n + 1}"
def _is_linux_like() -> bool:
"""True for Linux and WSL hosts (anywhere /dev/tty* lives)."""
return platform.system() == "Linux"
def _is_windows() -> bool:
return platform.system() == "Windows"
def candidate_ports(configured: Optional[str]) -> list[str]:
"""Return an ordered list of ports worth trying for ``configured``.
Order, with duplicates removed:
1. The configured port itself (if any). Always honored first.
2. Its cross-platform translation (e.g. ``COM7`` ``/dev/ttyS6``
on Linux/WSL). Lets a single bench config work on either side.
3. On Linux/WSL only: ``/dev/ttyUSB*`` and ``/dev/ttyACM*``
common USB-serial adapter device files. These often surface
under WSL2 via ``usbipd-win`` and won't be reachable through
the COMx ttySn mapping.
"""
seen: list[str] = []
def _add(p: Optional[str]) -> None:
if p and p not in seen:
seen.append(p)
# 1. configured port verbatim
_add(configured)
# 2. platform-aware translation of the configured port
if configured:
if _is_linux_like():
_add(windows_com_to_linux(configured))
elif _is_windows():
_add(linux_serial_to_windows(configured))
# 3. USB-serial fallbacks on Linux/WSL
if _is_linux_like():
for pattern in ("/dev/ttyUSB*", "/dev/ttyACM*"):
for p in sorted(glob.glob(pattern)):
_add(p)
return seen
def resolve_port(
configured: Optional[str],
*,
idn_substr: Optional[str] = None,
params: Optional[SerialParams] = None,
) -> Optional[tuple[str, str]]:
"""Find a working PSU port and return ``(port, idn_response)``.
Strategy:
1. Try every port from :func:`candidate_ports` (configured port +
cross-platform translations + Linux USB-serial paths).
2. If none matched, do a full :func:`scan_ports` of every serial
port on the host as a last resort.
If ``idn_substr`` is set, only ports whose IDN contains it (case-
insensitively) are accepted this guards against picking up a
different SCPI device that happens to be plugged in. If
``idn_substr`` is ``None``, the first responding port wins.
Returns ``None`` if nothing responded.
"""
params = params or SerialParams()
def _matches(idn: str) -> bool:
if not idn:
return False
return idn_substr is None or idn_substr.lower() in idn.lower()
# Phase 1: candidate list (cheap, targeted)
for port in candidate_ports(configured):
# Skip Linux device files that don't exist (avoids ENOENT noise)
if port.startswith("/dev/") and not os.path.exists(port):
continue
idn = try_idn_on_port(port, params)
if _matches(idn):
return port, idn
# Phase 2: scan everything pyserial knows about (broad fallback)
for port, idn in scan_ports(params):
if _matches(idn):
return port, idn
return None
__all__ = [
"SerialParams",
"OwonPSU",
"scan_ports",
"auto_detect",
"try_idn_on_port",
"windows_com_to_linux",
"linux_serial_to_windows",
"candidate_ports",
"resolve_port",
]