From c11f545e6e99532cb5524934f59463ee2cc819a4 Mon Sep 17 00:00:00 2001 From: Egor Riashin Date: Mon, 25 Mar 2024 19:11:37 +0300 Subject: [PATCH] Druid Explore Exactify mode (#4397) * Druid Exactify * Druid Exactify * Druid Exactify * Druid Exactify * Druid Exactify --------- Co-authored-by: Egor Ryashin --- .../queries/metricsview_comparison_toplist.go | 145 +++++++++++++++--- 1 file changed, 122 insertions(+), 23 deletions(-) diff --git a/runtime/queries/metricsview_comparison_toplist.go b/runtime/queries/metricsview_comparison_toplist.go index f59e22a1ff7..993ad36c9d0 100644 --- a/runtime/queries/metricsview_comparison_toplist.go +++ b/runtime/queries/metricsview_comparison_toplist.go @@ -10,6 +10,7 @@ import ( runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/pkg/expressionpb" "github.com/rilldata/rill/runtime/pkg/pbutil" "google.golang.org/protobuf/types/known/structpb" @@ -112,13 +113,97 @@ func (q *MetricsViewComparison) Resolve(ctx context.Context, rt *runtime.Runtime return err } + // comparison toplist if !isTimeRangeNil(q.ComparisonTimeRange) { + // execute toplist for base and get dim list + // create and add filter and execute comparison toplist + // remove strict limits in comp toplist sql + if drivers.DialectDruid != olap.Dialect() || q.Exact { + return q.executeComparisonToplist(ctx, olap, mv, priority, security) + } + + // Druid-based `exactify` approach: + // 1. The first query fetches topN dimensions. + // 2. The second query fetches topN filtered by the collected dimensions. + // The dimension filter constrains the topN table, avoiding approximation in measures (due to merging multiple topN Druid results from different nodes). 
+ // Optimizations: + // * the first query fetches only sorted dimensions + // * the second query isn't run if the topN already less than the limit + originalMeasures := q.removeNoSortMeasures() + + if q.isBase() || q.isDeltaComparison() { + err = q.executeToplist(ctx, olap, mv, priority, security) + if err != nil { + return err + } + } else { + ttr := q.TimeRange + q.TimeRange = q.ComparisonTimeRange + err = q.executeToplist(ctx, olap, mv, priority, security) + if err != nil { + return err + } + + q.TimeRange = ttr + } + + q.addDimsAsFilter() + q.Measures = originalMeasures return q.executeComparisonToplist(ctx, olap, mv, priority, security) } + // general toplist + if drivers.DialectDruid != olap.Dialect() || q.Exact { + return q.executeToplist(ctx, olap, mv, priority, security) + } + + // Druid-based `exactify` approach (see comments above) + originalMeasures := q.Measures + if len(q.Measures) >= 5 { + originalMeasures = q.removeNoSortMeasures() + } + + err = q.executeToplist(ctx, olap, mv, priority, security) + if err != nil { + return err + } + + if len(q.Result.Rows) < int(q.Limit) && len(q.Measures) == len(originalMeasures) { + return nil + } + + q.addDimsAsFilter() + + q.Measures = originalMeasures return q.executeToplist(ctx, olap, mv, priority, security) } +func (q *MetricsViewComparison) removeNoSortMeasures() []*runtimev1.MetricsViewAggregationMeasure { + measures := q.Measures + sortMeasures := make([]*runtimev1.MetricsViewAggregationMeasure, 0, len(q.Sort)) + for _, m := range q.Measures { + for _, s := range q.Sort { + if s.Name == m.Name { + sortMeasures = append(sortMeasures, m) + } + } + } + q.Measures = sortMeasures + return measures +} + +func (q *MetricsViewComparison) addDimsAsFilter() { + inExpressions := make([]*runtimev1.Expression, 0, len(q.Result.Rows)) + for _, r := range q.Result.Rows { + inExpressions = append(inExpressions, expressionpb.Value(r.DimensionValue)) + } + if q.Where != nil { + q.Where = 
expressionpb.And([]*runtimev1.Expression{q.Where, expressionpb.In(expressionpb.Identifier(q.DimensionName), inExpressions)}) + } else { + q.Where = expressionpb.In(expressionpb.Identifier(q.DimensionName), inExpressions) + } +} + func (q *MetricsViewComparison) calculateMeasuresMeta() error { compare := !isTimeRangeNil(q.ComparisonTimeRange) @@ -707,12 +792,8 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M } limitClause := "" - twiceTheLimitClause := "" if q.Limit > 0 { limitClause = fmt.Sprintf(" LIMIT %d", q.Limit) - twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", q.Limit*2) - } else if q.Limit == 0 { - twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", 100_000) // use Druid limit } baseLimitClause := "" @@ -723,7 +804,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M deltaComparison := q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_ABS_DELTA || q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_REL_DELTA - approximationLimit := q.Limit + approximationLimit := int(q.Limit) if q.Limit != 0 && q.Limit < 100 && deltaComparison { approximationLimit = 100 } @@ -779,23 +860,23 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M // this leads to ambiguity whether it applies to the base.measure ot comparison.measure. // to keep the clause builder consistent we add an outer query here. 
sql = fmt.Sprintf(` - SELECT * from ( - SELECT COALESCE(base.%[2]s, comparison.%[2]s) AS %[10]s, %[9]s FROM - ( - SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s GROUP BY 1 %[12]s - ) base - %[11]s JOIN - ( - SELECT %[16]s FROM %[3]s %[14]s WHERE %[5]s GROUP BY 1 %[13]s - ) comparison - ON - %[17]s - %[6]s - %[7]s - OFFSET - %[8]d - ) WHERE %[15]s - `, + SELECT * from ( + SELECT COALESCE(base.%[2]s, comparison.%[2]s) AS %[10]s, %[9]s FROM + ( + SELECT %[1]s FROM %[3]s %[14]s WHERE %[4]s GROUP BY 1 %[12]s + ) base + %[11]s JOIN + ( + SELECT %[16]s FROM %[3]s %[14]s WHERE %[5]s GROUP BY 1 %[13]s + ) comparison + ON + %[17]s + %[6]s + %[7]s + OFFSET + %[8]d + ) WHERE %[15]s + `, subSelectClause, // 1 colName, // 2 safeName(mv.Table), // 3 @@ -897,6 +978,15 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M rightWhereClause = baseWhereClause } + twiceTheLimitClause := "" + if q.Exact { + if q.Limit > 0 { + twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", q.Limit*2) + } else if q.Limit == 0 { + twiceTheLimitClause = fmt.Sprintf(" LIMIT %d", 100_000) // use Druid limit + } + } + sql = fmt.Sprintf(` WITH %[11]s AS ( SELECT %[1]s FROM %[3]s WHERE %[4]s GROUP BY 1 %[13]s %[10]s OFFSET %[8]d @@ -905,7 +995,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M ) SELECT %[11]s.%[2]s AS %[14]s, %[9]s FROM %[11]s LEFT JOIN %[12]s ON base.%[2]s = comparison.%[2]s GROUP BY 1 - HAVING %[15]s + HAVING %[15]s %[6]s %[7]s OFFSET %[8]d @@ -1207,3 +1297,12 @@ func validateMeasureAliases(aliases []*runtimev1.MetricsViewComparisonMeasureAli func isTimeRangeNil(tr *runtimev1.TimeRange) bool { return tr == nil || (tr.Start == nil && tr.End == nil) } + +func (q *MetricsViewComparison) isDeltaComparison() bool { + return q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_ABS_DELTA || + q.Sort[0].SortType == 
runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_REL_DELTA +} + +func (q *MetricsViewComparison) isBase() bool { + return q.Sort[0].SortType == runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_BASE_VALUE +}