From 5b6057865c399ee46e5cfea7423149208bf972ff Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 21:58:01 +0100 Subject: [PATCH 01/11] Fix metric fields. --- src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 5142d2322c..0a9b491ba9 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -60,8 +60,8 @@ def _record_e2e_duration(start: Timestamp, utask_module, job_type: str, duration, { 'task': task_utils.get_command_from_module(utask_module.__name__), 'job': job_type, - 'subtask': subtask, - 'mode': mode, + 'subtask': subtask.value, + 'mode': mode.value, 'platform': environment.platform(), }) From f19d464cb2e3abb35c7c9b6a4f59f994180ae04b Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 21:58:18 +0100 Subject: [PATCH 02/11] Test tworker_preprocess records start time. --- .../tests/core/bot/tasks/utasks/utasks_test.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 89e1d1c2c2..c9e97cd9a4 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -14,6 +14,7 @@ """Tests for uworker_io.""" import os +import time import unittest from unittest import mock @@ -41,20 +42,29 @@ def setUp(self): self.OUTPUT_SIGNED_UPLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL) self.mock.serialize_and_upload_uworker_input.return_value = ( self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL) - self.uworker_input = uworker_msg_pb2.Input(job_type='something') def test_tworker_preprocess(self): """Tests that tworker_preprocess works as intended.""" module = mock.MagicMock(__name__='tasks.analyze_task') - module.utask_preprocess.return_value = self.uworker_input module.__name__ = 'mock_task' + + uworker_input = uworker_msg_pb2.Input(job_type='something') + module.utask_preprocess.return_value = uworker_input + + time_before = time.time_ns() + result = utasks.tworker_preprocess(module, self.TASK_ARGUMENT, self.JOB_TYPE, self.UWORKER_ENV) + time_after = time.time_ns() + module.utask_preprocess.assert_called_with(self.TASK_ARGUMENT, self.JOB_TYPE, self.UWORKER_ENV) - self.mock.serialize_and_upload_uworker_input.assert_called_with( - self.uworker_input) + + self.mock.serialize_and_upload_uworker_input.assert_called_with(uworker_input) + self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), time_before) + self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), time_after) + self.assertEqual( (self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL), result) From 169c6dab60961eed1c3b798942c8bf150b3dd573 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:20:52 +0100 Subject: [PATCH 03/11] Test uworker_main metric recording. --- .../_internal/bot/tasks/utasks/__init__.py | 2 +- .../core/bot/tasks/utasks/utasks_test.py | 35 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 0a9b491ba9..11f7be4a83 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -200,7 +200,7 @@ def uworker_main(input_download_url) -> None: uworker_output_upload_url) logs.log('Finished uworker_main.') _record_e2e_duration(start, utask_module, - uworker_output.uworker_input.job_type, + uworker_input.job_type, _Subtask.UWORKER_MAIN, _Mode.BATCH) return True diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index c9e97cd9a4..f3c289f407 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -18,8 +18,11 @@ import unittest from unittest import mock +from google.protobuf import timestamp_pb2 + from clusterfuzz._internal.bot.tasks import utasks from clusterfuzz._internal.bot.tasks.utasks import analyze_task +from clusterfuzz._internal.metrics import monitoring_metrics from clusterfuzz._internal.protos import uworker_msg_pb2 from clusterfuzz._internal.tests.test_libs import helpers @@ -109,24 +112,44 @@ def setUp(self): ]) self.module = mock.MagicMock(__name__='tasks.analyze_task') self.mock.get_utask_module.return_value = self.module - self.uworker_input = uworker_msg_pb2.Input( + + def test_uworker_main(self): + """Tests that uworker_main works as intended.""" + start_time_ns = time.time_ns() + start_timestamp = timestamp_pb2.Timestamp() + start_timestamp.FromNanoseconds(start_time_ns) + + uworker_input = uworker_msg_pb2.Input( + job_type='job_type-value', original_job_type='original_job_type-value', uworker_env=self.UWORKER_ENV, uworker_output_upload_url=self.UWORKER_OUTPUT_UPLOAD_URL, + preprocess_start_time=start_timestamp, ) - self.mock.download_and_deserialize_uworker_input.return_value = ( - self.uworker_input) + self.mock.download_and_deserialize_uworker_input.return_value = (uworker_input) - def test_uworker_main(self): - """Tests that uworker_main works as intended.""" uworker_output = { 'crash_time': 70.1, } self.module.utask_main.return_value = uworker_msg_pb2.Output( **uworker_output) input_download_url = 'http://input' + utasks.uworker_main(input_download_url) - self.module.utask_main.assert_called_with(self.uworker_input) + + end_time_ns = time.time_ns() + + self.module.utask_main.assert_called_with(uworker_input) + + durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({ + 'task': 'analyze', + 'job': uworker_input.job_type, + 'subtask': 'uworker_main', + 'mode': 'batch', + 'platform': 'LINUX', + }) + self.assertEqual(durations.count, 1) + self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns) class GetUtaskModuleTest(unittest.TestCase): From 441f76367e3c7e55943e270e0116ef3ebba264e3 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:21:16 +0100 Subject: [PATCH 04/11] Suffix *_time with _ns. --- .../_internal/tests/core/bot/tasks/utasks/utasks_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index f3c289f407..8be6648b24 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -54,19 +54,19 @@ def test_tworker_preprocess(self): uworker_input = uworker_msg_pb2.Input(job_type='something') module.utask_preprocess.return_value = uworker_input - time_before = time.time_ns() + start_time_ns = time.time_ns() result = utasks.tworker_preprocess(module, self.TASK_ARGUMENT, self.JOB_TYPE, self.UWORKER_ENV) - time_after = time.time_ns() + end_time_ns = time.time_ns() module.utask_preprocess.assert_called_with(self.TASK_ARGUMENT, self.JOB_TYPE, self.UWORKER_ENV) self.mock.serialize_and_upload_uworker_input.assert_called_with(uworker_input) - self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), time_before) - self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), time_after) + self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), start_time_ns) + self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), end_time_ns) self.assertEqual( (self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL), result) From 9f1d6022b469a65a161a551877617b1126ce1acc Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:23:33 +0100 Subject: [PATCH 05/11] Test preprocess metric recording. --- .../tests/core/bot/tasks/utasks/utasks_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 8be6648b24..b657d31fec 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -68,6 +68,16 @@ def test_tworker_preprocess(self): self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), start_time_ns) self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), end_time_ns) + durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({ + 'task': 'mock', + 'job': self.JOB_TYPE, + 'subtask': 'preprocess', + 'mode': 'batch', + 'platform': 'LINUX', + }) + self.assertEqual(durations.count, 1) + self.assertLess(durations.sum * 10**9, end_time_ns - uworker_input.preprocess_start_time.ToNanoseconds()) + self.assertEqual( (self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL), result) From 98b9c3d9d136437cf973ffc807618033246fdd84 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:36:39 +0100 Subject: [PATCH 06/11] Format uworker_main change. --- .../_internal/bot/tasks/utasks/__init__.py | 3 +- .../core/bot/tasks/utasks/utasks_test.py | 42 ++++++++++++++++++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 11f7be4a83..64a32d8d93 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -199,8 +199,7 @@ def uworker_main(input_download_url) -> None: uworker_io.serialize_and_upload_uworker_output(uworker_output, uworker_output_upload_url) logs.log('Finished uworker_main.') - _record_e2e_duration(start, utask_module, - uworker_input.job_type, + _record_e2e_duration(start, utask_module, uworker_input.job_type, _Subtask.UWORKER_MAIN, _Mode.BATCH) return True diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index b657d31fec..e1b16a525b 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -136,7 +136,8 @@ def test_uworker_main(self): uworker_output_upload_url=self.UWORKER_OUTPUT_UPLOAD_URL, preprocess_start_time=start_timestamp, ) - self.mock.download_and_deserialize_uworker_input.return_value = (uworker_input) + self.mock.download_and_deserialize_uworker_input.return_value = ( + uworker_input) uworker_output = { 'crash_time': 70.1, @@ -169,3 +170,42 @@ def test_get_utask_module(self): self.assertEqual(utasks.get_utask_module(module_name), analyze_task) module_name = analyze_task.__name__ self.assertEqual(utasks.get_utask_module(module_name), analyze_task) + +class TworkerPostprocessTest(unittest.TestCase): + """Tests that tworker_postprocess works as intended.""" + + def setUp(self): + helpers.patch_environ(self) + helpers.patch(self, [ + 'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_output', + 'clusterfuzz._internal.bot.tasks.utasks.get_utask_module', + ]) + + def test_success(self): + download_url = 'https://uworker_output_download_url' + uworker_output = uworker_msg_pb2.Output( + uworker_input=uworker_msg_pb2.Input( + job_type='foo-job', + ), + ) + self.mock.download_and_deserialize_uworker_output.return_value = (uworker_output) + + module = mock.MagicMock(__name__='mock_task') + self.mock.get_utask_module.return_value = module + + start_time_ns = time.time_ns() + utasks.tworker_postprocess(download_url) + end_time_ns = time.time_ns() + + self.mock.download_and_deserialize_uworker_output.assert_called_with(download_url) + module.utask_postprocess.assert_called_with(uworker_output) + + durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({ + 'task': 'mock', + 'job': 'foo-job', + 'subtask': 'postprocess', + 'mode': 'batch', + 'platform': 'LINUX', + }) + self.assertEqual(durations.count, 1) + self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns) From a81139befb3edc9a5bfa7790dad00e79411f940d Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:36:53 +0100 Subject: [PATCH 07/11] Format preprocess test. --- .../tests/core/bot/tasks/utasks/utasks_test.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index e1b16a525b..1d3450df50 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -64,9 +64,12 @@ def test_tworker_preprocess(self): module.utask_preprocess.assert_called_with(self.TASK_ARGUMENT, self.JOB_TYPE, self.UWORKER_ENV) - self.mock.serialize_and_upload_uworker_input.assert_called_with(uworker_input) - self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), start_time_ns) - self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), end_time_ns) + self.mock.serialize_and_upload_uworker_input.assert_called_with( + uworker_input) + self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(), + start_time_ns) + self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(), + end_time_ns) durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({ 'task': 'mock', @@ -76,7 +79,9 @@ def test_tworker_preprocess(self): 'platform': 'LINUX', }) self.assertEqual(durations.count, 1) - self.assertLess(durations.sum * 10**9, end_time_ns - uworker_input.preprocess_start_time.ToNanoseconds()) + self.assertLess( + durations.sum * 10**9, + end_time_ns - uworker_input.preprocess_start_time.ToNanoseconds()) self.assertEqual( (self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL), result) From afbdf5388de0c43ae6221463422d1fc4be81faa8 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:37:04 +0100 Subject: [PATCH 08/11] Format postprocess test. --- .../tests/core/bot/tasks/utasks/utasks_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 1d3450df50..dc70e112e5 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -176,6 +176,7 @@ def test_get_utask_module(self): module_name = analyze_task.__name__ self.assertEqual(utasks.get_utask_module(module_name), analyze_task) + class TworkerPostprocessTest(unittest.TestCase): """Tests that tworker_postprocess works as intended.""" @@ -189,11 +190,9 @@ def setUp(self): def test_success(self): download_url = 'https://uworker_output_download_url' uworker_output = uworker_msg_pb2.Output( - uworker_input=uworker_msg_pb2.Input( - job_type='foo-job', - ), - ) - self.mock.download_and_deserialize_uworker_output.return_value = (uworker_output) + uworker_input=uworker_msg_pb2.Input(job_type='foo-job',),) + self.mock.download_and_deserialize_uworker_output.return_value = ( + uworker_output) module = mock.MagicMock(__name__='mock_task') self.mock.get_utask_module.return_value = module @@ -202,7 +201,8 @@ def test_success(self): utasks.tworker_postprocess(download_url) end_time_ns = time.time_ns() - self.mock.download_and_deserialize_uworker_output.assert_called_with(download_url) + self.mock.download_and_deserialize_uworker_output.assert_called_with( + download_url) module.utask_postprocess.assert_called_with(uworker_output) durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({ From 11caf2d5a4990862d0a77df4828c35b7d540c75c Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 22:44:00 +0100 Subject: [PATCH 09/11] Docstring. --- .../_internal/tests/core/bot/tasks/utasks/utasks_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index dc70e112e5..1c80e4cfd9 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -188,6 +188,8 @@ def setUp(self): ]) def test_success(self): + """Tests that if utask_postprocess suceeds, uworker_postprocess does too. + """ download_url = 'https://uworker_output_download_url' uworker_output = uworker_msg_pb2.Output( uworker_input=uworker_msg_pb2.Input(job_type='foo-job',),) From 2e604d1e2f713b904a49f6109d1308da18c1a548 Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 23:03:30 +0100 Subject: [PATCH 10/11] Add e2e latency test. --- .../tests/core/bot/tasks/utasks/utasks_test.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py index 1c80e4cfd9..b80476efba 100644 --- a/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py @@ -130,7 +130,7 @@ def setUp(self): def test_uworker_main(self): """Tests that uworker_main works as intended.""" - start_time_ns = time.time_ns() + start_time_ns = time.time_ns() - 42 * 10**9 # Sometime in the past. start_timestamp = timestamp_pb2.Timestamp() start_timestamp.FromNanoseconds(start_time_ns) @@ -166,6 +166,7 @@ def test_uworker_main(self): }) self.assertEqual(durations.count, 1) self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns) + self.assertGreaterEqual(durations.sum, 42) class GetUtaskModuleTest(unittest.TestCase): @@ -191,15 +192,20 @@ def test_success(self): """Tests that if utask_postprocess suceeds, uworker_postprocess does too. """ download_url = 'https://uworker_output_download_url' + + start_time_ns = time.time_ns() - 42 * 10**9 # Sometime in the past. + start_timestamp = timestamp_pb2.Timestamp() + start_timestamp.FromNanoseconds(start_time_ns) + uworker_output = uworker_msg_pb2.Output( - uworker_input=uworker_msg_pb2.Input(job_type='foo-job',),) + uworker_input=uworker_msg_pb2.Input( + job_type='foo-job', preprocess_start_time=start_timestamp),) self.mock.download_and_deserialize_uworker_output.return_value = ( uworker_output) module = mock.MagicMock(__name__='mock_task') self.mock.get_utask_module.return_value = module - start_time_ns = time.time_ns() utasks.tworker_postprocess(download_url) end_time_ns = time.time_ns() @@ -216,3 +222,4 @@ def test_success(self): }) self.assertEqual(durations.count, 1) self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns) + self.assertGreaterEqual(durations.sum, 42) From 33b1cf99293ddb29c6a5776231033e74dea099ab Mon Sep 17 00:00:00 2001 From: Titouan Rigoudy Date: Mon, 29 Jan 2024 23:03:41 +0100 Subject: [PATCH 11/11] Fix metric recording. --- .../_internal/bot/tasks/utasks/__init__.py | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 64a32d8d93..ef136c14f5 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -55,9 +55,9 @@ def _timestamp_now() -> Timestamp: def _record_e2e_duration(start: Timestamp, utask_module, job_type: str, subtask: _Subtask, mode: _Mode): - duration = start.ToSeconds() - time.time() + duration_secs = (time.time_ns() - start.ToNanoseconds()) / 10**9 monitoring_metrics.UTASK_E2E_DURATION_SECS.add( - duration, { + duration_secs, { 'task': task_utils.get_command_from_module(utask_module.__name__), 'job': job_type, 'subtask': subtask.value, @@ -107,7 +107,6 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type, def uworker_main_no_io(utask_module, serialized_uworker_input): """Exectues the main part of a utask on the uworker (locally if not using remote executor).""" - start = _timestamp_now() logs.log('Starting utask_main: %s.' % utask_module) uworker_input = uworker_io.deserialize_uworker_input(serialized_uworker_input) @@ -118,8 +117,9 @@ def uworker_main_no_io(utask_module, serialized_uworker_input): if uworker_output is None: return None result = uworker_io.serialize_uworker_output(uworker_output) - _record_e2e_duration(start, utask_module, uworker_input.job_type, - _Subtask.UWORKER_MAIN, _Mode.QUEUE) + _record_e2e_duration(uworker_input.preprocess_start_time, utask_module, + uworker_input.job_type, _Subtask.UWORKER_MAIN, + _Mode.QUEUE) return result @@ -128,15 +128,15 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input): the same bot as the uworker).""" # TODO(metzman): Stop passing module to this function and uworker_main_no_io. # Make them consistent with the I/O versions. - start = _timestamp_now() uworker_output = uworker_io.deserialize_uworker_output(uworker_output) # Do this to simulate out-of-band tamper-proof storage of the input. uworker_input = uworker_io.deserialize_uworker_input(uworker_input) uworker_output.uworker_input.CopyFrom(uworker_input) set_uworker_env(uworker_output.uworker_input.uworker_env) utask_module.utask_postprocess(uworker_output) - _record_e2e_duration(start, utask_module, uworker_input.job_type, - _Subtask.POSTPROCESS, _Mode.QUEUE) + _record_e2e_duration(uworker_input.preprocess_start_time, utask_module, + uworker_input.job_type, _Subtask.POSTPROCESS, + _Mode.QUEUE) def tworker_preprocess(utask_module, task_argument, job_type, uworker_env): @@ -183,7 +183,6 @@ def set_uworker_env(uworker_env: dict) -> None: def uworker_main(input_download_url) -> None: """Exectues the main part of a utask on the uworker (locally if not using remote executor).""" - start = _timestamp_now() uworker_input = uworker_io.download_and_deserialize_uworker_input( input_download_url) uworker_output_upload_url = uworker_input.uworker_output_upload_url @@ -199,8 +198,9 @@ def uworker_main(input_download_url) -> None: uworker_io.serialize_and_upload_uworker_output(uworker_output, uworker_output_upload_url) logs.log('Finished uworker_main.') - _record_e2e_duration(start, utask_module, uworker_input.job_type, - _Subtask.UWORKER_MAIN, _Mode.BATCH) + _record_e2e_duration(uworker_input.preprocess_start_time, utask_module, + uworker_input.job_type, _Subtask.UWORKER_MAIN, + _Mode.BATCH) return True @@ -218,12 +218,11 @@ def uworker_bot_main(): def tworker_postprocess(output_download_url) -> None: """Executes the postprocess step on the trusted (t)worker.""" - start = _timestamp_now() uworker_output = uworker_io.download_and_deserialize_uworker_output( output_download_url) set_uworker_env(uworker_output.uworker_input.uworker_env) utask_module = get_utask_module(uworker_output.uworker_input.module_name) utask_module.utask_postprocess(uworker_output) - job_type = uworker_output.uworker_input.job_type - _record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS, - _Mode.BATCH) + _record_e2e_duration(uworker_output.uworker_input.preprocess_start_time, + utask_module, uworker_output.uworker_input.job_type, + _Subtask.POSTPROCESS, _Mode.BATCH)