From c667b44aac4f81b26dda6fb7ce5ab9ee61d7852b Mon Sep 17 00:00:00 2001
From: daniel-codecov <159859649+daniel-codecov@users.noreply.github.com>
Date: Tue, 9 Apr 2024 13:49:23 -0400
Subject: [PATCH] fix: add repo_id kwarg for parallel upload processing (#371)

* add repoid kwarg

* update shared
---
 requirements.in             |  2 +-
 requirements.txt            |  2 +-
 services/report/__init__.py |  2 +-
 tasks/upload.py             |  9 ++++++---
 tasks/upload_finisher.py    |  2 +-
 tasks/upload_processor.py   | 24 ++++++++++++++++++------
 6 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/requirements.in b/requirements.in
index b66caefb4..b3f3f926b 100644
--- a/requirements.in
+++ b/requirements.in
@@ -1,4 +1,4 @@
-https://github.com/codecov/shared/archive/93ac35100e3e94cb29979e2147b8f9680a68368e.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/bf973edc6c0438ac4071dbefc08e055c87d720c8.tar.gz#egg=shared
 https://github.com/codecov/opentelem-python/archive/refs/tags/v0.0.4a1.tar.gz#egg=codecovopentelem
 https://github.com/codecov/test-results-parser/archive/5515e960d5d38881036e9127f86320efca649f13.tar.gz#egg=test-results-parser
 boto3
diff --git a/requirements.txt b/requirements.txt
index 4368aa380..f733c8b4d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -368,7 +368,7 @@ sentry-sdk==1.40.0
     # via
     #   -r requirements.in
     #   shared
-shared @ https://github.com/codecov/shared/archive/93ac35100e3e94cb29979e2147b8f9680a68368e.tar.gz
+shared @ https://github.com/codecov/shared/archive/bf973edc6c0438ac4071dbefc08e055c87d720c8.tar.gz
     # via -r requirements.in
 six==1.15.0
     # via
diff --git a/services/report/__init__.py b/services/report/__init__.py
index c84e8dce0..dff01c32e 100644
--- a/services/report/__init__.py
+++ b/services/report/__init__.py
@@ -291,7 +291,7 @@ async def initialize_and_save_report(
         # finisher can build off of it later. Makes the assumption that the CFFs occupy the first
         # j to i session ids where i is the max id of the CFFs and j is some integer less than i.
         if await PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value_async(
-            commit.repository.repoid
+            repo_id=commit.repository.repoid
         ):
             await self.save_parallel_report_to_archive(
                 commit, report, report_code
diff --git a/tasks/upload.py b/tasks/upload.py
index 47ee2e14a..71a27b29e 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -654,8 +654,9 @@ def _schedule_coverage_processing_task(
                 ),
             )
             processing_tasks.append(sig)
-
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(commit.repository.repoid):
+        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+            repo_id=commit.repository.repoid
+        ):
             parallel_chunk_size = 1
             num_sessions = len(argument_list)
             redis_key = get_parallel_upload_processing_session_counter_redis_key(
@@ -729,7 +730,9 @@ def _schedule_coverage_processing_task(
             processing_tasks.append(finish_sig)
 
         serial_tasks = chain(*processing_tasks)
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(commit.repository.repoid):
+        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+            repo_id=commit.repository.repoid
+        ):
             parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
             parallel_shadow_experiment = serial_tasks | parallel_tasks
             res = parallel_shadow_experiment.apply_async()
diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py
index a986ff4a8..2bbe177e1 100644
--- a/tasks/upload_finisher.py
+++ b/tasks/upload_finisher.py
@@ -93,7 +93,7 @@ def run_impl(
         repository = commit.repository
 
         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repository.repoid)
             and in_parallel
         ):
             actual_processing_results = {
diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py
index 4bac75130..3729a0630 100644
--- a/tasks/upload_processor.py
+++ b/tasks/upload_processor.py
@@ -91,7 +91,10 @@ def run_impl(
             extra=dict(repoid=repoid, commit=commitid),
         )
 
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repoid) and in_parallel:
+        if (
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repoid)
+            and in_parallel
+        ):
             log.info(
                 "Using parallel upload processing, skip acquiring upload processing lock",
                 extra=dict(
@@ -179,7 +182,10 @@ def process_impl_within_lock(
         in_parallel=False,
         **kwargs,
     ):
-        if not PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repoid) and in_parallel:
+        if (
+            not PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repoid)
+            and in_parallel
+        ):
             log.info(
                 "Obtained upload processing lock, starting",
                 extra=dict(
@@ -206,7 +212,7 @@ def process_impl_within_lock(
         report_service = ReportService(commit_yaml)
 
         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repository.repoid)
             and in_parallel
         ):
             log.info(
@@ -299,7 +305,9 @@ def process_impl_within_lock(
         parallel_incremental_result = None
         results_dict = {}
         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                repo_id=repository.repoid
+            )
             and in_parallel
         ):
             with metrics.timer(
@@ -328,7 +336,9 @@ def process_impl_within_lock(
         # the parallel flow runs second, it parses the human readable artifacts instead
         # since the serial flow already rewrote it
         if not (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                repo_id=repository.repoid
+            )
            and in_parallel
         ):
             deleted_archive = self._possibly_delete_archive(
@@ -357,7 +367,9 @@ def process_impl_within_lock(
         }
 
         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                repo_id=repository.repoid
+            )
             and in_parallel
         ):
             result["parallel_incremental_result"] = parallel_incremental_result
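
The call-site change above is mechanical: every check_value / check_value_async
invocation now passes the repository id as the keyword argument repo_id rather
than positionally, matching the signature in the newly pinned shared revision.
Below is a minimal, hypothetical sketch of that pattern. The Feature class, its
fields, and the hashing scheme are assumptions made for illustration only; the
real PARALLEL_UPLOAD_PROCESSING_BY_REPO flag comes from the pinned shared
package, whose implementation is not shown here.

    # rollout_sketch.py -- hypothetical stand-in, NOT shared's actual code.
    import hashlib


    class Feature:
        """Per-repo rollout flag with a keyword-only check_value."""

        def __init__(self, name: str, proportion: float = 0.0):
            self.name = name
            # Fraction of repos (0.0..1.0) for which the feature is enabled.
            self.proportion = proportion

        def check_value(self, *, repo_id: int, default: bool = False) -> bool:
            # Keyword-only repo_id makes call sites self-documenting and lets
            # the library grow other bucketing keys (e.g. owner_id) later
            # without silently changing what a positional argument means.
            if self.proportion <= 0.0:
                return default
            # Hash the flag name and repo id into a stable 0..99 bucket so a
            # given repo is consistently inside or outside the rollout.
            digest = hashlib.md5(f"{self.name}:{repo_id}".encode()).hexdigest()
            return int(digest, 16) % 100 < int(self.proportion * 100)


    PARALLEL_UPLOAD_PROCESSING_BY_REPO = Feature(
        "parallel_upload_processing", proportion=0.10
    )

    # Matches the call sites in the patch; passing the id positionally, as
    # the old code did, would raise TypeError against this keyword-only
    # signature -- which is why every caller had to be updated together.
    if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=123):
        print("parallel upload processing enabled for repo 123")

One consequence of deterministic hashing, under these assumptions, is that the
shadow experiment in tasks/upload.py sees a stable population: the same repos
take the parallel path on every upload, rather than flipping per call.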