Ignore blocked datasets in WorkerSize metrics for auto scaling (#2949)
* Ignore blocked datasets

* Add test for blocked datasets

* Apply code review observation
AndreaFrancis authored Jun 26, 2024
1 parent eea774d commit 21cea15
Showing 3 changed files with 51 additions and 25 deletions.
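
Jobs that belong to blocked datasets should not drive worker auto scaling, so this commit removes them from the WorkerSize job counts at both write time (metrics.py) and read time (jobs.py). For orientation, the difficulty bounds used by the queries below imply the following mapping from difficulty to worker size. This is a sketch only: the real mapping is `WorkerSizeJobsCountDocument.get_worker_size`, which this commit does not modify.

```python
# Sketch, not the repository's implementation: the difficulty -> WorkerSize
# mapping implied by the query bounds in get_jobs_count_by_worker_size below.
from enum import Enum


class WorkerSize(Enum):  # mirrors libcommon.dtos.WorkerSize
    heavy = "heavy"
    medium = "medium"
    light = "light"


def get_worker_size(difficulty: int) -> WorkerSize:
    if difficulty > 70:  # 70 < difficulty <= 100
        return WorkerSize.heavy
    if difficulty > 40:  # 40 < difficulty <= 70
        return WorkerSize.medium
    return WorkerSize.light  # 0 < difficulty <= 40
```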
libs/libcommon/src/libcommon/queue/jobs.py (19 additions, 8 deletions)
```diff
@@ -296,7 +296,7 @@ def add_job(
         Returns:
             `JobDocument`: the new job added to the queue
         """
-        increase_metric(job_type=job_type, status=Status.WAITING, difficulty=difficulty)
+        increase_metric(dataset=dataset, job_type=job_type, status=Status.WAITING, difficulty=difficulty)
         return JobDocument(
             type=job_type,
             dataset=dataset,
@@ -348,7 +348,9 @@ def create_jobs(self, job_infos: list[JobInfo]) -> int:
                 for job_info in job_infos
             ]
             for job in jobs:
-                increase_metric(job_type=job.type, status=Status.WAITING, difficulty=job.difficulty)
+                increase_metric(
+                    dataset=job.dataset, job_type=job.type, status=Status.WAITING, difficulty=job.difficulty
+                )
             job_ids = JobDocument.objects.insert(jobs, load_bulk=False)
             return len(job_ids)
         except Exception:
@@ -368,7 +370,7 @@ def delete_waiting_jobs_by_job_id(self, job_ids: list[str]) -> int:
         try:
             existing = JobDocument.objects(pk__in=job_ids, status=Status.WAITING)
             for job in existing.all():
-                decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+                decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
             deleted_jobs = existing.delete()
             return 0 if deleted_jobs is None else deleted_jobs
         except Exception:
@@ -569,6 +571,7 @@ def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
             ):
                 raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
             update_metrics_for_type(
+                dataset=first_job.dataset,
                 job_type=first_job.type,
                 previous_status=Status.WAITING,
                 new_status=Status.STARTED,
@@ -715,7 +718,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         except StartedJobError as e:
             logging.error(f"job {job_id} has not the expected format for a started job. Aborting: {e}")
             return None
-        decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+        decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
         if job.started_at is not None:
             create_past_job(
                 dataset=job.dataset,
@@ -740,7 +743,7 @@ def delete_dataset_waiting_jobs(self, dataset: str) -> int:
         """
         existing_waiting_jobs = JobDocument.objects(dataset=dataset, status=Status.WAITING)
         for job in existing_waiting_jobs.no_cache():
-            decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+            decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
             release_lock(key=job.unicity_id)
         num_deleted_jobs = existing_waiting_jobs.delete()
         return 0 if num_deleted_jobs is None else num_deleted_jobs
@@ -862,10 +865,18 @@ def get_jobs_count_by_worker_size(self) -> JobsCountByWorkerSize:
             an object with the total of jobs by worker size.
             Keys are worker_size, and values are the jobs count.
         """
+        blocked_datasets = get_blocked_datasets()
+
         return {
-            WorkerSize.heavy.name: JobDocument.objects(difficulty__lte=100, difficulty__gt=70).count(),
-            WorkerSize.medium.name: JobDocument.objects(difficulty__lte=70, difficulty__gt=40).count(),
-            WorkerSize.light.name: JobDocument.objects(difficulty__lte=40, difficulty__gt=0).count(),
+            WorkerSize.heavy.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=100, difficulty__gt=70
+            ).count(),
+            WorkerSize.medium.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=70, difficulty__gt=40
+            ).count(),
+            WorkerSize.light.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=40, difficulty__gt=0
+            ).count(),
         }

     def get_dump_with_status(self, status: Status, job_type: str) -> list[JobDict]:
```
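
These counts are what the auto-scaling side reads, which is why the `dataset__nin` filter matters: a blocked dataset with thousands of waiting jobs would otherwise inflate the replica target for its worker size. A hypothetical consumer is sketched below; `JOBS_PER_REPLICA` and `desired_replicas` are illustrative names, and only `get_jobs_count_by_worker_size` comes from this repository.

```python
# Hypothetical autoscaler sketch, assuming a simple jobs-per-replica policy.
import math

JOBS_PER_REPLICA = {"heavy": 2, "medium": 5, "light": 10}  # assumed tuning values


def desired_replicas(jobs_count_by_worker_size: dict[str, int]) -> dict[str, int]:
    # Blocked datasets are already excluded upstream, so their waiting jobs
    # no longer inflate these targets.
    return {
        size: math.ceil(count / JOBS_PER_REPLICA[size])
        for size, count in jobs_count_by_worker_size.items()
    }


# e.g. desired_replicas({"heavy": 3, "medium": 12, "light": 0})
# -> {"heavy": 2, "medium": 3, "light": 0}
```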
libs/libcommon/src/libcommon/queue/metrics.py (26 additions, 17 deletions)
```diff
@@ -15,6 +15,7 @@
     WORKER_TYPE_JOB_COUNTS_COLLECTION,
 )
 from libcommon.dtos import Status, WorkerSize
+from libcommon.queue.dataset_blockages import is_blocked
 from libcommon.utils import get_datetime

 # START monkey patching ### hack ###
@@ -104,33 +105,41 @@ def get_worker_size(difficulty: int) -> WorkerSize:
     objects = QuerySetManager["WorkerSizeJobsCountDocument"]()


-def _update_metrics(job_type: str, status: str, increase_by: int, difficulty: int) -> None:
+def _update_metrics(dataset: str, job_type: str, status: str, increase_by: int, difficulty: int) -> None:
     JobTotalMetricDocument.objects(job_type=job_type, status=status).update(
         upsert=True,
         write_concern={"w": "majority", "fsync": True},
         read_concern={"level": "majority"},
         inc__total=increase_by,
     )
     if status == Status.WAITING:
-        worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
-        WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
-            upsert=True,
-            write_concern={"w": "majority", "fsync": True},
-            read_concern={"level": "majority"},
-            inc__jobs_count=increase_by,
-        )
-
-
-def increase_metric(job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty)
+        # Do not consider blocked datasets for auto-scaling metrics
+        if not is_blocked(dataset):
+            worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
+            WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
+                upsert=True,
+                write_concern={"w": "majority", "fsync": True},
+                read_concern={"level": "majority"},
+                inc__jobs_count=increase_by,
+            )
+
+
+def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(
+        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty
+    )


-def decrease_metric(job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty)
+def decrease_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(
+        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty
+    )


-def update_metrics_for_type(job_type: str, previous_status: str, new_status: str, difficulty: int) -> None:
+def update_metrics_for_type(
+    dataset: str, job_type: str, previous_status: str, new_status: str, difficulty: int
+) -> None:
     if job_type is not None:
-        decrease_metric(job_type=job_type, status=previous_status, difficulty=difficulty)
-        increase_metric(job_type=job_type, status=new_status, difficulty=difficulty)
+        decrease_metric(dataset=dataset, job_type=job_type, status=previous_status, difficulty=difficulty)
+        increase_metric(dataset=dataset, job_type=job_type, status=new_status, difficulty=difficulty)
         # ^ this does not affect WorkerSizeJobsCountDocument, so we don't pass the job difficulty
```
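
The newly imported module, `libcommon.queue.dataset_blockages`, is not part of this diff. From its two call sites (`is_blocked` here, `get_blocked_datasets` in jobs.py), its interface must look roughly like the sketch below, which assumes one MongoDB document per blocked dataset; the model name is hypothetical.

```python
# Sketch of the interface inferred from usage in this commit; the real
# module (libcommon/queue/dataset_blockages.py) is not shown here.
from mongoengine import Document, StringField


class DatasetBlockageDocument(Document):  # hypothetical model name
    dataset = StringField(required=True)


def get_blocked_datasets() -> list[str]:
    # Used by Queue.get_jobs_count_by_worker_size via dataset__nin=...
    return DatasetBlockageDocument.objects().distinct("dataset")


def is_blocked(dataset: str) -> bool:
    # Used by _update_metrics to skip the WorkerSize counters.
    return DatasetBlockageDocument.objects(dataset=dataset).count() > 0
```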
libs/libcommon/tests/queue/test_jobs.py (6 additions, 0 deletions)
```diff
@@ -450,14 +450,20 @@ def test_get_jobs_count_by_worker_size() -> None:
     test_type = "test_type"
     test_other_type = "test_other_type"
     test_dataset = "test_dataset"
+    test_blocked_dataset = "blocked_dataset"
     test_revision = "test_revision"
     queue = Queue()

     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
+    block_dataset(test_blocked_dataset)
+    queue.add_job(job_type=test_type, dataset=test_blocked_dataset, revision=test_revision, difficulty=50)
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
     queue.add_job(job_type=test_type, dataset=test_dataset, revision=test_revision, difficulty=50)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 0}
     queue.add_job(job_type=test_other_type, dataset=test_dataset, revision=test_revision, difficulty=10)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 1}
+    block_dataset(test_dataset)
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}


 def test_get_dataset_pending_jobs_for_type() -> None:
```
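
The last two added lines of the test are the interesting ones: `test_dataset` is blocked after its jobs and counters were already recorded, yet the counts drop to zero immediately. That works because the blockage is enforced at two layers: `increase_metric` skips the WorkerSize counters at write time, and `get_jobs_count_by_worker_size` filters with `dataset__nin` at read time. A condensed sketch of that read-time effect follows; the import paths are assumed from this commit's file layout.

```python
# Sketch: blocking takes effect at query time, with no counter rewrite.
# Import paths are assumed from the file layout shown in this commit.
from libcommon.queue.dataset_blockages import block_dataset
from libcommon.queue.jobs import Queue

queue = Queue()
queue.add_job(job_type="test_type", dataset="test_dataset", revision="rev", difficulty=50)
assert queue.get_jobs_count_by_worker_size()["medium"] == 1

block_dataset("test_dataset")  # only records a blockage; the job stays queued
assert queue.get_jobs_count_by_worker_size()["medium"] == 0  # excluded via dataset__nin
```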
