diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index 57d5c4f4c..5dbc81603 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -14,6 +14,30 @@ describe LavinMQ::Etcd do end end + it "can get all keys with a prefix" do + cluster = EtcdCluster.new(1) + cluster.run do + etcd = LavinMQ::Etcd.new(cluster.endpoints) + etcd.put("foo/a", "bar") + etcd.put("foo/b", "bar") + etcd.put("fou/c", "bar") + etcd.get_prefix("foo").should eq Hash{"foo/a" => "bar", "foo/b" => "bar"} + end + end + + it "can get delete all keys with a prefix" do + cluster = EtcdCluster.new(1) + cluster.run do + etcd = LavinMQ::Etcd.new(cluster.endpoints) + etcd.put("foo/a", "bar") + etcd.put("foo/b", "bar") + etcd.put("fou/c", "bar") + etcd.del_prefix("foo").should eq 2 + etcd.get_prefix("foo").empty?.should be_true + etcd.get("foo/a").should be_nil + end + end + it "can watch" do cluster = EtcdCluster.new(1) cluster.run do diff --git a/src/lavinmq/clustering/replicator.cr b/src/lavinmq/clustering/replicator.cr index 24e7e85c7..51308e0e3 100644 --- a/src/lavinmq/clustering/replicator.cr +++ b/src/lavinmq/clustering/replicator.cr @@ -14,6 +14,7 @@ module LavinMQ abstract def listen(server : TCPServer) abstract def clear abstract def password : String + abstract def etcd : Etcd? end end end diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 1719b3698..84a9474d4 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -41,6 +41,10 @@ module LavinMQ @password = password end + def etcd : Etcd + @etcd + end + def clear @files.clear end @@ -239,6 +243,10 @@ module LavinMQ def password : String "" end + + def etcd : Etcd? + nil + end end end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 26133ef58..d156e8225 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -19,6 +19,27 @@ module LavinMQ end end + # Get all keys with a prefix + def get_prefix(prefix) : Hash(String, String) + range_end = prefix.to_slice.dup + len = range_end.bytesize + (len - 1).downto(0) do |i| + (range_end[i] &+= 1).zero? || break # continue if wrapped around + len = i + end + range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values + json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true})) + result = Hash(String, String).new + if kvs = json["kvs"]? + kvs.as_a.each do |kv| + key = Base64.decode_string kv["key"].as_s + value = Base64.decode_string kv["value"].as_s + result[key] = value + end + end + result + end + def put(key, value) : String? body = %({"key":"#{Base64.strict_encode key}","value":"#{Base64.strict_encode value}","prev_kv":true}) json = post("/v3/kv/put", body) @@ -27,11 +48,25 @@ module LavinMQ end end + # Delete a key def del(key) : Int32 json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode key}"})) json.dig?("deleted").try(&.as_s.to_i) || 0 end + # Delete all keys with a prefix + def del_prefix(prefix) : Int32 + range_end = prefix.to_slice.dup + len = range_end.bytesize + (len - 1).downto(0) do |i| + (range_end[i] &+= 1).zero? || break # continue if wrapped around + len = i + end + range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values + json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}"})) + json["deleted"].as_s.to_i + end + def watch(key, &) body = %({"create_request":{"key":"#{Base64.strict_encode key}"}}) post_stream("/v3/watch", body) do |json| @@ -221,10 +256,6 @@ module LavinMQ return yield({socket, address}) rescue ex : NoLeader raise ex # don't retry when leader is missing - rescue ex : Error - Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } - socket.close rescue nil - sleep 0.1.seconds rescue IO::Error Log.warn { "Lost connection to #{address}, retrying" } socket.close rescue nil diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index d48ce0ad7..b15b5a9b9 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -449,10 +449,12 @@ module LavinMQ io = @definitions_file if io.size.zero? load_default_definitions - compact! return end - + if etcd = @replicator.etcd + load_definitions_from_etcd(etcd) + return + end @log.info { "Loading definitions" } @definitions_lock.synchronize do @log.debug { "Verifying schema" } @@ -522,12 +524,49 @@ module LavinMQ private def load_default_definitions @log.info { "Loading default definitions" } - @exchanges[""] = DefaultExchange.new(self, "", true, false, false) - @exchanges["amq.direct"] = DirectExchange.new(self, "amq.direct", true, false, false) - @exchanges["amq.fanout"] = FanoutExchange.new(self, "amq.fanout", true, false, false) - @exchanges["amq.topic"] = TopicExchange.new(self, "amq.topic", true, false, false) - @exchanges["amq.headers"] = HeadersExchange.new(self, "amq.headers", true, false, false) - @exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false) + declare_exchange("", "direct", true, false) + declare_exchange("amq.direct", "direct", true, false) + declare_exchange("amq.fanout", "fanout", true, false) + declare_exchange("amq.topic", "topic", true, false) + declare_exchange("amq.headers", "headers", true, false) + declare_exchange("amq.match", "headers", true, false) + end + + private def load_definitions_from_etcd(etcd : Etcd) + etcd.get_prefix(etcd_path("queues")).each do |key, value| + queue_name = "" + key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @queues[queue_name] = QueueFactory.make(self, json) + end + + etcd.get_prefix(etcd_path("exchanges")).each do |key, value| + exchange_name = "" + key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @exchanges[exchange_name] = + make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h) + end + + etcd.get_prefix(etcd_path("queue-bindings")).each do |key, value| + _, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + x = @exchanges[exchange_name] + q = @queues[queue_name] + x.bind(q, routing_key, json["arguments"].to_h) + end + + etcd.get_prefix(etcd_path("exchange-bindings")).each do |key, value| + _, _, _, destination, source, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + src = @exchanges[source] + dst = @queues[destination] + src.bind(dst, routing_key, json["arguments"].to_h) + end + end + + private def split_etcd_path(path) : Array(String) + path.split('/').map! { |p| URI.decode_www_form p } end private def compact! @@ -575,7 +614,11 @@ module LavinMQ bytes = frame.to_slice @definitions_file.write bytes @replicator.append @definitions_file_path, bytes - @definitions_file.fsync + if etcd = @replicator.etcd + store_definition_in_etcd(frame, etcd) + else + @definitions_file.fsync + end if dirty if (@definitions_deletes += 1) >= Config.instance.max_deleted_definitions compact! @@ -584,6 +627,57 @@ module LavinMQ end end + private def store_definition_in_etcd(f, etcd) + case f + when AMQP::Frame::Exchange::Declare + etcd.put(etcd_path("exchanges", f.exchange_name), { + type: f.exchange_type, + auto_delete: f.auto_delete, + internal: f.internal, + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Exchange::Delete + etcd.del(etcd_path("exchanges", f.exchange_name)) + etcd.del_prefix(etcd_path("exchange-bindings", f.exchange_name)) + when AMQP::Frame::Exchange::Bind + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + etcd.put(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), + {arguments: f.arguments}.to_json) + when AMQP::Frame::Exchange::Unbind + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + etcd.del(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) + when AMQP::Frame::Queue::Declare + etcd.put(etcd_path("queues", f.queue_name), { + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Queue::Delete + etcd.del(etcd_path("queues", f.queue_name)) + etcd.del_prefix(etcd_path("queue-bindings", f.queue_name)) + when AMQP::Frame::Queue::Bind + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + etcd.put(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), + { + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Queue::Unbind + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + etcd.del(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) + else raise "Cannot apply frame #{f.class} in vhost #{@name}" + end + end + + private def etcd_path(*args) + String.build do |str| + str << Config.instance.clustering_etcd_prefix + str << "/vhosts/" + URI.encode_www_form @name, str + args.each do |a| + str << "/" + URI.encode_www_form a.to_s, str + end + end + end + private def make_exchange(vhost, name, type, durable, auto_delete, internal, arguments) case type when "direct"