194 lines
5.9 KiB
Python
194 lines
5.9 KiB
Python
"""Owon PSU SCPI control over raw serial (pyserial).
|
|
|
|
This module provides a small, programmatic API suitable for tests:
|
|
|
|
- OwonPSU: context-manageable controller class
|
|
- scan_ports(): find devices responding to *IDN?
|
|
- auto_detect(): select the first matching device by IDN substring
|
|
|
|
Behavior follows the working quick demo example (serial):
|
|
- Both commands and queries are terminated with a newline ("\n" by default).
|
|
- Queries use readline() to fetch a single-line response.
|
|
- Command set uses: 'output 0/1', 'output?', 'SOUR:VOLT <V>', 'SOUR:CURR <A>', 'MEAS:VOLT?', 'MEAS:CURR?', '*IDN?'
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from time import sleep
|
|
from typing import Iterable, Optional
|
|
|
|
import serial
|
|
from serial import Serial
|
|
from serial.tools import list_ports
|
|
|
|
|
|
@dataclass
|
|
class SerialParams:
|
|
baudrate: int = 115200
|
|
timeout: float = 1.0 # seconds
|
|
bytesize: int = serial.EIGHTBITS
|
|
parity: str = serial.PARITY_NONE
|
|
stopbits: float = serial.STOPBITS_ONE
|
|
xonxoff: bool = False
|
|
rtscts: bool = False
|
|
dsrdtr: bool = False
|
|
write_timeout: float = 1.0 # seconds
|
|
|
|
|
|
class OwonPSU:
|
|
def __init__(self, port: str, params: SerialParams | None = None, eol: str = "\n") -> None:
|
|
self.port = port
|
|
self.params = params or SerialParams()
|
|
self.eol = eol
|
|
self._ser: Optional[Serial] = None
|
|
|
|
def open(self) -> None:
|
|
if self._ser and self._ser.is_open:
|
|
return
|
|
ser = Serial()
|
|
ser.port = self.port
|
|
ser.baudrate = self.params.baudrate
|
|
ser.bytesize = self.params.bytesize
|
|
ser.parity = self.params.parity
|
|
ser.stopbits = self.params.stopbits
|
|
ser.xonxoff = self.params.xonxoff
|
|
ser.rtscts = self.params.rtscts
|
|
ser.dsrdtr = self.params.dsrdtr
|
|
ser.timeout = self.params.timeout
|
|
ser.write_timeout = self.params.write_timeout
|
|
ser.open()
|
|
self._ser = ser
|
|
|
|
def close(self) -> None:
|
|
if self._ser and self._ser.is_open:
|
|
try:
|
|
self._ser.close()
|
|
finally:
|
|
self._ser = None
|
|
|
|
def __enter__(self) -> "OwonPSU":
|
|
self.open()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc, tb) -> None:
|
|
self.close()
|
|
|
|
@property
|
|
def is_open(self) -> bool:
|
|
return bool(self._ser and self._ser.is_open)
|
|
|
|
# ---- low-level ops ----
|
|
def write(self, cmd: str) -> None:
|
|
"""Write a SCPI command (append eol)."""
|
|
if not self._ser:
|
|
raise RuntimeError("Port is not open")
|
|
data = (cmd + self.eol).encode("ascii", errors="ignore")
|
|
self._ser.write(data)
|
|
self._ser.flush()
|
|
|
|
def query(self, q: str) -> str:
|
|
"""Send a query with terminator and return a single-line response using readline()."""
|
|
if not self._ser:
|
|
raise RuntimeError("Port is not open")
|
|
# clear buffers to avoid stale data
|
|
try:
|
|
self._ser.reset_input_buffer()
|
|
self._ser.reset_output_buffer()
|
|
except Exception:
|
|
pass
|
|
self._ser.write((q + self.eol).encode("ascii", errors="ignore"))
|
|
self._ser.flush()
|
|
line = self._ser.readline().strip()
|
|
return line.decode("ascii", errors="ignore")
|
|
|
|
# ---- high-level ops ----
|
|
def idn(self) -> str:
|
|
return self.query("*IDN?")
|
|
|
|
def set_voltage(self, channel: int, volts: float) -> None:
|
|
# Using SOUR:VOLT <V> per working example
|
|
self.write(f"SOUR:VOLT {volts:.3f}")
|
|
|
|
def set_current(self, channel: int, amps: float) -> None:
|
|
# Using SOUR:CURR <A> per working example
|
|
self.write(f"SOUR:CURR {amps:.3f}")
|
|
|
|
def set_output(self, on: bool) -> None:
|
|
# Using 'output 1/0' per working example
|
|
self.write("output 1" if on else "output 0")
|
|
|
|
def output_status(self) -> str:
|
|
return self.query("output?")
|
|
|
|
def measure_voltage(self) -> str:
|
|
return self.query("MEAS:VOLT?")
|
|
|
|
def measure_current(self) -> str:
|
|
return self.query("MEAS:CURR?")
|
|
|
|
|
|
# ------- discovery helpers -------
|
|
|
|
def try_idn_on_port(port: str, params: SerialParams) -> str:
|
|
dev: Optional[Serial] = None
|
|
try:
|
|
dev = Serial()
|
|
dev.port = port
|
|
dev.baudrate = params.baudrate
|
|
dev.bytesize = params.bytesize
|
|
dev.parity = params.parity
|
|
dev.stopbits = params.stopbits
|
|
dev.xonxoff = params.xonxoff
|
|
dev.rtscts = params.rtscts
|
|
dev.dsrdtr = params.dsrdtr
|
|
dev.timeout = params.timeout
|
|
dev.write_timeout = params.write_timeout
|
|
dev.open()
|
|
# Query with newline terminator and read a single line
|
|
dev.reset_input_buffer(); dev.reset_output_buffer()
|
|
dev.write(b"*IDN?\n"); dev.flush()
|
|
line = dev.readline().strip()
|
|
return line.decode("ascii", errors="ignore")
|
|
except Exception:
|
|
return ""
|
|
finally:
|
|
if dev and dev.is_open:
|
|
try:
|
|
dev.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def scan_ports(params: SerialParams | None = None) -> list[tuple[str, str]]:
|
|
"""Return [(port, idn_response), ...] for ports that responded."""
|
|
params = params or SerialParams()
|
|
results: list[tuple[str, str]] = []
|
|
for p in list_ports.comports():
|
|
dev = p.device
|
|
resp = try_idn_on_port(dev, params)
|
|
if resp:
|
|
results.append((dev, resp))
|
|
return results
|
|
|
|
|
|
def auto_detect(params: SerialParams | None = None, idn_substr: str | None = None) -> Optional[str]:
|
|
"""Return the first port whose *IDN? contains idn_substr (case-insensitive), else first responder."""
|
|
params = params or SerialParams()
|
|
matches = scan_ports(params)
|
|
if not matches:
|
|
return None
|
|
if idn_substr:
|
|
isub = idn_substr.lower()
|
|
for port, idn in matches:
|
|
if isub in idn.lower():
|
|
return port
|
|
return matches[0][0]
|
|
|
|
|
|
__all__ = [
|
|
"SerialParams",
|
|
"OwonPSU",
|
|
"scan_ports",
|
|
"auto_detect",
|
|
]
|