From db677feab85b2494492147d0f225349f952be0cc Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Mon, 19 Apr 2021 15:28:39 +1000 Subject: [PATCH] feat: use transactions when registering and stopping nodes all service information exists on a single redis node --- shard.yml | 2 +- spec/lookup_spec.cr | 13 ++++++++----- src/redis_service_manager.cr | 28 ++++++++++++++++------------ src/redis_service_manager/lookup.cr | 2 +- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/shard.yml b/shard.yml index aa73494..b465a57 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: redis_service_manager -version: 1.1.5 +version: 1.2.0 crystal: ~> 1.0 dependencies: diff --git a/spec/lookup_spec.cr b/spec/lookup_spec.cr index b7f6edf..a52ce40 100644 --- a/spec/lookup_spec.cr +++ b/spec/lookup_spec.cr @@ -29,23 +29,26 @@ describe RedisServiceManager::Lookup do leader = "node2" channel.send nil end + node1.start - Fiber.yield node2.start - channel.receive? + loop do + break if node1.cluster_size == 2 + channel.receive? + end node1.cluster_size.should eq(2) node1.ready.should be_true - node1.leader.should be_true node2.cluster_size.should eq(2) node2.ready.should be_true - leader.should eq("node1") # Get the cluster state lookup = RedisServiceManager::Lookup.new("spec") redis = Redis::Client.boot(REDIS_URL) nodes = lookup.nodes(redis) - nodes.nodes.should eq(["http://node1/node1", "http://node2/node2"]) + + nodes.nodes.includes?("http://node1/node1").should be_true + nodes.nodes.includes?("http://node2/node2").should be_true node2.stop node1.stop diff --git a/src/redis_service_manager.cr b/src/redis_service_manager.cr index 0586249..11bf5f8 100644 --- a/src/redis_service_manager.cr +++ b/src/redis_service_manager.cr @@ -8,8 +8,8 @@ class RedisServiceManager # @redis = Redis.new(url: redis) @redis = Redis::Client.boot(redis) @lock = Mutex.new(:reentrant) - @version_key = "service_#{@service}_version" - @hash_key = "service_#{@service}_lookup" + @version_key = "{service_#{@service}}_version" + @hash_key = "{service_#{@service}}_lookup" @hash = NodeHash.new(@hash_key, @redis) end @@ -57,9 +57,12 @@ class RedisServiceManager @stopped = true # remove node and bump the version - @redis.del(node_key) - @hash.delete(node_key) - @redis.set(@version_key, ULID.generate) + @redis.multi(node_key, reconnect: true) do |transaction| + transaction.del(node_key) + # @hash.delete(node_key) + transaction.hdel(hash_key, node_key) + transaction.set(@version_key, ULID.generate) + end end Log.trace { "node stopped" } @@ -109,7 +112,7 @@ class RedisServiceManager protected def generate_ulid @leader = false @ulid = ULID.generate - @node_key = "service_#{@service}.#{@ulid}" + @node_key = "{service_#{@service}}.#{@ulid}" end protected def register @@ -119,11 +122,12 @@ class RedisServiceManager Log.trace { "registering node #{@ulid} in cluster" } # register this node - @redis.set(node_key, node_info.to_json, ex: @ttl) - @hash[node_key] = @uri - - # expire the version - @redis.set(@version_key, ULID.generate) + @redis.multi(node_key, reconnect: true) do |transaction| + transaction.set(node_key, node_info.to_json, ex: @ttl) + transaction.hset(hash_key, node_key, @uri) + # expire the version + transaction.set(@version_key, ULID.generate) + end check_version(node_info) end @@ -159,7 +163,7 @@ class RedisServiceManager @redis.set(node_key, node_info.to_json, ex: @ttl) # notify of rebalance - Log.trace { "cluster details updated, #{new_list.size} node detected, requesting rebalance on version #{version} and waiting for ready" } + Log.trace { "cluster details updated, #{new_list.size} nodes detected, requesting rebalance on version #{version} and waiting for ready" } perform_rebalance version # we'll give the node a tick before checking if cluster ready diff --git a/src/redis_service_manager/lookup.cr b/src/redis_service_manager/lookup.cr index 07f20c4..9af5e9d 100644 --- a/src/redis_service_manager/lookup.cr +++ b/src/redis_service_manager/lookup.cr @@ -7,7 +7,7 @@ class RedisServiceManager def initialize(service : String, @ttl : Time::Span = 5.seconds) @nodes = RendezvousHash.new @expires = Time.unix(0) - @hash_key = "service_#{service}_lookup" + @hash_key = "{service_#{service}}_lookup" end # Redis is passed in here as it's not threadsafe and this way we can have a