Skip to content

Commit

Permalink
feat: use transactions when registering and stopping nodes
Browse files Browse the repository at this point in the history
all service information exists on a single redis node
  • Loading branch information
stakach committed Apr 19, 2021
1 parent 95761d6 commit db677fe
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: redis_service_manager
version: 1.1.5
version: 1.2.0
crystal: ~> 1.0

dependencies:
Expand Down
13 changes: 8 additions & 5 deletions spec/lookup_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ describe RedisServiceManager::Lookup do
leader = "node2"
channel.send nil
end

node1.start
Fiber.yield
node2.start
channel.receive?
loop do
break if node1.cluster_size == 2
channel.receive?
end

node1.cluster_size.should eq(2)
node1.ready.should be_true
node1.leader.should be_true
node2.cluster_size.should eq(2)
node2.ready.should be_true
leader.should eq("node1")

# Get the cluster state
lookup = RedisServiceManager::Lookup.new("spec")
redis = Redis::Client.boot(REDIS_URL)
nodes = lookup.nodes(redis)
nodes.nodes.should eq(["http://node1/node1", "http://node2/node2"])

nodes.nodes.includes?("http://node1/node1").should be_true
nodes.nodes.includes?("http://node2/node2").should be_true

node2.stop
node1.stop
Expand Down
28 changes: 16 additions & 12 deletions src/redis_service_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class RedisServiceManager
# @redis = Redis.new(url: redis)
@redis = Redis::Client.boot(redis)
@lock = Mutex.new(:reentrant)
@version_key = "service_#{@service}_version"
@hash_key = "service_#{@service}_lookup"
@version_key = "{service_#{@service}}_version"
@hash_key = "{service_#{@service}}_lookup"
@hash = NodeHash.new(@hash_key, @redis)
end

Expand Down Expand Up @@ -57,9 +57,12 @@ class RedisServiceManager
@stopped = true

# remove node and bump the version
@redis.del(node_key)
@hash.delete(node_key)
@redis.set(@version_key, ULID.generate)
@redis.multi(node_key, reconnect: true) do |transaction|
transaction.del(node_key)
# @hash.delete(node_key)
transaction.hdel(hash_key, node_key)
transaction.set(@version_key, ULID.generate)
end
end

Log.trace { "node stopped" }
Expand Down Expand Up @@ -109,7 +112,7 @@ class RedisServiceManager
protected def generate_ulid
@leader = false
@ulid = ULID.generate
@node_key = "service_#{@service}.#{@ulid}"
@node_key = "{service_#{@service}}.#{@ulid}"
end

protected def register
Expand All @@ -119,11 +122,12 @@ class RedisServiceManager
Log.trace { "registering node #{@ulid} in cluster" }

# register this node
@redis.set(node_key, node_info.to_json, ex: @ttl)
@hash[node_key] = @uri

# expire the version
@redis.set(@version_key, ULID.generate)
@redis.multi(node_key, reconnect: true) do |transaction|
transaction.set(node_key, node_info.to_json, ex: @ttl)
transaction.hset(hash_key, node_key, @uri)
# expire the version
transaction.set(@version_key, ULID.generate)
end

check_version(node_info)
end
Expand Down Expand Up @@ -159,7 +163,7 @@ class RedisServiceManager
@redis.set(node_key, node_info.to_json, ex: @ttl)

# notify of rebalance
Log.trace { "cluster details updated, #{new_list.size} node detected, requesting rebalance on version #{version} and waiting for ready" }
Log.trace { "cluster details updated, #{new_list.size} nodes detected, requesting rebalance on version #{version} and waiting for ready" }
perform_rebalance version

# we'll give the node a tick before checking if cluster ready
Expand Down
2 changes: 1 addition & 1 deletion src/redis_service_manager/lookup.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class RedisServiceManager
def initialize(service : String, @ttl : Time::Span = 5.seconds)
@nodes = RendezvousHash.new
@expires = Time.unix(0)
@hash_key = "service_#{service}_lookup"
@hash_key = "{service_#{service}}_lookup"
end

# Redis is passed in here as it's not threadsafe and this way we can have a
Expand Down

0 comments on commit db677fe

Please sign in to comment.