From 126d1c5ca564a7b1d9bbaeeb3718189b7ec91ff2 Mon Sep 17 00:00:00 2001
From: Vitor Guidi
Date: Sat, 28 Dec 2024 12:08:13 -0300
Subject: [PATCH] Revert 4564 and 4528 (#4568)

The preprocess count for fuzz task went to zero after #4564 was deployed, so
it is being reverted.

#4528 is also being reverted because it introduced the following error into
the fuzz task scheduler, which caused fuzz tasks to stop being scheduled:

```
Traceback (most recent call last):
  File "/mnt/scratch0/clusterfuzz/src/python/bot/startup/run_cron.py", line 68, in <module>
    sys.exit(main())
             ^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/python/bot/startup/run_cron.py", line 64, in main
    return 0 if task_module.main() else 1
                ^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 304, in main
    return schedule_fuzz_tasks()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 284, in schedule_fuzz_tasks
    available_cpus = get_available_cpus(project, regions)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 247, in get_available_cpus
    result = pool.starmap_async(  # pylint: disable=no-member
             ^^^^^^^^^^^^^^^^^^
AttributeError: 'ProcessPoolExecutor' object has no attribute 'starmap_async'
```
---
 src/clusterfuzz/_internal/base/concurrency.py |  23 ++-
 .../_internal/base/tasks/__init__.py          |  59 +------
 .../_internal/cron/schedule_fuzz.py           | 148 ++++--------------
 .../_internal/google_cloud_utils/batch.py     |  21 +--
 .../_internal/google_cloud_utils/storage.py   |  28 ++--
 .../handlers/cron/schedule_fuzz_test.py       |   8 +-
 src/python/bot/startup/run_bot.py             |   3 +
 7 files changed, 75 insertions(+), 215 deletions(-)

diff --git a/src/clusterfuzz/_internal/base/concurrency.py b/src/clusterfuzz/_internal/base/concurrency.py
index 52ce2d4b78..36fcb39e47 100644
--- a/src/clusterfuzz/_internal/base/concurrency.py
+++ b/src/clusterfuzz/_internal/base/concurrency.py
@@ -21,8 +21,22 @@
 POOL_SIZE = multiprocessing.cpu_count()
 
 
+class SingleThreadPool:
+  """Single thread pool for when it's not worth using Python's thread
+  implementation."""
+
+  def __init__(self, size):
+    del size
+
+  def map(self, f, l):
+    return list(map(f, l))
+
+  def imap_unordered(self, f, l):
+    return list(map(f, l))
+
+
 @contextlib.contextmanager
-def make_pool(pool_size=POOL_SIZE, max_pool_size=None):
+def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
   """Returns a pool that can (usually) execute tasks concurrently."""
   if max_pool_size is not None:
     pool_size = min(pool_size, max_pool_size)
@@ -30,9 +44,12 @@ def make_pool(pool_size=POOL_SIZE, max_pool_size=None):
   # Don't use processes on Windows and unittests to avoid hangs.
   if (environment.get_value('PY_UNITTESTS') or
       environment.platform() == 'WINDOWS'):
-    yield futures.ThreadPoolExecutor(pool_size)
+    if cpu_bound:
+      yield SingleThreadPool(pool_size)
+    else:
+      yield futures.ThreadPoolExecutor(pool_size)
   else:
-    yield futures.ProcessPoolExecutor(pool_size)
+    yield multiprocessing.Pool(pool_size)
 
 
 # TODO(metzman): Find out if batching makes things even faster.
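Not part of the patch: the concurrency.py hunk above swaps futures.ProcessPoolExecutor back to multiprocessing.Pool, which is the mismatch behind the AttributeError in the commit message, since only multiprocessing.Pool provides starmap_async(). A minimal sketch under that assumption; the add/add_pair helpers are hypothetical stand-ins, not ClusterFuzz code:

```
# Illustration only (not from the ClusterFuzz tree): multiprocessing.Pool has
# starmap_async(), but concurrent.futures.ProcessPoolExecutor does not, which
# is the AttributeError raised in the scheduler traceback above.
from concurrent import futures
import multiprocessing


def add(x, y):  # Hypothetical stand-in for the real per-region work.
  return x + y


def add_pair(pair):  # Module-level so it stays picklable for the executor.
  return add(*pair)


def main():
  args = [(1, 2), (3, 4)]

  with multiprocessing.Pool(2) as pool:
    # The pool type restored by this revert: starmap_async() exists here.
    result = pool.starmap_async(add, args)
    print(result.get())  # [3, 7]

  with futures.ProcessPoolExecutor(2) as executor:
    # ProcessPoolExecutor only offers submit()/map(); calling starmap_async()
    # on it raises AttributeError.
    print(list(executor.map(add_pair, args)))  # [3, 7]


if __name__ == '__main__':
  main()
```

Either pool type handles plain map-style work; the failure only shows up for the starmap_async() call that #4528 added and this revert removes.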
diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py
index 5cd85efd81..fc23db74a2 100644
--- a/src/clusterfuzz/_internal/base/tasks/__init__.py
+++ b/src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -48,9 +48,7 @@
 MAX_LEASED_TASKS_LIMIT = 1000
 MAX_TASKS_LIMIT = 100000
 
-# The stated limit is 1000, but in reality meassages do not get delivered
-# around this limit. We should probably switch to the real client library.
-MAX_PUBSUB_MESSAGES_PER_REQ = 250
+MAX_PUBSUB_MESSAGES_PER_REQ = 1000
 
 # Various variables for task leasing and completion times (in seconds).
 TASK_COMPLETION_BUFFER = 90 * 60
@@ -322,7 +320,7 @@ def get_preprocess_task():
   messages = pubsub_puller.get_messages(max_messages=1)
   if not messages:
     return None
-  task = get_task_from_message(messages[0], task_cls=PubSubTTask)
+  task = get_task_from_message(messages[0])
   if task:
     logs.info('Pulled from preprocess queue.')
   return task
@@ -516,45 +514,12 @@ def dont_retry(self):
     self._pubsub_message.ack()
 
 
-class PubSubTTask(PubSubTask):
-  """TTask that won't repeat on timeout."""
-  TTASK_TIMEOUT = 30 * 60
-
-  @contextlib.contextmanager
-  def lease(self, _event=None):  # pylint: disable=arguments-differ
-    """Maintain a lease for the task."""
-    task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
-        self.command, get_task_lease_timeout())
-
-    environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout)
-    track_task_start(self, task_lease_timeout)
-    if _event is None:
-      _event = threading.Event()
-    if self.command != 'fuzz':
-      leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event,
-                                          task_lease_timeout)
-    else:
-      leaser_thread = _PubSubLeaserThread(
-          self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True)
-    leaser_thread.start()
-    try:
-      yield leaser_thread
-    finally:
-      _event.set()
-      leaser_thread.join()
-
-    # If we get here the task succeeded in running. Acknowledge the message.
-    self._pubsub_message.ack()
-    track_task_end()
-
-
-def get_task_from_message(message, can_defer=True,
-                          task_cls=None) -> Optional[PubSubTask]:
+def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]:
   """Returns a task constructed from the first of |messages| if possible."""
   if message is None:
     return None
   try:
-    task = initialize_task(message, task_cls=task_cls)
+    task = initialize_task(message)
   except KeyError:
     logs.error('Received an invalid task, discarding...')
     message.ack()
@@ -593,13 +558,11 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
   return tasks
 
 
-def initialize_task(message, task_cls=None) -> PubSubTask:
+def initialize_task(message) -> PubSubTask:
   """Creates a task from |messages|."""
-  if task_cls is None:
-    task_cls = PubSubTask
 
   if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
-    return task_cls(message)
+    return PubSubTask(message)
 
   # Handle postprocess task.
   # The GCS API for pub/sub notifications uses the data field unlike
@@ -633,18 +596,13 @@ class _PubSubLeaserThread(threading.Thread):
   EXTENSION_TIME_SECONDS = 10 * 60  # 10 minutes.
 
-  def __init__(self,
-               message,
-               done_event,
-               max_lease_seconds,
-               ack_on_timeout=False):
+  def __init__(self, message, done_event, max_lease_seconds):
     super().__init__()
 
     self.daemon = True
    self._message = message
     self._done_event = done_event
     self._max_lease_seconds = max_lease_seconds
-    self._ack_on_timeout = ack_on_timeout
 
   def run(self):
     """Run the leaser thread."""
@@ -656,9 +614,6 @@ def run(self):
       if time_left <= 0:
         logs.info('Lease reached maximum lease time of {} seconds, '
                   'stopping renewal.'.format(self._max_lease_seconds))
-        if self._ack_on_timeout:
-          logs.info('Acking on timeout')
-          self._message.ack()
         break
 
       extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left)
diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py
index 3b01cd1cec..44c4b8278b 100644
--- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py
+++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -14,68 +14,31 @@
 """Cron job to schedule fuzz tasks that run on batch."""
 
 import collections
-import multiprocessing
 import random
 import time
 from typing import Dict
-from typing import List
 
-from google.cloud import monitoring_v3
 from googleapiclient import discovery
 
-from clusterfuzz._internal.base import concurrency
 from clusterfuzz._internal.base import tasks
 from clusterfuzz._internal.base import utils
 from clusterfuzz._internal.config import local_config
 from clusterfuzz._internal.datastore import data_types
 from clusterfuzz._internal.datastore import ndb_utils
-from clusterfuzz._internal.google_cloud_utils import batch
 from clusterfuzz._internal.google_cloud_utils import credentials
 from clusterfuzz._internal.metrics import logs
 
-# TODO(metzman): Actually implement this.
-CPUS_PER_FUZZ_JOB = 2
-
 
-def _get_quotas(creds, project, region):
-  compute = discovery.build('compute', 'v1', credentials=creds)
+def _get_quotas(project, region):
+  gcp_credentials = credentials.get_default()[0]
+  compute = discovery.build('compute', 'v1', credentials=gcp_credentials)
   return compute.regions().get(  # pylint: disable=no-member
       region=region, project=project).execute()['quotas']
 
 
-def count_unacked(creds, project_id, subscription_id):
-  """Counts the unacked messages in |subscription_id|."""
-  # TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
-  metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
-  query_filter = (f'metric.type="{metric}" AND '
-                  f'resource.labels.subscription_id="{subscription_id}"')
-  time_now = time.time()
-  # Get the last 5 minutes.
-  time_interval = monitoring_v3.TimeInterval(
-      end_time={'seconds': int(time_now)},
-      start_time={'seconds': int(time_now - 5 * 60)},
-  )
-  client = monitoring_v3.MetricServiceClient(credentials=creds)
-  results = client.list_time_series(
-      request={
-          'filter': query_filter,
-          'interval': time_interval,
-          'name': f'projects/{project_id}',
-          'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
-      })
-  # Get the latest point.
-  for result in results:
-    if len(result.points) == 0:
-      continue
-    result = int(result.points[0].value.int64_value)
-    logs.info(f'Unacked in {subscription_id}: {result}')
-    return result
-
-
-def get_available_cpus_for_region(creds, project: str, region: str) -> int:
+def get_available_cpus_for_region(project: str, region: str) -> int:
   """Returns the number of available CPUs in the current GCE region."""
-
-  quotas = _get_quotas(creds, project, region)
+  quotas = _get_quotas(project, region)
 
   # Sometimes, the preemptible quota is 0, which means the number of preemptible
   # CPUs is actually limited by the CPU quota.
@@ -96,18 +59,13 @@ def get_available_cpus_for_region(creds, project: str, region: str) -> int:
   assert preemptible_quota or cpu_quota
 
   if not preemptible_quota['limit']:
-    # Preemptible quota is not set. Obey the CPU quota since that limits us.
+    # Preemptible quota is not set. Obey the CPU quota since that limitss us.
     quota = cpu_quota
   else:
     quota = preemptible_quota
 
   assert quota['limit'], quota
-  # TODO(metzman): Do this in a more configurable way.
-  # We need this because us-central1 and us-east4 have different numbers of
-  # cores alloted to us in their quota. Treat them the same to simplify things.
-  limit = quota['limit']
-  limit -= quota['usage']
-  return min(limit, 100_000)
+  return quota['limit'] - quota['usage']
 
 
 class BaseFuzzTaskScheduler:
@@ -121,7 +79,8 @@ def get_fuzz_tasks(self):
 
   def _get_cpus_per_fuzz_job(self, job_name):
     del job_name
-    return CPUS_PER_FUZZ_JOB
+    # TODO(metzman): Actually implement this.
+    return 2
 
 
 class FuzzTaskCandidate:
@@ -223,81 +182,32 @@ def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:
 
 
 def get_batch_regions(batch_config):
-  fuzz_subconf_names = {
-      subconf['name'] for subconf in batch_config.get(
-          'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs')
-  }
-
-  subconfs = batch_config.get('subconfigs')
-  return list(
-      set(subconfs[subconf]['region']
-          for subconf in subconfs
-          if subconf in fuzz_subconf_names))
-
-
-def get_available_cpus(project: str, regions: List[str]) -> int:
-  """Returns the available CPUs for fuzz tasks."""
-  # TODO(metzman): This doesn't distinguish between fuzz and non-fuzz
-  # tasks (nor preemptible and non-preemptible CPUs). Fix this.
-  # Get total scheduled and queued.
-  creds = credentials.get_default()[0]
-  count_args = ((project, region) for region in regions)
-  with concurrency.make_pool() as pool:
-    # These calls are extremely slow (about 1 minute total).
-    result = pool.starmap_async(  # pylint: disable=no-member
-        batch.count_queued_or_scheduled_tasks, count_args)
-    waiting_tasks = count_unacked(creds, project, 'preprocess')
-    waiting_tasks += count_unacked(creds, project, 'utask_main')
-    region_counts = zip(*result.get())  # Group all queued and all scheduled.
-
-  # Add up all queued and scheduled.
-  region_counts = [sum(tup) for tup in region_counts]
-  logs.info(f'Region counts: {region_counts}')
-  if region_counts[0] > 5000:
-    # Check queued tasks.
-    logs.info('Too many jobs queued, not scheduling more fuzzing.')
-    return 0
-  waiting_tasks += sum(region_counts)  # Add up queued and scheduled.
-  soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB
-  logs.info(f'Soon occupied CPUs: {soon_occupied_cpus}')
-  available_cpus = sum(
-      get_available_cpus_for_region(creds, project, region)
-      for region in regions)
-  logs.info('Actually free CPUs (before subtracting soon '
-            f'occupied): {available_cpus}')
-  available_cpus = max(available_cpus - soon_occupied_cpus, 0)
-
-  # Don't schedule more than 10K tasks at once. So we don't overload batch.
-  print('len_regions', len(regions))
-  available_cpus = min(available_cpus, 20_000 * len(regions))
-  return available_cpus
+  mapping = batch_config.get('mapping')
+  return list(set(config['gce_region'] for config in mapping.values()))
 
 
 def schedule_fuzz_tasks() -> bool:
   """Schedules fuzz tasks."""
-  multiprocessing.set_start_method('spawn')
+  start = time.time()
   batch_config = local_config.BatchConfig()
+  regions = set(get_batch_regions(batch_config))
   project = batch_config.get('project')
-  regions = get_batch_regions(batch_config)
-  while True:
-    start = time.time()
-    available_cpus = get_available_cpus(project, regions)
-    logs.error(f'{available_cpus} available CPUs.')
-    if not available_cpus:
-      continue
-
-    fuzz_tasks = get_fuzz_tasks(available_cpus)
-    if not fuzz_tasks:
-      logs.error('No fuzz tasks found to schedule.')
-      continue
-
-    logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
-    tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
-    logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
-
-    end = time.time()
-    total = end - start
-    logs.info(f'Task scheduling took {total} seconds.')
+  available_cpus = sum(
+      get_available_cpus_for_region(project, region) for region in regions)
+  available_cpus = min(available_cpus, 3500 * len(regions))
+  fuzz_tasks = get_fuzz_tasks(available_cpus)
+  if not fuzz_tasks:
+    logs.error('No fuzz tasks found to schedule.')
+    return False
+
+  logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
+  tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
+  logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
+
+  end = time.time()
+  total = end - start
+  logs.info(f'Task scheduling took {total} seconds.')
+  return True
 
 
 def main():
diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py
index 74300ee84d..607d567727 100644
--- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py
+++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py
@@ -66,7 +66,10 @@ def _create_batch_client_new():
   """Creates a batch client."""
-  creds, _ = credentials.get_default()
+  creds, project = credentials.get_default()
+  if not project:
+    project = utils.get_application_id()
+
   return batch.BatchServiceClient(credentials=creds)
@@ -352,19 +355,3 @@ def _get_specs_from_config(batch_tasks) -> Dict:
     )
     specs[(task.command, task.job_type)] = spec
   return specs
-
-
-def count_queued_or_scheduled_tasks(project: str,
-                                    region: str) -> Tuple[int, int]:
-  """Counts the number of queued and scheduled tasks."""
-  region = f'projects/{project}/locations/{region}'
-  jobs_filter = 'status.state="SCHEDULED" OR status.state="QUEUED"'
-  req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter)
-  queued = 0
-  scheduled = 0
-  for job in _batch_client().list_jobs(request=req):
-    if job.status.state == 'SCHEDULED':
-      scheduled += job.task_groups[0].task_count
-    elif job.status.state == 'QUEUED':
-      queued += job.task_groups[0].task_count
-  return (queued, scheduled)
diff --git a/src/clusterfuzz/_internal/google_cloud_utils/storage.py b/src/clusterfuzz/_internal/google_cloud_utils/storage.py
index 7a7bee5211..11107d956c 100644
--- a/src/clusterfuzz/_internal/google_cloud_utils/storage.py
+++ b/src/clusterfuzz/_internal/google_cloud_utils/storage.py
@@ -14,7 +14,6 @@
 """Functions for managing Google Cloud Storage."""
 
 import collections
-from concurrent import futures
 import copy
 import datetime
 import json
@@ -1368,7 +1367,7 @@ def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls):
 
 def sign_urls_for_existing_files(urls, include_delete_urls):
   logs.info('Signing URLs for existing files.')
   args = ((url, include_delete_urls) for url in urls)
-  result = parallel_map(_sign_urls_for_existing_file, args)
+  result = maybe_parallel_map(_sign_urls_for_existing_file, args)
   logs.info('Done signing URLs for existing files.')
   return result
@@ -1378,24 +1377,17 @@ def get_arbitrary_signed_upload_url(remote_directory):
       get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0]
 
 
-def parallel_map(func, argument_list):
+def maybe_parallel_map(func, arguments):
   """Wrapper around pool.map so we don't do it on OSS-Fuzz hosts which will
   OOM."""
+  if not environment.is_tworker():
+    # TODO(b/metzman): When the rearch is done, internal google CF won't have
+    # tworkers, but maybe should be using parallel.
+    return map(func, arguments)
+
   max_size = 2
-  timeout = 120
-  with concurrency.make_pool(max_pool_size=max_size) as pool:
-    calls = {pool.submit(func, argument) for argument in argument_list}
-    while calls:
-      finished_calls, _ = futures.wait(
-          calls, timeout=timeout, return_when=futures.FIRST_COMPLETED)
-      if not finished_calls:
-        logs.error('No call completed.')
-        for call in calls:
-          call.cancel()
-        raise TimeoutError(f'Nothing completed within {timeout} seconds')
-      for call in finished_calls:
-        calls.remove(call)
-        yield call.result(timeout=timeout)
+  with concurrency.make_pool(cpu_bound=True, max_pool_size=max_size) as pool:
+    return pool.imap_unordered(func, arguments)
 
 
 def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int):
@@ -1414,6 +1406,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int):
   urls = (f'{base_path}-{idx}' for idx in range(num_uploads))
 
   logs.info('Signing URLs for arbitrary uploads.')
-  result = parallel_map(get_signed_upload_url, urls)
+  result = maybe_parallel_map(get_signed_upload_url, urls)
   logs.info('Done signing URLs for arbitrary uploads.')
   return result
diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py
index 7cd22829c5..004768456d 100644
--- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py
+++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py
@@ -17,7 +17,6 @@
 from clusterfuzz._internal.cron import schedule_fuzz
 from clusterfuzz._internal.datastore import data_types
-from clusterfuzz._internal.google_cloud_utils import credentials
 from clusterfuzz._internal.tests.test_libs import helpers as test_helpers
 from clusterfuzz._internal.tests.test_libs import test_utils
 
@@ -85,7 +84,6 @@ class TestGetAvailableCpusForRegion(unittest.TestCase):
 
   def setUp(self):
     test_helpers.patch(self,
                        ['clusterfuzz._internal.cron.schedule_fuzz._get_quotas'])
-    self.creds = credentials.get_default()
 
   def test_usage(self):
     """Tests that get_available_cpus_for_region handles usage properly."""
@@ -95,8 +93,7 @@ def test_usage(self):
         'usage': 2
     }]
     self.assertEqual(
-        schedule_fuzz.get_available_cpus_for_region(self.creds, 'project',
-                                                    'region'), 3)
+        schedule_fuzz.get_available_cpus_for_region('project', 'region'), 3)
 
   def test_cpus_and_preemptible_cpus(self):
     """Tests that get_available_cpus_for_region handles usage properly."""
@@ -110,5 +107,4 @@ def test_cpus_and_preemptible_cpus(self):
         'usage': 5
     }]
     self.assertEqual(
-        schedule_fuzz.get_available_cpus_for_region(self.creds, 'region',
-                                                    'project'), 5)
+        schedule_fuzz.get_available_cpus_for_region('region', 'project'), 5)
diff --git a/src/python/bot/startup/run_bot.py b/src/python/bot/startup/run_bot.py
index c9bb37fa4a..8c674fba2c 100644
--- a/src/python/bot/startup/run_bot.py
+++ b/src/python/bot/startup/run_bot.py
@@ -194,6 +194,9 @@ def main():
     sys.exit(-1)
 
   fuzzers_init.run()
+
+  logs.info(f'PID is {os.getpid()}')
+
   if environment.is_trusted_host(ensure_connected=False):
     from clusterfuzz._internal.bot.untrusted_runner import host
     host.init()
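Not part of the patch: a self-contained sketch of the per-region quota arithmetic that the reverted get_available_cpus_for_region() goes back to (prefer the preemptible CPU quota when it has a limit, otherwise fall back to the regular CPU quota, then report limit minus usage). The metric names and numbers below are illustrative assumptions, not taken from the tests:

```
# Illustration only: mimics the quota selection restored in schedule_fuzz.py.
# The dict shape loosely follows the Compute Engine regions().get() 'quotas'
# entries used above; the concrete values are made up.
def available_cpus_from_quotas(quotas):
  preemptible_quota = None
  cpu_quota = None
  for quota in quotas:
    if quota['metric'] == 'PREEMPTIBLE_CPUS':
      preemptible_quota = quota
    elif quota['metric'] == 'CPUS':
      cpu_quota = quota

  if preemptible_quota and preemptible_quota['limit']:
    quota = preemptible_quota
  else:
    # Preemptible quota is not set, so the regular CPU quota is the binding one.
    quota = cpu_quota

  return quota['limit'] - quota['usage']


# With only a CPU quota, that quota is used; once a preemptible quota is set,
# it wins even if the plain CPU quota is larger.
print(available_cpus_from_quotas([{'metric': 'CPUS', 'limit': 5, 'usage': 2}]))  # 3
print(available_cpus_from_quotas([
    {'metric': 'CPUS', 'limit': 1000, 'usage': 0},
    {'metric': 'PREEMPTIBLE_CPUS', 'limit': 10, 'usage': 5},
]))  # 5
```

The restored schedule_fuzz_tasks() then sums this per-region value across regions and caps it at 3500 * len(regions) before generating fuzz tasks.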