diff --git a/recidiviz/aggregated_metrics/aggregated_metrics.py b/recidiviz/aggregated_metrics/aggregated_metrics.py
index 61299a37db..6534f3f91f 100644
--- a/recidiviz/aggregated_metrics/aggregated_metrics.py
+++ b/recidiviz/aggregated_metrics/aggregated_metrics.py
@@ -89,7 +89,7 @@ def generate_aggregated_metrics_view_builder(
     metrics_query_str = ",\n ".join([metric.name for metric in included_metrics])
     view_description_metrics = [
         f"|{metric.display_name} (`{metric.name}`)|{metric.description}|{metric.pretty_name()}|"
-        f"`{metric.get_observation_conditions_string_no_newline() if isinstance(metric, MetricConditionsMixin) else 'N/A'}`|"
+        f"`{metric.get_observation_conditions_string_no_newline(filter_by_observation_type=True, read_observation_attributes_from_json=True) if isinstance(metric, MetricConditionsMixin) else 'N/A'}`|"
         for metric in included_metrics
     ]
     view_description_metrics_str = "\n".join(view_description_metrics)
diff --git a/recidiviz/aggregated_metrics/assignment_event_aggregated_metrics.py b/recidiviz/aggregated_metrics/assignment_event_aggregated_metrics.py
index 4c76f3bb0f..4e83295a3c 100644
--- a/recidiviz/aggregated_metrics/assignment_event_aggregated_metrics.py
+++ b/recidiviz/aggregated_metrics/assignment_event_aggregated_metrics.py
@@ -46,6 +46,8 @@
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def get_assignment_event_time_specific_cte(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
@@ -86,6 +88,8 @@ def get_assignment_event_time_specific_cte(
     metric_aggregation_fragment_inner = ",\n".join(
         [
             metric.generate_aggregation_query_fragment(
+                filter_observations_by_type=True,
+                read_observation_attributes_from_json=True,
                 event_date_col="events.event_date",
                 assignment_date_col="assign.assignment_date",
             )
@@ -151,6 +155,8 @@ def get_assignment_event_time_specific_cte(
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def generate_assignment_event_aggregated_metrics_view_builder(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
diff --git a/recidiviz/aggregated_metrics/assignment_span_aggregated_metrics.py b/recidiviz/aggregated_metrics/assignment_span_aggregated_metrics.py
index 1674c569ee..5174b4fb42 100644
--- a/recidiviz/aggregated_metrics/assignment_span_aggregated_metrics.py
+++ b/recidiviz/aggregated_metrics/assignment_span_aggregated_metrics.py
@@ -47,6 +47,8 @@
 from recidiviz.observations.span_type import SpanType
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def get_assignment_span_time_specific_cte(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
@@ -85,6 +87,8 @@ def get_assignment_span_time_specific_cte(
     metric_aggregation_fragment_inner = ",\n".join(
         [
             metric.generate_aggregation_query_fragment(
+                filter_observations_by_type=True,
+                read_observation_attributes_from_json=True,
                 span_start_date_col="spans.start_date",
                 span_end_date_col="spans.end_date",
                 assignment_date_col="assign.assignment_date",
@@ -155,6 +159,8 @@ def get_assignment_span_time_specific_cte(
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def generate_assignment_span_aggregated_metrics_view_builder(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
diff --git a/recidiviz/aggregated_metrics/models/aggregated_metric.py b/recidiviz/aggregated_metrics/models/aggregated_metric.py
index 7354e96282..b03229ca5c 100644
--- a/recidiviz/aggregated_metrics/models/aggregated_metric.py
+++ b/recidiviz/aggregated_metrics/models/aggregated_metric.py
@@ -36,7 +36,10 @@
     MetricUnitOfObservationType,
 )
 from recidiviz.observations.observation_selector import ObservationSelector
-from recidiviz.observations.observation_type_utils import ObservationTypeT
+from recidiviz.observations.observation_type_utils import (
+    ObservationTypeT,
+    observation_attribute_value_clause,
+)
 from recidiviz.observations.span_selector import SpanSelector
 from recidiviz.observations.span_type import SpanType
 
@@ -86,36 +89,39 @@ def unit_of_observation(self) -> MetricUnitOfObservation:
     def unit_of_observation_type(self) -> MetricUnitOfObservationType:
         return self.unit_of_observation.type
 
-    def get_observation_conditions_string(self) -> str:
+    def get_observation_conditions_string(
+        self,
+        filter_by_observation_type: bool,
+        read_observation_attributes_from_json: bool,
+    ) -> str:
         """Returns a query fragment that filters rows that contain observation data
         based on configured observation conditions for this metric.
         """
-        fragment = self.observation_selector.generate_observation_conditions_query_fragment(
-            filter_by_observation_type=True,
-            # TODO(#29291): Figure out where get_observation_conditions_string() is
-            # being used and pass as a parameter through to
-            # get_observation_conditions_string() instead of always setting to True
-            # here so we can gate the new aggregated metric query code properly.
-            read_attributes_from_json=True,
-            strip_newlines=False,
+        fragment = (
+            self.observation_selector.generate_observation_conditions_query_fragment(
+                filter_by_observation_type=filter_by_observation_type,
+                read_attributes_from_json=read_observation_attributes_from_json,
+                strip_newlines=False,
+            )
         )
         return f"({fragment})"
 
-    def get_observation_conditions_string_no_newline(self) -> str:
+    def get_observation_conditions_string_no_newline(
+        self,
+        filter_by_observation_type: bool,
+        read_observation_attributes_from_json: bool,
+    ) -> str:
         """Returns a query fragment that filters rows that contain observation data
         based on configured observation conditions for this metric. All newlines are
         stripped from the condition string so this can be used in places where we want
         more succinct output.
         """
-        fragment = self.observation_selector.generate_observation_conditions_query_fragment(
-            filter_by_observation_type=True,
-            # TODO(#29291): Figure out where
-            # get_observation_conditions_string_no_newline() is being used and pass as
-            # a parameter through to get_observation_conditions_string_no_newline()
-            # instead of always setting to True here so we can gate the new aggregated
-            # metric query code properly.
-            read_attributes_from_json=True,
-            strip_newlines=True,
+        fragment = (
+            self.observation_selector.generate_observation_conditions_query_fragment(
+                filter_by_observation_type=filter_by_observation_type,
+                read_attributes_from_json=read_observation_attributes_from_json,
+                strip_newlines=True,
+            )
         )
         return f"({fragment})"
 
@@ -183,6 +189,13 @@ class PeriodSpanAggregatedMetric(AggregatedMetric, SpanMetricConditionsMixin):
     @abc.abstractmethod
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
@@ -204,7 +217,17 @@ class AssignmentSpanAggregatedMetric(AggregatedMetric, SpanMetricConditionsMixin
     @abc.abstractmethod
     def generate_aggregation_query_fragment(
-        self, span_start_date_col: str, span_end_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        span_start_date_col: str,
+        span_end_date_col: str,
+        assignment_date_col: str,
     ) -> str:
         """Returns a query fragment that calculates an aggregation corresponding to the
         AssignmentSpan metric type."""
@@ -217,7 +240,17 @@ class PeriodEventAggregatedMetric(AggregatedMetric, EventMetricConditionsMixin):
     """
 
     @abc.abstractmethod
-    def generate_aggregation_query_fragment(self, event_date_col: str) -> str:
+    def generate_aggregation_query_fragment(
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+    ) -> str:
         """Returns a query fragment that calculates an aggregation corresponding to the
         PeriodEvent metric type."""
@@ -236,7 +269,16 @@ def generate_aggregate_time_periods_query_fragment(self) -> str:
     @abc.abstractmethod
     def generate_aggregation_query_fragment(
-        self, event_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+        assignment_date_col: str,
     ) -> str:
         """Returns a query fragment that calculates an aggregation corresponding to the
         AssignmentEvent metric type."""
@@ -252,12 +294,23 @@ class DailyAvgSpanCountMetric(PeriodSpanAggregatedMetric):
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
         period_end_date_col: str,
         original_span_start_date: Optional[str] = None,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     SUM(
         (
@@ -265,7 +318,7 @@ def generate_aggregation_query_fragment(
             LEAST({period_end_date_col}, {nonnull_current_date_exclusive_clause(span_end_date_col)}),
             GREATEST({period_start_date_col}, {span_start_date_col}),
             DAY)
-        ) * (IF({self.get_observation_conditions_string()}, 1, 0)) / DATE_DIFF({period_end_date_col}, {period_start_date_col}, DAY)
+        ) * (IF({observation_conditions}, 1, 0)) / DATE_DIFF({period_end_date_col}, {period_start_date_col}, DAY)
     ) AS {self.name}
 """
@@ -291,12 +344,28 @@ class DailyAvgSpanValueMetric(PeriodSpanAggregatedMetric):
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
         period_end_date_col: str,
         original_span_start_date: Optional[str] = None,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
+        span_value_numeric_clause = observation_attribute_value_clause(
+            observation_type=self.observation_selector.observation_type,
+            attribute=self.span_value_numeric,
+            read_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     SAFE_DIVIDE(
         SUM(
@@ -305,8 +374,8 @@ def generate_aggregation_query_fragment(
                 GREATEST({period_start_date_col}, {span_start_date_col}),
                 DAY
             ) * IF(
-                {self.get_observation_conditions_string()},
-                CAST(JSON_EXTRACT_SCALAR(span_attributes, "$.{self.span_value_numeric}") AS FLOAT64),
+                {observation_conditions},
+                CAST({span_value_numeric_clause} AS FLOAT64),
                 0
             )
         ),
@@ -315,7 +384,7 @@ def generate_aggregation_query_fragment(
                 LEAST({period_end_date_col}, {nonnull_current_date_exclusive_clause(span_end_date_col)}),
                 GREATEST({period_start_date_col}, {span_start_date_col}),
                 DAY
-            ) * IF({self.get_observation_conditions_string()}, 1, 0)
+            ) * IF({observation_conditions}, 1, 0)
         )
     ) AS {self.name}
 """
@@ -342,12 +411,23 @@ class DailyAvgTimeSinceSpanStartMetric(PeriodSpanAggregatedMetric):
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
         period_end_date_col: str,
         original_span_start_date: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     SAFE_DIVIDE(
         SUM(
@@ -356,7 +436,7 @@ def generate_aggregation_query_fragment(
                 GREATEST({period_start_date_col}, {span_start_date_col}),
                 DAY
             ) * IF(
-                {self.get_observation_conditions_string()},
+                {observation_conditions},
                 (
                     # Average of LoS on last day (inclusive) of period/span and LoS on first day of period/span
                     (DATE_DIFF(
@@ -375,7 +455,7 @@ def generate_aggregation_query_fragment(
                 LEAST({period_end_date_col}, {nonnull_current_date_exclusive_clause(span_end_date_col)}),
                 GREATEST({period_start_date_col}, {span_start_date_col}),
                 DAY
-            ) * IF({self.get_observation_conditions_string()}, 1, 0)
+            ) * IF({observation_conditions}, 1, 0)
         )
     ) AS {self.name}
 """
@@ -403,19 +483,34 @@ class SumSpanDaysMetric(PeriodSpanAggregatedMetric):
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
         period_end_date_col: str,
         original_span_start_date: Optional[str] = None,
     ) -> str:
-        weight_snippet = (
-            (
-                f'CAST(JSON_EXTRACT_SCALAR(span_attributes, "$.{self.weight_col}") AS FLOAT64) * '
-            )
-            if self.weight_col
-            else ""
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
         )
+
+        if self.weight_col:
+            weight_col_clause = observation_attribute_value_clause(
+                observation_type=self.observation_selector.observation_type,
+                attribute=self.weight_col,
+                read_attributes_from_json=read_observation_attributes_from_json,
+            )
+            weight_snippet = f"CAST({weight_col_clause} AS FLOAT64) * "
+        else:
+            weight_snippet = ""
+
         return f"""
     SUM(
         (
@@ -423,7 +518,7 @@ def generate_aggregation_query_fragment(
             LEAST({period_end_date_col}, {nonnull_current_date_exclusive_clause(span_end_date_col)}),
             GREATEST({period_start_date_col}, {span_start_date_col}),
             DAY)
-        ) * (IF({self.get_observation_conditions_string()}, 1, 0))
+        ) * (IF({observation_conditions}, 1, 0))
     ) AS {self.name}
 """
@@ -442,15 +537,26 @@ class SpanDistinctUnitCountMetric(PeriodSpanAggregatedMetric):
     def generate_aggregation_query_fragment(
         self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
         span_start_date_col: str,
         span_end_date_col: str,
         period_start_date_col: str,
         period_end_date_col: str,
         original_span_start_date: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     COUNT(DISTINCT IF(
-        {self.get_observation_conditions_string()},
+        {observation_conditions},
         CONCAT({self.unit_of_observation.get_primary_key_columns_query_string(prefix="ses")}),
         NULL
     )) AS {self.name}
@@ -471,11 +577,25 @@ class AssignmentSpanDaysMetric(AssignmentSpanAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, span_start_date_col: str, span_end_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        span_start_date_col: str,
+        span_end_date_col: str,
+        assignment_date_col: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     SUM(
-        IF({self.get_observation_conditions_string()}, DATE_DIFF(
+        IF({observation_conditions}, DATE_DIFF(
             LEAST(
                 DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY),
                 {nonnull_current_date_exclusive_clause(span_end_date_col)}
             ),
@@ -504,12 +624,26 @@ class AssignmentSpanMaxDaysMetric(AssignmentSpanAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, span_start_date_col: str, span_end_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        span_start_date_col: str,
+        span_end_date_col: str,
+        assignment_date_col: str,
    ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     MAX(
         IF(
-            {self.get_observation_conditions_string()}
+            {observation_conditions}
             AND {span_start_date_col} <= DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY),
             DATE_DIFF(
                 LEAST(
@@ -543,14 +677,33 @@ class AssignmentSpanValueAtStartMetric(AssignmentSpanAggregatedMetric):
     span_count_metric: AssignmentSpanDaysMetric
 
     def generate_aggregation_query_fragment(
-        self, span_start_date_col: str, span_end_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        span_start_date_col: str,
+        span_end_date_col: str,
+        assignment_date_col: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
+        span_value_numeric_clause = observation_attribute_value_clause(
+            observation_type=self.observation_selector.observation_type,
+            attribute=self.span_value_numeric,
+            read_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     AVG(
         IF(
-            {self.get_observation_conditions_string()}
+            {observation_conditions}
             AND {assignment_date_col} BETWEEN {span_start_date_col} AND {nonnull_current_date_exclusive_clause(span_end_date_col)},
-            CAST(JSON_EXTRACT_SCALAR(span_attributes, "$.{self.span_value_numeric}") AS FLOAT64),
+            CAST({span_value_numeric_clause} AS FLOAT64),
             NULL
         )
     ) AS {self.name}
@@ -569,7 +722,17 @@ class AssignmentCountMetric(AssignmentSpanAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, span_start_date_col: str, span_end_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        span_start_date_col: str,
+        span_end_date_col: str,
+        assignment_date_col: str,
     ) -> str:
         return f"1 AS {self.name}"
 
@@ -592,15 +755,30 @@ class EventCountMetric(PeriodEventAggregatedMetric):
     # Otherwise, we treat those rows as the same event.
     event_segmentation_columns: Optional[List[str]] = None
 
-    def generate_aggregation_query_fragment(self, event_date_col: str) -> str:
+    def generate_aggregation_query_fragment(
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+    ) -> str:
         # If `event_segmentation_columns` are provided, add to the set of fields used to calculate the
         # COUNT DISTINCT.
         event_segmentation_columns = []
         if self.event_segmentation_columns:
             event_segmentation_columns = self.event_segmentation_columns
+
         event_segmentation_columns_json = [
-            f'JSON_EXTRACT_SCALAR(event_attributes, "$.{col}")'
+            observation_attribute_value_clause(
+                observation_type=self.observation_selector.observation_type,
+                attribute=col,
+                read_attributes_from_json=read_observation_attributes_from_json,
+            )
             for col in event_segmentation_columns
         ]
         event_segmentation_columns_str = (
@@ -608,9 +786,13 @@ def generate_aggregation_query_fragment(self, event_date_col: str) -> str:
             if len(event_segmentation_columns_json) > 0
             else ""
         )
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     COUNT(DISTINCT IF(
-        {self.get_observation_conditions_string()},
+        {observation_conditions},
         CONCAT(
             {self.unit_of_observation.get_primary_key_columns_query_string(prefix="events")},
             {event_date_col}{event_segmentation_columns_str}
@@ -637,11 +819,30 @@ class EventValueMetric(PeriodEventAggregatedMetric):
     # EventCount metric counting the number of events contributing to the event value metric
     event_count_metric: EventCountMetric
 
-    def generate_aggregation_query_fragment(self, event_date_col: str) -> str:
+    def generate_aggregation_query_fragment(
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+    ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
+        event_value_numeric_clause = observation_attribute_value_clause(
+            observation_type=self.observation_selector.observation_type,
+            attribute=self.event_value_numeric,
+            read_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     AVG(IF(
-        {self.get_observation_conditions_string()},
-        CAST(JSON_EXTRACT_SCALAR(event_attributes, "$.{self.event_value_numeric}") AS FLOAT64),
+        {observation_conditions},
+        CAST({event_value_numeric_clause} AS FLOAT64),
         NULL
     )) AS {self.name}
 """
@@ -659,10 +860,24 @@ class EventDistinctUnitCountMetric(PeriodEventAggregatedMetric):
     Example metric: distinct active users.
     """
 
-    def generate_aggregation_query_fragment(self, event_date_col: str) -> str:
+    def generate_aggregation_query_fragment(
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+    ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     COUNT(DISTINCT IF(
-        {self.get_observation_conditions_string()},
+        {observation_conditions},
         CONCAT({self.unit_of_observation.get_primary_key_columns_query_string(prefix="events")}),
         NULL
     )) AS {self.name}
@@ -683,13 +898,26 @@ class AssignmentDaysToFirstEventMetric(AssignmentEventAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, event_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+        assignment_date_col: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     MIN(DATE_DIFF(
         IFNULL(
             IF(
-                {self.get_observation_conditions_string()},
+                {observation_conditions},
                 LEAST({event_date_col}, DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY)),
                 NULL
             ),
         DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY)),
@@ -712,12 +940,25 @@ class AssignmentEventCountMetric(AssignmentEventAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, event_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+        assignment_date_col: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     COUNT(
         DISTINCT IF(
-            {self.get_observation_conditions_string()}
+            {observation_conditions}
             AND {event_date_col} <= DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY),
             CONCAT({self.unit_of_observation.get_primary_key_columns_query_string(prefix="events")}, {event_date_col}),
             NULL
@@ -739,11 +980,24 @@ class AssignmentEventBinaryMetric(AssignmentEventAggregatedMetric):
     """
 
     def generate_aggregation_query_fragment(
-        self, event_date_col: str, assignment_date_col: str
+        self,
+        *,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        filter_observations_by_type: bool,
+        # TODO(#29291): Remove this flag once we've fully migrated to optimized
+        # aggregated metrics queries.
+        read_observation_attributes_from_json: bool,
+        event_date_col: str,
+        assignment_date_col: str,
     ) -> str:
+        observation_conditions = self.get_observation_conditions_string(
+            filter_by_observation_type=filter_observations_by_type,
+            read_observation_attributes_from_json=read_observation_attributes_from_json,
+        )
         return f"""
     CAST(LOGICAL_OR(
-        {self.get_observation_conditions_string()}
+        {observation_conditions}
         AND {event_date_col} <= DATE_ADD({assignment_date_col}, INTERVAL {self.window_length_days} DAY)
     ) AS INT64) AS {self.name}"""
diff --git a/recidiviz/aggregated_metrics/period_event_aggregated_metrics.py b/recidiviz/aggregated_metrics/period_event_aggregated_metrics.py
index 2f9b24d109..48515f702a 100644
--- a/recidiviz/aggregated_metrics/period_event_aggregated_metrics.py
+++ b/recidiviz/aggregated_metrics/period_event_aggregated_metrics.py
@@ -46,6 +46,8 @@
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def get_period_event_time_specific_cte(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
@@ -75,7 +77,9 @@ def get_period_event_time_specific_cte(
     metric_aggregation_fragment = ",\n".join(
         [
             metric.generate_aggregation_query_fragment(
-                event_date_col="events.event_date"
+                filter_observations_by_type=True,
+                read_observation_attributes_from_json=True,
+                event_date_col="events.event_date",
             )
             for metric in metrics_for_unit_of_observation
         ]
     )
@@ -126,6 +130,8 @@ def get_period_event_time_specific_cte(
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def generate_period_event_aggregated_metrics_view_builder(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
diff --git a/recidiviz/aggregated_metrics/period_span_aggregated_metrics.py b/recidiviz/aggregated_metrics/period_span_aggregated_metrics.py
index 82fbee3d91..9235b54b79 100644
--- a/recidiviz/aggregated_metrics/period_span_aggregated_metrics.py
+++ b/recidiviz/aggregated_metrics/period_span_aggregated_metrics.py
@@ -49,6 +49,8 @@
 from recidiviz.observations.span_type import SpanType
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def get_period_span_time_specific_cte(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
@@ -87,6 +89,8 @@ def get_period_span_time_specific_cte(
     metric_aggregation_fragment = ",\n".join(
         sorted(
             metric.generate_aggregation_query_fragment(
+                filter_observations_by_type=True,
+                read_observation_attributes_from_json=True,
                 span_start_date_col="ses.start_date",
                 span_end_date_col="ses.end_date",
                 period_start_date_col="pop.population_start_date",
@@ -154,6 +158,8 @@ def get_period_span_time_specific_cte(
 )
 
 
+# TODO(#29291): This function should become unused once we've migrated over to optimized
+# aggregated metrics queries.
 def generate_period_span_aggregated_metrics_view_builder(
     unit_of_analysis: MetricUnitOfAnalysis,
     population_type: MetricPopulationType,
diff --git a/recidiviz/observations/observation_type_utils.py b/recidiviz/observations/observation_type_utils.py
index 73618ec06d..b87470950c 100644
--- a/recidiviz/observations/observation_type_utils.py
+++ b/recidiviz/observations/observation_type_utils.py
@@ -52,6 +52,7 @@ def attributes_column_name_for_observation_type(
 # trivial) once we are only reading from single observation tables and the single
 # observation tables do not package their attributes into JSON.
 def observation_attribute_value_clause(
+    *,
     observation_type: EventType | SpanType,
     attribute: str,
     read_attributes_from_json: bool,
diff --git a/recidiviz/outliers/types.py b/recidiviz/outliers/types.py
index 3efec5b919..7c328df2e4 100644
--- a/recidiviz/outliers/types.py
+++ b/recidiviz/outliers/types.py
@@ -118,7 +118,18 @@ def metric_event_conditions_string(self) -> str:
         """
         The query fragment to use to filter analyst_data.person_events for this metric's events
         """
-        return self.aggregated_metric.get_observation_conditions_string_no_newline()
+        return self.aggregated_metric.get_observation_conditions_string_no_newline(
+            # TODO(#29291): Flip this flag once format_state_specific_person_events_filters()
+            # in recidiviz/calculator/query/state/views/outliers/utils.py pulls from
+            # the observation-specific table for this metric.
+            filter_by_observation_type=True,
+            # TODO(#34498), TODO(#29291): Flip this flag once
+            # format_state_specific_person_events_filters() in
+            # recidiviz/calculator/query/state/views/outliers/utils.py pulls from
+            # the observation-specific table for this metric AND we've updated
+            # observation-specific tables to not pack their attributes in JSON.
+            read_observation_attributes_from_json=True,
+        )
 
 
 @attr.s
@@ -135,7 +146,7 @@ def name(self) -> str:
         return self.aggregated_metric.name
 
 
-@attr.s(eq=False)
+@attr.s(eq=False, kw_only=True)
 class OutliersMetricConfig:
     """
     Represents all information needed for a single metric in the Outliers products
@@ -195,18 +206,18 @@ def build_from_metric(
         list_table_text: str | None = None,
     ) -> "OutliersMetricConfig":
         return cls(
-            metric.name,
-            metric.outcome_type,
-            title_display_name,
-            body_display_name,
-            event_name,
-            event_name_singular,
-            event_name_past_tense,
-            description_markdown,
-            metric.metric_event_conditions_string,
-            top_x_pct,
-            is_absconsion_metric,
-            list_table_text,
+            name=metric.name,
+            outcome_type=metric.outcome_type,
+            title_display_name=title_display_name,
+            body_display_name=body_display_name,
+            event_name=event_name,
+            event_name_singular=event_name_singular,
+            event_name_past_tense=event_name_past_tense,
+            description_markdown=description_markdown,
+            metric_event_conditions_string=metric.metric_event_conditions_string,
+            top_x_pct=top_x_pct,
+            is_absconsion_metric=is_absconsion_metric,
+            list_table_text=list_table_text,
         )
diff --git a/recidiviz/tests/observations/observation_type_utils_test.py b/recidiviz/tests/observations/observation_type_utils_test.py
index 4dfebe73e2..8fdc77d8ba 100644
--- a/recidiviz/tests/observations/observation_type_utils_test.py
+++ b/recidiviz/tests/observations/observation_type_utils_test.py
@@ -44,7 +44,7 @@ def test_observation_attribute_value_clause(self) -> None:
         self.assertEqual(
             'JSON_EXTRACT_SCALAR(event_attributes, "$.is_positive_result")',
             observation_attribute_value_clause(
-                EventType.DRUG_SCREEN,
+                observation_type=EventType.DRUG_SCREEN,
                 attribute="is_positive_result",
                 read_attributes_from_json=True,
             ),
@@ -52,7 +52,7 @@ def test_observation_attribute_value_clause(self) -> None:
         self.assertEqual(
             "is_positive_result",
             observation_attribute_value_clause(
-                EventType.DRUG_SCREEN,
+                observation_type=EventType.DRUG_SCREEN,
                 attribute="is_positive_result",
                 read_attributes_from_json=False,
             ),
@@ -60,7 +60,7 @@ def test_observation_attribute_value_clause(self) -> None:
         self.assertEqual(
             'JSON_EXTRACT_SCALAR(span_attributes, "$.effective_date")',
             observation_attribute_value_clause(
-                SpanType.SENTENCE_SPAN,
+                observation_type=SpanType.SENTENCE_SPAN,
                 attribute="effective_date",
                 read_attributes_from_json=True,
             ),
@@ -68,7 +68,7 @@ def test_observation_attribute_value_clause(self) -> None:
         self.assertEqual(
             "effective_date",
             observation_attribute_value_clause(
-                SpanType.SENTENCE_SPAN,
+                observation_type=SpanType.SENTENCE_SPAN,
                 attribute="effective_date",
                 read_attributes_from_json=False,
             ),
         )
diff --git a/recidiviz/tools/analyst/aggregated_metrics_utils.py b/recidiviz/tools/analyst/aggregated_metrics_utils.py
index c42d85a0b3..9e28ad8df1 100644
--- a/recidiviz/tools/analyst/aggregated_metrics_utils.py
+++ b/recidiviz/tools/analyst/aggregated_metrics_utils.py
@@ -401,6 +401,8 @@ def get_person_events(
             raise ValueError("Must be an AggregatedMetric.")
         print(metric.name)
 
+        # TODO(#29291): Update this query to use the observation-specific table for
+        # the metric.
         query = f"""
     SELECT
         e.state_code,
@@ -420,7 +422,7 @@ def get_person_events(
     LEFT JOIN
         `normalized_state.state_person_external_id` pei
     ON
         e.person_id = pei.person_id
         AND e.state_code = pei.state_code
-    WHERE ({metric.get_observation_conditions_string()})
+    WHERE ({metric.get_observation_conditions_string(filter_by_observation_type=True, read_observation_attributes_from_json=True)})
         {officer_ids_filter}
         AND e.event_date BETWEEN {min_date_str} AND {max_date_str}
diff --git a/recidiviz/tools/looker/aggregated_metrics/custom_metrics_lookml_utils.py b/recidiviz/tools/looker/aggregated_metrics/custom_metrics_lookml_utils.py
index d7c20c6316..80f599b093 100644
--- a/recidiviz/tools/looker/aggregated_metrics/custom_metrics_lookml_utils.py
+++ b/recidiviz/tools/looker/aggregated_metrics/custom_metrics_lookml_utils.py
@@ -120,6 +120,8 @@ def liquid_wrap_json_field(query_fragment: str, field_name: str, view_name: str)
     return f"""{{% if {view_name}.{field_name}._in_query or {view_name}.{field_name}._is_filtered %}}{query_fragment}{{% endif %}}"""
 
 
+# TODO(#29291): Adapt this LookML generation helper to use optimized aggregated metrics
+# queries.
 def generate_period_span_metric_view(
     metrics: List[PeriodSpanAggregatedMetric],
     view_name: str,
@@ -129,6 +131,8 @@ def generate_period_span_metric_view(
     """Generates LookMLView with derived table performing logic for a set of
     PeriodSpanAggregatedMetric objects"""
     metric_aggregation_fragment = (
         AVG_DAILY_POPULATION.generate_aggregation_query_fragment(
+            filter_observations_by_type=True,
+            read_observation_attributes_from_json=True,
             span_start_date_col="ses.start_date",
             span_end_date_col="ses.end_date",
             period_start_date_col="time_period.start_date",
@@ -140,6 +144,8 @@ def generate_period_span_metric_view(
         [
             liquid_wrap_metric(
                 metric.generate_aggregation_query_fragment(
+                    filter_observations_by_type=True,
+                    read_observation_attributes_from_json=True,
                     span_start_date_col="ses.start_date",
                     span_end_date_col="ses.end_date",
                     period_start_date_col="time_period.start_date",
@@ -265,6 +271,8 @@ def generate_period_span_metric_view(
 )
 
 
+# TODO(#29291): Adapt this LookML generation helper to use optimized aggregated metrics
+# queries.
 def generate_period_event_metric_view(
     metrics: List[PeriodEventAggregatedMetric],
     view_name: str,
@@ -276,7 +284,9 @@ def generate_period_event_metric_view(
         [
             liquid_wrap_metric(
                 metric.generate_aggregation_query_fragment(
-                    event_date_col="events.event_date"
+                    filter_observations_by_type=True,
+                    read_observation_attributes_from_json=True,
+                    event_date_col="events.event_date",
                 ),
                 metric,
                 view_name,
@@ -356,6 +366,8 @@ def generate_period_event_metric_view(
 )
 
 
+# TODO(#29291): Adapt this LookML generation helper to use optimized aggregated metrics
+# queries.
 def generate_assignment_span_metric_view(
     metrics: List[AssignmentSpanAggregatedMetric],
     view_name: str,
@@ -373,6 +385,8 @@ def generate_assignment_span_metric_view(
         [
             liquid_wrap_metric(
                 metric.generate_aggregation_query_fragment(
+                    filter_observations_by_type=True,
+                    read_observation_attributes_from_json=True,
                     span_start_date_col="spans.start_date",
                     span_end_date_col="spans.end_date",
                     assignment_date_col="assignments.assignment_date",
@@ -456,6 +470,8 @@ def generate_assignment_span_metric_view(
 )
 
 
+# TODO(#29291): Adapt this LookML generation helper to use optimized aggregated metrics
+# queries.
 def generate_assignment_event_metric_view(
     metrics: List[AssignmentEventAggregatedMetric],
     view_name: str,
@@ -467,6 +483,8 @@ def generate_assignment_event_metric_view(
         [
             liquid_wrap_metric(
                 metric.generate_aggregation_query_fragment(
+                    filter_observations_by_type=True,
+                    read_observation_attributes_from_json=True,
                     event_date_col="events.event_date",
                     assignment_date_col="assignments.assignment_date",
                 ),
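
A quick illustration of the clause-building behavior that observation_type_utils_test.py above pins down. This is a minimal, self-contained sketch, not repo code: ObservationKind and build_attribute_clause are hypothetical stand-ins for EventType/SpanType and observation_attribute_value_clause, whose arguments this diff makes keyword-only.

# Sketch only; mirrors the expectations asserted in observation_type_utils_test.py.
from enum import Enum


class ObservationKind(Enum):  # hypothetical stand-in for EventType / SpanType
    EVENT = "event"
    SPAN = "span"


def build_attribute_clause(
    *,  # keyword-only, mirroring the change to observation_attribute_value_clause
    observation_kind: ObservationKind,
    attribute: str,
    read_attributes_from_json: bool,
) -> str:
    # While observation attributes are still packed into a JSON column
    # (event_attributes / span_attributes), extract the scalar from JSON;
    # once TODO(#34498) lands, the attribute becomes a plain column reference.
    if read_attributes_from_json:
        return f'JSON_EXTRACT_SCALAR({observation_kind.value}_attributes, "$.{attribute}")'
    return attribute


assert build_attribute_clause(
    observation_kind=ObservationKind.EVENT,
    attribute="is_positive_result",
    read_attributes_from_json=True,
) == 'JSON_EXTRACT_SCALAR(event_attributes, "$.is_positive_result")'
assert build_attribute_clause(
    observation_kind=ObservationKind.SPAN,
    attribute="effective_date",
    read_attributes_from_json=False,
) == "effective_date"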
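
Similarly, the switch to @attr.s(eq=False, kw_only=True) on OutliersMetricConfig, together with the keyword-ified cls(...) call in build_from_metric above, turns positional construction into a TypeError, so the long argument list can no longer be silently reordered. A hedged sketch under a hypothetical two-field stand-in class:

import attr


@attr.s(eq=False, kw_only=True)
class ConfigSketch:  # hypothetical stand-in for OutliersMetricConfig
    name: str = attr.ib()
    outcome_type: str = attr.ib()


# Keyword construction works, mirroring the rewritten build_from_metric().
ConfigSketch(name="my_metric", outcome_type="ADVERSE")

# Positional construction now fails loudly instead of mis-assigning fields.
try:
    ConfigSketch("my_metric", "ADVERSE")
except TypeError as e:
    print(f"kw_only enforced: {e}")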