From 07944785a675561ba23a04c84c0693e842ed534e Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Mon, 5 Sep 2022 16:37:43 +0200 Subject: [PATCH 1/2] Move range stuff in separate module --- qupulse/pulses/loop_pulse_template.py | 147 +------------------------ qupulse/pulses/range.py | 151 ++++++++++++++++++++++++++ 2 files changed, 155 insertions(+), 143 deletions(-) create mode 100644 qupulse/pulses/range.py diff --git a/qupulse/pulses/loop_pulse_template.py b/qupulse/pulses/loop_pulse_template.py index 82236c71..09358866 100644 --- a/qupulse/pulses/loop_pulse_template.py +++ b/qupulse/pulses/loop_pulse_template.py @@ -21,6 +21,7 @@ from qupulse.pulses.pulse_template import PulseTemplate, ChannelID, AtomicPulseTemplate from qupulse._program.waveforms import SequenceWaveform as ForLoopWaveform from qupulse.pulses.measurement import MeasurementDefiner, MeasurementDeclaration +from qupulse.pulses.range import ParametrizedRange, RangeScope __all__ = ['ForLoopPulseTemplate', 'LoopPulseTemplate', 'LoopIndexNotUsedException'] @@ -45,54 +46,6 @@ def measurement_names(self) -> Set[str]: return self.__body.measurement_names -class ParametrizedRange: - """Like the builtin python range but with parameters.""" - def __init__(self, *args, **kwargs): - """Positional and keyword arguments cannot be mixed. - - Args: - *args: Interpreted as ``(start, )`` or ``(start, stop[, step])`` - **kwargs: Expected to contain ``start``, ``stop`` and ``step`` - Raises: - TypeError: If positional and keyword arguments are mixed - KeyError: If keyword arguments but one of ``start``, ``stop`` or ``step`` is missing - """ - if args and kwargs: - raise TypeError('ParametrizedRange only takes either positional or keyword arguments') - elif kwargs: - start = kwargs['start'] - stop = kwargs['stop'] - step = kwargs['step'] - elif len(args) in (1, 2, 3): - if len(args) == 3: - start, stop, step = args - elif len(args) == 2: - (start, stop), step = args, 1 - elif len(args) == 1: - start, (stop,), step = 0, args, 1 - else: - raise TypeError('ParametrizedRange expected 1 to 3 arguments, got {}'.format(len(args))) - - self.start = ExpressionScalar.make(start) - self.stop = ExpressionScalar.make(stop) - self.step = ExpressionScalar.make(step) - - def to_tuple(self) -> Tuple[Any, Any, Any]: - """Return a simple representation of the range which is useful for comparison and serialization""" - return (self.start.get_serialization_data(), - self.stop.get_serialization_data(), - self.step.get_serialization_data()) - - def to_range(self, parameters: Mapping[str, Number]) -> range: - return range(checked_int_cast(self.start.evaluate_in_scope(parameters)), - checked_int_cast(self.stop.evaluate_in_scope(parameters)), - checked_int_cast(self.step.evaluate_in_scope(parameters))) - - @property - def parameter_names(self) -> Set[str]: - return set(self.start.variables) | set(self.stop.variables) | set(self.step.variables) - - class ForLoopPulseTemplate(LoopPulseTemplate, MeasurementDefiner, ParameterConstrainer): """This pulse template allows looping through an parametrized integer range and provides the loop index as a parameter to the body. If you do not need the index in the pulse template, consider using @@ -122,18 +75,7 @@ def __init__(self, MeasurementDefiner.__init__(self, measurements=measurements) ParameterConstrainer.__init__(self, parameter_constraints=parameter_constraints) - if isinstance(loop_range, ParametrizedRange): - self._loop_range = loop_range - elif isinstance(loop_range, (int, str)): - self._loop_range = ParametrizedRange(loop_range) - elif isinstance(loop_range, (tuple, list)): - self._loop_range = ParametrizedRange(*loop_range) - elif isinstance(loop_range, range): - self._loop_range = ParametrizedRange(start=loop_range.start, - stop=loop_range.stop, - step=loop_range.step) - else: - raise ValueError('loop_range is not valid') + self._loop_range = ParametrizedRange.from_range_like(loop_range) if not loop_index.isidentifier(): raise InvalidParameterNameException(loop_index) @@ -198,15 +140,8 @@ def _body_scope_generator(self, scope: Scope, forward=True) -> Iterator[Scope]: loop_range = loop_range if forward else reversed(loop_range) loop_index_name = self._loop_index - get_for_loop_scope = _get_for_loop_scope - for loop_index_value in loop_range: - try: - yield get_for_loop_scope(scope, loop_index_name, loop_index_value) - except TypeError: - # we cannot hash the scope so we will not try anymore - get_for_loop_scope = _ForLoopScope - yield get_for_loop_scope(scope, loop_index_name, loop_index_value) + yield _ForLoopScope(scope, loop_index_name, loop_index_value) def _internal_create_program(self, *, scope: Scope, @@ -301,78 +236,4 @@ def __str__(self) -> str: self.body_parameter_names) -class _ForLoopScope(Scope): - __slots__ = ('_index_name', '_index_value', '_inner') - - def __init__(self, inner: Scope, index_name: str, index_value: int): - super().__init__() - self._inner = inner - self._index_name = index_name - self._index_value = index_value - - def get_volatile_parameters(self) -> FrozenMapping[str, Expression]: - inner_volatile = self._inner.get_volatile_parameters() - - if self._index_name in inner_volatile: - # TODO: use delete method of frozendict - index_name = self._index_name - return FrozenDict((name, value) for name, value in inner_volatile.items() if name != index_name) - else: - return inner_volatile - - def __hash__(self): - return hash((self._inner, self._index_name, self._index_value)) - - def __eq__(self, other: '_ForLoopScope'): - try: - return (self._index_name == other._index_name - and self._index_value == other._index_value - and self._inner == other._inner) - except AttributeError: - return False - - def __contains__(self, item): - return item == self._index_name or item in self._inner - - def get_parameter(self, parameter_name: str) -> Number: - if parameter_name == self._index_name: - return self._index_value - else: - return self._inner.get_parameter(parameter_name) - - __getitem__ = get_parameter - - def change_constants(self, new_constants: Mapping[str, Number]) -> 'Scope': - return _get_for_loop_scope(self._inner.change_constants(new_constants), self._index_name, self._index_value) - - def __len__(self) -> int: - return len(self._inner) + int(self._index_name not in self._inner) - - def __iter__(self) -> Iterator: - if self._index_name in self._inner: - return iter(self._inner) - else: - return itertools.chain(self._inner, (self._index_name,)) - - def as_dict(self) -> FrozenMapping[str, Number]: - if self._as_dict is None: - self._as_dict = FrozenDict({**self._inner.as_dict(), self._index_name: self._index_value}) - return self._as_dict - - def keys(self): - return self.as_dict().keys() - - def items(self): - return self.as_dict().items() - - def values(self): - return self.as_dict().values() - - def __repr__(self): - return f'{type(self)}(inner={self._inner!r}, index_name={self._index_name!r}, ' \ - f'index_value={self._index_value!r})' - - -@functools.lru_cache(maxsize=10**6) -def _get_for_loop_scope(inner: Scope, index_name: str, index_value: int) -> Scope: - return _ForLoopScope(inner, index_name, index_value) +_ForLoopScope = RangeScope diff --git a/qupulse/pulses/range.py b/qupulse/pulses/range.py new file mode 100644 index 00000000..d3f8c57b --- /dev/null +++ b/qupulse/pulses/range.py @@ -0,0 +1,151 @@ +from typing import Tuple, Any, AbstractSet, Mapping, Union, Iterator +from numbers import Number +from dataclasses import dataclass +from functools import lru_cache +import itertools + +from qupulse.utils import checked_int_cast, cached_property +from qupulse.expressions import ExpressionScalar, ExpressionVariableMissingException, ExpressionLike, Expression +from qupulse.parameter_scope import Scope +from qupulse.utils.types import FrozenDict, FrozenMapping + +RangeLike = Union[range, + ExpressionLike, + Tuple[ExpressionLike, ExpressionLike], + Tuple[ExpressionLike, ExpressionLike, ExpressionLike]] + + +@dataclass(frozen=True) +class ParametrizedRange: + start: ExpressionScalar + stop: ExpressionScalar + step: ExpressionScalar + + def __init__(self, *args, **kwargs): + """Like the builtin python range but with parameters. Positional and keyword arguments cannot be mixed. + + Args: + *args: Interpreted as ``(start, )`` or ``(start, stop[, step])`` + **kwargs: Expected to contain ``start``, ``stop`` and ``step`` + Raises: + TypeError: If positional and keyword arguments are mixed + KeyError: If keyword arguments but one of ``start``, ``stop`` or ``step`` is missing + """ + if args and kwargs: + raise TypeError('ParametrizedRange only takes either positional or keyword arguments') + elif kwargs: + start = kwargs['start'] + stop = kwargs['stop'] + step = kwargs['step'] + elif len(args) in (1, 2, 3): + if len(args) == 3: + start, stop, step = args + elif len(args) == 2: + (start, stop), step = args, 1 + else: + start, (stop,), step = 0, args, 1 + else: + raise TypeError('ParametrizedRange expected 1 to 3 arguments, got {}'.format(len(args)), args) + + object.__setattr__(self, 'start', ExpressionScalar(start)) + object.__setattr__(self, 'stop', ExpressionScalar(stop)) + object.__setattr__(self, 'step', ExpressionScalar(step)) + + @lru_cache(maxsize=1024) + def to_tuple(self) -> Tuple[Any, Any, Any]: + """Return a simple representation of the range which is useful for comparison and serialization""" + return (self.start.get_serialization_data(), + self.stop.get_serialization_data(), + self.step.get_serialization_data()) + + def to_range(self, parameters: Mapping[str, Number]) -> range: + return range(checked_int_cast(self.start.evaluate_in_scope(parameters)), + checked_int_cast(self.stop.evaluate_in_scope(parameters)), + checked_int_cast(self.step.evaluate_in_scope(parameters))) + + @cached_property + def parameter_names(self) -> AbstractSet[str]: + return set(self.start.variables) | set(self.stop.variables) | set(self.step.variables) + + @classmethod + def from_range_like(cls, range_like: RangeLike): + if isinstance(range_like, cls): + return range_like + elif isinstance(range_like, tuple): + return cls(*range_like) + elif isinstance(range_like, range): + return cls(range_like.start, range_like.stop, range_like.step) + else: + return cls(range_like) + + +class RangeScope(Scope): + __slots__ = ('_index_name', '_index_value', '_inner') + + def __init__(self, inner: Scope, index_name: str, index_value: int): + super().__init__() + self._inner = inner + self._index_name = index_name + self._index_value = index_value + + def get_volatile_parameters(self) -> FrozenMapping[str, Expression]: + inner_volatile = self._inner.get_volatile_parameters() + + if self._index_name in inner_volatile: + # TODO: use delete method of frozendict + index_name = self._index_name + return FrozenDict((name, value) for name, value in inner_volatile.items() if name != index_name) + else: + return inner_volatile + + def __hash__(self): + return hash((self._inner, self._index_name, self._index_value)) + + def __eq__(self, other: 'RangeScope'): + try: + return (self._index_name == other._index_name + and self._index_value == other._index_value + and self._inner == other._inner) + except AttributeError: + return False + + def __contains__(self, item): + return item == self._index_name or item in self._inner + + def get_parameter(self, parameter_name: str) -> Number: + if parameter_name == self._index_name: + return self._index_value + else: + return self._inner.get_parameter(parameter_name) + + __getitem__ = get_parameter + + def change_constants(self, new_constants: Mapping[str, Number]) -> 'Scope': + return RangeScope(self._inner.change_constants(new_constants), self._index_name, self._index_value) + + def __len__(self) -> int: + return len(self._inner) + int(self._index_name not in self._inner) + + def __iter__(self) -> Iterator: + if self._index_name in self._inner: + return iter(self._inner) + else: + return itertools.chain(self._inner, (self._index_name,)) + + def as_dict(self) -> FrozenMapping[str, Number]: + if self._as_dict is None: + self._as_dict = FrozenDict({**self._inner.as_dict(), self._index_name: self._index_value}) + return self._as_dict + + def keys(self): + return self.as_dict().keys() + + def items(self): + return self.as_dict().items() + + def values(self): + return self.as_dict().values() + + def __repr__(self): + return f'{type(self)}(inner={self._inner!r}, index_name={self._index_name!r}, ' \ + f'index_value={self._index_value!r})' From c007a1139d96172d2949f8c0336e2932c7b02bab Mon Sep 17 00:00:00 2001 From: Simon Humpohl Date: Mon, 5 Sep 2022 16:42:31 +0200 Subject: [PATCH 2/2] First draft of range base measurement window parameterization --- qupulse/pulses/measurement.py | 75 ++++++++++++++++++++++++++++------- 1 file changed, 61 insertions(+), 14 deletions(-) diff --git a/qupulse/pulses/measurement.py b/qupulse/pulses/measurement.py index 0c3f1808..cebafe11 100644 --- a/qupulse/pulses/measurement.py +++ b/qupulse/pulses/measurement.py @@ -2,22 +2,39 @@ from numbers import Real import itertools -from qupulse.expressions import Expression +from qupulse.expressions import ExpressionScalar, ExpressionLike from qupulse.utils.types import MeasurementWindow from qupulse.parameter_scope import Scope +from qupulse.pulses.range import ParametrizedRange, RangeLike, RangeScope -MeasurementDeclaration = Tuple[str, Union[Expression, str, Real], Union[Expression, str, Real]] +MeasurementRangeDeclaration = Tuple[str, ExpressionLike, ExpressionLike, Tuple[str, RangeLike]] + +MeasurementDeclaration = Union[ + Tuple[str, ExpressionLike, ExpressionLike], + MeasurementRangeDeclaration +] class MeasurementDefiner: def __init__(self, measurements: Optional[List[MeasurementDeclaration]]): - if measurements is None: - self._measurement_windows = [] - else: - self._measurement_windows = [(name, - begin if isinstance(begin, Expression) else Expression(begin), - length if isinstance(length, Expression) else Expression(length)) - for name, begin, length in measurements] + self._measurement_windows: Tuple[Tuple[str, ExpressionScalar, ExpressionScalar], ...] = () + self._measurement_ranges: Tuple[Tuple[str, ExpressionScalar, ExpressionScalar, Tuple[str, RangeLike]], ...] = () + + if measurements is not None: + measurement_windows = [] + measurement_ranges = [] + + for declaration in measurements: + if len(declaration) == 3: + name, begin, length = declaration + measurement_windows.append((name, ExpressionScalar(begin), ExpressionScalar(length))) + else: + name, begin, length, (idx_name, param_range) = declaration + measurement_ranges.append((name, ExpressionScalar(begin), ExpressionScalar(length), + (idx_name, ParametrizedRange.from_range_like(param_range)))) + + self._measurement_windows = tuple(measurement_windows) + self._measurement_ranges = tuple(measurement_ranges) for _, _, length in self._measurement_windows: if (length < 0) is True: raise ValueError('Measurement window length may not be negative') @@ -59,6 +76,24 @@ def get_measurement_windows(self, begin_val, length_val) ) + + for name, begin, length, (idx_name, idx_range) in self._measurement_ranges: + name = measurement_mapping[name] + if name is None: + continue + + for idx_val in idx_range.to_range(parameters): + scope = RangeScope(parameters, idx_name, idx_val) + + begin_val = begin.evaluate_in_scope(scope) + length_val = length.evaluate_in_scope(scope) + + resulting_windows.append( + (name, + begin_val, + length_val) + ) + return resulting_windows @property @@ -72,13 +107,25 @@ def measurement_parameters(self) -> Set[str]: @property def measurement_declarations(self) -> List[MeasurementDeclaration]: """Return the measurements that are directly declared on `self`. Does _not_ visit eventual child objects.""" - return [(name, - begin.original_expression, - length.original_expression) - for name, begin, length in self._measurement_windows] + measurements = [] + measurements.extend((name, + begin.original_expression, + length.original_expression) + for name, begin, length in self._measurement_windows) + measurements.extend((name, begin.original_expression, length.original_expression, + (idx_name, idx_range.to_tuple())) + for name, begin, length, (idx_name, idx_range) in self._measurement_ranges) + return measurements @property def measurement_names(self) -> Set[str]: """Return the names of measurements that are directly declared on `self`. Does _not_ visit eventual child objects.""" - return {name for name, *_ in self._measurement_windows} + return {name for name, *_ in itertools.chain(self._measurement_windows, self._measurement_ranges)} + + def __hash__(self): + return hash((self._measurement_windows, self._measurement_ranges)) + + def __eq__(self, other): + return (self._measurement_windows == getattr(other, '_measurement_windows', None) and + self._measurement_ranges == getattr(other, '_measurement_ranges', None))