From ac949d5710eb8b1af7ac804859bc73e4bded4c68 Mon Sep 17 00:00:00 2001 From: MatthieuDartiailh Date: Fri, 10 Jul 2020 22:07:22 -0400 Subject: [PATCH] pyvisa_py: rename main package and update type annotations --- pyvisa-py/common.py | 60 ---- {pyvisa-py => pyvisa_py}/__init__.py | 6 +- pyvisa_py/common.py | 30 ++ {pyvisa-py => pyvisa_py}/gpib.py | 66 +++-- {pyvisa-py => pyvisa_py}/highlevel.py | 279 +++++++++--------- .../protocols/__init__.py | 0 {pyvisa-py => pyvisa_py}/protocols/rpc.py | 0 {pyvisa-py => pyvisa_py}/protocols/usbraw.py | 0 {pyvisa-py => pyvisa_py}/protocols/usbtmc.py | 0 {pyvisa-py => pyvisa_py}/protocols/usbutil.py | 0 {pyvisa-py => pyvisa_py}/protocols/vxi11.py | 0 {pyvisa-py => pyvisa_py}/serial.py | 47 ++- {pyvisa-py => pyvisa_py}/sessions.py | 99 +++++-- {pyvisa-py => pyvisa_py}/tcpip.py | 55 ++-- .../testsuite/__init__.py | 0 .../testsuite/test_highlevel.py | 0 {pyvisa-py => pyvisa_py}/usb.py | 26 +- setup.cfg | 17 +- 18 files changed, 401 insertions(+), 284 deletions(-) delete mode 100644 pyvisa-py/common.py rename {pyvisa-py => pyvisa_py}/__init__.py (91%) create mode 100644 pyvisa_py/common.py rename {pyvisa-py => pyvisa_py}/gpib.py (94%) rename {pyvisa-py => pyvisa_py}/highlevel.py (73%) rename {pyvisa-py => pyvisa_py}/protocols/__init__.py (100%) rename {pyvisa-py => pyvisa_py}/protocols/rpc.py (100%) rename {pyvisa-py => pyvisa_py}/protocols/usbraw.py (100%) rename {pyvisa-py => pyvisa_py}/protocols/usbtmc.py (100%) rename {pyvisa-py => pyvisa_py}/protocols/usbutil.py (100%) rename {pyvisa-py => pyvisa_py}/protocols/vxi11.py (100%) rename {pyvisa-py => pyvisa_py}/serial.py (92%) rename {pyvisa-py => pyvisa_py}/sessions.py (91%) rename {pyvisa-py => pyvisa_py}/tcpip.py (93%) rename {pyvisa-py => pyvisa_py}/testsuite/__init__.py (100%) rename {pyvisa-py => pyvisa_py}/testsuite/test_highlevel.py (100%) rename {pyvisa-py => pyvisa_py}/usb.py (91%) diff --git a/pyvisa-py/common.py b/pyvisa-py/common.py deleted file mode 100644 index 3b9dff68..00000000 --- a/pyvisa-py/common.py +++ /dev/null @@ -1,60 +0,0 @@ -# -*- coding: utf-8 -*- -"""Common code. - -:copyright: 2014-2020 by PyVISA-sim Authors, see AUTHORS for more details. -:license: MIT, see LICENSE for more details. - -""" -import logging -from typing import Optional, SupportsBytes - -from pyvisa import logger - -logger = logging.LoggerAdapter(logger, {"backend": "py"}) - - -class MockInterface(object): - - #: Name of the resource used to create this interface - resource_name: str - - def __init__(self, resource_name) -> None: - self.resource_name = resource_name - - -class NamedObject(object): - """A class to construct named sentinels.""" - - #: Name used to identify the sentinel - name: str - - def __init__(self, name) -> None: - self.name = name - - def __repr__(self) -> str: - return "<%s>" % self.name - - __str__ = __repr__ - - -# XXX can probably be removed -def iter_bytes(data: SupportsBytes, mask: Optional[int] = None, send_end: bool = False): - if send_end and mask is None: - raise ValueError("send_end requires a valid mask.") - - if mask is None: - for d in data[:]: - yield bytes([d]) - - else: - for d in data[:-1]: - yield bytes([d & ~mask]) - - if send_end: - yield bytes([data[-1] | ~mask]) - else: - yield bytes([data[-1] & ~mask]) - - -int_to_byte = lambda val: bytes([val]) -last_int = lambda val: val[-1] diff --git a/pyvisa-py/__init__.py b/pyvisa_py/__init__.py similarity index 91% rename from pyvisa-py/__init__.py rename to pyvisa_py/__init__.py index 45453507..494053cc 100644 --- a/pyvisa-py/__init__.py +++ b/pyvisa_py/__init__.py @@ -6,9 +6,11 @@ :license: MIT, see LICENSE for more details. """ -try: +import sys + +if sys.version_info >= (3, 8): from importlib.metadata import PackageNotFoundError, version -except ImportError: +else: from importlib_metadata import PackageNotFoundError, version # type: ignore __version__ = "unknown" diff --git a/pyvisa_py/common.py b/pyvisa_py/common.py new file mode 100644 index 00000000..884ba63d --- /dev/null +++ b/pyvisa_py/common.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +"""Common code. + +:copyright: 2014-2020 by PyVISA-sim Authors, see AUTHORS for more details. +:license: MIT, see LICENSE for more details. + +""" +import logging + +from pyvisa import logger + +logger = logging.LoggerAdapter(logger, {"backend": "py"}) # type: ignore + + +class NamedObject(object): + """A class to construct named sentinels.""" + + #: Name used to identify the sentinel + name: str + + def __init__(self, name) -> None: + self.name = name + + def __repr__(self) -> str: + return "<%s>" % self.name + + __str__ = __repr__ + + +int_to_byte = lambda val: val.to_bytes(1, "big") diff --git a/pyvisa-py/gpib.py b/pyvisa_py/gpib.py similarity index 94% rename from pyvisa-py/gpib.py rename to pyvisa_py/gpib.py index 6c81891a..b6d85f37 100644 --- a/pyvisa-py/gpib.py +++ b/pyvisa_py/gpib.py @@ -8,18 +8,19 @@ """ import ctypes # Used for missing bindings not ideal from bisect import bisect -from typing import Any, Iterator, List, Optional, Tuple +from typing import Any, Iterator, List, Tuple, Union from pyvisa import attributes, constants, logger from pyvisa.constants import ResourceAttribute, StatusCode +from pyvisa.rname import GPIBInstr, GPIBIntfc from .sessions import Session, UnknownAttribute try: GPIB_CTYPES = True - from gpib_ctypes import gpib - from gpib_ctypes.Gpib import Gpib - from gpib_ctypes.gpib.gpib import _lib as gpib_lib + from gpib_ctypes import gpib # typing: ignore + from gpib_ctypes.Gpib import Gpib # typing: ignore + from gpib_ctypes.gpib.gpib import _lib as gpib_lib # typing: ignore # Add some extra binding not available by default extra_funcs = [ @@ -35,8 +36,8 @@ except ImportError: GPIB_CTYPES = False try: - import gpib - from Gpib import Gpib, GpibError + import gpib # typing: ignore + from Gpib import Gpib, GpibError # typing: ignore except ImportError as e: Session.register_unavailable( constants.InterfaceType.gpib, @@ -215,7 +216,7 @@ def convert_gpib_status(status: int) -> StatusCode: return StatusCode.success -class _GPIBCommon: +class _GPIBCommon(Session): """Common base class for GPIB sessions. Both INSTR and INTFC resources share the following attributes: @@ -236,6 +237,10 @@ class _GPIBCommon: """ + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: Union[GPIBIntfc, GPIBInstr] + #: Bus wide controller. controller: Gpib @@ -255,7 +260,7 @@ def after_parsing(self) -> None: send_eoi = 1 eos_mode = 0 self.interface = None - if self.parsed.resource_class == "INSTR": + if isinstance(self.parsed, GPIBInstr): pad = int(self.parsed.primary_address) # Used to talk to a specific resource self.interface = Gpib( @@ -271,7 +276,7 @@ def after_parsing(self) -> None: # Force timeout setting to interface self.set_attribute( - constants.VI_ATTR_TMO_VALUE, + constants.ResourceAttribute.timeout_value, attributes.AttributesByID[constants.VI_ATTR_TMO_VALUE].default, ) @@ -279,7 +284,9 @@ def after_parsing(self) -> None: attribute = getattr(constants, "VI_ATTR_" + name) self.attrs[attribute] = attributes.AttributesByID[attribute].default - def _get_timeout(self, attribute: constants.ResourceAttribute) -> Optional[int]: + def _get_timeout( + self, attribute: constants.ResourceAttribute + ) -> Tuple[int, StatusCode]: if self.interface: # 0x3 is the hexadecimal reference to the IbaTMO (timeout) configuration # option in linux-gpib. @@ -291,9 +298,7 @@ def _get_timeout(self, attribute: constants.ResourceAttribute) -> Optional[int]: self.timeout = None return super(_GPIBCommon, self)._get_timeout(attribute) - def _set_timeout( - self, attribute: constants.ResourceAttribute, value: Optional[int] - ): + def _set_timeout(self, attribute: constants.ResourceAttribute, value: int): """Set the timeout value. linux-gpib only supports 18 discrete timeout values. If a timeout @@ -321,6 +326,8 @@ def _set_timeout( """ status = super(_GPIBCommon, self)._set_timeout(attribute, value) + # Inspect the result of setting the value to decide how to translate the result + # on the interface. if self.interface: if self.timeout is None: gpib_timeout = 0 @@ -331,12 +338,13 @@ def _set_timeout( self.interface.timeout(gpib_timeout) return status - def close(self) -> None: + def close(self) -> StatusCode: if self.interface: self.interface.close() self.controller.close() + return StatusCode.success - def read(self, count: int) -> bytes: + def read(self, count: int) -> Tuple[bytes, StatusCode]: """Reads data from device or interface synchronously. Corresponds to viRead function of the VISA library. @@ -414,7 +422,7 @@ def gpib_control_ren(self, mode: constants.RENLineOperation) -> StatusCode: Return value of the library call. """ - if self.parsed.interface_type == "INTFC": + if isinstance(self.parsed, GPIBIntfc): if mode not in ( constants.VI_GPIB_REN_ASSERT, constants.VI_GPIB_REN_DEASSERT, @@ -447,7 +455,10 @@ def gpib_control_ren(self, mode: constants.RENLineOperation) -> StatusCode: constants.VI_GPIB_REN_ASSERT_ADDRESS, ): ifc.remote_enable(1) - if mode == constants.VI_GPIB_REN_ASSERT_ADDRESS: + if ( + isinstance(self.parsed, GPIBInstr) + and mode == constants.VI_GPIB_REN_ASSERT_ADDRESS + ): # 0 for the secondary address means don't use it ifc.listener( self.parsed.primary_address, self.parsed.secondary_address @@ -605,9 +616,13 @@ def _set_attribute( # TODO: Check secondary addresses. @Session.register(constants.InterfaceType.gpib, "INSTR") -class GPIBSession(_GPIBCommon, Session): +class GPIBSession(_GPIBCommon): """A GPIB Session that uses linux-gpib to do the low level communication.""" + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: GPIBInstr + @staticmethod def list_resources() -> List[str]: return ["GPIB%d::%d::INSTR" % (board, pad) for board, pad in _find_listeners()] @@ -757,9 +772,12 @@ def _set_attribute( @Session.register(constants.InterfaceType.gpib, "INTFC") -class GPIBInterface(_GPIBCommon, Session): - """A GPIB Interface that uses linux-gpib to do the low level communication. - """ +class GPIBInterface(_GPIBCommon): + """A GPIB Interface that uses linux-gpib to do the low level communication.""" + + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: GPIBIntfc @staticmethod def list_resources() -> List[str]: @@ -787,7 +805,7 @@ def gpib_command(self, command_bytes: bytes) -> Tuple[int, StatusCode]: try: return self.controller.command(command_bytes), StatusCode.success except gpib.GpibError as e: - return convert_gpib_error(e, self.controller.ibsta(), "gpib command") + return 0, convert_gpib_error(e, self.controller.ibsta(), "gpib command") def gpib_send_ifc(self) -> StatusCode: """Pulse the interface clear line (IFC) for at least 100 microseconds. @@ -922,7 +940,7 @@ def _get_attribute(self, attribute: ResourceAttribute) -> Tuple[Any, StatusCode] # some versions of linux-gpib do not expose Gpib.lines() return constants.VI_STATE_UNKNOWN, StatusCode.success - return super(GPIBSession, self)._get_attribute(attribute) + return super()._get_attribute(attribute) def _set_attribute( self, attribute: ResourceAttribute, attribute_state: Any @@ -951,4 +969,4 @@ def _set_attribute( # INTFC don't have an interface so use the controller _ = self.controller - return super(GPIBSession, self)._set_attribute(attribute, attribute_state) + return super()._set_attribute(attribute, attribute_state) diff --git a/pyvisa-py/highlevel.py b/pyvisa_py/highlevel.py similarity index 73% rename from pyvisa-py/highlevel.py rename to pyvisa_py/highlevel.py index cf9bbc56..e56c445f 100644 --- a/pyvisa-py/highlevel.py +++ b/pyvisa_py/highlevel.py @@ -8,11 +8,12 @@ """ import random from collections import OrderedDict -from typing import Any, Dict, Iterable, Iterator, Optional, Tuple, Union +from typing import Any, Dict, Iterable, Optional, Tuple, Union, List, cast -from pyvisa import constants, errors, highlevel, rname +from pyvisa import constants, highlevel, rname from pyvisa.constants import StatusCode from pyvisa.typing import VISAEventContext, VISARMSession, VISASession +from pyvisa.util import LibraryPath from . import sessions from .common import logger @@ -36,6 +37,9 @@ class PyVisaLibrary(highlevel.VisaLibraryBase): """ + #: Live session object identified by a randon session ID + sessions: Dict[int, sessions.Session] + # Try to import packages implementing lower level functionality. try: from .serial import SerialSession @@ -65,42 +69,32 @@ class PyVisaLibrary(highlevel.VisaLibraryBase): except Exception as e: logger.debug("GPIBSession was not imported %s." % e) - @classmethod - def get_session_classes(cls) -> Dict[sessions.Session]: - return sessions.Session._session_classes - - @classmethod - def iter_session_classes_issues( - cls, - ) -> Iterator[Tuple[constants.InterfaceType, str], str]: - return sessions.Session.iter_session_classes_issues() - @staticmethod - def get_library_paths() -> Iterable[str]: + def get_library_paths() -> Iterable[LibraryPath]: """List a dummy library path to allow to create the library.""" - return ("py",) + return (LibraryPath("py"),) @staticmethod - def get_debug_info() -> Dict[str, Union[str, Dict[str, str]]]: + def get_debug_info() -> Dict[str, Union[str, List[str], Dict[str, str]]]: """Return a list of lines with backend info.""" from . import __version__ - d = OrderedDict() + d: OrderedDict[str, Union[str, List[str], Dict[str, str]]] = OrderedDict() d["Version"] = "%s" % __version__ - for key, val in PyVisaLibrary.get_session_classes().items(): + for key, val in sessions.Session.iter_valid_session_classes(): key_name = "%s %s" % (key[0].name.upper(), key[1]) - try: - d[key_name] = getattr(val, "session_issue").split("\n") - except AttributeError: - d[key_name] = "Available " + val.get_low_level_info() + d[key_name] = "Available " + val.get_low_level_info() + + for key, issue in sessions.Session.iter_session_classes_issues(): + key_name = "%s %s" % (key[0].name.upper(), key[1]) + d[key_name] = issue.split("\n") return d def _init(self) -> None: """Custom initialization code.""" - #: map session handle to session object. - #: dict[int, session.Session] + # Map session handle to session object. self.sessions = {} def _register(self, obj: object) -> VISASession: @@ -117,13 +111,12 @@ def _register(self, obj: object) -> VISASession: self.sessions[session] = obj return session - # noinspection PyShadowingBuiltins def open( self, session: VISARMSession, resource_name: str, access_mode: constants.AccessModes = constants.AccessModes.no_lock, - open_timeout: Optional[int] = constants.VI_TMO_IMMEDIATE, + open_timeout: int = constants.VI_TMO_IMMEDIATE, ) -> Tuple[VISASession, StatusCode]: """Opens a session to the specified resource. @@ -131,20 +124,21 @@ def open( Parameters ---------- - session : typing.VISARMSession + session : VISARMSession Resource Manager session (should always be a session returned from open_default_resource_manager()). resource_name : str Unique symbolic name of a resource. access_mode : constants.AccessModes, optional Specifies the mode by which the resource is to be accessed. - open_timeout : Optional[int] + open_timeout : int Specifies the maximum time period (in milliseconds) that this - operation waits before returning an error. + operation waits before returning an error. constants.VI_TMO_IMMEDIATE + and constants.VI_TMO_INFINITE are used as min and max. Returns ------- - typing.VISASession + VISASession Unique logical identifier reference to a session StatusCode Return value of the library call. @@ -161,7 +155,10 @@ def open( try: parsed = rname.parse_resource_name(resource_name) except rname.InvalidResourceName: - return 0, StatusCode.error_invalid_resource_name + return ( + VISASession(0), + self.handle_return_value(None, StatusCode.error_invalid_resource_name), + ) cls = sessions.Session.get_session_class( parsed.interface_type_const, parsed.resource_class @@ -190,12 +187,12 @@ def clear(self, session: VISASession) -> StatusCode: try: sess = self.sessions[session] except KeyError: - return constants.StatusCode.error_invalid_object - return sess.clear() + return self.handle_return_value(session, StatusCode.error_invalid_object) + return self.handle_return_value(session, sess.clear()) def flush( self, session: VISASession, mask: constants.BufferOperation - ) -> constants.StatusCode: + ) -> StatusCode: """Flush the specified buffers. The buffers can be associated with formatted I/O operations and/or @@ -214,19 +211,19 @@ def flush( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: sess = self.sessions[session] except KeyError: - return constants.StatusCode.error_invalid_object - return sess.flush(mask) + return self.handle_return_value(session, StatusCode.error_invalid_object) + return self.handle_return_value(session, sess.flush(mask)) def gpib_command( self, session: VISASession, command_byte: bytes - ) -> Tuple[int, constants.StatusCode]: + ) -> Tuple[int, StatusCode]: """Write GPIB command bytes on the bus. Corresponds to viGpibCommand function of the VISA library. @@ -242,18 +239,19 @@ def gpib_command( ------- int Number of written bytes - constants.StatusCode + StatusCode Return value of the library call. """ try: - return self.sessions[session].gpib_command(command_byte) + written, st = self.sessions[session].gpib_command(command_byte) + return written, self.handle_return_value(session, st) except KeyError: - return constants.StatusCode.error_invalid_object + return 0, self.handle_return_value(session, StatusCode.error_invalid_object) def assert_trigger( self, session: VISASession, protocol: constants.TriggerProtocol - ) -> constants.StatusCode: + ) -> StatusCode: """Assert software or hardware trigger. Corresponds to viAssertTrigger function of the VISA library. @@ -267,16 +265,18 @@ def assert_trigger( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: - return self.sessions[session].assert_trigger(protocol) + return self.handle_return_value( + session, self.sessions[session].assert_trigger(protocol) + ) except KeyError: - return constants.StatusCode.error_invalid_object + return self.handle_return_value(session, StatusCode.error_invalid_object) - def gpib_send_ifc(self, session: VISASession) -> constants.StatusCode: + def gpib_send_ifc(self, session: VISASession) -> StatusCode: """Pulse the interface clear line (IFC) for at least 100 microseconds. Corresponds to viGpibSendIFC function of the VISA library. @@ -288,19 +288,20 @@ def gpib_send_ifc(self, session: VISASession) -> constants.StatusCode: Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: - sess = self.sessions[session] + return self.handle_return_value( + session, self.sessions[session].gpib_send_ifc() + ) except KeyError: - return constants.StatusCode.error_invalid_object - return sess.gpib_send_ifc() + return self.handle_return_value(session, StatusCode.error_invalid_object) def gpib_control_ren( self, session: VISASession, mode: constants.RENLineOperation - ) -> constants.StatusCode: + ) -> StatusCode: """Controls the state of the GPIB Remote Enable (REN) interface line. Optionally the remote/local state of the device can also be set. @@ -316,19 +317,20 @@ def gpib_control_ren( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: - sess = self.sessions[session] + return self.handle_return_value( + session, self.sessions[session].gpib_control_ren(mode) + ) except KeyError: - return constants.StatusCode.error_invalid_object - return sess.gpib_control_ren() + return self.handle_return_value(session, StatusCode.error_invalid_object) def gpib_control_atn( self, session: VISASession, mode: constants.ATNLineOperation - ) -> constants.StatusCode: + ) -> StatusCode: """Specifies the state of the ATN line and the local active controller state. Corresponds to viGpibControlATN function of the VISA library. @@ -342,19 +344,20 @@ def gpib_control_atn( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: - sess = self.sessions[session] + return self.handle_return_value( + session, self.sessions[session].gpib_control_atn(mode) + ) except KeyError: - return constants.StatusCode.error_invalid_object - return sess.gpib_control_atn() + return self.handle_return_value(session, StatusCode.error_invalid_object) def gpib_pass_control( self, session: VISASession, primary_address: int, secondary_address: int - ) -> constants.StatusCode: + ) -> StatusCode: """Tell a GPIB device to become controller in charge (CIC). Corresponds to viGpibPassControl function of the VISA library. @@ -372,17 +375,21 @@ def gpib_pass_control( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: - sess = self.sessions[session] + return self.handle_return_value( + session, + self.sessions[session].gpib_pass_control( + primary_address, secondary_address + ), + ) except KeyError: - return constants.StatusCode.error_invalid_object - return sess.gpib_pass_control() + return self.handle_return_value(session, StatusCode.error_invalid_object) - def read_stb(self, session: VISASession) -> Tuple[int, constants.StatusCode]: + def read_stb(self, session: VISASession) -> Tuple[int, StatusCode]: """Reads a status byte of the service request. Corresponds to viReadSTB function of the VISA library. @@ -396,19 +403,20 @@ def read_stb(self, session: VISASession) -> Tuple[int, constants.StatusCode]: ------- int Service request status byte - constants.StatusCode + StatusCode Return value of the library call. """ try: sess = self.sessions[session] except KeyError: - return 0, constants.StatusCode.error_invalid_object - return sess.read_stb() + return 0, self.handle_return_value(session, StatusCode.error_invalid_object) + stb, status_code = sess.read_stb() + return stb, self.handle_return_value(session, status_code) def close( self, session: Union[VISASession, VISAEventContext, VISARMSession] - ) -> constants.StatusCode: + ) -> StatusCode: """Closes the specified session, event, or find list. Corresponds to viClose function of the VISA library. @@ -420,20 +428,21 @@ def close( Returns ------- - constants.StatusCode + StatusCode Return value of the library call. """ try: sess = self.sessions[session] + # The RM session directly references the library. if sess is not self: - sess.close() + return self.handle_return_value(session, sess.close()) + else: + return self.handle_return_value(session, StatusCode.success) except KeyError: - return StatusCode.error_invalid_object + return self.handle_return_value(session, StatusCode.error_invalid_object) - def open_default_resource_manager( - self, - ) -> Tuple[VISARMSession, constants.StatusCode]: + def open_default_resource_manager(self,) -> Tuple[VISARMSession, StatusCode]: """This function returns a session to the Default Resource Manager resource. Corresponds to viOpenDefaultRM function of the VISA library. @@ -442,11 +451,14 @@ def open_default_resource_manager( ------- VISARMSession Unique logical identifier to a Default Resource Manager session - constants.StatusCode + StatusCode Return value of the library call. """ - return self._register(self), StatusCode.success + return ( + cast(VISARMSession, self._register(self)), + self.handle_return_value(None, StatusCode.success), + ) def list_resources( self, session: VISARMSession, query: str = "?*::INSTR" @@ -468,22 +480,23 @@ def list_resources( """ # For each session type, ask for the list of connected resources and # merge them into a single list. - - resources = sum( - [ - st.list_resources() - for key, st in sessions.Session.iter_valid_session_classes() - ], - [], + # HINT: the cast should not be necessary here + resources = cast( + List[str], + ( + sum( + [ + st.list_resources() + for key, st in sessions.Session.iter_valid_session_classes() + ] + ) + or [] + ), ) - resources = rname.filter(resources, query) - - return resources + return rname.filter(resources, query) - def read( - self, session: VISASession, count: int - ) -> Tuple[bytes, constants.StatusCode]: + def read(self, session: VISASession, count: int) -> Tuple[bytes, StatusCode]: """Reads data from device or interface synchronously. Corresponds to viRead function of the VISA library. @@ -499,24 +512,22 @@ def read( ------- bytes Date read - constants.StatusCode + StatusCode Return value of the library call. """ # from the session handle, dispatch to the read method of the session object. try: - ret = self.sessions[session].read(count) + data, status_code = self.sessions[session].read(count) except KeyError: - return 0, StatusCode.error_invalid_object - - if ret[1] < 0: - raise errors.VisaIOError(ret[1]) + return ( + b"", + self.handle_return_value(session, StatusCode.error_invalid_object), + ) - return ret + return data, self.handle_return_value(session, status_code) - def write( - self, session: VISASession, data: bytes - ) -> Tuple[int, constants.StatusCode]: + def write(self, session: VISASession, data: bytes) -> Tuple[int, StatusCode]: """Write data to device or interface synchronously. Corresponds to viWrite function of the VISA library. @@ -532,24 +543,19 @@ def write( ------- int Number of bytes actually transferred - constants.StatusCode + StatusCode Return value of the library call. """ # from the session handle, dispatch to the write method of the session object. try: - ret = self.sessions[session].write(data) + written, status_code = self.sessions[session].write(data) except KeyError: - return 0, StatusCode.error_invalid_object + return 0, self.handle_return_value(session, StatusCode.error_invalid_object) - if ret[1] < 0: - raise errors.VisaIOError(ret[1]) + return written, self.handle_return_value(session, status_code) - return ret - - def buffer_read( - self, session: VISASession, count: int - ) -> Tuple[bytes, constants.StatusCode]: + def buffer_read(self, session: VISASession, count: int) -> Tuple[bytes, StatusCode]: """Reads data through the use of a formatted I/O read buffer. The data can be read from a device or an interface. @@ -567,15 +573,13 @@ def buffer_read( ------- bytes Data read - constants.StatusCode + StatusCode Return value of the library call. """ return self.read(session, count) - def buffer_write( - self, session: VISASession, data: bytes - ) -> Tuple[int, constants.StatusCode]: + def buffer_write(self, session: VISASession, data: bytes) -> Tuple[int, StatusCode]: """Writes data to a formatted I/O write buffer synchronously. Corresponds to viBufWrite function of the VISA library. @@ -591,7 +595,7 @@ def buffer_write( ------- int number of written bytes - constants.StatusCode + StatusCode return value of the library call. """ @@ -601,7 +605,7 @@ def get_attribute( self, session: Union[VISASession, VISAEventContext, VISARMSession], attribute: Union[constants.ResourceAttribute, constants.EventAttribute], - ) -> Tuple[Any, constants.StatusCode]: + ) -> Tuple[Any, StatusCode]: """Retrieves the state of an attribute. Corresponds to viGetAttribute function of the VISA library. @@ -617,23 +621,29 @@ def get_attribute( ------- Any State of the queried attribute for a specified resource - constants.StatusCode + StatusCode Return value of the library call. """ try: sess = self.sessions[session] except KeyError: - return None, StatusCode.error_invalid_object + return ( + None, + self.handle_return_value(session, StatusCode.error_invalid_object), + ) - return sess.get_attribute(attribute) + state, status_code = sess.get_attribute( + cast(constants.ResourceAttribute, attribute) + ) + return state, self.handle_return_value(session, status_code) def set_attribute( self, session: VISASession, attribute: constants.ResourceAttribute, attribute_state: Any, - ) -> constants.StatusCode: + ) -> StatusCode: """Set the state of an attribute. Corresponds to viSetAttribute function of the VISA library. @@ -654,11 +664,12 @@ def set_attribute( """ try: - sess = self.sessions[session] + return self.handle_return_value( + session, + self.sessions[session].set_attribute(attribute, attribute_state), + ) except KeyError: - return StatusCode.error_invalid_object - - return sess.set_attribute(attribute, attribute_state) + return self.handle_return_value(session, StatusCode.error_invalid_object) def lock( self, @@ -666,7 +677,7 @@ def lock( lock_type: constants.Lock, timeout: int, requested_key: Optional[str] = None, - ) -> Tuple[str, constants.StatusCode]: + ) -> Tuple[str, StatusCode]: """Establishes an access mode to the specified resources. Corresponds to viLock function of the VISA library. @@ -686,7 +697,7 @@ def lock( Returns ------- - Optional[str] + str Key that can then be passed to other sessions to share the lock, or None for an exclusive lock. StatusCode @@ -696,11 +707,14 @@ def lock( try: sess = self.sessions[session] except KeyError: - return StatusCode.error_invalid_object - - return sess.lock(lock_type, timeout, requested_key) + return ( + "", + self.handle_return_value(session, StatusCode.error_invalid_object), + ) + key, status_code = sess.lock(lock_type, timeout, requested_key) + return key, self.handle_return_value(session, key) - def unlock(self, session: VISASession) -> constants.StatusCode: + def unlock(self, session: VISASession) -> StatusCode: """Relinquish a lock for the specified resource. Corresponds to viUnlock function of the VISA library. @@ -719,9 +733,8 @@ def unlock(self, session: VISASession) -> constants.StatusCode: try: sess = self.sessions[session] except KeyError: - return StatusCode.error_invalid_object - - return sess.unlock() + return self.handle_return_value(session, StatusCode.error_invalid_object) + return self.handle_return_value(session, sess.unlock()) def disable_event( self, diff --git a/pyvisa-py/protocols/__init__.py b/pyvisa_py/protocols/__init__.py similarity index 100% rename from pyvisa-py/protocols/__init__.py rename to pyvisa_py/protocols/__init__.py diff --git a/pyvisa-py/protocols/rpc.py b/pyvisa_py/protocols/rpc.py similarity index 100% rename from pyvisa-py/protocols/rpc.py rename to pyvisa_py/protocols/rpc.py diff --git a/pyvisa-py/protocols/usbraw.py b/pyvisa_py/protocols/usbraw.py similarity index 100% rename from pyvisa-py/protocols/usbraw.py rename to pyvisa_py/protocols/usbraw.py diff --git a/pyvisa-py/protocols/usbtmc.py b/pyvisa_py/protocols/usbtmc.py similarity index 100% rename from pyvisa-py/protocols/usbtmc.py rename to pyvisa_py/protocols/usbtmc.py diff --git a/pyvisa-py/protocols/usbutil.py b/pyvisa_py/protocols/usbutil.py similarity index 100% rename from pyvisa-py/protocols/usbutil.py rename to pyvisa_py/protocols/usbutil.py diff --git a/pyvisa-py/protocols/vxi11.py b/pyvisa_py/protocols/vxi11.py similarity index 100% rename from pyvisa-py/protocols/vxi11.py rename to pyvisa_py/protocols/vxi11.py diff --git a/pyvisa-py/serial.py b/pyvisa_py/serial.py similarity index 92% rename from pyvisa-py/serial.py rename to pyvisa_py/serial.py index 1408d49d..2f38f213 100644 --- a/pyvisa-py/serial.py +++ b/pyvisa_py/serial.py @@ -6,9 +6,9 @@ :license: MIT, see LICENSE for more details. """ -from typing import Any, List, Tuple +from typing import Any, List, Optional, Tuple -from pyvisa import attributes, constants, logger +from pyvisa import attributes, constants, logger, rname from pyvisa.constants import ( BufferOperation, ResourceAttribute, @@ -31,6 +31,24 @@ raise +def iter_bytes(data: bytes, mask: Optional[int] = None, send_end: bool = False): + if send_end and mask is None: + raise ValueError("send_end requires a valid mask.") + + if mask is None: + for d in data: + yield bytes([d]) + + else: + for d in data[:-1]: + yield bytes([d & ~mask]) + + if send_end: + yield bytes([data[-1] | ~mask]) + else: + yield bytes([data[-1] & ~mask]) + + def to_state(boolean_input: bool) -> constants.LineState: """Convert a boolean input into a LineState value.""" if boolean_input: @@ -42,6 +60,10 @@ def to_state(boolean_input: bool) -> constants.LineState: class SerialSession(Session): """A serial Session that uses PySerial to do the low level communication.""" + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: rname.ASRLInstr + @staticmethod def list_resources() -> List[str]: return ["ASRL%s::INSTR" % port[0] for port in comports()] @@ -85,8 +107,9 @@ def _set_timeout(self, attribute: ResourceAttribute, value: int) -> StatusCode: self.interface.write_timeout = self.timeout return status - def close(self) -> None: + def close(self) -> StatusCode: self.interface.close() + return StatusCode.success def read(self, count: int) -> Tuple[bytes, StatusCode]: """Reads data from device or interface synchronously. @@ -116,12 +139,12 @@ def read(self, count: int) -> Tuple[bytes, StatusCode]: elif end_in == SerialTermination.last_bit: mask = 2 ** self.interface.bytesize - checker = lambda current: bool(common.last_int(current) & mask) + checker = lambda current: bool(current[-1] & mask) elif end_in == SerialTermination.termination_char: end_char, _ = self.get_attribute(ResourceAttribute.termchar) - checker = lambda current: common.last_int(current) == end_char + checker = lambda current: current[-1] == end_char else: raise ValueError("Unknown value for VI_ATTR_ASRL_END_IN: %s" % end_in) @@ -160,19 +183,17 @@ def write(self, data: bytes) -> Tuple[int, StatusCode]: send_end, _ = self.get_attribute(ResourceAttribute.send_end_enabled) try: - # We need to wrap data in common.iter_bytes to Provide Python 2 and 3 compatibility - if end_out in (SerialTermination.none, SerialTermination.termination_break): - data = common.iter_bytes(data) + data = data elif end_out == SerialTermination.last_bit: - last_bit, _ = self.get_attribute(constants.VI_ATTR_ASRL_DATA_BITS) + last_bit, _ = self.get_attribute(ResourceAttribute.asrl_data_bits) mask = 1 << (last_bit - 1) - data = common.iter_bytes(data, mask, send_end) + data = iter_bytes(data, mask, send_end) elif end_out == SerialTermination.termination_char: - term_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) - data = common.iter_bytes(data + common.int_to_byte(term_char)) + term_char, _ = self.get_attribute(ResourceAttribute.termchar) + data = data + common.int_to_byte(term_char) else: raise ValueError("Unknown value for VI_ATTR_ASRL_END_OUT: %s" % end_out) @@ -384,7 +405,7 @@ def _set_attribute( raise NotImplementedError elif attribute == constants.VI_ATTR_ASRL_DSR_STATE: - return to_state(self.interface.getDSR()) + raise NotImplementedError elif attribute == constants.VI_ATTR_ASRL_DTR_STATE: raise NotImplementedError diff --git a/pyvisa-py/sessions.py b/pyvisa_py/sessions.py similarity index 91% rename from pyvisa-py/sessions.py rename to pyvisa_py/sessions.py index 71733a7e..5a357b4b 100644 --- a/pyvisa-py/sessions.py +++ b/pyvisa_py/sessions.py @@ -8,14 +8,16 @@ """ import abc import time -from typing import Any, Callable, Dict, Iterator, Optional, Tuple, Type, TypeVar +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, TypeVar from pyvisa import attributes, constants, logger, rname from pyvisa.constants import ResourceAttribute, StatusCode from pyvisa.typing import VISARMSession +from .common import int_to_byte + #: Type var used when typing register. -T = TypeVar("T", bound="Session") +T = TypeVar("T", bound=Type["Session"]) class UnknownAttribute(Exception): @@ -101,7 +103,7 @@ def _set_attribute( """ @abc.abstractmethod - def close(self) -> None: + def close(self) -> StatusCode: """Close the session. Use it to do final clean ups. @@ -118,10 +120,10 @@ def close(self) -> None: parsed: rname.ResourceName #: Session type as (Interface Type, Resource Class) - session_type: Optional[Tuple[constants.InterfaceType, str]] = None + session_type: Tuple[constants.InterfaceType, str] #: Timeout in seconds to use when opening the resource. - open_timeout: float + open_timeout: Optional[float] #: Value of the timeout in seconds used for general operation timeout: Optional[float] @@ -140,6 +142,11 @@ def close(self) -> None: Tuple[constants.InterfaceType, str], Type["Session"] ] = dict() + @staticmethod + def list_resources() -> List[str]: + """List the resources available for the resource class.""" + return [] + @classmethod def get_low_level_info(cls) -> str: """Get info about the backend used by the session.""" @@ -148,7 +155,7 @@ def get_low_level_info(cls) -> str: @classmethod def iter_valid_session_classes( cls, - ) -> Iterator[Tuple[constants.InterfaceType, str], Type["Session"]]: + ) -> Iterator[Tuple[Tuple[constants.InterfaceType, str], Type["Session"]]]: """Iterator over valid sessions classes infos.""" for key, val in cls._session_classes.items(): if issubclass(val, Session): @@ -157,7 +164,7 @@ def iter_valid_session_classes( @classmethod def iter_session_classes_issues( cls, - ) -> Iterator[Tuple[constants.InterfaceType, str], str]: + ) -> Iterator[Tuple[Tuple[constants.InterfaceType, str], str]]: """Iterator over invalid sessions classes (i.e. those with import errors).""" for key, val in cls._session_classes.items(): try: @@ -228,7 +235,7 @@ def _internal(python_class): @classmethod def register_unavailable( cls, interface_type: constants.InterfaceType, resource_class: str, msg: str - ) -> Type[object]: + ) -> None: """Register that no session class exists. This creates a fake session that will raise a ValueError if called. @@ -245,12 +252,12 @@ def register_unavailable( Returns ------- - Type[object] + Type[Session] Fake session. """ - class _internal(object): + class _internal(Session): #: Message detailing why no session is available. session_issue: str = msg @@ -258,6 +265,15 @@ class _internal(object): def __init__(self, *args, **kwargs) -> None: raise ValueError(msg) + def _get_attribute(self, attr): + raise NotImplementedError() + + def _set_attribute(self, attr, value): + raise NotImplementedError() + + def close(self): + raise NotImplementedError() + if (interface_type, resource_class) in cls._session_classes: logger.warning( "%s is already registered in the ResourceManager. " @@ -272,7 +288,7 @@ def __init__( resource_manager_session: VISARMSession, resource_name: str, parsed: Optional[rname.ResourceName] = None, - open_timeout: Optional[int] = None, + open_timeout: Optional[float] = None, ) -> None: if parsed is None: parsed = rname.parse_resource_name(resource_name) @@ -344,6 +360,46 @@ def after_parsing(self) -> None: """ pass + def write(self, data: bytes) -> Tuple[int, StatusCode]: + """Writes data to device or interface synchronously. + + Corresponds to viWrite function of the VISA library. + + Parameters + ---------- + data : bytes + Data to be written. + + Returns + ------- + int + Number of bytes actually transferred + StatusCode + Return value of the library call. + + """ + raise NotImplementedError + + def read(self, count: int) -> Tuple[bytes, StatusCode]: + """Reads data from device or interface synchronously. + + Corresponds to viRead function of the VISA library. + + Parameters + ----------- + count : int + Number of bytes to be read. + + Returns + ------- + bytes + Data read from the device + StatusCode + Return value of the library call. + + """ + raise NotImplementedError() + def clear(self) -> StatusCode: """Clears a device. @@ -687,7 +743,7 @@ def _read( count: int, end_indicator_checker: Callable[[bytes], bool], suppress_end_en: bool, - termination_char: str, + termination_char: Optional[int], termination_char_en: bool, timeout_exception: Type[Exception], ) -> Tuple[bytes, StatusCode]: @@ -705,7 +761,7 @@ def _read( Function to check if the message is complete. suppress_end_en : bool Suppress end. - termination_char : str + termination_char : int Stop reading if this character is received. termination_char_en : bool Is termination char enabled. @@ -725,11 +781,10 @@ def _read( # termination character is in the middle of the block or that the # maximum number of bytes is exceeded - # Make sure termination_char is a string - try: - termination_char = chr(termination_char) - except TypeError: - pass + # Turn the termination_char store as an int in VISA attribute in a byte + term_char = ( + int_to_byte(termination_char) if termination_char is not None else b"" + ) finish_time = None if self.timeout is None else (time.time() + self.timeout) out = bytearray() @@ -747,12 +802,12 @@ def _read( # RULE 6.1.1 return bytes(out), StatusCode.success else: - if termination_char_en and termination_char in current: + if termination_char_en and (term_char in current): # RULE 6.1.2 - # Return everything upto and including the termination + # Return everything up to and including the termination # character return ( - bytes(out[: out.index(termination_char) + 1]), + bytes(out[: out.index(term_char) + 1]), StatusCode.success_termination_character_read, ) elif len(out) >= count: @@ -781,7 +836,7 @@ def _get_timeout(self, attribute: ResourceAttribute) -> Tuple[int, StatusCode]: ret_value = int(self.timeout * 1000.0) return ret_value, StatusCode.success - def _set_timeout(self, attribute: ResourceAttribute, value: Optional[int]): + def _set_timeout(self, attribute: ResourceAttribute, value: int): """ Sets timeout calculated value from python way to VI_ way In VISA, the timeout is expressed in milliseconds or using the diff --git a/pyvisa-py/tcpip.py b/pyvisa_py/tcpip.py similarity index 93% rename from pyvisa-py/tcpip.py rename to pyvisa_py/tcpip.py index 7db2ed20..c9735d85 100644 --- a/pyvisa-py/tcpip.py +++ b/pyvisa_py/tcpip.py @@ -12,7 +12,7 @@ import time from typing import Any, List, Optional, Tuple -from pyvisa import attributes, constants, errors +from pyvisa import attributes, constants, errors, rname from pyvisa.constants import ResourceAttribute, StatusCode from . import common @@ -55,6 +55,10 @@ class TCPIPInstrSession(Session): #: ID of the link used for VXI-11 communication link: int + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: rname.TCPIPInstr + @staticmethod def list_resources() -> List[str]: # TODO: is there a way to get this? @@ -87,16 +91,18 @@ def after_parsing(self) -> None: attribute = getattr(constants, "VI_ATTR_" + name) self.attrs[attribute] = attributes.AttributesByID[attribute].default - def close(self) -> None: + def close(self) -> StatusCode: try: self.interface.destroy_link(self.link) except (errors.VisaIOError, socket.error, rpc.RPCError) as e: print("Error closing VISA link: {}".format(e)) self.interface.close() - self.link = None + self.link = 0 self.interface = None + return StatusCode.success + def read(self, count: int) -> Tuple[bytes, StatusCode]: """Reads data from device or interface synchronously. @@ -120,8 +126,8 @@ def read(self, count: int) -> Tuple[bytes, StatusCode]: else: chunk_length = self.max_recv_size - if self.get_attribute(constants.VI_ATTR_TERMCHAR_EN)[0]: - term_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) + if self.get_attribute(ResourceAttribute.termchar_enabled)[0]: + term_char, _ = self.get_attribute(ResourceAttribute.termchar) flags = vxi11.OP_FLAG_TERMCHAR_SET else: term_char = flags = 0 @@ -178,7 +184,7 @@ def write(self, data: bytes) -> Tuple[int, StatusCode]: Return value of the library call. """ - send_end, _ = self.get_attribute(constants.VI_ATTR_SEND_END_EN) + send_end, _ = self.get_attribute(ResourceAttribute.send_end_enabled) chunk_size = 1024 try: @@ -381,9 +387,7 @@ def unlock(self) -> constants.StatusCode: return VXI11_ERRORS_TO_VISA[error] - def _set_timeout( - self, attribute: ResourceAttribute, value: Optional[int] - ) -> StatusCode: + def _set_timeout(self, attribute: ResourceAttribute, value: int) -> StatusCode: """ Sets timeout calculated value from python way to VI_ way """ @@ -417,6 +421,10 @@ class TCPIPSocketSession(Session): #: Maximum size of a chunk of data in bytes. max_recv_size: int + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: rname.TCPIPSocket + @staticmethod def list_resources() -> List[str]: # TODO: is there a way to get this? @@ -435,35 +443,35 @@ def after_parsing(self) -> None: # termination char self._pending_buffer = bytearray() - self.attrs[constants.VI_ATTR_TCPIP_ADDR] = self.parsed.host_address - self.attrs[constants.VI_ATTR_TCPIP_PORT] = self.parsed.port - self.attrs[constants.VI_ATTR_INTF_NUM] = self.parsed.board - self.attrs[constants.VI_ATTR_TCPIP_NODELAY] = ( + self.attrs[ResourceAttribute.tcpip_address] = self.parsed.host_address + self.attrs[ResourceAttribute.tcpip_port] = self.parsed.port + self.attrs[ResourceAttribute.interface_number] = self.parsed.board + self.attrs[ResourceAttribute.tcpip_nodelay] = ( self._get_tcpip_nodelay, self._set_attribute, ) - self.attrs[constants.VI_ATTR_TCPIP_HOSTNAME] = "" - self.attrs[constants.VI_ATTR_TCPIP_KEEPALIVE] = ( + self.attrs[ResourceAttribute.tcpip_hostname] = "" + self.attrs[ResourceAttribute.tcpip_keepalive] = ( self._get_tcpip_keepalive, self._set_tcpip_keepalive, ) # to use default as ni visa driver (NI-VISA 15.0) - self.attrs[constants.VI_ATTR_SUPPRESS_END_EN] = True + self.attrs[ResourceAttribute.suppress_end_enabled] = True for name in ("TERMCHAR", "TERMCHAR_EN"): attribute = getattr(constants, "VI_ATTR_" + name) self.attrs[attribute] = attributes.AttributesByID[attribute].default - def _connect(self) -> None: + def _connect(self) -> StatusCode: timeout = self.open_timeout / 1000.0 if self.open_timeout else 10.0 try: self.interface = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.interface.setblocking(0) + self.interface.setblocking(False) self.interface.connect_ex((self.parsed.host_address, int(self.parsed.port))) except Exception as e: raise Exception("could not connect: {0}".format(str(e))) finally: - self.interface.setblocking(1) + self.interface.setblocking(True) # minimum is in interval 100 - 500ms based on timeout min_select_timeout = max(min(timeout / 10.0, 0.5), 0.1) @@ -488,9 +496,10 @@ def _connect(self) -> None: # min_select_timeout select_timout = max(select_timout / 2.0, min_select_timeout) - def close(self) -> None: + def close(self) -> StatusCode: self.interface.close() self.interface = None + return StatusCode.success def read(self, count: int) -> Tuple[bytes, StatusCode]: """Reads data from device or interface synchronously. @@ -515,10 +524,10 @@ def read(self, count: int) -> Tuple[bytes, StatusCode]: else: chunk_length = self.max_recv_size - term_char, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR) + term_char, _ = self.get_attribute(ResourceAttribute.termchar) term_byte = common.int_to_byte(term_char) if term_char else b"" - term_char_en, _ = self.get_attribute(constants.VI_ATTR_TERMCHAR_EN) - suppress_end_en, _ = self.get_attribute(constants.VI_ATTR_SUPPRESS_END_EN) + term_char_en, _ = self.get_attribute(ResourceAttribute.termchar_enabled) + suppress_end_en, _ = self.get_attribute(ResourceAttribute.suppress_end_enabled) read_fun = self.interface.recv diff --git a/pyvisa-py/testsuite/__init__.py b/pyvisa_py/testsuite/__init__.py similarity index 100% rename from pyvisa-py/testsuite/__init__.py rename to pyvisa_py/testsuite/__init__.py diff --git a/pyvisa-py/testsuite/test_highlevel.py b/pyvisa_py/testsuite/test_highlevel.py similarity index 100% rename from pyvisa-py/testsuite/test_highlevel.py rename to pyvisa_py/testsuite/test_highlevel.py diff --git a/pyvisa-py/usb.py b/pyvisa_py/usb.py similarity index 91% rename from pyvisa-py/usb.py rename to pyvisa_py/usb.py index 976ac625..20fc00c4 100644 --- a/pyvisa-py/usb.py +++ b/pyvisa_py/usb.py @@ -7,10 +7,11 @@ """ import errno -from typing import Any, List, Tuple +from typing import Any, List, Tuple, Union, Type from pyvisa import attributes, constants from pyvisa.constants import ResourceAttribute, StatusCode +from pyvisa.rname import USBInstr, USBRaw from .common import logger from .sessions import Session, UnknownAttribute @@ -47,13 +48,18 @@ class USBTimeoutException(Exception): class USBSession(Session): - """Base class for drivers that communicate with usb devices - via usb port using pyUSB - """ + """Base class for drivers working with usb devices via usb port using pyUSB.""" + + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: Union[USBInstr, USBRaw] + + #: Class to use when instantiating the interface + _intf_cls: Union[Type[usbraw.USBRawDevice], Type[usbtmc.USBTMC]] @staticmethod def list_resources() -> List[str]: - """Return list of resources for this type of USB device""" + """Return list of resources for this type of USB device.""" raise NotImplementedError @classmethod @@ -86,7 +92,7 @@ def after_parsing(self) -> None: attribute = constants.VI_ATTR_TMO_VALUE self.set_attribute(attribute, attributes.AttributesByID[attribute].default) - def _get_timeout(self, attribute: ResourceAttribute) -> int: + def _get_timeout(self, attribute: ResourceAttribute) -> Tuple[int, StatusCode]: if self.interface: if self.interface.timeout == 2 ** 32 - 1: self.timeout = None @@ -226,6 +232,10 @@ def _set_attribute( class USBInstrSession(USBSession): """Class for USBTMC devices.""" + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: USBInstr + #: Class to use when instantiating the interface _intf_cls = usbtmc.USBTMC @@ -281,6 +291,10 @@ def list_resources() -> List[str]: class USBRawSession(USBSession): """Class for RAW devices.""" + # Override parsed to take into account the fact that this class is only used + # for a specific kind of resource + parsed: USBRaw + #: Class to use when instantiating the interface _intf_cls = usbraw.USBRawDevice diff --git a/setup.cfg b/setup.cfg index 9cbd528f..1031a0f1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -71,7 +71,22 @@ per-file-ignores = pyvisa-py/serial.py:C901 [mypy] -follow_imports = ignore +follow_imports = normal + +[mypy-usb.*] +ignore_missing_imports = True + +[mypy-serial.*] +ignore_missing_imports = True + +[mypy-gpib.*] +ignore_missing_imports = True + +[mypy-Gpib.*] +ignore_missing_imports = True + +[mypy-gpib_ctypes.*] +ignore_missing_imports = True [isort] multi_line_output = 3