From 9fceea84349d218754726641bd2733752cd82e06 Mon Sep 17 00:00:00 2001 From: DCEM Date: Wed, 27 Nov 2024 13:16:49 +0100 Subject: [PATCH 1/2] add HP 4194A and 4395A --- .../drivers/Agilent/Agilent_4395A.py | 6 + .../drivers/HP/HP_4194A.py | 684 ++++++++++++ .../drivers/HP/HP_4395A.py | 974 ++++++++++++++++++ .../drivers/HP/private/bit_name_mapper.py | 106 ++ .../HP/private/interdependent_parameter.py | 174 ++++ .../HP/private/reset_value_parameter.py | 59 ++ 6 files changed, 2003 insertions(+) create mode 100644 src/qcodes_contrib_drivers/drivers/Agilent/Agilent_4395A.py create mode 100644 src/qcodes_contrib_drivers/drivers/HP/HP_4194A.py create mode 100644 src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py create mode 100644 src/qcodes_contrib_drivers/drivers/HP/private/bit_name_mapper.py create mode 100644 src/qcodes_contrib_drivers/drivers/HP/private/interdependent_parameter.py create mode 100644 src/qcodes_contrib_drivers/drivers/HP/private/reset_value_parameter.py diff --git a/src/qcodes_contrib_drivers/drivers/Agilent/Agilent_4395A.py b/src/qcodes_contrib_drivers/drivers/Agilent/Agilent_4395A.py new file mode 100644 index 000000000..8ddc593ff --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/Agilent/Agilent_4395A.py @@ -0,0 +1,6 @@ +from ..HP.HP_4395A import HP4395A + +class Agilent4395A(HP4395A): + """ + "Alias provided to help locate the instrument under its newer company branding." + """ diff --git a/src/qcodes_contrib_drivers/drivers/HP/HP_4194A.py b/src/qcodes_contrib_drivers/drivers/HP/HP_4194A.py new file mode 100644 index 000000000..a0d8a7d42 --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/HP/HP_4194A.py @@ -0,0 +1,684 @@ +import warnings +from typing import Any, Union, Optional, Callable +from typing_extensions import Literal + +# from qcodes import ParameterWithSetpoints, VisaInstrument +from qcodes.instrument import ParameterWithSetpoints, VisaInstrument +from qcodes.instrument.parameter import Parameter # , invert_val_mapping +from qcodes.utils.helpers import create_on_off_val_mapping +from qcodes.utils.validators import Enum, Ints, Numbers, Arrays # , Lists, +import numpy +import os + +from private.bit_name_mapper import BitNameMapper +from private.interdependent_parameter import interdependent_parameter_factory +from private.reset_value_parameter import reset_value_parameter_factory + +ResetValueParameter = reset_value_parameter_factory(Parameter) +InterdependentParameter = interdependent_parameter_factory(ResetValueParameter) + +class HP4194A(VisaInstrument): + """ + This is the QCoDeS python driver for the HP 4194A + """ + + def __init__( + self, + name: str, + address: str, + terminator: str = '\r\n', + # terminator: str = None, + timeout: int = 10, + # timeout: int = 100000, + **kwargs: Any + ) -> None: + """ + QCoDeS driver for the HP 4194A. + + Args: + name (str): Name of the instrument. + address (str): Address of the instrument. + terminator (str): Terminator character of + the string reply. Optional, default `'\\n'` + timeout (int): VISA timeout is set purposely + to a long time to allow long spectrum measurement. + Optional, default 100000 + """ + super().__init__( + name=name, + address=address, + terminator=terminator, + timeout=timeout, + **kwargs + ) + + self._map_STB = BitNameMapper({ + 0: 'Measurement complete', + 1: 'Sweep complete', ### + 3: 'End status', + 4: 'Ignore trigger', + 5: 'Error (Hardware trips)', ### + 6: 'RQS', + }) + + self.add_parameter( + 'transfer_format', + parameter_class=ResetValueParameter, + value_after_reset='ASCII', + set_cmd='FMT{}', + docstring='Sets the format to transfer the trace data via GPIB.' + ' (No query)', + val_mapping={ + 'ASCII': '1', + '64-bit le': '2', + '32-bit le': '3', # p260 + }, + # snapshot_value=False + ) + + self.add_parameter( + 'status_mask', + parameter_class=ResetValueParameter, + value_after_reset=[], + set_cmd='RQS{}', + # get_cmd=self.status_mask.cache.get(), + set_parser=lambda x: self._map_STB.bitnames_to_value(x), + get_parser=lambda x: self._map_STB.value_to_bitnames(x), + docstring='Mask the status byte' + '\n' + self._map_STB.docstring([6]), + ) + + self.add_parameter( # 1-a: Function + 'analyzer_mode', + parameter_class=InterdependentParameter, + value_after_reset='Impedance', + set_cmd='FNC{}', + docstring='Selects the analyzer mode.', + val_mapping={'Impedance': '1', + 'Gain-Phase': '2', + 'Impedance with Z Probe': '3'}, + ) + + self.add_parameter( # 1-a: Function + 'measure_impedance', + parameter_class=InterdependentParameter, + value_after_reset='|Z|-theta', + set_cmd='IMP{}', + docstring='Selects the measurement function for Impedance.', + val_mapping={ + '|Z|-theta': '1', + 'R-X': '2', + 'Ls-Rs': '3', + 'Ls-Q': '4', + 'Cs-Rs': '5', + 'Cs-Q': '6', + 'CS-D': '7', + '|X|-theta': '8', + 'G-B': '9', + 'Lp-G': '10', + 'Lp-Q': '11', + 'Cp-G': '12', + 'Cp-Q': '13', + 'Cp-D': '14', + '|Z|-La': '15', + '|Z|-Ca': '16', + '|Z|-Lp': '17', + '|Z|-Cp': '18', + 'Lp-Rp': '19', + 'Cp-Rp': '20', + }, + update_method=self._sweep_unit_update, + ) + + self.add_parameter( # 1-a: Function + 'measure_gain_phase', + parameter_class=InterdependentParameter, + value_after_reset='Tch/Rch(dB)-theta', + set_cmd='GPP{}', + docstring='Selects the measurement function for Gain-Phase.', + val_mapping={ + 'Tch/Rch(dB)-theta': '1', + 'Tch/Rch-theta': '2', + 'Tch/Rch(dB)-tau': '3', + 'Rch-Tch(V)': '4', + 'Rch-Tch(dBm)': '5', + 'Rch-Tch(dBV)': '6', + }, + update_method=self._sweep_unit_update, + ) + + self.add_parameter( # 7: Mesurement Unit + 'power_splitter_mode', + parameter_class=ResetValueParameter, + value_after_reset='Dual', + set_cmd='PWS{}', + docstring='Selects the power splitter mode.', + val_mapping={ + 'Dual': '1', + 'Single': '2', + } + ) + + self.add_parameter( # 1-b: Sweep + 'sweep_type', + parameter_class=InterdependentParameter, + value_after_reset='Linear', + set_cmd='SWT{}', + docstring='Selects the sweep type.', + val_mapping={ + 'Linear': '1', + 'Log': '2', + } + ) + + self.add_parameter( # 1-b: Sweep + 'sweep_parameter_mode', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + update_method=self._sweep_parameter_mode_update, + value_after_reset='Frequency', + set_cmd='SWP{}', + docstring='Selects the mode of the sweep parameter.', + val_mapping={ + 'Frequency': '1', + 'DC Bias': '2', # (Impedance measurement Only) + 'Osc level(V)': '3', + 'Osc level(dBm)': '4', # (Linear sweep Only) + 'Osc level(dBV)': '5', # (Linear sweep only) + } + ) + + self.add_parameter( # 1-b: Sweep + 'sweep_direction', + parameter_class=InterdependentParameter, + value_after_reset='Up', + set_cmd='SWD{}', + docstring='Selects the sweep direction.', + val_mapping={ + 'Up': '1', + 'Down': '2', + } + ) + + self.add_parameter( # 1-b: Sweep + 'sweep_mode', + parameter_class=InterdependentParameter, + value_after_reset='Repeat', + set_cmd='SWM{}', + docstring='Selects the sweep mode.', + val_mapping={ + 'Repeat': '1', + 'Single': '2', + 'Manual': '3', + } + ) + + self.add_parameter( # 1-d: Display + 'display_mode', + parameter_class=InterdependentParameter, + value_after_reset='X-A&B', + set_cmd='DSP{}', + docstring='Selects the display mode.', + val_mapping={ + 'X-A&B': '1', + 'A-B': '2', + 'Table': '3', + } + ) + + self.add_parameter( # 1-d: Display + 'display_data_A', + parameter_class=InterdependentParameter, + value_after_reset=True, + set_cmd='DPA{}', + docstring='Display data A on/off. Effective for X-A&B mode.', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0') + ) + + self.add_parameter( # 1-d: Display + 'display_data_B', + parameter_class=InterdependentParameter, + value_after_reset=True, + set_cmd='DPB{}', + docstring='Display data B on/off. Effective for X-A&B mode.', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0') + ) + + self.add_parameter( # 1-d: Display + 'display_scale_A', + parameter_class=InterdependentParameter, + value_after_reset='Linear', + set_cmd='ASC{}', + docstring='Scale display data A to Linear/Log. Effective for X-A&B mode.', + val_mapping={ + 'Linear': '1', + 'Log': '2', + } + ) + + self.add_parameter( # 1-d: Display + 'display_scale_B', + parameter_class=InterdependentParameter, + value_after_reset='Linear', + set_cmd='BSC{}', + docstring='Scale display data B to Linear/Log. Effective for X-A&B mode.', + val_mapping={ + 'Linear': '1', + 'Log': '2', + } + ) + + self.add_parameter( # 1-d: Display + 'display_autoscale', + parameter_class=Parameter, + set_cmd='AUTO{}', + docstring='Autoscale display to A/B. Effective for X-A&B mode.', + vals=Enum('A', 'B'), + ) + + self.add_parameter( # 6: Parameter + 'n_points', + parameter_class=Parameter, + set_cmd='NOP={}', + get_cmd='NOP?', + get_parser=lambda x: int(x), + docstring='Number of measurement points.', + vals=Ints(2, 401), + ) + + self.add_parameter( # 6: Parameter + 'start', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_parameter_mode', 'sweep_type'], + update_method=lambda: self._sweep_start_stop_update('start'), + set_cmd='START={}', + set_parser=lambda x: round(x, 3), + get_cmd='START?', + get_parser=lambda x: float(x), + docstring='Sets the start value of the sweep parameters.', + ) + + self.add_parameter( # 6: Parameter + 'stop', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_parameter_mode', 'sweep_type'], + update_method=lambda: self._sweep_start_stop_update('stop'), + set_cmd='STOP={}', + set_parser=lambda x: round(x, 3), + get_cmd='STOP?', + get_parser=lambda x: float(x), + docstring='Sets the stop value of the sweep parameters.', + ) + + + # + # self.add_parameter( # 6: Parameter + # 'osc', + # parameter_class=Parameter, + # # dependent_on=['analyzer_mode', 'sweep_parameter_mode', 'start', 'stop'], + # # update_method=self._oscillator_level_update(), + # set_cmd='OSC={}', + # set_parser=lambda x: round(x, 2), + # get_cmd='OSC?', + # get_parser=lambda x: float(x), + # docstring='Sets the value of the oscillator level.', + # ) + + self.add_parameter( # 6: Parameter + 'oscillator_level', + parameter_class=InterdependentParameter, + # dependent_on=['analyzer_mode', 'sweep_parameter_mode', 'start', 'stop'], + # update_method=self._oscillator_level_update(), + # set_cmd='OSC={}', + get_cmd='OSC?', + get_parser=lambda x: float(x), + docstring='Sets the value of the oscillator level.', + ) + + self.add_parameter( # 6: Parameter + 'oscillator_level_unit', + parameter_class=Parameter, + set_cmd=self._oscillator_level_unit_update, + docstring='Sets the unit of the oscillator level.', + vals=Enum('dBm', 'V'), + ) + + self.add_parameter( # 5: Averaging + 'averaging_factor', + parameter_class=Parameter, + vals=Enum(1, 2, 4, 8, 16, 32, 64, 128, 256), + set_cmd='NOA={}', + get_cmd='NOA?', + get_parser=lambda x: int(x), + docstring='Sets the averaging factor.', + ) + + self.add_parameter( # 4: Integ Time + 'integration_time', + parameter_class=ResetValueParameter, + value_after_reset=0.5, + set_cmd='ITM{}', + unit='ms', + docstring='Sets the stop value of the sweep parameters.', + vals=Enum(0.5, 5, 100), + val_mapping={ + 0.5: '1', + 5: '2', + 100: '3', + } + ) + + self.add_parameter( # 1-d: Display + 'sweep_parameter', + parameter_class=Parameter, + get_cmd=lambda: numpy.linspace( + self.start.cache.get(), + self.stop.cache.get(), + self.n_points.cache.get()), + # vals=Arrays(shape=(self.n_points,)), + vals=Arrays(shape=(self.n_points.cache.get,)), + snapshot_value=False, + ) + + self.add_parameter( # 1-d: Display + 'sweep_trace_A', + parameter_class=ParameterWithSetpoints, + setpoints=(self.sweep_parameter,), + get_cmd='A?', + get_parser=lambda x: numpy.array( + [float(v) for v in x.split(",")]), + docstring='Register for display data A', + # vals=Arrays(shape=(self.n_points,)), + vals=Arrays(shape=(self.n_points.cache.get,)), + snapshot_value=False, + ) + + self.add_parameter( # 1-d: Display + 'sweep_trace_B', + parameter_class=ParameterWithSetpoints, + setpoints=(self.sweep_parameter,), + get_cmd='B?', + get_parser=lambda x: numpy.array( + [float(v) for v in x.split(",")]), + docstring='Register for display data A', + # vals=Arrays(shape=(self.n_points,)), + vals=Arrays(shape=(self.n_points.cache.get,)), + snapshot_value=False, + ) + + + +################################################# +################################################# +################################################# + model = self.IDN()['model'] + knownmodels = [ + 'HP4194A IMPEDANCE/GAIN-PHASE_ANALYZER OPT350', + 'HP4194A IMPEDANCE/GAIN-PHASE_ANALYZER OPT350 (Simulated)' + ] + if model not in knownmodels: + raise ValueError(f"'{model}' is an unknown model.") + + self.connect_message() + self.reset() + #self.status_clear() + if '(Simulated)' not in model: + self.snapshot(True) + + def _sweep_parameter_mode_update(self) -> None: + vals = {'Frequency': '1', } + + if 'Impedance' in self.analyzer_mode.cache(): + vals.update({'DC Bias': '2', }) + + vals.update({'Osc level(V)': '3', }) + + if self.sweep_type.cache() == 'Linear': + vals.update({ + 'Osc level(dBm)': '4', + 'Osc level(dBV)': '5', + }) + + self.sweep_parameter_mode.vals = Enum(*vals) + + if self.sweep_parameter_mode() == 'Frequency': + self.sweep_parameter.unit = 'Hz' + else: + raise NotImplementedError + + def _oscillator_level_unit_update(self, osc_unit: str) -> None: + if self.analyzer_mode.cache() == 'Gain-Phase': + if osc_unit == 'V': + self._oscillator_level_update(use_V=True) + elif osc_unit == 'dBm': + self._oscillator_level_update(use_V=False) + else: + raise ValueError + else: + raise NotImplementedError + + self.oscillator_level(self.oscillator_level.vals._min_value) + + def _oscillator_level_update(self, use_V = False) -> None: + start = self.start.cache.get() + stop = self.stop.cache.get() + if self.sweep_parameter_mode.cache() == 'Frequency': + if self.analyzer_mode.cache() == 'Impedance': + set_cmd_tail = 'V' + unit = 'V' + set_parser = lambda x: round(x, 2) + if (start > 10000e3) or (stop > 10000e3): + vals = Numbers(0.01, 0.5) + else: + vals = Numbers(0.01, 1.0) + elif ((self.analyzer_mode.cache() == 'Gain-Phase') + or (self.analyzer_mode.cache() == 'Impedance with Z Probe')): + if use_V: + set_cmd_tail = 'V' + unit = 'V' + set_parser = lambda x: round(x, 2) + vals = Numbers(0.01, 1.2) + else: + set_cmd_tail = 'DBM' + unit = 'dBm' + set_parser = lambda x: round(x, 1) + vals = Numbers(-65.0, 15.0) + else: + raise NotImplementedError + else: + raise NotImplementedError + + self.oscillator_level_unit.cache.set(unit) + self.oscillator_level.unit = unit + self.oscillator_level.vals = vals + self.oscillator_level.set_parser = set_parser + self.oscillator_level.set_cmd = 'OSC={}' + set_cmd_tail + + def _sweep_start_stop_update(self, parameter_name: str) -> None: + if self.sweep_parameter_mode.cache() == 'Frequency': + unit = 'Hz' + if self.analyzer_mode.cache() == 'Impedance': + if self.cable_length == '1m': + vals = Numbers(100, 15e6) + else: + vals = Numbers(100, 40e6) + elif self.analyzer_mode.cache() == 'Gain-Phase': + vals = Numbers(10, 100e6) + elif self.analyzer_mode.cache() == 'Impedance with Z Probe': + vals = Numbers(10, 100e6) + else: + raise NotImplementedError + else: + raise NotImplementedError + + if 'start' == parameter_name: + self.start.unit = unit + self.start.vals = vals + + if 'stop' == parameter_name: + self.stop.unit = unit + self.stop.vals = vals + + self._oscillator_level_update() + + def _sweep_unit_update(self) -> None: + analyzer_mode = self.analyzer_mode.cache() + A_unit = None + B_unit = None + + if analyzer_mode == 'Gain-Phase': + meas = self.measure_gain_phase() + if meas == 'Rch-Tch(V)': + B_unit = 'V' + elif analyzer_mode == 'Impedance': + meas = self.measure_impedance() + if meas == '|Z|-theta': + B_unit = 'deg' + elif meas == 'Cs-Rs': + B_unit == 'Ohm' + elif meas == 'Cs-D': + A_unit == 'F' + + self.sweep_trace_A.unit = A_unit + self.sweep_trace_B.unit = B_unit + + def sweep_start_trigger(self) -> None: + """ + Sweep start trigger + """ + self.write('SWTRG') + + def reset(self) -> None: + """ + Reset the instrument + Note + The RST command resets the instrument to the power-on + default conditions except for the following settings. + 1. Sweep mode is set to the Single sweep mode (code: SWM2) + and the traces on the screen will be erased. + 2. Data registers (A ~ D), general purpose registers (RA - RL), + all registers for compensation, Rn, Z, and all + read-only type registers are not reset. + 3. Program WORK AREA is not cleared. + + """ + self.write('RST') + if float(self.ask('STOP?')) == 40000e3: + self.cable_length = '0m' + else: + self.cable_length = '1m' + + for name, param in self.parameters.items(): + if hasattr(param, 'reset_value'): + param.reset_value() + + for name, param in self.parameters.items(): + if hasattr(param, 'update_method'): + if isinstance(param.update_method, Callable): + param.update_method() + + def get_idn(self) -> dict[str, Optional[str]]: + """ + For the HP 4194A instrument, the response to 'ID?' command + is used as it does not support standard '*IDN?' command. + + Returns: + A dict containing vendor, model, serial, and firmware. + """ + idstr = "" # in case self.ask fails + try: + original_termination = self.visa_handle.read_termination + self.visa_handle.read_termination = None + self.visa_handle.write('ID?') + idstr = self.visa_handle.read() + self.visa_handle.read_termination = original_termination + + # Full string as model, stripping leading and trailing whitespaces + model = idstr.replace("\r\n", " ").strip() + + idparts: list[Optional[str]] = ["HP", model, None, None] + + except Exception: + self.log.warning( + f"Error getting or interpreting ID?: {idstr!r}", exc_info=True + ) + idparts = [None, self.name, None, None] + + return dict(zip(("vendor", "model", "serial", "firmware"), idparts)) + + def wait_for_srq(self, timeout=25) -> None: + ('Wait for service request (SRQ) from instrument.') + return self.visa_handle.wait_for_srq(timeout) + + def _get_sweep_trace( + self, + number: list[int] = [1], + retrun_val_2=False + ) -> None: + ('Outputs DATA TRACE data. (Query only)') + if not self.transfer_format.cache == '32-bit le': + self.transfer_format('32-bit le') + trace = self.visa_handle.query_binary_values( + 'OUTPDTRC?', is_big_endian=True) + trace_length = len(trace) + points = self.n_points.cache() + if trace_length == points: + returnVal = trace + elif trace_length == 2 * points: + val_1 = trace[0::2] # Amplitude value + if retrun_val_2 is True: + # val_2 = trace[1::2] # Auxiliary amplitude value + raise NotImplementedError( + 'Returning Auxiliary amplitude value not implemented.') + returnVal = val_1 + else: + raise ValueError( + "Number of points recieved does not" + f" match {self.name}.n_points") + return numpy.array(returnVal) +# +# self.add_parameter('ID', +# get_cmd='ID?', +# get_parser=self.read_until_empty_line) +# +# +# def read_until_empty_line(self, cmd): +# response = [] +# while True: +# line = self.instrument.readline().rstrip() # rstrip to remove the '\r\n' +# if line == '': +# break +# response.append(line) +# return response + +# +# def get_raw(self) -> np.ndarray: +# """ +# Return the axis values, with values retrieved from the parent instrument +# """ +# return np.linspace(self._startparam(), self._stopparam(), self._pointsparam()) +# +# + +""" + self.add_parameter( + 'start', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + update_method=lambda: self._sweep_parameter_update('start'), + get_cmd='STAR?', + get_parser=float, + docstring='Sets the start value of the sweep parameters. This' + ' command is not valid when the list sweeping mode is' + ' selected. ([Start])\nWhen editing a list sweep table,' + ' the command sets the start value of a segment.' + ' ([SEGMENT: START] under [Sweep])' + ) + + + + + +""" diff --git a/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py b/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py new file mode 100644 index 000000000..7db5eeffd --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py @@ -0,0 +1,974 @@ +import warnings +from typing import Any + +from qcodes.instrument import ParameterWithSetpoints, VisaInstrument +from qcodes.instrument.parameter import Parameter +from qcodes.utils.helpers import create_on_off_val_mapping +from qcodes.utils.validators import Enum, Ints, Numbers, Arrays +import numpy + +from private.bit_name_mapper import BitNameMapper +from private.interdependent_parameter import interdependent_parameter_factory + +InterdependentParameter = interdependent_parameter_factory(Parameter) + +class HP4395A(VisaInstrument): + """ + This is the QCoDeS python driver for the HP/Agilent 4395A + """ + + def __init__( + self, + name: str, + address: str, + terminator: str = '\n', + timeout: int = 100000, + **kwargs: Any + ) -> None: + """ + QCoDeS driver for the HP/Agilent 4395A. + + Args: + name (str): Name of the instrument. + address (str): Address of the instrument. + terminator (str): Terminator character of + the string reply. Optional, default `'\\n'` + timeout (int): VISA timeout is set purposely + to a long time to allow long spectrum measurement. + Optional, default 100000 + """ + super().__init__( + name=name, + address=address, + terminator=terminator, + timeout=timeout, + **kwargs + ) + + self._map_STB = BitNameMapper({ + 2: 'Event Status Register B Summary Bit', + 3: 'Questionable Status Register Summary Bit', + 4: 'Message in Output Queue A ', + 5: 'Standard Event Status Register Summary Bit', + 6: 'Request Service', + 7: 'Operation Status Register Summary Bit' + }) + + self._map_ESR = BitNameMapper({ + 0: 'Operation Complete', + 1: 'Request Control', + 2: 'Query Error', + 3: 'Device Dependent Error', + 4: 'Execution Error', + 5: 'Command Error', + 6: 'User Request', + 7: 'Power ON', + }) + + self._map_ESB = BitNameMapper({ + 0: 'SING, NUMG or Cal Std. Complete', + 1: 'Service Routine Waiting or Bus Trigger Waiting', + 2: 'Data Entry Complete', + 3: 'Limit Failed, Ch 2', + 4: 'Limit Failed, Ch 1', + 5: 'Search Failed, Ch 2', + 6: 'Search Failed, Ch 1', + 7: 'Point Measurement Complete', + 8: 'Reverse GET', + 9: 'Forward GET' + }) + + self.add_parameter( # p354 + 'status_mask', + get_cmd='*SRE?', + set_cmd='*SRE {}', + get_parser=lambda x: self._map_STB.value_to_bitnames(int(x), [6]), + set_parser=lambda x: self._map_STB.bitnames_to_value(x), + docstring='Sets the enable bits of the Status Byte Register.' + ' (STB)\n (0 to 255 - decimal expression of enable' + ' bits of the status byte register)\n' + '\n' + self._map_STB.docstring([6]), + ) + + self.add_parameter( # p263 + 'status_mask_instrument', + get_cmd='ESNB?', + set_cmd='ESNB {}', + get_parser=lambda x: self._map_ESB.value_to_bitnames(int(x)), + set_parser=lambda x: self._map_ESB.bitnames_to_value(x), + docstring='Enables the bits of Event Status register B' + ' (ESB)\n (Instrument Evenet Status' + ' register).\n(0 to 65535 - decimal expression' + ' of the contents of the register)\n' + '\n' + self._map_ESB.docstring(), + ) + + self.add_parameter( # p262 + 'status_mask_event', + get_cmd='*ESE?', + set_cmd='*ESE {}', + get_parser=lambda x: self._map_ESR.value_to_bitnames(int(x)), + set_parser=lambda x: self._map_ESR.bitnames_to_value(x), + docstring='Sets the enable bits of the Standard Event Status' + ' Register. (ESR)\n (0 to 255 - decimal expression' + ' of enable bits of the operation status register)\n' + '\n' + self._map_ESR.docstring(), + ) + + self.add_parameter( # p262 + 'status', + get_cmd='*STB?', + get_parser=lambda x: self._map_ESR.value_to_bitnames(int(x)), + docstring='Returns Status Byte Register contents.' + ) + + self.add_parameter( # p303, p339, p373 + 'analyzer_mode', + parameter_class=InterdependentParameter, + set_cmd='{}', + get_cmd=self._analyzer_mode_get, + docstring='Selects the analyzer mode.', + val_mapping={'Network': 'NA', + 'Spectrum': 'SA', + 'Impedance': 'ZA'}, + ) + self.add_parameter( # p238 + 'active_channel', + set_cmd='{}', + get_cmd=self._active_channel_get, + docstring='Selects channel 1 or 2 as the active channel.', + val_mapping={1: 'CHAN1', 2: 'CHAN2'}, + ) + + self.add_parameter( # p273 + 'hold', + set_cmd='HOLD', + get_cmd='HOLD?', + docstring='Freezes the data trace on the display.' + ' The analyzer stops sweeping and taking data.' + ' (SWEEP: HOLD under [Trigger])', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + ) + + self.add_parameter( # p258 + 'display_trace', + set_cmd='DISP {}', + get_cmd='DISP?', + docstring='Selects the display trace type. (DISPLAY: DATA, MEMORY,' + ' DATA and MEMORY under [Display]', + val_mapping={ + 'Data': 'DATA', + 'Memory': 'MEMO', + 'Data and memory': 'DATM', + } + ) + + self.add_parameter( # p345 + 'scale_for', + set_cmd='SCAF {}', + get_cmd='SCAF?', + docstring='Selects one of the "DATA" or "MEMORY" traces to be' + ' scaled.\n([SCALE FOR [] under [Scale Ref]; No' + ' equivalent SCPI command)', + val_mapping={ + 'Data': 'DATA', + 'Memory': 'MEMORY', + }, + ) + + self.add_parameter( # p307 + 'options', + parameter_class=InterdependentParameter, + get_cmd='*OPT?', + docstring='Queries the options installed. (Query only)' + # val mapping is broken since instrument does not answer as expected + # *OPT? returns 1D6,010 + # val_mapping={ + # 'None': None, + # 'HP Instrument BASIC': '1C2', + # 'Time-gated spectrum analysis': '1D6' + # } + ) + + self.add_parameter( # p362 + 'trigger_source', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'options'], + # dependent_on=['analyzer_mode'], + update_method=self._trigger_source_update, + get_cmd='TRGS?', + set_cmd='TRGS {}', + docstring='Selects the trigger source, which is common to' + ' both channels. (TRIGGER: [] under [Trigger])', + val_mapping={ + 'Internal trigger': 'INT', + 'External trigger': 'EXT', + 'GPIB trigger': 'BUS', + 'Video trigger': 'VID', + 'Manual trigger': 'MAN', + 'External gate trigger': 'GAT' + } + ) + + self.add_parameter( # p266 + 'display_format', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode'], + update_method=self._display_format_update, + get_cmd='FMT?', + set_cmd='FMT {}', + docstring='Selects the display format.', + val_mapping={ + 'Log magnitude': 'LOGM', + 'Phase': 'PHAS', + 'Delay': 'DELA', + 'Linear magnitude': 'LINM', + 'SWR': 'SWR', + 'Real': 'REAL', + 'Imaginary': 'IMAG', + 'Smith chart': 'SMITH', + 'Polar chart': 'POLA', + 'Admittance Smith chart': 'ADMIT', + 'Spectrum': 'SPECT', + 'Noise level': 'NOISE', + 'Linear Y-axis': 'LINY', + 'Log Y-axis': 'LOGY', + 'Complex plane': 'COMP', + 'Expanded phase': 'EXPP', + } + ) + + self.add_parameter( # p292 + 'measure', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode'], + update_method=self._measure_update, + get_cmd='MEAS?', + set_cmd='MEAS {}', + docstring='Selects the parameters or inputs to be measured.', + val_mapping={ + 'A/R': 'AR', + 'B/R': 'BR', + 'A/B': 'AB', # missing in prog. manual + 'R': 'R', + 'A': 'A', + 'B': 'B', + 'S11 - Reflection Forward': 'S11', + 'S12 - Transmission Forward': 'S12', + 'S21 - Transmission Reverse': 'S21', + 'S22 - Reflection Reverse': 'S22', + # 'S': 'S', # command not valid but in prog. manual + 'Impedance ABS': 'IMAG', + 'Impedance Phase': 'IPH', + 'Resistance': 'IRE', + 'Reactance': 'IIM', + 'Admittance ABS': 'AMAG', + 'Admittance Phase': 'APH', + 'Conductance': 'ARE', + 'Susceptance': 'AIM', + 'Reflection Coefficient ABS': 'RCM', + 'Reflection Coefficient Phase': 'RCPH', + 'Reflection Coefficient Real': 'RCR', + 'Reflection Coefficient Imaginary': 'RCIM', + 'Parallel Capacitance': 'CP', + 'Series Capacitance': 'CS', + 'Parallel Inductance': 'LP', + 'Series Inductance': 'LS', + 'Dissipation Factor': 'D', + 'Quality Factor': 'Q', + 'Parallel Resistance': 'RP', + 'Series Resistance': 'RS', + } + ) + + self.add_parameter( # p358 + 'sweep_type', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode'], + update_method=self._sweep_type_update, + get_cmd='SWPT?', + set_cmd='SWPT {}', + docstring='Selects the sweep type.', + val_mapping={ + 'Linear frequency': 'LINF', + 'Log frequency': 'LOGF', + 'List frequency': 'LIST', + 'Power': 'POWE', + } + ) + + self.add_parameter( # p352 + 'span', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + # update_method=self._span_update, + get_cmd='SPAN?', + get_parser=float, + docstring='Sets the span of the sweep parameters. This command is' + ' not valid when the list sweeping mode is selected.' + ' ([Span])\nWhen editing a list sweep table, the command' + ' sets the span of a segment. ([SPAN] under [Sweep])' + ) + + self.add_parameter( # p327 + 'power_level', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type', 'span'], + update_method=self._power_level_update, + get_cmd='POWE?', + set_parser=lambda x: round(x, 1), + get_parser=float, + unit='dBm', + vals=Numbers(-50, 15), + docstring='Sets the power level segment by segment, or sets the' + ' power level for the list sweep table. ([POWER]' + ' under [Sweep])\nThis command is valid when the linear' + ' frequency or log frequency sweeping mode is selected' + ' in the network and impedance analyzer modes, or when' + ' measuring on zero span in the spectrum analyzer mode.' + ) + + self.add_parameter( # p234 + 'bandwidth_auto', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + update_method=self._bandwidth_auto_update, + get_cmd='BWAUTO?', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='When log frequency sweeping mode is selected, sets' + ' either the automatic or manual IF bandwidth ON.' + ' (Network and impedance analyzers)\nWhen' + ' linear frequency sweeping mode is selected, sets' + ' either the automatic or manual resolution bandwidth' + ' ON. (Spectrum analyzeronly)' + ) + + self.add_parameter( # p233 + 'bandwidth', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'span', 'bandwidth_auto'], + update_method=self._bandwidth_update, + get_cmd='BW?', + unit='Hz', + get_parser=lambda x: int(float(x)), + docstring='Sets the bandwidth value for IF bandwidth reduction, or' + ' sets the IF bandwidth of the list sweep table.\nThis' + ' command is valid only if the automatic IF bandwidth' + ' setting is off by BWAUTO OFF command. (Network and' + ' impedance analyzers)\nSets the bandwidth value for the' + ' resolution bandwidth reduction, or sets the resolution' + ' bandwidth of the list sweep table. This command is' + ' valid only if the automatic resolution bandwidth' + ' setting is off by BWAUTO OFF command.' + ' (Spectrum analyzer)' + ) + + self.add_parameter( # p355 + 'start', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + update_method=lambda: self._sweep_start_stop_update('start'), + get_cmd='STAR?', + get_parser=float, + docstring='Sets the start value of the sweep parameters. This' + ' command is not valid when the list sweeping mode is' + ' selected. ([Start])\nWhen editing a list sweep table,' + ' the command sets the start value of a segment.' + ' ([SEGMENT: START] under [Sweep])' + ) + + self.add_parameter( # p356 + 'stop', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'sweep_type'], + update_method=lambda: self._sweep_start_stop_update('stop'), + get_cmd='STOP?', + get_parser=float, + docstring='Sets the stop value of the sweep parameters. This' + ' command is not valid when the list sweeping mode is' + ' selected. ([Stop])\nWhen editing a list sweep table,' + ' the command sets the stop value of a segment.' + ' ([SEGMENT: STOP] under [Sweep])' + ) + + self.add_parameter( # p325 + 'n_points', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode', 'span'], + update_method=self._n_points_update, + get_cmd='POIN?', + get_parser=int, + vals=Ints(2, 801), + docstring='Sets the number of points for the segment, or sets the' + ' number of points for the list sweep table. (In the' + ' spectrum analyzer mode, this command can set the' + ' number of points for zero span measurement only; can' + ' be used to query in the other measurement types.)' + ) + + self.add_parameter( # p232 + 'beep_on_warn', + get_cmd='BEEPWARN?', + set_cmd='BEEPWARN {}', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='Sets the warning annunciator. When the annunciator is' + ' ON, it sounds a warning when a cautionary message is' + ' displayed.' + ) + + self.add_parameter( # p232 + 'beep_on_fail', + get_cmd='BEEPFAIL?', + set_cmd='BEEPFAIL {}', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='Turns the limit fail beeper ON or OFF. When the limit' + ' testing is ON and the fail beeper is ON, a beep is' + ' emitted each time a limit test is performed and a' + ' failure is detected.' + ) + + self.add_parameter( # p260 + 'display_dual_channel', + get_cmd='DUAC?', + set_cmd='DUAC {}', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='Selects the display of both measurement channels or' + 'the active channel only.' + ' ([DUAL CHAN ON off] under [Display])' + ) + + self.add_parameter( # p354 + 'display_split', + get_cmd='SPLD?', + set_cmd='SPLD {}', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='Sets the dual channel display mode. ([SPLIT DISP ON' + ' off] under [Display])\nOFF - Full-screen single' + ' graticule display\nON - Split display with two' + ' half-screen graticules' + ) + + self.add_parameter( # p229 + 'averaging', + get_cmd='AVER?', + set_cmd='AVER {}', + val_mapping=create_on_off_val_mapping( + on_val='1', off_val='0'), + docstring='Turns the averaging function ON or OFF for the active' + ' channel. ([AVERAGING ON off] under [Bw/Avg])' + ) + + self.add_parameter( # p230 + 'averaging_factor', + get_cmd='AVERFACT?', + set_cmd='AVERFACT {}', + get_parser=lambda x: int(float(x)), + vals=Ints(1, 999), + docstring='Turns the averaging function ON or OFF for the active' + ' channel. ([AVERAGING ON off] under [Bw/Avg])' + ) + + self.add_parameter( # p323 + 'unit_phase', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode'], + set_cmd='PHAU {}', + get_cmd='PHAU?', + docstring='Selects the unit of phase format. ([PHASE UNIT []]' + ' under [Format]; Impedance analyzer only.)', + val_mapping={ + '°': 'DEG', + 'rad': 'RAD', + }, + ) + + self.add_parameter( # p339 + 'unit_spectrum', + parameter_class=InterdependentParameter, + dependent_on=['analyzer_mode'], + set_cmd='SAUNIT {}', + get_cmd='SAUNIT?', + docstring='Selects the unit of the measurement data on the active' + ' channel when operating in the spectrum analyzer mode.' + ' (Spectrum analyzer only) ([UNIT: dBm], [dBV], [dBµV],' + ' [WATT], [VOLT] under [Format])', + val_mapping={ + 'dBm': 'DBM', + 'dBV': 'DBV', + 'dBµV': 'DBUV', + 'Watt': 'W', + 'Volt': 'V', + }, + ) + + self.add_parameter( # p267 + 'transfer_format', + set_cmd='{}', + docstring='Sets the format to transfer the trace data via GPIB.' + ' (No query)', + val_mapping={ + '32-bit le': 'FORM2', + '64-bit le': 'FORM3', + 'ASCII': 'FORM4', + '32-bit be': 'FORM5', + }, + snapshot_value=False + ) + + self.add_parameter( + 'sweep_parameter', + get_cmd=self._get_sweep_parameter, + vals=Arrays(shape=(self.n_points.cache.get,)), + snapshot_value=False + ) + + self.add_parameter( + 'sweep_trace', + parameter_class=ParameterWithSetpoints, + get_cmd=self._get_sweep_trace, + setpoints=(self.sweep_parameter,), + snapshot_value=False, + vals=Arrays(shape=(self.n_points.cache.get,)), + # vals=Arrays( + # shape=(self.n_points.cache.get,), + # valid_types=(float, complex)), + ) + + model = self.IDN()['model'] + knownmodels = [ + '4395A', + 'Agilent 4395A (Simulated)' + ] + if model not in knownmodels: + raise ValueError(f"'{model}' is an unknown model.") + + self.connect_message() + self.status_clear() + if '(Simulated)' not in model: + self.snapshot(True) + + def write(self, cmd: str) -> None: + super().write(cmd) + if not self.operation_complete(): + warnings.warn("Operation seems to not have completed succesfully.", + stacklevel=2) + + def ask(self, cmd: str) -> str: + returnVal = super().ask(cmd) + + esr = self.get_status_event() + if esr: + warnings.warn(f"Instrument event status register indicates " + f"the follwing errors: {esr}", + stacklevel=2) + return returnVal + + def operation_complete(self) -> bool: # p306 + ("'*OPC?' query places an ASCII character 1 in to the analyzer's" + " output queue when all pending operations have been completed.") + if self.ask('*OPC?') == '1': + return True + else: + return False + + def get_status_event(self) -> list[str]: # p263 + ('Returns and clears the:\n' + ' ESR - Standard Event Status Register.') + esr = int(self.ask_raw('*ESR?')) + return self._map_ESR.value_to_bitnames(esr) + + def get_status_instrument(self) -> list[str]: # p262 + ('Returns and clears the:\n' + ' ESB - Event Status register B\n' + ' (Instrument Event Status register)') + esb = int(self.ask_raw('*ESB?')) + return self._map_ESB.value_to_bitnames(esb) + + def _analyzer_mode_get(self) -> str: + cmd_list = self.analyzer_mode.val_mapping.values() + for cmd in cmd_list: + if (self.ask(f'{cmd}?') == '1'): + return cmd + + def _active_channel_get(self) -> str: + channel_list = self.active_channel.val_mapping.values() + for channel in channel_list: + if (self.ask(f'{channel}?') == '1'): + return channel + + def _trigger_source_update(self) -> None: + vals = ( + 'Internal trigger', + 'External trigger', + 'GPIB trigger', + 'Manual trigger', + ) + + if self.analyzer_mode.cache() == 'Spectrum': + vals += ('Video trigger',) + if '1D6' in self.options.cache(): + vals += ('External gate trigger',) + + # The * syntax is used to unpack the tuple vals + # and pass them as separate arguments to Enum. + self.trigger_source.vals = Enum(*vals) + + def _display_format_update(self) -> None: + if self.analyzer_mode.cache() == 'Network': + self.display_format.vals = Enum( + 'Log magnitude', + 'Phase', + 'Delay', + 'Linear magnitude', + 'SWR', + 'Real', + 'Imaginary', + 'Smith chart', + 'Polar chart', + 'Admittance Smith chart', + 'Expanded phase', + ) + if self.analyzer_mode.cache() == 'Spectrum': + self.display_format.vals = Enum( + 'Spectrum', + 'Noise level', + ) + if self.analyzer_mode.cache() == 'Impedance': + self.display_format.vals = Enum( + 'Smith chart', + 'Polar chart', + 'Admittance Smith chart', + 'Linear Y-axis', + 'Log Y-axis', + 'Complex plane', + ) + + def _measure_update(self) -> None: + if self.analyzer_mode.cache() == 'Network': + self.measure.vals = Enum( + 'A/R', + 'B/R', + 'R', + 'A', + 'B', + 'S11', + 'S12', + 'S21', + 'S22', + ) + if self.analyzer_mode.cache() == 'Spectrum': + self.measure.vals = Enum( + # 'S', # command not valid but in prog. manual + 'R', + 'A', + 'B', + ) + if self.analyzer_mode.cache() == 'Impedance': + self.measure.vals = Enum( + 'Impedance ABS', + 'Impedance Phase', + 'Resistance', + 'Reactance', + 'Admittance ABS', + 'Admittance Phase', + 'Conductance', + 'Susceptance', + 'Reflection Coefficient ABS', + 'Reflection Coefficient Phase', + 'Reflection Coefficient Real', + 'Reflection Coefficient Imaginary', + 'Parallel Capacitance', + 'Series Capacitance', + 'Parallel Inductance', + 'Series Inductance', + 'Dissipation Factor', + 'Quality Factor', + 'Parallel Resistance', + 'Series Resistance', + ) + + def _sweep_type_update(self) -> None: + if self.sweep_type.cache() == 'List frequency': + raise NotImplementedError + if self.analyzer_mode.cache() == 'Spectrum': + self.sweep_type.vals = Enum( + 'Linear frequency', + 'List frequency', + ) + else: + self.sweep_type.vals = Enum( + 'Linear frequency', + 'Log frequency', + 'List frequency', + 'Power', + ) + + def _power_level_update(self) -> None: + if self.analyzer_mode.cache() == 'Spectrum': + if self.span.cache() == 0: + set_cmd = 'POWE {}DBM' + else: + set_cmd = self._error_cmd_not_valid + else: + if (self.sweep_type.cache() == 'Linear frequency') or (self.sweep_type.cache() == 'Log frequency'): + set_cmd = 'POWE {}DBM' + else: + set_cmd = self._error_cmd_not_valid + self.power_level.set_cmd = set_cmd + + def _bandwidth_update(self) -> None: + if self.bandwidth_auto.cache() is False: + set_cmd = 'BW {}HZ' + else: + set_cmd = self._error_cmd_not_valid + self.bandwidth.set_cmd = set_cmd + + if self.analyzer_mode.cache() == 'Spectrum': + if self.span.cache() == 0: + self.bandwidth.vals = Enum( + 3e3, 5e3, 10e3, 20e3, 40e3, 100e3, 200e3, 400e3, 800e3, + 1.5e6, 3e6, 5e6 + ) + else: + self.bandwidth.vals = Enum( + 1, 3, 10, 30, 100, 300, + 1e3, 3e3, 10e3, 30e3, 100e3, 300e3, + 1e6, 3e6 + ) + else: + self.bandwidth.vals = Enum( + 2, 10, 30, 100, 300, + 1e3, 3e3, 10e3, 30e3 + ) + + def _sweep_start_stop_update(self, parameter_name: str) -> None: + if self.sweep_type.cache() == 'Power': + set_cmd_tail = 'DBM' + unit = 'dBm' + vals = Numbers(-50, 15) + else: + set_cmd_tail = 'HZ' + unit = 'Hz' + if self.analyzer_mode.cache() == 'Spectrum': + vals = Numbers(0, 510e6) + else: + vals = Numbers(10, 510e6) + + if 'start' == parameter_name: + self.start.set_cmd = 'STAR {}' + set_cmd_tail + self.start.unit = unit + self.start.vals = vals + + if 'stop' == parameter_name: + self.stop.set_cmd = 'STOP {}' + set_cmd_tail + self.stop.unit = unit + self.stop.vals = vals + + self._sweep_unit_update() + + def _span_update(self) -> None: + raise NotImplementedError + + def _n_points_update(self) -> None: + if self.analyzer_mode.cache() == 'Spectrum': + if self.span.cache() != 0: + set_cmd = self._error_cmd_not_valid + else: + set_cmd = 'POIN {}' + else: + set_cmd = 'POIN {}' + self.n_points.set_cmd = set_cmd + + def _bandwidth_auto_update(self) -> None: + if self.analyzer_mode.cache() == 'Spectrum': + if self.sweep_type.cache() == 'Linear frequency': + set_cmd = 'BWAUTO{}', + else: + set_cmd = self._error_cmd_not_valid + else: + if self.sweep_type.cache() == 'Log frequency': + set_cmd = 'BWAUTO{}', + else: + set_cmd = self._error_cmd_not_valid + self.n_points.set_cmd = set_cmd + + def _error_cmd_not_valid(self, value): + raise ValueError( + "Command not valid in current instrument configuration." + ) + + def _sweep_unit_update(self) -> None: # 104 + # sweep_parameter unit + if self.sweep_type.cache() == 'Power': + self.sweep_parameter.unit = 'dBm' + else: + self.sweep_parameter.unit = 'Hz' + + # sweep_trace unit + display_format = self.display_format.cache() + analyzer_mode = self.analyzer_mode.cache() + unit_phase = self.unit_phase.cache() + measure = self.measure.cache() + unit_spectrum = self.unit_spectrum.cache() + + cirf_dependent = [ + 'Smith chart', + 'Polar chart', + 'Admittance Smith chart'] + if display_format in cirf_dependent: + # depends on CIRF Command parameter, Amplitude Value (Value 2) + raise NotImplementedError( + 'display_format units not implemented') + + if analyzer_mode == 'Network': + if display_format == 'Log magnitude': + sweep_trace_unit = 'dB' + elif display_format in ['Phase', 'Expanded phase']: + sweep_trace_unit = unit_phase + elif display_format == 'Delay': + sweep_trace_unit = 's' + elif display_format == 'Linear magnitude': + sweep_trace_unit = 'V' + # elif display_format == 'SWR': + # sweep_trace_unit = '' + # elif display_format == 'Real': + # sweep_trace_unit = '' + # elif display_format == 'Imaginary': + # sweep_trace_unit = '' + else: + raise NotImplementedError( + 'Unit needs to be implemented in _sweep_unit_update') + + if analyzer_mode == 'Spectrum': + sweep_trace_unit = unit_spectrum + + if analyzer_mode == 'Impedance': + ohm = [ + 'Impedance ABS', + 'Resistance', + 'Reactance', + 'Parallel Resistance', + 'Series Resistance', + ] + phase = [ + 'Impedance Phase', + 'Admittance Phase', + ] + siemens = [ + 'Admittance ABS', + 'Conductance', + 'Susceptance', + ] + farad = [ + 'Parallel Capacitance', + 'Series Capacitance', + ] + henry = [ + 'Parallel Inductance', + 'Series Inductance', + ] + + if measure in ohm: + sweep_trace_unit = 'ohm' + elif measure in phase: + sweep_trace_unit = unit_phase + elif measure in siemens: + sweep_trace_unit = 'S' + elif measure in farad: + sweep_trace_unit = 'F' + elif measure in henry: + sweep_trace_unit = 'H' + else: + raise NotImplementedError( + 'Unit needs to be implemented in _sweep_unit_update') + + self.sweep_trace.unit = sweep_trace_unit + + def status_clear(self) -> None: # p240 + ('Clears the error queues:' + ' * STB - Status Byte\n' + ' * OSR - Operational Status register\n' + ' * ESR - Standard Event Status register,\n' + ' * ESB - Event Status register B' + ' (Instrument Event Status register)') + self.write_raw('*CLS') + + def wait_for_srq(self, timeout=25) -> None: + ('Wait for service request (SRQ) from instrument.') + return self.visa_handle.wait_for_srq(timeout) + + def _get_sweep_trace( + self, + number: list[int] = [1], + retrun_val_2=False + ) -> None: # p314 + ('Outputs DATA TRACE data. (Query only)') + if not self.transfer_format.cache == '32-bit le': + self.transfer_format('32-bit le') + trace = self.visa_handle.query_binary_values( + 'OUTPDTRC?', is_big_endian=True) + trace_length = len(trace) + points = self.n_points.cache() + if trace_length == points: + returnVal = trace + elif trace_length == 2 * points: + val_1 = trace[0::2] # Amplitude value + if retrun_val_2 is True: + # val_2 = trace[1::2] # Auxiliary amplitude value + raise NotImplementedError( + 'Returning Auxiliary amplitude value not implemented.') + returnVal = val_1 + else: + raise ValueError( + "Number of points recieved does not" + f" match {self.name}.n_points") + return numpy.array(returnVal) + + def _get_sweep_parameter(self) -> list[float]: # p319 + ('Outputs the sweep parameter data. (Query only)') + self._sweep_unit_update() + if not self.transfer_format.cache == '32-bit le': + self.transfer_format('32-bit le') + returnVal = self.visa_handle.query_binary_values( + 'OUTPSWPRM?', is_big_endian=True) + return numpy.array(returnVal) + + def scale_auto(self) -> None: # p229 + ('Brings the trace data, defined by the SCAF command, in view on the' + ' display. (Network and impedance analyzers only) ([AUTO SCALE] under' + ' [Scale Ref]; No query)') + if self.analyzer_mode.cache() == 'Spectrum': + self._error_cmd_not_valid + else: + self.write('AUTO') + + def reset(self) -> None: # 337 + ('Resets the analyzer to its default values (No query):\n' + ' * Initializes the instrument settings.\n' + ' * Sets the trigger mode to HOLD.\n' + ' * Resets HP Instrument BASIC\n' + ' (only if executed on the external controller)') + self.write('*RST') + # self.snapshot(update=True) + + def single(self) -> None: # p350 + ('Makes one sweep of the data and returns to the hold mode.' + ' Instrument BASIC EXECUTE executable;\n[SINGLE] under' + ' [Trigger]; No query;\nWhen you execute this command by [EXECUTE]' + ' command of the instrument BASIC, the analyzer sweeps once and then' + ' back the control to the analyzer. The program waits the completion' + ' of sweep. You can use this method instead of detecting the sweep' + ' end by monitoring the status register to synchronize the program' + ' with the analyzer.') + self.write('SING') + + def number_of_groups(self, number: int) -> None: # p 303 + ('Triggers a user-specified number of sweeps and returns to the' + ' HOLD mode.\n([NUMBER OF GROUPS] under [Trigger]; No query') + self.write(f'NUMG {number}') diff --git a/src/qcodes_contrib_drivers/drivers/HP/private/bit_name_mapper.py b/src/qcodes_contrib_drivers/drivers/HP/private/bit_name_mapper.py new file mode 100644 index 000000000..d5d3518e7 --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/HP/private/bit_name_mapper.py @@ -0,0 +1,106 @@ +class BitNameMapper: + """ + Manages mappings between integer values and corresponding bit names. + + Attributes: + bit_map (dict): Maps bit positions to bit names. + Example: + self._map_STB = BitNameMapper({ + 2: 'Event Status Register B Summary Bit', + 3: 'Questionable Status Register Summary Bit', + 4: 'Message in Output Queue A', + 5: 'Standard Event Status Register Summary Bit', + 6: 'Request Service', + 7: 'Operation Status Register Summary Bit' + }) + + Methods: + value_to_bitnames: Converts an integer to a list of bit names. + bitnames_to_value: Converts a list of bit names to an integer value. + docstring: Generates a docstring listing the available bit names. + """ + + def __init__(self, bit_map: dict): + """ + Initialize the BitNameMapper with a mapping dictionary. + + Args: + bit_map (dict): A dictionary mapping bit positions to names. + """ + self.bit_map = bit_map + + @property + def bit_map(self) -> dict: + """Returns the current bit position to name mapping.""" + return self._bit_map + + @bit_map.setter + def bit_map(self, bit_map: dict) -> None: + """Sets the bit mapping and creates an inverse mapping.""" + self._bit_map = bit_map + self._inverse_bit_map = {name: position for position, name in bit_map.items()} + + def value_to_bitnames( + self, + value: int, + disabled_bits: list[int] = [] + ) -> list[str]: + """ + Converts an integer value to a list of corresponding bit names. + + Args: + value (int): The integer value to convert. + disabled_bits (list[int]): A list of bit positions to ignore. + + Returns: + list[str]: The names of the bits set in the value. + + Raises: + ValueError: If a disabled bit is set in the value. + KeyError: If a bit position in the value has no corresponding name. + """ + bit_names = [] + for bit_position in range(value.bit_length()): + if value & 1: + if bit_position in disabled_bits: + raise ValueError(f"Bit position {bit_position} is disabled") + if bit_position in self._bit_map: + bit_names.append(self._bit_map[bit_position]) + else: + raise KeyError(f"Unmapped bit at position: {bit_position}") + value >>= 1 + return bit_names + + def bitnames_to_value(self, bit_names: list[str]) -> int: + """ + Converts a list of bit names to their corresponding integer value. + + Args: + bit_names (list[str]): The list of bit names to convert. + + Returns: + int: The integer value represented by the bit names. + """ + value = 0 + for bit_name in bit_names: + bit_position = self._inverse_bit_map.get(bit_name) + if bit_position is None: + raise KeyError(f"Unmapped bit name: {bit_name}") + value += 2 ** bit_position + return value + + def docstring(self, disabled_bits: list[int] = []) -> str: + """ + Generates a docstring listing the available bit names, excluding any disabled bits. + + Args: + disabled_bits (list[int]): A list of bit positions to exclude from the docstring. + + Returns: + str: A formatted string listing the available bit names. + """ + docstring = "" + for bit_index, bit_name in self._bit_map.items(): + if bit_index not in disabled_bits: + docstring += f" * '{bit_name}'\n" + return docstring diff --git a/src/qcodes_contrib_drivers/drivers/HP/private/interdependent_parameter.py b/src/qcodes_contrib_drivers/drivers/HP/private/interdependent_parameter.py new file mode 100644 index 000000000..550e01be5 --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/HP/private/interdependent_parameter.py @@ -0,0 +1,174 @@ +import os +from typing import Any, Union, Optional, Callable, List +from typing_extensions import Literal +from qcodes import ParameterBase +import logging + +log = logging.getLogger(__name__) + +def interdependent_parameter_factory(base_class: type = ParameterBase) -> type: + """ + Factory function to create a class extending a given base class with interdependency features. + + Args: + base_class (type): The base class to extend, must be a subclass of `ParameterBase`. + + Returns: + A class that extends the given `base_class` with interdependency capabilities. + + Raises: + TypeError: If the `base_class` is not a subclass of `ParameterBase`. + """ + if not issubclass(base_class, ParameterBase): + raise TypeError("base_class must be a subclass of ParameterBase") + + + class FactoryInterdependentParameter(ParameterBase): + """ + Extends `ParameterBase` to handle interdependencies with other parameters. + + This class allows the parameter to be dependent on other parameters, + triggering update actions when those parameters change. + `InterdependentParameters` can only depend on other `InterdependentParameters`. + + Attributes: + dependent_on (List[str] or Literal[False]): Names of parameters this parameter depends on. + update_method (Callable or Literal[False]): Method called when a dependent parameter changes. + """ + + def __init__( + self, + get_cmd: Optional[Union[str, Callable[..., Any], Literal[False]]] = None, + set_cmd: Optional[Union[str, Callable[..., Any], Literal[False]]] = False, + update_method: Optional[Union[Callable[..., Any], Literal[False]]] = False, + dependent_on: Optional[Union[List[str], Literal[False]]] = False, + docstring: Optional[str] = None, + **kwargs: Any + ) -> None: + super().__init__(get_cmd=None, set_cmd=False, docstring=docstring, **kwargs) + self.get_cmd = get_cmd + self.set_cmd = set_cmd + self.update_method = update_method + self._dependent_on = dependent_on + self._dependent_parameter_list = [] + self.docstring = docstring + + if self._dependent_on: + for dependent in dependent_on: + parameter = getattr(self.instrument, dependent) + parameter.add_dependent_parameter(self) + + @property + def dependent_on(self) -> Union[List[str], Literal[False]]: + """ + Return list of parameter names that this parameter depends on. + + Returns: + A list of strings representing the names of dependent parameters, + or False if there are none. + """ + return self._dependent_on + + def add_dependent_parameter(self, dependent_parameter: 'FactoryInterdependentParameter') -> None: + """ + Adds a parameter to the list of dependent parameters. + + This method is used to register another parameter as dependent on this parameter. + When the value of this parameter changes, the registered dependent parameter + is updated accordingly. + + Args: + dependent_parameter: The parameter to add as a dependent. + """ + self._dependent_parameter_list.append(dependent_parameter) + + def get_raw(self) -> Any: + """ + Retrieves the raw value of the parameter, executing any update actions if the parameter has changed. + + This method is a core part of the QCoDeS parameter interface. It is used to get the current value of the + parameter from the instrument. If the parameter is dependent on others, it also triggers the update + mechanism for those parameters. + + Returns: + The raw value of the parameter. + """ + if isinstance(self.get_cmd, str): + raw_value = self.instrument.ask(self.get_cmd) + elif isinstance(self.get_cmd, Callable): + raw_value = self.get_cmd() + else: + if hasattr(self, 'val_mapping'): + raw_value = self.val_mapping[self.cache.get()] + else: + raw_value = self.cache.get() + + self._execute_on_change(raw_value) + return raw_value + + def set_raw(self, raw_value: Union[float, int, str]) -> None: + """ + Sets the raw value of the parameter, executing any update actions if the parameter changes. + + This method is a core part of the QCoDeS parameter interface. It is used to set the value of the + parameter on the instrument. If the parameter is dependent on others, it also triggers the update + mechanism for those parameters. + + Args: + raw_value: The new value to set for the parameter. + """ + if isinstance(self.set_cmd, str): + self.instrument.write(self.set_cmd.format(raw_value)) + elif callable(self.set_cmd): + self.set_cmd(raw_value) + + self._execute_on_change(raw_value) + + def update_docstring(self) -> None: + """ + Updates the docstring of the parameter to reflect its current state. + + This method constructs a new docstring based on the parameter's name, + label, unit, and validator (`vals`). It appends any additional documentation + provided in `self.docstring` to the automatically generated docstring. + This is useful for keeping the documentation of the parameter up-to-date with + its runtime state. + """ + self.__doc__ = os.linesep.join( + ( + "Parameter class:", + "", + f"* `name`: {self.name}", + f"* `label`: {self.label}", + f"* `unit`: {self.unit}", + f"* `vals`: {repr(self.vals)}", + ) + ) + if self.docstring is not None: + self.__doc__ = os.linesep.join((self.docstring, "", self.__doc__)) + + def _execute_on_change(self, raw_value: Union[float, int, str]) -> None: + """ + Executes the update method for all dependent parameters when this parameter's value changes. + + This private method is called whenever the value of this parameter is read or set. It compares the + new value with the cached value, and if they differ, it updates the cache and calls the `update_method` + for each parameter that depends on this one. This ensures that changes in this parameter's value + propagate to all dependent parameters. + + Args: + raw_value: The new raw value of the parameter. + """ + value = self._from_raw_value_to_value(raw_value) + if self.cache.get(False) != value: + self.cache.set(value) + for parameter in self._dependent_parameter_list: + if parameter.update_method: + parameter.update_method() + parameter.update_docstring() + parameter.get() + if self.update_method: + self.update_method() + self.update_docstring() + + return FactoryInterdependentParameter diff --git a/src/qcodes_contrib_drivers/drivers/HP/private/reset_value_parameter.py b/src/qcodes_contrib_drivers/drivers/HP/private/reset_value_parameter.py new file mode 100644 index 000000000..83bf05184 --- /dev/null +++ b/src/qcodes_contrib_drivers/drivers/HP/private/reset_value_parameter.py @@ -0,0 +1,59 @@ +from typing import Any, Union, Optional +from qcodes import ParameterBase +import logging + +log = logging.getLogger(__name__) + +def reset_value_parameter_factory(base_class: type = ParameterBase) -> type: + """ + Factory function to create a custom QCoDeS parameter class with reset functionality. + + This function creates a subclass of the given base class (default is `ParameterBase`) + that includes a method to reset the parameter's value to a predefined default. + + Args: + base_class (type): The base class to extend, must be a subclass of `ParameterBase`. + + Returns: + type: A new class that extends the base class with reset functionality. + + Raises: + TypeError: If the base_class is not a subclass of `ParameterBase`. + """ + if not issubclass(base_class, ParameterBase): + raise TypeError("base_class must be a subclass of ParameterBase") + + class FactoryResetValueParameter(base_class): + """ + Extends a QCoDeS Parameter with a reset_value method and value_after_reset attribute. + + Attributes: + value_after_reset (Union[Any, Literal[False], None]): The value to reset the + parameter to when `reset_value` is called. + + Methods: + reset_value(): Resets the parameter's value to `value_after_reset`. + """ + + def __init__( + self, + value_after_reset: Optional[Union[Any, Literal[False]]] = False, + **kwargs: Any + ) -> None: + """ + Initializes a new FactoryResetValueParameter instance. + + Args: + value_after_reset (Optional[Union[Any, Literal[False]]]): The value to reset to. + If None or False, the reset functionality is disabled. + **kwargs: Additional keyword arguments for the base class. + """ + self.value_after_reset = value_after_reset + super().__init__(**kwargs) + + def reset_value(self) -> None: + """Resets the parameter's value to `value_after_reset`, if it is set.""" + if self.value_after_reset is not None: + self.cache.set(self.value_after_reset) + + return FactoryResetValueParameter From 4442a5b9621ae9fccf3d23e3d2e98167bcb611c4 Mon Sep 17 00:00:00 2001 From: DCEM Date: Tue, 3 Dec 2024 14:42:56 +0100 Subject: [PATCH 2/2] correct docstring --- src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py b/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py index 7db5eeffd..eeea13667 100644 --- a/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py +++ b/src/qcodes_contrib_drivers/drivers/HP/HP_4395A.py @@ -470,8 +470,8 @@ def __init__( set_cmd='AVERFACT {}', get_parser=lambda x: int(float(x)), vals=Ints(1, 999), - docstring='Turns the averaging function ON or OFF for the active' - ' channel. ([AVERAGING ON off] under [Bw/Avg])' + docstring='Makes the averaging factor for the active function.' + '([AVERAGING FACTOR] under [Bw/Avg])' ) self.add_parameter( # p323