Skip to content

Commit

Permalink
Use Python contexts to properly handle serial connection (#42)
Browse files Browse the repository at this point in the history
* add context manager for serial

* improve logger messages

* add some setters

* remove connect option

* different use of context manager

* update readme and some tests to new context manager

* ABCs

* fix tests

* moved some serial tests
  • Loading branch information
lobis authored Aug 21, 2023
1 parent 87bab16 commit d19900d
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 90 deletions.
45 changes: 22 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ import logging
# connection interface is common to all HVPS
# if no serial port is specified, the first available port will be used
# if no baudrate is specified, the default baudrate will be used
# if connect=False, the connection will not be established (useful for testing)
# if logging_level is specified, the logger will be configured accordingly
hvps = Caen(port="/dev/ttyUSB0", baudrate=115200, connect=True, logging_level=logging.DEBUG)

# connection settings can be accessed
print(f"port: {hvps.port}")
print(f"baudrate: {hvps.baudrate}")
with Caen(port="/dev/ttyUSB0", baudrate=115200, logging_level=logging.DEBUG) as hvps:
# using context manager (with) is recommended, but not required.
# If not used, the connection must be opened and closed manually (hvps.open() and hvps.close())
# connection settings can be accessed
print(f"port: {hvps.port}")
print(f"baudrate: {hvps.baudrate}")
```

### Module
Expand All @@ -72,36 +72,35 @@ print(f"baudrate: {hvps.baudrate}")
from hvps import Caen

# default connection settings
caen = Caen()

module = caen.module() # get the first module (module 0)
# if multiple modules are present, they can be accessed by index e.g. caen.module(1)
with Caen() as caen:
module = caen.module() # get the first module (module 0)
# if multiple modules are present, they can be accessed by index e.g. caen.module(1)

# get the module's name
print(f"module name: {module.name}")
# get the module's name
print(f"module name: {module.name}")
```

### Channel

```python
from hvps import Caen

caen = Caen()
module = caen.module(0)
with Caen() as caen:
module = caen.module(0)

print(f"number of channels: {module.number_of_channels}")
print(f"number of channels: {module.number_of_channels}")

channel = module.channel(2) # get channel number 2
channel = module.channel(2) # get channel number 2

# get monitoring parameters
print(f"vmon: {channel.vmon}")
print(f"vset: {channel.vset}")
# get monitoring parameters
print(f"vmon: {channel.vmon}")
print(f"vset: {channel.vset}")

# set values (remote mode must be enabled)
# turn on channel
channel.turn_on()
# set values (remote mode must be enabled)
# turn on channel
channel.turn_on()

channel.vset = 300.0 # 300 V
channel.vset = 300.0 # 300 V
```

## Disclaimer ⚠️
Expand Down
134 changes: 104 additions & 30 deletions src/hvps/devices/hvps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
import logging
import uuid
import threading
from abc import ABC, abstractmethod

from .module import Module


class Hvps:
class Hvps(ABC):
def __init__(
self,
baudrate: int = 115200,
port: str | None = None,
timeout: float | None = None,
connect: bool = True,
logging_level=logging.WARNING,
):
"""Initialize the HVPS (High-Voltage Power Supply) object.
Expand All @@ -25,7 +25,6 @@ def __init__(
baudrate (int, optional): The baud rate for serial communication. Defaults to 115200.
port (str | None, optional): The serial port to use. If None, it will try to detect one automatically. Defaults to None.
timeout (float | None, optional): The timeout for serial communication. Defaults to None.
connect (bool, optional): Whether to connect to the serial port during initialization. Defaults to True.
logging_level (int, optional): The logger level. Defaults to logger.WARNING.
"""
Expand All @@ -47,39 +46,89 @@ def __init__(

self._modules: Dict[int, Module] = {}

if port is None and connect:
self._logger.info("No port specified, trying to detect one")
ports = [port.device for port in list_ports.comports()]
if len(ports) == 0:
raise Exception("No ports available")
port = ports[0]

self._serial: serial.Serial = serial.Serial()
self._serial.port = port
self._logger.info(f"Using port {port}")

self._serial.baudrate = baudrate

if port is not None:
self._serial.port = port

if timeout is not None:
self._serial.timeout = timeout

def __del__(self):
"""Cleanup method to close the serial port when the HVPS object is deleted."""
self.close()

def connect(self):
"""
Open the serial port.
"""

self._logger.debug("Connecting to serial port")

if self.port is None:
self._logger.info("No port specified, trying to detect one")
ports = [port.device for port in list_ports.comports()]
if len(ports) >= 1:
self._serial.port = ports[0]
if len(ports) > 1:
self._logger.warning(
f"Multiple ports detected: {ports}, using the first one: {self._serial.port}"
)

self._logger.info(f"Using port {self._serial.port}")
self._logger.info(f"Using baud rate {self._serial.baudrate}")
self._serial.timeout = timeout
self._logger.debug(f"Using timeout {self._serial.timeout}")

if connect:
self._logger.debug("Opening serial port")
if not hasattr(self, "_serial"):
return
if self.port is None:
raise ValueError("No port specified")
if not self._serial.is_open:
self._serial.open()
self._logger.debug("Serial port opened")
else:
self._logger.debug("Serial port is already open")

def __del__(self):
"""Cleanup method to close the serial port when the HVPS object is deleted."""
if hasattr(self, "_serial"):
self._serial.close()
def open(self):
"""
Open the serial port. (Alias for connect).
"""

self.connect()

def disconnect(self):
"""
Disconnect from the serial port.
Close the serial port.
"""

self._logger.debug("Disconnecting from serial port")

if not hasattr(self, "_serial"):
return
if self._serial.is_open:
self._serial.close()
else:
self._logger.debug("Serial port is already closed")

def close(self):
"""
Close the serial port. (Alias for disconnect).
"""
self.disconnect()

def __enter__(self) -> Hvps:
"""
Context manager enter method.
"""
self.connect()
return self

def __exit__(self, exc_type, exc_value, traceback):
"""
Context manager exit method.
"""
self.disconnect()

@property
def connected(self) -> bool:
Expand All @@ -101,6 +150,16 @@ def port(self) -> str:
"""
return self._serial.port

@port.setter
def port(self, port: str):
"""
Set the serial port.
Args:
port (str): The serial port.
"""
self._serial.port = port

@property
def baudrate(self) -> int:
"""
Expand All @@ -111,6 +170,16 @@ def baudrate(self) -> int:
"""
return self._serial.baudrate

@baudrate.setter
def baudrate(self, baudrate: int):
"""
Set the baud rate.
Args:
baudrate (int): The baud rate.
"""
self._serial.baudrate = baudrate

@property
def timeout(self) -> float:
"""
Expand All @@ -121,6 +190,18 @@ def timeout(self) -> float:
"""
return self._serial.timeout

@timeout.setter
def timeout(self, timeout: float):
"""
Set the timeout.
Args:
timeout (float): The timeout.
"""
if timeout < 0:
raise ValueError("Timeout must be positive")
self._serial.timeout = timeout

@property
def serial(self):
"""
Expand Down Expand Up @@ -150,12 +231,6 @@ def set_logging_level(self, level: int):
"""
self._logger.setLevel(level)

def connect(self):
"""
Connect to the serial port.
"""
self._serial.open()

# modules

@property
Expand All @@ -168,6 +243,7 @@ def modules(self):
"""
return self._modules

@abstractmethod
def module(self, module: int = 0) -> Module:
"""Get the specified module.
Expand All @@ -180,6 +256,4 @@ def module(self, module: int = 0) -> Module:
Raises:
KeyError: If the module number is invalid.
"""
if module not in self._modules:
raise KeyError(f"Invalid module {module}")
return self._modules[module]
pass
7 changes: 7 additions & 0 deletions src/hvps/devices/iseg/iseg.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ def __init__(self, *args, **kwargs):
)
for i in [0]
}

