"""Owon PSU SCPI control over a raw serial link (pyserial). WHAT THIS MODULE GIVES YOU -------------------------- - :class:`SerialParams` — a small dataclass for the pyserial settings. Construct directly, or use :meth:`SerialParams.from_config` to build one from the project's central PSU configuration. - :class:`OwonPSU` — context-managed controller. Wraps a serial port and exposes the PSU's SCPI dialect as Python methods. - :func:`scan_ports` — probe every serial port on the host for a device that answers ``*IDN?``. - :func:`auto_detect` — pick a port by IDN substring, or fall back to the first responder. THE SCPI DIALECT THIS PSU EXPECTS --------------------------------- Owon's PSU firmware speaks a near-SCPI dialect over a plain newline- terminated serial link. The commands this module uses (matching the working bench example): *IDN? → identification string output 1 / output 0 → enable / disable the output (lowercase, NOT the standard ``OUTP ON`` / ``OUTP OFF``) output? → output state (returns 'ON'/'OFF' or '1'/'0') SOUR:VOLT → set the voltage setpoint, volts SOUR:CURR → set the current limit, amps MEAS:VOLT? → read measured voltage (string, may include 'V') MEAS:CURR? → read measured current (string, may include 'A') Both commands and queries are terminated with ``\\n`` (configurable via the ``eol`` argument). Queries use ``readline()`` to fetch a single- line response. SAFETY: ``OwonPSU`` defaults to ``safe_off_on_close=True``, which sends ``output 0`` before closing the port. That way an aborted test or an exception cannot leave the bench powered on after the controller is disposed. """ from __future__ import annotations import glob import os import platform import re from dataclasses import dataclass from typing import Optional import serial from serial import Serial from serial.tools import list_ports # ╔══════════════════════════════════════════════════════════════════════╗ # ║ Mappings: human-friendly config strings → pyserial constants ║ # ╚══════════════════════════════════════════════════════════════════════╝ # The project's YAML uses 'N'/'E'/'O' for parity and 1/2 (numeric) for # stopbits. pyserial wants its own constants, so :meth:`from_config` # translates here. _PARITY_MAP = { "N": serial.PARITY_NONE, "E": serial.PARITY_EVEN, "O": serial.PARITY_ODD, } _STOPBITS_MAP = { 1.0: serial.STOPBITS_ONE, 1.5: serial.STOPBITS_ONE_POINT_FIVE, 2.0: serial.STOPBITS_TWO, } # ╔══════════════════════════════════════════════════════════════════════╗ # ║ Numeric parsing ║ # ╚══════════════════════════════════════════════════════════════════════╝ # Matches the first signed real number in a string. Used to extract # floats from MEAS:VOLT? / MEAS:CURR? responses, which may include a # trailing unit ('V' / 'A') depending on the firmware build. _NUM_RE = re.compile(r"[-+]?\d*\.?\d+(?:[eE][-+]?\d+)?") def _parse_float(s: str) -> Optional[float]: """Return the first signed float found in ``s``, or ``None`` if absent. Robust against trailing units, surrounding whitespace, or empty responses — all common on the bench. """ if not s: return None m = _NUM_RE.search(s) return float(m.group()) if m else None # ╔══════════════════════════════════════════════════════════════════════╗ # ║ SerialParams ║ # ╚══════════════════════════════════════════════════════════════════════╝ @dataclass class SerialParams: """Plain serial-port settings consumed by :class:`OwonPSU`. Defaults match the typical Owon PSU configuration: 8N1 framing at 115200 baud with no flow control. Override only what your bench needs. """ baudrate: int = 115200 # bits per second timeout: float = 1.0 # read timeout (seconds) bytesize: int = serial.EIGHTBITS parity: str = serial.PARITY_NONE stopbits: float = serial.STOPBITS_ONE xonxoff: bool = False # software flow control (XON/XOFF) rtscts: bool = False # hardware flow control (RTS/CTS) dsrdtr: bool = False # hardware flow control (DSR/DTR) write_timeout: float = 1.0 # write timeout (seconds) @classmethod def from_config(cls, cfg) -> "SerialParams": """Build a :class:`SerialParams` from a ``PowerSupplyConfig`` dataclass. ``cfg`` is the same ``EcuTestConfig.power_supply`` block tests already use. This method translates its human-friendly strings ('N', '1') into the pyserial constants and casts numeric fields to the expected types — saving every test author from rewriting the same parity/stopbits dictionary lookup. """ parity = _PARITY_MAP.get(str(cfg.parity).upper(), serial.PARITY_NONE) stopbits = _STOPBITS_MAP.get(float(cfg.stopbits), serial.STOPBITS_ONE) return cls( baudrate=int(cfg.baudrate), timeout=float(cfg.timeout), parity=parity, stopbits=stopbits, xonxoff=bool(cfg.xonxoff), rtscts=bool(cfg.rtscts), dsrdtr=bool(cfg.dsrdtr), ) # ╔══════════════════════════════════════════════════════════════════════╗ # ║ OwonPSU controller ║ # ╚══════════════════════════════════════════════════════════════════════╝ class OwonPSU: """Programmatic Owon-style PSU controller over serial SCPI. Construct, then either: 1. Use as a context manager — opens on ``__enter__``, closes on ``__exit__`` (and turns the output OFF first if ``safe_off_on_close`` is True):: with OwonPSU(port, params) as psu: idn = psu.idn() psu.set_voltage(1, 5.0) 2. Or call :meth:`open` / :meth:`close` manually if you need finer control of the lifecycle. See module docstring for the SCPI dialect this class targets. """ def __init__( self, port: str, params: SerialParams | None = None, eol: str = "\n", *, safe_off_on_close: bool = True, ) -> None: # Note: keyword-only ``safe_off_on_close`` keeps the historical # positional signature ``OwonPSU(port, params, eol)`` stable for # existing callers (e.g. vendor/Owon/owon_psu_quick_demo.py). self.port = port self.params = params or SerialParams() self.eol = eol self._safe_off = safe_off_on_close self._ser: Optional[Serial] = None @classmethod def from_config(cls, cfg, *, safe_off_on_close: bool = True) -> "OwonPSU": """Construct (without opening) from ``EcuTestConfig.power_supply``. Equivalent to:: OwonPSU( port=cfg.port, params=SerialParams.from_config(cfg), eol=cfg.eol or "\\n", safe_off_on_close=safe_off_on_close, ) Use as a context manager once constructed:: with OwonPSU.from_config(config.power_supply) as psu: ... """ return cls( port=str(cfg.port).strip(), params=SerialParams.from_config(cfg), eol=cfg.eol or "\n", safe_off_on_close=safe_off_on_close, ) # ---- lifecycle -------------------------------------------------------- def open(self) -> None: """Open the serial port. Idempotent — no-op if already open. We assemble the ``Serial`` object field-by-field instead of passing everything to its constructor so that ``open()`` only runs once at the end. This makes failures easier to diagnose because the port name is set before the open call. """ if self._ser and self._ser.is_open: return ser = Serial() ser.port = self.port ser.baudrate = self.params.baudrate ser.bytesize = self.params.bytesize ser.parity = self.params.parity ser.stopbits = self.params.stopbits ser.xonxoff = self.params.xonxoff ser.rtscts = self.params.rtscts ser.dsrdtr = self.params.dsrdtr ser.timeout = self.params.timeout ser.write_timeout = self.params.write_timeout ser.open() self._ser = ser def close(self) -> None: """Close the serial port. Optionally turns the PSU output OFF first. With ``safe_off_on_close=True`` (the default), this attempts to send ``output 0`` before closing — protecting against leaving the bench powered on after an aborted test. Errors during the safe-off step are swallowed so the close still completes. """ if self._ser and self._ser.is_open: if self._safe_off: try: self.set_output(False) except Exception: # Swallow: the close itself is more important than the # cosmetic safe-off attempt. Whatever caused the failure # will surface elsewhere if it matters. pass try: self._ser.close() finally: self._ser = None def __enter__(self) -> "OwonPSU": self.open() return self def __exit__(self, exc_type, exc, tb) -> None: self.close() @property def is_open(self) -> bool: """``True`` iff the underlying serial port is currently open.""" return bool(self._ser and self._ser.is_open) # ---- low-level serial I/O -------------------------------------------- def write(self, cmd: str) -> None: """Send a SCPI command. The terminator (``eol``) is appended. SCPI commands don't return data — use :meth:`query` for any command ending in ``?`` (which is how the Owon dialect signals "this is a query, please respond on a single line"). """ if not self._ser: raise RuntimeError("Port is not open") data = (cmd + self.eol).encode("ascii", errors="ignore") self._ser.write(data) self._ser.flush() def query(self, q: str) -> str: """Send a query and read a single-line response with ``readline()``. Both buffers are flushed first to discard any stale bytes left over from previous commands or from the kernel's serial driver. The trailing ``\\r\\n`` / ``\\n`` is stripped from the return value so callers see clean strings. """ if not self._ser: raise RuntimeError("Port is not open") try: self._ser.reset_input_buffer() self._ser.reset_output_buffer() except Exception: # Some platforms / drivers don't implement these. Best-effort. pass self._ser.write((q + self.eol).encode("ascii", errors="ignore")) self._ser.flush() line = self._ser.readline().strip() return line.decode("ascii", errors="ignore") # ---- high-level operations: raw string responses --------------------- def idn(self) -> str: """Return the device identification string (``*IDN?``).""" return self.query("*IDN?") def set_voltage(self, channel: int, volts: float) -> None: """Set the voltage setpoint via ``SOUR:VOLT ``. ``channel`` is currently **ignored** (this firmware exposes a single channel). The parameter is kept in the signature for forward compatibility with multi-channel PSUs that prefix the command with ``INST:NSEL `` or use ``SOUR:VOLT``. """ self.write(f"SOUR:VOLT {volts:.3f}") def set_current(self, channel: int, amps: float) -> None: """Set the current limit via ``SOUR:CURR `` (channel ignored).""" self.write(f"SOUR:CURR {amps:.3f}") def set_output(self, on: bool) -> None: """Enable or disable the output (``output 1`` / ``output 0``). Note: this dialect uses lowercase ``output 1/0``, NOT the more common ``OUTP ON``/``OUTP OFF`` from the SCPI standard. The Owon firmware does not accept the standard form. """ self.write("output 1" if on else "output 0") def output_status(self) -> str: """Raw ``output?`` response (e.g. ``'ON'``, ``'OFF'``, ``'1'``, ``'0'``).""" return self.query("output?") def measure_voltage(self) -> str: """Raw ``MEAS:VOLT?`` response (string; may include a ``V`` suffix).""" return self.query("MEAS:VOLT?") def measure_current(self) -> str: """Raw ``MEAS:CURR?`` response (string; may include an ``A`` suffix).""" return self.query("MEAS:CURR?") # ---- high-level operations: parsed numerics -------------------------- # # These wrap the raw queries above and return Python floats / bools, # so tests can write ``assert 4.9 < psu.measure_voltage_v() < 5.1`` # instead of parsing strings themselves. def measure_voltage_v(self) -> Optional[float]: """Measured voltage as a ``float`` (V), or ``None`` if unparseable.""" return _parse_float(self.measure_voltage()) def measure_current_a(self) -> Optional[float]: """Measured current as a ``float`` (A), or ``None`` if unparseable.""" return _parse_float(self.measure_current()) def output_is_on(self) -> Optional[bool]: """Decoded output state. Returns ``None`` if the response is unknown. Accepts ``'ON'``/``'OFF'`` (case-insensitive) and ``'1'``/``'0'`` — the two conventions Owon firmware versions are known to use. """ s = self.output_status().strip().upper() if s in ("ON", "1", "TRUE"): return True if s in ("OFF", "0", "FALSE"): return False return None # ╔══════════════════════════════════════════════════════════════════════╗ # ║ Discovery helpers ║ # ╚══════════════════════════════════════════════════════════════════════╝ def try_idn_on_port(port: str, params: SerialParams) -> str: """Open ``port`` briefly, send ``*IDN?``, return the response (or ``""``). This is the primitive used by :func:`scan_ports`. It uses :class:`OwonPSU` internally with ``safe_off_on_close=False`` (we're only probing, not driving the output), and any exception during open / query is swallowed and reported as an empty string so the scanner can simply skip non-responding ports. """ try: with OwonPSU(port, params, safe_off_on_close=False) as psu: return psu.idn() except Exception: return "" def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]: """Probe every serial port and collect ``(port, idn_response)`` pairs. Returns only ports that produced a non-empty IDN. Useful when you don't know which COM/tty the PSU is on. """ params = params or SerialParams() results: list[tuple[str, str]] = [] for p in list_ports.comports(): resp = try_idn_on_port(p.device, params) if resp: results.append((p.device, resp)) return results def auto_detect( params: SerialParams | None = None, idn_substr: str | None = None, ) -> Optional[str]: """Find the first port whose IDN matches ``idn_substr``, else first responder. Pass ``idn_substr="OWON"`` (or similar) to reject other SCPI devices on the same machine. Match is case-insensitive substring. """ params = params or SerialParams() matches = scan_ports(params) if not matches: return None if idn_substr: isub = idn_substr.lower() for port, idn in matches: if isub in idn.lower(): return port return matches[0][0] # ╔══════════════════════════════════════════════════════════════════════╗ # ║ Cross-platform port resolution ║ # ╚══════════════════════════════════════════════════════════════════════╝ # # A bench config typically names the PSU port the way Windows sees it # (``COM7``). When the same config is run from Linux or WSL, that name # is meaningless and the test fails to open the port. # # The helpers below let one config work on every platform: # # 1. ``windows_com_to_linux`` / ``linux_serial_to_windows`` translate # between the two naming conventions for the same physical UART. # WSL1 exposes Windows COMx as /dev/ttyS(x-1). # # 2. ``candidate_ports`` builds an ordered list of ports worth trying # for a given configured value, including platform translations and # common USB-serial device files on Linux. # # 3. ``resolve_port`` walks the candidate list, opens each briefly to # send ``*IDN?``, and returns the first match (filtered by # ``idn_substr`` if provided). Ultimate fallback: a full # :func:`scan_ports`. def windows_com_to_linux(com_name: str) -> Optional[str]: """Map a Windows COM name to its WSL/Linux device file. ``COM1 → /dev/ttyS0``, ``COM2 → /dev/ttyS1``, … Returns ``None`` if ``com_name`` doesn't look like a COM port. """ if not com_name: return None s = com_name.strip().upper() if not s.startswith("COM"): return None try: n = int(s[3:]) except ValueError: return None if n < 1: return None return f"/dev/ttyS{n - 1}" def linux_serial_to_windows(dev_name: str) -> Optional[str]: """Map a Linux ``/dev/ttySn`` to a Windows COM name (``COMn+1``).""" if not dev_name: return None prefix = "/dev/ttyS" s = dev_name.strip() if not s.startswith(prefix): return None try: n = int(s[len(prefix):]) except ValueError: return None return f"COM{n + 1}" def _is_linux_like() -> bool: """True for Linux and WSL hosts (anywhere /dev/tty* lives).""" return platform.system() == "Linux" def _is_windows() -> bool: return platform.system() == "Windows" def candidate_ports(configured: Optional[str]) -> list[str]: """Return an ordered list of ports worth trying for ``configured``. Order, with duplicates removed: 1. The configured port itself (if any). Always honored first. 2. Its cross-platform translation (e.g. ``COM7`` → ``/dev/ttyS6`` on Linux/WSL). Lets a single bench config work on either side. 3. On Linux/WSL only: ``/dev/ttyUSB*`` and ``/dev/ttyACM*`` — common USB-serial adapter device files. These often surface under WSL2 via ``usbipd-win`` and won't be reachable through the COMx → ttySn mapping. """ seen: list[str] = [] def _add(p: Optional[str]) -> None: if p and p not in seen: seen.append(p) # 1. configured port verbatim _add(configured) # 2. platform-aware translation of the configured port if configured: if _is_linux_like(): _add(windows_com_to_linux(configured)) elif _is_windows(): _add(linux_serial_to_windows(configured)) # 3. USB-serial fallbacks on Linux/WSL if _is_linux_like(): for pattern in ("/dev/ttyUSB*", "/dev/ttyACM*"): for p in sorted(glob.glob(pattern)): _add(p) return seen def resolve_port( configured: Optional[str], *, idn_substr: Optional[str] = None, params: Optional[SerialParams] = None, ) -> Optional[tuple[str, str]]: """Find a working PSU port and return ``(port, idn_response)``. Strategy: 1. Try every port from :func:`candidate_ports` (configured port + cross-platform translations + Linux USB-serial paths). 2. If none matched, do a full :func:`scan_ports` of every serial port on the host as a last resort. If ``idn_substr`` is set, only ports whose IDN contains it (case- insensitively) are accepted — this guards against picking up a different SCPI device that happens to be plugged in. If ``idn_substr`` is ``None``, the first responding port wins. Returns ``None`` if nothing responded. """ params = params or SerialParams() def _matches(idn: str) -> bool: if not idn: return False return idn_substr is None or idn_substr.lower() in idn.lower() # Phase 1: candidate list (cheap, targeted) for port in candidate_ports(configured): # Skip Linux device files that don't exist (avoids ENOENT noise) if port.startswith("/dev/") and not os.path.exists(port): continue idn = try_idn_on_port(port, params) if _matches(idn): return port, idn # Phase 2: scan everything pyserial knows about (broad fallback) for port, idn in scan_ports(params): if _matches(idn): return port, idn return None __all__ = [ "SerialParams", "OwonPSU", "scan_ports", "auto_detect", "try_idn_on_port", "windows_com_to_linux", "linux_serial_to_windows", "candidate_ports", "resolve_port", ]