From 5b7dfe5e2217e72cf5a3f4009e7e46f3df20b2a7 Mon Sep 17 00:00:00 2001
From: Mintas
Date: Mon, 11 Nov 2024 11:49:24 +0300
Subject: [PATCH] MIR_EVAL_392: added SegmentType, optimized segment.evaluate checks

---
 mir_eval/segment.py | 402 ++++++++++++++++++++++----------------------
 1 file changed, 201 insertions(+), 201 deletions(-)

diff --git a/mir_eval/segment.py b/mir_eval/segment.py
index 7a49d6ff..12a4f992 100644
--- a/mir_eval/segment.py
+++ b/mir_eval/segment.py
@@ -75,15 +75,23 @@
 import collections
 import warnings
+from enum import Enum
+import math
+from typing import Any, Sequence
 
 import numpy as np
 import scipy.stats
 import scipy.sparse
 import scipy.misc
 import scipy.special
 
 from . import util
 
 
+class SegmentType(Enum):
+    INTERVAL = 'intervals'
+    BOUNDARY = 'boundaries'
+
+
 def validate_boundary(reference_intervals, estimated_intervals, trim):
     """Check that the input annotations to a segment boundary estimation
     metric (i.e. one that only takes in segment intervals) look like valid
@@ -102,6 +110,12 @@ def validate_boundary(reference_intervals, estimated_intervals, trim):
     trim : bool
         will the start and end events be trimmed?
     """
+    _do_validate_segments(reference_intervals, estimated_intervals, trim,
+                          segment_type=SegmentType.INTERVAL)
+
+
+def validate_boundaries(reference_boundaries, estimated_boundaries, trim):
+    """Check that the input annotations to a boundary detection metric
+    look like valid 1-d arrays of boundary times.
+
+    Parameters
+    ----------
+    reference_boundaries : np.ndarray, shape=(n,)
+        reference boundary times, in seconds
+    estimated_boundaries : np.ndarray, shape=(m,)
+        estimated boundary times, in seconds
+    trim : bool
+        will the first and last boundaries be trimmed?
+    """
+    _do_validate_segments(reference_boundaries, estimated_boundaries, trim,
+                          segment_type=SegmentType.BOUNDARY)
+
+
+def _do_validate_segments(reference_segments, estimated_segments, trim,
+                          segment_type: SegmentType = SegmentType.INTERVAL):
     if trim:
         # If we're trimming, then we need at least 2 intervals
         min_size = 2
@@ -109,15 +123,32 @@ def validate_boundary(reference_intervals, estimated_intervals, trim):
         # If we're not trimming, then we only need one interval
         min_size = 1
 
-    if len(reference_intervals) < min_size:
-        warnings.warn("Reference intervals are empty.")
-
-    if len(estimated_intervals) < min_size:
-        warnings.warn("Estimated intervals are empty.")
-
-    for intervals in [reference_intervals, estimated_intervals]:
-        util.validate_intervals(intervals)
-
+    if len(reference_segments) < min_size:
+        warnings.warn(f"Reference {segment_type.value} are empty.")
+
+    if len(estimated_segments) < min_size:
+        warnings.warn(f"Estimated {segment_type.value} are empty.")
+
+    for segments in [reference_segments, estimated_segments]:
+        if segment_type is SegmentType.INTERVAL:
+            util.validate_intervals(segments)
+        else:
+            util.validate_events(segments, max_time=math.inf)
+
+
+def validated_trimmed_boundaries(reference_segments, estimated_segments, trim,
+                                 segment_type: SegmentType = SegmentType.INTERVAL):
+    """Validate a pair of annotations and return them as boundary arrays,
+    converting from intervals when necessary and trimming on request.
+
+    Parameters
+    ----------
+    reference_segments : np.ndarray
+        reference intervals, shape=(n, 2), or boundary times, shape=(n,)
+    estimated_segments : np.ndarray
+        estimated intervals, shape=(m, 2), or boundary times, shape=(m,)
+    trim : bool
+        if ``True``, suppress the first and last boundaries
+    segment_type : SegmentType
+        type of the inputs: INTERVAL (default) or BOUNDARY
+
+    Returns
+    -------
+    reference_boundaries, estimated_boundaries : np.ndarray
+        validated (and possibly trimmed) boundary arrays
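+
+    Examples
+    --------
+    A minimal sketch of both modes; the interval and boundary values here
+    are made up for illustration:
+
+    >>> ref_intervals = np.array([[0.0, 5.0], [5.0, 10.0]])
+    >>> est_intervals = np.array([[0.0, 4.5], [4.5, 10.0]])
+    >>> ref_bounds, est_bounds = validated_trimmed_boundaries(
+    ...     ref_intervals, est_intervals, trim=False)
+    >>> # 1-d boundary arrays can also be passed directly
+    >>> ref_bounds, est_bounds = validated_trimmed_boundaries(
+    ...     ref_bounds, est_bounds, trim=True,
+    ...     segment_type=SegmentType.BOUNDARY)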
+    """
+    if segment_type is SegmentType.INTERVAL:
+        validate_boundary(reference_segments, estimated_segments, trim)
+        # Convert intervals to boundaries
+        reference_boundaries = util.intervals_to_boundaries(reference_segments)
+        estimated_boundaries = util.intervals_to_boundaries(estimated_segments)
+    else:
+        validate_boundaries(reference_segments, estimated_segments, trim)
+        reference_boundaries = reference_segments
+        estimated_boundaries = estimated_segments
+
+    # Suppress the first and last boundaries
+    if trim:
+        reference_boundaries = reference_boundaries[1:-1]
+        estimated_boundaries = estimated_boundaries[1:-1]
+
+    return reference_boundaries, estimated_boundaries
 
 
 def validate_structure(
     reference_intervals, reference_labels, estimated_intervals, estimated_labels
@@ -167,7 +198,8 @@ def validate_structure(
 
 def detection(
-    reference_intervals, estimated_intervals, window=0.5, beta=1.0, trim=False
+    reference_intervals, estimated_intervals, window=0.5, beta=1.0, trim=False,
+    segment_type: SegmentType = SegmentType.INTERVAL
 ):
     """Boundary detection hit-rate.
 
@@ -196,11 +228,11 @@ def detection(
     Parameters
     ----------
-    reference_intervals : np.ndarray, shape=(n, 2)
+    reference_intervals : np.ndarray, shape=(n, 2) if segment_type is INTERVAL, else shape=(n,)
         reference segment intervals, in the format returned by
         :func:`mir_eval.io.load_intervals` or
         :func:`mir_eval.io.load_labeled_intervals`.
-    estimated_intervals : np.ndarray, shape=(m, 2)
+    estimated_intervals : np.ndarray, shape=(m, 2) if segment_type is INTERVAL, else shape=(m,)
         estimated segment intervals, in the format returned by
         :func:`mir_eval.io.load_intervals` or
         :func:`mir_eval.io.load_labeled_intervals`.
@@ -215,6 +247,8 @@ def detection(
         if ``True``, the first and last boundary times are ignored.
         Typically, these denote start (0) and end-markers.
         (Default value = False)
+    segment_type : SegmentType
+        type of the input segments: INTERVAL (default) for interval arrays,
+        BOUNDARY for 1-d arrays of boundary times
 
     Returns
     -------
@@ -225,17 +259,13 @@ def detection(
     f_measure : float
         F-measure (weighted harmonic mean of ``precision`` and ``recall``)
     """
-    validate_boundary(reference_intervals, estimated_intervals, trim)
+    reference_boundaries, estimated_boundaries = validated_trimmed_boundaries(
+        reference_intervals, estimated_intervals, trim, segment_type=segment_type)
 
-    # Convert intervals to boundaries
-    reference_boundaries = util.intervals_to_boundaries(reference_intervals)
-    estimated_boundaries = util.intervals_to_boundaries(estimated_intervals)
-
-    # Suppress the first and last intervals
-    if trim:
-        reference_boundaries = reference_boundaries[1:-1]
-        estimated_boundaries = estimated_boundaries[1:-1]
+    return _compute_detection(reference_boundaries, estimated_boundaries,
+                              window, beta)
 
+
+def _compute_detection(reference_boundaries, estimated_boundaries,
+                       window=0.5, beta=1.0):
     # If we have no boundaries, we get no score.
     if len(reference_boundaries) == 0 or len(estimated_boundaries) == 0:
         return 0.0, 0.0, 0.0
@@ -250,7 +280,7 @@ def detection(
     return precision, recall, f_measure
 
 
-def deviation(reference_intervals, estimated_intervals, trim=False):
+def deviation(reference_intervals, estimated_intervals, trim=False,
+              segment_type: SegmentType = SegmentType.INTERVAL):
     """Compute the median deviations between reference
     and estimated boundary times.
 
@@ -284,18 +314,14 @@ def deviation(reference_intervals, estimated_intervals, trim=False):
     estimated_to_reference : float
         median time from each estimated boundary to the
         closest reference boundary
     """
-    validate_boundary(reference_intervals, estimated_intervals, trim)
-
-    # Convert intervals to boundaries
-    reference_boundaries = util.intervals_to_boundaries(reference_intervals)
-    estimated_boundaries = util.intervals_to_boundaries(estimated_intervals)
-
-    # Suppress the first and last intervals
-    if trim:
-        reference_boundaries = reference_boundaries[1:-1]
-        estimated_boundaries = estimated_boundaries[1:-1]
-
+    reference_boundaries, estimated_boundaries = validated_trimmed_boundaries(
+        reference_intervals, estimated_intervals, trim, segment_type=segment_type)
+
+    return _compute_deviation(reference_boundaries, estimated_boundaries)
+
+
+def _compute_deviation(reference_boundaries, estimated_boundaries):
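+    """Median time deviations between two boundary arrays.
+
+    Helper for :func:`deviation`; inputs are assumed to be validated
+    and already trimmed.
+    """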
     # If we have no boundaries, we get no score.
     if len(reference_boundaries) == 0 or len(estimated_boundaries) == 0:
         return np.nan, np.nan
@@ -308,6 +334,50 @@ def deviation(reference_intervals, estimated_intervals, trim=False):
 
     return reference_to_estimated, estimated_to_reference
 
+
+def _labeled_structure_metric(metric_calculator,
+                              reference_intervals, reference_labels,
+                              estimated_intervals, estimated_labels,
+                              frame_size=0.1, **calculator_kwargs):
+    """Validate a labeled segmentation pair, convert it to frame-level
+    cluster indices, and apply ``metric_calculator`` to the index arrays.
+
+    Parameters
+    ----------
+    metric_calculator : callable
+        function that performs the actual metric computation;
+        must have the signature
+        ``metric_calculator(reference_indices, estimated_indices, **kwargs)``
+    reference_intervals : np.ndarray, shape=(n, 2)
+        reference segment intervals
+    reference_labels : list, shape=(n,)
+        reference segment labels
+    estimated_intervals : np.ndarray, shape=(m, 2)
+        estimated segment intervals
+    estimated_labels : list, shape=(m,)
+        estimated segment labels
+    frame_size : float > 0
+        length (in seconds) of frames for clustering
+    calculator_kwargs
+        additional keyword arguments passed through to ``metric_calculator``
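+
+    Examples
+    --------
+    A sketch with the pairwise calculator; any callable with the same
+    signature works, and the annotations are assumed to be loaded as in
+    :func:`pairwise`:
+
+    >>> p, r, f = _labeled_structure_metric(
+    ...     _compute_pairwise,
+    ...     ref_intervals, ref_labels, est_intervals, est_labels,
+    ...     frame_size=0.1, beta=1.0)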
+    """
+    y_ref, y_est = _structure_to_indices(reference_intervals, reference_labels,
+                                         estimated_intervals, estimated_labels,
+                                         frame_size)
+    # Empty annotations score zero, matching the previous per-metric behavior
+    if len(y_ref) == 0 or len(y_est) == 0:
+        return 0.0, 0.0, 0.0
+    return metric_calculator(y_ref, y_est, **calculator_kwargs)
+
+
+def _structure_to_indices(reference_intervals, reference_labels,
+                          estimated_intervals, estimated_labels,
+                          frame_size=0.1):
+    """Validate labeled intervals and map them to frame-level cluster
+    indices; returns a pair of empty arrays if either annotation is empty.
+    """
+    validate_structure(
+        reference_intervals, reference_labels, estimated_intervals, estimated_labels
+    )
+
+    # Check for empty annotations. Don't need to check labels because
+    # validate_structure makes sure they're the same size as intervals
+    if reference_intervals.size == 0 or estimated_intervals.size == 0:
+        return np.array([]), np.array([])
+
+    # Generate the cluster labels
+    y_ref = util.intervals_to_samples(
+        reference_intervals, reference_labels, sample_size=frame_size
+    )[-1]
+
+    y_ref = util.index_labels(y_ref)[0]
+
+    # Map to index space
+    y_est = util.intervals_to_samples(
+        estimated_intervals, estimated_labels, sample_size=frame_size
+    )[-1]
+
+    y_est = util.index_labels(y_est)[0]
+
+    return y_ref, y_est
+
 
 def pairwise(
     reference_intervals,
     reference_labels,
     estimated_intervals,
     estimated_labels,
@@ -368,41 +438,24 @@ def pairwise(
     F-measure of detecting whether frames belong in the same cluster
 
     """
-    validate_structure(
-        reference_intervals, reference_labels, estimated_intervals, estimated_labels
-    )
-
-    # Check for empty annotations. Don't need to check labels because
-    # validate_structure makes sure they're the same size as intervals
-    if reference_intervals.size == 0 or estimated_intervals.size == 0:
-        return 0.0, 0.0, 0.0
-
-    # Generate the cluster labels
-    y_ref = util.intervals_to_samples(
-        reference_intervals, reference_labels, sample_size=frame_size
-    )[-1]
-
-    y_ref = util.index_labels(y_ref)[0]
-
-    # Map to index space
-    y_est = util.intervals_to_samples(
-        estimated_intervals, estimated_labels, sample_size=frame_size
-    )[-1]
-
-    y_est = util.index_labels(y_est)[0]
+    return _labeled_structure_metric(_compute_pairwise,
+                                     reference_intervals, reference_labels,
+                                     estimated_intervals, estimated_labels,
+                                     frame_size=frame_size, beta=beta)
 
+
+def _compute_pairwise(reference_indices, estimated_indices, beta=1.0):
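+    """Pairwise clustering precision, recall, and F-measure over
+    frame-level index arrays (helper for :func:`pairwise`; inputs are
+    assumed validated and non-empty).
+    """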
     # Build the reference label agreement matrix
-    agree_ref = np.equal.outer(y_ref, y_ref)
+    agree_ref = np.equal.outer(reference_indices, reference_indices)
 
     # Count the unique pairs
-    n_agree_ref = (agree_ref.sum() - len(y_ref)) / 2.0
+    n_agree_ref = (agree_ref.sum() - len(reference_indices)) / 2.0
 
     # Repeat for estimate
-    agree_est = np.equal.outer(y_est, y_est)
-    n_agree_est = (agree_est.sum() - len(y_est)) / 2.0
+    agree_est = np.equal.outer(estimated_indices, estimated_indices)
+    n_agree_est = (agree_est.sum() - len(estimated_indices)) / 2.0
 
     # Find where they agree
     matches = np.logical_and(agree_ref, agree_est)
-    n_matches = (matches.sum() - len(y_ref)) / 2.0
+    n_matches = (matches.sum() - len(reference_indices)) / 2.0
 
     precision = n_matches / n_agree_est
     recall = n_matches / n_agree_ref
@@ -458,42 +510,24 @@ def rand_index(
         length (in seconds) of frames for clustering
         (Default value = 0.1)
     beta : float > 0
-        beta value for F-measure
-        (Default value = 1.0)
+        deprecated and unused; scheduled for removal in 0.9
 
     Returns
     -------
     rand_index : float > 0
         Rand index
     """
-    validate_structure(
-        reference_intervals, reference_labels, estimated_intervals, estimated_labels
-    )
-
-    # Check for empty annotations. Don't need to check labels because
-    # validate_structure makes sure they're the same size as intervals
-    if reference_intervals.size == 0 or estimated_intervals.size == 0:
-        return 0.0, 0.0, 0.0
-
-    # Generate the cluster labels
-    y_ref = util.intervals_to_samples(
-        reference_intervals, reference_labels, sample_size=frame_size
-    )[-1]
-
-    y_ref = util.index_labels(y_ref)[0]
-
-    # Map to index space
-    y_est = util.intervals_to_samples(
-        estimated_intervals, estimated_labels, sample_size=frame_size
-    )[-1]
-
-    y_est = util.index_labels(y_est)[0]
+    return _labeled_structure_metric(_compute_rand_index,
+                                     reference_intervals, reference_labels,
+                                     estimated_intervals, estimated_labels,
+                                     frame_size=frame_size)
 
+
+def _compute_rand_index(reference_indices, estimated_indices):
     # Build the reference label agreement matrix
-    agree_ref = np.equal.outer(y_ref, y_ref)
+    agree_ref = np.equal.outer(reference_indices, reference_indices)
 
     # Repeat for estimate
-    agree_est = np.equal.outer(y_est, y_est)
+    agree_est = np.equal.outer(estimated_indices, estimated_indices)
 
     # Find where they agree
     matches_pos = np.logical_and(agree_ref, agree_est)
 
@@ -501,15 +535,15 @@ def rand_index(
     # Find where they disagree
     matches_neg = np.logical_and(~agree_ref, ~agree_est)
 
-    n_pairs = len(y_ref) * (len(y_ref) - 1) / 2.0
+    n_pairs = len(reference_indices) * (len(reference_indices) - 1) / 2.0
 
-    n_matches_pos = (matches_pos.sum() - len(y_ref)) / 2.0
+    n_matches_pos = (matches_pos.sum() - len(reference_indices)) / 2.0
     n_matches_neg = matches_neg.sum() / 2.0
 
     rand = (n_matches_pos + n_matches_neg) / n_pairs
 
     return rand
 
-
+# TODO: candidate for further optimisation and refactoring
 def _contingency_matrix(reference_indices, estimated_indices):
     """Compute the contingency matrix of a true labeling vs an estimated one.
 
@@ -637,30 +671,10 @@ def ari(
         Adjusted Rand index between segmentations.
 
     """
-    validate_structure(
-        reference_intervals, reference_labels, estimated_intervals, estimated_labels
-    )
-
-    # Check for empty annotations. Don't need to check labels because
-    # validate_structure makes sure they're the same size as intervals
-    if reference_intervals.size == 0 or estimated_intervals.size == 0:
-        return 0.0, 0.0, 0.0
-
-    # Generate the cluster labels
-    y_ref = util.intervals_to_samples(
-        reference_intervals, reference_labels, sample_size=frame_size
-    )[-1]
-
-    y_ref = util.index_labels(y_ref)[0]
-
-    # Map to index space
-    y_est = util.intervals_to_samples(
-        estimated_intervals, estimated_labels, sample_size=frame_size
-    )[-1]
-
-    y_est = util.index_labels(y_est)[0]
-
-    return _adjusted_rand_index(y_ref, y_est)
+    return _labeled_structure_metric(_adjusted_rand_index,
+                                     reference_intervals, reference_labels,
+                                     estimated_intervals, estimated_labels,
+                                     frame_size=frame_size)
 
 
 def _mutual_info_score(reference_indices, estimated_indices, contingency=None):
@@ -923,37 +937,20 @@ def mutual_information(
         Normalize mutual information between segmentations
 
     """
-    validate_structure(
-        reference_intervals, reference_labels, estimated_intervals, estimated_labels
-    )
-
-    # Check for empty annotations. Don't need to check labels because
-    # validate_structure makes sure they're the same size as intervals
-    if reference_intervals.size == 0 or estimated_intervals.size == 0:
-        return 0.0, 0.0, 0.0
-
-    # Generate the cluster labels
-    y_ref = util.intervals_to_samples(
-        reference_intervals, reference_labels, sample_size=frame_size
-    )[-1]
-
-    y_ref = util.index_labels(y_ref)[0]
-
-    # Map to index space
-    y_est = util.intervals_to_samples(
-        estimated_intervals, estimated_labels, sample_size=frame_size
-    )[-1]
-
-    y_est = util.index_labels(y_est)[0]
-
+    return _labeled_structure_metric(_compute_mutual_information,
+                                     reference_intervals, reference_labels,
+                                     estimated_intervals, estimated_labels,
+                                     frame_size=frame_size)
+
+
+def _compute_mutual_information(reference_indices, estimated_indices):
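+    """Mutual information metrics over frame-level index arrays (helper
+    for :func:`mutual_information`; inputs are assumed validated and
+    non-empty).
+    """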
     # Mutual information
-    mutual_info = _mutual_info_score(y_ref, y_est)
+    mutual_info = _mutual_info_score(reference_indices, estimated_indices)
 
     # Adjusted mutual information
-    adj_mutual_info = _adjusted_mutual_info_score(y_ref, y_est)
+    adj_mutual_info = _adjusted_mutual_info_score(reference_indices,
+                                                  estimated_indices)
 
     # Normalized mutual information
-    norm_mutual_info = _normalized_mutual_info_score(y_ref, y_est)
+    norm_mutual_info = _normalized_mutual_info_score(reference_indices,
+                                                     estimated_indices)
 
     return mutual_info, adj_mutual_info, norm_mutual_info
 
@@ -1059,7 +1056,13 @@ def nce(
     )[-1]
 
     y_est = util.index_labels(y_est)[0]
+
+    # y_ref and y_est were already computed above, so call the extracted
+    # calculator directly instead of re-validating and re-sampling.
+    return _compute_nce(y_ref, y_est, beta=beta, marginal=marginal)
 
+
+def _compute_nce(y_ref, y_est, beta=1.0, marginal=False):
     # Make the contingency table: shape = (n_ref, n_est)
     contingency = _contingency_matrix(y_ref, y_est).astype(float)
 
@@ -1178,6 +1181,35 @@ def vmeasure(
         marginal=True,
     )
 
+
+def return_mapping(mapping: Sequence[Any], func, *args, **kwargs) -> dict[Any, Any]:
+    """Call ``func`` and return its results as a mapping.
+
+    Note: no type- or length-checks are performed in this method;
+    clients are responsible for supplying one key per return value.
+
+    Parameters
+    ----------
+    mapping : Sequence
+        keys of the resulting dict, in the same order as the
+        return values of ``func``
+    func : callable
+        function whose return value(s) will be mapped
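+
+    Examples
+    --------
+    A sketch; ``ref_intervals`` and ``est_intervals`` are assumed to be
+    loaded as in :func:`detection`:
+
+    >>> named = return_mapping(["precision", "recall", "f_measure"],
+    ...                        detection, ref_intervals, est_intervals)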
+
+    Returns
+    -------
+    dict
+        return values of ``func``, keyed by the entries of ``mapping``
+    """
+    return_res = func(*args, **kwargs)
+    return _return_values_to_mapping(mapping, return_res)
+
+
+def _return_values_to_mapping(mapping: Sequence[Any], values,
+                              accumulator: dict = None) -> dict[Any, Any]:
+    if accumulator is None:
+        accumulator = {}
+    # A single (non-tuple) return value is mapped to the first key only
+    if not isinstance(values, tuple):
+        accumulator[mapping[0]] = values
+    else:
+        for i, m in enumerate(mapping):
+            accumulator[m] = values[i]
+    return accumulator
+
+
+def __accumulate_metrics(metric_func, metric_names, metric_accumulator=None,
+                         *args, **kwargs):
+    # filter_kwargs drops any keyword arguments metric_func does not accept
+    return_res = util.filter_kwargs(metric_func, *args, **kwargs)
+    return _return_values_to_mapping(metric_names, return_res, metric_accumulator)
+
 
 def evaluate(ref_intervals, ref_labels, est_intervals, est_labels, **kwargs):
     """Compute all metrics for the given reference and estimated annotations.
@@ -1227,71 +1259,39 @@ def evaluate(ref_intervals, ref_labels, est_intervals, est_labels, **kwargs):
     # Now compute all the metrics
     scores = collections.OrderedDict()
 
-    # Boundary detection
+    # Boundary metrics (no structure labels): validate, convert, and trim
+    # the boundaries once, then reuse them for every boundary-based metric.
+    trim = kwargs.get("trim", False)
+    reference_boundaries, estimated_boundaries = validated_trimmed_boundaries(
+        ref_intervals, est_intervals, trim, segment_type=SegmentType.INTERVAL)
+
+    def __with_interval_metrics(metric_func, metric_names):
+        return __accumulate_metrics(metric_func, metric_names, scores,
+                                    reference_boundaries, estimated_boundaries,
+                                    **kwargs)
+
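+    # Helpers like the one above bind the shared positional arguments, so
+    # each metric family below is a single call: the calculator runs once
+    # and its return values land in ``scores`` under the given names.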
+    # Boundary deviation
+    __with_interval_metrics(_compute_deviation,
+                            ["Ref-to-est deviation", "Est-to-ref deviation"])
+
     # Force these values for window
     kwargs["window"] = 0.5
-    (
-        scores["Precision@0.5"],
-        scores["Recall@0.5"],
-        scores["F-measure@0.5"],
-    ) = util.filter_kwargs(detection, ref_intervals, est_intervals, **kwargs)
-
+    __with_interval_metrics(_compute_detection,
+                            ["Precision@0.5", "Recall@0.5", "F-measure@0.5"])
     kwargs["window"] = 3.0
-    (
-        scores["Precision@3.0"],
-        scores["Recall@3.0"],
-        scores["F-measure@3.0"],
-    ) = util.filter_kwargs(detection, ref_intervals, est_intervals, **kwargs)
-
-    # Boundary deviation
-    scores["Ref-to-est deviation"], scores["Est-to-ref deviation"] = util.filter_kwargs(
-        deviation, ref_intervals, est_intervals, **kwargs
-    )
-
-    # Pairwise clustering
-    (
-        scores["Pairwise Precision"],
-        scores["Pairwise Recall"],
-        scores["Pairwise F-measure"],
-    ) = util.filter_kwargs(
-        pairwise, ref_intervals, ref_labels, est_intervals, est_labels, **kwargs
-    )
-
-    # Rand index
-    scores["Rand Index"] = util.filter_kwargs(
-        rand_index, ref_intervals, ref_labels, est_intervals, est_labels, **kwargs
-    )
-    # Adjusted rand index
-    scores["Adjusted Rand Index"] = util.filter_kwargs(
-        ari, ref_intervals, ref_labels, est_intervals, est_labels, **kwargs
-    )
-
-    # Mutual information metrics
-    (
-        scores["Mutual Information"],
-        scores["Adjusted Mutual Information"],
-        scores["Normalized Mutual Information"],
-    ) = util.filter_kwargs(
-        mutual_information,
-        ref_intervals,
-        ref_labels,
-        est_intervals,
-        est_labels,
-        **kwargs
-    )
+    __with_interval_metrics(_compute_detection,
+                            ["Precision@3.0", "Recall@3.0", "F-measure@3.0"])
+
+    # Structure (labeled) metrics: convert annotations to frame-level
+    # cluster indices once, then reuse them for every labeled metric.
+    frame_size = kwargs.get("frame_size", 0.1)
+    ref_indices, est_indices = _structure_to_indices(ref_intervals, ref_labels,
+                                                     est_intervals, est_labels,
+                                                     frame_size)
+
+    def __with_structured_interval_metrics(metric_func, metric_names):
+        return __accumulate_metrics(metric_func, metric_names, scores,
+                                    ref_indices, est_indices, **kwargs)
+
+    # Pairwise clustering
+    __with_structured_interval_metrics(
+        _compute_pairwise,
+        ["Pairwise Precision", "Pairwise Recall", "Pairwise F-measure"])
+    # Rand index
+    __with_structured_interval_metrics(_compute_rand_index, ["Rand Index"])
+    # Adjusted Rand index
+    __with_structured_interval_metrics(_adjusted_rand_index,
+                                       ["Adjusted Rand Index"])
+    # Mutual information metrics
+    __with_structured_interval_metrics(
+        _compute_mutual_information,
+        ["Mutual Information", "Adjusted Mutual Information",
+         "Normalized Mutual Information"])
 
     # Conditional entropy metrics
-    (
-        scores["NCE Over"],
-        scores["NCE Under"],
-        scores["NCE F-measure"],
-    ) = util.filter_kwargs(
-        nce, ref_intervals, ref_labels, est_intervals, est_labels, **kwargs
-    )
-
+    kwargs["marginal"] = False
+    __with_structured_interval_metrics(_compute_nce,
+                                       ["NCE Over", "NCE Under",
+                                        "NCE F-measure"])
+
     # V-measure metrics
-    scores["V Precision"], scores["V Recall"], scores["V-measure"] = util.filter_kwargs(
-        vmeasure, ref_intervals, ref_labels, est_intervals, est_labels, **kwargs
-    )
+    kwargs["marginal"] = True
+    __with_structured_interval_metrics(_compute_nce,
+                                       ["V Precision", "V Recall", "V-measure"])
 
     return scores