From 089ece7865636f45af0c80d44c06b16352d563c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 28 Jul 2024 16:14:02 +0200 Subject: [PATCH 1/9] store definitions in etcd --- src/lavinmq/etcd.cr | 14 ++++++++++++++ src/lavinmq/vhost.cr | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 26133ef58..a81bdd1fb 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -19,6 +19,20 @@ module LavinMQ end end + def get_prefix(key) : Array(String) + range_end = key.to_slice.dup + len = range_end.bytesize + (len - 1).downto(0) do |i| + (range_end[i] &+= 1).zero? || break # continue if wrapped around + len = i + end + range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values + json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode key}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true})) + json["kvs"].as_a.map do |kv| + Base64.decode_string kv["value"].as_s + end + end + def put(key, value) : String? body = %({"key":"#{Base64.strict_encode key}","value":"#{Base64.strict_encode value}","prev_kv":true}) json = post("/v3/kv/put", body) diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index d48ce0ad7..cd7656c5f 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -584,6 +584,50 @@ module LavinMQ end end + private def store_definition_in_etcd(frame) + case f + when AMQP::Frame::Exchange::Declare + @etcd.put(join_path("lavinmq", @name, "exchanges", f.exchange_name), { + type: f.exchange_type, + auto_delete: f.auto_delete, + internal: f.internal, + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Exchange::Delete + @etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name)) + when AMQP::Frame::Exchange::Bind + @etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, f.arguments.hash), + { + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Exchange::Unbind + @etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, f.arguments.hash)) + when AMQP::Frame::Queue::Declare + @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), { + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Queue::Delete + @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name)) + when AMQP::Frame::Queue::Bind + @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, f.arguments.hash), + { + arguments: f.arguments, + }.to_json) + when AMQP::Frame::Queue::Unbind + @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, f.arguments.hash)) + else raise "Cannot apply frame #{f.class} in vhost #{@name}" + end + end + + private def join_path(*args) + String.build do |str| + args.each_with_index do |a, i| + str << "/" unless i.zero? + URI.encode_www_form a.to_s, str + end + end + end + private def make_exchange(vhost, name, type, durable, auto_delete, internal, arguments) case type when "direct" From 2a9e568e8470a1e085b530ff4ea515f3ee3c8aa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 29 Jul 2024 01:54:21 +0200 Subject: [PATCH 2/9] Get/Del etcd keys with prefix --- spec/etcd_spec.cr | 24 ++++++++++++++++++++++++ src/lavinmq/etcd.cr | 29 ++++++++++++++++++++++++----- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index 57d5c4f4c..8327d4b11 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -14,6 +14,30 @@ describe LavinMQ::Etcd do end end + it "can get all keys with a prefix" do + cluster = EtcdCluster.new(1) + cluster.run do + etcd = LavinMQ::Etcd.new(cluster.endpoints) + etcd.put("foo/a", "bar") + etcd.put("foo/b", "bar") + etcd.put("fou/c", "bar") + etcd.get_prefix("foo").should eq %w(bar bar) + end + end + + it "can get delete all keys with a prefix" do + cluster = EtcdCluster.new(1) + cluster.run do + etcd = LavinMQ::Etcd.new(cluster.endpoints) + etcd.put("foo/a", "bar") + etcd.put("foo/b", "bar") + etcd.put("fou/c", "bar") + etcd.del_prefix("foo").should eq 2 + etcd.get_prefix("foo").should eq %w() + etcd.get("foo/a").should be_nil + end + end + it "can watch" do cluster = EtcdCluster.new(1) cluster.run do diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index a81bdd1fb..6ef2dfd38 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -19,17 +19,22 @@ module LavinMQ end end - def get_prefix(key) : Array(String) - range_end = key.to_slice.dup + # Get all keys with a prefix + def get_prefix(prefix) : Array(String) + range_end = prefix.to_slice.dup len = range_end.bytesize (len - 1).downto(0) do |i| (range_end[i] &+= 1).zero? || break # continue if wrapped around len = i end range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values - json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode key}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true})) - json["kvs"].as_a.map do |kv| - Base64.decode_string kv["value"].as_s + json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true})) + if kvs = json["kvs"]? + kvs.as_a.map do |kv| + Base64.decode_string kv["value"].as_s + end + else + Array(String).new(0) end end @@ -41,11 +46,25 @@ module LavinMQ end end + # Delete a key def del(key) : Int32 json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode key}"})) json.dig?("deleted").try(&.as_s.to_i) || 0 end + # Delete all keys with a prefix + def del_prefix(prefix) : Int32 + range_end = prefix.to_slice.dup + len = range_end.bytesize + (len - 1).downto(0) do |i| + (range_end[i] &+= 1).zero? || break # continue if wrapped around + len = i + end + range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values + json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}"})) + json["deleted"].as_s.to_i + end + def watch(key, &) body = %({"create_request":{"key":"#{Base64.strict_encode key}"}}) post_stream("/v3/watch", body) do |json| From 3c7793146d7c2a18b4c6295b3d36c1c0cfec0e30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 29 Jul 2024 01:54:37 +0200 Subject: [PATCH 3/9] don't retry on generic etcd error messages --- src/lavinmq/etcd.cr | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 6ef2dfd38..1f09d59ef 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -254,10 +254,6 @@ module LavinMQ return yield({socket, address}) rescue ex : NoLeader raise ex # don't retry when leader is missing - rescue ex : Error - Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } - socket.close rescue nil - sleep 0.1.seconds rescue IO::Error Log.warn { "Lost connection to #{address}, retrying" } socket.close rescue nil From ab76726c9bd17a280657add4330c8c54259c52c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 29 Jul 2024 01:54:59 +0200 Subject: [PATCH 4/9] Use stable hash for binding arguments --- src/lavinmq/vhost.cr | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index cd7656c5f..4a283ba6e 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -596,12 +596,14 @@ module LavinMQ when AMQP::Frame::Exchange::Delete @etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name)) when AMQP::Frame::Exchange::Bind - @etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, f.arguments.hash), + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + @etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash), { arguments: f.arguments, }.to_json) when AMQP::Frame::Exchange::Unbind - @etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, f.arguments.hash)) + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + @etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash)) when AMQP::Frame::Queue::Declare @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), { arguments: f.arguments, @@ -609,12 +611,14 @@ module LavinMQ when AMQP::Frame::Queue::Delete @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name)) when AMQP::Frame::Queue::Bind - @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, f.arguments.hash), + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Unbind - @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, f.arguments.hash)) + args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result + @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) else raise "Cannot apply frame #{f.class} in vhost #{@name}" end end From f0e447f1818d271933aa1274676d38c9b1500447 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 30 Jul 2024 14:46:22 +0200 Subject: [PATCH 5/9] Etcd#get_prefix returns a hash --- spec/etcd_spec.cr | 4 ++-- src/lavinmq/etcd.cr | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index 8327d4b11..5dbc81603 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -21,7 +21,7 @@ describe LavinMQ::Etcd do etcd.put("foo/a", "bar") etcd.put("foo/b", "bar") etcd.put("fou/c", "bar") - etcd.get_prefix("foo").should eq %w(bar bar) + etcd.get_prefix("foo").should eq Hash{"foo/a" => "bar", "foo/b" => "bar"} end end @@ -33,7 +33,7 @@ describe LavinMQ::Etcd do etcd.put("foo/b", "bar") etcd.put("fou/c", "bar") etcd.del_prefix("foo").should eq 2 - etcd.get_prefix("foo").should eq %w() + etcd.get_prefix("foo").empty?.should be_true etcd.get("foo/a").should be_nil end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index 1f09d59ef..d156e8225 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -20,7 +20,7 @@ module LavinMQ end # Get all keys with a prefix - def get_prefix(prefix) : Array(String) + def get_prefix(prefix) : Hash(String, String) range_end = prefix.to_slice.dup len = range_end.bytesize (len - 1).downto(0) do |i| @@ -29,13 +29,15 @@ module LavinMQ end range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true})) + result = Hash(String, String).new if kvs = json["kvs"]? - kvs.as_a.map do |kv| - Base64.decode_string kv["value"].as_s + kvs.as_a.each do |kv| + key = Base64.decode_string kv["key"].as_s + value = Base64.decode_string kv["value"].as_s + result[key] = value end - else - Array(String).new(0) end + result end def put(key, value) : String? From 6037bacce1a4920916df779d124a78ac12566c1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 30 Jul 2024 14:46:41 +0200 Subject: [PATCH 6/9] load definitions from etcd --- src/lavinmq/vhost.cr | 47 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 4a283ba6e..00cb7c32b 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -530,6 +530,43 @@ module LavinMQ @exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false) end + private def load_definitions_from_etcd + @etcd.get_prefix(join_path("lavinmq", @name, "queues")).each do |key, value| + queue_name = "" + key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @queues[queue_name] = QueueFactory.make(self, json) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "exchanges")).each do |key, value| + exchange_name = "" + key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation + json = JSON.parse(value) + @exchanges[exchange_name] = + make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "queue-bindings")).each do |key, value| + _, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + x = @exchanges[exchange_name] + q = @queues[queue_name] + x.bind(q, routing_key, json["arguments"].to_h) + end + + @etcd.get_prefix(join_path("lavinmq", @name, "exchange-bindings")).each do |key, value| + _, _, _, destination, source, routing_key, _ = split_etcd_path(key) + json = JSON.parse(value) + src = @exchanges[source] + dst = @queues[destination] + src.bind(dst, routing_key, json["arguments"].to_h) + end + end + + private def split_etcd_path(path) : Array(String) + path.split('/').map! { |p| URI.decode_www_form p } + end + private def compact! @definitions_lock.synchronize do @log.info { "Compacting definitions" } @@ -595,21 +632,21 @@ module LavinMQ }.to_json) when AMQP::Frame::Exchange::Delete @etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name)) + @etcd.del_prefix(join_path("lavinmq", @name, "exchange-bindings", f.exchange_name)) when AMQP::Frame::Exchange::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash), - { - arguments: f.arguments, - }.to_json) + @etcd.put(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), + {arguments: f.arguments}.to_json) when AMQP::Frame::Exchange::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash)) + @etcd.del(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) when AMQP::Frame::Queue::Declare @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Delete @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name)) + @etcd.del_prefix(join_path("lavinmq", @name, "queue-bindings", f.queue_name)) when AMQP::Frame::Queue::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), From 843e4c22760b0bf9838e972b3d351c817150041f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 4 Nov 2024 10:14:22 +0100 Subject: [PATCH 7/9] improved etcd path constructor --- src/lavinmq/vhost.cr | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 00cb7c32b..58b3f317e 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -531,14 +531,14 @@ module LavinMQ end private def load_definitions_from_etcd - @etcd.get_prefix(join_path("lavinmq", @name, "queues")).each do |key, value| + @etcd.get_prefix(etcd_path("queues")).each do |key, value| queue_name = "" key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation json = JSON.parse(value) @queues[queue_name] = QueueFactory.make(self, json) end - @etcd.get_prefix(join_path("lavinmq", @name, "exchanges")).each do |key, value| + @etcd.get_prefix(etcd_path("exchanges")).each do |key, value| exchange_name = "" key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation json = JSON.parse(value) @@ -546,7 +546,7 @@ module LavinMQ make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h) end - @etcd.get_prefix(join_path("lavinmq", @name, "queue-bindings")).each do |key, value| + @etcd.get_prefix(etcd_path("queue-bindings")).each do |key, value| _, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key) json = JSON.parse(value) x = @exchanges[exchange_name] @@ -554,7 +554,7 @@ module LavinMQ x.bind(q, routing_key, json["arguments"].to_h) end - @etcd.get_prefix(join_path("lavinmq", @name, "exchange-bindings")).each do |key, value| + @etcd.get_prefix(etcd_path("exchange-bindings")).each do |key, value| _, _, _, destination, source, routing_key, _ = split_etcd_path(key) json = JSON.parse(value) src = @exchanges[source] @@ -624,46 +624,49 @@ module LavinMQ private def store_definition_in_etcd(frame) case f when AMQP::Frame::Exchange::Declare - @etcd.put(join_path("lavinmq", @name, "exchanges", f.exchange_name), { + @etcd.put(etcd_path("exchanges", f.exchange_name), { type: f.exchange_type, auto_delete: f.auto_delete, internal: f.internal, arguments: f.arguments, }.to_json) when AMQP::Frame::Exchange::Delete - @etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name)) - @etcd.del_prefix(join_path("lavinmq", @name, "exchange-bindings", f.exchange_name)) + @etcd.del(etcd_path("exchanges", f.exchange_name)) + @etcd.del_prefix(etcd_path("exchange-bindings", f.exchange_name)) when AMQP::Frame::Exchange::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), + @etcd.put(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), {arguments: f.arguments}.to_json) when AMQP::Frame::Exchange::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) + @etcd.del(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) when AMQP::Frame::Queue::Declare - @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), { + @etcd.put(etcd_path("queues", f.queue_name), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Delete - @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name)) - @etcd.del_prefix(join_path("lavinmq", @name, "queue-bindings", f.queue_name)) + @etcd.del(etcd_path("queues", f.queue_name)) + @etcd.del_prefix(etcd_path("queue-bindings", f.queue_name)) when AMQP::Frame::Queue::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), + @etcd.put(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) + @etcd.del(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) else raise "Cannot apply frame #{f.class} in vhost #{@name}" end end - private def join_path(*args) + private def etcd_path(*args) String.build do |str| - args.each_with_index do |a, i| - str << "/" unless i.zero? + str << Config.instance.clustering_etcd_prefix + str << "/vhosts/" + URI.encode_www_form @name, str + args.each do |a, i| + str << "/" URI.encode_www_form a.to_s, str end end From b6a41271f684c0c07a394a7b61c36cb078d3de15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 12 Nov 2024 15:33:44 +0100 Subject: [PATCH 8/9] fixup! load definitions from etcd --- src/lavinmq/vhost.cr | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 58b3f317e..187af529b 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -612,7 +612,11 @@ module LavinMQ bytes = frame.to_slice @definitions_file.write bytes @replicator.append @definitions_file_path, bytes - @definitions_file.fsync + if etcd = @replicator.as?(Clustering::Server).try &.@etcd # FIXME: hack + store_definition_in_etcd(frame, etcd) + else + @definitions_file.fsync + end if dirty if (@definitions_deletes += 1) >= Config.instance.max_deleted_definitions compact! @@ -621,41 +625,41 @@ module LavinMQ end end - private def store_definition_in_etcd(frame) + private def store_definition_in_etcd(f, etcd) case f when AMQP::Frame::Exchange::Declare - @etcd.put(etcd_path("exchanges", f.exchange_name), { + etcd.put(etcd_path("exchanges", f.exchange_name), { type: f.exchange_type, auto_delete: f.auto_delete, internal: f.internal, arguments: f.arguments, }.to_json) when AMQP::Frame::Exchange::Delete - @etcd.del(etcd_path("exchanges", f.exchange_name)) - @etcd.del_prefix(etcd_path("exchange-bindings", f.exchange_name)) + etcd.del(etcd_path("exchanges", f.exchange_name)) + etcd.del_prefix(etcd_path("exchange-bindings", f.exchange_name)) when AMQP::Frame::Exchange::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), + etcd.put(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash), {arguments: f.arguments}.to_json) when AMQP::Frame::Exchange::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) + etcd.del(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash)) when AMQP::Frame::Queue::Declare - @etcd.put(etcd_path("queues", f.queue_name), { + etcd.put(etcd_path("queues", f.queue_name), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Delete - @etcd.del(etcd_path("queues", f.queue_name)) - @etcd.del_prefix(etcd_path("queue-bindings", f.queue_name)) + etcd.del(etcd_path("queues", f.queue_name)) + etcd.del_prefix(etcd_path("queue-bindings", f.queue_name)) when AMQP::Frame::Queue::Bind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.put(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), + etcd.put(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash), { arguments: f.arguments, }.to_json) when AMQP::Frame::Queue::Unbind args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result - @etcd.del(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) + etcd.del(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash)) else raise "Cannot apply frame #{f.class} in vhost #{@name}" end end @@ -665,7 +669,7 @@ module LavinMQ str << Config.instance.clustering_etcd_prefix str << "/vhosts/" URI.encode_www_form @name, str - args.each do |a, i| + args.each do |a| str << "/" URI.encode_www_form a.to_s, str end From 696a98e4b856b6114f5dce3ce7202b778a45c8d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 15 Nov 2024 02:29:28 +0100 Subject: [PATCH 9/9] fixup --- src/lavinmq/clustering/replicator.cr | 1 + src/lavinmq/clustering/server.cr | 8 ++++++++ src/lavinmq/vhost.cr | 30 +++++++++++++++------------- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/lavinmq/clustering/replicator.cr b/src/lavinmq/clustering/replicator.cr index 24e7e85c7..51308e0e3 100644 --- a/src/lavinmq/clustering/replicator.cr +++ b/src/lavinmq/clustering/replicator.cr @@ -14,6 +14,7 @@ module LavinMQ abstract def listen(server : TCPServer) abstract def clear abstract def password : String + abstract def etcd : Etcd? end end end diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 1719b3698..84a9474d4 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -41,6 +41,10 @@ module LavinMQ @password = password end + def etcd : Etcd + @etcd + end + def clear @files.clear end @@ -239,6 +243,10 @@ module LavinMQ def password : String "" end + + def etcd : Etcd? + nil + end end end end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 187af529b..b15b5a9b9 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -449,10 +449,12 @@ module LavinMQ io = @definitions_file if io.size.zero? load_default_definitions - compact! return end - + if etcd = @replicator.etcd + load_definitions_from_etcd(etcd) + return + end @log.info { "Loading definitions" } @definitions_lock.synchronize do @log.debug { "Verifying schema" } @@ -522,23 +524,23 @@ module LavinMQ private def load_default_definitions @log.info { "Loading default definitions" } - @exchanges[""] = DefaultExchange.new(self, "", true, false, false) - @exchanges["amq.direct"] = DirectExchange.new(self, "amq.direct", true, false, false) - @exchanges["amq.fanout"] = FanoutExchange.new(self, "amq.fanout", true, false, false) - @exchanges["amq.topic"] = TopicExchange.new(self, "amq.topic", true, false, false) - @exchanges["amq.headers"] = HeadersExchange.new(self, "amq.headers", true, false, false) - @exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false) + declare_exchange("", "direct", true, false) + declare_exchange("amq.direct", "direct", true, false) + declare_exchange("amq.fanout", "fanout", true, false) + declare_exchange("amq.topic", "topic", true, false) + declare_exchange("amq.headers", "headers", true, false) + declare_exchange("amq.match", "headers", true, false) end - private def load_definitions_from_etcd - @etcd.get_prefix(etcd_path("queues")).each do |key, value| + private def load_definitions_from_etcd(etcd : Etcd) + etcd.get_prefix(etcd_path("queues")).each do |key, value| queue_name = "" key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation json = JSON.parse(value) @queues[queue_name] = QueueFactory.make(self, json) end - @etcd.get_prefix(etcd_path("exchanges")).each do |key, value| + etcd.get_prefix(etcd_path("exchanges")).each do |key, value| exchange_name = "" key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation json = JSON.parse(value) @@ -546,7 +548,7 @@ module LavinMQ make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h) end - @etcd.get_prefix(etcd_path("queue-bindings")).each do |key, value| + etcd.get_prefix(etcd_path("queue-bindings")).each do |key, value| _, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key) json = JSON.parse(value) x = @exchanges[exchange_name] @@ -554,7 +556,7 @@ module LavinMQ x.bind(q, routing_key, json["arguments"].to_h) end - @etcd.get_prefix(etcd_path("exchange-bindings")).each do |key, value| + etcd.get_prefix(etcd_path("exchange-bindings")).each do |key, value| _, _, _, destination, source, routing_key, _ = split_etcd_path(key) json = JSON.parse(value) src = @exchanges[source] @@ -612,7 +614,7 @@ module LavinMQ bytes = frame.to_slice @definitions_file.write bytes @replicator.append @definitions_file_path, bytes - if etcd = @replicator.as?(Clustering::Server).try &.@etcd # FIXME: hack + if etcd = @replicator.etcd store_definition_in_etcd(frame, etcd) else @definitions_file.fsync