From af2c562027750a958622d60603fcf3ae6c9236ce Mon Sep 17 00:00:00 2001 From: Aditya Hegde Date: Mon, 18 Dec 2023 12:05:49 +0530 Subject: [PATCH] Testing on druid and fixes --- runtime/queries/metricsview.go | 12 +- .../queries/metricsview_aggregation_test.go | 4 +- .../queries/metricsview_comparison_toplist.go | 20 +- .../metricsview_comparison_toplist_test.go | 110 +++++++++-- runtime/queries/metricsview_timeseries.go | 2 +- .../queries/metricsview_timeseries_test.go | 39 ++-- runtime/server/druid_ingest_test.go | 172 ++++++++++++++++++ .../dashboards/ad_bids_metrics.yaml | 26 +++ .../testdata/ad_bids_druid/rill.yaml | 0 .../dashboards/timeseries.yaml | 27 +++ .../dashboards/timeseries_dst_backwards.yaml | 28 +++ .../dashboards/timeseries_dst_forwards.yaml | 28 +++ .../dashboards/timeseries_gaps.yaml | 25 +++ .../dashboards/timeseries_year.yaml | 21 +++ .../testdata/timeseries_druid/rill.yaml | 0 runtime/testruntime/testruntime.go | 25 ++- 16 files changed, 484 insertions(+), 55 deletions(-) create mode 100644 runtime/server/druid_ingest_test.go create mode 100644 runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml create mode 100644 runtime/testruntime/testdata/ad_bids_druid/rill.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_backwards.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_forwards.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_gaps.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_year.yaml create mode 100644 runtime/testruntime/testdata/timeseries_druid/rill.yaml diff --git a/runtime/queries/metricsview.go b/runtime/queries/metricsview.go index ef43c66d3984..bb5d150139c2 100644 --- a/runtime/queries/metricsview.go +++ b/runtime/queries/metricsview.go @@ -293,7 +293,7 @@ func buildFilterClauseForCondition(mv *runtimev1.MetricsViewSpec, cond *runtimev return fmt.Sprintf("AND (%s) ", condsClause), args, nil } -func columnIdentifierExpression(mv *runtimev1.MetricsViewSpec, aliases []*runtimev1.MetricsViewComparisonMeasureAlias, name string) (string, bool) { +func columnIdentifierExpression(mv *runtimev1.MetricsViewSpec, aliases []*runtimev1.MetricsViewComparisonMeasureAlias, name string, dialect drivers.Dialect) (string, bool) { // check if identifier is a dimension for _, dim := range mv.Dimensions { if dim.Name == name { @@ -305,8 +305,12 @@ func columnIdentifierExpression(mv *runtimev1.MetricsViewSpec, aliases []*runtim for _, alias := range aliases { if alias.Alias == name { switch alias.Type { - case runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_BASE_VALUE, - runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_UNSPECIFIED: + case runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_UNSPECIFIED, + runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_BASE_VALUE: + // using `measure_0` as is causing ambiguity error in duckdb + if dialect == drivers.DialectDuckDB { + return "base." + safeName(alias.Name), true + } return safeName(alias.Name), true case runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_COMPARISON_VALUE: return safeName(alias.Name + "__previous"), true @@ -351,7 +355,7 @@ func buildExpression(mv *runtimev1.MetricsViewSpec, expr *runtimev1.Expression, return "?", []any{arg}, nil case *runtimev1.Expression_Ident: - expr, isIdent := columnIdentifierExpression(mv, aliases, e.Ident) + expr, isIdent := columnIdentifierExpression(mv, aliases, e.Ident, dialect) if !isIdent { return "", emptyArg, fmt.Errorf("unknown column filter: %s", e.Ident) } diff --git a/runtime/queries/metricsview_aggregation_test.go b/runtime/queries/metricsview_aggregation_test.go index 57deafafca76..0bde35a15d4f 100644 --- a/runtime/queries/metricsview_aggregation_test.go +++ b/runtime/queries/metricsview_aggregation_test.go @@ -14,11 +14,11 @@ import ( ) func TestMetricsViewAggregation_measure_filters(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) diff --git a/runtime/queries/metricsview_comparison_toplist.go b/runtime/queries/metricsview_comparison_toplist.go index be33c0fd6b6b..503b7506aa00 100644 --- a/runtime/queries/metricsview_comparison_toplist.go +++ b/runtime/queries/metricsview_comparison_toplist.go @@ -459,7 +459,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M var labelTuple string if dialect != drivers.DialectDruid { columnsTuple = fmt.Sprintf( - "base.%[1]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, (base.%[1]s - comparison.%[1]s)/comparison.%[1]s::DOUBLE AS %[4]s", + "base.%[1]s AS %[1]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, (base.%[1]s - comparison.%[1]s)/comparison.%[1]s::DOUBLE AS %[4]s", safeName(m.Name), safeName(m.Name+"__previous"), safeName(m.Name+"__delta_abs"), @@ -475,8 +475,11 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M ) } else { columnsTuple = fmt.Sprintf( - "ANY_VALUE(base.%[1]s), ANY_VALUE(comparison.%[1]s), ANY_VALUE(base.%[1]s - comparison.%[1]s), ANY_VALUE(SAFE_DIVIDE(base.%[1]s - comparison.%[1]s, CAST(comparison.%[1]s AS DOUBLE)))", + "ANY_VALUE(base.%[1]s) AS %[1]s, ANY_VALUE(comparison.%[1]s) AS %[2]s, ANY_VALUE(base.%[1]s - comparison.%[1]s) AS %[3]s, ANY_VALUE(SAFE_DIVIDE(base.%[1]s - comparison.%[1]s, CAST(comparison.%[1]s AS DOUBLE))) AS %[4]s", safeName(m.Name), + safeName(m.Name+"__previous"), + safeName(m.Name+"__delta_abs"), + safeName(m.Name+"__delta_rel"), ) labelTuple = fmt.Sprintf( "ANY_VALUE(base.%[1]s) AS %[2]s, ANY_VALUE(comparison.%[1]s) AS %[3]s, ANY_VALUE(base.%[1]s - comparison.%[1]s) AS %[4]s, ANY_VALUE(SAFE_DIVIDE(base.%[1]s - comparison.%[1]s, CAST(comparison.%[1]s AS DOUBLE))) AS %[5]s", @@ -550,14 +553,13 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M args = append(args, clauseArgs...) } - outerWhereClause := "" + havingClause := "1=1" if q.Having != nil { var havingClauseArgs []any - outerWhereClause, havingClauseArgs, err = buildExpression(mv, q.Having, q.Aliases, dialect) + havingClause, havingClauseArgs, err = buildExpression(mv, q.Having, q.Aliases, dialect) if err != nil { return "", nil, err } - outerWhereClause = "WHERE " + outerWhereClause args = append(args, havingClauseArgs...) } @@ -694,7 +696,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M ) comparison ON base.%[2]s = comparison.%[2]s OR (base.%[2]s is null and comparison.%[2]s is null) - %[16]s + WHERE %[16]s ORDER BY %[6]s %[7]s @@ -716,7 +718,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M comparisonLimitClause, // 13 unnestClause, // 14 groupByCol, // 15 - outerWhereClause, // 16 + havingClause, // 16 ) } else { /* @@ -776,8 +778,8 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M SELECT %[1]s FROM %[3]s WHERE %[5]s AND %[2]s IN (SELECT %[2]s FROM %[11]s) GROUP BY %[2]s %[10]s ) SELECT %[11]s.%[2]s AS %[14]s, %[9]s FROM %[11]s LEFT JOIN %[12]s ON base.%[2]s = comparison.%[2]s - %[15]s GROUP BY 1 + HAVING %[15]s ORDER BY %[6]s %[7]s OFFSET %[8]d @@ -796,7 +798,7 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M rightSubQueryAlias, // 12 subQueryOrderClause, // 13 finalDimName, // 14 - outerWhereClause, // 15 + havingClause, // 15 ) } diff --git a/runtime/queries/metricsview_comparison_toplist_test.go b/runtime/queries/metricsview_comparison_toplist_test.go index 186322a0cec6..5817e1eab8f2 100644 --- a/runtime/queries/metricsview_comparison_toplist_test.go +++ b/runtime/queries/metricsview_comparison_toplist_test.go @@ -19,12 +19,18 @@ import ( _ "github.com/rilldata/rill/runtime/drivers/duckdb" ) +// replace with "ad_bids_druid" to test on druid +const AdBidsProject = "ad_bids" + +// replace with "timestamp" to test on druid +const AdBidsTimestamp = "__time" + func TestMetricsViewsComparison_dim_order_comparison_toplist_vs_general_toplist(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) @@ -116,11 +122,11 @@ func TestMetricsViewsComparison_dim_order_comparison_toplist_vs_general_toplist( } func TestMetricsViewsComparison_dim_order(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) @@ -168,11 +174,11 @@ func TestMetricsViewsComparison_dim_order(t *testing.T) { } func TestMetricsViewsComparison_measure_order(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) @@ -220,11 +226,11 @@ func TestMetricsViewsComparison_measure_order(t *testing.T) { } func TestMetricsViewsComparison_measure_filters(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) @@ -289,11 +295,11 @@ func TestMetricsViewsComparison_measure_filters(t *testing.T) { } func TestMetricsViewsComparison_measure_filters_with_compare_no_alias(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) @@ -356,12 +362,92 @@ func TestMetricsViewsComparison_measure_filters_with_compare_no_alias(t *testing require.ErrorContains(t, err, "unknown column filter: measure_1__delta_rel") } +func TestMetricsViewsComparison_measure_filters_with_compare_base_measure(t *testing.T) { + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) + + ctr := &queries.ColumnTimeRange{ + TableName: "ad_bids", + ColumnName: AdBidsTimestamp, + } + err := ctr.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + diff := ctr.Result.Max.AsTime().Sub(ctr.Result.Min.AsTime()) + maxTime := ctr.Result.Min.AsTime().Add(diff / 2) + + ctrl, err := rt.Controller(context.Background(), instanceID) + require.NoError(t, err) + r, err := ctrl.Get(context.Background(), &runtimev1.ResourceName{Kind: runtime.ResourceKindMetricsView, Name: "ad_bids_metrics"}, false) + require.NoError(t, err) + mv := r.GetMetricsView() + + q := &queries.MetricsViewComparison{ + MetricsViewName: "ad_bids_metrics", + DimensionName: "dom", + Measures: []*runtimev1.MetricsViewAggregationMeasure{ + { + Name: "measure_1", + }, + }, + MetricsView: mv.Spec, + TimeRange: &runtimev1.TimeRange{ + Start: ctr.Result.Min, + End: timestamppb.New(maxTime), + }, + ComparisonTimeRange: &runtimev1.TimeRange{ + Start: timestamppb.New(maxTime), + End: ctr.Result.Max, + }, + Sort: []*runtimev1.MetricsViewComparisonSort{ + { + Name: "dom", + SortType: runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_UNSPECIFIED, + Desc: true, + }, + }, + Having: &runtimev1.Expression{ + Expression: &runtimev1.Expression_Cond{ + Cond: &runtimev1.Condition{ + Op: runtimev1.Operation_OPERATION_GT, + Exprs: []*runtimev1.Expression{ + { + Expression: &runtimev1.Expression_Ident{ + Ident: "measure_1", + }, + }, + { + Expression: &runtimev1.Expression_Val{ + Val: structpb.NewNumberValue(3.25), + }, + }, + }, + }, + }, + }, + Aliases: []*runtimev1.MetricsViewComparisonMeasureAlias{ + { + Name: "measure_1", + Type: runtimev1.MetricsViewComparisonMeasureType_METRICS_VIEW_COMPARISON_MEASURE_TYPE_BASE_VALUE, + Alias: "measure_1", + }, + }, + Limit: 250, + } + + err = q.Resolve(context.Background(), rt, instanceID, 0) + require.NoError(t, err) + require.NotEmpty(t, q.Result) + require.Len(t, q.Result.Rows, 3) + require.Equal(t, "sports.yahoo.com", q.Result.Rows[0].DimensionValue.GetStringValue()) + require.Equal(t, "news.google.com", q.Result.Rows[1].DimensionValue.GetStringValue()) + require.Equal(t, "instagram.com", q.Result.Rows[2].DimensionValue.GetStringValue()) +} + func TestMetricsViewsComparison_measure_filters_with_compare_aliases(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "ad_bids") + rt, instanceID := testruntime.NewInstanceForProject(t, AdBidsProject) ctr := &queries.ColumnTimeRange{ TableName: "ad_bids", - ColumnName: "timestamp", + ColumnName: AdBidsTimestamp, } err := ctr.Resolve(context.Background(), rt, instanceID, 0) require.NoError(t, err) diff --git a/runtime/queries/metricsview_timeseries.go b/runtime/queries/metricsview_timeseries.go index 3c7d297a4d21..7f73bc089a57 100644 --- a/runtime/queries/metricsview_timeseries.go +++ b/runtime/queries/metricsview_timeseries.go @@ -320,7 +320,7 @@ func (q *MetricsViewTimeSeries) buildMetricsTimeseriesSQL(olap drivers.OLAPStore sql = q.buildDuckDBSQL(mv, tsAlias, selectCols, whereClause, havingClause, timezone) case drivers.DialectDruid: args = append([]any{timezone}, args...) - sql = q.buildDruidSQL(args, mv, tsAlias, selectCols, havingClause, whereClause) + sql = q.buildDruidSQL(args, mv, tsAlias, selectCols, whereClause, havingClause) default: return "", "", nil, fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } diff --git a/runtime/queries/metricsview_timeseries_test.go b/runtime/queries/metricsview_timeseries_test.go index 63038bf960ad..1c28814f82e0 100644 --- a/runtime/queries/metricsview_timeseries_test.go +++ b/runtime/queries/metricsview_timeseries_test.go @@ -14,8 +14,11 @@ import ( "google.golang.org/protobuf/types/known/structpb" ) +// replace with "timeseries_druid" to test on druid +const TimeseriesProject = "timeseries" + func TestMetricsViewsTimeseries_month_grain(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -65,7 +68,7 @@ func TestMetricsViewsTimeseries_month_grain(t *testing.T) { } func TestMetricsViewsTimeseries_month_grain_IST(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -118,7 +121,7 @@ func TestMetricsViewsTimeseries_month_grain_IST(t *testing.T) { } func TestMetricsViewsTimeseries_quarter_grain_IST(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -157,7 +160,7 @@ func TestMetricsViewsTimeseries_quarter_grain_IST(t *testing.T) { } func TestMetricsViewsTimeseries_year_grain_IST(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -188,7 +191,7 @@ func TestMetricsViewsTimeseries_year_grain_IST(t *testing.T) { } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Weekly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -222,7 +225,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Weekly(t *tes } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_WeeklyOnSaturday(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -257,7 +260,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_WeeklyOnSatur } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Daily(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -291,7 +294,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Daily(t *test } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Sparse_Daily(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -333,7 +336,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Sparse_Daily(t *testing. } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Second(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -379,7 +382,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Second(t *tes } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Minute(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -425,7 +428,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Minute(t *tes } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Hourly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -461,7 +464,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Continuous_Hourly(t *tes } func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Sparse_Hourly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -506,7 +509,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsBackwards_Sparse_Hourly(t *testing } func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Weekly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -540,7 +543,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Weekly(t *test } func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Daily(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -574,7 +577,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Daily(t *testi } func TestMetricsViewTimeSeries_DayLightSavingsForwards_Sparse_Daily(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -616,7 +619,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsForwards_Sparse_Daily(t *testing.T } func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Hourly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -652,7 +655,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsForwards_Continuous_Hourly(t *test } func TestMetricsViewTimeSeries_DayLightSavingsForwards_Sparse_Hourly(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) @@ -697,7 +700,7 @@ func TestMetricsViewTimeSeries_DayLightSavingsForwards_Sparse_Hourly(t *testing. } func TestMetricsViewTimeSeries_having_clause(t *testing.T) { - rt, instanceID := testruntime.NewInstanceForProject(t, "timeseries") + rt, instanceID := testruntime.NewInstanceForProject(t, TimeseriesProject) ctrl, err := rt.Controller(context.Background(), instanceID) require.NoError(t, err) diff --git a/runtime/server/druid_ingest_test.go b/runtime/server/druid_ingest_test.go new file mode 100644 index 000000000000..58056b375fa8 --- /dev/null +++ b/runtime/server/druid_ingest_test.go @@ -0,0 +1,172 @@ +package server_test + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "testing" + "time" + + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime" + "github.com/rilldata/rill/runtime/drivers/druid" + "github.com/rilldata/rill/runtime/pkg/activity" + "github.com/rilldata/rill/runtime/pkg/ratelimit" + "github.com/rilldata/rill/runtime/queries" + "github.com/rilldata/rill/runtime/server" + "github.com/rilldata/rill/runtime/server/auth" + "github.com/rilldata/rill/runtime/testruntime" + "github.com/stretchr/testify/require" +) + +const DruidIngestUrl = "http://localhost:8888" + +func Test_IngestToDruid(t *testing.T) { + // uncomment and run to ingest testdata folder into druid + // Note: druid should be started outside of this + t.Skip() + + ctx := auth.WithOpen(context.Background()) + + ingestProjectIntoDruid(ctx, t, "ad_bids") + ingestProjectIntoDruid(ctx, t, "timeseries") +} + +func ingestProjectIntoDruid(ctx context.Context, t *testing.T, projectName string) { + rt, instanceID := testruntime.NewInstanceForProject(t, projectName) + srv, err := server.NewServer(context.Background(), &server.Options{}, rt, nil, ratelimit.NewNoop(), activity.NewNoopClient()) + require.NoError(t, err) + + modelsResp, err := srv.ListResources(ctx, &runtimev1.ListResourcesRequest{ + InstanceId: instanceID, + Kind: runtime.ResourceKindModel, + }) + require.NoError(t, err) + + for _, model := range modelsResp.Resources { + if model.Meta.ReconcileError != "" { + continue + } + + tableName := model.GetModel().State.Table + if tableName == "" { + continue + } + + dataPath := exportModelForDruid(ctx, t, rt, instanceID, tableName) + + colsResp, err := srv.TableColumns(ctx, &runtimev1.TableColumnsRequest{ + InstanceId: instanceID, + TableName: tableName, + }) + require.NoError(t, err) + ingestModelIntoDruid(t, dataPath, tableName, colsResp.ProfileColumns) + } +} + +func exportModelForDruid(ctx context.Context, t *testing.T, rt *runtime.Runtime, instanceID, name string) string { + f, err := os.Create(filepath.Join(os.TempDir(), fmt.Sprintf("%s.csv", name))) + require.NoError(t, err) + defer f.Close() + q := queries.TableHead{ + TableName: name, + Limit: 10000, + } + require.NoError(t, q.Export(ctx, rt, instanceID, f, &runtime.ExportOptions{ + Format: runtimev1.ExportFormat_EXPORT_FORMAT_CSV, + })) + + return f.Name() +} + +func ingestModelIntoDruid(t *testing.T, dataPath, name string, cols []*runtimev1.ProfileColumn) { + schema := getDruidModelSpec(name, cols) + data, err := os.ReadFile(dataPath) + require.NoError(t, err) + defer os.Remove(dataPath) + + fmt.Printf("Ingesting model %s\n", name) + require.NoError(t, druid.Ingest( + DruidIngestUrl, + getDruidIngestionSpec(string(data), schema), + name, + 5*time.Minute, + )) +} + +func getDruidIngestionSpec(data, schema string) string { + return fmt.Sprintf( + `{ + "type": "index_parallel", + "spec": { + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "inline", + "data": "%s" + }, + "inputFormat": { + "type": "csv", + "findColumnsFromHeader": true + } + }, + "tuningConfig": { + "type": "index_parallel", + "partitionsSpec": { + "type": "dynamic" + } + }, + "dataSchema": %s + } + }`, + strings.ReplaceAll(data, "\n", "\\n"), + schema, + ) +} + +func getDruidModelSpec(name string, cols []*runtimev1.ProfileColumn) string { + var timestampCol string + var dimensions []string + for _, col := range cols { + if col.Type == "TIMESTAMP" || col.Name == "timestamp" { + timestampCol = col.Name + } else if col.Type == "VARCHAR" { + dimensions = append(dimensions, fmt.Sprintf("%q", col.Name)) + } else { + dimensions = append(dimensions, fmt.Sprintf(`{"type": %q, "name": %q}`, mapColTypeForDruid(col.Type), col.Name)) + } + } + + return fmt.Sprintf( + `{ + "dataSource": "%s", + "timestampSpec": { + "column": "%s", + "format": "auto" + }, + "transformSpec": {}, + "dimensionsSpec": { + "dimensions": [ + %s + ] + }, + "granularitySpec": { + "queryGranularity": "none", + "rollup": false, + "segmentGranularity": "day" + } + }`, + name, + timestampCol, + strings.Join(dimensions, ",\n"), + ) +} + +func mapColTypeForDruid(colType string) string { + if colType == "BIGINT" { + return "long" + } + return strings.ToLower(colType) +} diff --git a/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml b/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml new file mode 100644 index 000000000000..50747c14dd21 --- /dev/null +++ b/runtime/testruntime/testdata/ad_bids_druid/dashboards/ad_bids_metrics.yaml @@ -0,0 +1,26 @@ +model: ad_bids +display_name: Ad bids +description: + +timeseries: __time +smallest_time_grain: + +dimensions: + - label: Publisher + name: pub + property: publisher + description: "" + - label: Domain + name: dom + property: domain + description: "" + +measures: + - label: "Number of bids" + expression: count(*) + description: "" + format_preset: "" + - label: "Average bid price" + expression: avg(bid_price) + description: "" + format_preset: "" diff --git a/runtime/testruntime/testdata/ad_bids_druid/rill.yaml b/runtime/testruntime/testdata/ad_bids_druid/rill.yaml new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries.yaml b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries.yaml new file mode 100644 index 000000000000..a4b67ed11c73 --- /dev/null +++ b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries.yaml @@ -0,0 +1,27 @@ +model: timeseries_model +display_name: Time series +description: + +timeseries: __time +smallest_time_grain: + +dimensions: + - name: device + column: device + - name: publisher + column: publisher + - name: domain + column: domain + - name: latitude + column: latitude + - name: country + column: country +measures: + - name: max_clicks + expression: "max(clicks)" + - name: count + expression: "count(*)" + - name: sum_imps + expression: "sum(imps)" + - name: sum_clicks + expression: "sum(clicks)" diff --git a/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_backwards.yaml b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_backwards.yaml new file mode 100644 index 000000000000..892089f72f01 --- /dev/null +++ b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_backwards.yaml @@ -0,0 +1,28 @@ +# Visit https://docs.rilldata.com/reference/project-files to learn more about Rill project files. + +title: timeseries_dst +model: timeseries_dst_backwards_model +timeseries: __time +first_day_of_week: 7 +measures: + - label: Total records + expression: count(*) + name: total_records + description: Total number of records present + format_preset: humanize + valid_percent_of_total: true +dimensions: + - name: label + column: label +available_time_zones: + - America/Los_Angeles + - America/Chicago + - America/New_York + - Europe/London + - Europe/Paris + - Asia/Jerusalem + - Europe/Moscow + - Asia/Kolkata + - Asia/Shanghai + - Asia/Tokyo + - Australia/Sydney \ No newline at end of file diff --git a/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_forwards.yaml b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_forwards.yaml new file mode 100644 index 000000000000..deb28feeacc5 --- /dev/null +++ b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_dst_forwards.yaml @@ -0,0 +1,28 @@ +# Visit https://docs.rilldata.com/reference/project-files to learn more about Rill project files. + +title: timeseries_dst +model: timeseries_dst_forwards_model +timeseries: __time +first_day_of_week: 7 +measures: + - label: Total records + expression: count(*) + name: total_records + description: Total number of records present + format_preset: humanize + valid_percent_of_total: true +dimensions: + - name: label + column: label +available_time_zones: + - America/Los_Angeles + - America/Chicago + - America/New_York + - Europe/London + - Europe/Paris + - Asia/Jerusalem + - Europe/Moscow + - Asia/Kolkata + - Asia/Shanghai + - Asia/Tokyo + - Australia/Sydney diff --git a/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_gaps.yaml b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_gaps.yaml new file mode 100644 index 000000000000..b5ecbe5e6596 --- /dev/null +++ b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_gaps.yaml @@ -0,0 +1,25 @@ +model: timeseries_gap_model +display_name: Time series gaps + +timeseries: __time + +dimensions: + - name: device + column: device + - name: publisher + column: publisher + - name: domain + column: domain + - name: latitude + column: latitude + - name: country + column: country +measures: + - name: max_clicks + expression: "max(clicks)" + - name: count + expression: "count(*)" + - name: sum_imps + expression: "sum(imps)" + - name: sum_clicks + expression: "sum(clicks)" diff --git a/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_year.yaml b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_year.yaml new file mode 100644 index 000000000000..ffb109f5b87b --- /dev/null +++ b/runtime/testruntime/testdata/timeseries_druid/dashboards/timeseries_year.yaml @@ -0,0 +1,21 @@ +model: timeseries_year_model +display_name: Year time series +description: + +timeseries: __time +smallest_time_grain: + +dimensions: + - name: device + column: device + - name: publisher + column: publisher + - name: country + column: country +measures: + - name: max_clicks + expression: "max(clicks)" + - name: count + expression: "count(*)" + - name: sum_clicks + expression: "sum(clicks)" \ No newline at end of file diff --git a/runtime/testruntime/testdata/timeseries_druid/rill.yaml b/runtime/testruntime/testdata/timeseries_druid/rill.yaml new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/runtime/testruntime/testruntime.go b/runtime/testruntime/testruntime.go index 52a37b3a2618..7a953a51d34d 100644 --- a/runtime/testruntime/testruntime.go +++ b/runtime/testruntime/testruntime.go @@ -31,6 +31,12 @@ import ( _ "github.com/rilldata/rill/runtime/reconcilers" ) +// testConnector replace with "druid" for testing on druid +const testConnector = "duckdb" + +// testConnectorDSN replace with "http://localhost:8888/druid/v2/sql/avatica-protobuf/" for testing on druid +const testConnectorDSN = "" + // TestingT satisfies both *testing.T and *testing.B. type TestingT interface { Name() string @@ -90,7 +96,7 @@ func NewInstanceWithOptions(t TestingT, opts InstanceOptions) (*runtime.Runtime, tmpDir := t.TempDir() inst := &drivers.Instance{ - OLAPConnector: "duckdb", + OLAPConnector: testConnector, RepoConnector: "repo", CatalogConnector: "catalog", Connectors: []*runtimev1.Connector{ @@ -100,9 +106,9 @@ func NewInstanceWithOptions(t TestingT, opts InstanceOptions) (*runtime.Runtime, Config: map[string]string{"dsn": tmpDir}, }, { - Type: "duckdb", - Name: "duckdb", - Config: map[string]string{"dsn": ""}, + Type: testConnector, + Name: testConnector, + Config: map[string]string{"dsn": testConnectorDSN}, }, { Type: "sqlite", @@ -117,6 +123,7 @@ func NewInstanceWithOptions(t TestingT, opts InstanceOptions) (*runtime.Runtime, StageChanges: opts.StageChanges, ModelDefaultMaterialize: opts.ModelDefaultMaterialize, ModelMaterializeDelaySeconds: opts.ModelMaterializeDelaySeconds, + EmbedCatalog: testConnector != "druid", } for path, data := range opts.Files { @@ -170,7 +177,7 @@ func NewInstanceForProject(t TestingT, name string) (*runtime.Runtime, string) { projectPath := filepath.Join(currentFile, "..", "testdata", name) inst := &drivers.Instance{ - OLAPConnector: "duckdb", + OLAPConnector: testConnector, RepoConnector: "repo", CatalogConnector: "catalog", Connectors: []*runtimev1.Connector{ @@ -180,9 +187,9 @@ func NewInstanceForProject(t TestingT, name string) (*runtime.Runtime, string) { Config: map[string]string{"dsn": projectPath}, }, { - Type: "duckdb", - Name: "duckdb", - Config: map[string]string{"dsn": ""}, + Type: testConnector, + Name: testConnector, + Config: map[string]string{"dsn": testConnectorDSN}, }, { Type: "sqlite", @@ -192,7 +199,7 @@ func NewInstanceForProject(t TestingT, name string) (*runtime.Runtime, string) { Config: map[string]string{"dsn": fmt.Sprintf("file:%s?mode=memory&cache=shared", t.Name())}, }, }, - EmbedCatalog: true, + EmbedCatalog: testConnector != "druid", } err := rt.CreateInstance(context.Background(), inst)