-
Notifications
You must be signed in to change notification settings - Fork 22
/
deimos.rb
129 lines (111 loc) · 3.9 KB
/
deimos.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# frozen_string_literal: true
require 'active_support'
require 'phobos'
require 'deimos/version'
require 'deimos/config/configuration'
require 'deimos/producer'
require 'deimos/active_record_producer'
require 'deimos/active_record_consumer'
require 'deimos/consumer'
require 'deimos/batch_consumer'
require 'deimos/instrumentation'
require 'deimos/utils/lag_reporter'
require 'deimos/backends/base'
require 'deimos/backends/kafka'
require 'deimos/backends/kafka_async'
require 'deimos/backends/test'
require 'deimos/schema_backends/base'
require 'deimos/utils/schema_class'
require 'deimos/schema_class/enum'
require 'deimos/schema_class/record'
require 'deimos/monkey_patches/phobos_cli'
require 'deimos/railtie' if defined?(Rails)
require 'deimos/utils/schema_controller_mixin' if defined?(ActionController)
if defined?(ActiveRecord)
require 'deimos/kafka_source'
require 'deimos/kafka_topic_info'
require 'deimos/backends/db'
require 'sigurd'
require 'deimos/utils/db_producer'
require 'deimos/utils/db_poller'
end
require 'deimos/utils/inline_consumer'
require 'yaml'
require 'erb'
# Parent module. Exposes module-level helpers for resolving the configured
# schema backend, encoding/decoding payloads with it, and starting the
# DB-backed producers.
module Deimos
  class << self
    # Resolves the schema backend class named by `config.schema.backend`.
    # The matching file under `deimos/schema_backends/` is required on demand,
    # and the class name is derived from the backend name with
    # ActiveSupport's `String#classify` / `String#constantize`.
    # @return [Class<Deimos::SchemaBackends::Base>]
    def schema_backend_class
      backend = Deimos.config.schema.backend.to_s

      require "deimos/schema_backends/#{backend}"

      "Deimos::SchemaBackends::#{backend.classify}".constantize
    end

    # Builds a schema backend instance for the given schema.
    #
    # When `Utils::SchemaClass.use?` is true for the current config and a
    # schema class is found for (schema, namespace), that class is
    # instantiated and *its* schema and namespace are used instead. In the
    # event the schema class is an override, this applies the inherited
    # schema and namespace. Otherwise the arguments are used as given.
    # @param schema [String, Symbol]
    # @param namespace [String]
    # @return [Deimos::SchemaBackends::Base]
    def schema_backend(schema:, namespace:)
      schema_class = Utils::SchemaClass.klass(schema, namespace) if Utils::SchemaClass.use?(config.to_h)
      # Schema classes disabled, or none registered for this schema.
      return schema_backend_class.new(schema: schema, namespace: namespace) if schema_class.nil?

      schema_instance = schema_class.new
      schema_backend_class.new(schema: schema_instance.schema,
                               namespace: schema_instance.namespace)
    end

    # Encodes a payload using the backend for the given schema.
    # @param schema [String]
    # @param namespace [String]
    # @param payload [Hash]
    # @param subject [String, nil] passed to the backend as `topic:`;
    #   defaults to "<namespace>.<schema>" when not given.
    # @return [String]
    def encode(schema:, namespace:, payload:, subject: nil)
      schema_backend(schema: schema, namespace: namespace).
        encode(payload, topic: subject || "#{namespace}.#{schema}")
    end

    # Decodes a payload using the backend for the given schema.
    # @param schema [String]
    # @param namespace [String]
    # @param payload [String]
    # @return [Hash,nil]
    def decode(schema:, namespace:, payload:)
      schema_backend(schema: schema, namespace: namespace).decode(payload)
    end

    # Start the DB producers to send Kafka messages.
    #
    # Arguments are validated before any global state is touched, so a
    # rejected call leaves `Sigurd.exit_on_signal` unchanged.
    # @param thread_count [Integer] the number of threads to start. Must be
    #   at least 1; nil, zero and negative values are rejected (a negative
    #   count would otherwise start an executor with no producers).
    # @raise [RuntimeError] if the producer backend is not :db, or if
    #   thread_count is nil or less than 1.
    # @return [void]
    def start_db_backend!(thread_count: 1)
      raise 'Publish backend is not set to :db, exiting' if config.producers.backend != :db

      if thread_count.nil? || thread_count < 1
        raise 'Thread count is not given or set to zero, exiting'
      end

      Sigurd.exit_on_signal = true

      # One DbProducer per requested thread; each uses the DB producer's
      # logger when configured, falling back to the main logger.
      producers = (1..thread_count).map do
        Deimos::Utils::DbProducer.new(config.db_producer.logger || config.logger)
      end
      executor = Sigurd::Executor.new(producers,
                                      sleep_seconds: 5,
                                      logger: config.logger)
      Sigurd::SignalHandler.new(executor).run!
    end
  end
end
# Best-effort shutdown of the Kafka producers when the process exits.
# Each backend is shut down on its own, so an error while closing one
# producer is logged and does not stop the other from being closed.
at_exit do
  [Deimos::Backends::KafkaAsync, Deimos::Backends::Kafka].each do |backend|
    begin
      backend.shutdown_producer
    rescue StandardError => e
      # Array() guards against an exception that carries no backtrace.
      Deimos.config.logger.error(
        "Error closing producer on shutdown: #{e.message} #{Array(e.backtrace).join("\n")}"
      )
    end
  end
end