Skip to content

Commit

Permalink
feat: implement a possible clustering interface
Browse files Browse the repository at this point in the history
  • Loading branch information
stakach committed Apr 19, 2021
1 parent db677fe commit d7cb7b2
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 154 deletions.
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: redis_service_manager
version: 1.2.0
version: 2.0.0
crystal: ~> 1.0

dependencies:
Expand Down
36 changes: 17 additions & 19 deletions spec/lookup_spec.cr
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
require "./spec_helper"

describe RedisServiceManager::Lookup do
describe RedisServiceManager do
it "should return a rendezvous-hash for the cluster" do
channel = Channel(Nil).new
leader = ""

# Start a 2 node cluster
node1 = RedisServiceManager.new("spec", "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1 = RedisServiceManager.new("spec", uri: "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1.ready.should eq(false)
node1.on_rebalance do |version, _nodes|
node1.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 1"
node1.ready(version)
rebalance_complete_cb.call
end
node1.on_cluster_ready do |_version|
node1.on_cluster_stable do
puts "CLUSTER READY NODE1"
leader = "node1"
channel.send nil
end

node2 = RedisServiceManager.new("spec", "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2 = RedisServiceManager.new("spec", uri: "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2.ready.should eq(false)
node2.on_rebalance do |version, _nodes|
node2.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 2"
node2.ready(version)
rebalance_complete_cb.call
end
node2.on_cluster_ready do |_version|
node2.on_cluster_stable do
puts "CLUSTER READY NODE2"
leader = "node2"
channel.send nil
end

node1.start
node2.start
node1.register
node2.register
loop do
break if node1.cluster_size == 2
channel.receive?
Expand All @@ -43,14 +43,12 @@ describe RedisServiceManager::Lookup do
node2.ready.should be_true

# Get the cluster state
lookup = RedisServiceManager::Lookup.new("spec")
redis = Redis::Client.boot(REDIS_URL)
nodes = lookup.nodes(redis)
lookup = Clustering::Discovery.new RedisServiceManager.new("spec", REDIS_URL)
hash = lookup.nodes
hash.nodes.includes?("http://node1/node1").should be_true
hash.nodes.includes?("http://node2/node2").should be_true

nodes.nodes.includes?("http://node1/node1").should be_true
nodes.nodes.includes?("http://node2/node2").should be_true

node2.stop
node1.stop
node2.unregister
node1.unregister
end
end
143 changes: 79 additions & 64 deletions spec/manager_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,62 @@ describe RedisServiceManager do
it "should join a cluster of one" do
channel = Channel(Nil).new

manager = RedisServiceManager.new("spec", "http://localhost:1234/spec1", redis: REDIS_URL, ttl: 4)
manager.on_rebalance do |version, _nodes|
manager = RedisServiceManager.new("spec", uri: "http://localhost:1234/spec1", redis: REDIS_URL, ttl: 4)
manager.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING"
manager.ready(version)
rebalance_complete_cb.call
end
manager.on_cluster_ready do |_version|
manager.on_cluster_stable do
puts "CLUSTER READY"
channel.close
end
manager.start
manager.register

channel.receive?
manager.stop
manager.unregister
end

it "should join a cluster of one and then rebalance when a new node joins" do
channel = Channel(Nil).new
leader = ""

node1 = RedisServiceManager.new("spec", "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1 = RedisServiceManager.new("spec", uri: "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1.ready.should eq(false)

node1.on_rebalance do |version, _nodes|
node1.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 1"
node1.ready(version)
rebalance_complete_cb.call
end
node1.on_cluster_ready do |_version|
node1.on_cluster_stable do
puts "CLUSTER READY NODE1"
leader = "node1"
channel.send nil
end
node1.start
node1.register

channel.receive?
node1.cluster_size.should eq(1)
node1.ready.should eq(true)

# Join a second node
node2 = RedisServiceManager.new("spec", "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2 = RedisServiceManager.new("spec", uri: "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2.ready.should eq(false)

node2.on_rebalance do |version, _nodes|
node2.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 2"
node2.ready(version)
rebalance_complete_cb.call
end
node2.on_cluster_ready do |_version|
node2.on_cluster_stable do
puts "CLUSTER READY NODE2"
leader = "node2"
channel.send nil
end
node2.start
node2.register

channel.receive?
loop do
break if node1.cluster_size == 2
channel.receive?
end

node1.cluster_size.should eq(2)
node1.ready.should be_true
Expand All @@ -68,7 +71,7 @@ describe RedisServiceManager do
node2.leader.should be_false
leader.should eq("node1")

node1.stop
node1.unregister
channel.receive?

node2.cluster_size.should eq(1)
Expand All @@ -77,47 +80,50 @@ describe RedisServiceManager do
node2.cluster_ready.should be_true
leader.should eq("node2")

node2.stop
node2.unregister
end

it "a two node cluster should detect when the leader node goes offline" do
channel = Channel(Nil).new
leader = ""

node1 = RedisServiceManager.new("spec", "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1 = RedisServiceManager.new("spec", uri: "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1.ready.should eq(false)

node1.on_rebalance do |version, _nodes|
node1.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 1"
node1.ready(version)
rebalance_complete_cb.call
end
node1.on_cluster_ready do |_version|
node1.on_cluster_stable do
puts "CLUSTER READY NODE1"
leader = "node1"
channel.send nil
end
node1.start
node1.register

channel.receive?
node1.cluster_size.should eq(1)
node1.ready.should eq(true)

# Join a second node
node2 = RedisServiceManager.new("spec", "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2 = RedisServiceManager.new("spec", uri: "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2.ready.should eq(false)

node2.on_rebalance do |version, _nodes|
node2.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 2"
node2.ready(version)
rebalance_complete_cb.call
end
node2.on_cluster_ready do |_version|
node2.on_cluster_stable do
puts "CLUSTER READY NODE2"
leader = "node2"
channel.send nil
end
node2.start
node2.register

channel.receive?
loop do
break if node1.cluster_size == 2
channel.receive?
end

node1.cluster_size.should eq(2)
node1.ready.should be_true
Expand All @@ -138,47 +144,50 @@ describe RedisServiceManager do
node2.cluster_ready.should be_true
leader.should eq("node2")

node2.stop
node2.unregister
end

it "a two node cluster should detect when a node goes offline" do
channel = Channel(Nil).new
leader = ""

node1 = RedisServiceManager.new("spec", "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1 = RedisServiceManager.new("spec", uri: "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1.ready.should eq(false)

node1.on_rebalance do |version, _nodes|
node1.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 1"
node1.ready(version)
rebalance_complete_cb.call
end
node1.on_cluster_ready do |_version|
node1.on_cluster_stable do
puts "CLUSTER READY NODE1"
leader = "node1"
channel.send nil
end
node1.start
node1.register

channel.receive?
node1.cluster_size.should eq(1)
node1.ready.should eq(true)

# Join a second node
node2 = RedisServiceManager.new("spec", "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2 = RedisServiceManager.new("spec", uri: "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2.ready.should eq(false)

node2.on_rebalance do |version, _nodes|
node2.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 2"
node2.ready(version)
rebalance_complete_cb.call
end
node2.on_cluster_ready do |_version|
node2.on_cluster_stable do
puts "CLUSTER READY NODE2"
leader = "node2"
channel.send nil
end
node2.start
node2.register

channel.receive?
loop do
break if node1.cluster_size == 2
channel.receive?
end

node1.cluster_size.should eq(2)
node1.ready.should be_true
Expand All @@ -199,47 +208,50 @@ describe RedisServiceManager do
node1.cluster_ready.should be_true
leader.should eq("node1")

node1.stop
node1.unregister
end

it "should handle a node going offline and a new node replacing it" do
channel = Channel(Nil).new
leader = ""

node1 = RedisServiceManager.new("spec", "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1 = RedisServiceManager.new("spec", uri: "http://node1/node1", redis: REDIS_URL, ttl: 4)
node1.ready.should eq(false)

node1.on_rebalance do |version, _nodes|
node1.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 1"
node1.ready(version)
rebalance_complete_cb.call
end
node1.on_cluster_ready do |_version|
node1.on_cluster_stable do
puts "CLUSTER READY NODE1"
leader = "node1"
channel.send nil
end
node1.start
node1.register

channel.receive?
node1.cluster_size.should eq(1)
node1.ready.should eq(true)

# Join a second node
node2 = RedisServiceManager.new("spec", "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2 = RedisServiceManager.new("spec", uri: "http://node2/node2", redis: REDIS_URL, ttl: 4)
node2.ready.should eq(false)

node2.on_rebalance do |version, _nodes|
node2.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 2"
node2.ready(version)
rebalance_complete_cb.call
end
node2.on_cluster_ready do |_version|
node2.on_cluster_stable do
puts "CLUSTER READY NODE2"
leader = "node2"
channel.send nil
end
node2.start
node2.register

channel.receive?
loop do
break if node1.cluster_size == 2
channel.receive?
end

node1.cluster_size.should eq(2)
node1.ready.should be_true
Expand All @@ -254,31 +266,34 @@ describe RedisServiceManager do
# ======
# node1 goes offline and node3 replaces it
# ======
node1.chaos_stop
node3 = RedisServiceManager.new("spec", "http://node3/node3", redis: REDIS_URL, ttl: 4)
node3 = RedisServiceManager.new("spec", uri: "http://node3/node3", redis: REDIS_URL, ttl: 4)
node3.ready.should eq(false)
node3.on_rebalance do |version, _nodes|
node3.on_rebalance do |_nodes, rebalance_complete_cb|
puts "REBALANCING NODE 3"
sleep 10
puts "REBALANCED NODE 3"
node3.ready(version)
rebalance_complete_cb.call
end
node3.on_cluster_ready do |_version|
node3.on_cluster_stable do
puts "CLUSTER READY NODE3"
leader = "node3"
channel.send nil
end
node3.start

channel.receive?
node1.chaos_stop
node3.register

loop do
break if node3.cluster_size == 2 && node2.cluster_size == 2
channel.receive?
end

node2.cluster_size.should eq(2)
node2.ready.should be_true
node2.leader.should be_true
node2.cluster_ready.should be_true
leader.should eq("node2")

node2.stop
node3.stop
node2.unregister
node3.unregister
end
end
Loading

0 comments on commit d7cb7b2

Please sign in to comment.