From 8a0b41a8a9436c305feb0dc3fbf8b0a127042943 Mon Sep 17 00:00:00 2001 From: Fabian Ulbricht Date: Wed, 17 Jul 2024 00:11:41 +0200 Subject: [PATCH] added interference & tests, multitap rayleigh and utilities Signed-off-by: Fabian Ulbricht --- doc/source/api/channel.wireless.rst | 11 ++ doc/source/api/ofdm.rst | 21 +++- doc/source/api/utils.rst | 4 + sionna/channel/__init__.py | 2 +- sionna/channel/rayleigh_block_fading.py | 116 +++++++++++++++++ sionna/ofdm/__init__.py | 2 + sionna/ofdm/interference.py | 151 +++++++++++++++++++++++ sionna/ofdm/interference_estimation.py | 83 +++++++++++++ sionna/utils/misc.py | 27 ++++ test/unit/ofdm/test_ofdm_interference.py | 97 +++++++++++++++ 10 files changed, 510 insertions(+), 4 deletions(-) create mode 100644 sionna/ofdm/interference.py create mode 100644 sionna/ofdm/interference_estimation.py create mode 100644 test/unit/ofdm/test_ofdm_interference.py diff --git a/doc/source/api/channel.wireless.rst b/doc/source/api/channel.wireless.rst index f3404b48..1196d17c 100644 --- a/doc/source/api/channel.wireless.rst +++ b/doc/source/api/channel.wireless.rst @@ -421,10 +421,21 @@ cir_to_ofdm_channel Rayleigh block fading ====================== +RayleighBlockFading +--------------------- + .. autoclass:: sionna.channel.RayleighBlockFading :members: :exclude-members: call, build +MultiTapRayleighBlockFading +---------------------------- + +.. autoclass:: sionna.channel.MultiTapRayleighBlockFading + :members: + :exclude-members: call, build + + 3GPP 38.901 channel models =========================== diff --git a/doc/source/api/ofdm.rst b/doc/source/api/ofdm.rst index 0bcbbdee..f7610318 100644 --- a/doc/source/api/ofdm.rst +++ b/doc/source/api/ofdm.rst @@ -21,10 +21,10 @@ that automatically generates orthogonal pilot transmissions for all transmitters and streams. Additionally, the module contains layers for channel estimation, precoding, -equalization, and detection, +equalization, detection, and interference simulation and estimation, such as the :class:`~sionna.ofdm.LSChannelEstimator`, the -:class:`~sionna.ofdm.ZFPrecoder`, and the :class:`~sionna.ofdm.LMMSEEqualizer` and -:class:`~sionna.ofdm.LinearDetector`. +:class:`~sionna.ofdm.ZFPrecoder`, the :class:`~sionna.ofdm.LMMSEEqualizer` the +:class:`~sionna.ofdm.LinearDetector`, and :class: `sionna.ofdm.OFDMInterferenceSource`. These are good starting points for the development of more advanced algorithms and provide robust baselines for benchmarking. @@ -310,3 +310,18 @@ MMSEPICDetector .. autoclass:: sionna.ofdm.MMSEPICDetector :exclude-members: call, build :members: + + +Interference +*************** + +OFDMInterferenceSource +----------------------- +.. autoclass:: sionna.ofdm.OFDMInterferenceSource + :exclude-members: call, build + +CovarianceEstimator +-------------------- +.. autoclass:: sionna.ofdm.CovarianceEstimator + :exclude-members: call, build + :members: diff --git a/doc/source/api/utils.rst b/doc/source/api/utils.rst index 62f33fa8..799895c5 100644 --- a/doc/source/api/utils.rst +++ b/doc/source/api/utils.rst @@ -122,6 +122,10 @@ complex_normal -------------- .. autofunction:: sionna.utils.complex_normal +complex_uniform_disk +---------------------- +.. autofunction:: sionna.utils.complex_uniform_disk + log2 ---- .. autofunction:: sionna.utils.log2 diff --git a/sionna/channel/__init__.py b/sionna/channel/__init__.py index 8d70bf70..4b9e02ec 100644 --- a/sionna/channel/__init__.py +++ b/sionna/channel/__init__.py @@ -18,7 +18,7 @@ from .apply_time_channel import ApplyTimeChannel from .ofdm_channel import OFDMChannel from .time_channel import TimeChannel -from .rayleigh_block_fading import RayleighBlockFading +from .rayleigh_block_fading import RayleighBlockFading, MultiTapRayleighBlockFading from .cir_dataset import CIRDataset from .constants import * from .utils import deg_2_rad, rad_2_deg, wrap_angle_0_360, drop_uts_in_sector, relocate_uts, set_3gpp_scenario_parameters, gen_single_sector_topology, gen_single_sector_topology_interferers, subcarrier_frequencies, cir_to_ofdm_channel, cir_to_time_channel, time_to_ofdm_channel, time_lag_discrete_time_channel, exp_corr_mat, one_ring_corr_mat, time_frequency_vector diff --git a/sionna/channel/rayleigh_block_fading.py b/sionna/channel/rayleigh_block_fading.py index 25f308cf..ec0bc09a 100644 --- a/sionna/channel/rayleigh_block_fading.py +++ b/sionna/channel/rayleigh_block_fading.py @@ -116,3 +116,119 @@ def __call__(self, batch_size, num_time_steps, sampling_frequency=None): # Tile the response over the block h = tf.tile(h, [1, 1, 1, 1, 1, 1, num_time_steps]) return h, delays + + + +# TODO should we introduce a parameter for time delay between paths? +class MultiTapRayleighBlockFading(ChannelModel): + # pylint: disable=line-too-long + r""" + Generate channel impulse responses corresponding to a Rayleigh block + fading channel model. + + The channel impulse responses generated are formed of M paths with delays of + :math:`\tau_m = \frac{m}{\mathtt{sampling\_frequency}}, 0 \leq m \leq M-1`, + and a normally distributed fading coefficient. + All time steps of a batch example share the same channel coefficient + (block fading). + + This class can be used in conjunction with the classes that simulate the + channel response in time or frequency domain, i.e., + :class:`~sionna.channel.OFDMChannel`, + :class:`~sionna.channel.TimeChannel`, + :class:`~sionna.channel.GenerateOFDMChannel`, + :class:`~sionna.channel.ApplyOFDMChannel`, + :class:`~sionna.channel.GenerateTimeChannel`, + :class:`~sionna.channel.ApplyTimeChannel`. + + Parameters + ---------- + + num_rx : int + Number of receivers (:math:`N_R`) + + num_rx_ant : int + Number of antennas per receiver (:math:`N_{RA}`) + + num_tx : int + Number of transmitters (:math:`N_T`) + + num_tx_ant : int + Number of antennas per transmitter (:math:`N_{TA}`) + + num_paths: int + Number of paths (:math:`M`) + + dtype : tf.DType + Complex datatype to use for internal processing and output. + Defaults to `tf.complex64`. + + Input + ----- + batch_size : int + Batch size + + num_time_steps : int + Number of time steps + + sampling_frequency : float + Sampling frequency [Hz] + + Output + ------- + a : [batch size, num_rx, num_rx_ant, num_tx, num_tx_ant, num_paths], num_time_steps], tf.complex + Path coefficients + + tau : [batch size, num_rx, num_tx, num_paths], tf.float + Path delays [s] + """ + + def __init__( self, + num_rx, + num_rx_ant, + num_tx, + num_tx_ant, + num_paths, + dtype=tf.complex64): + + assert dtype.is_complex, "'dtype' must be complex type" + self._dtype = dtype + + # We don't set these attributes as private so that the user can update + # them + self.num_tx = num_tx + self.num_tx_ant = num_tx_ant + self.num_rx = num_rx + self.num_rx_ant = num_rx_ant + self.num_paths = num_paths + + def __call__(self, batch_size, num_time_steps, sampling_frequency): + + # Delays + delays = tf.range(0, self.num_paths, dtype=self._dtype.real_dtype) / sampling_frequency + delays = tf.tile(delays[tf.newaxis, tf.newaxis, tf.newaxis, :], [batch_size, self.num_rx, self.num_tx, 1]) + + # Fading coefficients + std = tf.cast(tf.sqrt(0.5), dtype=self._dtype.real_dtype) + h_real = tf.random.normal(shape=[ batch_size, + self.num_rx, + self.num_rx_ant, + self.num_tx, + self.num_tx_ant, + self.num_paths, + 1], # Same response over the block + stddev=std, + dtype = self._dtype.real_dtype) + h_img = tf.random.normal(shape=[ batch_size, + self.num_rx, + self.num_rx_ant, + self.num_tx, + self.num_tx_ant, + self.num_paths, + 1], # Same response over the block + stddev=std, + dtype = self._dtype.real_dtype) + h = tf.complex(h_real, h_img) + # Tile the response over the block + h = tf.tile(h, [1, 1, 1, 1, 1, 1, num_time_steps]) + return h, delays diff --git a/sionna/ofdm/__init__.py b/sionna/ofdm/__init__.py index 20831bb6..1d40b600 100644 --- a/sionna/ofdm/__init__.py +++ b/sionna/ofdm/__init__.py @@ -14,3 +14,5 @@ from .equalization import OFDMEqualizer, LMMSEEqualizer, ZFEqualizer, MFEqualizer from .detection import OFDMDetector, OFDMDetectorWithPrior, MaximumLikelihoodDetector, MaximumLikelihoodDetectorWithPrior, LinearDetector, KBestDetector, EPDetector, MMSEPICDetector from .precoding import ZFPrecoder +from .interference import OFDMInterferenceSource +from .interference_estimation import CovarianceEstimator diff --git a/sionna/ofdm/interference.py b/sionna/ofdm/interference.py new file mode 100644 index 00000000..a75c874c --- /dev/null +++ b/sionna/ofdm/interference.py @@ -0,0 +1,151 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +"""Class definition for signal interference generation""" + +import sionna +import tensorflow as tf + +class OFDMInterferenceSource(tf.keras.layers.Layer): + # pylint: disable=line-too-long + r"""OFDMInterferenceSource(density_subcarriers=1.0, sampler="uniform", domain="freq", cyclic_prefix_length=0, dtype=tf.complex64) + + Layer to simulate interference transmitter symbols for OFDM systems in frequency or time domain. + These can then be sent through a OFDM channel to simulate interference at the receiver. + + The transmit symbols can be sampled form different distributions or constellations, to be configured through the parameter `sampler`. + + When `domain` is set to "freq", the interference generated is meant to be used in frequency domain simulations. + The simulation thus implicitly assumes that the interfering devices are synchronized with the receiver, and send their signal with a cyclic prefix of sufficient length. + + When `domain` is set to "time", the interference is generated in time domain. A cyclic prefix may be added to the interference symbols, which can be controlled through the parameter `cyclic_prefix_length`. + + This class supports simulation of narrowband interference through the parameter `density_subcarriers`, which controls the fraction of subcarriers on which the interference takes place. + The subcarriers on which the interference takes place are randomly selected for each call to the layer. + + Parameters + ---------- + density_subcarriers: float + Fraction of subcarriers which are effected by interference. Must be in between 0 and 1. + The number of subcarriers effected is rounded to the next integer, and the subcarriers are randomly selected for each call to the layer. + sampler: str, instance of :class:`~sionna.mapping.Constellation`, or callable + If str, one of ["uniform", "gaussian"]. + If instance of :class:`~sionna.mapping.Constellation`, the constellation is sampled randomly. + If callable, the callable is used as sampling function. It should have signature ``(shape, var, dtype) -> tf.Tensor``. + The sampled symbols will always have an expected mean energy of 1. + domain: str + Domain in which the interference is generated. One of ["time", "freq"]. + cyclic_prefix_length: int + Length of the cyclic prefix. Only relevant if `domain` is "time". + fft_size: int, optional (ignored if `domain` is "freq") + FFT size. Ignored for frequency domain, as this is separately provided by shape. + This parameter is relevant in time domain, as a cyclic prefix might be added in, and sparsity over subcarriers mandates the FFT size. + dtype: tf.complex + Data type of the interference symbols. Defaults to tf.complex64. + + Input + ----- + shape: 1D tensor/array/list, int + List of integers, specifying the shape of the interference symbols to be generated. + Should consist of `(batch_size, num_tx, num_tx_ant, num_ofdm_symbols, fft_size)` in frequency domain, + and `(batch_size, num_tx, num_tx_ant, num_time_samples)` in time domain. + + + Output + ------ + x_itf: ``output_shape``, ``dtype`` + Interference symbols in time or frequency domain, depending on the parameter `domain`. ```output_shape``` is ```shape``` if in frequency domain, and `(batch_size, num_tx, num_tx_ant, num_ofdm_symbols * (fft_size + cyclic_prefix_length))` if in time domain. + + """ + def __init__(self, + density_subcarriers=1.0, + sampler="uniform", + domain="freq", + cyclic_prefix_length=0, + fft_size=None, + dtype=tf.complex64, + **kwargs): + + super().__init__(trainable=False, dtype=dtype, **kwargs) + self._density_subcarriers = density_subcarriers + self._domain = domain + self._cyclic_prefix_length = cyclic_prefix_length + self._fft_size = fft_size + self._dtype_as_dtype = tf.as_dtype(self.dtype) + # if sampler is string, we use the corresponding function. Otherwise assign the function directly + self._sample_function = self._sampler_to_callable(sampler, self._dtype_as_dtype) + if self._domain == "time": + self._modulator = sionna.ofdm.OFDMModulator(cyclic_prefix_length) + self._check_settings() + + def _check_settings(self): + assert self._density_subcarriers >= 0.0 and self._density_subcarriers <= 1.0, "density_subcarriers must be in [0, 1]" + assert self._domain in ["time", "freq"] + if self._domain == "time": + assert self._cyclic_prefix_length >= 0, "cyclic_prefix_length must be non-negative" + assert self._fft_size is not None, "fft_size must be provided in time domain" + assert self._dtype_as_dtype.is_complex + + def call(self, inputs): + if self._domain == "freq": + self._fft_size = inputs[-1] + num_ofdm_symbols = inputs[-2] + else: + num_ofdm_symbols = tf.math.ceil(tf.cast(inputs[-1], tf.float32) / (self._fft_size + self._cyclic_prefix_length)) + x_itf = self._sample_function(tf.concat([inputs[:3], [num_ofdm_symbols, self._fft_size]], axis=0)) + x_itf = self._make_sparse(x_itf) + if self._domain == "time": + x_itf = self._modulator(x_itf) + x_itf = x_itf[..., :inputs[-1]] + return x_itf + + def _make_sparse(self, data): + shape = tf.shape(data) + num_subcarriers = shape[-1] + num_nonzero_subcarriers = tf.cast(tf.round(self._density_subcarriers * tf.cast(num_subcarriers, tf.float32)), tf.int32) + + # create sparse masks + subcarrier_mask = tf.concat([tf.ones([num_nonzero_subcarriers], dtype=self._dtype), + tf.zeros([num_subcarriers - num_nonzero_subcarriers], dtype=self._dtype)], axis=0) + subcarrier_mask = tf.random.shuffle(subcarrier_mask) + + return data * subcarrier_mask + + def _sampler_to_callable(self, sampler, dtype): + # pylint: disable=line-too-long + r""" + Returns callable which samples from a constellation or a distribution. + + Input + ----- + sampler : str | Constellation | callable + String in `["uniform", "gaussian"]`, an instance of :class:`~sionna.mapping.Constellation`, or function with signature ``(shape, dtype) -> tf.Tensor``, + where elementwise :math:`E[|x|^2] = 1`. + dtype : tf.Dtype + Defines the datatype the returned function should return. + + Output + ------ + callable + Function with signature ``shape -> tf.Tensor`` which returns a tensor of shape ``shape`` with dtype ``dtype``. + """ + if isinstance(sampler, sionna.mapping.Constellation): + assert sampler.dtype == dtype + sampler.normalize = True + ret = sionna.utils.SymbolSource('custom', constellation=sampler, dtype=dtype) + else: + if isinstance(sampler, str): + if sampler == "uniform": + f = sionna.utils.complex_uniform_disk + elif sampler == "gaussian": + f = sionna.utils.complex_normal + else: + raise ValueError(f"Unknown sampler {sampler}") + elif callable(sampler): + f = sampler + else: + raise ValueError(f"Unknown sampler {sampler}") + # pylint: disable=unnecessary-lambda-assignment + ret = lambda s: f(shape=s, var=1.0, dtype=dtype) + return ret diff --git a/sionna/ofdm/interference_estimation.py b/sionna/ofdm/interference_estimation.py new file mode 100644 index 00000000..65af3ed3 --- /dev/null +++ b/sionna/ofdm/interference_estimation.py @@ -0,0 +1,83 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +"""Class definition for receiver covariance estimation""" + +import tensorflow as tf +import numpy as np + +# TODO add linear interpolation +class CovarianceEstimator(tf.keras.layers.Layer): + # pylint: disable=line-too-long + r"""CovarianceEstimator(pilot_pattern, subcarrier_interpolation='nn', dtype=tf.complex64) + + Uses all resource elements which are masked in all streams to estimate the covariance matrix of noise or interference over the whole resource grid. + A covariance matrix is estimated for each subcarrier, and then interpolated between the subcarriers using the method specified in the constructor. + + Parameters + ---------- + pilot_pattern : PilotPattern + An instance of :class:`~sionna.ofdm.PilotPattern`. + + subcarrier_interpolation : str + Method to interpolate between subcarriers. Currently only 'nn' (nearest neighbor) is supported. + + dtype: tf.complex + Data type of the input signal. Defaults to tf.complex64. + + Input + ----- + y : [batch_size, num_rx, num_rx_ant, num_ofdm_symbols, num_effective_subcarriers], tf.complex + Frequency domain receive signal after DC- and guard carrier removal. + + Output + ------ + R : [batch_size, num_rx, num_ofdm_symbols, num_effective_subcarriers, num_rx_ant, num_rx_ant], tf.complex + Estimated covariance matrix for resource element. + + """ + def __init__(self, pilot_pattern, subcarrier_interpolation='nn', dtype=tf.complex64, **kwargs): + super().__init__(dtype=dtype, **kwargs) + self._pilot_pattern = pilot_pattern + assert subcarrier_interpolation in ['nn'] + self._subcarrier_interpolation = subcarrier_interpolation + + # 1 where a pilot is zero, else 0 + zero_pilots = np.zeros_like(pilot_pattern.mask, dtype=tf.as_dtype(self.dtype).as_numpy_dtype) + zero_pilots[np.where(pilot_pattern.mask)] = np.reshape(np.abs(pilot_pattern.pilots)==0, -1) + # num_estimation_elements, 2: symbol-index, subcarrier-index + self._estimation_indices = tf.where(np.all(zero_pilots, axis=(0, 1))) + if subcarrier_interpolation == 'nn': + estimated_subcarriers = np.unique(self._estimation_indices[:, 1]) + # for each subcarrier, find the closest estimated subcarrier index + closest_subcarrier_index = np.array([np.argmin(np.abs(estimated_subcarriers - subcarrier)) + for subcarrier in np.arange(pilot_pattern.num_effective_subcarriers)]) + self._closest_subcarrier = tf.gather(estimated_subcarriers, closest_subcarrier_index) + + + def call(self, inputs): + input_shape = tf.shape(inputs) + num_effective_subcarriers = input_shape[-1] + # make RG dimensions first ones, to gather from them + inputs = tf.transpose(inputs, tf.roll(tf.range(tf.rank(inputs)), shift=2, axis=0)) + # gather: [num_estimation_elements, batch_size, num_rx, num_rx_ant] + estimation_signals = tf.gather_nd(inputs, self._estimation_indices) + # compute element-wise covariances + # [num_estimation_elements, batch_size, num_rx, num_rx_ant, num_rx_ant] + cov_elementwise = tf.einsum('...i,...j->...ij', estimation_signals, tf.math.conj(estimation_signals)) + # mean over subcarrier using estimation_indices + # [num_effective_subcarriers, batch_size, num_rx, num_rx_ant, num_rx_ant] + # TODO change when upgrading TF, where tf.math.unsorted_mean support complex numbers + mean_real = tf.math.unsorted_segment_mean(tf.math.real(cov_elementwise), self._estimation_indices[:, 1], num_effective_subcarriers) + mean_imag = tf.math.unsorted_segment_mean(tf.math.imag(cov_elementwise), self._estimation_indices[:, 1], num_effective_subcarriers) + cov_subcarrierwise = tf.complex(mean_real, mean_imag) + # interpolate over subcarrier + if self._subcarrier_interpolation == 'nn': + cov_subcarrierwise = tf.gather(cov_subcarrierwise, self._closest_subcarrier, axis=0) + # transpose to [batch_size, num_rx, num_ofdm_symbols, num_effective_subcarriers, num_rx_ant, num_rx_ant] and return + cov = tf.transpose(cov_subcarrierwise, [1, 2, 0, 3, 4]) + cov = tf.expand_dims(cov, axis=2) + output_shape = tf.concat([input_shape[:2], input_shape[3:], [input_shape[2]], [input_shape[2]]], axis=0) + return tf.broadcast_to(cov, output_shape) + diff --git a/sionna/utils/misc.py b/sionna/utils/misc.py index c1ffc779..ad9c7fd0 100644 --- a/sionna/utils/misc.py +++ b/sionna/utils/misc.py @@ -935,6 +935,33 @@ def complex_normal(shape, var=1.0, dtype=tf.complex64): return x +def complex_uniform_disk(shape, var=1.0, dtype=tf.complex64): + # pylint: disable=line-too-long + r""" + Sample uniform circle on complex plane. + + Input + ----- + shape : tf.shape, or list + The desired shape. + + var : float + The total variance., i.e., each complex dimension has + variance ``var/2``. + + dtype: tf.complex + The desired dtype. Defaults to `tf.complex64`. + + Output + ------ + : ``shape``, ``dtype`` + Tensor of random variables sampled from a uniform complex disk with radius :math:`\sqrt{2 \cdot \text{var}}`. + """ + # Sample theta and R uniform, r = sqrt(2R) + r = tf.complex(tf.random.uniform(shape, minval=0, maxval=var, dtype=dtype.real_dtype), tf.cast(0.0, dtype.real_dtype)) + theta = tf.complex(tf.random.uniform(shape, minval=0, maxval=2*np.pi, dtype=dtype.real_dtype), tf.cast(0.0, dtype.real_dtype)) + return tf.sqrt(2*r)*tf.exp(1j*theta) + ########################################################### # Deprecated aliases that will not be included in the next # major release diff --git a/test/unit/ofdm/test_ofdm_interference.py b/test/unit/ofdm/test_ofdm_interference.py new file mode 100644 index 00000000..ec79fefb --- /dev/null +++ b/test/unit/ofdm/test_ofdm_interference.py @@ -0,0 +1,97 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2021-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# + +try: + import sionna +except ImportError as e: + import sys + sys.path.append("../") + import sionna + +import unittest +import pytest +import numpy as np +import tensorflow as tf + +class TestInterference(unittest.TestCase): + + def setUp(self): + self.batch_size = 64 + self.num_tx = 3 + self.num_tx_ant = 2 + self.fft_size = 72 + self.num_ofdm_symbols = 14 + self.num_bits_per_symbol = 4 + self.dtype = tf.complex128 + + def test_frequency_interference_sampling(self): + # test variance and dtype of all sampling methods + constellation = sionna.mapping.Constellation('qam', self.num_bits_per_symbol, dtype=self.dtype) + for sampler in ["uniform", "gaussian", constellation]: + itf_src = sionna.ofdm.OFDMInterferenceSource(density_subcarriers=1.0, sampler=sampler, domain="freq", dtype=self.dtype) + x_itf = itf_src([self.batch_size, self.num_tx, self.num_tx_ant, self.num_ofdm_symbols, self.fft_size]) + self.assertEqual(self.dtype, x_itf.dtype) + self.assertAlmostEqual(1.0, np.var(x_itf), 2) + + def test_density_subcarriers(self): + density_subcarriers = 0.5 + itf_src = sionna.ofdm.OFDMInterferenceSource(density_subcarriers=density_subcarriers, sampler="uniform", domain="freq", dtype=self.dtype) + x_itf = itf_src([self.batch_size, self.num_tx, self.num_tx_ant, self.num_ofdm_symbols, self.fft_size]) + # count number of non-zero subcarriers per batch + num_non_zero_subcarriers = np.sum(np.sum(np.abs(x_itf), axis=(1, 2, 3)) > 0, axis=-1, dtype=np.float32) + assert np.all(num_non_zero_subcarriers == np.round(density_subcarriers * self.fft_size)) + + def test_time_interference(self): + # check if cyclic prefix is added correctly for different cp-lengths, including edge-cases (0, fft_size, fft_size+1) + for cp_length in [0, 16, self.fft_size, self.fft_size+1]: + itf_src = sionna.ofdm.OFDMInterferenceSource(domain="time", cyclic_prefix_length=cp_length, fft_size=self.fft_size, dtype=self.dtype) + num_time_samples = self.num_ofdm_symbols * (self.fft_size + cp_length) + shape = [self.batch_size, self.num_tx, self.num_tx_ant, num_time_samples] + if cp_length <= self.fft_size: + x_itf = itf_src(shape) + assert x_itf.shape == shape + x_itf_ofdm_symbols = tf.reshape(x_itf, [-1, self.fft_size + cp_length]).numpy() + if cp_length > 0: + assert np.all(x_itf_ofdm_symbols[:, -cp_length:] == x_itf_ofdm_symbols[:, :cp_length]) + else: + with self.assertRaises(Exception): + x_itf = itf_src(shape) + + +class TestCovarianceEstimation(unittest.TestCase): + + def setUp(self): + self.batch_size = 64 + self.num_rx = 1 + self.num_rx_ant = 8 + self.fft_size = 72 + self.num_ofdm_symbols = 14 + self.num_bits_per_symbol = 4 + self.dtype = tf.complex128 + self.y = sionna.utils.SymbolSource("qam", self.num_bits_per_symbol, dtype=self.dtype)([self.batch_size, self.num_rx, self.num_rx_ant, self.num_ofdm_symbols, self.fft_size]) + + def test_only_pilots(self): + r'''When there is only one resource element which is a pilot with energy 0, the estimated covariance matrix should be the same for the whole grid: The covariance matrix of this resource element.''' + + mask = np.zeros([1, 1, self.num_ofdm_symbols, self.fft_size], dtype=bool) + estimation_position = (np.random.randint(0, self.num_ofdm_symbols), np.random.randint(0, self.fft_size)) + mask[0, 0, estimation_position[0], estimation_position[1]] = True + pilots = tf.zeros([1, 1, 1], dtype=self.dtype) + pilot_pattern = sionna.ofdm.PilotPattern(mask, pilots) + cov_est = sionna.ofdm.CovarianceEstimator(pilot_pattern)(self.y) + data_estimation_position = self.y[..., estimation_position[0], estimation_position[1]][..., tf.newaxis] + cov_estimation_position = tf.matmul(data_estimation_position, data_estimation_position, adjoint_b=True) + cov_estimation_position = sionna.utils.insert_dims(cov_estimation_position, 2, 2) + # broadcast the estimation to the whole grid + cov_estimation_position = tf.broadcast_to(cov_estimation_position, [self.batch_size, self.num_rx, self.num_ofdm_symbols, self.fft_size, self.num_rx_ant, self.num_rx_ant]) + assert np.allclose(cov_est.numpy(), cov_estimation_position.numpy(), rtol=1e-2) + + + def test_scattered_pilots(self): + r'''Test that the estimation does not fail when the pilots are scattered across the grid.''' + mask = tf.random.uniform([1, 1, self.num_ofdm_symbols, self.fft_size], maxval=2, dtype=tf.int32) + pilots = tf.zeros([1, 1, tf.math.reduce_sum(mask)], dtype=self.dtype) + pilot_pattern = sionna.ofdm.PilotPattern(mask, pilots) + cov_est = sionna.ofdm.CovarianceEstimator(pilot_pattern)(self.y) \ No newline at end of file