From b07f68ac6ad53ad072e4b89a991b776229bf4aa2 Mon Sep 17 00:00:00 2001 From: gitpushoriginmaster <32514250+gitpushoriginmaster@users.noreply.github.com> Date: Sat, 17 Dec 2022 16:40:28 +0200 Subject: [PATCH] Remove dead code and many minor fixes --- basic_pitch/commandline_printing.py | 7 +- basic_pitch/inference.py | 42 +++--- basic_pitch/layers/nnaudio.py | 192 ++++++++++++++-------------- basic_pitch/layers/signal.py | 13 +- basic_pitch/models.py | 33 ++--- basic_pitch/note_creation.py | 5 +- basic_pitch/predict.py | 2 +- tests/test_inference.py | 2 +- tests/test_nn.py | 5 +- 9 files changed, 156 insertions(+), 145 deletions(-) diff --git a/basic_pitch/commandline_printing.py b/basic_pitch/commandline_printing.py index 6cf12488..c2561559 100644 --- a/basic_pitch/commandline_printing.py +++ b/basic_pitch/commandline_printing.py @@ -43,7 +43,7 @@ def generating_file_message(output_type: str) -> None: def file_saved_confirmation(output_type: str, save_path: Union[pathlib.Path, str]) -> None: - """Print a confirmation that the file was saved succesfully + """Print a confirmation that the file was saved successfully Args: output_type: The kind of file that is being generated. @@ -53,15 +53,16 @@ def file_saved_confirmation(output_type: str, save_path: Union[pathlib.Path, str print(f" {OUTPUT_EMOJIS[output_type]} Saved to {save_path}") -def failed_to_save(output_type: str, save_path: Union[pathlib.Path, str]) -> None: +def failed_to_save(output_type: str, save_path: Union[pathlib.Path, str], e: Exception) -> None: """Print a failure to save message Args: output_type: The kind of file that is being generated. save_path: The path to output file. + e: The exception that was raised. """ - print(f"\n🚨 Failed to save {output_type.replace('_', ' ').lower()} to {save_path} \n") + print(f"\n🚨 Failed to save {output_type.replace('_', ' ').lower()} to {save_path} due to {e}\n") @contextmanager diff --git a/basic_pitch/inference.py b/basic_pitch/inference.py index 4e31c237..9601c64e 100644 --- a/basic_pitch/inference.py +++ b/basic_pitch/inference.py @@ -54,7 +54,7 @@ def window_audio_file(audio_original: Tensor, hop_size: int) -> Tuple[Tensor, Li window_times: list of {'start':.., 'end':...} objects (times in seconds) """ - from tensorflow import expand_dims # imporing this here so the module loads faster + from tensorflow import expand_dims # importing this here so the module loads faster audio_windowed = expand_dims( signal.frame(audio_original, AUDIO_N_SAMPLES, hop_size, pad_end=True, pad_value=0), @@ -85,7 +85,7 @@ def get_audio_input( length of original audio file, in frames, BEFORE padding. """ - assert overlap_len % 2 == 0, "overlap_length must be even, got {}".format(overlap_len) + assert overlap_len % 2 == 0, f"overlap_length must be even, got {overlap_len}" audio_original, _ = librosa.load(str(audio_path), sr=AUDIO_SAMPLE_RATE, mono=True) @@ -250,8 +250,8 @@ def save_note_events( save_path: The location we're saving it """ - with open(save_path, "w") as fhandle: - writer = csv.writer(fhandle, delimiter=",") + with open(save_path, "w") as f_handle: + writer = csv.writer(f_handle, delimiter=",") writer.writerow(["start_time_s", "end_time_s", "pitch_midi", "velocity", "pitch_bend"]) for start_time, end_time, note_number, amplitude, pitch_bend in note_events: row = [start_time, end_time, note_number, int(np.round(127 * amplitude))] @@ -280,8 +280,8 @@ def predict( onset_threshold: Minimum energy required for an onset to be considered present. frame_threshold: Minimum energy requirement for a frame to be considered present. minimum_note_length: The minimum allowed note length in frames. - minimum_freq: Minimum allowed output frequency, in Hz. If None, all frequencies are used. - maximum_freq: Maximum allowed output frequency, in Hz. If None, all frequencies are used. + minimum_frequency: Minimum allowed output frequency, in Hz. If None, all frequencies are used. + maximum_frequency: Maximum allowed output frequency, in Hz. If None, all frequencies are used. multiple_pitch_bends: If True, allow overlapping notes in midi file to have pitch bends. melodia_trick: Use the melodia post-processing step. debug_file: An optional path to output debug data to. Useful for testing/verification. @@ -364,15 +364,15 @@ def predict_and_save( audio_path_list: List of file paths for the audio to run inference on. output_directory: Directory to output MIDI and all other outputs derived from the model to. save_midi: True to save midi. - sonify_midi: Whether or not to render audio from the MIDI and output it to a file. + sonify_midi: Whether to render audio from the MIDI and output it to a file. save_model_outputs: True to save contours, onsets and notes from the model prediction. save_notes: True to save note events. model_path: Path to load the Keras saved model from. Can be local or on GCS. onset_threshold: Minimum energy required for an onset to be considered present. frame_threshold: Minimum energy requirement for a frame to be considered present. minimum_note_length: The minimum allowed note length in frames. - minimum_freq: Minimum allowed output frequency, in Hz. If None, all frequencies are used. - maximum_freq: Maximum allowed output frequency, in Hz. If None, all frequencies are used. + minimum_frequency: Minimum allowed output frequency, in Hz. If None, all frequencies are used. + maximum_frequency: Maximum allowed output frequency, in Hz. If None, all frequencies are used. multiple_pitch_bends: If True, allow overlapping notes in midi file to have pitch bends. melodia_trick: Use the melodia post-processing step. debug_file: An optional path to output debug data to. Useful for testing/verification. @@ -400,34 +400,40 @@ def predict_and_save( model_output_path = build_output_path(audio_path, output_directory, OutputExtensions.MODEL_OUTPUT_NPZ) try: np.savez(model_output_path, basic_pitch_model_output=model_output) + except Exception as e: + failed_to_save(OutputExtensions.MODEL_OUTPUT_NPZ.name, model_output_path, e) + else: file_saved_confirmation(OutputExtensions.MODEL_OUTPUT_NPZ.name, model_output_path) - except Exception: - failed_to_save(OutputExtensions.MODEL_OUTPUT_NPZ.name, model_output_path) if save_midi: midi_path = build_output_path(audio_path, output_directory, OutputExtensions.MIDI) try: midi_data.write(str(midi_path)) + except Exception as e: + failed_to_save(OutputExtensions.MIDI.name, midi_path, e) + else: file_saved_confirmation(OutputExtensions.MIDI.name, midi_path) - except Exception: - failed_to_save(OutputExtensions.MIDI.name, midi_path) + if sonify_midi: midi_sonify_path = build_output_path(audio_path, output_directory, OutputExtensions.MIDI_SONIFICATION) try: infer.sonify_midi(midi_data, midi_sonify_path, sr=sonification_samplerate) + except Exception as e: + failed_to_save(OutputExtensions.MIDI_SONIFICATION.name, midi_sonify_path, e) + else: file_saved_confirmation(OutputExtensions.MIDI_SONIFICATION.name, midi_sonify_path) - except Exception: - failed_to_save(OutputExtensions.MIDI_SONIFICATION.name, midi_sonify_path) if save_notes: note_events_path = build_output_path(audio_path, output_directory, OutputExtensions.NOTE_EVENTS) try: save_note_events(note_events, note_events_path) + except Exception as e: + failed_to_save(OutputExtensions.NOTE_EVENTS.name, note_events_path, e) + else: file_saved_confirmation(OutputExtensions.NOTE_EVENTS.name, note_events_path) - except Exception: - failed_to_save(OutputExtensions.NOTE_EVENTS.name, note_events_path) - except Exception: + + except IOError: print("🚨 Something went wrong πŸ˜” - see the traceback below for details.") print("") print(traceback.format_exc()) diff --git a/basic_pitch/layers/nnaudio.py b/basic_pitch/layers/nnaudio.py index 45edb65f..9b544af1 100644 --- a/basic_pitch/layers/nnaudio.py +++ b/basic_pitch/layers/nnaudio.py @@ -61,9 +61,9 @@ def create_lowpass_filter( return tf.constant(filter_kernel, dtype=dtype) -def next_power_of_2(A: int) -> int: - """A helper function to calculate the next nearest number to the power of 2.""" - return int(np.ceil(np.log2(A))) +def next_power_of_2(a: int) -> int: + """A helper function to calculate the next the nearest number to the power of 2.""" + return int(np.ceil(np.log2(a))) def early_downsample( @@ -75,7 +75,7 @@ def early_downsample( ) -> Tuple[Union[float, int], int, int]: """Return new sampling rate and hop length after early downsampling""" downsample_count = early_downsample_count(nyquist_hz, filter_cutoff_hz, hop_length, n_octaves) - downsample_factor = 2 ** (downsample_count) + downsample_factor = 2 ** downsample_count hop_length //= downsample_factor # Getting new hop_length new_sr = sr / float(downsample_factor) # Getting new sampling rate @@ -97,15 +97,15 @@ def early_downsample_count(nyquist_hz: float, filter_cutoff_hz: float, hop_lengt def get_early_downsample_params( - sr: Union[float, int], hop_length: int, fmax_t: float, Q: float, n_octaves: int, dtype: tf.dtypes.DType + sr: Union[float, int], hop_length: int, f_max_t: float, q: float, n_octaves: int, dtype: tf.dtypes.DType ) -> Tuple[Union[float, int], int, float, np.array, bool]: """Compute downsampling parameters used for early downsampling""" window_bandwidth = 1.5 # for hann window - filter_cutoff = fmax_t * (1 + 0.5 * window_bandwidth / Q) + filter_cutoff = f_max_t * (1 + 0.5 * window_bandwidth / q) sr, hop_length, downsample_factor = early_downsample(sr, hop_length, n_octaves, sr // 2, filter_cutoff) if downsample_factor != 1: - earlydownsample = True + early_downsample = True early_downsample_filter = create_lowpass_filter( band_center=1 / downsample_factor, kernel_length=256, @@ -114,19 +114,19 @@ def get_early_downsample_params( ) else: early_downsample_filter = None - earlydownsample = False + early_downsample = False - return sr, hop_length, downsample_factor, early_downsample_filter, earlydownsample + return sr, hop_length, downsample_factor, early_downsample_filter, early_downsample -def get_window_dispatch(window: Union[str, Tuple[str, float]], N: int, fftbins: bool = True) -> np.array: +def get_window_dispatch(window: Union[str, Tuple[str, float]], n: int, fft_bins: bool = True) -> np.array: if isinstance(window, str): - return scipy.signal.get_window(window, N, fftbins=fftbins) + return scipy.signal.get_window(window, n, fftbins=fft_bins) elif isinstance(window, tuple): if window[0] == "gaussian": assert window[1] >= 0 - sigma = np.floor(-N / 2 / np.sqrt(-2 * np.log(10 ** (-window[1] / 20)))) - return scipy.signal.get_window(("gaussian", sigma), N, fftbins=fftbins) + sigma = np.floor(-n / 2 / np.sqrt(-2 * np.log(10 ** (-window[1] / 20)))) + return scipy.signal.get_window(("gaussian", sigma), n, fftbins=fft_bins) else: Warning("Tuple windows may have undesired behaviour regarding Q factor") elif isinstance(window, float): @@ -136,61 +136,59 @@ def get_window_dispatch(window: Union[str, Tuple[str, float]], N: int, fftbins: def create_cqt_kernels( - Q: float, + q: float, fs: float, - fmin: float, + f_min: float, n_bins: int = 84, bins_per_octave: int = 12, norm: int = 1, window: str = "hann", - fmax: Optional[float] = None, - topbin_check: bool = True, + f_max: Optional[float] = None, + top_bin_check: bool = True, ) -> Tuple[np.array, int, np.array, np.array]: """ Automatically create CQT kernels in time domain """ - fftLen = 2 ** next_power_of_2(np.ceil(Q * fs / fmin)) + fft_len = 2 ** next_power_of_2(np.ceil(q * fs / f_min)) - if (fmax is not None) and (n_bins is None): - n_bins = np.ceil(bins_per_octave * np.log2(fmax / fmin)) # Calculate the number of bins - freqs = fmin * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) + if (f_max is not None) and (n_bins is None): + n_bins = np.ceil(bins_per_octave * np.log2(f_max / f_min)) # Calculate the number of bins + freqs = f_min * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) - elif (fmax is None) and (n_bins is not None): - freqs = fmin * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) + elif (f_max is None) and (n_bins is not None): + freqs = f_min * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) else: - warnings.warn("If fmax is given, n_bins will be ignored", SyntaxWarning) - n_bins = np.ceil(bins_per_octave * np.log2(fmax / fmin)) # Calculate the number of bins - freqs = fmin * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) + warnings.warn("If f_max is given, n_bins will be ignored", SyntaxWarning) + n_bins = np.ceil(bins_per_octave * np.log2(f_max / f_min)) # Calculate the number of bins + freqs = f_min * 2.0 ** (np.r_[0:n_bins] / np.float(bins_per_octave)) - if np.max(freqs) > fs / 2 and topbin_check is True: - raise ValueError( - "The top bin {}Hz has exceeded the Nyquist frequency, please reduce the n_bins".format(np.max(freqs)) - ) + if np.max(freqs) > fs / 2 and top_bin_check is True: + raise ValueError(f"The top bin {np.max(freqs)}Hz has exceeded the Nyquist frequency, please reduce the n_bins") - tempKernel = np.zeros((int(n_bins), int(fftLen)), dtype=np.complex64) + temp_kernel = np.zeros((int(n_bins), int(fft_len)), dtype=np.complex64) - lengths = np.ceil(Q * fs / freqs) + lengths = np.ceil(q * fs / freqs) for k in range(0, int(n_bins)): freq = freqs[k] - _l = np.ceil(Q * fs / freq) + _l = np.ceil(q * fs / freq) # Centering the kernels, pad more zeros on RHS - start = int(np.ceil(fftLen / 2.0 - _l / 2.0)) - int(_l % 2) + start = int(np.ceil(fft_len / 2.0 - _l / 2.0)) - int(_l % 2) sig = ( - get_window_dispatch(window, int(_l), fftbins=True) + get_window_dispatch(window, int(_l), fft_bins=True) * np.exp(np.r_[-_l // 2 : _l // 2] * 1j * 2 * np.pi * freq / fs) / _l ) if norm: # Normalizing the filter # Trying to normalize like librosa - tempKernel[k, start : start + int(_l)] = sig / np.linalg.norm(sig, norm) + temp_kernel[k, start : start + int(_l)] = sig / np.linalg.norm(sig, norm) else: - tempKernel[k, start : start + int(_l)] = sig + temp_kernel[k, start : start + int(_l)] = sig - return tempKernel, fftLen, lengths, freqs + return temp_kernel, fft_len, lengths, freqs def get_cqt_complex( @@ -201,20 +199,20 @@ def get_cqt_complex( padding: tf.keras.layers.Layer, ) -> tf.Tensor: """Multiplying the STFT result with the cqt_kernel, check out the 1992 CQT paper [1] - for how to multiple the STFT result with the CQT kernel + for how to multiply the STFT result with the CQT kernel [2] Brown, Judith C.C. and Miller Puckette. β€œAn efficient algorithm for the calculation of a constant Q transform.” (1992).""" try: x = padding(x) # When center is True, we need padding at the beginning and ending - except Exception: + except ValueError: warnings.warn( f"\ninput size = {x.shape}\tkernel size = {cqt_kernels_real.shape[-1]}\n" "padding with reflection mode might not be the best choice, try using constant padding", UserWarning, ) x = tf.pad(x, (cqt_kernels_real.shape[-1] // 2, cqt_kernels_real.shape[-1] // 2)) - CQT_real = tf.transpose( + cqt_real = tf.transpose( tf.nn.conv1d( tf.transpose(x, [0, 2, 1]), tf.transpose(cqt_kernels_real, [2, 1, 0]), @@ -223,7 +221,7 @@ def get_cqt_complex( ), [0, 2, 1], ) - CQT_imag = -tf.transpose( + cqt_imag = -tf.transpose( tf.nn.conv1d( tf.transpose(x, [0, 2, 1]), tf.transpose(cqt_kernels_imag, [2, 1, 0]), @@ -233,7 +231,7 @@ def get_cqt_complex( [0, 2, 1], ) - return tf.stack((CQT_real, CQT_imag), axis=-1) + return tf.stack((cqt_real, cqt_imag), axis=-1) def downsampling_by_n(x: tf.Tensor, filter_kernel: tf.Tensor, n: float, match_torch_exactly: bool = True) -> tf.Tensor: @@ -243,7 +241,7 @@ def downsampling_by_n(x: tf.Tensor, filter_kernel: tf.Tensor, n: float, match_to and the filter kernel is expected to have shape `(num_output_channels,)` (i.e.: 1D) If match_torch_exactly is passed, we manually pad the input rather than having TensorFlow do so with "SAME". - The result is subtly different than Torch's output, but it is compatible with TensorFlow Lite (as of v2.4.1). + The result is subtly different from Torch's output, but it is compatible with TensorFlow Lite (as of v2.4.1). """ if match_torch_exactly: @@ -275,7 +273,7 @@ def __init__(self, padding: Union[int, Tuple[int]] = 1, **kwargs: Any): super(ReflectionPad1D, self).__init__(**kwargs) def compute_output_shape(self, s: List[int]) -> Tuple[int, int, int]: - return (s[0], s[1], s[2] + 2 * self.padding if isinstance(self.padding, int) else self.padding[0]) + return s[0], s[1], s[2] + 2 * self.padding if isinstance(self.padding, int) else self.padding[0] def call(self, x: tf.Tensor) -> tf.Tensor: return tf.pad(x, [[0, 0], [0, 0], [self.padding, self.padding]], "REFLECT") @@ -293,7 +291,7 @@ def __init__(self, padding: Union[int, Tuple[int]] = 1, value: int = 0, **kwargs super(ConstantPad1D, self).__init__(**kwargs) def compute_output_shape(self, s: List[int]) -> Tuple[int, int, int]: - return (s[0], s[1], s[2] + 2 * self.padding if isinstance(self.padding, int) else self.padding[0]) + return s[0], s[1], s[2] + 2 * self.padding if isinstance(self.padding, int) else self.padding[0] def call(self, x: tf.Tensor) -> tf.Tensor: return tf.pad(x, [[0, 0], [0, 0], [self.padding, self.padding]], "CONSTANT", self.value) @@ -333,8 +331,7 @@ def pad_center(data: np.ndarray, size: int, axis: int = -1, **kwargs: Any) -> np Length to pad `data` axis : int Axis along which to pad and center the data - kwargs : additional keyword arguments - arguments passed to `np.pad()` + kwargs : additional keyword passed to `np.pad()` Returns ------- @@ -362,7 +359,7 @@ def pad_center(data: np.ndarray, size: int, axis: int = -1, **kwargs: Any) -> np lengths[axis] = (lpad, int(size - n - lpad)) if lpad < 0: - raise ValueError(("Target size ({:d}) must be at least input size ({:d})").format(size, n)) + raise ValueError(f"Target size {size} must be at least input size {n}") return np.pad(data, lengths, **kwargs) @@ -399,9 +396,9 @@ class CQT2010v2(tf.keras.layers.Layer): Setting the correct sampling rate is very important for calculating the correct frequency. hop_length : int The hop (or stride) size. Default value is 512. - fmin : float + f_min : float The frequency for the lowest CQT bin. Default is 32.70Hz, which coresponds to the note C0. - fmax : float + f_max : float The frequency for the highest CQT bin. Default is ``None``, therefore the higest CQT bin is inferred from the ``n_bins`` and ``bins_per_octave``. If ``fmax`` is not ``None``, then the argument ``n_bins`` will be ignored and ``n_bins`` will be calculated automatically. @@ -430,10 +427,6 @@ class CQT2010v2(tf.keras.layers.Layer): 'Complex' will return the STFT result in complex number, shape = ``(num_samples, freq_bins, time_steps, 2)``; 'Phase' will return the phase of the STFT reuslt, shape = ``(num_samples, freq_bins,time_steps, 2)``. The complex number is stored as ``(real, imag)`` in the last axis. Default value is 'Magnitude'. - verbose : bool - If ``True``, it shows layer information. If ``False``, it suppresses all prints. - device : str - Choose which device to initialize this layer. Default value is 'cpu'. Returns ------- spectrogram : tf.Tensor @@ -451,8 +444,8 @@ def __init__( self, sr: int = 22050, hop_length: int = 512, - fmin: float = 32.70, - fmax: Optional[float] = None, + f_min: float = 32.70, + f_max: Optional[float] = None, n_bins: int = 84, filter_scale: int = 1, bins_per_octave: int = 12, @@ -460,17 +453,18 @@ def __init__( basis_norm: int = 1, window: str = "hann", pad_mode: str = "reflect", - earlydownsample: bool = True, + early_downsample: bool = True, trainable: bool = False, output_format: str = "Magnitude", match_torch_exactly: bool = True, ): super().__init__() + self.sample_rate: Union[float, int] = sr self.hop_length = hop_length - self.fmin = fmin - self.fmax = fmax + self.f_min = f_min + self.f_max = f_max self.n_bins = n_bins self.filter_scale = filter_scale self.bins_per_octave = bins_per_octave @@ -478,20 +472,34 @@ def __init__( self.basis_norm = basis_norm self.window = window self.pad_mode = pad_mode - self.earlydownsample = earlydownsample + self.early_downsample = early_downsample self.trainable = trainable self.output_format = output_format self.match_torch_exactly = match_torch_exactly self.normalization_type = "librosa" + self.lowpass_filter = None + self.n_octaves = None + self.f_min_t = None + self.early_downsample_filter = None + self.downsample_factor = None + self.n_fft = None + self.frequencies = None + self.lengths = None + self.basis = None + self.cqt_kernels_real = None + self.cqt_kernels_imag = None + self.padding = None + self.reshape_input = None + def get_config(self) -> Any: config = super().get_config().copy() config.update( { "sample_rate": self.sample_rate, "hop_length": self.hop_length, - "fmin": self.fmin, - "fmax": self.fmax, + "f_min": self.f_min, + "f_max": self.f_max, "n_bins": self.n_bins, "filter_scale": self.filter_scale, "bins_per_octave": self.bins_per_octave, @@ -500,7 +508,7 @@ def get_config(self) -> Any: "window": self.window, "pad_mode": self.pad_mode, "output_format": self.output_format, - "earlydownsample": self.earlydownsample, + "early_downsample": self.early_downsample, "trainable": self.trainable, "match_torch_exactly": self.match_torch_exactly, } @@ -509,7 +517,7 @@ def get_config(self) -> Any: def build(self, input_shape: tf.TensorShape) -> None: # This will be used to calculate filter_cutoff and creating CQT kernels - Q = float(self.filter_scale) / (2 ** (1 / self.bins_per_octave) - 1) + q = float(self.filter_scale) / (2 ** (1 / self.bins_per_octave) - 1) self.lowpass_filter = create_lowpass_filter(band_center=0.5, kernel_length=256, transition_bandwidth=0.001) @@ -519,30 +527,28 @@ def build(self, input_shape: tf.TensorShape) -> None: self.n_octaves = int(np.ceil(float(self.n_bins) / self.bins_per_octave)) # Calculate the lowest frequency bin for the top octave kernel - self.fmin_t = self.fmin * 2 ** (self.n_octaves - 1) + self.f_min_t = self.f_min * 2 ** (self.n_octaves - 1) remainder = self.n_bins % self.bins_per_octave if remainder == 0: # Calculate the top bin frequency - fmax_t = self.fmin_t * 2 ** ((self.bins_per_octave - 1) / self.bins_per_octave) + f_max_t = self.f_min_t * 2 ** ((self.bins_per_octave - 1) / self.bins_per_octave) else: # Calculate the top bin frequency - fmax_t = self.fmin_t * 2 ** ((remainder - 1) / self.bins_per_octave) + f_max_t = self.f_min_t * 2 ** ((remainder - 1) / self.bins_per_octave) - self.fmin_t = fmax_t / 2 ** (1 - 1 / self.bins_per_octave) # Adjusting the top minium bins - if fmax_t > self.sample_rate / 2: - raise ValueError( - "The top bin {}Hz has exceeded the Nyquist frequency, please reduce the n_bins".format(fmax_t) - ) + self.f_min_t = f_max_t / 2 ** (1 - 1 / self.bins_per_octave) # Adjusting the top minium bins + if f_max_t > self.sample_rate / 2: + raise ValueError(f"The top bin {f_max_t}Hz has exceeded the Nyquist frequency, please reduce the n_bins") - if self.earlydownsample is True: # Do early downsampling if this argument is True + if self.early_downsample is True: # Do early downsampling if this argument is True ( self.sample_rate, self.hop_length, self.downsample_factor, early_downsample_filter, - self.earlydownsample, - ) = get_early_downsample_params(self.sample_rate, self.hop_length, fmax_t, Q, self.n_octaves, self.dtype) + self.early_downsample, + ) = get_early_downsample_params(self.sample_rate, self.hop_length, f_max_t, q, self.n_octaves, self.dtype) self.early_downsample_filter = early_downsample_filter else: @@ -550,23 +556,23 @@ def build(self, input_shape: tf.TensorShape) -> None: # Preparing CQT kernels basis, self.n_fft, _, _ = create_cqt_kernels( - Q, + q, self.sample_rate, - self.fmin_t, + self.f_min_t, n_filters, self.bins_per_octave, norm=self.basis_norm, - topbin_check=False, + top_bin_check=False, ) # For the normalization in the end # The freqs returned by create_cqt_kernels cannot be used # Since that returns only the top octave bins # We need the information for all freq bin - freqs = self.fmin * 2.0 ** (np.r_[0 : self.n_bins] / np.float(self.bins_per_octave)) + freqs = self.f_min * 2.0 ** (np.r_[0: self.n_bins] / np.float(self.bins_per_octave)) self.frequencies = freqs - self.lengths = np.ceil(Q * self.sample_rate / freqs) + self.lengths = np.ceil(q * self.sample_rate / freqs) self.basis = basis # NOTE(psobot): this is where the implementation here starts to differ from CQT2010. @@ -599,49 +605,49 @@ def build(self, input_shape: tf.TensorShape) -> None: def call(self, x: tf.Tensor) -> tf.Tensor: x = self.reshape_input(x) # type: ignore - if self.earlydownsample is True: + if self.early_downsample is True: x = downsampling_by_n(x, self.early_downsample_filter, self.downsample_factor, self.match_torch_exactly) hop = self.hop_length # Getting the top octave CQT - CQT = get_cqt_complex(x, self.cqt_kernels_real, self.cqt_kernels_imag, hop, self.padding) + cqt = get_cqt_complex(x, self.cqt_kernels_real, self.cqt_kernels_imag, hop, self.padding) x_down = x # Preparing a new variable for downsampling for _ in range(self.n_octaves - 1): hop = hop // 2 x_down = downsampling_by_n(x_down, self.lowpass_filter, 2, self.match_torch_exactly) - CQT1 = get_cqt_complex(x_down, self.cqt_kernels_real, self.cqt_kernels_imag, hop, self.padding) - CQT = tf.concat((CQT1, CQT), axis=1) + cqt1 = get_cqt_complex(x_down, self.cqt_kernels_real, self.cqt_kernels_imag, hop, self.padding) + cqt = tf.concat((cqt1, cqt), axis=1) - CQT = CQT[:, -self.n_bins :, :] # Removing unwanted bottom bins + cqt = cqt[:, -self.n_bins :, :] # Removing unwanted bottom bins # Normalizing the output with the downsampling factor, 2**(self.n_octaves-1) is make it # same mag as 1992 - CQT = CQT * self.downsample_factor + cqt = cqt * self.downsample_factor # Normalize again to get same result as librosa if self.normalization_type == "librosa": - CQT *= tf.math.sqrt(tf.cast(self.lengths.reshape((-1, 1, 1)), self.dtype)) + cqt *= tf.math.sqrt(tf.cast(self.lengths.reshape((-1, 1, 1)), self.dtype)) elif self.normalization_type == "convolutional": pass elif self.normalization_type == "wrap": - CQT *= 2 + cqt *= 2 else: raise ValueError("The normalization_type %r is not part of our current options." % self.normalization_type) # Transpose the output to match the output of the other spectrogram layers. if self.output_format.lower() == "magnitude": # Getting CQT Amplitude - return tf.transpose(tf.math.sqrt(tf.math.reduce_sum(tf.math.pow(CQT, 2), axis=-1)), [0, 2, 1]) + return tf.transpose(tf.math.sqrt(tf.math.reduce_sum(tf.math.pow(cqt, 2), axis=-1)), [0, 2, 1]) elif self.output_format.lower() == "complex": - return CQT + return cqt elif self.output_format.lower() == "phase": - phase_real = tf.math.cos(tf.math.atan2(CQT[:, :, :, 1], CQT[:, :, :, 0])) - phase_imag = tf.math.sin(tf.math.atan2(CQT[:, :, :, 1], CQT[:, :, :, 0])) + phase_real = tf.math.cos(tf.math.atan2(cqt[:, :, :, 1], cqt[:, :, :, 0])) + phase_imag = tf.math.sin(tf.math.atan2(cqt[:, :, :, 1], cqt[:, :, :, 0])) return tf.stack((phase_real, phase_imag), axis=-1) diff --git a/basic_pitch/layers/signal.py b/basic_pitch/layers/signal.py index f5f629f0..4f244f46 100644 --- a/basic_pitch/layers/signal.py +++ b/basic_pitch/layers/signal.py @@ -50,7 +50,7 @@ def __init__( If False, then D[:, t] begins at y[t * hop_length]. pad_mode: Padding to use if center is True. One of "CONSTANT", "REFLECT", or "SYMMETRIC" (case-insensitive). name: Name of the layer. - dtype: Type used in calcuation. + dtype: Type used in calculation. """ super().__init__(trainable=False, name=name, dtype=dtype, dynamic=False) self.fft_length = fft_length @@ -62,12 +62,14 @@ def __init__( self.center = center self.pad_mode = pad_mode + self.spec = None + def build(self, input_shape: tf.TensorShape) -> None: if self.window_length < self.fft_length: lpad = (self.fft_length - self.window_length) // 2 rpad = self.fft_length - self.window_length - lpad - def padded_window(window_length: int, dtype: tf.dtypes.DType = tf.float32) -> tf.Tensor: + def padded_window(dtype: tf.dtypes.DType = tf.float32) -> tf.Tensor: # This is a trick to match librosa's way of handling window lengths < their fft_lengths # In that case the window is 0 padded such that the window is centered around 0s # In the Tensorflow case, the window is computed, multiplied against the frame and then @@ -159,6 +161,13 @@ class NormalizedLog(tf.keras.layers.Layer): This layer adds 1e-10 to all values as a way to avoid NaN math. """ + def __init__( + self, trainable=True, name=None, dtype=None, dynamic=False, **kwargs + ): + super().__init__(trainable, name, dtype, dynamic, kwargs) + + self.squeeze_batch = None + def build(self, input_shape: tf.Tensor) -> None: self.squeeze_batch = lambda batch: batch rank = input_shape.rank diff --git a/basic_pitch/models.py b/basic_pitch/models.py index 0e726510..cb08ab04 100644 --- a/basic_pitch/models.py +++ b/basic_pitch/models.py @@ -60,7 +60,7 @@ def weighted_transcription_loss( Args: y_true: The true labels. y_pred: The predicted labels. - label_smoothing: Smoothing factor. Squeezes labels towards 0.5. + label_smoothing: Smoothing factor. Squeeze labels towards 0.5. positive_weight: Weighting factor for the positive labels. Returns: @@ -87,8 +87,8 @@ def onset_loss( """ Args: - weighted: Whether or not to use a weighted cross entropy loss. - label_smoothing: Smoothing factor. Squeezes labels towards 0.5. + weighted: Whether to use a weighted cross entropy loss. + label_smoothing: Smoothing factor. Squeeze labels towards 0.5. positive_weight: Weighting factor for the positive labels. Returns: @@ -108,8 +108,8 @@ def loss(label_smoothing: float = 0.2, weighted: bool = False, positive_weight: the loss for the contour, note and onset posteriorgrams. Args: - label_smoothing: Smoothing factor. Squeezes labels towards 0.5. - weighted: Whether or not to use a weighted cross entropy loss. + label_smoothing: Smoothing factor. Squeeze labels towards 0.5. + weighted: Whether to use a weighted cross entropy loss. positive_weight: Weighting factor for the positive labels. Returns: @@ -134,7 +134,7 @@ def _kernel_constraint() -> tf.keras.constraints.UnitNorm: return tf.keras.constraints.UnitNorm(axis=[0, 1, 2]) -def get_cqt(inputs: tf.Tensor, n_harmonics: int, use_batchnorm: bool) -> tf.Tensor: +def get_cqt(inputs: tf.Tensor, n_harmonics: int, use_batch_norm: bool) -> tf.Tensor: """Calculate the CQT of the input audio. Input shape: (batch, number of audio samples, 1) @@ -144,7 +144,7 @@ def get_cqt(inputs: tf.Tensor, n_harmonics: int, use_batchnorm: bool) -> tf.Tens inputs: The audio input. n_harmonics: The number of harmonics to capture above the maximum output frequency. Used to calculate the number of semitones for the CQT. - use_batchnorm: If True, applies batch normalization after computing the CQT + use_batch_norm: If True, applies batch normalization after computing the CQT Returns: The log-normalized CQT of the input audio. @@ -159,20 +159,19 @@ def get_cqt(inputs: tf.Tensor, n_harmonics: int, use_batchnorm: bool) -> tf.Tens x = nnaudio.CQT( sr=AUDIO_SAMPLE_RATE, hop_length=FFT_HOP, - fmin=ANNOTATIONS_BASE_FREQUENCY, + f_min=ANNOTATIONS_BASE_FREQUENCY, n_bins=n_semitones * CONTOURS_BINS_PER_SEMITONE, bins_per_octave=12 * CONTOURS_BINS_PER_SEMITONE, )(x) x = signal.NormalizedLog()(x) x = tf.expand_dims(x, -1) - if use_batchnorm: + if use_batch_norm: x = tfkl.BatchNormalization()(x) return x def model( n_harmonics: int = 8, - n_filters_contour: int = 32, n_filters_onsets: int = 32, n_filters_notes: int = 32, no_contours: bool = False, @@ -181,10 +180,9 @@ def model( Args: n_harmonics: The number of harmonics to use in the harmonic stacking layer. - n_filters_contour: Number of filters for the contour convolutional layer. n_filters_onsets: Number of filters for the onsets convolutional layer. n_filters_notes: Number of filters for the notes convolutional layer. - no_contours: Whether or not to include contours in the output. + no_contours: Whether to include contours in the output. """ # input representation inputs = tf.keras.Input(shape=(AUDIO_N_SAMPLES, 1)) # (batch, time, ch) @@ -204,17 +202,6 @@ def model( )(x) # contour layers - fully convolutional - x_contours = tfkl.Conv2D( - n_filters_contour, - (5, 5), - padding="same", - kernel_initializer=_initializer(), - kernel_constraint=_kernel_constraint(), - )(x) - - x_contours = tfkl.BatchNormalization()(x_contours) - x_contours = tfkl.ReLU()(x_contours) - x_contours = tfkl.Conv2D( 8, (3, 3 * 13), diff --git a/basic_pitch/note_creation.py b/basic_pitch/note_creation.py index 2f77e36d..7679f2c4 100644 --- a/basic_pitch/note_creation.py +++ b/basic_pitch/note_creation.py @@ -66,6 +66,7 @@ def model_output_to_notes( } representing the output of the basic pitch model. onset_thresh: Minimum amplitude of an onset activation to be considered an onset. + frame_thresh: Minimum amplitude of a frame activation to be considered a frame. infer_onsets: If True, add additional onsets when there are large differences in frame amplitudes. min_note_len: The minimum allowed note length in frames. min_freq: Minimum allowed output frequency, in Hz. If None, all frequencies are used. @@ -216,7 +217,7 @@ def note_events_to_midi( """Create a pretty_midi object from note events Args: - note_events : list of tuples [(start_time_seconds, end_time_seconds, pitch_midi, amplitude)] + note_events_with_pitch_bends : list of tuples [(start_time_seconds, end_time_seconds, pitch_midi, amplitude)] where amplitude is a number between 0 and 1 multiple_pitch_bends : If True, allow overlapping notes to have pitch bends Note: this will assign each pitch to its own midi instrument, as midi does not yet @@ -475,7 +476,7 @@ def output_to_notes_polyphonic( i -= 1 i_start = i + 1 + k # go back to frame above threshold - assert i_start >= 0, "{}".format(i_start) + assert i_start >= 0, str(i_start) assert i_end < n_frames if i_end - i_start <= min_note_len: diff --git a/basic_pitch/predict.py b/basic_pitch/predict.py index c6f9fb2f..677c66d9 100644 --- a/basic_pitch/predict.py +++ b/basic_pitch/predict.py @@ -73,7 +73,7 @@ def main() -> None: "--minimum-note-length", type=float, default=58, - help="The minimum allowed note length, in miliseconds.", + help="The minimum allowed note length, in milliseconds.", ) parser.add_argument( "--minimum-frequency", diff --git a/tests/test_inference.py b/tests/test_inference.py index 23a477e5..f083cb49 100644 --- a/tests/test_inference.py +++ b/tests/test_inference.py @@ -37,7 +37,7 @@ def test_predict(self) -> None: test_audio_path, ICASSP_2022_MODEL_PATH, ) - assert set(model_output.keys()) == set(["note", "onset", "contour"]) + assert set(model_output.keys()) == {"note", "onset", "contour"} assert model_output["note"].shape == model_output["onset"].shape assert isinstance(midi_data, pretty_midi.PrettyMIDI) lowest_supported_midi = 21 diff --git a/tests/test_nn.py b/tests/test_nn.py index 7e2b1c05..6df231d1 100644 --- a/tests/test_nn.py +++ b/tests/test_nn.py @@ -35,13 +35,14 @@ class TestHarmonicStacking(unittest.TestCase): - def _audio_data_gen(self) -> Iterator[Tuple[np.array, np.array]]: + @staticmethod + def _audio_data_gen() -> Iterator[Tuple[np.array, np.array]]: while True: audio = np.random.uniform(size=(BATCH_SIZE, AUDIO_N_SAMPLES, 1)).astype(np.float32) output = np.random.uniform(size=(BATCH_SIZE, ANNOT_N_FRAMES, ANNOTATIONS_N_SEMITONES * 3, 1)).astype( np.float32 ) - yield (audio, output) + yield audio, output def _dummy_dataset(self) -> tf.data.Dataset: ds = tf.data.Dataset.from_generator(