Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix e2e subtask duration metric collection #3698

Merged
merged 11 commits into from
Jan 30, 2024
34 changes: 16 additions & 18 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ def _timestamp_now() -> Timestamp:

def _record_e2e_duration(start: Timestamp, utask_module, job_type: str,
subtask: _Subtask, mode: _Mode):
duration = start.ToSeconds() - time.time()
duration_secs = (time.time_ns() - start.ToNanoseconds()) / 10**9
monitoring_metrics.UTASK_E2E_DURATION_SECS.add(
duration, {
duration_secs, {
'task': task_utils.get_command_from_module(utask_module.__name__),
'job': job_type,
'subtask': subtask,
'mode': mode,
'subtask': subtask.value,
'mode': mode.value,
'platform': environment.platform(),
})

Expand Down Expand Up @@ -107,7 +107,6 @@ def tworker_preprocess_no_io(utask_module, task_argument, job_type,
def uworker_main_no_io(utask_module, serialized_uworker_input):
"""Exectues the main part of a utask on the uworker (locally if not using
remote executor)."""
start = _timestamp_now()
logs.log('Starting utask_main: %s.' % utask_module)
uworker_input = uworker_io.deserialize_uworker_input(serialized_uworker_input)

Expand All @@ -118,8 +117,9 @@ def uworker_main_no_io(utask_module, serialized_uworker_input):
if uworker_output is None:
return None
result = uworker_io.serialize_uworker_output(uworker_output)
_record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.QUEUE)
_record_e2e_duration(uworker_input.preprocess_start_time, utask_module,
uworker_input.job_type, _Subtask.UWORKER_MAIN,
_Mode.QUEUE)
return result


Expand All @@ -128,15 +128,15 @@ def tworker_postprocess_no_io(utask_module, uworker_output, uworker_input):
the same bot as the uworker)."""
# TODO(metzman): Stop passing module to this function and uworker_main_no_io.
# Make them consistent with the I/O versions.
start = _timestamp_now()
uworker_output = uworker_io.deserialize_uworker_output(uworker_output)
# Do this to simulate out-of-band tamper-proof storage of the input.
uworker_input = uworker_io.deserialize_uworker_input(uworker_input)
uworker_output.uworker_input.CopyFrom(uworker_input)
set_uworker_env(uworker_output.uworker_input.uworker_env)
utask_module.utask_postprocess(uworker_output)
_record_e2e_duration(start, utask_module, uworker_input.job_type,
_Subtask.POSTPROCESS, _Mode.QUEUE)
_record_e2e_duration(uworker_input.preprocess_start_time, utask_module,
uworker_input.job_type, _Subtask.POSTPROCESS,
_Mode.QUEUE)


def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
Expand Down Expand Up @@ -183,7 +183,6 @@ def set_uworker_env(uworker_env: dict) -> None:
def uworker_main(input_download_url) -> None:
"""Exectues the main part of a utask on the uworker (locally if not using
remote executor)."""
start = _timestamp_now()
uworker_input = uworker_io.download_and_deserialize_uworker_input(
input_download_url)
uworker_output_upload_url = uworker_input.uworker_output_upload_url
Expand All @@ -199,9 +198,9 @@ def uworker_main(input_download_url) -> None:
uworker_io.serialize_and_upload_uworker_output(uworker_output,
uworker_output_upload_url)
logs.log('Finished uworker_main.')
_record_e2e_duration(start, utask_module,
uworker_output.uworker_input.job_type,
_Subtask.UWORKER_MAIN, _Mode.BATCH)
_record_e2e_duration(uworker_input.preprocess_start_time, utask_module,
uworker_input.job_type, _Subtask.UWORKER_MAIN,
_Mode.BATCH)
return True


Expand All @@ -219,12 +218,11 @@ def uworker_bot_main():

def tworker_postprocess(output_download_url) -> None:
"""Executes the postprocess step on the trusted (t)worker."""
start = _timestamp_now()
uworker_output = uworker_io.download_and_deserialize_uworker_output(
output_download_url)
set_uworker_env(uworker_output.uworker_input.uworker_env)
utask_module = get_utask_module(uworker_output.uworker_input.module_name)
utask_module.utask_postprocess(uworker_output)
job_type = uworker_output.uworker_input.job_type
_record_e2e_duration(start, utask_module, job_type, _Subtask.POSTPROCESS,
_Mode.BATCH)
_record_e2e_duration(uworker_output.uworker_input.preprocess_start_time,
utask_module, uworker_output.uworker_input.job_type,
_Subtask.POSTPROCESS, _Mode.BATCH)
113 changes: 105 additions & 8 deletions src/clusterfuzz/_internal/tests/core/bot/tasks/utasks/utasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
"""Tests for uworker_io."""

import os
import time
import unittest
from unittest import mock

from google.protobuf import timestamp_pb2

from clusterfuzz._internal.bot.tasks import utasks
from clusterfuzz._internal.bot.tasks.utasks import analyze_task
from clusterfuzz._internal.metrics import monitoring_metrics
from clusterfuzz._internal.protos import uworker_msg_pb2
from clusterfuzz._internal.tests.test_libs import helpers

Expand All @@ -41,20 +45,44 @@ def setUp(self):
self.OUTPUT_SIGNED_UPLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL)
self.mock.serialize_and_upload_uworker_input.return_value = (
self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL)
self.uworker_input = uworker_msg_pb2.Input(job_type='something')

def test_tworker_preprocess(self):
"""Tests that tworker_preprocess works as intended."""
module = mock.MagicMock(__name__='tasks.analyze_task')
module.utask_preprocess.return_value = self.uworker_input
module.__name__ = 'mock_task'

uworker_input = uworker_msg_pb2.Input(job_type='something')
module.utask_preprocess.return_value = uworker_input

start_time_ns = time.time_ns()

result = utasks.tworker_preprocess(module, self.TASK_ARGUMENT,
self.JOB_TYPE, self.UWORKER_ENV)

end_time_ns = time.time_ns()

module.utask_preprocess.assert_called_with(self.TASK_ARGUMENT,
self.JOB_TYPE, self.UWORKER_ENV)

self.mock.serialize_and_upload_uworker_input.assert_called_with(
self.uworker_input)
uworker_input)
self.assertGreaterEqual(uworker_input.preprocess_start_time.ToNanoseconds(),
start_time_ns)
self.assertLessEqual(uworker_input.preprocess_start_time.ToNanoseconds(),
end_time_ns)

durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({
'task': 'mock',
'job': self.JOB_TYPE,
'subtask': 'preprocess',
'mode': 'batch',
'platform': 'LINUX',
})
self.assertEqual(durations.count, 1)
self.assertLess(
durations.sum * 10**9,
end_time_ns - uworker_input.preprocess_start_time.ToNanoseconds())

self.assertEqual(
(self.INPUT_SIGNED_DOWNLOAD_URL, self.OUTPUT_DOWNLOAD_GCS_URL), result)

Expand Down Expand Up @@ -99,24 +127,46 @@ def setUp(self):
])
self.module = mock.MagicMock(__name__='tasks.analyze_task')
self.mock.get_utask_module.return_value = self.module
self.uworker_input = uworker_msg_pb2.Input(

def test_uworker_main(self):
"""Tests that uworker_main works as intended."""
start_time_ns = time.time_ns() - 42 * 10**9 # Sometime in the past.
start_timestamp = timestamp_pb2.Timestamp()
start_timestamp.FromNanoseconds(start_time_ns)

uworker_input = uworker_msg_pb2.Input(
job_type='job_type-value',
original_job_type='original_job_type-value',
uworker_env=self.UWORKER_ENV,
uworker_output_upload_url=self.UWORKER_OUTPUT_UPLOAD_URL,
preprocess_start_time=start_timestamp,
)
self.mock.download_and_deserialize_uworker_input.return_value = (
self.uworker_input)
uworker_input)

def test_uworker_main(self):
"""Tests that uworker_main works as intended."""
uworker_output = {
'crash_time': 70.1,
}
self.module.utask_main.return_value = uworker_msg_pb2.Output(
**uworker_output)
input_download_url = 'http://input'

utasks.uworker_main(input_download_url)
self.module.utask_main.assert_called_with(self.uworker_input)

end_time_ns = time.time_ns()

self.module.utask_main.assert_called_with(uworker_input)

durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({
'task': 'analyze',
'job': uworker_input.job_type,
'subtask': 'uworker_main',
'mode': 'batch',
'platform': 'LINUX',
})
self.assertEqual(durations.count, 1)
self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns)
self.assertGreaterEqual(durations.sum, 42)


class GetUtaskModuleTest(unittest.TestCase):
Expand All @@ -126,3 +176,50 @@ def test_get_utask_module(self):
self.assertEqual(utasks.get_utask_module(module_name), analyze_task)
module_name = analyze_task.__name__
self.assertEqual(utasks.get_utask_module(module_name), analyze_task)


class TworkerPostprocessTest(unittest.TestCase):
"""Tests that tworker_postprocess works as intended."""

def setUp(self):
helpers.patch_environ(self)
helpers.patch(self, [
'clusterfuzz._internal.bot.tasks.utasks.uworker_io.download_and_deserialize_uworker_output',
'clusterfuzz._internal.bot.tasks.utasks.get_utask_module',
])

def test_success(self):
"""Tests that if utask_postprocess suceeds, uworker_postprocess does too.
"""
download_url = 'https://uworker_output_download_url'

start_time_ns = time.time_ns() - 42 * 10**9 # Sometime in the past.
start_timestamp = timestamp_pb2.Timestamp()
start_timestamp.FromNanoseconds(start_time_ns)

uworker_output = uworker_msg_pb2.Output(
uworker_input=uworker_msg_pb2.Input(
job_type='foo-job', preprocess_start_time=start_timestamp),)
self.mock.download_and_deserialize_uworker_output.return_value = (
uworker_output)

module = mock.MagicMock(__name__='mock_task')
self.mock.get_utask_module.return_value = module

utasks.tworker_postprocess(download_url)
end_time_ns = time.time_ns()

self.mock.download_and_deserialize_uworker_output.assert_called_with(
download_url)
module.utask_postprocess.assert_called_with(uworker_output)

durations = monitoring_metrics.UTASK_E2E_DURATION_SECS.get({
'task': 'mock',
'job': 'foo-job',
'subtask': 'postprocess',
'mode': 'batch',
'platform': 'LINUX',
})
self.assertEqual(durations.count, 1)
self.assertLess(durations.sum * 10**9, end_time_ns - start_time_ns)
self.assertGreaterEqual(durations.sum, 42)
Loading