Backport #14

Open · wants to merge 13 commits into base: master
2 changes: 1 addition & 1 deletion .ruby-version
@@ -1 +1 @@
ruby-2.2
ruby-2.6
79 changes: 43 additions & 36 deletions Gemfile.lock
@@ -1,41 +1,45 @@
GIT
remote: git://github.com/ruby-druid/ruby-druid.git
revision: 552674ba04a93c20d435c33d94b1ac0650a6afac
revision: 752896502e29898060e13b0ed05fa35749043b2b
specs:
ruby-druid (0.2.0.rc3)
activemodel
activesupport
iso8601
multi_json
rest-client
zk
ruby-druid (0.10.2)
activemodel (>= 3.0.0)
activesupport (>= 3.0.0)
iso8601 (~> 0.8)
multi_json (~> 1.0)
rest-client (>= 1.8, < 3.0)
zk (~> 1.9)

GEM
remote: https://rubygems.org/
specs:
activemodel (5.2.1)
activesupport (= 5.2.1)
activesupport (5.2.1)
activemodel (6.0.0)
activesupport (= 6.0.0)
activesupport (6.0.0)
concurrent-ruby (~> 1.0, >= 1.0.2)
i18n (>= 0.7, < 2)
minitest (~> 5.1)
tzinfo (~> 1.1)
zeitwerk (~> 2.1, >= 2.1.8)
addressable (2.7.0)
public_suffix (>= 2.0.2, < 5.0)
coderay (1.1.2)
concurrent-ruby (1.0.5)
concurrent-ruby (1.1.5)
deep_merge (1.2.1)
diplomat (2.2.5)
deep_merge (~> 1.0, >= 1.0.1)
faraday (~> 0.9)
domain_name (0.5.24)
domain_name (0.5.20190701)
unf (>= 0.0.5, < 1.0.0)
erubis (2.7.0)
faraday (0.15.4)
multipart-post (>= 1.2, < 3)
http-cookie (1.0.2)
http-accept (1.7.0)
http-cookie (1.0.3)
domain_name (~> 0.5)
i18n (1.1.0)
i18n (1.6.0)
concurrent-ruby (~> 1.0)
iso8601 (0.8.6)
iso8601 (0.12.1)
jmx4r (0.1.4)
liquid-ext (3.5.3)
activesupport
@@ -44,39 +48,42 @@ GEM
mixlib-cli
multi_json
terminal-table
little-plugger (1.1.3)
logging (1.8.2)
little-plugger (>= 1.1.3)
multi_json (>= 1.8.4)
method_source (0.9.2)
mime-types (2.5)
mime-types (3.3)
mime-types-data (~> 3.2015)
mime-types-data (3.2019.0904)
minitest (5.11.3)
mixlib-cli (1.5.0)
multi_json (1.11.0)
mixlib-cli (2.1.1)
multi_json (1.13.1)
multipart-post (2.1.1)
mysql2 (0.5.2)
netrc (0.10.3)
pg (1.1.3)
netrc (0.11.0)
pg (1.1.4)
pry (0.12.2)
coderay (~> 1.1.0)
method_source (~> 0.9.0)
rest-client (1.8.0)
public_suffix (4.0.1)
rest-client (2.1.0)
http-accept (>= 1.7.0, < 2.0)
http-cookie (>= 1.0.2, < 2.0)
mime-types (>= 1.16, < 3.0)
netrc (~> 0.7)
sequel (4.48.0)
terminal-table (1.4.5)
mime-types (>= 1.16, < 4.0)
netrc (~> 0.8)
sequel (5.24.0)
terminal-table (1.8.0)
unicode-display_width (~> 1.1, >= 1.1.1)
thread_safe (0.3.6)
tzinfo (1.2.5)
thread_safe (~> 0.1)
unf (0.1.4)
unf_ext
unf_ext (0.0.7.1)
webhdfs (0.6.0)
zk (1.9.5)
logging (~> 1.8.2)
unf_ext (0.0.7.6)
unicode-display_width (1.6.0)
webhdfs (0.8.0)
addressable
zeitwerk (2.1.10)
zk (1.9.6)
zookeeper (~> 1.4.0)
zookeeper (1.4.10)
zookeeper (1.4.11)

PLATFORMS
ruby
@@ -92,4 +99,4 @@ DEPENDENCIES
webhdfs

BUNDLED WITH
1.17.1
1.17.3
2 changes: 1 addition & 1 deletion database.json.example
@@ -1,6 +1,6 @@
{
"adapter": "postgres"
"host": "localhist",
"host": "localhost",
"username": "druid",
"password": "diurd",
"database": "druid"
8 changes: 4 additions & 4 deletions dumbo.gemspec
@@ -2,12 +2,12 @@

Gem::Specification.new do |spec|
spec.name = "dumbo"
spec.version = "0.1.0"
spec.authors = ["remerge GmbH"]
spec.email = ["tech@remerge.io"]
spec.version = "0.2.0"
spec.authors = ["LiquidM GmbH"]
spec.email = ["tech@liquidm.com"]
spec.summary = %q{}
spec.description = %q{}
spec.homepage = "https://github.com/remerge/dumbo"
spec.homepage = "https://github.com/liquidm/druid-dumbo"
spec.license = "MIT"

spec.files = `git ls-files -z`.split("\x0")
59 changes: 23 additions & 36 deletions lib/dumbo/cli.rb
@@ -40,13 +40,6 @@ def run
validate_events(topic)
end
run_tasks
when "merge"
$log.info("merging segments")
@segments = Dumbo::Segment.all(@db, @druid, @sources)
@topics.each do |topic|
merge_segments(topic)
end
run_tasks
when "compact"
$log.info("compacting segments")
@segments = Dumbo::Segment.all(@db, @druid, @sources)
@@ -180,7 +173,6 @@ def get_overlapping_segments_and_interval(source, check_interval, available_segm
newest = check_interval.last

check_range = (oldest...newest)
$log.info("checking overlapping intervals for", dataSource: dataSource, interval: check_range)

segments = available_segments.select do |segment|
if segment.source == dataSource && (segment.interval[0]...segment.interval[-1]).overlaps?(check_range)
@@ -209,41 +201,36 @@ def get_segment_granularity(source)
1.hour
when 'day'
1.day
when 'week'
7.days
when 'month'
1.month
when 'year'
1.year
else
raise "Unsupported segmentGranularity #{source['output']['segmentGranularity']}"
end
end

def merge_segments(topic)
def compact_segments(topic)
source = @sources[topic]
$log.info("compacting scan", topic: topic, interval: @interval)
merging = get_overlapping_segments_and_interval(source, @interval)

segment_size = get_segment_granularity(source)
merging[:interval][0].to_i.step(merging[:interval][-1].to_i - 1, segment_size) do |start_time|
segment_interval = [Time.at(start_time).utc, Time.at(start_time + segment_size).utc]
segment_input = get_overlapping_segments_and_interval(source, segment_interval, merging[:segments])
maxShards = (source['output'] && source['output']['maxShards']) || 10
if maxShards > 0 && maxShards < segment_input[:segments].length
$log.info("detected too many shards,", is: segment_input[:segments].length, max: maxShards)
@tasks << Task::CompactSegments.new(source, segment_interval)
end
end
end
floor_date = segment_size

def compact_segments(topic)
source = @sources[topic]
compact_interval = [@interval[0].utc, @interval[-1].floor(1.day).utc]
compact_interval[0] -= 1.day if compact_interval[0] == compact_interval[-1]
$log.info("request compaction", topic: topic, interval: @interval, granularity: (source['output']['segmentGranularity'] || 'hour').downcase)
compact_interval = [@interval[0].floor(floor_date).utc, @interval[-1].floor(floor_date).utc]
$log.info("compacting scan", topic: topic, interval: compact_interval)
compacting = get_overlapping_segments_and_interval(source, compact_interval)
segment_size = get_segment_granularity(source)

expectedMetrics = Set.new(source['metrics'].keys).add("events")
expectedDimensions = Set.new(source['dimensions'])

compacting[:interval][0].to_i.step(compacting[:interval][-1].to_i - 1, segment_size) do |start_time|
segment_interval = [Time.at(start_time).utc, Time.at(start_time + segment_size).utc]
segment_input = get_overlapping_segments_and_interval(source, segment_interval, compacting[:segments])
segment_interval = [compact_interval[0], compact_interval[0]]
while segment_interval[-1] < compact_interval[-1] do
segment_interval = [segment_interval[-1].utc, (segment_interval[-1] + 1).ceil(floor_date).utc]
$log.info("scanning segment", topic: topic, interval: segment_interval)

segment_input = get_overlapping_segments_and_interval(source, segment_interval)

must_compact = segment_input[:segments].any? do |input_segment|
should_compact = false
@@ -276,9 +263,8 @@ def compact_segments(topic)
should_compact
end

maxShards = (source['output'] && source['output']['maxShards']) || 0
if maxShards > 0 && maxShards < segment_input[:segments].length
$log.info("detected too many shards,", is: segment_input[:segments].length, max: maxShards)
if segment_input[:segments].size > 0 && source['output']['numShards'] && segment_input[:segments].size < source['output']['numShards']
$log.info("detected too few segments,", is: segment_input[:segments].size, expected: source['output']['numShards'])
must_compact = true
end

@@ -296,8 +282,6 @@ def granularity(g)
30.minutes
when 'hour'
1.hour
when 'day'
1.day
else
raise "Unsupported granularity #{g}"
end
@@ -330,7 +314,10 @@ def log_tasks_number service_name, number

data << new_entry unless updated

Diplomat::Kv.put("druid-dumbo/#{service_name}", MultiJson.dump(data))
begin
Diplomat::Kv.put("druid-dumbo/#{service_name}", MultiJson.dump(data))
rescue
end
end
end
end
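A note on the reworked compact_segments above: instead of stepping the interval by a fixed number of seconds, it floors both ends to the segment granularity and then walks the interval one calendar-aligned window at a time via Time#ceil, which is what lets the new 'week', 'month' and 'year' granularities line up with calendar boundaries. Below is a minimal sketch of that walk using the Time extensions from lib/dumbo/time_ext.rb in this PR; the interval and granularity values are illustrative only.

require "active_support/all"
require "dumbo/time_ext"   # Time#floor / Time#ceil as patched in this PR; assumes lib/ is on the load path

interval     = [Time.utc(2019, 7, 20), Time.utc(2019, 9, 11)]  # illustrative scan interval
segment_size = 1.month

# Same shape as compact_segments: floor both ends, then walk window by window.
compact_interval = [interval[0].floor(segment_size).utc, interval[-1].floor(segment_size).utc]

segment_interval = [compact_interval[0], compact_interval[0]]
while segment_interval[-1] < compact_interval[-1] do
  segment_interval = [segment_interval[-1].utc, (segment_interval[-1] + 1).ceil(segment_size).utc]
  puts segment_interval.inspect  # prints [Jul 1, Aug 1] and then [Aug 1, Sep 1], both UTC
end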
40 changes: 17 additions & 23 deletions lib/dumbo/task/compact_segments.rb
@@ -32,7 +32,7 @@ def as_json(options = {})
metricsSpec: (@source['metrics'] || {}).map do |name, aggregator|
{ type: aggregator, name: name, fieldName: name }
# WARNING: do NOT use count for events, will count in segment vs count in raw input
end + [{ type: "doubleSum", name: "events", fieldName: "events" }],
end + [{ type: "longSum", name: "events", fieldName: "events" }],
granularitySpec: {
segmentGranularity: @source['output']['segmentGranularity'] || "hour",
queryGranularity: @source['output']['queryGranularity'] || "minute",
@@ -58,8 +58,11 @@ def as_json(options = {})
buildV9Directly: true,
useCombiner: true,
forceExtendableShardSpecs: true,
maxRowsInMemory: 10000000,
numBackgroundPersistThreads: 1,
partitionsSpec: {
type: "none",
type: "hashed",
numShards: @source['output']['numShards'] || 3
},
indexSpec: {
bitmap: {
@@ -70,30 +73,21 @@
},
},
}
if @source['output']['partitionDimension']
config[:spec][:tuningConfig][:partitionsSpec] = {
type: "dimension",
partitionDimension: @source['output']['partitionDimension'],
targetPartitionSize: (@source['output']['targetPartitionSize'] || 1000000),
}
elsif (@source['output']['targetPartitionSize'] || 0) > 0
config[:spec][:tuningConfig][:partitionsSpec] = {
type: "hashed",
targetPartitionSize: @source['output']['targetPartitionSize'],
numShards: -1,
}
elsif (@source['output']['numShards'] || 0) > 0
config[:spec][:tuningConfig][:partitionsSpec] = {
type: "hashed",
targetPartitionSize: -1,
numShards: @source['output']['numShards'],
}
end

if (@source['output']['maxPartitionSize'])
config[:spec][:tuningConfig][:partitionsSpec][:maxPartitionSize] = @source['output']['maxPartitionSize']
if @source['output']['targetPartitionSize']
config[:spec][:tuningConfig][:partitionsSpec][:targetPartitionSize] = @source['output']['targetPartitionSize']
config[:spec][:tuningConfig][:partitionsSpec].delete(:numShards)
end

# if @source['output']['partitionDimension']
# config[:spec][:tuningConfig][:partitionsSpec] = {
# type: "dimension",
# partitionDimension: @source['output']['partitionDimension']
# targetPartitionSize: (@source['output']['targetPartitionSize'] || 419430400)
# }
# end


config
end
end
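To make the new partitioning defaults above explicit: hashed partitioning with an explicit shard count is now the baseline (numShards from the source config, falling back to 3), and a configured targetPartitionSize takes precedence and drops numShards. A rough sketch of the two outcomes using made-up source configs; only the field names mirror the diff.

# Hypothetical source outputs, for illustration only.
by_shards = { 'output' => { 'numShards' => 6 } }
by_size   = { 'output' => { 'numShards' => 6, 'targetPartitionSize' => 5_000_000 } }

def partitions_spec(source)
  spec = { type: "hashed", numShards: source['output']['numShards'] || 3 }
  if source['output']['targetPartitionSize']
    spec[:targetPartitionSize] = source['output']['targetPartitionSize']
    spec.delete(:numShards)
  end
  spec
end

partitions_spec(by_shards) # => {:type=>"hashed", :numShards=>6}
partitions_spec(by_size)   # => {:type=>"hashed", :targetPartitionSize=>5000000}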
10 changes: 4 additions & 6 deletions lib/dumbo/task/reintake.rb
@@ -50,9 +50,12 @@ def as_json(options = {})
ignoreInvalidRows: true,
buildV9Directly: true,
forceExtendableShardSpecs: true,
maxRowsInMemory: 10000000,
numBackgroundPersistThreads: 1,
useCombiner: true,
partitionsSpec: {
type: "none",
type: "hashed",
numShards: @source['output']['numShards'] || 3,
},
indexSpec: {
bitmap: {
Expand All @@ -63,11 +66,6 @@ def as_json(options = {})
},
},
}
config[:spec][:tuningConfig][:partitionsSpec] = {
type: "hashed",
targetPartitionSize: -1,
numShards: 10,
}
config
end
end
20 changes: 18 additions & 2 deletions lib/dumbo/time_ext.rb
@@ -2,10 +2,26 @@

class Time
def floor(granularity = 1.day)
Time.at((self.to_f / granularity).floor * granularity)
if granularity == 7.days
self.at_beginning_of_week
elsif granularity == 1.month
self.at_beginning_of_month
elsif granularity == 1.year
self.at_beginning_of_year
else
Time.at((self.to_f / granularity).floor * granularity)
end
end

def ceil(granularity = 1.day)
Time.at((self.to_f / granularity).ceil * granularity)
if granularity == 7.days
self.at_end_of_week.ceil
elsif granularity == 1.month
self.at_end_of_month.ceil
elsif granularity == 1.year
self.at_end_of_year.ceil
else
Time.at((self.to_f / granularity).ceil * granularity)
end
end
end
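For reference, a short usage sketch of the patched rounding helpers above; it assumes ActiveSupport is loaded (activesupport is already in the Gemfile.lock) and that the gem's lib directory is on the load path. The timestamps are illustrative only.

require "active_support/all"
require "dumbo/time_ext"   # the Time#floor / Time#ceil shown above

t = Time.utc(2019, 9, 11, 14, 35)   # a Wednesday

# Sub-day granularities still round via plain epoch arithmetic.
t.floor(1.hour).utc    # => 2019-09-11 14:00:00 UTC
t.ceil(1.hour).utc     # => 2019-09-11 15:00:00 UTC

# Calendar granularities now snap to calendar boundaries instead of
# fixed-size windows counted from the epoch.
t.floor(7.days).utc    # => 2019-09-09 00:00:00 UTC (Monday, beginning of week)
t.ceil(7.days).utc     # => 2019-09-16 00:00:00 UTC (start of the next week)
t.floor(1.month).utc   # => 2019-09-01 00:00:00 UTC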