diff --git a/cycspec_simulator/baseband.py b/cycspec_simulator/baseband.py index 9bd9a75..3d0e80b 100644 --- a/cycspec_simulator/baseband.py +++ b/cycspec_simulator/baseband.py @@ -67,6 +67,7 @@ def sample(self, n_samples, t_start=None, interp=lerp, dtype=np.float32): a template array and an array of sample points at which to evaluate the interpolated function (extended periodically). `fft_interp` and `lerp` (the default) both work. + dtype: Numpy dtype to use for samples. """ if t_start is None: t_start = self.predictor.epoch @@ -82,7 +83,6 @@ def sample(self, n_samples, t_start=None, interp=lerp, dtype=np.float32): I = interp(self.template.I, binno) noise1 = complex_white_noise(n_samples, self.rng, dtype) noise2 = complex_white_noise(n_samples, self.rng, dtype) - noise3 = complex_white_noise(n_samples, self.rng, dtype) if self.template.full_stokes: Q = interp(self.template.Q, binno) U = interp(self.template.U, binno) @@ -106,8 +106,10 @@ def sample(self, n_samples, t_start=None, interp=lerp, dtype=np.float32): data = BasebandData(A, B, t_start, self.feed_poln, self.bandwidth, self.obsfreq) for filtr in self.filters: data = filtr.apply(data) - A += np.sqrt(self.noise_level)*noise3 - B += np.sqrt(self.noise_level)*noise3 + + noise3 = complex_white_noise(data.n_samples, self.rng, dtype) + data.A += np.sqrt(self.noise_level)*noise3 + data.B += np.sqrt(self.noise_level)*noise3 return data diff --git a/cycspec_simulator/cycspec.py b/cycspec_simulator/cycspec.py index fb6f826..54990c5 100644 --- a/cycspec_simulator/cycspec.py +++ b/cycspec_simulator/cycspec.py @@ -199,7 +199,6 @@ def cycfold_cpu(data, ncyc, nbin, phase_predictor, include_end=False, print(f"Total products accumulated: {4*np.sum(samples)}") throughput = 4*np.sum(samples)/(timer.elapsed/1000) print(f"Throughput: {throughput:g} products/sec.") - print(corr_AA.shape) corr_CR = (corr_AB + corr_BA)/2 corr_CI = (corr_AB - corr_BA)/2j pspec_AA = np.fft.fftshift(np.fft.hfft(corr_AA, axis=0), axes=0) diff --git a/cycspec_simulator/filterbank.py b/cycspec_simulator/filterbank.py new file mode 100644 index 0000000..2cafeec --- /dev/null +++ b/cycspec_simulator/filterbank.py @@ -0,0 +1,138 @@ +import numpy as np +from scipy import signal + +from .baseband import BasebandData +from .interpolation import lerp + +def pfb(x, nchan, ntap, window="hamming", fs=1.0): + """ + Channelize a time series using a polyphase filterbank + + Parameters + ---------- + x : ndarray + The input time series. + nchan : int + The number of channels to form. + ntap : int + The number of PFB taps to use. + window : str + The windowing function to use for the PFB coefficients. + fs : float + The sampling frequency of the input data. + + Returns + ------- + x_pfb : ndarray + The channelized data + + Notes + ----- + If the input data are real-valued then only positive frequencies + are returned. + """ + h = signal.firwin(ntap*nchan,cutoff=1.0/nchan, window="rectangular") + h *= signal.get_window(window, ntap*nchan) + nwin = x.shape[0]//ntap//nchan + x = x[:nwin*ntap*nchan].reshape((nwin*ntap, nchan)).T + h = h.reshape((ntap, nchan)).T + xs = np.zeros((nchan, ntap*(nwin-1)+1), dtype=x.dtype) + for ii in range(ntap*(nwin-1)+1): + xw = h*x[:, ii:ii+ntap] + xs[:, ii] = xw.sum(axis=1) + xs = xs.T + xpfb = np.fft.fft(xs, nchan, axis=1) + xpfb *= np.sqrt(nchan) + + return xpfb + +def channelize(data, nchan, ntap=24, window='hamming'): + """ + Channelize a BasebandData object using a polyphase filterbank. + + Parameters + ---------- + data: BasebandData object + nchan: Number of channels into which to split the data + ntap: Number of polyphase fiterbank taps + window: Window function used for polyphase filterbank + (string interpreted by `scipy.signal.get_window()`) + + Returns + ------- + channelized_data: ChannelizedData object + """ + A_pfb = pfb(data.A, nchan=nchan, ntap=ntap, window=window, fs=data.bandwidth) + B_pfb = pfb(data.B, nchan=nchan, ntap=ntap, window=window, fs=data.bandwidth) + freqs = np.fft.fftshift(np.fft.fftfreq(nchan, d=1/data.bandwidth)) + freqs += data.obsfreq + A_pfb = np.fft.fftshift(A_pfb.T, axes=0) + B_pfb = np.fft.fftshift(B_pfb.T, axes=0) + return ChannelizedData( + A_pfb, B_pfb, start_time=data.start_time, + feed_poln=data.feed_poln, + chan_bw=data.bandwidth/nchan, freqs=freqs, + ) + + +class ChannelizedModel: + """ + A model representing data which has been channelized using a polyphase filterbank. + """ + def __init__(self, baseband_model, nchan, ntap=24, window='hamming'): + """ + Create a channelized model. + + Parameters + ---------- + baseband_model: The underlying baseband data model + nchan: Number of channels into which to split the data + ntap: Number of polyphase fiterbank taps + window: Window function used for polyphase filterbank + (string interpreted by `scipy.signal.get_window()`) + """ + self.baseband_model = baseband_model + self.nchan = nchan + self.ntap = ntap + self.window = window + + @property + def chan_bw(self): + return self.baseband_model.bandwidth/self.nchan + + def sample(self, n_samples, t_start=None, interp=lerp, dtype=np.float32): + """ + Simulate a specified number of samples in each channel. + + Parameters + ---------- + + n_samples: The number of samples to use. + t_start: Time of the first sample, as a Time object. If `None`, + the predictor epoch will be used. + interp: Interpolation function to use. Passed to `BasebandModel.sample()`. + dtype: Numpy dtype to use for samples. + """ + data = self.baseband_model.sample(n_samples, t_start, interp, dtype) + return channelize(data, self.nchan, self.ntap, self.window) + +class ChannelizedData: + """ + Channelized data produced by a polyphase filterbank. + """ + def __init__(self, A, B, start_time, feed_poln, chan_bw, freqs): + self.A = A + self.B = B + self.start_time = start_time + self.feed_poln = feed_poln + self.chan_bw = chan_bw + self.freqs = freqs + + def extract_channel(self, ichan): + return BasebandData( + self.A[ichan], self.B[ichan], start_time=self.start_time, + feed_poln=self.feed_poln, bandwidth=self.chan_bw, obsfreq=self.freqs[ichan], + ) + + def __getitem__(self, ichan): + return self.extract_channel(ichan)