diff --git a/lib/timescaledb/continuous_aggregates_helper.rb b/lib/timescaledb/continuous_aggregates_helper.rb index 700feae..542f012 100644 --- a/lib/timescaledb/continuous_aggregates_helper.rb +++ b/lib/timescaledb/continuous_aggregates_helper.rb @@ -2,6 +2,21 @@ module Timescaledb module ContinuousAggregatesHelper extend ActiveSupport::Concern + included do + class_attribute :rollup_rules, default: { + /count\(\*\)\s+as\s+(\w+)/ => 'sum(\1) as \1', + /sum\((\w+)\)\s+as\s+(\w+)/ => 'sum(\2) as \2', + /min\((\w+)\)\s+as\s+(\w+)/ => 'min(\2) as \2', + /max\((\w+)\)\s+as\s+(\w+)/ => 'max(\2) as \2', + /candlestick_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', + /stats_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3', + /stats_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', + /state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', + /percentile_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3', + /heartbeat_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2', + } + end + class_methods do def continuous_aggregates(options = {}) @time_column = options[:time_column] || 'ts' @@ -22,6 +37,9 @@ def continuous_aggregates(options = {}) # Allow for custom aggregate definitions to override or add to scope-based ones @aggregates.merge!(options[:aggregates] || {}) + # Add custom rollup rules if provided + self.rollup_rules.merge!(options[:custom_rollup_rules] || {}) + define_continuous_aggregate_classes end @@ -37,32 +55,13 @@ def refresh_aggregates(timeframes = nil) def create_continuous_aggregates(with_data: false) @aggregates.each do |aggregate_name, config| - previous_timeframe = nil @timeframes.each do |timeframe| klass = const_get("#{aggregate_name}_per_#{timeframe}".classify) - interval = "'1 #{timeframe.to_s}'" - base_query = - if previous_timeframe - prev_klass = const_get("#{aggregate_name}_per_#{previous_timeframe}".classify) - prev_klass - .select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{config[:select]}") - .group(1, *config[:group_by]) - else - scope = public_send(config[:scope_name]) - select_values = scope.select_values.join(', ') - group_values = scope.group_values - - config[:select] = select_values.gsub('count(*) as total', 'sum(total) as total') - config[:group_by] = (2...(2 + group_values.size)).map(&:to_s).join(', ') - - self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}") - .group(1, *group_values) - end connection.execute <<~SQL CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name} WITH (timescaledb.continuous) AS - #{base_query.to_sql} + #{klass.base_query.to_sql} #{with_data ? 'WITH DATA' : 'WITH NO DATA'}; SQL @@ -74,16 +73,30 @@ def create_continuous_aggregates(with_data: false) schedule_interval => INTERVAL '#{policy[:schedule_interval]}'); SQL end - - previous_timeframe = timeframe end end end + def rollup(scope, interval) + select_values = scope.select_values.join(', ') + group_values = scope.group_values + + self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}") + .group(1, *group_values) + end + + def apply_rollup_rules(select_values) + rollup_rules.reduce(select_values) do |result, (pattern, replacement)| + result.gsub(pattern, replacement) + end + end + private def define_continuous_aggregate_classes + base_model = self @aggregates.each do |aggregate_name, config| + previous_timeframe = nil @timeframes.each do |timeframe| _table_name = "#{aggregate_name}_per_#{timeframe}" class_name = "#{aggregate_name}_per_#{timeframe}".classify @@ -91,13 +104,27 @@ def define_continuous_aggregate_classes extend ActiveModel::Naming class << self - attr_accessor :config, :timeframe + attr_accessor :config, :timeframe, :base_query, :base_model end self.table_name = _table_name self.config = config self.timeframe = timeframe + interval = "'1 #{timeframe.to_s}'" + self.base_model = base_model + self.base_query = + if previous_timeframe + prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify) + prev_klass + .select("time_bucket(#{interval}, #{base_model.instance_variable_get(:@time_column)}) as #{base_model.instance_variable_get(:@time_column)}, #{config[:select]}") + .group(1, *config[:group_by]) + else + scope = base_model.public_send(config[:scope_name]) + config[:select] = base_model.apply_rollup_rules(scope.select_values.join(', ')) + config[:group_by] = scope.group_values + base_model.rollup(scope, interval) + end def self.refresh! connection.execute("CALL refresh_continuous_aggregate('#{table_name}', null, null);") @@ -111,6 +138,7 @@ def self.refresh_policy config[:refresh_policy]&.dig(timeframe) end end) + previous_timeframe = timeframe end end end diff --git a/spec/timescaledb/continuos_aggregates_helper_spec.rb b/spec/timescaledb/continuos_aggregates_helper_spec.rb index a4d5c21..f521d41 100644 --- a/spec/timescaledb/continuos_aggregates_helper_spec.rb +++ b/spec/timescaledb/continuos_aggregates_helper_spec.rb @@ -85,22 +85,21 @@ class Download < ActiveRecord::Base it 'defines rollup scope for aggregates' do test_class.create_continuous_aggregates aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth] - aggregate_classes.each do |agg_class| - expect(agg_class).to respond_to(:rollup) - expect(agg_class.rollup.to_sql).to include('time_bucket') - expect(agg_class.rollup.to_sql).to include('count(*) as total') - end - end - it 'defines time-based scopes for aggregates' do - aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth] - aggregate_scopes = [:total_downloads, :downloads_by_gem, :downloads_by_version] - - aggregate_scopes.each do |scope| - aggregate_classes.each do |agg_class| - expect(agg_class).to respond_to(scope) - end - end + expect(test_class::TotalDownloadsPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"downloads\" GROUP BY 1") + expect(test_class::TotalDownloadsPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_downloads_per_day\" GROUP BY 1") + expect(test_class::TotalDownloadsPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_downloads_per_hour\" GROUP BY 1") + expect(test_class::TotalDownloadsPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_downloads_per_minute\" GROUP BY 1") + + expect(test_class::DownloadsByVersionPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, gem_name, gem_version, count(*) as total FROM \"downloads\" GROUP BY 1, \"downloads\".\"gem_name\", \"downloads\".\"gem_version\"") + expect(test_class::DownloadsByVersionPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_day\" GROUP BY 1, \"downloads_by_version_per_day\".\"gem_name\", \"downloads_by_version_per_day\".\"gem_version\"") + expect(test_class::DownloadsByVersionPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_hour\" GROUP BY 1, \"downloads_by_version_per_hour\".\"gem_name\", \"downloads_by_version_per_hour\".\"gem_version\"") + expect(test_class::DownloadsByVersionPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_minute\" GROUP BY 1, \"downloads_by_version_per_minute\".\"gem_name\", \"downloads_by_version_per_minute\".\"gem_version\"") + + expect(test_class::DownloadsByGemPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, gem_name, count(*) as total FROM \"downloads\" GROUP BY 1, \"downloads\".\"gem_name\"") + expect(test_class::DownloadsByGemPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_day\" GROUP BY 1, \"downloads_by_gem_per_day\".\"gem_name\"") + expect(test_class::DownloadsByGemPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_hour\" GROUP BY 1, \"downloads_by_gem_per_hour\".\"gem_name\"") + expect(test_class::DownloadsByGemPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_minute\" GROUP BY 1, \"downloads_by_gem_per_minute\".\"gem_name\"") end end @@ -112,34 +111,44 @@ class Download < ActiveRecord::Base it 'creates materialized views for each aggregate' do test_class.create_continuous_aggregates - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_minute/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_hour/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_day/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_month/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_month/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_month/i) end it 'sets up refresh policies for each aggregate' do test_class.create_continuous_aggregates - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_minutely/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_hour/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_day/i) - expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_month/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_minute/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_hour/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_month/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_day/i) + expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_month/i) end end describe 'refresh policies' do it 'defines appropriate refresh policies for each timeframe' do policies = { - minute: { start_offset: "INTERVAL '10 minutes'", end_offset: "INTERVAL '1 minute'", schedule_interval: "INTERVAL '1 minute'" }, - hour: { start_offset: "INTERVAL '4 hour'", end_offset: "INTERVAL '1 hour'", schedule_interval: "INTERVAL '1 hour'" }, - day: { start_offset: "INTERVAL '3 day'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" }, - month: { start_offset: "INTERVAL '3 month'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" } + minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" }, + hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }, + day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" }, + month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" } } policies.each do |timeframe, expected_policy| - actual_policy = test_class.const_get(timeframe).refresh_policy - expect(actual_policy).to eq(expected_policy) + %w[TotalDownloadsPer DownloadsByGemPer DownloadsByVersionPer].each do |klass| + actual_policy = test_class.const_get("#{klass}#{timeframe.to_s.capitalize}").refresh_policy + expect(actual_policy).to eq(expected_policy) + end end end end