194 lines
5.9 KiB
Python

"""Owon PSU SCPI control over raw serial (pyserial).
This module provides a small, programmatic API suitable for tests:
- OwonPSU: context-manageable controller class
- scan_ports(): find devices responding to *IDN?
- auto_detect(): select the first matching device by IDN substring
Behavior follows the working tryout example (serial):
- Both commands and queries are terminated with a newline ("\n" by default).
- Queries use readline() to fetch a single-line response.
- Command set uses: 'output 0/1', 'output?', 'SOUR:VOLT <V>', 'SOUR:CURR <A>', 'MEAS:VOLT?', 'MEAS:CURR?', '*IDN?'
"""
from __future__ import annotations
from dataclasses import dataclass
from time import sleep
from typing import Iterable, Optional
import serial
from serial import Serial
from serial.tools import list_ports
@dataclass
class SerialParams:
baudrate: int = 115200
timeout: float = 1.0 # seconds
bytesize: int = serial.EIGHTBITS
parity: str = serial.PARITY_NONE
stopbits: float = serial.STOPBITS_ONE
xonxoff: bool = False
rtscts: bool = False
dsrdtr: bool = False
write_timeout: float = 1.0 # seconds
class OwonPSU:
def __init__(self, port: str, params: SerialParams | None = None, eol: str = "\n") -> None:
self.port = port
self.params = params or SerialParams()
self.eol = eol
self._ser: Optional[Serial] = None
def open(self) -> None:
if self._ser and self._ser.is_open:
return
ser = Serial()
ser.port = self.port
ser.baudrate = self.params.baudrate
ser.bytesize = self.params.bytesize
ser.parity = self.params.parity
ser.stopbits = self.params.stopbits
ser.xonxoff = self.params.xonxoff
ser.rtscts = self.params.rtscts
ser.dsrdtr = self.params.dsrdtr
ser.timeout = self.params.timeout
ser.write_timeout = self.params.write_timeout
ser.open()
self._ser = ser
def close(self) -> None:
if self._ser and self._ser.is_open:
try:
self._ser.close()
finally:
self._ser = None
def __enter__(self) -> "OwonPSU":
self.open()
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
@property
def is_open(self) -> bool:
return bool(self._ser and self._ser.is_open)
# ---- low-level ops ----
def write(self, cmd: str) -> None:
"""Write a SCPI command (append eol)."""
if not self._ser:
raise RuntimeError("Port is not open")
data = (cmd + self.eol).encode("ascii", errors="ignore")
self._ser.write(data)
self._ser.flush()
def query(self, q: str) -> str:
"""Send a query with terminator and return a single-line response using readline()."""
if not self._ser:
raise RuntimeError("Port is not open")
# clear buffers to avoid stale data
try:
self._ser.reset_input_buffer()
self._ser.reset_output_buffer()
except Exception:
pass
self._ser.write((q + self.eol).encode("ascii", errors="ignore"))
self._ser.flush()
line = self._ser.readline().strip()
return line.decode("ascii", errors="ignore")
# ---- high-level ops ----
def idn(self) -> str:
return self.query("*IDN?")
def set_voltage(self, channel: int, volts: float) -> None:
# Using SOUR:VOLT <V> per working example
self.write(f"SOUR:VOLT {volts:.3f}")
def set_current(self, channel: int, amps: float) -> None:
# Using SOUR:CURR <A> per working example
self.write(f"SOUR:CURR {amps:.3f}")
def set_output(self, on: bool) -> None:
# Using 'output 1/0' per working example
self.write("output 1" if on else "output 0")
def output_status(self) -> str:
return self.query("output?")
def measure_voltage(self) -> str:
return self.query("MEAS:VOLT?")
def measure_current(self) -> str:
return self.query("MEAS:CURR?")
# ------- discovery helpers -------
def try_idn_on_port(port: str, params: SerialParams) -> str:
dev: Optional[Serial] = None
try:
dev = Serial()
dev.port = port
dev.baudrate = params.baudrate
dev.bytesize = params.bytesize
dev.parity = params.parity
dev.stopbits = params.stopbits
dev.xonxoff = params.xonxoff
dev.rtscts = params.rtscts
dev.dsrdtr = params.dsrdtr
dev.timeout = params.timeout
dev.write_timeout = params.write_timeout
dev.open()
# Query with newline terminator and read a single line
dev.reset_input_buffer(); dev.reset_output_buffer()
dev.write(b"*IDN?\n"); dev.flush()
line = dev.readline().strip()
return line.decode("ascii", errors="ignore")
except Exception:
return ""
finally:
if dev and dev.is_open:
try:
dev.close()
except Exception:
pass
def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]:
"""Return [(port, idn_response), ...] for ports that responded."""
params = params or SerialParams()
results: list[tuple[str, str]] = []
for p in list_ports.comports():
dev = p.device
resp = try_idn_on_port(dev, params)
if resp:
results.append((dev, resp))
return results
def auto_detect(params: SerialParams | None = None, idn_substr: str | None = None) -> Optional[str]:
"""Return the first port whose *IDN? contains idn_substr (case-insensitive), else first responder."""
params = params or SerialParams()
matches = scan_ports(params)
if not matches:
return None
if idn_substr:
isub = idn_substr.lower()
for port, idn in matches:
if isub in idn.lower():
return port
return matches[0][0]
__all__ = [
"SerialParams",
"OwonPSU",
"scan_ports",
"auto_detect",
]