diff --git a/README.md b/README.md index 985f966..09bea86 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ import logging # connection interface is common to all HVPS # if no serial port is specified, the first available port will be used # if no baudrate is specified, the default baudrate will be used -# if connect=False, the connection will not be established (useful for testing) # if logging_level is specified, the logger will be configured accordingly -hvps = Caen(port="/dev/ttyUSB0", baudrate=115200, connect=True, logging_level=logging.DEBUG) - -# connection settings can be accessed -print(f"port: {hvps.port}") -print(f"baudrate: {hvps.baudrate}") +with Caen(port="/dev/ttyUSB0", baudrate=115200, logging_level=logging.DEBUG) as hvps: + # using context manager (with) is recommended, but not required. + # If not used, the connection must be opened and closed manually (hvps.open() and hvps.close()) + # connection settings can be accessed + print(f"port: {hvps.port}") + print(f"baudrate: {hvps.baudrate}") ``` ### Module @@ -72,13 +72,12 @@ print(f"baudrate: {hvps.baudrate}") from hvps import Caen # default connection settings -caen = Caen() - -module = caen.module() # get the first module (module 0) -# if multiple modules are present, they can be accessed by index e.g. caen.module(1) +with Caen() as caen: + module = caen.module() # get the first module (module 0) + # if multiple modules are present, they can be accessed by index e.g. caen.module(1) -# get the module's name -print(f"module name: {module.name}") + # get the module's name + print(f"module name: {module.name}") ``` ### Channel @@ -86,22 +85,22 @@ print(f"module name: {module.name}") ```python from hvps import Caen -caen = Caen() -module = caen.module(0) +with Caen() as caen: + module = caen.module(0) -print(f"number of channels: {module.number_of_channels}") + print(f"number of channels: {module.number_of_channels}") -channel = module.channel(2) # get channel number 2 + channel = module.channel(2) # get channel number 2 -# get monitoring parameters -print(f"vmon: {channel.vmon}") -print(f"vset: {channel.vset}") + # get monitoring parameters + print(f"vmon: {channel.vmon}") + print(f"vset: {channel.vset}") -# set values (remote mode must be enabled) -# turn on channel -channel.turn_on() + # set values (remote mode must be enabled) + # turn on channel + channel.turn_on() -channel.vset = 300.0 # 300 V + channel.vset = 300.0 # 300 V ``` ## Disclaimer ⚠️ diff --git a/src/hvps/devices/hvps.py b/src/hvps/devices/hvps.py index 2baddd3..d6eb769 100644 --- a/src/hvps/devices/hvps.py +++ b/src/hvps/devices/hvps.py @@ -6,17 +6,17 @@ import logging import uuid import threading +from abc import ABC, abstractmethod from .module import Module -class Hvps: +class Hvps(ABC): def __init__( self, baudrate: int = 115200, port: str | None = None, timeout: float | None = None, - connect: bool = True, logging_level=logging.WARNING, ): """Initialize the HVPS (High-Voltage Power Supply) object. @@ -25,7 +25,6 @@ def __init__( baudrate (int, optional): The baud rate for serial communication. Defaults to 115200. port (str | None, optional): The serial port to use. If None, it will try to detect one automatically. Defaults to None. timeout (float | None, optional): The timeout for serial communication. Defaults to None. - connect (bool, optional): Whether to connect to the serial port during initialization. Defaults to True. logging_level (int, optional): The logger level. Defaults to logger.WARNING. """ @@ -47,39 +46,89 @@ def __init__( self._modules: Dict[int, Module] = {} - if port is None and connect: - self._logger.info("No port specified, trying to detect one") - ports = [port.device for port in list_ports.comports()] - if len(ports) == 0: - raise Exception("No ports available") - port = ports[0] - self._serial: serial.Serial = serial.Serial() - self._serial.port = port - self._logger.info(f"Using port {port}") + self._serial.baudrate = baudrate + + if port is not None: + self._serial.port = port + + if timeout is not None: + self._serial.timeout = timeout + + def __del__(self): + """Cleanup method to close the serial port when the HVPS object is deleted.""" + self.close() + + def connect(self): + """ + Open the serial port. + """ + + self._logger.debug("Connecting to serial port") + + if self.port is None: + self._logger.info("No port specified, trying to detect one") + ports = [port.device for port in list_ports.comports()] + if len(ports) >= 1: + self._serial.port = ports[0] + if len(ports) > 1: + self._logger.warning( + f"Multiple ports detected: {ports}, using the first one: {self._serial.port}" + ) + + self._logger.info(f"Using port {self._serial.port}") self._logger.info(f"Using baud rate {self._serial.baudrate}") - self._serial.timeout = timeout self._logger.debug(f"Using timeout {self._serial.timeout}") - if connect: - self._logger.debug("Opening serial port") + if not hasattr(self, "_serial"): + return + if self.port is None: + raise ValueError("No port specified") + if not self._serial.is_open: self._serial.open() - self._logger.debug("Serial port opened") + else: + self._logger.debug("Serial port is already open") - def __del__(self): - """Cleanup method to close the serial port when the HVPS object is deleted.""" - if hasattr(self, "_serial"): - self._serial.close() + def open(self): + """ + Open the serial port. (Alias for connect). + """ + + self.connect() def disconnect(self): """ - Disconnect from the serial port. + Close the serial port. """ + + self._logger.debug("Disconnecting from serial port") + if not hasattr(self, "_serial"): return if self._serial.is_open: self._serial.close() + else: + self._logger.debug("Serial port is already closed") + + def close(self): + """ + Close the serial port. (Alias for disconnect). + """ + self.disconnect() + + def __enter__(self) -> Hvps: + """ + Context manager enter method. + """ + self.connect() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """ + Context manager exit method. + """ + self.disconnect() @property def connected(self) -> bool: @@ -101,6 +150,16 @@ def port(self) -> str: """ return self._serial.port + @port.setter + def port(self, port: str): + """ + Set the serial port. + + Args: + port (str): The serial port. + """ + self._serial.port = port + @property def baudrate(self) -> int: """ @@ -111,6 +170,16 @@ def baudrate(self) -> int: """ return self._serial.baudrate + @baudrate.setter + def baudrate(self, baudrate: int): + """ + Set the baud rate. + + Args: + baudrate (int): The baud rate. + """ + self._serial.baudrate = baudrate + @property def timeout(self) -> float: """ @@ -121,6 +190,18 @@ def timeout(self) -> float: """ return self._serial.timeout + @timeout.setter + def timeout(self, timeout: float): + """ + Set the timeout. + + Args: + timeout (float): The timeout. + """ + if timeout < 0: + raise ValueError("Timeout must be positive") + self._serial.timeout = timeout + @property def serial(self): """ @@ -150,12 +231,6 @@ def set_logging_level(self, level: int): """ self._logger.setLevel(level) - def connect(self): - """ - Connect to the serial port. - """ - self._serial.open() - # modules @property @@ -168,6 +243,7 @@ def modules(self): """ return self._modules + @abstractmethod def module(self, module: int = 0) -> Module: """Get the specified module. @@ -180,6 +256,4 @@ def module(self, module: int = 0) -> Module: Raises: KeyError: If the module number is invalid. """ - if module not in self._modules: - raise KeyError(f"Invalid module {module}") - return self._modules[module] + pass diff --git a/src/hvps/devices/iseg/iseg.py b/src/hvps/devices/iseg/iseg.py index 57eb6eb..8ec5f74 100644 --- a/src/hvps/devices/iseg/iseg.py +++ b/src/hvps/devices/iseg/iseg.py @@ -27,3 +27,10 @@ def __init__(self, *args, **kwargs): ) for i in [0] } + + def module(self, module: int = 0) -> Module: + self._logger.debug(f"Getting module {module}") + if module in self._modules.keys(): + return self._modules[module] + else: + raise ValueError(f"Module {module} does not exist") diff --git a/tests/test_caen_devices.py b/tests/test_caen_devices.py index 31fc5ef..81e95a2 100644 --- a/tests/test_caen_devices.py +++ b/tests/test_caen_devices.py @@ -2,25 +2,10 @@ import pytest -def test_caen_init(caplog): - caplog.set_level("DEBUG") - - Caen(connect=False) - - assert caplog.text == "" - - caen = Caen(connect=False, logging_level="DEBUG") - - assert caen.baudrate == 115200 - assert "Using baud rate 115200" in caplog.text - assert "Using port " in caplog.text - assert "Using timeout " in caplog.text - - def test_caen_module(caplog): caplog.set_level("DEBUG") - caen = Caen(connect=False, logging_level="DEBUG") + caen = Caen(logging_level="DEBUG") # for CAEN, modules are dynamically created [caen.module(i) for i in range(0, 32)] @@ -38,7 +23,7 @@ def test_caen_module(caplog): def test_caen_channel(caplog): caplog.set_level("DEBUG") - caen = Caen(connect=False, logging_level="DEBUG") + caen = Caen(logging_level="DEBUG") module = caen.module() diff --git a/tests/test_caen_serial.py b/tests/test_caen_serial.py index 4886f47..50e4cb4 100644 --- a/tests/test_caen_serial.py +++ b/tests/test_caen_serial.py @@ -27,16 +27,28 @@ def is_macos(): ) +@serial_skip_decorator +def test_caen_init(caplog): + caplog.set_level("DEBUG") + + with Caen(logging_level="DEBUG") as caen: + assert caen.baudrate == 115200 + assert "Using baud rate 115200" in caplog.text + assert "Using port " in caplog.text + assert "Using timeout " in caplog.text + + @serial_skip_decorator def test_caen_module_monitor(): # no ports available caen = Caen( port=serial_port, baudrate=serial_baud, - connect=True, timeout=timeout, logging_level=logging.DEBUG, ) + caen.connect() + print( f"Serial port status: connected: {caen.connected}, port: {caen.port}, baudrate: {caen.baudrate}, timeout: {caen.timeout}" ) @@ -90,16 +102,19 @@ def test_caen_module_monitor(): channels = module.channels print(f"Channels: {channels}") + caen.disconnect() + @serial_skip_decorator def test_caen_channel_serial(): caen = Caen( port=serial_port, baudrate=serial_baud, - connect=True, timeout=timeout, logging_level=logging.DEBUG, ) + caen.connect() + print( f"Serial port status: connected: {caen.connected}, port: {caen.port}, baudrate: {caen.baudrate}, timeout: {caen.timeout}" ) @@ -201,3 +216,5 @@ def test_caen_channel_serial(): stat = channel.stat print(f"stat: {stat}") + + caen.disconnect() diff --git a/tests/test_iseg_devices.py b/tests/test_iseg_devices.py index c74d58a..4ecc269 100644 --- a/tests/test_iseg_devices.py +++ b/tests/test_iseg_devices.py @@ -2,23 +2,8 @@ import pytest -def test_iseg_init(caplog): - caplog.set_level("DEBUG") - - Iseg(connect=False) - - assert caplog.text == "" - - iseg = Iseg(connect=False, logging_level="DEBUG") - - assert iseg.baudrate == 115200 - assert "Using baud rate 115200" in caplog.text - assert "Using port " in caplog.text - assert "Using timeout " in caplog.text - - def test_iseg_module(caplog): - iseg = Iseg(connect=False) + iseg = Iseg() # for ISEG only one module exists iseg.module() @@ -35,7 +20,7 @@ def test_iseg_module(caplog): def test_iseg_channel(caplog): caplog.set_level("DEBUG") - iseg = Iseg(connect=False, logging_level="DEBUG") + iseg = Iseg(logging_level="DEBUG") module = iseg.module() diff --git a/tests/test_iseg_serial.py b/tests/test_iseg_serial.py index 388a46c..e9f2d39 100644 --- a/tests/test_iseg_serial.py +++ b/tests/test_iseg_serial.py @@ -26,6 +26,17 @@ def is_macos(): ) +@serial_skip_decorator +def test_iseg_init(caplog): + caplog.set_level("DEBUG") + + with Iseg(logging_level="DEBUG") as iseg: + assert iseg.baudrate == 115200 + assert "Using baud rate 115200" in caplog.text + assert "Using port " in caplog.text + assert "Using timeout " in caplog.text + + @serial_skip_decorator def test_iseg_module_monitor(): iseg = Iseg( @@ -35,6 +46,7 @@ def test_iseg_module_monitor(): timeout=timeout, logging_level=logging.DEBUG, ) + iseg.connect() print( f"Serial port status: connected: {iseg.connected}, port: {iseg.port}, baudrate: {iseg.baudrate}, timeout: {iseg.timeout}" @@ -172,12 +184,14 @@ def test_iseg_module_monitor(): module.reset_module_event_status() module.clear_module_event_status_bits(8) + iseg.disconnect() + @serial_skip_decorator def test_iseg_channel_monitor(): ser = serial.Serial(serial_port, serial_baud, timeout=timeout) - channel = Channel(ser, 0) + channel = Channel(ser, 0) # TODO: what is this 0? trip_action = channel.trip_action print(f"trip_action: {trip_action}")