Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store definitions in etcd #740

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions spec/etcd_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,30 @@ describe LavinMQ::Etcd do
end
end

it "can get all keys with a prefix" do
cluster = EtcdCluster.new(1)
cluster.run do
etcd = LavinMQ::Etcd.new(cluster.endpoints)
etcd.put("foo/a", "bar")
etcd.put("foo/b", "bar")
etcd.put("fou/c", "bar")
etcd.get_prefix("foo").should eq Hash{"foo/a" => "bar", "foo/b" => "bar"}
end
end

it "can get delete all keys with a prefix" do
cluster = EtcdCluster.new(1)
cluster.run do
etcd = LavinMQ::Etcd.new(cluster.endpoints)
etcd.put("foo/a", "bar")
etcd.put("foo/b", "bar")
etcd.put("fou/c", "bar")
etcd.del_prefix("foo").should eq 2
etcd.get_prefix("foo").empty?.should be_true
etcd.get("foo/a").should be_nil
end
end

it "can watch" do
cluster = EtcdCluster.new(1)
cluster.run do
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/clustering/replicator.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module LavinMQ
abstract def listen(server : TCPServer)
abstract def clear
abstract def password : String
abstract def etcd : Etcd?
end
end
end
8 changes: 8 additions & 0 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ module LavinMQ
@password = password
end

def etcd : Etcd
@etcd
end

def clear
@files.clear
end
Expand Down Expand Up @@ -239,6 +243,10 @@ module LavinMQ
def password : String
""
end

def etcd : Etcd?
nil
end
end
end
end
39 changes: 35 additions & 4 deletions src/lavinmq/etcd.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,27 @@ module LavinMQ
end
end

# Get all keys with a prefix
def get_prefix(prefix) : Hash(String, String)
range_end = prefix.to_slice.dup
len = range_end.bytesize
(len - 1).downto(0) do |i|
(range_end[i] &+= 1).zero? || break # continue if wrapped around
len = i
end
range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values
json = post("/v3/kv/range", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}","limit":0,"serializable":true}))
result = Hash(String, String).new
if kvs = json["kvs"]?
kvs.as_a.each do |kv|
key = Base64.decode_string kv["key"].as_s
value = Base64.decode_string kv["value"].as_s
result[key] = value
end
end
result
end

def put(key, value) : String?
body = %({"key":"#{Base64.strict_encode key}","value":"#{Base64.strict_encode value}","prev_kv":true})
json = post("/v3/kv/put", body)
Expand All @@ -27,11 +48,25 @@ module LavinMQ
end
end

# Delete a key
def del(key) : Int32
json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode key}"}))
json.dig?("deleted").try(&.as_s.to_i) || 0
end

# Delete all keys with a prefix
def del_prefix(prefix) : Int32
range_end = prefix.to_slice.dup
len = range_end.bytesize
(len - 1).downto(0) do |i|
(range_end[i] &+= 1).zero? || break # continue if wrapped around
len = i
end
range_end = len.zero? ? Bytes[0] : range_end[0, len] # drop ending null values
json = post("/v3/kv/deleterange", %({"key":"#{Base64.strict_encode prefix}","range_end":"#{Base64.strict_encode range_end}"}))
json["deleted"].as_s.to_i
end

def watch(key, &)
body = %({"create_request":{"key":"#{Base64.strict_encode key}"}})
post_stream("/v3/watch", body) do |json|
Expand Down Expand Up @@ -221,10 +256,6 @@ module LavinMQ
return yield({socket, address})
rescue ex : NoLeader
raise ex # don't retry when leader is missing
rescue ex : Error
Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" }
socket.close rescue nil
sleep 0.1.seconds
rescue IO::Error
Log.warn { "Lost connection to #{address}, retrying" }
socket.close rescue nil
Expand Down
112 changes: 103 additions & 9 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,12 @@ module LavinMQ
io = @definitions_file
if io.size.zero?
load_default_definitions
compact!
return
end

if etcd = @replicator.etcd
load_definitions_from_etcd(etcd)
return
end
@log.info { "Loading definitions" }
@definitions_lock.synchronize do
@log.debug { "Verifying schema" }
Expand Down Expand Up @@ -522,12 +524,49 @@ module LavinMQ

