diff --git a/libs/libcommon/src/libcommon/queue/jobs.py b/libs/libcommon/src/libcommon/queue/jobs.py
index d882295d3e..b256a655af 100644
--- a/libs/libcommon/src/libcommon/queue/jobs.py
+++ b/libs/libcommon/src/libcommon/queue/jobs.py
@@ -296,7 +296,7 @@ def add_job(
         Returns:
             `JobDocument`: the new job added to the queue
         """
-        increase_metric(job_type=job_type, status=Status.WAITING, difficulty=difficulty)
+        increase_metric(dataset=dataset, job_type=job_type, status=Status.WAITING, difficulty=difficulty)
         return JobDocument(
             type=job_type,
             dataset=dataset,
@@ -348,7 +348,9 @@ def create_jobs(self, job_infos: list[JobInfo]) -> int:
                 for job_info in job_infos
             ]
             for job in jobs:
-                increase_metric(job_type=job.type, status=Status.WAITING, difficulty=job.difficulty)
+                increase_metric(
+                    dataset=job.dataset, job_type=job.type, status=Status.WAITING, difficulty=job.difficulty
+                )
             job_ids = JobDocument.objects.insert(jobs, load_bulk=False)
             return len(job_ids)
         except Exception:
@@ -368,7 +370,7 @@ def delete_waiting_jobs_by_job_id(self, job_ids: list[str]) -> int:
         try:
             existing = JobDocument.objects(pk__in=job_ids, status=Status.WAITING)
             for job in existing.all():
-                decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+                decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
             deleted_jobs = existing.delete()
             return 0 if deleted_jobs is None else deleted_jobs
         except Exception:
@@ -569,6 +571,7 @@ def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
         ):
             raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
         update_metrics_for_type(
+            dataset=first_job.dataset,
             job_type=first_job.type,
             previous_status=Status.WAITING,
             new_status=Status.STARTED,
@@ -715,7 +718,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         except StartedJobError as e:
             logging.error(f"job {job_id} has not the expected format for a started job. Aborting: {e}")
             return None
-        decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+        decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
         if job.started_at is not None:
             create_past_job(
                 dataset=job.dataset,
@@ -740,7 +743,7 @@ def delete_dataset_waiting_jobs(self, dataset: str) -> int:
         """
         existing_waiting_jobs = JobDocument.objects(dataset=dataset, status=Status.WAITING)
         for job in existing_waiting_jobs.no_cache():
-            decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+            decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
             release_lock(key=job.unicity_id)
         num_deleted_jobs = existing_waiting_jobs.delete()
         return 0 if num_deleted_jobs is None else num_deleted_jobs
@@ -862,10 +865,18 @@ def get_jobs_count_by_worker_size(self) -> JobsCountByWorkerSize:
             an object with the total of jobs by worker size. Keys are worker_size, and values are the jobs count.
         """
+        blocked_datasets = get_blocked_datasets()
+
         return {
-            WorkerSize.heavy.name: JobDocument.objects(difficulty__lte=100, difficulty__gt=70).count(),
-            WorkerSize.medium.name: JobDocument.objects(difficulty__lte=70, difficulty__gt=40).count(),
-            WorkerSize.light.name: JobDocument.objects(difficulty__lte=40, difficulty__gt=0).count(),
+            WorkerSize.heavy.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=100, difficulty__gt=70
+            ).count(),
+            WorkerSize.medium.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=70, difficulty__gt=40
+            ).count(),
+            WorkerSize.light.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=40, difficulty__gt=0
+            ).count(),
         }
 
     def get_dump_with_status(self, status: Status, job_type: str) -> list[JobDict]:
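Note: `get_blocked_datasets()` above, and `is_blocked()` in metrics.py below, come from `libcommon.queue.dataset_blockages`, which this diff does not touch. A minimal sketch of the interface the changes rely on, assuming a mongoengine collection with one document per blocked dataset (illustrative only; the real module may differ):

    # Hypothetical sketch of libcommon/queue/dataset_blockages.py -- not part of this diff.
    # Only get_blocked_datasets() and is_blocked() are assumed; the document class name
    # and collection layout are illustrative.
    from mongoengine import Document
    from mongoengine.fields import StringField


    class DatasetBlockageDocument(Document):
        dataset = StringField(required=True)
        meta = {"collection": "datasetBlockages", "indexes": ["dataset"]}


    def get_blocked_datasets() -> list[str]:
        # Distinct dataset names, suitable for dataset__nin filters.
        return DatasetBlockageDocument.objects().distinct("dataset")


    def is_blocked(dataset: str) -> bool:
        # A dataset is blocked if at least one blockage document exists for it.
        return DatasetBlockageDocument.objects(dataset=dataset).count() > 0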
""" + blocked_datasets = get_blocked_datasets() + return { - WorkerSize.heavy.name: JobDocument.objects(difficulty__lte=100, difficulty__gt=70).count(), - WorkerSize.medium.name: JobDocument.objects(difficulty__lte=70, difficulty__gt=40).count(), - WorkerSize.light.name: JobDocument.objects(difficulty__lte=40, difficulty__gt=0).count(), + WorkerSize.heavy.name: JobDocument.objects( + dataset__nin=blocked_datasets, difficulty__lte=100, difficulty__gt=70 + ).count(), + WorkerSize.medium.name: JobDocument.objects( + dataset__nin=blocked_datasets, difficulty__lte=70, difficulty__gt=40 + ).count(), + WorkerSize.light.name: JobDocument.objects( + dataset__nin=blocked_datasets, difficulty__lte=40, difficulty__gt=0 + ).count(), } def get_dump_with_status(self, status: Status, job_type: str) -> list[JobDict]: diff --git a/libs/libcommon/src/libcommon/queue/metrics.py b/libs/libcommon/src/libcommon/queue/metrics.py index 366c1cd172..edff91a39b 100644 --- a/libs/libcommon/src/libcommon/queue/metrics.py +++ b/libs/libcommon/src/libcommon/queue/metrics.py @@ -15,6 +15,7 @@ WORKER_TYPE_JOB_COUNTS_COLLECTION, ) from libcommon.dtos import Status, WorkerSize +from libcommon.queue.dataset_blockages import is_blocked from libcommon.utils import get_datetime # START monkey patching ### hack ### @@ -104,7 +105,7 @@ def get_worker_size(difficulty: int) -> WorkerSize: objects = QuerySetManager["WorkerSizeJobsCountDocument"]() -def _update_metrics(job_type: str, status: str, increase_by: int, difficulty: int) -> None: +def _update_metrics(dataset: str, job_type: str, status: str, increase_by: int, difficulty: int) -> None: JobTotalMetricDocument.objects(job_type=job_type, status=status).update( upsert=True, write_concern={"w": "majority", "fsync": True}, @@ -112,25 +113,33 @@ def _update_metrics(job_type: str, status: str, increase_by: int, difficulty: in inc__total=increase_by, ) if status == Status.WAITING: - worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty) - WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update( - upsert=True, - write_concern={"w": "majority", "fsync": True}, - read_concern={"level": "majority"}, - inc__jobs_count=increase_by, - ) - - -def increase_metric(job_type: str, status: str, difficulty: int) -> None: - _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty) + # Do not consider blocked datasets for auto-scaling metrics + if not is_blocked(dataset): + worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty) + WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update( + upsert=True, + write_concern={"w": "majority", "fsync": True}, + read_concern={"level": "majority"}, + inc__jobs_count=increase_by, + ) + + +def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None: + _update_metrics( + dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty + ) -def decrease_metric(job_type: str, status: str, difficulty: int) -> None: - _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty) +def decrease_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None: + _update_metrics( + dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty + ) -def update_metrics_for_type(job_type: str, previous_status: str, new_status: str, difficulty: int) -> None: +def 
diff --git a/libs/libcommon/tests/queue/test_jobs.py b/libs/libcommon/tests/queue/test_jobs.py
index af5558e1fe..f205a84e5e 100644
--- a/libs/libcommon/tests/queue/test_jobs.py
+++ b/libs/libcommon/tests/queue/test_jobs.py
@@ -450,14 +450,20 @@ def test_get_jobs_count_by_worker_size() -> None:
     test_type = "test_type"
     test_other_type = "test_other_type"
     test_dataset = "test_dataset"
+    test_blocked_dataset = "blocked_dataset"
     test_revision = "test_revision"
     queue = Queue()
 
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
+    block_dataset(test_blocked_dataset)
+    queue.add_job(job_type=test_type, dataset=test_blocked_dataset, revision=test_revision, difficulty=50)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
     queue.add_job(job_type=test_type, dataset=test_dataset, revision=test_revision, difficulty=50)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 0}
     queue.add_job(job_type=test_other_type, dataset=test_dataset, revision=test_revision, difficulty=10)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 1}
+    block_dataset(test_dataset)
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
 
 
 def test_get_dataset_pending_jobs_for_type() -> None:
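The new assertions call `block_dataset`, whose import is outside this hunk. Under the dataset_blockages sketch given after the jobs.py diff, a minimal version would be (hypothetical; the real helper may dedupe entries or set an expiry):

    # Hypothetical helper consistent with the DatasetBlockageDocument sketch above.
    def block_dataset(dataset: str) -> None:
        DatasetBlockageDocument(dataset=dataset).save()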