Skip to content

Commit

Permalink
PTFE-1621 stronger healthcheck against leaks of runner (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcarmet authored Apr 18, 2024
1 parent 9f55b76 commit a8d6d33
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 61 deletions.
31 changes: 21 additions & 10 deletions runner_manager/backend/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from botocore.exceptions import ClientError
from githubkit.versions.latest.webhooks import WorkflowJobEvent
from mypy_boto3_ec2 import EC2Client
from mypy_boto3_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef
from pydantic import Field
from redis_om import NotFoundError

Expand Down Expand Up @@ -54,19 +55,23 @@ def delete(self, runner: Runner):
raise e
return super().delete(runner)

def _get_instance_name(self, instance: InstanceTypeDef) -> str:
"""Get the instance name."""
name = ""
for tag in instance.get("Tags", []):
if tag.get("Key") == "Name":
name = tag.get("Value", "")
return name

def list(self) -> List[Runner]:
"""List runners."""
try:
reservations = self.client.describe_instances(
Filters=[
{
"Name": "tag-key",
"Values": ["runner-manager"],
},
{
"Name": "instance-state-name",
"Values": ["running"],
},
FilterTypeDef(Name="tag:manager", Values=[str(self.manager)]),
FilterTypeDef(
Name="tag:runner_group", Values=[str(self.runner_group)]
),
]
).get("Reservations")
except Exception as e:
Expand All @@ -76,15 +81,18 @@ def list(self) -> List[Runner]:
for reservation in reservations:
for instance in reservation.get("Instances", []):
instance_id = instance.get("InstanceId", "")
name = self._get_instance_name(instance)
try:
runner = Runner.find(
Runner.instance_id == instance_id,
).first()
except NotFoundError:
runner = Runner(
name=instance_id,
name=name,
instance_id=instance_id,
runner_group_name=self.runner_group,
busy=False,
created_at=instance.get("LaunchTime"),
)
runners.append(runner)
return runners
Expand All @@ -97,7 +105,10 @@ def update(
try:
self.client.create_tags(
Resources=[runner.instance_id],
Tags=[{"Key": "tag:status", "Value": runner.status}],
Tags=[
TagTypeDef(Key="status", Value=runner.status),
TagTypeDef(Key="busy", Value=str(runner.busy)),
],
)
except Exception as e:
log.error(e)
Expand Down
1 change: 1 addition & 0 deletions runner_manager/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class BaseBackend(BaseModel):
config: Optional[BackendConfig] = BackendConfig()
instance_config: Optional[InstanceConfig] = InstanceConfig()
manager: Optional[str] = None
runner_group: Optional[str] = None

# Inherited classes will have a client property configured
# to interact with the backend.
Expand Down
14 changes: 9 additions & 5 deletions runner_manager/backend/gcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ def setup_labels(
) -> MutableMapping[str, str]:
labels: MutableMapping[str, str] = self.instance_config.labels.copy()
if self.manager:
labels["runner-manager"] = self.manager
labels["manager"] = self.manager
labels["status"] = self._sanitize_label_value(runner.status)
labels["busy"] = self._sanitize_label_value(str(runner.busy))
labels["runner_group"] = self._sanitize_label_value(self.runner_group)
if webhook:
labels["repository"] = self._sanitize_label_value(webhook.repository.name)
labels["organization"] = self._sanitize_label_value(
Expand Down Expand Up @@ -252,12 +253,15 @@ def list(self) -> List[Runner]:
zone=self.config.zone,
)
for instance in instances:
labels = instance.labels or {}
if self.manager and "runner-manager" in labels:
runner = Runner(
manager = instance.labels.get("manager", "")
runner_group = instance.labels.get("runner_group", "")
if manager == self.manager and runner_group == self.runner_group:
runner: Runner = Runner(
name=instance.name,
instance_id=instance.name,
busy=False,
runner_group_name=self.runner_group,
busy=bool(instance.labels.get("busy", False)),
created_at=instance.creation_timestamp,
)
runners.append(runner)

Expand Down
7 changes: 4 additions & 3 deletions runner_manager/models/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ def configure_instance(self, runner: Runner) -> AwsInstance:
"""Configure instance."""
tags: Sequence[TagTypeDef] = [
TagTypeDef(Key="Name", Value=runner.name),
TagTypeDef(
Key="runner-manager", Value=runner.manager if runner.manager else ""
),
TagTypeDef(Key="manager", Value=runner.manager if runner.manager else ""),
TagTypeDef(Key="runner_group", Value=runner.runner_group_name),
TagTypeDef(Key="busy", Value=str(runner.busy)),
TagTypeDef(Key="status", Value=runner.status),
]
tags.extend(
[TagTypeDef(Key=key, Value=value) for key, value in self.tags.items()]
Expand Down
5 changes: 5 additions & 0 deletions runner_manager/models/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ class Runner(BaseModel):
def __str__(self):
return f"{self.name} (status: {self.status}, busy: {self.busy})"

def __eq__(self, other: object) -> bool:
if not isinstance(other, Runner):
return NotImplemented
return self.id == other.id or self.name == other.name

@classmethod
def find_from_webhook(cls, webhook: WorkflowJobEvent) -> "Runner | None":
"""Find a runner from a webhook payload
Expand Down
22 changes: 16 additions & 6 deletions runner_manager/models/runner_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from githubkit.versions.latest.webhooks import WorkflowJobEvent
from pydantic import BaseModel as PydanticBaseModel
from pydantic import Field as PydanticField
from pydantic import validator
from pydantic import root_validator, validator
from redis_om import Field, NotFoundError
from typing_extensions import Annotated

Expand Down Expand Up @@ -68,11 +68,11 @@ def __str__(self) -> str:
f"current: {len(self.get_runners())}, queued: {self.queued})"
)

def __post_init_post_parse__(self):
"""Post init."""
super().__post_init_post_parse__()
if self.backend.manager is None:
self.backend.manager = self.manager
@root_validator(skip_on_failure=True)
def setup_backend(cls, values):
values["backend"].manager = values["manager"]
values["backend"].runner_group = values["name"]
return values

@validator("name")
def validate_name(cls, v):
Expand Down Expand Up @@ -313,6 +313,16 @@ def healthcheck(
):
"""Healthcheck runner group."""
runners = self.get_runners()
backend_runners = self.backend.list()
# Check if there's a runner that is not in the database
if len(runners) != len(backend_runners):
for backend_runner in backend_runners:
if backend_runner not in runners:
# A runner has leaked from the database/runner manager
# let's save it and add it to the list of runners
backend_runner.save()
runners.append(backend_runner)

for runner in runners:
runner.update_from_github(github)
if runner.time_to_live_expired(time_to_live):
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/backend/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def aws_group(settings) -> RunnerGroup:
subnet_id = os.getenv("AWS_SUBNET_ID", "")
runner_group: RunnerGroup = RunnerGroup(
id=3,
name="test",
name="default",
organization="test",
manager=settings.name,
backend=AWSBackend(
Expand Down Expand Up @@ -79,7 +79,8 @@ def test_create_delete(aws_runner, aws_group):
@mark.skipif(not os.getenv("AWS_ACCESS_KEY_ID"), reason="AWS credentials not found")
def test_list(aws_runner, aws_group):
runner = aws_group.backend.create(aws_runner)
assert runner in aws_group.backend.list()
runners = aws_group.backend.list()
assert runner in runners
aws_group.backend.delete(runner)
with raises(NotFoundError):
aws_group.backend.get(runner.instance_id)
Expand Down
74 changes: 40 additions & 34 deletions tests/unit/backend/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@


@fixture()
def gcp_group(settings, monkeypatch) -> RunnerGroup:
def gcp_group(settings) -> RunnerGroup:
config = GCPConfig(
project_id=os.environ.get("CLOUDSDK_CORE_PROJECT", ""),
project_id=os.environ.get("GOOGLE_CLOUD_PROJECT", ""),
region=os.environ.get("CLOUDSDK_COMPUTE_REGION", ""),
zone=os.environ.get("CLOUDSDK_COMPUTE_ZONE", ""),
google_application_credentials=os.environ.get(
Expand All @@ -43,13 +43,18 @@ def gcp_group(settings, monkeypatch) -> RunnerGroup:
"label",
],
)
return runner_group


@fixture()
def fake_gcp_group(gcp_group, monkeypatch):
fake_image = Image(
self_link="my_image_link",
source_image="my_image",
)

monkeypatch.setattr(GCPBackend, "image", fake_image)
return runner_group
return gcp_group


@fixture()
Expand All @@ -59,25 +64,25 @@ def gcp_runner(runner: Runner, gcp_group: RunnerGroup) -> Runner:
return runner


def test_gcp_network_interfaces(gcp_group: RunnerGroup):
interfaces: List[NetworkInterface] = gcp_group.backend.network_interfaces
def test_gcp_network_interfaces(fake_gcp_group: RunnerGroup):
interfaces: List[NetworkInterface] = fake_gcp_group.backend.network_interfaces
assert len(interfaces) == 1
assert "default" in gcp_group.backend.network_interfaces[0].subnetwork
assert "default" in fake_gcp_group.backend.network_interfaces[0].subnetwork
assert interfaces[0].access_configs[0].name == "External NAT"
# Test disabling external IP
gcp_group.backend.instance_config.enable_external_ip = False
interfaces: List[NetworkInterface] = gcp_group.backend.network_interfaces
fake_gcp_group.backend.instance_config.enable_external_ip = False
interfaces: List[NetworkInterface] = fake_gcp_group.backend.network_interfaces
assert len(interfaces) == 1
assert len(interfaces[0].access_configs) == 0


def test_gcp_group(gcp_group: RunnerGroup):
gcp_group.save()
gcp_group.delete(gcp_group.pk)
def test_gcp_group(fake_gcp_group: RunnerGroup):
fake_gcp_group.save()
fake_gcp_group.delete(fake_gcp_group.pk)


def test_gcp_metadata(runner: Runner, gcp_group):
metadata = gcp_group.backend.configure_metadata(runner)
def test_gcp_metadata(runner: Runner, fake_gcp_group):
metadata = fake_gcp_group.backend.configure_metadata(runner)

# Assert metadata are properly set
startup: bool = False
Expand All @@ -92,8 +97,8 @@ def test_gcp_metadata(runner: Runner, gcp_group):
assert startup is True


def test_gcp_setup_labels(runner: Runner, gcp_group: RunnerGroup):
labels = gcp_group.backend.setup_labels(runner)
def test_gcp_setup_labels(runner: Runner, fake_gcp_group: RunnerGroup):
labels = fake_gcp_group.backend.setup_labels(runner)
assert labels["status"] == runner.status
assert labels["busy"] == str(runner.busy).lower()
assert labels["key"] == "value"
Expand Down Expand Up @@ -125,45 +130,45 @@ def test_gcp_setup_labels_with_webhook(webhook: WorkflowJobEvent):
assert "workflow" not in labels.keys()


def test_gcp_spot_config(runner: Runner, gcp_group: RunnerGroup):
gcp_group.backend.instance_config.spot = True
scheduling = gcp_group.backend.scheduling
def test_gcp_spot_config(runner: Runner, fake_gcp_group: RunnerGroup):
fake_gcp_group.backend.instance_config.spot = True
scheduling = fake_gcp_group.backend.scheduling
assert scheduling.provisioning_model == "SPOT"
assert scheduling.instance_termination_action == "DELETE"
gcp_group.backend.instance_config.spot = False
scheduling = gcp_group.backend.scheduling
fake_gcp_group.backend.instance_config.spot = False
scheduling = fake_gcp_group.backend.scheduling
assert scheduling.provisioning_model == "STANDARD"
assert scheduling.instance_termination_action == "DEFAULT"


def test_gcp_disks(runner: Runner, gcp_group: RunnerGroup):
def test_gcp_disks(runner: Runner, fake_gcp_group: RunnerGroup):
# patch self.image.self_link to return a fake image

disks = gcp_group.backend.disks
zone = gcp_group.backend.config.zone
disk_type = gcp_group.backend.instance_config.disk_type
disks = fake_gcp_group.backend.disks
zone = fake_gcp_group.backend.config.zone
disk_type = fake_gcp_group.backend.instance_config.disk_type
assert len(disks) == 1
assert (
disks[0].initialize_params.disk_size_gb
== gcp_group.backend.instance_config.disk_size_gb
== fake_gcp_group.backend.instance_config.disk_size_gb
)
assert disks[0].boot is True
assert disks[0].auto_delete is True
assert disks[0].initialize_params.disk_type == f"zones/{zone}/diskTypes/{disk_type}"


def test_gcp_instance(runner: Runner, gcp_group: RunnerGroup):
instance = gcp_group.backend.configure_instance(runner)
def test_gcp_instance(runner: Runner, fake_gcp_group: RunnerGroup):
instance = fake_gcp_group.backend.configure_instance(runner)
assert instance.name == runner.name


def test_sanitize_label(gcp_group: RunnerGroup):
assert "test" == gcp_group.backend._sanitize_label_value("test")
assert "42" == gcp_group.backend._sanitize_label_value(42)
assert "42" == gcp_group.backend._sanitize_label_value(42.0)
assert "" == gcp_group.backend._sanitize_label_value(None)
assert "test" == gcp_group.backend._sanitize_label_value("-test-")
assert "" == gcp_group.backend._sanitize_label_value(float("nan"))
def test_sanitize_label(fake_gcp_group: RunnerGroup):
assert "test" == fake_gcp_group.backend._sanitize_label_value("test")
assert "42" == fake_gcp_group.backend._sanitize_label_value(42)
assert "42" == fake_gcp_group.backend._sanitize_label_value(42.0)
assert "" == fake_gcp_group.backend._sanitize_label_value(None)
assert "test" == fake_gcp_group.backend._sanitize_label_value("-test-")
assert "" == fake_gcp_group.backend._sanitize_label_value(float("nan"))


@mark.skipif(
Expand Down Expand Up @@ -209,6 +214,7 @@ def test_list(gcp_runner, gcp_group):
runner = gcp_group.backend.create(gcp_runner)
runners: List[Runner] = gcp_group.backend.list()
assert any(runner.name == r.name for r in runners)
assert len(runners) == 1
gcp_group.backend.delete(runner)
with raises(NotFoundError):
gcp_group.backend.get(runner.instance_id)
Loading

0 comments on commit a8d6d33

Please sign in to comment.