Skip to content

Commit

Permalink
Improve throughput of utask_main scheduling (#4461)
Browse files Browse the repository at this point in the history
1. Don't defer the tasks, they've already been deferred.
2. Improve the scheduling time by 20% by batching calls so that we only
need to read YAML and query the database once, per queue read.

---------

Co-authored-by: Vitor Guidi <[email protected]>
Co-authored-by: Ali HIJAZI <[email protected]>
  • Loading branch information
3 people committed Jan 8, 2025
1 parent 9d80b52 commit 2e4d029
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 65 deletions.
6 changes: 3 additions & 3 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ def dont_retry(self):
self._pubsub_message.ack()


def get_task_from_message(message) -> Optional[PubSubTask]:
def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]:
"""Returns a task constructed from the first of |messages| if possible."""
if message is None:
return None
Expand All @@ -525,7 +525,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]:

# Check that this task should be run now (past the ETA). Otherwise we defer
# its execution.
if task.defer():
if can_defer and task.defer():
return None

return task
Expand All @@ -545,7 +545,7 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
bot."""
tasks = []
for message in messages:
task = get_task_from_message(message)
task = get_task_from_message(message, can_defer=False)
if task is None:
continue
tasks.append(task)
Expand Down
129 changes: 73 additions & 56 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
"""Cloud Batch helpers."""
import collections
import threading
from typing import Dict
from typing import List
from typing import Tuple
import uuid

from google.cloud import batch_v1 as batch
Expand All @@ -25,6 +27,7 @@
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.metrics import logs

# TODO(metzman): Change to from . import credentials when we are done
Expand Down Expand Up @@ -105,9 +108,10 @@ def create_uworker_main_batch_job(module, job_type, input_download_url):
def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
"""Creates batch jobs."""
job_specs = collections.defaultdict(list)
specs = _get_specs_from_config(batch_tasks)
for batch_task in batch_tasks:
logs.info(f'Scheduling {batch_task.command}, {batch_task.job_type}.')
spec = _get_spec_from_config(batch_task.command, batch_task.job_type)
spec = specs[(batch_task.command, batch_task.job_type)]
job_specs[spec].append(batch_task.input_download_url)

logs.info('Creating batch jobs.')
Expand Down Expand Up @@ -248,27 +252,33 @@ def is_no_privilege_workload(command, job_name):

def is_remote_task(command, job_name):
try:
_get_spec_from_config(command, job_name)
_get_specs_from_config([BatchTask(command, job_name, None)])
return True
except ValueError:
return False


def _get_config_name(command, job_name):
"""Returns the name of the config for |command| and |job_name|."""
job = _get_job(job_name)
# As of this writing, batch only supports LINUX.
if utils.is_oss_fuzz():
# TODO(b/377885331): In OSS-Fuzz, the platform can't be used because, as of
# it includes the project name.
config_name = 'LINUX'
else:
config_name = job.platform
if command == 'fuzz':
config_name += '-PREEMPTIBLE-UNPRIVILEGED'
else:
config_name += '-NONPREEMPTIBLE-UNPRIVILEGED'
return config_name
def _get_config_names(
batch_tasks: List[BatchTask]) -> Dict[Tuple[str, str], str]:
""""Gets the name of the configs for each batch_task. Returns a dict
that is indexed by command and job_type for efficient lookup."""
job_names = {task.job_type for task in batch_tasks}
query = data_types.Job.query(data_types.Job.name.IN(list(job_names)))
jobs = ndb_utils.get_all_from_query(query)
job_map = {job.name: job for job in jobs}
config_map = {}
for task in batch_tasks:
if task.job_type not in job_map:
logs.error(f'{task.job_type} doesn\'t exist.')
continue
if task.command == 'fuzz':
suffix = '-PREEMPTIBLE-UNPRIVILEGED'
else:
suffix = '-NONPREEMPTIBLE-UNPRIVILEGED'
job = job_map[task.job_type]
platform = job.platform if not utils.is_oss_fuzz() else 'LINUX'
config_map[(task.command, task.job_type)] = f'{platform}{suffix}'
return config_map


def _get_task_duration(command):
Expand All @@ -292,44 +302,51 @@ def _get_subconfig(batch_config, instance_spec):
return all_subconfigs[weighted_subconfig.name]


def _get_spec_from_config(command, job_name):
def _get_specs_from_config(batch_tasks):
"""Gets the configured specifications for a batch workload."""
config_name = _get_config_name(command, job_name)
batch_config = _get_batch_config()
instance_spec = batch_config.get('mapping').get(config_name, None)
if instance_spec is None:
raise ValueError(f'No mapping for {config_name}')
subconfig = _get_subconfig(batch_config, instance_spec)
project_name = batch_config.get('project')
docker_image = instance_spec['docker_image']
user_data = instance_spec['user_data']
should_retry = instance_spec.get('retry', False)
clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')

# Lower numbers are lower priority. From:
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
low_priority = command == 'fuzz'
priority = 0 if low_priority else 1

max_run_duration = f'{_get_task_duration(command)}s'
if command == 'corpus_pruning':
should_retry = False # It is naturally retried the next day.

spec = BatchWorkloadSpec(
clusterfuzz_release=clusterfuzz_release,
docker_image=docker_image,
user_data=user_data,
disk_size_gb=instance_spec['disk_size_gb'],
disk_type=instance_spec['disk_type'],
service_account_email=instance_spec['service_account_email'],
gce_region=subconfig['region'],
project=project_name,
network=subconfig['network'],
subnetwork=subconfig['subnetwork'],
preemptible=instance_spec['preemptible'],
machine_type=instance_spec['machine_type'],
priority=priority,
max_run_duration=max_run_duration,
retry=should_retry,
)
return spec
config_map = _get_config_names(batch_tasks)
specs = {}
subconfig_map = {}
for task in batch_tasks:
if (task.command, task.job_type) in specs:
# Don't repeat work for no reason.
continue
config_name = config_map[(task.command, task.job_type)]

instance_spec = batch_config.get('mapping').get(config_name)
if instance_spec is None:
raise ValueError(f'No mapping for {config_name}')
config_name = config_map[(task.command, task.job_type)]
project_name = batch_config.get('project')
clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')
# Lower numbers are a lower priority, meaning less likely to run From:
# https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
priority = 0 if task.command == 'fuzz' else 1
max_run_duration = f'{_get_task_duration(task.command)}s'
# This saves us time and reduces fragementation, e.g. every linux fuzz task
# run in this call will run in the same zone.
if config_name not in subconfig_map:
subconfig = _get_subconfig(batch_config, instance_spec)
subconfig_map[config_name] = subconfig

subconfig = subconfig_map[config_name]
spec = BatchWorkloadSpec(
docker_image=instance_spec['docker_image'],
disk_size_gb=instance_spec['disk_size_gb'],
disk_type=instance_spec['disk_type'],
user_data=instance_spec['user_data'],
service_account_email=instance_spec['service_account_email'],
preemptible=instance_spec['preemptible'],
machine_type=instance_spec['machine_type'],
gce_region=subconfig['region'],
network=subconfig['network'],
subnetwork=subconfig['subnetwork'],
project=project_name,
clusterfuzz_release=clusterfuzz_release,
priority=priority,
max_run_duration=max_run_duration,
should_retry=instance_spec.get('retry', False),
)
specs[(task.command, task.job_type)] = spec
return specs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


@test_utils.with_cloud_emulators('datastore')
class GetSpecFromConfigTest(unittest.TestCase):
class GetSpecsFromConfigTest(unittest.TestCase):
"""Tests for get_spec_from_config."""

def setUp(self):
Expand All @@ -42,7 +42,7 @@ def setUp(self):
def test_nonpreemptible(self):
"""Tests that get_spec_from_config works for non-preemptibles as
expected."""
spec = batch._get_spec_from_config('analyze', self.job.name)
spec = _get_spec_from_config('analyze', self.job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -68,7 +68,7 @@ def test_fuzz_get_spec_from_config(self):
"""Tests that get_spec_from_config works for fuzz tasks as expected."""
job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
spec = batch._get_spec_from_config('fuzz', job.name) # pylint: disable=protected-access
spec = _get_spec_from_config('fuzz', job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -93,13 +93,19 @@ def test_fuzz_get_spec_from_config(self):
def test_corpus_pruning(self):
"""Tests that corpus pruning uses a spec of 24 hours and a different one
than normal."""
pruning_spec = batch._get_spec_from_config('corpus_pruning', self.job.name)
pruning_spec = _get_spec_from_config('corpus_pruning', self.job.name)
self.assertEqual(pruning_spec.max_run_duration, f'{24 * 60 * 60}s')
normal_spec = batch._get_spec_from_config('analyze', self.job.name)
normal_spec = _get_spec_from_config('analyze', self.job.name)
self.assertNotEqual(pruning_spec, normal_spec)
job = data_types.Job(name='libfuzzer_chrome_msan', platform='LINUX')
job.put()
# This behavior is important for grouping batch alike tasks into a single
# batch job.
pruning_spec2 = batch._get_spec_from_config('corpus_pruning', job.name)
pruning_spec2 = _get_spec_from_config('corpus_pruning', job.name)
self.assertEqual(pruning_spec, pruning_spec2)


def _get_spec_from_config(command, job_name):
return list(
batch._get_specs_from_config([batch.BatchTask(command, job_name,
None)]).values())[0]

0 comments on commit 2e4d029

Please sign in to comment.