Skip to content

Commit

Permalink
Avoid exporting internal functions. (#3686)
Browse files Browse the repository at this point in the history
Added in #3684.
  • Loading branch information
letitz authored Jan 29, 2024
1 parent dae1bbc commit 4ab81b3
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ class _Subtask(enum.Enum):
POSTPROCESS = "postprocess"


def timestamp_now() -> Timestamp:
def _timestamp_now() -> Timestamp:
ts = Timestamp()
ts.GetCurrentTime()
return ts


def record_e2e_duration(start: Timestamp, utask_module, job_type: str,
subtask: _Subtask, mode: _Mode):
def _record_e2e_duration(start: Timestamp, utask_module, job_type: str,
subtask: _Subtask, mode: _Mode):
duration = start.ToSeconds() - time.time()
monitoring_metrics.UTASK_E2E_DURATION_SECS.add(
duration, {
Expand Down Expand Up @@ -83,7 +83,7 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type,
uworker_env):
"""Executes the preprocessing step of the utask |utask_module| and returns the
serialized output."""
start = timestamp_now()
start = _timestamp_now()
logs.log('Starting utask_preprocess: %s.' % utask_module)
ensure_uworker_env_type_safety(uworker_env)
set_uworker_env(uworker_env)
Expand All @@ -99,15 +99,15 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type,
uworker_input.module_name = utask_module.__name__

result = uworker_io.serialize_uworker_input(uworker_input)
record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS,
_Mode.QUEUE)
_record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS,
_Mode.QUEUE)
return result


def uworker_main_no_io(utask_module, serialized_uworker_input):
"""Exectues the main part of a utask on the uworker (locally if not using
remote executor)."""
start = timestamp_now()
start = _timestamp_now()
logs.log('Starting utask_main: %s.' % utask_module)
uworker_input = uworker_io.deserialize_uworker_input(serialized_uworker_input)

Expand All @@ -118,8 +118,8 @@ def uworker_main_no_io(utask_module, serialized_uworker_input):
if uworker_output is None:
return None
result = uworker_io.serialize_uworker_output(uworker_output)
record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.QUEUE)
_record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.QUEUE)
return result


Expand All @@ -128,22 +128,22 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input):
the same bot as the uworker)."""
# TODO(metzman): Stop passing module to this function and uworker_main_no_io.
# Make them consistent with the I/O versions.
start = timestamp_now()
start = _timestamp_now()
uworker_output = uworker_io.deserialize_uworker_output(uworker_output)
# Do this to simulate out-of-band tamper-proof storage of the input.
uworker_input = uworker_io.deserialize_uworker_input(uworker_input)
uworker_output.uworker_input.CopyFrom(uworker_input)
set_uworker_env(uworker_output.uworker_input.uworker_env)
utask_module.utask_postprocess(uworker_output)
record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.POSTPROCESS, _Mode.QUEUE)
_record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.POSTPROCESS, _Mode.QUEUE)


def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
"""Executes the preprocessing step of the utask |utask_module| and returns the
signed download URL for the uworker's input and the (unsigned) download URL
for its output."""
start = timestamp_now()
start = _timestamp_now()
logs.log('Starting utask_preprocess: %s.' % utask_module)
ensure_uworker_env_type_safety(uworker_env)
set_uworker_env(uworker_env)
Expand All @@ -164,8 +164,8 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
uworker_input_signed_download_url, uworker_output_download_gcs_url = (
uworker_io.serialize_and_upload_uworker_input(uworker_input))

record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS,
_Mode.BATCH)
_record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS,
_Mode.BATCH)

# Return the uworker_input_signed_download_url for the remote executor to pass
# to the batch job and for the local executor to download locally. Return
Expand All @@ -183,7 +183,7 @@ def set_uworker_env(uworker_env: dict) -> None:
def uworker_main(input_download_url) -> None:
"""Exectues the main part of a utask on the uworker (locally if not using
remote executor)."""
start = timestamp_now()
start = _timestamp_now()
uworker_input = uworker_io.download_and_deserialize_uworker_input(
input_download_url)
uworker_output_upload_url = uworker_input.uworker_output_upload_url
Expand All @@ -199,9 +199,9 @@ def uworker_main(input_download_url) -> None:
uworker_io.serialize_and_upload_uworker_output(uworker_output,
uworker_output_upload_url)
logs.log('Finished uworker_main.')
record_e2e_duration(start, utask_module,
uworker_output.uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.BATCH)
_record_e2e_duration(start, utask_module,
uworker_output.uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.BATCH)
return True


Expand All @@ -219,12 +219,12 @@ def uworker_bot_main():

def tworker_postprocess(output_download_url) -> None:
"""Executes the postprocess step on the trusted (t)worker."""
start = timestamp_now()
start = _timestamp_now()
uworker_output = uworker_io.download_and_deserialize_uworker_output(
output_download_url)
set_uworker_env(uworker_output.uworker_input.uworker_env)
utask_module = get_utask_module(uworker_output.uworker_input.module_name)
utask_module.utask_postprocess(uworker_output)
job_type = uworker_output.uworker_input.job_type
record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS,
_Mode.BATCH)
_record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS,
_Mode.BATCH)

0 comments on commit 4ab81b3

Please sign in to comment.