private def load_default_definitions
@log.info { "Loading default definitions" }
@exchanges[""] = DefaultExchange.new(self, "", true, false, false)
@exchanges["amq.direct"] = DirectExchange.new(self, "amq.direct", true, false, false)
@exchanges["amq.fanout"] = FanoutExchange.new(self, "amq.fanout", true, false, false)
@exchanges["amq.topic"] = TopicExchange.new(self, "amq.topic", true, false, false)
@exchanges["amq.headers"] = HeadersExchange.new(self, "amq.headers", true, false, false)
@exchanges["amq.match"] = HeadersExchange.new(self, "amq.match", true, false, false)
declare_exchange("", "direct", true, false)
declare_exchange("amq.direct", "direct", true, false)
declare_exchange("amq.fanout", "fanout", true, false)
declare_exchange("amq.topic", "topic", true, false)
declare_exchange("amq.headers", "headers", true, false)
declare_exchange("amq.match", "headers", true, false)
end

private def load_definitions_from_etcd(etcd : Etcd)
etcd.get_prefix(etcd_path("queues")).each do |key, value|
queue_name = ""
key.split('/') { |s| queue_name = URI.decode_www_form(s) } # get last split value without allocation
json = JSON.parse(value)
@queues[queue_name] = QueueFactory.make(self, json)
end

etcd.get_prefix(etcd_path("exchanges")).each do |key, value|
exchange_name = ""
key.split('/') { |s| exchange_name = URI.decode_www_form(s) } # get last split value without allocation
json = JSON.parse(value)
@exchanges[exchange_name] =
make_exchange(self, exchange_name, json["type"].as_s, true, json["auto_delete"].as_bool, json["internal"].as_bool, json["arguments"].as_h)
end

etcd.get_prefix(etcd_path("queue-bindings")).each do |key, value|
_, _, _, queue_name, exchange_name, routing_key, _ = split_etcd_path(key)
json = JSON.parse(value)
x = @exchanges[exchange_name]
q = @queues[queue_name]
x.bind(q, routing_key, json["arguments"].to_h)
end

etcd.get_prefix(etcd_path("exchange-bindings")).each do |key, value|
_, _, _, destination, source, routing_key, _ = split_etcd_path(key)
json = JSON.parse(value)
src = @exchanges[source]
dst = @queues[destination]
src.bind(dst, routing_key, json["arguments"].to_h)
end
end

private def split_etcd_path(path) : Array(String)
path.split('/').map! { |p| URI.decode_www_form p }
end

private def compact!
Expand Down Expand Up @@ -575,7 +614,11 @@ module LavinMQ
bytes = frame.to_slice
@definitions_file.write bytes
@replicator.append @definitions_file_path, bytes
@definitions_file.fsync
if etcd = @replicator.etcd
store_definition_in_etcd(frame, etcd)
else
@definitions_file.fsync
end
if dirty
if (@definitions_deletes += 1) >= Config.instance.max_deleted_definitions
compact!
Expand All @@ -584,6 +627,57 @@ module LavinMQ
end
end

private def store_definition_in_etcd(f, etcd)
case f
when AMQP::Frame::Exchange::Declare
etcd.put(etcd_path("exchanges", f.exchange_name), {
type: f.exchange_type,
auto_delete: f.auto_delete,
internal: f.internal,
arguments: f.arguments,
}.to_json)
when AMQP::Frame::Exchange::Delete
etcd.del(etcd_path("exchanges", f.exchange_name))
etcd.del_prefix(etcd_path("exchange-bindings", f.exchange_name))
when AMQP::Frame::Exchange::Bind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
etcd.put(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash),
{arguments: f.arguments}.to_json)
when AMQP::Frame::Exchange::Unbind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
etcd.del(etcd_path("exchanges", f.destination, "bindings", f.source, f.routing_key, args_hash))
when AMQP::Frame::Queue::Declare
etcd.put(etcd_path("queues", f.queue_name), {
arguments: f.arguments,
}.to_json)
when AMQP::Frame::Queue::Delete
etcd.del(etcd_path("queues", f.queue_name))
etcd.del_prefix(etcd_path("queue-bindings", f.queue_name))
when AMQP::Frame::Queue::Bind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
etcd.put(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash),
{
arguments: f.arguments,
}.to_json)
when AMQP::Frame::Queue::Unbind
args_hash = f.arguments.hash(Crystal::Hasher.new(0, 0)).result
etcd.del(etcd_path("queues", f.queue_name, "bindings", f.exchange_name, f.routing_key, args_hash))
else raise "Cannot apply frame #{f.class} in vhost #{@name}"
end
end

private def etcd_path(*args)
String.build do |str|
str << Config.instance.clustering_etcd_prefix
str << "/vhosts/"
URI.encode_www_form @name, str
args.each do |a|
str << "/"
URI.encode_www_form a.to_s, str
end
end
end

private def make_exchange(vhost, name, type, durable, auto_delete, internal, arguments)
case type
when "direct"
Expand Down
Loading