Skip to content

Commit

Permalink
load definitions from etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Jul 30, 2024
1 parent 3990f4f commit 12da60b
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,43 @@ module LavinMQ
@exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false)
end

private def load_definitions_from_etcd
@etcd.get_prefix(join_path("lavinmq", @name, "queues")).each do |key, value|
queue_name = ""
key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation
json = JSON.parse(value)
@queues[queue_name] = QueueFactory.make(self, json)
end

@etcd.get_prefix(join_path("lavinmq", @name, "exchanges")).each do |key, value|
exchange_name = ""
key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation
json = JSON.parse(value)
@exchanges[exchange_name] =
make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h)
end

@etcd.get_prefix(join_path("lavinmq", @name, "queue-bindings")).each do |key, value|
_, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key)
json = JSON.parse(value)
x = @exchanges[exchange_name]
q = @queues[queue_name]
x.bind(q, routing_key, json["arguments"].to_h)
end

@etcd.get_prefix(join_path("lavinmq", @name, "exchange-bindings")).each do |key, value|
_, _, _, destination, source, routing_key, _ = split_etcd_path(key)
json = JSON.parse(value)
src = @exchanges[source]
dst = @queues[destination]
src.bind(dst, routing_key, json["arguments"].to_h)
end
end

private def split_etcd_path(path) : Array(String)
path.split('/').map! { |p| URI.decode_www_form p }
end

private def compact!
@definitions_lock.synchronize do
@log.info { "Compacting definitions" }
Expand Down Expand Up @@ -672,21 +709,21 @@ module LavinMQ
}.to_json)
when AMQP::Frame::Exchange::Delete
@etcd.del(join_path("lavinmq", @name, "exchanges", f.exchange_name))
@etcd.del_prefix(join_path("lavinmq", @name, "exchange-bindings", f.exchange_name))
when AMQP::Frame::Exchange::Bind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
@etcd.put(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash),
{
arguments: f.arguments,
}.to_json)
@etcd.put(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash),
{arguments: f.arguments}.to_json)
when AMQP::Frame::Exchange::Unbind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
@etcd.del(join_path("lavinmq", @name, "exchange", f.destination, "bindings", f.source, f.routing_key, args_hash))
@etcd.del(join_path("lavinmq", @name, "exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash))
when AMQP::Frame::Queue::Declare
@etcd.put(join_path("lavinmq", @name, "queues", f.queue_name), {
arguments: f.arguments,
}.to_json)
when AMQP::Frame::Queue::Delete
@etcd.del(join_path("lavinmq", @name, "queues", f.queue_name))
@etcd.del_prefix(join_path("lavinmq", @name, "queue-bindings", f.queue_name))
when AMQP::Frame::Queue::Bind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
@etcd.put(join_path("lavinmq", @name, "queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash),
Expand Down

0 comments on commit 12da60b

Please sign in to comment.