From a8d6d3315efee7ae1f179f80878e269a71b9a0bd Mon Sep 17 00:00:00 2001 From: Thomas Carmet <8408330+tcarmet@users.noreply.github.com> Date: Thu, 18 Apr 2024 23:10:48 +0200 Subject: [PATCH] PTFE-1621 stronger healthcheck against leaks of runner (#582) --- runner_manager/backend/aws.py | 31 +++++++---- runner_manager/backend/base.py | 1 + runner_manager/backend/gcloud.py | 14 +++-- runner_manager/models/backend.py | 7 +-- runner_manager/models/runner.py | 5 ++ runner_manager/models/runner_group.py | 22 +++++--- tests/unit/backend/test_aws.py | 5 +- tests/unit/backend/test_gcp.py | 74 ++++++++++++++------------ tests/unit/jobs/test_healthchecks.py | 49 +++++++++++++++++ tests/unit/jobs/test_reset.py | 5 +- tests/unit/models/test_runner.py | 12 +++++ tests/unit/models/test_runner_group.py | 7 +++ 12 files changed, 171 insertions(+), 61 deletions(-) diff --git a/runner_manager/backend/aws.py b/runner_manager/backend/aws.py index b4047850..420b39e6 100644 --- a/runner_manager/backend/aws.py +++ b/runner_manager/backend/aws.py @@ -4,6 +4,7 @@ from botocore.exceptions import ClientError from githubkit.versions.latest.webhooks import WorkflowJobEvent from mypy_boto3_ec2 import EC2Client +from mypy_boto3_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef from pydantic import Field from redis_om import NotFoundError @@ -54,19 +55,23 @@ def delete(self, runner: Runner): raise e return super().delete(runner) + def _get_instance_name(self, instance: InstanceTypeDef) -> str: + """Get the instance name.""" + name = "" + for tag in instance.get("Tags", []): + if tag.get("Key") == "Name": + name = tag.get("Value", "") + return name + def list(self) -> List[Runner]: """List runners.""" try: reservations = self.client.describe_instances( Filters=[ - { - "Name": "tag-key", - "Values": ["runner-manager"], - }, - { - "Name": "instance-state-name", - "Values": ["running"], - }, + FilterTypeDef(Name="tag:manager", Values=[str(self.manager)]), + FilterTypeDef( + Name="tag:runner_group", Values=[str(self.runner_group)] + ), ] ).get("Reservations") except Exception as e: @@ -76,15 +81,18 @@ def list(self) -> List[Runner]: for reservation in reservations: for instance in reservation.get("Instances", []): instance_id = instance.get("InstanceId", "") + name = self._get_instance_name(instance) try: runner = Runner.find( Runner.instance_id == instance_id, ).first() except NotFoundError: runner = Runner( - name=instance_id, + name=name, instance_id=instance_id, + runner_group_name=self.runner_group, busy=False, + created_at=instance.get("LaunchTime"), ) runners.append(runner) return runners @@ -97,7 +105,10 @@ def update( try: self.client.create_tags( Resources=[runner.instance_id], - Tags=[{"Key": "tag:status", "Value": runner.status}], + Tags=[ + TagTypeDef(Key="status", Value=runner.status), + TagTypeDef(Key="busy", Value=str(runner.busy)), + ], ) except Exception as e: log.error(e) diff --git a/runner_manager/backend/base.py b/runner_manager/backend/base.py index 7f3b63b2..6ffd2bb6 100644 --- a/runner_manager/backend/base.py +++ b/runner_manager/backend/base.py @@ -20,6 +20,7 @@ class BaseBackend(BaseModel): config: Optional[BackendConfig] = BackendConfig() instance_config: Optional[InstanceConfig] = InstanceConfig() manager: Optional[str] = None + runner_group: Optional[str] = None # Inherited classes will have a client property configured # to interact with the backend. diff --git a/runner_manager/backend/gcloud.py b/runner_manager/backend/gcloud.py index 1fcd21c8..6bbf81c3 100644 --- a/runner_manager/backend/gcloud.py +++ b/runner_manager/backend/gcloud.py @@ -171,9 +171,10 @@ def setup_labels( ) -> MutableMapping[str, str]: labels: MutableMapping[str, str] = self.instance_config.labels.copy() if self.manager: - labels["runner-manager"] = self.manager + labels["manager"] = self.manager labels["status"] = self._sanitize_label_value(runner.status) labels["busy"] = self._sanitize_label_value(str(runner.busy)) + labels["runner_group"] = self._sanitize_label_value(self.runner_group) if webhook: labels["repository"] = self._sanitize_label_value(webhook.repository.name) labels["organization"] = self._sanitize_label_value( @@ -252,12 +253,15 @@ def list(self) -> List[Runner]: zone=self.config.zone, ) for instance in instances: - labels = instance.labels or {} - if self.manager and "runner-manager" in labels: - runner = Runner( + manager = instance.labels.get("manager", "") + runner_group = instance.labels.get("runner_group", "") + if manager == self.manager and runner_group == self.runner_group: + runner: Runner = Runner( name=instance.name, instance_id=instance.name, - busy=False, + runner_group_name=self.runner_group, + busy=bool(instance.labels.get("busy", False)), + created_at=instance.creation_timestamp, ) runners.append(runner) diff --git a/runner_manager/models/backend.py b/runner_manager/models/backend.py index c08e7535..295d1a7f 100644 --- a/runner_manager/models/backend.py +++ b/runner_manager/models/backend.py @@ -158,9 +158,10 @@ def configure_instance(self, runner: Runner) -> AwsInstance: """Configure instance.""" tags: Sequence[TagTypeDef] = [ TagTypeDef(Key="Name", Value=runner.name), - TagTypeDef( - Key="runner-manager", Value=runner.manager if runner.manager else "" - ), + TagTypeDef(Key="manager", Value=runner.manager if runner.manager else ""), + TagTypeDef(Key="runner_group", Value=runner.runner_group_name), + TagTypeDef(Key="busy", Value=str(runner.busy)), + TagTypeDef(Key="status", Value=runner.status), ] tags.extend( [TagTypeDef(Key=key, Value=value) for key, value in self.tags.items()] diff --git a/runner_manager/models/runner.py b/runner_manager/models/runner.py index dad6f4bd..8d4d241a 100644 --- a/runner_manager/models/runner.py +++ b/runner_manager/models/runner.py @@ -74,6 +74,11 @@ class Runner(BaseModel): def __str__(self): return f"{self.name} (status: {self.status}, busy: {self.busy})" + def __eq__(self, other: object) -> bool: + if not isinstance(other, Runner): + return NotImplemented + return self.id == other.id or self.name == other.name + @classmethod def find_from_webhook(cls, webhook: WorkflowJobEvent) -> "Runner | None": """Find a runner from a webhook payload diff --git a/runner_manager/models/runner_group.py b/runner_manager/models/runner_group.py index b8d117d7..6eab5f73 100644 --- a/runner_manager/models/runner_group.py +++ b/runner_manager/models/runner_group.py @@ -10,7 +10,7 @@ from githubkit.versions.latest.webhooks import WorkflowJobEvent from pydantic import BaseModel as PydanticBaseModel from pydantic import Field as PydanticField -from pydantic import validator +from pydantic import root_validator, validator from redis_om import Field, NotFoundError from typing_extensions import Annotated @@ -68,11 +68,11 @@ def __str__(self) -> str: f"current: {len(self.get_runners())}, queued: {self.queued})" ) - def __post_init_post_parse__(self): - """Post init.""" - super().__post_init_post_parse__() - if self.backend.manager is None: - self.backend.manager = self.manager + @root_validator(skip_on_failure=True) + def setup_backend(cls, values): + values["backend"].manager = values["manager"] + values["backend"].runner_group = values["name"] + return values @validator("name") def validate_name(cls, v): @@ -313,6 +313,16 @@ def healthcheck( ): """Healthcheck runner group.""" runners = self.get_runners() + backend_runners = self.backend.list() + # Check if there's a runner that is not in the database + if len(runners) != len(backend_runners): + for backend_runner in backend_runners: + if backend_runner not in runners: + # A runner has leaked from the database/runner manager + # let's save it and add it to the list of runners + backend_runner.save() + runners.append(backend_runner) + for runner in runners: runner.update_from_github(github) if runner.time_to_live_expired(time_to_live): diff --git a/tests/unit/backend/test_aws.py b/tests/unit/backend/test_aws.py index 9bad369c..61310dbc 100644 --- a/tests/unit/backend/test_aws.py +++ b/tests/unit/backend/test_aws.py @@ -21,7 +21,7 @@ def aws_group(settings) -> RunnerGroup: subnet_id = os.getenv("AWS_SUBNET_ID", "") runner_group: RunnerGroup = RunnerGroup( id=3, - name="test", + name="default", organization="test", manager=settings.name, backend=AWSBackend( @@ -79,7 +79,8 @@ def test_create_delete(aws_runner, aws_group): @mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID"), reason="AWS credentials not found") def test_list(aws_runner, aws_group): runner = aws_group.backend.create(aws_runner) - assert runner in aws_group.backend.list() + runners = aws_group.backend.list() + assert runner in runners aws_group.backend.delete(runner) with raises(NotFoundError): aws_group.backend.get(runner.instance_id) diff --git a/tests/unit/backend/test_gcp.py b/tests/unit/backend/test_gcp.py index 139699a0..aed7e0f1 100644 --- a/tests/unit/backend/test_gcp.py +++ b/tests/unit/backend/test_gcp.py @@ -16,9 +16,9 @@ @fixture() -def gcp_group(settings, monkeypatch) -> RunnerGroup: +def gcp_group(settings) -> RunnerGroup: config = GCPConfig( - project_id=os.environ.get("CLOUDSDK_CORE_PROJECT", ""), + project_id=os.environ.get("GOOGLE_CLOUD_PROJECT", ""), region=os.environ.get("CLOUDSDK_COMPUTE_REGION", ""), zone=os.environ.get("CLOUDSDK_COMPUTE_ZONE", ""), google_application_credentials=os.environ.get( @@ -43,13 +43,18 @@ def gcp_group(settings, monkeypatch) -> RunnerGroup: "label", ], ) + return runner_group + + +@fixture() +def fake_gcp_group(gcp_group, monkeypatch): fake_image = Image( self_link="my_image_link", source_image="my_image", ) monkeypatch.setattr(GCPBackend, "image", fake_image) - return runner_group + return gcp_group @fixture() @@ -59,25 +64,25 @@ def gcp_runner(runner: Runner, gcp_group: RunnerGroup) -> Runner: return runner -def test_gcp_network_interfaces(gcp_group: RunnerGroup): - interfaces: List[NetworkInterface] = gcp_group.backend.network_interfaces +def test_gcp_network_interfaces(fake_gcp_group: RunnerGroup): + interfaces: List[NetworkInterface] = fake_gcp_group.backend.network_interfaces assert len(interfaces) == 1 - assert "default" in gcp_group.backend.network_interfaces[0].subnetwork + assert "default" in fake_gcp_group.backend.network_interfaces[0].subnetwork assert interfaces[0].access_configs[0].name == "External NAT" # Test disabling external IP - gcp_group.backend.instance_config.enable_external_ip = False - interfaces: List[NetworkInterface] = gcp_group.backend.network_interfaces + fake_gcp_group.backend.instance_config.enable_external_ip = False + interfaces: List[NetworkInterface] = fake_gcp_group.backend.network_interfaces assert len(interfaces) == 1 assert len(interfaces[0].access_configs) == 0 -def test_gcp_group(gcp_group: RunnerGroup): - gcp_group.save() - gcp_group.delete(gcp_group.pk) +def test_gcp_group(fake_gcp_group: RunnerGroup): + fake_gcp_group.save() + fake_gcp_group.delete(fake_gcp_group.pk) -def test_gcp_metadata(runner: Runner, gcp_group): - metadata = gcp_group.backend.configure_metadata(runner) +def test_gcp_metadata(runner: Runner, fake_gcp_group): + metadata = fake_gcp_group.backend.configure_metadata(runner) # Assert metadata are properly set startup: bool = False @@ -92,8 +97,8 @@ def test_gcp_metadata(runner: Runner, gcp_group): assert startup is True -def test_gcp_setup_labels(runner: Runner, gcp_group: RunnerGroup): - labels = gcp_group.backend.setup_labels(runner) +def test_gcp_setup_labels(runner: Runner, fake_gcp_group: RunnerGroup): + labels = fake_gcp_group.backend.setup_labels(runner) assert labels["status"] == runner.status assert labels["busy"] == str(runner.busy).lower() assert labels["key"] == "value" @@ -125,45 +130,45 @@ def test_gcp_setup_labels_with_webhook(webhook: WorkflowJobEvent): assert "workflow" not in labels.keys() -def test_gcp_spot_config(runner: Runner, gcp_group: RunnerGroup): - gcp_group.backend.instance_config.spot = True - scheduling = gcp_group.backend.scheduling +def test_gcp_spot_config(runner: Runner, fake_gcp_group: RunnerGroup): + fake_gcp_group.backend.instance_config.spot = True + scheduling = fake_gcp_group.backend.scheduling assert scheduling.provisioning_model == "SPOT" assert scheduling.instance_termination_action == "DELETE" - gcp_group.backend.instance_config.spot = False - scheduling = gcp_group.backend.scheduling + fake_gcp_group.backend.instance_config.spot = False + scheduling = fake_gcp_group.backend.scheduling assert scheduling.provisioning_model == "STANDARD" assert scheduling.instance_termination_action == "DEFAULT" -def test_gcp_disks(runner: Runner, gcp_group: RunnerGroup): +def test_gcp_disks(runner: Runner, fake_gcp_group: RunnerGroup): # patch self.image.self_link to return a fake image - disks = gcp_group.backend.disks - zone = gcp_group.backend.config.zone - disk_type = gcp_group.backend.instance_config.disk_type + disks = fake_gcp_group.backend.disks + zone = fake_gcp_group.backend.config.zone + disk_type = fake_gcp_group.backend.instance_config.disk_type assert len(disks) == 1 assert ( disks[0].initialize_params.disk_size_gb - == gcp_group.backend.instance_config.disk_size_gb + == fake_gcp_group.backend.instance_config.disk_size_gb ) assert disks[0].boot is True assert disks[0].auto_delete is True assert disks[0].initialize_params.disk_type == f"zones/{zone}/diskTypes/{disk_type}" -def test_gcp_instance(runner: Runner, gcp_group: RunnerGroup): - instance = gcp_group.backend.configure_instance(runner) +def test_gcp_instance(runner: Runner, fake_gcp_group: RunnerGroup): + instance = fake_gcp_group.backend.configure_instance(runner) assert instance.name == runner.name -def test_sanitize_label(gcp_group: RunnerGroup): - assert "test" == gcp_group.backend._sanitize_label_value("test") - assert "42" == gcp_group.backend._sanitize_label_value(42) - assert "42" == gcp_group.backend._sanitize_label_value(42.0) - assert "" == gcp_group.backend._sanitize_label_value(None) - assert "test" == gcp_group.backend._sanitize_label_value("-test-") - assert "" == gcp_group.backend._sanitize_label_value(float("nan")) +def test_sanitize_label(fake_gcp_group: RunnerGroup): + assert "test" == fake_gcp_group.backend._sanitize_label_value("test") + assert "42" == fake_gcp_group.backend._sanitize_label_value(42) + assert "42" == fake_gcp_group.backend._sanitize_label_value(42.0) + assert "" == fake_gcp_group.backend._sanitize_label_value(None) + assert "test" == fake_gcp_group.backend._sanitize_label_value("-test-") + assert "" == fake_gcp_group.backend._sanitize_label_value(float("nan")) @mark.skipif( @@ -209,6 +214,7 @@ def test_list(gcp_runner, gcp_group): runner = gcp_group.backend.create(gcp_runner) runners: List[Runner] = gcp_group.backend.list() assert any(runner.name == r.name for r in runners) + assert len(runners) == 1 gcp_group.backend.delete(runner) with raises(NotFoundError): gcp_group.backend.get(runner.instance_id) diff --git a/tests/unit/jobs/test_healthchecks.py b/tests/unit/jobs/test_healthchecks.py index f31943d8..7f8a550b 100644 --- a/tests/unit/jobs/test_healthchecks.py +++ b/tests/unit/jobs/test_healthchecks.py @@ -134,3 +134,52 @@ def test_healthcheck_job( settings.timeout_runner, ) assert len(runner_group.get_runners()) == 1 + + +def test_healthcheck_backend_leak( + runner_group: RunnerGroup, + settings: Settings, + queue: Queue, + github: GitHub, + monkeypatch, +): + runner_group.save() + runner = runner_group.create_runner(github) + # monkeypatch the runner_group.backend.list method to return a list + # with the recently created runner and a fake leaked runner + # This will simulate a leak in the backend + leaked_runner = Runner( + name="leak", + busy=False, + runner_group_name=runner_group.name, + runner_group_id=runner_group.id, + ) + monkeypatch.setattr( + "runner_manager.backend.base.BaseBackend.list", + lambda self: [runner, leaked_runner], + ) + # ensure the method is patched and that the group is unaware of the leaked runner + assert len(runner_group.get_runners()) == 1 + assert len(runner_group.backend.list()) == 2 + queue.enqueue( + healthcheck.group, + runner_group.pk, + settings.time_to_live, + settings.timeout_runner, + ) + # the healthcheck job should add the runner to the group + # because the timestamp is not yet expired + assert len(runner_group.get_runners()) == 2 + # Now we will expire the leaked runner + leaked_runner.created_at = datetime.now(timezone.utc) - ( + settings.timeout_runner + timedelta(minutes=1) + ) + leaked_runner.save() + queue.enqueue( + healthcheck.group, + runner_group.pk, + settings.time_to_live, + settings.timeout_runner, + ) + # the healthcheck job should remove the leaked runner + assert len(runner_group.get_runners()) == 1 diff --git a/tests/unit/jobs/test_reset.py b/tests/unit/jobs/test_reset.py index f4dab155..d8b17f02 100644 --- a/tests/unit/jobs/test_reset.py +++ b/tests/unit/jobs/test_reset.py @@ -24,13 +24,16 @@ def test_reset_job(runner_group: RunnerGroup, queue: Queue, github: GitHub): assert runner in runner_group.get_runners() # run reset job with offline runner runner.status = RunnerStatus.offline + runner.busy = False + runner.id = None runner.save() job = queue.enqueue( reset.group, runner_group.pk, ) assert job.get_status() == JobStatus.FINISHED - assert runner not in runner_group.get_runners() + runners = runner_group.get_runners() + assert runner not in runners assert len(runner_group.get_runners()) == 1 diff --git a/tests/unit/models/test_runner.py b/tests/unit/models/test_runner.py index e775ef16..bfd53779 100644 --- a/tests/unit/models/test_runner.py +++ b/tests/unit/models/test_runner.py @@ -6,6 +6,7 @@ ) from hypothesis import given from hypothesis import strategies as st +from pytest import raises from redis_om import Migrator, NotFoundError from runner_manager.clients.github import GitHub @@ -45,6 +46,17 @@ def test_find_runner(runner: Runner): Runner.find(Runner.name == runner.name).first() +def test_eq_method(runner: Runner): + assert runner == runner + assert runner == Runner(name=runner.name, busy=False) + assert runner != Runner(name="different", busy=False) + assert runner == Runner(id=runner.id, name="different", busy=False) + assert runner in [runner] + assert runner in [Runner(name=runner.name, busy=False)] + with raises(AssertionError): + assert runner == "runner" + + @given(webhook=WorkflowJobCompletedStrategy) def test_find_from_webhook(runner: Runner, webhook: WorkflowJobCompleted): webhook.workflow_job.runner_id = runner.id diff --git a/tests/unit/models/test_runner_group.py b/tests/unit/models/test_runner_group.py index 360dbb21..0ad2a528 100644 --- a/tests/unit/models/test_runner_group.py +++ b/tests/unit/models/test_runner_group.py @@ -79,6 +79,13 @@ def test_find_runner_group_labels(runner_group: RunnerGroup): assert RunnerGroup.find_from_labels(runner_group.labels) == runner_group +def test_applied_config_backend(runner_group: RunnerGroup): + runner_group.save() + assert runner_group.manager is not None + assert runner_group.backend.manager == runner_group.manager + assert runner_group.backend.runner_group == runner_group.name + + def test_find_group_not_found(runner_group: RunnerGroup): runner_group.save() assert RunnerGroup.find_from_labels(["notfound"]) is None