def module(self, module: int = 0) -> Module:
self._logger.debug(f"Getting module {module}")
if module in self._modules.keys():
return self._modules[module]
else:
raise ValueError(f"Module {module} does not exist")
19 changes: 2 additions & 17 deletions tests/test_caen_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,10 @@
import pytest


def test_caen_init(caplog):
caplog.set_level("DEBUG")

Caen(connect=False)

assert caplog.text == ""

caen = Caen(connect=False, logging_level="DEBUG")

assert caen.baudrate == 115200
assert "Using baud rate 115200" in caplog.text
assert "Using port " in caplog.text
assert "Using timeout " in caplog.text


def test_caen_module(caplog):
caplog.set_level("DEBUG")

caen = Caen(connect=False, logging_level="DEBUG")
caen = Caen(logging_level="DEBUG")

# for CAEN, modules are dynamically created
[caen.module(i) for i in range(0, 32)]
Expand All @@ -38,7 +23,7 @@ def test_caen_module(caplog):
def test_caen_channel(caplog):
caplog.set_level("DEBUG")

caen = Caen(connect=False, logging_level="DEBUG")
caen = Caen(logging_level="DEBUG")

module = caen.module()

Expand Down
21 changes: 19 additions & 2 deletions tests/test_caen_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,28 @@ def is_macos():
)


@serial_skip_decorator
def test_caen_init(caplog):
caplog.set_level("DEBUG")

with Caen(logging_level="DEBUG") as caen:
assert caen.baudrate == 115200
assert "Using baud rate 115200" in caplog.text
assert "Using port " in caplog.text
assert "Using timeout " in caplog.text


@serial_skip_decorator
def test_caen_module_monitor():
# no ports available
caen = Caen(
port=serial_port,
baudrate=serial_baud,
connect=True,
timeout=timeout,
logging_level=logging.DEBUG,
)
caen.connect()

print(
f"Serial port status: connected: {caen.connected}, port: {caen.port}, baudrate: {caen.baudrate}, timeout: {caen.timeout}"
)
Expand Down Expand Up @@ -90,16 +102,19 @@ def test_caen_module_monitor():
channels = module.channels
print(f"Channels: {channels}")

caen.disconnect()


@serial_skip_decorator
def test_caen_channel_serial():
caen = Caen(
port=serial_port,
baudrate=serial_baud,
connect=True,
timeout=timeout,
logging_level=logging.DEBUG,
)
caen.connect()

print(
f"Serial port status: connected: {caen.connected}, port: {caen.port}, baudrate: {caen.baudrate}, timeout: {caen.timeout}"
)
Expand Down Expand Up @@ -201,3 +216,5 @@ def test_caen_channel_serial():

stat = channel.stat
print(f"stat: {stat}")

caen.disconnect()
Loading

0 comments on commit d19900d

Please sign in to comment.