Revert 4564 and 4528 (#4568)
The preprocess count for fuzz tasks went to zero after #4564 was
deployed, so it is being reverted.

#4528 is also being reverted because it introduced the following error
into the fuzz task scheduler, which caused fuzz tasks to stop being
scheduled:

```
Traceback (most recent call last):
  File "/mnt/scratch0/clusterfuzz/src/python/bot/startup/run_cron.py", line 68, in <module>
    sys.exit(main())
             ^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/python/bot/startup/run_cron.py", line 64, in main
    return 0 if task_module.main() else 1
                ^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 304, in main
    return schedule_fuzz_tasks()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 284, in schedule_fuzz_tasks
    available_cpus = get_available_cpus(project, regions)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/scratch0/clusterfuzz/src/clusterfuzz/_internal/cron/schedule_fuzz.py", line 247, in get_available_cpus
    result = pool.starmap_async(  # pylint: disable=no-member
             ^^^^^^^^^^^^^^^^^^
AttributeError: 'ProcessPoolExecutor' object has no attribute 'starmap_async'

```
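For context on the traceback: `starmap_async` is part of the `multiprocessing.Pool` API, not of `concurrent.futures.ProcessPoolExecutor`, which is what #4528 made `make_pool` yield; reverting restores `multiprocessing.Pool`, so the existing `starmap_async` call in `schedule_fuzz.py` works again. A minimal standalone sketch of the mismatch (illustrative only, not ClusterFuzz code):

```python
import multiprocessing
from concurrent import futures


def add(a, b):
  return a + b


if __name__ == '__main__':
  args = [(1, 2), (3, 4)]

  # multiprocessing.Pool has starmap_async, which is what
  # schedule_fuzz.get_available_cpus was calling.
  with multiprocessing.Pool(2) as pool:
    result = pool.starmap_async(add, args)
    print(result.get())  # [3, 7]

  # ProcessPoolExecutor has no starmap_async; the closest futures-style
  # equivalent unpacks the argument tuples into parallel iterables for map().
  with futures.ProcessPoolExecutor(2) as executor:
    print(list(executor.map(add, *zip(*args))))  # [3, 7]
```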
vitorguidi authored Dec 28, 2024
1 parent 36505ab commit 126d1c5
Showing 7 changed files with 75 additions and 215 deletions.
23 changes: 20 additions & 3 deletions src/clusterfuzz/_internal/base/concurrency.py
@@ -21,18 +21,35 @@
POOL_SIZE = multiprocessing.cpu_count()


class SingleThreadPool:
"""Single thread pool for when it's not worth using Python's thread
implementation."""

def __init__(self, size):
del size

def map(self, f, l):
return list(map(f, l))

def imap_unordered(self, f, l):
return list(map(f, l))


@contextlib.contextmanager
def make_pool(pool_size=POOL_SIZE, max_pool_size=None):
def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
"""Returns a pool that can (usually) execute tasks concurrently."""
if max_pool_size is not None:
pool_size = min(pool_size, max_pool_size)

# Don't use processes on Windows and unittests to avoid hangs.
if (environment.get_value('PY_UNITTESTS') or
environment.platform() == 'WINDOWS'):
yield futures.ThreadPoolExecutor(pool_size)
if cpu_bound:
yield SingleThreadPool(pool_size)
else:
yield futures.ThreadPoolExecutor(pool_size)
else:
yield futures.ProcessPoolExecutor(pool_size)
yield multiprocessing.Pool(pool_size)


# TODO(metzman): Find out if batching makes things even faster.
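A rough sketch of how a caller can use the restored `make_pool`; the worker function and inputs here are made up for illustration:

```python
from clusterfuzz._internal.base import concurrency


def _square(x):
  # Hypothetical CPU-bound worker, just for illustration.
  return x * x


if __name__ == '__main__':
  # In unittests or on Windows, cpu_bound=True yields SingleThreadPool (a
  # serial map); cpu_bound=False yields a ThreadPoolExecutor. Everywhere else
  # the context manager yields a multiprocessing.Pool with the same map() API.
  with concurrency.make_pool(pool_size=4, cpu_bound=True) as pool:
    results = list(pool.map(_square, [1, 2, 3, 4]))
    print(results)  # [1, 4, 9, 16]
```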
59 changes: 7 additions & 52 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -48,9 +48,7 @@
MAX_LEASED_TASKS_LIMIT = 1000
MAX_TASKS_LIMIT = 100000

# The stated limit is 1000, but in reality meassages do not get delivered
# around this limit. We should probably switch to the real client library.
MAX_PUBSUB_MESSAGES_PER_REQ = 250
MAX_PUBSUB_MESSAGES_PER_REQ = 1000

# Various variables for task leasing and completion times (in seconds).
TASK_COMPLETION_BUFFER = 90 * 60
@@ -322,7 +320,7 @@ def get_preprocess_task():
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0], task_cls=PubSubTTask)
task = get_task_from_message(messages[0])
if task:
logs.info('Pulled from preprocess queue.')
return task
@@ -516,45 +514,12 @@ def dont_retry(self):
self._pubsub_message.ack()


class PubSubTTask(PubSubTask):
"""TTask that won't repeat on timeout."""
TTASK_TIMEOUT = 30 * 60

@contextlib.contextmanager
def lease(self, _event=None): # pylint: disable=arguments-differ
"""Maintain a lease for the task."""
task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
self.command, get_task_lease_timeout())

environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout)
track_task_start(self, task_lease_timeout)
if _event is None:
_event = threading.Event()
if self.command != 'fuzz':
leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event,
task_lease_timeout)
else:
leaser_thread = _PubSubLeaserThread(
self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True)
leaser_thread.start()
try:
yield leaser_thread
finally:
_event.set()
leaser_thread.join()

# If we get here the task succeeded in running. Acknowledge the message.
self._pubsub_message.ack()
track_task_end()


def get_task_from_message(message, can_defer=True,
task_cls=None) -> Optional[PubSubTask]:
def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]:
"""Returns a task constructed from the first of |messages| if possible."""
if message is None:
return None
try:
task = initialize_task(message, task_cls=task_cls)
task = initialize_task(message)
except KeyError:
logs.error('Received an invalid task, discarding...')
message.ack()
@@ -593,13 +558,11 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
return tasks


def initialize_task(message, task_cls=None) -> PubSubTask:
def initialize_task(message) -> PubSubTask:
"""Creates a task from |messages|."""
if task_cls is None:
task_cls = PubSubTask

if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
return task_cls(message)
return PubSubTask(message)

# Handle postprocess task.
# The GCS API for pub/sub notifications uses the data field unlike
@@ -633,18 +596,13 @@ class _PubSubLeaserThread(threading.Thread):

EXTENSION_TIME_SECONDS = 10 * 60 # 10 minutes.

def __init__(self,
message,
done_event,
max_lease_seconds,
ack_on_timeout=False):
def __init__(self, message, done_event, max_lease_seconds):
super().__init__()

self.daemon = True
self._message = message
self._done_event = done_event
self._max_lease_seconds = max_lease_seconds
self._ack_on_timeout = ack_on_timeout

def run(self):
"""Run the leaser thread."""
@@ -656,9 +614,6 @@ def run(self):
if time_left <= 0:
logs.info('Lease reached maximum lease time of {} seconds, '
'stopping renewal.'.format(self._max_lease_seconds))
if self._ack_on_timeout:
logs.info('Acking on timeout')
self._message.ack()
break

extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left)
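The `_PubSubLeaserThread` restored above keeps extending the Pub/Sub ack deadline until the task finishes or the maximum lease time elapses; with `ack_on_timeout` gone, an expired lease is simply no longer renewed. A simplified sketch of that loop, assuming the message object exposes a `modify_ack_deadline(seconds)` call and using an illustrative 30-second renewal margin (neither detail is shown in this diff):

```python
import threading
import time

EXTENSION_TIME_SECONDS = 10 * 60  # 10 minutes, as in _PubSubLeaserThread.


def lease_loop(message, done_event: threading.Event, max_lease_seconds: int):
  """Sketch of the renewal loop; |message| is assumed to expose
  modify_ack_deadline(seconds) like the Pub/Sub message wrapper."""
  latest_end_time = time.time() + max_lease_seconds
  while True:
    time_left = latest_end_time - time.time()
    if time_left <= 0:
      # Post-revert behaviour: stop renewing; no ack on timeout.
      break

    extension_seconds = min(EXTENSION_TIME_SECONDS, time_left)
    message.modify_ack_deadline(int(extension_seconds))

    # Sleep until the task finishes or it is time to renew again,
    # waking slightly before the current extension expires.
    if done_event.wait(timeout=extension_seconds - 30):
      break
```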
148 changes: 29 additions & 119 deletions src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -14,68 +14,31 @@
"""Cron job to schedule fuzz tasks that run on batch."""

import collections
import multiprocessing
import random
import time
from typing import Dict
from typing import List

from google.cloud import monitoring_v3
from googleapiclient import discovery

from clusterfuzz._internal.base import concurrency
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.datastore import ndb_utils
from clusterfuzz._internal.google_cloud_utils import batch
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs

# TODO(metzman): Actually implement this.
CPUS_PER_FUZZ_JOB = 2


def _get_quotas(creds, project, region):
compute = discovery.build('compute', 'v1', credentials=creds)
def _get_quotas(project, region):
gcp_credentials = credentials.get_default()[0]
compute = discovery.build('compute', 'v1', credentials=gcp_credentials)
return compute.regions().get( # pylint: disable=no-member
region=region, project=project).execute()['quotas']


def count_unacked(creds, project_id, subscription_id):
"""Counts the unacked messages in |subscription_id|."""
# TODO(metzman): Not all of these are fuzz_tasks. Deal with that.
metric = 'pubsub.googleapis.com/subscription/num_undelivered_messages'
query_filter = (f'metric.type="{metric}" AND '
f'resource.labels.subscription_id="{subscription_id}"')
time_now = time.time()
# Get the last 5 minutes.
time_interval = monitoring_v3.TimeInterval(
end_time={'seconds': int(time_now)},
start_time={'seconds': int(time_now - 5 * 60)},
)
client = monitoring_v3.MetricServiceClient(credentials=creds)
results = client.list_time_series(
request={
'filter': query_filter,
'interval': time_interval,
'name': f'projects/{project_id}',
'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
})
# Get the latest point.
for result in results:
if len(result.points) == 0:
continue
result = int(result.points[0].value.int64_value)
logs.info(f'Unacked in {subscription_id}: {result}')
return result


def get_available_cpus_for_region(creds, project: str, region: str) -> int:
def get_available_cpus_for_region(project: str, region: str) -> int:
"""Returns the number of available CPUs in the current GCE region."""

quotas = _get_quotas(creds, project, region)
quotas = _get_quotas(project, region)

# Sometimes, the preemptible quota is 0, which means the number of preemptible
# CPUs is actually limited by the CPU quota.
@@ -96,18 +59,13 @@ def get_available_cpus_for_region(creds, project: str, region: str) -> int:
assert preemptible_quota or cpu_quota

if not preemptible_quota['limit']:
# Preemptible quota is not set. Obey the CPU quota since that limits us.
# Preemptible quota is not set. Obey the CPU quota since that limitss us.
quota = cpu_quota
else:
quota = preemptible_quota
assert quota['limit'], quota

# TODO(metzman): Do this in a more configurable way.
# We need this because us-central1 and us-east4 have different numbers of
# cores alloted to us in their quota. Treat them the same to simplify things.
limit = quota['limit']
limit -= quota['usage']
return min(limit, 100_000)
return quota['limit'] - quota['usage']


class BaseFuzzTaskScheduler:
Expand All @@ -121,7 +79,8 @@ def get_fuzz_tasks(self):

def _get_cpus_per_fuzz_job(self, job_name):
del job_name
return CPUS_PER_FUZZ_JOB
# TODO(metzman): Actually implement this.
return 2


class FuzzTaskCandidate:
@@ -223,81 +182,32 @@ def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]:


def get_batch_regions(batch_config):
fuzz_subconf_names = {
subconf['name'] for subconf in batch_config.get(
'mapping.LINUX-PREEMPTIBLE-UNPRIVILEGED.subconfigs')
}

subconfs = batch_config.get('subconfigs')
return list(
set(subconfs[subconf]['region']
for subconf in subconfs
if subconf in fuzz_subconf_names))


def get_available_cpus(project: str, regions: List[str]) -> int:
"""Returns the available CPUs for fuzz tasks."""
# TODO(metzman): This doesn't distinguish between fuzz and non-fuzz
# tasks (nor preemptible and non-preemptible CPUs). Fix this.
# Get total scheduled and queued.
creds = credentials.get_default()[0]
count_args = ((project, region) for region in regions)
with concurrency.make_pool() as pool:
# These calls are extremely slow (about 1 minute total).
result = pool.starmap_async( # pylint: disable=no-member
batch.count_queued_or_scheduled_tasks, count_args)
waiting_tasks = count_unacked(creds, project, 'preprocess')
waiting_tasks += count_unacked(creds, project, 'utask_main')
region_counts = zip(*result.get()) # Group all queued and all scheduled.

# Add up all queued and scheduled.
region_counts = [sum(tup) for tup in region_counts]
logs.info(f'Region counts: {region_counts}')
if region_counts[0] > 5000:
# Check queued tasks.
logs.info('Too many jobs queued, not scheduling more fuzzing.')
return 0
waiting_tasks += sum(region_counts) # Add up queued and scheduled.
soon_occupied_cpus = waiting_tasks * CPUS_PER_FUZZ_JOB
logs.info(f'Soon occupied CPUs: {soon_occupied_cpus}')
available_cpus = sum(
get_available_cpus_for_region(creds, project, region)
for region in regions)
logs.info('Actually free CPUs (before subtracting soon '
f'occupied): {available_cpus}')
available_cpus = max(available_cpus - soon_occupied_cpus, 0)

# Don't schedule more than 10K tasks at once. So we don't overload batch.
print('len_regions', len(regions))
available_cpus = min(available_cpus, 20_000 * len(regions))
return available_cpus
mapping = batch_config.get('mapping')
return list(set(config['gce_region'] for config in mapping.values()))


def schedule_fuzz_tasks() -> bool:
"""Schedules fuzz tasks."""
multiprocessing.set_start_method('spawn')
start = time.time()
batch_config = local_config.BatchConfig()
regions = set(get_batch_regions(batch_config))
project = batch_config.get('project')
regions = get_batch_regions(batch_config)
while True:
start = time.time()
available_cpus = get_available_cpus(project, regions)
logs.error(f'{available_cpus} available CPUs.')
if not available_cpus:
continue

fuzz_tasks = get_fuzz_tasks(available_cpus)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
continue

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
available_cpus = sum(
get_available_cpus_for_region(project, region) for region in regions)
available_cpus = min(available_cpus, 3500 * len(regions))
fuzz_tasks = get_fuzz_tasks(available_cpus)
if not fuzz_tasks:
logs.error('No fuzz tasks found to schedule.')
return False

logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')

end = time.time()
total = end - start
logs.info(f'Task scheduling took {total} seconds.')
return True


def main():
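To make the restored scheduling arithmetic concrete: `_get_cpus_per_fuzz_job` now returns 2 CPUs per fuzz job, and `schedule_fuzz_tasks` caps the total at 3500 CPUs per region. A worked example with made-up quota headroom for the two regions named in the removed comment (us-central1 and us-east4):

```python
# Hypothetical headroom (quota limit - usage) per region, for illustration.
quota_headroom = {'us-central1': 4200, 'us-east4': 1800}
regions = list(quota_headroom)

cpus_per_fuzz_job = 2  # What _get_cpus_per_fuzz_job returns post-revert.

available_cpus = sum(quota_headroom[r] for r in regions)   # 6000
available_cpus = min(available_cpus, 3500 * len(regions))  # min(6000, 7000) -> 6000

# get_fuzz_tasks() would then schedule on the order of this many fuzz tasks.
max_fuzz_tasks = available_cpus // cpus_per_fuzz_job       # 3000
print(max_fuzz_tasks)
```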
21 changes: 4 additions & 17 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
@@ -66,7 +66,10 @@

def _create_batch_client_new():
"""Creates a batch client."""
creds, _ = credentials.get_default()
creds, project = credentials.get_default()
if not project:
project = utils.get_application_id()

return batch.BatchServiceClient(credentials=creds)


@@ -352,19 +355,3 @@ def _get_specs_from_config(batch_tasks) -> Dict:
)
specs[(task.command, task.job_type)] = spec
return specs


def count_queued_or_scheduled_tasks(project: str,
region: str) -> Tuple[int, int]:
"""Counts the number of queued and scheduled tasks."""
region = f'projects/{project}/locations/{region}'
jobs_filter = 'status.state="SCHEDULED" OR status.state="QUEUED"'
req = batch.types.ListJobsRequest(parent=region, filter=jobs_filter)
queued = 0
scheduled = 0
for job in _batch_client().list_jobs(request=req):
if job.status.state == 'SCHEDULED':
scheduled += job.task_groups[0].task_count
elif job.status.state == 'QUEUED':
queued += job.task_groups[0].task_count
return (queued, scheduled)
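For reference, the pre-revert `get_available_cpus` combined the per-region `(queued, scheduled)` pairs returned by this now-removed function roughly as follows (a sketch with made-up numbers, mirroring the code deleted from `schedule_fuzz.py` above):

```python
# Hypothetical per-region results from count_queued_or_scheduled_tasks,
# each a (queued, scheduled) tuple.
per_region_counts = [(1200, 800), (300, 150)]

# zip(*...) groups all queued counts together and all scheduled counts together.
queued_total, scheduled_total = (sum(group) for group in zip(*per_region_counts))
print(queued_total, scheduled_total)  # 1500 950

# The pre-revert scheduler bailed out once too many jobs were already queued.
if queued_total > 5000:
  print('Too many jobs queued, not scheduling more fuzzing.')
```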