Skip to content

Commit

Permalink
chore: Hide replicas from CLUSTER subcmds in managed mode (#4174)
Browse files Browse the repository at this point in the history
* chore: Hide replicas from `CLUSTER` subcmds in managed mode

Part of #4173 (see for context)

* server.client()
  • Loading branch information
chakaz authored Nov 24, 2024
1 parent e053639 commit 6a7f345
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
13 changes: 9 additions & 4 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ ABSL_FLAG(std::string, cluster_node_id, "",
"ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master "
"replication ID (random string)");

ABSL_FLAG(bool, managed_service_info, false,
"Hides some implementation details from users when true (i.e. in managed service env)");

ABSL_DECLARE_FLAG(int32_t, port);
ABSL_DECLARE_FLAG(uint16_t, announce_port);

Expand Down Expand Up @@ -122,10 +125,12 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co

info.master = {.id = id_, .ip = preferred_endpoint, .port = preferred_port};

for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
}
}
} else {
// TODO: We currently don't save the master's ID in the replica
Expand Down
45 changes: 33 additions & 12 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def is_local_host(ip: str) -> bool:

info = answer[2]
assert len(info) == 3
ip_addr = str(info[0], "utf-8")
ip_addr = info[0]
assert is_local_host(ip_addr)
assert info[1] == port

Expand All @@ -244,29 +244,29 @@ def is_local_host(ip: str) -> bool:
replica = replicas[i - 3]
rep_info = answer[i]
assert len(rep_info) == 3
ip_addr = str(rep_info[0], "utf-8")
ip_addr = rep_info[0]
assert is_local_host(ip_addr)
assert rep_info[1] == replica.port
assert rep_info[2] == replica.id

return True


@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
# --managed_service_info means that Dragonfly is running in a managed service, so some details
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
async def test_emulated_cluster_with_replicas(df_factory):
master = df_factory.create(port=BASE_PORT)
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]

df_factory.start_all([master, *replicas])

c_master = aioredis.Redis(port=master.port)
master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8")
c_master = master.client()
c_master_admin = master.admin_client()
master_id = await c_master.execute_command("CLUSTER MYID")

c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
replica_ids = [
(await c_replica.execute_command("CLUSTER MYID")).decode("utf-8")
for c_replica in c_replicas
]
c_replicas = [replica.client() for replica in replicas]
replica_ids = [(await c_replica.execute_command("CLUSTER MYID")) for c_replica in c_replicas]

for replica, c_replica in zip(replicas, c_replicas):
res = await c_replica.execute_command("CLUSTER SLOTS")
Expand All @@ -279,7 +279,7 @@ async def test_emulated_cluster_with_replicas(df_factory):
# Connect replicas to master
for replica, c_replica in zip(replicas, c_replicas):
rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert str(rc, "utf-8") == "OK"
assert rc == "OK"

await asyncio.sleep(0.5)

Expand All @@ -290,6 +290,13 @@ async def test_emulated_cluster_with_replicas(df_factory):
)

res = await c_master.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
replicas=[],
)

res = await c_master_admin.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
Expand All @@ -308,6 +315,20 @@ async def test_emulated_cluster_with_replicas(df_factory):
"node_id": master_id,
"slots": [["0", "16383"]],
},
}

assert await c_master_admin.execute_command("CLUSTER NODES") == {
f"127.0.0.1:{master.port}": {
"connected": True,
"epoch": "0",
"flags": "myself,master",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": "-",
"migrations": [],
"node_id": master_id,
"slots": [["0", "16383"]],
},
f"127.0.0.1:{replicas[0].port}": {
"connected": True,
"epoch": "0",
Expand Down

0 comments on commit 6a7f345

Please sign in to comment.