diff --git a/src/clusterfuzz/_internal/base/tasks.py b/src/clusterfuzz/_internal/base/tasks.py index 900ff7eb6d..77a0c23c87 100644 --- a/src/clusterfuzz/_internal/base/tasks.py +++ b/src/clusterfuzz/_internal/base/tasks.py @@ -122,6 +122,11 @@ def default_queue_suffix(): return queue_suffix_for_platform(environment.platform()) +def generic_android_queue(prefix=JOBS_PREFIX): + """Get the generic 'android' queue that is not tied to a specific device.""" + return prefix + queue_suffix_for_platform(environment.platform()) + + def regular_queue(prefix=JOBS_PREFIX): """Get the regular jobs queue.""" return prefix + default_queue_suffix() @@ -190,6 +195,21 @@ def get_regular_task(queue=None): return task +def get_android_regular_task(): + """Get an android regular task.""" + if not environment.get_value('CF_ALLOWS_LISTENING_GENERIC_QUEUE') or \ + not environment.get_value('QUEUE_OVERRIDE'): + return get_regular_task() + + # We first try to grab a task on the regular default queue, which is the + # device specific subqueue. + task = get_regular_task(queue=regular_queue()) + if not task: + # We then try the generic queue. + task = get_regular_task(queue=generic_android_queue()) + return task + + def get_machine_template_for_queue(queue_name): """Gets the machine template for the instance used to execute a task from |queue_name|. This will be used by tworkers to schedule the appropriate @@ -316,7 +336,10 @@ def get_task(): if task: return task - task = get_regular_task() + if environment.is_android(): + task = get_android_regular_task() + else: + task = get_regular_task() if task: # Log the task details for debug purposes. logs.log(f'Got task with cmd {task.command} args {task.argument} ' diff --git a/src/clusterfuzz/_internal/bot/tasks/commands.py b/src/clusterfuzz/_internal/bot/tasks/commands.py index c1955aa4ba..c74ca3392b 100644 --- a/src/clusterfuzz/_internal/bot/tasks/commands.py +++ b/src/clusterfuzz/_internal/bot/tasks/commands.py @@ -73,6 +73,19 @@ class AlreadyRunningError(Error): """Exception raised for a task that is already running on another bot.""" +def get_queue_for_testcase(testcase_platform_id, current_platform_id): + """Gets the queue where to schedule the task on. Returns None if the default + regular queue should be used. + """ + if environment.is_android(testcase_platform_id.upper()): + # FIXME: Handle these imports in a cleaner way. + from clusterfuzz._internal.platforms import android + if android.util.should_schedule_on_generic_queue(testcase_platform_id, + current_platform_id): + return tasks.generic_android_queue() + return None + + def cleanup_task_state(): """Cleans state before and after a task is executed.""" # Cleanup stale processes. @@ -339,10 +352,16 @@ def process_command_impl(task_name, logs.log('Testcase %d platform (%s) does not match with ours\ (%s), exiting' % (testcase.key.id(), testcase_platform_id, current_platform_id)) + # If this happens on android, we need to make sure we're not + # scheduling this task on a subqueue, since we know the current + # platform doesn't match. + queue = get_queue_for_testcase(testcase_platform_id, + current_platform_id) tasks.add_task( task_name, task_argument, job_name, + queue=queue, wait_time=utils.random_number(1, TASK_RETRY_WAIT_LIMIT)) return None diff --git a/src/clusterfuzz/_internal/platforms/android/util.py b/src/clusterfuzz/_internal/platforms/android/util.py index bec749f80a..32419e61c6 100644 --- a/src/clusterfuzz/_internal/platforms/android/util.py +++ b/src/clusterfuzz/_internal/platforms/android/util.py @@ -82,3 +82,24 @@ def can_testcase_run_on_platform(testcase_platform_id, current_platform_id): return True return False + + +def should_schedule_on_generic_queue(testcase_platform_id, current_platform_id): + """Whether the testcase should be scheduled on the generic queue or the + default queue.""" + if not environment.get_value('QUEUE_OVERRIDE'): + return False + + testcase_fields = testcase_platform_id.split(':') + current_platform_fields = current_platform_id.split(':') + + if len(testcase_fields) != 3 or len(current_platform_fields) != 3: + return False + + testcase_codename = testcase_fields[1].split('_') + current_platform_codename = current_platform_fields.split('_') + + if len(testcase_codename) != 2 or len(current_platform_codename) != 2: + return False + + return testcase_codename[0] != testcase_codename[1] diff --git a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py index 2a99dc9dfb..6a4d14a064 100644 --- a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py @@ -381,3 +381,59 @@ def test_not_progression(self): messages = self.client.pull_from_subscription( self.subscription, max_messages=1) self.assertEqual(0, len(messages)) + + +@test_utils.integration +@test_utils.with_cloud_emulators('datastore', 'pubsub') +class GetTaskAndroidTest(unittest.TestCase): + """GetTask tests.""" + + def setUp(self): + helpers.patch_environ(self) + helpers.patch(self, [ + 'clusterfuzz._internal.base.persistent_cache.get_value', + 'clusterfuzz._internal.base.persistent_cache.set_value', + 'clusterfuzz._internal.base.utils.utcnow', + 'time.sleep', + ]) + + self.mock.get_value.return_value = None + self.mock.sleep.return_value = None + data_types.Job(name='job').put() + + client = pubsub.PubSubClient() + topic = pubsub.topic_name('test-clusterfuzz', 'jobs-android') + client.create_topic(topic) + client.create_subscription( + pubsub.subscription_name('test-clusterfuzz', 'jobs-android'), topic) + + topic = pubsub.topic_name('test-clusterfuzz', 'jobs-android-pixel8') + client.create_topic(topic) + client.create_subscription( + pubsub.subscription_name('test-clusterfuzz', 'jobs-android-pixel8'), + topic) + + self.mock.utcnow.return_value = test_utils.CURRENT_TIME.replace( + microsecond=0) + environment.set_value('OS_OVERRIDE', 'ANDROID') + + def test_device_specific_task(self): + """Test that device specific tasks are correctly scheduled and pulled.""" + environment.set_value('QUEUE_OVERRIDE', 'ANDROID:PIXEL8') + tasks.add_task('test', 'normal', 'job', wait_time=0) + task = tasks.get_task() + self.assertEqual('test', task.command) + self.assertEqual('normal', task.argument) + self.assertEqual('job', task.job) + self.assertEqual('test normal job', task.payload()) + + def test_generic_task_is_pulled(self): + """Test that generic tasks are pulled when QUEUE_OVERRIDE is set.""" + environment.set_value('CF_ALLOWS_LISTENING_GENERIC_QUEUE', 1) + environment.set_value('QUEUE_OVERRIDE', 'ANDROID:PIXEL8') + tasks.add_task('test', 'normal', 'job', queue='jobs-android', wait_time=0) + task = tasks.get_task() + self.assertEqual('test', task.command) + self.assertEqual('normal', task.argument) + self.assertEqual('job', task.job) + self.assertEqual('test normal job', task.payload())