diff --git a/runtime/queries/proto.go b/runtime/queries/proto.go index 9b9cfa9d530..406d6a3ec70 100644 --- a/runtime/queries/proto.go +++ b/runtime/queries/proto.go @@ -113,6 +113,58 @@ func ProtoFromJSON(qryName, qryArgsJSON string, executionTime *time.Time) (*runt return qry, nil } +// MetricsViewFromQuery extracts the metrics view name from a JSON query based on the query name. +func MetricsViewFromQuery(qryName, qryArgsJSON string) (string, error) { + qry := &runtimev1.Query{} + var metricsView string + switch qryName { + case "MetricsViewAggregation": + req := &runtimev1.MetricsViewAggregationRequest{} + qry.Query = &runtimev1.Query_MetricsViewAggregationRequest{MetricsViewAggregationRequest: req} + err := protojson.Unmarshal([]byte(qryArgsJSON), req) + if err != nil { + return "", fmt.Errorf("invalid properties for query %q: %w", qryName, err) + } + metricsView = req.MetricsView + case "MetricsViewToplist": + req := &runtimev1.MetricsViewToplistRequest{} + qry.Query = &runtimev1.Query_MetricsViewToplistRequest{MetricsViewToplistRequest: req} + err := protojson.Unmarshal([]byte(qryArgsJSON), req) + if err != nil { + return "", fmt.Errorf("invalid properties for query %q: %w", qryName, err) + } + metricsView = req.MetricsViewName + case "MetricsViewRows": + req := &runtimev1.MetricsViewRowsRequest{} + qry.Query = &runtimev1.Query_MetricsViewRowsRequest{MetricsViewRowsRequest: req} + err := protojson.Unmarshal([]byte(qryArgsJSON), req) + if err != nil { + return "", fmt.Errorf("invalid properties for query %q: %w", qryName, err) + } + metricsView = req.MetricsViewName + case "MetricsViewTimeSeries": + req := &runtimev1.MetricsViewTimeSeriesRequest{} + qry.Query = &runtimev1.Query_MetricsViewTimeSeriesRequest{MetricsViewTimeSeriesRequest: req} + err := protojson.Unmarshal([]byte(qryArgsJSON), req) + if err != nil { + return "", fmt.Errorf("invalid properties for query %q: %w", qryName, err) + } + metricsView = req.MetricsViewName + case "MetricsViewComparison": + req := &runtimev1.MetricsViewComparisonRequest{} + qry.Query = &runtimev1.Query_MetricsViewComparisonRequest{MetricsViewComparisonRequest: req} + err := protojson.Unmarshal([]byte(qryArgsJSON), req) + if err != nil { + return "", fmt.Errorf("invalid properties for query %q: %w", qryName, err) + } + metricsView = req.MetricsViewName + default: + return "", fmt.Errorf("query %q not supported for reports", qryName) + } + + return metricsView, nil +} + func overrideTimeRange(tr *runtimev1.TimeRange, t time.Time) *runtimev1.TimeRange { if tr == nil { tr = &runtimev1.TimeRange{} diff --git a/runtime/reconcilers/alert.go b/runtime/reconcilers/alert.go index 34ff32fd2fc..e2ae6cf5864 100644 --- a/runtime/reconcilers/alert.go +++ b/runtime/reconcilers/alert.go @@ -13,6 +13,7 @@ import ( "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/pkg/duration" + "github.com/rilldata/rill/runtime/pkg/formatter" "github.com/rilldata/rill/runtime/pkg/pbutil" "github.com/rilldata/rill/runtime/queries" "go.opentelemetry.io/otel/attribute" @@ -541,6 +542,14 @@ func (r *AlertReconciler) executeSingleWrapped(ctx context.Context, self *runtim if err != nil { return nil, fmt.Errorf("failed to parse query: %w", err) } + metricsViewName, err := queries.MetricsViewFromQuery(a.Spec.QueryName, a.Spec.QueryArgsJson) + if err != nil { + return nil, fmt.Errorf("failed extract metrics view name from query: %w", err) + } + metricsView, err := r.C.Get(ctx, &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: metricsViewName}, false) + if err != nil { + return nil, err + } // Evaluate query attributes var queryForAttrs map[string]any @@ -562,7 +571,7 @@ func (r *AlertReconciler) executeSingleWrapped(ctx context.Context, self *runtim } // Extract result row - row, ok, err := extractQueryResultFirstRow(q) + row, ok, err := extractQueryResultFirstRow(q, metricsView.GetMetricsView().Spec.Measures, r.C.Logger) if err != nil { return nil, fmt.Errorf("failed to extract query result: %w", err) } @@ -882,35 +891,98 @@ func calculateExecutionTimes(a *runtimev1.Alert, watermark, previousWatermark ti // extractQueryResultFirstRow extracts the first row from a query result. // TODO: This should function more like an export, i.e. use dimension/measure labels instead of names. -func extractQueryResultFirstRow(q runtime.Query) (map[string]any, bool, error) { +func extractQueryResultFirstRow(q runtime.Query, measures []*runtimev1.MetricsViewSpec_MeasureV2, logger *zap.Logger) (map[string]any, bool, error) { switch q := q.(type) { case *queries.MetricsViewAggregation: if q.Result != nil && len(q.Result.Data) > 0 { - row := q.Result.Data[0] - return row.AsMap(), true, nil + return formatMetricsViewAggregationResult(q, measures, logger), true, nil } return nil, false, nil case *queries.MetricsViewComparison: if q.Result != nil && len(q.Result.Rows) > 0 { - row := q.Result.Rows[0] - res := make(map[string]any) - res[q.DimensionName] = row.DimensionValue - for _, v := range row.MeasureValues { - res[v.MeasureName] = v.BaseValue.AsInterface() - if v.ComparisonValue != nil { - res[v.MeasureName+" (prev)"] = v.ComparisonValue.AsInterface() - } - if v.DeltaAbs != nil { - res[v.MeasureName+" (Δ)"] = v.DeltaAbs.AsInterface() - } - if v.DeltaRel != nil { - res[v.MeasureName+" (Δ%)"] = v.DeltaRel.AsInterface() - } - } - return res, true, nil + return formatMetricsViewComparisonResult(q, measures, logger), true, nil } return nil, false, nil default: return nil, false, fmt.Errorf("query type %T not supported for alerts", q) } } + +func formatMetricsViewAggregationResult(q *queries.MetricsViewAggregation, measures []*runtimev1.MetricsViewSpec_MeasureV2, logger *zap.Logger) map[string]any { + row := q.Result.Data[0] + res := make(map[string]any) + for k, v := range row.AsMap() { + measureLabel, f := getMeasureLabelAndFormatter(k, measures, logger) + res[measureLabel] = formatValue(f, v, logger) + } + return res +} + +func formatMetricsViewComparisonResult(q *queries.MetricsViewComparison, measures []*runtimev1.MetricsViewSpec_MeasureV2, logger *zap.Logger) map[string]any { + row := q.Result.Rows[0] + res := make(map[string]any) + res[q.DimensionName] = row.DimensionValue + for _, v := range row.MeasureValues { + measureLabel, f := getMeasureLabelAndFormatter(v.MeasureName, measures, logger) + res[measureLabel] = formatValue(f, v.BaseValue.AsInterface(), logger) + if v.ComparisonValue != nil { + res[measureLabel+" (prev)"] = formatValue(f, v.ComparisonValue.AsInterface(), logger) + } + if v.DeltaAbs != nil { + res[measureLabel+" (Δ)"] = formatValue(f, v.DeltaAbs.AsInterface(), logger) + } + if v.DeltaRel != nil { + fp, err := formatter.NewPresetFormatter("percent", false) + if err != nil { + logger.Warn("Failed to get formatter, using no formatter", zap.Error(err)) + fp = nil + } + res[measureLabel+" (Δ%)"] = formatValue(fp, v.DeltaRel.AsInterface(), logger) + } + } + return res +} + +// getMeasureLabelAndFormatter gets the measure label and formatter by a measure name. +// if the measure is not found, it returns the measure name as the label and no formatter. +// if the formatter fails to load, it logs the error and returns the measure name as the label and no formatter. +func getMeasureLabelAndFormatter(measureName string, measures []*runtimev1.MetricsViewSpec_MeasureV2, logger *zap.Logger) (string, formatter.Formatter) { + var measure *runtimev1.MetricsViewSpec_MeasureV2 + for _, m := range measures { + if measureName == m.Name { + measure = m + break + } + } + + if measure == nil { + return measureName, nil + } + + measureLabel := measure.Label + if measureLabel == "" { + measureLabel = measureName + } + + // D3 formatting isn't implemented yet so using the format preset only for now + f, err := formatter.NewPresetFormatter(measure.FormatPreset, false) + if err != nil { + logger.Warn("Failed to get formatter, using no formatter", zap.Error(err)) + return measureLabel, nil + } + + return measureLabel, f +} + +// formatValue formats a measure value using the provided formatter. +// If the formatter is nil, or value is nil, or an error occurred, it will log a warning and return the value as is. +func formatValue(f formatter.Formatter, v any, logger *zap.Logger) any { + if f == nil || v == nil { + return v + } + if s, err := f.StringFormat(v); err != nil { + return s + } + logger.Warn("Failed to format measure value", zap.Any("value", v)) + return fmt.Sprintf("%v", v) +} diff --git a/runtime/reconcilers/alert_test.go b/runtime/reconcilers/alert_test.go index 2873011d274..456b2a9e369 100644 --- a/runtime/reconcilers/alert_test.go +++ b/runtime/reconcilers/alert_test.go @@ -155,7 +155,7 @@ SELECT '2024-01-04T00:00:00Z'::TIMESTAMP as __time, 'Denmark' as country Status: runtimev1.AssertionStatus_ASSERTION_STATUS_FAIL, FailRow: must(structpb.NewStruct(map[string]any{ "country": "Denmark", - "measure_0": 4, + "measure_0": "4", })), }, SentNotifications: true, @@ -309,7 +309,7 @@ SELECT '2024-01-04T00:00:00Z'::TIMESTAMP as __time, 'Denmark' as country Status: runtimev1.AssertionStatus_ASSERTION_STATUS_FAIL, FailRow: must(structpb.NewStruct(map[string]any{ "country": "Denmark", - "measure_0": 4, + "measure_0": "4", })), }, SentNotifications: true,