From 4ab81b3bbc1f168385a6d538862c1088f93616d8 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 14:46:31 -0500 Subject: [PATCH] Avoid exporting internal functions. (#3686) Added in #3684. --- .../_internal/bot/tasks/utasks/__init__.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index e1d20c0179..5142d2322c 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -47,14 +47,14 @@ class _Subtask(enum.Enum): POSTPROCESS = "postprocess" -def timestamp_now() -> Timestamp: +def _timestamp_now() -> Timestamp: ts = Timestamp() ts.GetCurrentTime() return ts -def record_e2e_duration(start: Timestamp, utask_module, job_type: str, - subtask: _Subtask, mode: _Mode): +def _record_e2e_duration(start: Timestamp, utask_module, job_type: str, + subtask: _Subtask, mode: _Mode): duration = start.ToSeconds() - time.time() monitoring_metrics.UTASK_E2E_DURATION_SECS.add( duration, { @@ -83,7 +83,7 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type, uworker_env): """Executes the preprocessing step of the utask |utask_module| and returns the serialized output.""" - start = timestamp_now() + start = _timestamp_now() logs.log('Starting utask_preprocess: %s.' % utask_module) ensure_uworker_env_type_safety(uworker_env) set_uworker_env(uworker_env) @@ -99,15 +99,15 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type, uworker_input.module_name = utask_module.__name__ result = uworker_io.serialize_uworker_input(uworker_input) - record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS, - _Mode.QUEUE) + _record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS, + _Mode.QUEUE) return result def uworker_main_no_io(utask_module, serialized_uworker_input): """Exectues the main part of a utask on the uworker (locally if not using remote executor).""" - start = timestamp_now() + start = _timestamp_now() logs.log('Starting utask_main: %s.' % utask_module) uworker_input = uworker_io.deserialize_uworker_input(serialized_uworker_input) @@ -118,8 +118,8 @@ def uworker_main_no_io(utask_module, serialized_uworker_input): if uworker_output is None: return None result = uworker_io.serialize_uworker_output(uworker_output) - record_e2e_duration(start, utask_module, uworker_input.job_type, - _Subtask.UWORKER_MAIN, _Mode.QUEUE) + _record_e2e_duration(start, utask_module, uworker_input.job_type, + _Subtask.UWORKER_MAIN, _Mode.QUEUE) return result @@ -128,22 +128,22 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input): the same bot as the uworker).""" # TODO(metzman): Stop passing module to this function and uworker_main_no_io. # Make them consistent with the I/O versions. - start = timestamp_now() + start = _timestamp_now() uworker_output = uworker_io.deserialize_uworker_output(uworker_output) # Do this to simulate out-of-band tamper-proof storage of the input. uworker_input = uworker_io.deserialize_uworker_input(uworker_input) uworker_output.uworker_input.CopyFrom(uworker_input) set_uworker_env(uworker_output.uworker_input.uworker_env) utask_module.utask_postprocess(uworker_output) - record_e2e_duration(start, utask_module, uworker_input.job_type, - _Subtask.POSTPROCESS, _Mode.QUEUE) + _record_e2e_duration(start, utask_module, uworker_input.job_type, + _Subtask.POSTPROCESS, _Mode.QUEUE) def tworker_preprocess(utask_module, task_argument, job_type, uworker_env): """Executes the preprocessing step of the utask |utask_module| and returns the signed download URL for the uworker's input and the (unsigned) download URL for its output.""" - start = timestamp_now() + start = _timestamp_now() logs.log('Starting utask_preprocess: %s.' % utask_module) ensure_uworker_env_type_safety(uworker_env) set_uworker_env(uworker_env) @@ -164,8 +164,8 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env): uworker_input_signed_download_url, uworker_output_download_gcs_url = ( uworker_io.serialize_and_upload_uworker_input(uworker_input)) - record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS, - _Mode.BATCH) + _record_e2e_duration(start, utask_module, job_type, _Subtask.PREPROCESS, + _Mode.BATCH) # Return the uworker_input_signed_download_url for the remote executor to pass # to the batch job and for the local executor to download locally. Return @@ -183,7 +183,7 @@ def set_uworker_env(uworker_env: dict) -> None: def uworker_main(input_download_url) -> None: """Exectues the main part of a utask on the uworker (locally if not using remote executor).""" - start = timestamp_now() + start = _timestamp_now() uworker_input = uworker_io.download_and_deserialize_uworker_input( input_download_url) uworker_output_upload_url = uworker_input.uworker_output_upload_url @@ -199,9 +199,9 @@ def uworker_main(input_download_url) -> None: uworker_io.serialize_and_upload_uworker_output(uworker_output, uworker_output_upload_url) logs.log('Finished uworker_main.') - record_e2e_duration(start, utask_module, - uworker_output.uworker_input.job_type, - _Subtask.UWORKER_MAIN, _Mode.BATCH) + _record_e2e_duration(start, utask_module, + uworker_output.uworker_input.job_type, + _Subtask.UWORKER_MAIN, _Mode.BATCH) return True @@ -219,12 +219,12 @@ def uworker_bot_main(): def tworker_postprocess(output_download_url) -> None: """Executes the postprocess step on the trusted (t)worker.""" - start = timestamp_now() + start = _timestamp_now() uworker_output = uworker_io.download_and_deserialize_uworker_output( output_download_url) set_uworker_env(uworker_output.uworker_input.uworker_env) utask_module = get_utask_module(uworker_output.uworker_input.module_name) utask_module.utask_postprocess(uworker_output) job_type = uworker_output.uworker_input.job_type - record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS, - _Mode.BATCH) + _record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS, + _Mode.BATCH)