Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-3684] Prevent stuck raft cluster on leader departure #379

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pydantic = "^1.10.17"
poetry-core = "^1.9.0"
pyOpenSSL = "^24.2.1"
jinja2 = "^3.1.4"
pysyncobj = "^0.3.12"

[tool.poetry.group.charm-libs.dependencies]
# data_platform_libs/v0/data_interfaces.py
Expand Down
107 changes: 96 additions & 11 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
Unit,
WaitingStatus,
)
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import RetryError, Retrying, retry, stop_after_attempt, stop_after_delay, wait_fixed

from backups import CANNOT_RESTORE_PITR, MOVE_RESTORED_CLUSTER_TO_ANOTHER_BUCKET, PostgreSQLBackups
Expand Down Expand Up @@ -175,6 +176,7 @@ def __init__(self, *args):
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.stop, self._on_stop)
self.framework.observe(self.on.get_password_action, self._on_get_password)
self.framework.observe(self.on.set_password_action, self._on_set_password)
self.framework.observe(self.on.update_status, self._on_update_status)
Expand Down Expand Up @@ -413,17 +415,8 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
logger.debug("Early exit on_peer_relation_departed: Skipping departing unit")
return

# Remove the departing member from the raft cluster.
try:
departing_member = event.departing_unit.name.replace("/", "-")
member_ip = self._patroni.get_member_ip(departing_member)
self._patroni.remove_raft_member(member_ip)
except RemoveRaftMemberFailedError:
logger.debug(
"Deferring on_peer_relation_departed: Failed to remove member from raft cluster"
)
event.defer()
return
departing_member = event.departing_unit.name.replace("/", "-")
member_ip = self._patroni.get_member_ip(departing_member)

# Allow leader to update the cluster members.
if not self.unit.is_leader():
Expand Down Expand Up @@ -1114,6 +1107,98 @@ def _restart_services_after_reboot(self):
self._patroni.start_patroni()
self.backup.start_stop_pgbackrest_service()

def _remove_raft_status_check(self, status: Dict, current: str) -> None:
if not status:
raise Exception("Failed to get raft status")
if status["leader"].address == current:
logger.warning("cannot remove raft member: member is leader")
raise Exception("Failed to remove raft leader")

def _remove_raft_node(
self, syncobj_util: TcpUtility, partners: List[str], current: str
) -> None:
"""Try to remove a raft member calling a partner node."""
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True):
with attempt:
if not self._patroni.stop_patroni():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change the leader in the raft cluster? (if the unit where Patroni is being stopped was the raft cluster leader)

logger.warning("cannot remove raft member: failed to stop Patroni")
raise Exception("Failed to stop service")

for attempt in Retrying(stop=stop_after_delay(120), wait=wait_fixed(3), reraise=True):
with attempt:
status = None
for partner in partners:
if not (status := self._get_raft_status(syncobj_util, partner)):
continue
self._remove_raft_status_check(status, current)

if f"partner_node_status_server_{current}" not in status:
logger.debug("Raft member already removed")
return

# If removing multiple units partner list will drift
_, partners = self._parse_raft_partners(status)
partners.insert(0, partner)

for partner in partners:
removal_result = syncobj_util.executeCommand(partner, ["remove", current])
if not removal_result.startswith("SUCCESS"):
logger.warning("failed to remove raft member: %s", removal_result)
continue
return
raise Exception("Failed to remove raft member")

def _get_raft_status(self, syncobj_util: TcpUtility, host: str) -> Optional[Dict]:
    """Ask ``host`` for its raft status.

    Args:
        syncobj_util: utility object used to send the ``status`` command.
        host: address of the raft node to query.

    Returns:
        Whatever the ``status`` command returns, or None when the utility
        raises ``UtilityException``.
    """
    try:
        raft_status = syncobj_util.executeCommand(host, ["status"])
    except UtilityException:
        # Report a failed query as "no status" rather than propagating.
        raft_status = None
    return raft_status

