Ignore blocked datasets in WorkerSize metrics for auto scaling #2949

Merged · 4 commits · Jun 26, 2024
27 changes: 19 additions & 8 deletions libs/libcommon/src/libcommon/queue/jobs.py
@@ -296,7 +296,7 @@ def add_job(
         Returns:
             `JobDocument`: the new job added to the queue
         """
-        increase_metric(job_type=job_type, status=Status.WAITING, difficulty=difficulty)
+        increase_metric(dataset=dataset, job_type=job_type, status=Status.WAITING, difficulty=difficulty)
         return JobDocument(
             type=job_type,
             dataset=dataset,
@@ -348,7 +348,9 @@ def create_jobs(self, job_infos: list[JobInfo]) -> int:
                 for job_info in job_infos
             ]
             for job in jobs:
-                increase_metric(job_type=job.type, status=Status.WAITING, difficulty=job.difficulty)
+                increase_metric(
+                    dataset=job.dataset, job_type=job.type, status=Status.WAITING, difficulty=job.difficulty
+                )
             job_ids = JobDocument.objects.insert(jobs, load_bulk=False)
             return len(job_ids)
         except Exception:
@@ -368,7 +370,7 @@ def delete_waiting_jobs_by_job_id(self, job_ids: list[str]) -> int:
         try:
             existing = JobDocument.objects(pk__in=job_ids, status=Status.WAITING)
             for job in existing.all():
-                decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+                decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
             deleted_jobs = existing.delete()
             return 0 if deleted_jobs is None else deleted_jobs
         except Exception:
@@ -569,6 +571,7 @@ def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
         ):
             raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
         update_metrics_for_type(
+            dataset=first_job.dataset,
             job_type=first_job.type,
             previous_status=Status.WAITING,
             new_status=Status.STARTED,
@@ -715,7 +718,7 @@ def finish_job(self, job_id: str) -> Optional[Priority]:
         except StartedJobError as e:
             logging.error(f"job {job_id} has not the expected format for a started job. Aborting: {e}")
             return None
-        decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
+        decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
         if job.started_at is not None:
             create_past_job(
                 dataset=job.dataset,
"""
existing_waiting_jobs = JobDocument.objects(dataset=dataset, status=Status.WAITING)
for job in existing_waiting_jobs.no_cache():
decrease_metric(job_type=job.type, status=job.status, difficulty=job.difficulty)
decrease_metric(dataset=job.dataset, job_type=job.type, status=job.status, difficulty=job.difficulty)
release_lock(key=job.unicity_id)
num_deleted_jobs = existing_waiting_jobs.delete()
return 0 if num_deleted_jobs is None else num_deleted_jobs
@@ -862,10 +865,18 @@ def get_jobs_count_by_worker_size(self) -> JobsCountByWorkerSize:
             an object with the total of jobs by worker size.
             Keys are worker_size, and values are the jobs count.
         """
+        blocked_datasets = get_blocked_datasets()
+
         return {
-            WorkerSize.heavy.name: JobDocument.objects(difficulty__lte=100, difficulty__gt=70).count(),
-            WorkerSize.medium.name: JobDocument.objects(difficulty__lte=70, difficulty__gt=40).count(),
-            WorkerSize.light.name: JobDocument.objects(difficulty__lte=40, difficulty__gt=0).count(),
+            WorkerSize.heavy.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=100, difficulty__gt=70
+            ).count(),
+            WorkerSize.medium.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=70, difficulty__gt=40
+            ).count(),
+            WorkerSize.light.name: JobDocument.objects(
+                dataset__nin=blocked_datasets, difficulty__lte=40, difficulty__gt=0
+            ).count(),
         }
 
     def get_dump_with_status(self, status: Status, job_type: str) -> list[JobDict]:
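
The worker-size buckets above are defined purely by difficulty bounds. The same mapping backs WorkerSizeJobsCountDocument.get_worker_size in metrics.py (its body is not shown in this diff); the following is a minimal standalone sketch inferred from the query bounds, for reference only:

from enum import Enum

class WorkerSize(Enum):
    heavy = "heavy"
    medium = "medium"
    light = "light"

def get_worker_size(difficulty: int) -> WorkerSize:
    # Inferred from the queries above: 70 < difficulty <= 100 is heavy,
    # 40 < difficulty <= 70 is medium, 0 < difficulty <= 40 is light.
    if difficulty > 70:
        return WorkerSize.heavy
    if difficulty > 40:
        return WorkerSize.medium
    return WorkerSize.light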
43 changes: 26 additions & 17 deletions libs/libcommon/src/libcommon/queue/metrics.py
@@ -15,6 +15,7 @@
     WORKER_TYPE_JOB_COUNTS_COLLECTION,
 )
 from libcommon.dtos import Status, WorkerSize
+from libcommon.queue.dataset_blockages import is_blocked
 from libcommon.utils import get_datetime
 
 # START monkey patching ### hack ###
@@ -104,33 +105,41 @@ def get_worker_size(difficulty: int) -> WorkerSize:
     objects = QuerySetManager["WorkerSizeJobsCountDocument"]()
 
 
-def _update_metrics(job_type: str, status: str, increase_by: int, difficulty: int) -> None:
+def _update_metrics(dataset: str, job_type: str, status: str, increase_by: int, difficulty: int) -> None:
     JobTotalMetricDocument.objects(job_type=job_type, status=status).update(
         upsert=True,
         write_concern={"w": "majority", "fsync": True},
         read_concern={"level": "majority"},
         inc__total=increase_by,
     )
     if status == Status.WAITING:
-        worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
-        WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
-            upsert=True,
-            write_concern={"w": "majority", "fsync": True},
-            read_concern={"level": "majority"},
-            inc__jobs_count=increase_by,
-        )
+        # Do not consider blocked datasets for auto-scaling metrics
+        if not is_blocked(dataset):
+            worker_size = WorkerSizeJobsCountDocument.get_worker_size(difficulty=difficulty)
+            WorkerSizeJobsCountDocument.objects(worker_size=worker_size).update(
+                upsert=True,
+                write_concern={"w": "majority", "fsync": True},
+                read_concern={"level": "majority"},
+                inc__jobs_count=increase_by,
+            )
 
 
-def increase_metric(job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty)
+def increase_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(
+        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_INCREASE_AMOUNT, difficulty=difficulty
+    )
 
 
-def decrease_metric(job_type: str, status: str, difficulty: int) -> None:
-    _update_metrics(job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty)
+def decrease_metric(dataset: str, job_type: str, status: str, difficulty: int) -> None:
+    _update_metrics(
+        dataset=dataset, job_type=job_type, status=status, increase_by=DEFAULT_DECREASE_AMOUNT, difficulty=difficulty
+    )
 
 
-def update_metrics_for_type(job_type: str, previous_status: str, new_status: str, difficulty: int) -> None:
+def update_metrics_for_type(
+    dataset: str, job_type: str, previous_status: str, new_status: str, difficulty: int
+) -> None:
     if job_type is not None:
-        decrease_metric(job_type=job_type, status=previous_status, difficulty=difficulty)
-        increase_metric(job_type=job_type, status=new_status, difficulty=difficulty)
+        decrease_metric(dataset=dataset, job_type=job_type, status=previous_status, difficulty=difficulty)
+        increase_metric(dataset=dataset, job_type=job_type, status=new_status, difficulty=difficulty)
         # ^ this does not affect WorkerSizeJobsCountDocument, so we don't pass the job difficulty
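
is_blocked above, together with the block_dataset and get_blocked_datasets helpers used elsewhere in this diff, comes from libcommon.queue.dataset_blockages. A hypothetical in-memory stand-in for that interface, inferred only from how this PR calls it — the real module persists blockages in the database, so every body below is an assumption for illustration:

# Hypothetical stand-in for libcommon.queue.dataset_blockages, inferred from
# the calls in this diff; the real implementation is database-backed.
_blocked_datasets: set[str] = set()

def block_dataset(dataset: str) -> None:
    # Mark a dataset as blocked so its jobs stop feeding auto-scaling metrics.
    _blocked_datasets.add(dataset)

def get_blocked_datasets() -> list[str]:
    # All blocked dataset names, e.g. for the dataset__nin filters in jobs.py.
    return list(_blocked_datasets)

def is_blocked(dataset: str) -> bool:
    # Whether a single dataset is currently blocked.
    return dataset in _blocked_datasets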
6 changes: 6 additions & 0 deletions libs/libcommon/tests/queue/test_jobs.py
@@ -450,14 +450,20 @@ def test_get_jobs_count_by_worker_size() -> None:
     test_type = "test_type"
     test_other_type = "test_other_type"
     test_dataset = "test_dataset"
+    test_blocked_dataset = "blocked_dataset"
     test_revision = "test_revision"
     queue = Queue()
 
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
+    block_dataset(test_blocked_dataset)
+    queue.add_job(job_type=test_type, dataset=test_blocked_dataset, revision=test_revision, difficulty=50)
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
     queue.add_job(job_type=test_type, dataset=test_dataset, revision=test_revision, difficulty=50)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 0}
     queue.add_job(job_type=test_other_type, dataset=test_dataset, revision=test_revision, difficulty=10)
     assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 1, "light": 1}
+    block_dataset(test_dataset)
+    assert queue.get_jobs_count_by_worker_size() == {"heavy": 0, "medium": 0, "light": 0}
 
 
 def test_get_dataset_pending_jobs_for_type() -> None:
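
The new assertions cover both halves of the change: jobs enqueued for an already-blocked dataset are never counted (the write-time guard in metrics.py), and blocking a dataset retroactively removes its jobs from the counts (the read-time filter in jobs.py). Assuming the repository's usual test environment (these queue tests run against a database via the test fixtures), the updated test should be runnable on its own with something like pytest libs/libcommon/tests/queue/test_jobs.py -k test_get_jobs_count_by_worker_size.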