Bound preprocess and parallel signing calls (#4564)
Bound preprocess to 30 minutes (it shouldn't take more than 5).
Bound parallel signing to 2 minutes; raise an exception if we exceed this.
jonathanmetzman authored Dec 27, 2024
1 parent 246122f commit 36505ab
Showing 3 changed files with 70 additions and 36 deletions.
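
The 2-minute signing bound described above uses the standard concurrent.futures recipe: wait for the earliest completion with a timeout and fail loudly if nothing finishes in time. A minimal, self-contained sketch of that pattern (the sign worker, its inputs, and the pool size are illustrative stand-ins, not code from this commit):

from concurrent import futures
import time


def sign(seconds):
  # Stand-in for one signing call; sleeps to simulate work.
  time.sleep(seconds)
  return seconds


TIMEOUT = 120  # mirrors the commit's 2-minute bound

with futures.ThreadPoolExecutor(2) as pool:
  pending = {pool.submit(sign, n) for n in (1, 2, 3)}
  while pending:
    done, pending = futures.wait(
        pending, timeout=TIMEOUT, return_when=futures.FIRST_COMPLETED)
    if not done:
      # Nothing finished within the bound: surface it as an error.
      raise TimeoutError(f'Nothing completed within {TIMEOUT} seconds')
    for call in done:
      print(call.result())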
23 changes: 3 additions & 20 deletions src/clusterfuzz/_internal/base/concurrency.py
@@ -21,35 +21,18 @@
 POOL_SIZE = multiprocessing.cpu_count()
 
 
-class SingleThreadPool:
-  """Single thread pool for when it's not worth using Python's thread
-  implementation."""
-
-  def __init__(self, size):
-    del size
-
-  def map(self, f, l):
-    return list(map(f, l))
-
-  def imap_unordered(self, f, l):
-    return list(map(f, l))
-
-
 @contextlib.contextmanager
-def make_pool(pool_size=POOL_SIZE, cpu_bound=False, max_pool_size=None):
+def make_pool(pool_size=POOL_SIZE, max_pool_size=None):
   """Returns a pool that can (usually) execute tasks concurrently."""
   if max_pool_size is not None:
     pool_size = min(pool_size, max_pool_size)
 
   # Don't use processes on Windows and unittests to avoid hangs.
   if (environment.get_value('PY_UNITTESTS') or
       environment.platform() == 'WINDOWS'):
-    if cpu_bound:
-      yield SingleThreadPool(pool_size)
-    else:
-      yield futures.ThreadPoolExecutor(pool_size)
+    yield futures.ThreadPoolExecutor(pool_size)
   else:
-    yield multiprocessing.Pool(pool_size)
+    yield futures.ProcessPoolExecutor(pool_size)
 
 
 # TODO(metzman): Find out if batching makes things even faster.
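
For context, make_pool is a context manager, and after this change both branches yield a concurrent.futures executor, so callers see one uniform API (submit/map) whether threads or processes back it; that uniformity is what lets parallel_map in storage.py below call pool.submit unconditionally. A hedged usage sketch (the square worker and pool size are illustrative):

from clusterfuzz._internal.base import concurrency


def square(x):
  # Top-level function so ProcessPoolExecutor can pickle it.
  return x * x


with concurrency.make_pool(max_pool_size=4) as pool:
  results = list(pool.map(square, range(10)))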
55 changes: 49 additions & 6 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
@@ -322,7 +322,7 @@ def get_preprocess_task():
   messages = pubsub_puller.get_messages(max_messages=1)
   if not messages:
     return None
-  task = get_task_from_message(messages[0])
+  task = get_task_from_message(messages[0], task_cls=PubSubTTask)
   if task:
     logs.info('Pulled from preprocess queue.')
     return task
@@ -516,12 +516,45 @@ def dont_retry(self):
     self._pubsub_message.ack()
 
 
-def get_task_from_message(message, can_defer=True) -> Optional[PubSubTask]:
+class PubSubTTask(PubSubTask):
+  """TTask that won't repeat on timeout."""
+  TTASK_TIMEOUT = 30 * 60
+
+  @contextlib.contextmanager
+  def lease(self, _event=None):  # pylint: disable=arguments-differ
+    """Maintain a lease for the task."""
+    task_lease_timeout = TASK_LEASE_SECONDS_BY_COMMAND.get(
+        self.command, get_task_lease_timeout())
+
+    environment.set_value('TASK_LEASE_SECONDS', task_lease_timeout)
+    track_task_start(self, task_lease_timeout)
+    if _event is None:
+      _event = threading.Event()
+    if self.command != 'fuzz':
+      leaser_thread = _PubSubLeaserThread(self._pubsub_message, _event,
+                                          task_lease_timeout)
+    else:
+      leaser_thread = _PubSubLeaserThread(
+          self._pubsub_message, _event, self.TTASK_TIMEOUT, ack_on_timeout=True)
+    leaser_thread.start()
+    try:
+      yield leaser_thread
+    finally:
+      _event.set()
+      leaser_thread.join()
+
+    # If we get here the task succeeded in running. Acknowledge the message.
+    self._pubsub_message.ack()
+    track_task_end()
+
+
+def get_task_from_message(message, can_defer=True,
+                          task_cls=None) -> Optional[PubSubTask]:
   """Returns a task constructed from the first of |messages| if possible."""
   if message is None:
     return None
   try:
-    task = initialize_task(message)
+    task = initialize_task(message, task_cls=task_cls)
   except KeyError:
     logs.error('Received an invalid task, discarding...')
     message.ack()
@@ -560,11 +560,13 @@ def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
   return tasks
 
 
-def initialize_task(message) -> PubSubTask:
+def initialize_task(message, task_cls=None) -> PubSubTask:
   """Creates a task from |messages|."""
+  if task_cls is None:
+    task_cls = PubSubTask
 
   if message.attributes.get('eventType') != 'OBJECT_FINALIZE':
-    return PubSubTask(message)
+    return task_cls(message)
 
   # Handle postprocess task.
   # The GCS API for pub/sub notifications uses the data field unlike
@@ -598,13 +633,18 @@ class _PubSubLeaserThread(threading.Thread):
 
   EXTENSION_TIME_SECONDS = 10 * 60  # 10 minutes.
 
-  def __init__(self, message, done_event, max_lease_seconds):
+  def __init__(self,
+               message,
+               done_event,
+               max_lease_seconds,
+               ack_on_timeout=False):
     super().__init__()
 
     self.daemon = True
     self._message = message
     self._done_event = done_event
     self._max_lease_seconds = max_lease_seconds
+    self._ack_on_timeout = ack_on_timeout
 
   def run(self):
     """Run the leaser thread."""
@@ -616,6 +656,9 @@ def run(self):
         if time_left <= 0:
           logs.info('Lease reached maximum lease time of {} seconds, '
                     'stopping renewal.'.format(self._max_lease_seconds))
+          if self._ack_on_timeout:
+            logs.info('Acking on timeout')
+            self._message.ack()
           break
 
         extension_seconds = min(self.EXTENSION_TIME_SECONDS, time_left)
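
Net effect: the preprocess queue now yields PubSubTTask objects, and for 'fuzz' commands the lease runs the leaser thread with TTASK_TIMEOUT (30 * 60 seconds) and ack_on_timeout=True, so a preprocess that overruns the bound is acked and dropped rather than redelivered and repeated. A sketch of the intended call pattern (run_preprocess is a hypothetical stand-in for the actual preprocess work):

task = get_preprocess_task()  # now constructs a PubSubTTask
if task:
  # Non-'fuzz' commands keep their per-command lease timeout; 'fuzz'
  # commands get the 30-minute bound with ack-on-timeout.
  with task.lease():
    run_preprocess(task)  # hypothetical worker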
28 changes: 18 additions & 10 deletions src/clusterfuzz/_internal/google_cloud_utils/storage.py
@@ -14,6 +14,7 @@
 """Functions for managing Google Cloud Storage."""
 
 import collections
+from concurrent import futures
 import copy
 import datetime
 import json
@@ -1367,7 +1368,7 @@ def _mappable_sign_urls_for_existing_file(url_and_include_delete_urls):
 def sign_urls_for_existing_files(urls, include_delete_urls):
   logs.info('Signing URLs for existing files.')
   args = ((url, include_delete_urls) for url in urls)
-  result = maybe_parallel_map(_sign_urls_for_existing_file, args)
+  result = parallel_map(_sign_urls_for_existing_file, args)
   logs.info('Done signing URLs for existing files.')
   return result
 
@@ -1377,17 +1378,24 @@ def get_arbitrary_signed_upload_url(remote_directory):
       get_arbitrary_signed_upload_urls(remote_directory, num_uploads=1))[0]
 
 
-def maybe_parallel_map(func, arguments):
+def parallel_map(func, argument_list):
   """Wrapper around pool.map so we don't do it on OSS-Fuzz hosts which
   will OOM."""
-  if not environment.is_tworker():
-    # TODO(b/metzman): When the rearch is done, internal google CF won't have
-    # tworkers, but maybe should be using parallel.
-    return map(func, arguments)
-
   max_size = 2
-  with concurrency.make_pool(cpu_bound=True, max_pool_size=max_size) as pool:
-    return pool.imap_unordered(func, arguments)
+  timeout = 120
+  with concurrency.make_pool(max_pool_size=max_size) as pool:
+    calls = {pool.submit(func, argument) for argument in argument_list}
+    while calls:
+      finished_calls, _ = futures.wait(
+          calls, timeout=timeout, return_when=futures.FIRST_COMPLETED)
+      if not finished_calls:
+        logs.error('No call completed.')
+        for call in calls:
+          call.cancel()
+        raise TimeoutError(f'Nothing completed within {timeout} seconds')
+      for call in finished_calls:
+        calls.remove(call)
+        yield call.result(timeout=timeout)
 
 
 def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int):
@@ -1406,6 +1414,6 @@ def get_arbitrary_signed_upload_urls(remote_directory: str, num_uploads: int):
 
   urls = (f'{base_path}-{idx}' for idx in range(num_uploads))
   logs.info('Signing URLs for arbitrary uploads.')
-  result = maybe_parallel_map(get_signed_upload_url, urls)
+  result = parallel_map(get_signed_upload_url, urls)
   logs.info('Done signing URLs for arbitrary uploads.')
   return result
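
One behavioral note: parallel_map is now a generator, so the pool is created and calls are submitted only when the caller iterates, results arrive in completion order, and the 120-second bound surfaces as a TimeoutError during iteration. A usage sketch against the functions above (the URL inputs are hypothetical):

urls = [f'gs://bucket/object-{idx}' for idx in range(4)]  # hypothetical inputs
# Drain the generator so every signing call actually executes and any
# TimeoutError from the 2-minute bound is raised here.
signed_urls = list(parallel_map(get_signed_upload_url, urls))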