def _parse_raft_partners(self, status: Dict) -> Tuple[List[str], List[str]]:
"""Collect raft partner and ready nodes."""
partners = []
ready = []
for key in status.keys():
if key.startswith("partner_node_status_server_") and status[key]:
partner = key.split("partner_node_status_server_")[-1]
partners.append(partner)
if status[key] == 2:
ready.append(partner)
return partners, ready

def _on_stop(self, _) -> None:
syncobj_util = TcpUtility(timeout=3)
raft_host = "localhost:2222"
# Try to call a different unit
status = None
for ip in self._units_ips:
if ip != self._unit_ip:
raft_host = f"{ip}:2222"
if not (status := self._get_raft_status(syncobj_util, raft_host)):
continue
break
if not status:
raft_host = "localhost:2222"
if not (status := self._get_raft_status(syncobj_util, raft_host)):
logger.warning("Stopping unit: all raft members are unreachable")
self._patroni.stop_patroni()
return
Comment on lines +1181 to +1186
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if, in this situation, this unit can still be present in the raft cluster on the other units.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be, for example in the network-cut case.


partners, ready = self._parse_raft_partners(status)
if not ready and not partners:
logger.debug("Terminating the last raft member")
self._patroni.stop_patroni()
return
if not ready:
raise Exception("Cannot stop unit: All other members are still connecting")

try:
self._remove_raft_node(syncobj_util, ready, status["self"].address)
except Exception:
self._patroni.start_patroni()
raise

def _setup_exporter(self) -> None:
"""Set up postgresql_exporter options."""
cache = snap.SnapCache()
Expand Down
33 changes: 17 additions & 16 deletions src/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import requests
from charms.operator_libs_linux.v2 import snap
from jinja2 import Template
from pysyncobj.utility import TcpUtility, UtilityException
from tenacity import (
AttemptManager,
RetryError,
Expand Down Expand Up @@ -688,27 +689,27 @@ def remove_raft_member(self, member_ip: str) -> None:
is not part of the raft cluster.
"""
# Get the status of the raft cluster.
raft_status = subprocess.check_output([
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-status",
]).decode("UTF-8")
syncobj_util = TcpUtility(timeout=3)

raft_host = "127.0.0.1:2222"
try:
raft_status = syncobj_util.executeCommand(raft_host, ["status"])
except UtilityException:
logger.warning("Remove raft member: Cannot connect to raft cluster")
raise RemoveRaftMemberFailedError()

# Check whether the member is still part of the raft cluster.
if not member_ip or member_ip not in raft_status:
if not member_ip or f"partner_node_status_server_{member_ip}:2222" not in raft_status:
return

# Remove the member from the raft cluster.
result = subprocess.check_output([
"charmed-postgresql.syncobj-admin",
"-conn",
"127.0.0.1:2222",
"-remove",
f"{member_ip}:2222",
]).decode("UTF-8")

if "SUCCESS" not in result:
try:
result = syncobj_util.executeCommand(raft_host, ["remove", f"{member_ip}:2222"])
except UtilityException:
logger.debug("Remove raft member: Remove call failed")
raise RemoveRaftMemberFailedError()

if not result.startswith("SUCCESS"):
raise RemoveRaftMemberFailedError()

@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=10))
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/new_relations/test_new_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,11 @@ async def test_relation_data_is_updated_correctly_when_scaling(ops_test: OpsTest
unit for unit in units_to_remove if unit != leader_unit.name
])
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=600, wait_for_exact_units=3
)
await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(leader_unit.name)
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=600, wait_for_exact_units=2
apps=[DATABASE_APP_NAME],
status="active",
timeout=1500,
wait_for_exact_units=2,
raise_on_error=False,
)

# Get the updated connection data and assert it can be used
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ async def test_relation_data_is_updated_correctly_when_scaling(ops_test: OpsTest
unit for unit in units_to_remove if unit != leader_unit.name
])
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=600, wait_for_exact_units=3
)
await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(leader_unit.name)
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", timeout=600, wait_for_exact_units=2
apps=[DATABASE_APP_NAME],
status="active",
timeout=1500,
wait_for_exact_units=2,
raise_on_error=False,
)

# Get the updated connection data and assert it can be used
Expand Down
Loading
Loading