From 8d11478f35187b8afefb366a9ad60456871c1f4f Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Mon, 16 Sep 2019 23:20:09 +0200 Subject: [PATCH 01/12] refactor for druid 0.16 (wip) --- .ruby-version | 2 +- Gemfile.lock | 79 +++++++++++++----------- database.json.example | 2 +- dumbo.gemspec | 8 +-- lib/dumbo/cli.rb | 55 ++++++----------- lib/dumbo/task/compact_segments.rb | 97 ++++++------------------------ lib/dumbo/task/reintake.rb | 8 +-- lib/dumbo/time_ext.rb | 20 +++++- sources.json.example | 1 - 9 files changed, 106 insertions(+), 166 deletions(-) diff --git a/.ruby-version b/.ruby-version index 5edb1d0..d5a6340 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -ruby-2.2 +ruby-2.6 \ No newline at end of file diff --git a/Gemfile.lock b/Gemfile.lock index e032444..744da25 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,34 +1,38 @@ GIT remote: git://github.com/ruby-druid/ruby-druid.git - revision: 552674ba04a93c20d435c33d94b1ac0650a6afac + revision: 752896502e29898060e13b0ed05fa35749043b2b specs: - ruby-druid (0.2.0.rc3) - activemodel - activesupport - iso8601 - multi_json - rest-client - zk + ruby-druid (0.10.2) + activemodel (>= 3.0.0) + activesupport (>= 3.0.0) + iso8601 (~> 0.8) + multi_json (~> 1.0) + rest-client (>= 1.8, < 3.0) + zk (~> 1.9) GEM remote: https://rubygems.org/ specs: - activemodel (5.2.1) - activesupport (= 5.2.1) - activesupport (5.2.1) + activemodel (6.0.0) + activesupport (= 6.0.0) + activesupport (6.0.0) concurrent-ruby (~> 1.0, >= 1.0.2) i18n (>= 0.7, < 2) minitest (~> 5.1) tzinfo (~> 1.1) - concurrent-ruby (1.0.5) - domain_name (0.5.24) + zeitwerk (~> 2.1, >= 2.1.8) + addressable (2.7.0) + public_suffix (>= 2.0.2, < 5.0) + concurrent-ruby (1.1.5) + domain_name (0.5.20190701) unf (>= 0.0.5, < 1.0.0) erubis (2.7.0) - http-cookie (1.0.2) + http-accept (1.7.0) + http-cookie (1.0.3) domain_name (~> 0.5) - i18n (1.1.0) + i18n (1.6.0) concurrent-ruby (~> 1.0) - iso8601 (0.8.6) + iso8601 (0.12.1) jmx4r (0.1.4) liquid-ext (3.5.3) activesupport @@ -37,34 +41,37 @@ GEM mixlib-cli multi_json terminal-table - little-plugger (1.1.3) - logging (1.8.2) - little-plugger (>= 1.1.3) - multi_json (>= 1.8.4) - mime-types (2.5) + mime-types (3.3) + mime-types-data (~> 3.2015) + mime-types-data (3.2019.0904) minitest (5.11.3) - mixlib-cli (1.5.0) - multi_json (1.11.0) + mixlib-cli (2.1.1) + multi_json (1.13.1) mysql2 (0.5.2) - netrc (0.10.3) - pg (1.1.3) - rest-client (1.8.0) + netrc (0.11.0) + pg (1.1.4) + public_suffix (4.0.1) + rest-client (2.1.0) + http-accept (>= 1.7.0, < 2.0) http-cookie (>= 1.0.2, < 2.0) - mime-types (>= 1.16, < 3.0) - netrc (~> 0.7) - sequel (4.48.0) - terminal-table (1.4.5) + mime-types (>= 1.16, < 4.0) + netrc (~> 0.8) + sequel (5.24.0) + terminal-table (1.8.0) + unicode-display_width (~> 1.1, >= 1.1.1) thread_safe (0.3.6) tzinfo (1.2.5) thread_safe (~> 0.1) unf (0.1.4) unf_ext - unf_ext (0.0.7.1) - webhdfs (0.6.0) - zk (1.9.5) - logging (~> 1.8.2) + unf_ext (0.0.7.6) + unicode-display_width (1.6.0) + webhdfs (0.8.0) + addressable + zeitwerk (2.1.10) + zk (1.9.6) zookeeper (~> 1.4.0) - zookeeper (1.4.10) + zookeeper (1.4.11) PLATFORMS ruby @@ -78,4 +85,4 @@ DEPENDENCIES webhdfs BUNDLED WITH - 1.16.1 + 1.17.3 diff --git a/database.json.example b/database.json.example index 90913dd..ae8c168 100644 --- a/database.json.example +++ b/database.json.example @@ -1,6 +1,6 @@ { "adapter": "postgres" - "host": "localhist", + "host": "localhost", "username": "druid", "password": "diurd", "database": "druid" diff --git a/dumbo.gemspec b/dumbo.gemspec index 63ff96d..863c71e 100644 --- a/dumbo.gemspec +++ b/dumbo.gemspec @@ -2,12 +2,12 @@ Gem::Specification.new do |spec| spec.name = "dumbo" - spec.version = "0.1.0" - spec.authors = ["remerge GmbH"] - spec.email = ["tech@remerge.io"] + spec.version = "0.2.0" + spec.authors = ["LiquidM GmbH"] + spec.email = ["tech@liquidm.com"] spec.summary = %q{} spec.description = %q{} - spec.homepage = "https://github.com/remerge/dumbo" + spec.homepage = "https://github.com/liquidm/druid-dumbo" spec.license = "MIT" spec.files = `git ls-files -z`.split("\x0") diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 4c9be02..b9159ca 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -38,13 +38,6 @@ def run validate_events(topic) end run_tasks - when "merge" - $log.info("merging segments") - @segments = Dumbo::Segment.all(@db, @druid, @sources) - @topics.each do |topic| - merge_segments(topic) - end - run_tasks when "compact" $log.info("compacting segments") @segments = Dumbo::Segment.all(@db, @druid, @sources) @@ -176,7 +169,6 @@ def get_overlapping_segments_and_interval(source, check_interval, available_segm newest = check_interval.last check_range = (oldest...newest) - $log.info("checking overlapping intervals for", dataSource: dataSource, interval: check_range) segments = available_segments.select do |segment| if segment.source == dataSource && (segment.interval[0]...segment.interval[-1]).overlaps?(check_range) @@ -205,41 +197,36 @@ def get_segment_granularity(source) 1.hour when 'day' 1.day + when 'week' + 7.days + when 'month' + 1.month + when 'year' + 1.year else raise "Unsupported segmentGranularity #{source['output']['segmentGranularity']}" end end - def merge_segments(topic) + def compact_segments(topic) source = @sources[topic] - $log.info("compacting scan", topic: topic, interval: @interval) - merging = get_overlapping_segments_and_interval(source, @interval) + segment_size = get_segment_granularity(source) - merging[:interval][0].to_i.step(merging[:interval][-1].to_i - 1, segment_size) do |start_time| - segment_interval = [Time.at(start_time).utc, Time.at(start_time + segment_size).utc] - segment_input = get_overlapping_segments_and_interval(source, segment_interval, merging[:segments]) - maxShards = (source['output'] && source['output']['maxShards']) || 10 - if maxShards > 0 && maxShards < segment_input[:segments].length - $log.info("detected too many shards,", is: segment_input[:segments].length, max: maxShards) - @tasks << Task::CompactSegments.new(source, segment_interval) - end - end - end + floor_date = [segment_size, 1.day].max - def compact_segments(topic) - source = @sources[topic] - compact_interval = [@interval[0].utc, @interval[-1].floor(1.day).utc] - compact_interval[0] -= 1.day if compact_interval[0] == compact_interval[-1] + $log.info("request compaction", topic: topic, interval: @interval, granularity: (source['output']['segmentGranularity'] || 'hour').downcase) + compact_interval = [@interval[0].floor(floor_date).utc, @interval[-1].ceil(floor_date).utc] $log.info("compacting scan", topic: topic, interval: compact_interval) - compacting = get_overlapping_segments_and_interval(source, compact_interval) - segment_size = get_segment_granularity(source) expectedMetrics = Set.new(source['metrics'].keys).add("events") expectedDimensions = Set.new(source['dimensions']) - compacting[:interval][0].to_i.step(compacting[:interval][-1].to_i - 1, segment_size) do |start_time| - segment_interval = [Time.at(start_time).utc, Time.at(start_time + segment_size).utc] - segment_input = get_overlapping_segments_and_interval(source, segment_interval, compacting[:segments]) + segment_interval = [compact_interval[0], compact_interval[0]] + while segment_interval[-1] < compact_interval[-1] do + segment_interval = [segment_interval[-1], segment_interval[-1].ceil(segment_size).utc] + $log.info("scanning segment", topic: topic, interval: segment_interval) + + segment_input = get_overlapping_segments_and_interval(source, segment_interval) must_compact = segment_input[:segments].any? do |input_segment| should_compact = false @@ -272,12 +259,6 @@ def compact_segments(topic) should_compact end - maxShards = (source['output'] && source['output']['maxShards']) || 0 - if maxShards > 0 && maxShards < segment_input[:segments].length - $log.info("detected too many shards,", is: segment_input[:segments].length, max: maxShards) - must_compact = true - end - @tasks << Task::CompactSegments.new(source, segment_interval) if must_compact end end @@ -292,8 +273,6 @@ def granularity(g) 30.minutes when 'hour' 1.hour - when 'day' - 1.day else raise "Unsupported granularity #{g}" end diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index 32ddc8d..ee6ff0d 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -12,87 +12,30 @@ def initialize(source, interval) def as_json(options = {}) config = { - type: 'index_hadoop', - spec: { - dataSchema: { - dataSource: @source['dataSource'], - parser: { - parseSpec: { - format: "json", - timestampSpec: { - column: (@source['input']['timestamp']['column'] || 'timestamp'), - format: (@source['input']['timestamp']['format'] || 'ruby'), - }, - dimensionsSpec: { - dimensions: (@source['dimensions'] || []), - spatialDimensions: (@source['spacialDimensions'] || []), - } - } - }, - metricsSpec: (@source['metrics'] || {}).map do |name, aggregator| - { type: aggregator, name: name, fieldName: name } - # WARNING: do NOT use count for events, will count in segment vs count in raw input - end + [{ type: "doubleSum", name: "events", fieldName: "events" }], - granularitySpec: { - segmentGranularity: @source['output']['segmentGranularity'] || "hour", - queryGranularity: @source['output']['queryGranularity'] || "minute", - intervals: [@interval], - } - }, - ioConfig: { - type: 'hadoop', - inputSpec: { - type: 'dataSource', - ingestionSpec: { - type: 'dataSource', - dataSource: @source['dataSource'], - interval: @interval, - granularity: @source['output']['queryGranularity'] || "minute", - }, - }, - }, - tuningConfig: { - type: "hadoop", - overwriteFiles: true, - ignoreInvalidRows: true, - buildV9Directly: true, - useCombiner: true, - forceExtendableShardSpecs: true, - partitionsSpec: { - type: "none", - }, - indexSpec: { - bitmap: { - type: @source['output']['bitmap'] || "roaring", - }, - longEncoding: "auto", + type: 'compact', + dataSource: @source['dataSource'], + interval: @interval, + dimensionsSpec: { + dimensions: (@source['dimensions'] || []), + spatialDimensions: (@source['spacialDimensions'] || []), + }, + metricsSpec: (@source['metrics'] || {}).map do |name, aggregator| + { type: aggregator, name: name, fieldName: name } + # WARNING: do NOT use count for events, will count in segment vs count in raw input + end + [{ type: "longSum", name: "events", fieldName: "events" }], + segmentGranularity: @source['output']['segmentGranularity'] || "hour", + targetCompactionSizeBytes: 419430400, + tuningConfig: { + type: "index", + partitionDimensions: [@source['output']['partitionDimension']], + indexSpec: { + bitmap: { + type: @source['output']['bitmap'] || "roaring", }, + longEncoding: "auto", }, }, } - if @source['output']['partitionDimension'] - config[:spec][:tuningConfig][:partitionsSpec] = { - type: "dimension", - partitionDimension: @source['output']['partitionDimension'], - targetPartitionSize: (@source['output']['targetPartitionSize'] || 1000000), - } - elsif (@source['output']['targetPartitionSize'] || 0) > 0 - config[:spec][:tuningConfig][:partitionsSpec] = { - type: "hashed", - targetPartitionSize: @source['output']['targetPartitionSize'], - numShards: -1, - } - elsif (@source['output']['numShards'] || 0) > 0 - config[:spec][:tuningConfig][:partitionsSpec] = { - type: "hashed", - targetPartitionSize: -1, - numShards: @source['output']['numShards'], - } - end - - if (@source['output']['maxPartitionSize']) - config[:spec][:tuningConfig][:partitionsSpec][:maxPartitionSize] = @source['output']['maxPartitionSize'] - end config end diff --git a/lib/dumbo/task/reintake.rb b/lib/dumbo/task/reintake.rb index 3ef75b5..aeb5028 100644 --- a/lib/dumbo/task/reintake.rb +++ b/lib/dumbo/task/reintake.rb @@ -52,7 +52,8 @@ def as_json(options = {}) forceExtendableShardSpecs: true, useCombiner: true, partitionsSpec: { - type: "none", + type: "hashed", + targetPartitionSize: 419430400, }, indexSpec: { bitmap: { @@ -63,11 +64,6 @@ def as_json(options = {}) }, }, } - config[:spec][:tuningConfig][:partitionsSpec] = { - type: "hashed", - targetPartitionSize: -1, - numShards: 10, - } config end end diff --git a/lib/dumbo/time_ext.rb b/lib/dumbo/time_ext.rb index c639828..8a31d78 100644 --- a/lib/dumbo/time_ext.rb +++ b/lib/dumbo/time_ext.rb @@ -2,10 +2,26 @@ class Time def floor(granularity = 1.day) - Time.at((self.to_f / granularity).floor * granularity) + if granularity == 7.days + self.at_beginning_of_week + elsif granularity == 1.month + self.at_beginning_of_month + elsif granularity == 1.year + self.at_beginning_of_year + else + Time.at((self.to_f / granularity).floor * granularity) + end end def ceil(granularity = 1.day) - Time.at((self.to_f / granularity).ceil * granularity) + if granularity == 7.days + self.at_end_of_week.ceil + elsif granularity == 1.month + self.at_end_of_month.ceil + elsif granularity == 1.year + self.at_end_of_year.ceil + else + Time.at((self.to_f / granularity).ceil * granularity) + end end end diff --git a/sources.json.example b/sources.json.example index 17a35df..e77bff9 100644 --- a/sources.json.example +++ b/sources.json.example @@ -14,7 +14,6 @@ "output": { "segmentGranularity": "minute", "indexGranularity": "minute", - "numShards": 1 }, "metrics": { "value": "doubleSum" From 01772e57c76b53b6787f0e213a5e16c5a1169b6c Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Wed, 18 Sep 2019 12:14:21 +0200 Subject: [PATCH 02/12] backport to pre 0.15 --- lib/dumbo/task/compact_segments.rb | 84 +++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index ee6ff0d..a1067b8 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -12,31 +12,75 @@ def initialize(source, interval) def as_json(options = {}) config = { - type: 'compact', - dataSource: @source['dataSource'], - interval: @interval, - dimensionsSpec: { - dimensions: (@source['dimensions'] || []), - spatialDimensions: (@source['spacialDimensions'] || []), - }, - metricsSpec: (@source['metrics'] || {}).map do |name, aggregator| - { type: aggregator, name: name, fieldName: name } - # WARNING: do NOT use count for events, will count in segment vs count in raw input - end + [{ type: "longSum", name: "events", fieldName: "events" }], - segmentGranularity: @source['output']['segmentGranularity'] || "hour", - targetCompactionSizeBytes: 419430400, - tuningConfig: { - type: "index", - partitionDimensions: [@source['output']['partitionDimension']], - indexSpec: { - bitmap: { - type: @source['output']['bitmap'] || "roaring", + type: 'index_hadoop', + spec: { + dataSchema: { + dataSource: @source['dataSource'], + parser: { + parseSpec: { + format: "json", + timestampSpec: { + column: (@source['input']['timestamp']['column'] || 'timestamp'), + format: (@source['input']['timestamp']['format'] || 'ruby'), + }, + dimensionsSpec: { + dimensions: (@source['dimensions'] || []), + spatialDimensions: (@source['spacialDimensions'] || []), + } + } + }, + metricsSpec: (@source['metrics'] || {}).map do |name, aggregator| + { type: aggregator, name: name, fieldName: name } + # WARNING: do NOT use count for events, will count in segment vs count in raw input + end + [{ type: "doubleSum", name: "events", fieldName: "events" }], + granularitySpec: { + segmentGranularity: @source['output']['segmentGranularity'] || "hour", + queryGranularity: @source['output']['queryGranularity'] || "minute", + intervals: [@interval], + } + }, + ioConfig: { + type: 'hadoop', + inputSpec: { + type: 'dataSource', + ingestionSpec: { + type: 'dataSource', + dataSource: @source['dataSource'], + interval: @interval, + granularity: @source['output']['queryGranularity'] || "minute", + }, + }, + }, + tuningConfig: { + type: "hadoop", + overwriteFiles: true, + ignoreInvalidRows: true, + buildV9Directly: true, + useCombiner: true, + forceExtendableShardSpecs: true, + partitionsSpec: { + type: "hashed", + targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400), + numShards: -1, + }, + indexSpec: { + bitmap: { + type: @source['output']['bitmap'] || "roaring", + }, + longEncoding: "auto", }, - longEncoding: "auto", }, }, } + if @source['output']['partitionDimension'] + config[:spec][:tuningConfig][:partitionsSpec] = { + type: "dimension", + partitionDimension: @source['output']['partitionDimension'], + targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400), + } + end + config end end From 2128195d4a79683fe24d02378fd3a4684c9f3918 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Wed, 18 Sep 2019 12:53:46 +0200 Subject: [PATCH 03/12] bring events metrics up to standard --- lib/dumbo/task/compact_segments.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index a1067b8..83b7aa3 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -32,7 +32,7 @@ def as_json(options = {}) metricsSpec: (@source['metrics'] || {}).map do |name, aggregator| { type: aggregator, name: name, fieldName: name } # WARNING: do NOT use count for events, will count in segment vs count in raw input - end + [{ type: "doubleSum", name: "events", fieldName: "events" }], + end + [{ type: "longSum", name: "events", fieldName: "events" }], granularitySpec: { segmentGranularity: @source['output']['segmentGranularity'] || "hour", queryGranularity: @source['output']['queryGranularity'] || "minute", From 548c1197e7e0d7a36d010e3041a4e6b4e49a99d2 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Wed, 18 Sep 2019 14:18:16 +0200 Subject: [PATCH 04/12] fixup for diplomat --- Gemfile.lock | 12 ++++++++++++ lib/dumbo/cli.rb | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/Gemfile.lock b/Gemfile.lock index 12be165..65a7101 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -23,10 +23,17 @@ GEM zeitwerk (~> 2.1, >= 2.1.8) addressable (2.7.0) public_suffix (>= 2.0.2, < 5.0) + coderay (1.1.2) concurrent-ruby (1.1.5) + deep_merge (1.2.1) + diplomat (2.2.5) + deep_merge (~> 1.0, >= 1.0.1) + faraday (~> 0.9) domain_name (0.5.20190701) unf (>= 0.0.5, < 1.0.0) erubis (2.7.0) + faraday (0.15.4) + multipart-post (>= 1.2, < 3) http-accept (1.7.0) http-cookie (1.0.3) domain_name (~> 0.5) @@ -41,15 +48,20 @@ GEM mixlib-cli multi_json terminal-table + method_source (0.9.2) mime-types (3.3) mime-types-data (~> 3.2015) mime-types-data (3.2019.0904) minitest (5.11.3) mixlib-cli (2.1.1) multi_json (1.13.1) + multipart-post (2.1.1) mysql2 (0.5.2) netrc (0.11.0) pg (1.1.4) + pry (0.12.2) + coderay (~> 1.1.0) + method_source (~> 0.9.0) public_suffix (4.0.1) rest-client (2.1.0) http-accept (>= 1.7.0, < 2.0) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index c48f2cc..61b1b9b 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -309,7 +309,10 @@ def log_tasks_number service_name, number data << new_entry unless updated - Diplomat::Kv.put("druid-dumbo/#{service_name}", MultiJson.dump(data)) + begin + Diplomat::Kv.put("druid-dumbo/#{service_name}", MultiJson.dump(data)) + rescue + end end end end From ad4d7ff13dcf9862205f939c0a59669d8516e20d Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Wed, 18 Sep 2019 20:35:13 +0200 Subject: [PATCH 05/12] make sure segment never crosses boundaries set by cli --- lib/dumbo/cli.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 61b1b9b..f03cf72 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -219,7 +219,7 @@ def compact_segments(topic) floor_date = [segment_size, 1.day].max $log.info("request compaction", topic: topic, interval: @interval, granularity: (source['output']['segmentGranularity'] || 'hour').downcase) - compact_interval = [@interval[0].floor(floor_date).utc, @interval[-1].ceil(floor_date).utc] + compact_interval = [@interval[0].ceil(floor_date).utc, @interval[-1].floor(floor_date).utc] $log.info("compacting scan", topic: topic, interval: compact_interval) expectedMetrics = Set.new(source['metrics'].keys).add("events") From c518a912c460b98792c20b13d2dd5241530ffeb6 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Sat, 21 Sep 2019 00:54:25 +0200 Subject: [PATCH 06/12] disable partition dimension --- lib/dumbo/cli.rb | 4 ++-- lib/dumbo/task/compact_segments.rb | 21 +++++++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index f03cf72..41fd391 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -219,7 +219,7 @@ def compact_segments(topic) floor_date = [segment_size, 1.day].max $log.info("request compaction", topic: topic, interval: @interval, granularity: (source['output']['segmentGranularity'] || 'hour').downcase) - compact_interval = [@interval[0].ceil(floor_date).utc, @interval[-1].floor(floor_date).utc] + compact_interval = [@interval[0].floor(floor_date).utc, @interval[-1].floor(floor_date).utc] $log.info("compacting scan", topic: topic, interval: compact_interval) expectedMetrics = Set.new(source['metrics'].keys).add("events") @@ -227,7 +227,7 @@ def compact_segments(topic) segment_interval = [compact_interval[0], compact_interval[0]] while segment_interval[-1] < compact_interval[-1] do - segment_interval = [segment_interval[-1], segment_interval[-1].ceil(segment_size).utc] + segment_interval = [segment_interval[-1].utc, (segment_interval[-1] + 1).ceil(floor_date).utc] $log.info("scanning segment", topic: topic, interval: segment_interval) segment_input = get_overlapping_segments_and_interval(source, segment_interval) diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index 83b7aa3..81899ab 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -60,8 +60,7 @@ def as_json(options = {}) forceExtendableShardSpecs: true, partitionsSpec: { type: "hashed", - targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400), - numShards: -1, + numShards: @source['output']['numShards'] || 3 }, indexSpec: { bitmap: { @@ -73,14 +72,20 @@ def as_json(options = {}) }, } - if @source['output']['partitionDimension'] - config[:spec][:tuningConfig][:partitionsSpec] = { - type: "dimension", - partitionDimension: @source['output']['partitionDimension'], - targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400), - } + if @source['output']['targetPartitionSize'] + config[:spec][:tuningConfig][:partitionsSpec][:targetPartitionSize] = @source['output']['targetPartitionSize'] + config[:spec][:tuningConfig][:partitionsSpec][:numShards] = -1 end + # if @source['output']['partitionDimension'] + # config[:spec][:tuningConfig][:partitionsSpec] = { + # type: "dimension", + # partitionDimension: @source['output']['partitionDimension'] + # targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400) + # } + # end + + config end end From 3f095b4de9b056b1624fbdad81fa7c00121d724b Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Mon, 23 Sep 2019 00:06:42 +0200 Subject: [PATCH 07/12] static dimensions seems to be the way to go --- lib/dumbo/task/compact_segments.rb | 4 +++- lib/dumbo/task/reintake.rb | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index 81899ab..1e06365 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -58,6 +58,8 @@ def as_json(options = {}) buildV9Directly: true, useCombiner: true, forceExtendableShardSpecs: true, + maxRowsInMemory: 4000000, + numBackgroundPersistThreads: 1, partitionsSpec: { type: "hashed", numShards: @source['output']['numShards'] || 3 @@ -74,7 +76,7 @@ def as_json(options = {}) if @source['output']['targetPartitionSize'] config[:spec][:tuningConfig][:partitionsSpec][:targetPartitionSize] = @source['output']['targetPartitionSize'] - config[:spec][:tuningConfig][:partitionsSpec][:numShards] = -1 + config[:spec][:tuningConfig][:partitionsSpec].delete(:numShards) end # if @source['output']['partitionDimension'] diff --git a/lib/dumbo/task/reintake.rb b/lib/dumbo/task/reintake.rb index aeb5028..d79147a 100644 --- a/lib/dumbo/task/reintake.rb +++ b/lib/dumbo/task/reintake.rb @@ -50,10 +50,12 @@ def as_json(options = {}) ignoreInvalidRows: true, buildV9Directly: true, forceExtendableShardSpecs: true, + maxRowsInMemory: 4000000, + numBackgroundPersistThreads: 1, useCombiner: true, partitionsSpec: { type: "hashed", - targetPartitionSize: 419430400, + numShards: @source['output']['numShards'] || 3, }, indexSpec: { bitmap: { From f577a5070337ffb1cf80ca87f1101521577cc923 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Mon, 23 Sep 2019 01:39:51 +0200 Subject: [PATCH 08/12] tuning maxRowsInMemory --- lib/dumbo/task/compact_segments.rb | 2 +- lib/dumbo/task/reintake.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/dumbo/task/compact_segments.rb b/lib/dumbo/task/compact_segments.rb index 1e06365..130a307 100644 --- a/lib/dumbo/task/compact_segments.rb +++ b/lib/dumbo/task/compact_segments.rb @@ -58,7 +58,7 @@ def as_json(options = {}) buildV9Directly: true, useCombiner: true, forceExtendableShardSpecs: true, - maxRowsInMemory: 4000000, + maxRowsInMemory: 10000000, numBackgroundPersistThreads: 1, partitionsSpec: { type: "hashed", diff --git a/lib/dumbo/task/reintake.rb b/lib/dumbo/task/reintake.rb index d79147a..a3113e3 100644 --- a/lib/dumbo/task/reintake.rb +++ b/lib/dumbo/task/reintake.rb @@ -50,7 +50,7 @@ def as_json(options = {}) ignoreInvalidRows: true, buildV9Directly: true, forceExtendableShardSpecs: true, - maxRowsInMemory: 4000000, + maxRowsInMemory: 10000000, numBackgroundPersistThreads: 1, useCombiner: true, partitionsSpec: { From ba8b3a23eb104a87859839f5ec57d6af3c43e60e Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Fri, 27 Sep 2019 20:50:51 +0200 Subject: [PATCH 09/12] ensure the shard number if explicit --- lib/dumbo/cli.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 41fd391..0bc077c 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -263,6 +263,11 @@ def compact_segments(topic) should_compact end + if source['output']['numShards'] && segment_input[:segments].size != source['output']['numShards'] + $log.info("detected segment number mismatch,", is: segment_input[:segments].size, expected: source['output']['numShards']) + must_compact = true + end + @tasks << Task::CompactSegments.new(source, segment_interval) if must_compact end end From b1444b8bc0c4428285c155dceec52fb38c0f9906 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Sat, 28 Sep 2019 20:10:41 +0200 Subject: [PATCH 10/12] only force a run on too few, too many possible with overlapping segments that can't be unloaded --- lib/dumbo/cli.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 0bc077c..49b6642 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -263,8 +263,8 @@ def compact_segments(topic) should_compact end - if source['output']['numShards'] && segment_input[:segments].size != source['output']['numShards'] - $log.info("detected segment number mismatch,", is: segment_input[:segments].size, expected: source['output']['numShards']) + if source['output']['numShards'] && segment_input[:segments].size < source['output']['numShards'] + $log.info("detected too few segments,", is: segment_input[:segments].size, expected: source['output']['numShards']) must_compact = true end From 3f1fb9081d0cf2925a74abe1016aea45b25a8307 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Sun, 29 Sep 2019 23:53:01 +0200 Subject: [PATCH 11/12] allow segments less than 1 day again --- lib/dumbo/cli.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 49b6642..5cefd2e 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -216,7 +216,7 @@ def compact_segments(topic) source = @sources[topic] segment_size = get_segment_granularity(source) - floor_date = [segment_size, 1.day].max + floor_date = segment_size $log.info("request compaction", topic: topic, interval: @interval, granularity: (source['output']['segmentGranularity'] || 'hour').downcase) compact_interval = [@interval[0].floor(floor_date).utc, @interval[-1].floor(floor_date).utc] From a4cb5e368b65de2eab5d649b20eeff0b0a4910b9 Mon Sep 17 00:00:00 2001 From: Hagen Rother Date: Wed, 30 Oct 2019 08:46:04 +0100 Subject: [PATCH 12/12] trigger on too many shards, i.e. numShards implies maxShards --- lib/dumbo/cli.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/dumbo/cli.rb b/lib/dumbo/cli.rb index 5cefd2e..34cf151 100644 --- a/lib/dumbo/cli.rb +++ b/lib/dumbo/cli.rb @@ -263,7 +263,7 @@ def compact_segments(topic) should_compact end - if source['output']['numShards'] && segment_input[:segments].size < source['output']['numShards'] + if segment_input[:segments].size > 0 && source['output']['numShards'] && segment_input[:segments].size < source['output']['numShards'] $log.info("detected too few segments,", is: segment_input[:segments].size, expected: source['output']['numShards']) must_compact = true end