"""Owon PSU SCPI control over raw serial (pyserial). This module provides a small, programmatic API suitable for tests: - OwonPSU: context-manageable controller class - scan_ports(): find devices responding to *IDN? - auto_detect(): select the first matching device by IDN substring Behavior follows the working quick demo example (serial): - Both commands and queries are terminated with a newline ("\n" by default). - Queries use readline() to fetch a single-line response. - Command set uses: 'output 0/1', 'output?', 'SOUR:VOLT ', 'SOUR:CURR ', 'MEAS:VOLT?', 'MEAS:CURR?', '*IDN?' """ from __future__ import annotations from dataclasses import dataclass from time import sleep from typing import Iterable, Optional import serial from serial import Serial from serial.tools import list_ports @dataclass class SerialParams: baudrate: int = 115200 timeout: float = 1.0 # seconds bytesize: int = serial.EIGHTBITS parity: str = serial.PARITY_NONE stopbits: float = serial.STOPBITS_ONE xonxoff: bool = False rtscts: bool = False dsrdtr: bool = False write_timeout: float = 1.0 # seconds class OwonPSU: def __init__(self, port: str, params: SerialParams | None = None, eol: str = "\n") -> None: self.port = port self.params = params or SerialParams() self.eol = eol self._ser: Optional[Serial] = None def open(self) -> None: if self._ser and self._ser.is_open: return ser = Serial() ser.port = self.port ser.baudrate = self.params.baudrate ser.bytesize = self.params.bytesize ser.parity = self.params.parity ser.stopbits = self.params.stopbits ser.xonxoff = self.params.xonxoff ser.rtscts = self.params.rtscts ser.dsrdtr = self.params.dsrdtr ser.timeout = self.params.timeout ser.write_timeout = self.params.write_timeout ser.open() self._ser = ser def close(self) -> None: if self._ser and self._ser.is_open: try: self._ser.close() finally: self._ser = None def __enter__(self) -> "OwonPSU": self.open() return self def __exit__(self, exc_type, exc, tb) -> None: self.close() @property def is_open(self) -> bool: return bool(self._ser and self._ser.is_open) # ---- low-level ops ---- def write(self, cmd: str) -> None: """Write a SCPI command (append eol).""" if not self._ser: raise RuntimeError("Port is not open") data = (cmd + self.eol).encode("ascii", errors="ignore") self._ser.write(data) self._ser.flush() def query(self, q: str) -> str: """Send a query with terminator and return a single-line response using readline().""" if not self._ser: raise RuntimeError("Port is not open") # clear buffers to avoid stale data try: self._ser.reset_input_buffer() self._ser.reset_output_buffer() except Exception: pass self._ser.write((q + self.eol).encode("ascii", errors="ignore")) self._ser.flush() line = self._ser.readline().strip() return line.decode("ascii", errors="ignore") # ---- high-level ops ---- def idn(self) -> str: return self.query("*IDN?") def set_voltage(self, channel: int, volts: float) -> None: # Using SOUR:VOLT per working example self.write(f"SOUR:VOLT {volts:.3f}") def set_current(self, channel: int, amps: float) -> None: # Using SOUR:CURR per working example self.write(f"SOUR:CURR {amps:.3f}") def set_output(self, on: bool) -> None: # Using 'output 1/0' per working example self.write("output 1" if on else "output 0") def output_status(self) -> str: return self.query("output?") def measure_voltage(self) -> str: return self.query("MEAS:VOLT?") def measure_current(self) -> str: return self.query("MEAS:CURR?") # ------- discovery helpers ------- def try_idn_on_port(port: str, params: SerialParams) -> str: dev: Optional[Serial] = None try: dev = Serial() dev.port = port dev.baudrate = params.baudrate dev.bytesize = params.bytesize dev.parity = params.parity dev.stopbits = params.stopbits dev.xonxoff = params.xonxoff dev.rtscts = params.rtscts dev.dsrdtr = params.dsrdtr dev.timeout = params.timeout dev.write_timeout = params.write_timeout dev.open() # Query with newline terminator and read a single line dev.reset_input_buffer(); dev.reset_output_buffer() dev.write(b"*IDN?\n"); dev.flush() line = dev.readline().strip() return line.decode("ascii", errors="ignore") except Exception: return "" finally: if dev and dev.is_open: try: dev.close() except Exception: pass def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]: """Return [(port, idn_response), ...] for ports that responded.""" params = params or SerialParams() results: list[tuple[str, str]] = [] for p in list_ports.comports(): dev = p.device resp = try_idn_on_port(dev, params) if resp: results.append((dev, resp)) return results def auto_detect(params: SerialParams | None = None, idn_substr: str | None = None) -> Optional[str]: """Return the first port whose *IDN? contains idn_substr (case-insensitive), else first responder.""" params = params or SerialParams() matches = scan_ports(params) if not matches: return None if idn_substr: isub = idn_substr.lower() for port, idn in matches: if isub in idn.lower(): return port return matches[0][0] __all__ = [ "SerialParams", "OwonPSU", "scan_ports", "auto_detect", ]