fix: add repo_id kwarg for parallel upload processing (#371)
* add repoid kwarg

* update shared
daniel-codecov authored Apr 9, 2024
1 parent 142d1ec commit c667b44
Showing 6 changed files with 28 additions and 13 deletions.
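
This commit bumps the pinned shared dependency and switches every PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(...) call site from a positional argument to an explicit repo_id keyword, which suggests the flag's check_value / check_value_async methods accept repo_id by keyword in the new shared revision. Below is a minimal sketch of what such a per-repo flag could look like; the real definition lives in codecov/shared, and everything beyond the two method names is an assumption for illustration:

# Hypothetical per-repo rollout flag whose check methods take `repo_id`
# keyword-only; the actual implementation is in the codecov/shared package.
class RolloutFlag:
    def __init__(self, name: str, enabled_repos: set[int]):
        self.name = name
        self.enabled_repos = enabled_repos

    def check_value(self, *, repo_id: int) -> bool:
        # Keyword-only: a positional call like check_value(123) raises
        # TypeError, forcing the explicit style adopted by this commit.
        return repo_id in self.enabled_repos

    async def check_value_async(self, *, repo_id: int) -> bool:
        return self.check_value(repo_id=repo_id)


PARALLEL_UPLOAD_PROCESSING_BY_REPO = RolloutFlag(
    "worker.parallel_upload_processing", enabled_repos={123}
)
assert PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=123)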
2 changes: 1 addition & 1 deletion requirements.in
@@ -1,4 +1,4 @@
-https://github.com/codecov/shared/archive/93ac35100e3e94cb29979e2147b8f9680a68368e.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/bf973edc6c0438ac4071dbefc08e055c87d720c8.tar.gz#egg=shared
 https://github.com/codecov/opentelem-python/archive/refs/tags/v0.0.4a1.tar.gz#egg=codecovopentelem
 https://github.com/codecov/test-results-parser/archive/5515e960d5d38881036e9127f86320efca649f13.tar.gz#egg=test-results-parser
 boto3
2 changes: 1 addition & 1 deletion requirements.txt
@@ -368,7 +368,7 @@ sentry-sdk==1.40.0
     # via
     #   -r requirements.in
     #   shared
-shared @ https://github.com/codecov/shared/archive/93ac35100e3e94cb29979e2147b8f9680a68368e.tar.gz
+shared @ https://github.com/codecov/shared/archive/bf973edc6c0438ac4071dbefc08e055c87d720c8.tar.gz
     # via -r requirements.in
 six==1.15.0
     # via
2 changes: 1 addition & 1 deletion services/report/__init__.py
@@ -291,7 +291,7 @@ async def initialize_and_save_report(
         # finisher can build off of it later. Makes the assumption that the CFFs occupy the first
         # j to i session ids where i is the max id of the CFFs and j is some integer less than i.
         if await PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value_async(
-            commit.repository.repoid
+            repo_id=commit.repository.repoid
         ):
             await self.save_parallel_report_to_archive(
                 commit, report, report_code
9 changes: 6 additions & 3 deletions tasks/upload.py
@@ -654,8 +654,9 @@ def _schedule_coverage_processing_task(
                 ),
             )
             processing_tasks.append(sig)
-
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(commit.repository.repoid):
+        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+            repo_id=commit.repository.repoid
+        ):
             parallel_chunk_size = 1
             num_sessions = len(argument_list)
             redis_key = get_parallel_upload_processing_session_counter_redis_key(
@@ -729,7 +730,9 @@ def _schedule_coverage_processing_task(
             processing_tasks.append(finish_sig)
             serial_tasks = chain(*processing_tasks)

-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(commit.repository.repoid):
+        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+            repo_id=commit.repository.repoid
+        ):
             parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
             parallel_shadow_experiment = serial_tasks | parallel_tasks
             res = parallel_shadow_experiment.apply_async()
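The gated branch above composes Celery canvas primitives: the existing serial pipeline runs first, and the parallel pipeline is chained behind it as a shadow experiment. A minimal sketch of that composition, with placeholder task names and arguments rather than the worker's real signatures:

# Sketch of the serial-then-parallel "shadow experiment" canvas from
# tasks/upload.py; task names and kwargs here are placeholders.
from celery import chain, chord, signature

# Serial flow: uploads processed one after another, then a finisher.
serial_tasks = chain(
    signature("app.tasks.upload_processor", kwargs={"chunk": 1}, immutable=True),
    signature("app.tasks.upload_processor", kwargs={"chunk": 2}, immutable=True),
    signature("app.tasks.upload_finisher", immutable=True),
)

# Parallel flow: the same uploads as a chord header, with a parallel
# finisher that runs once every header task has completed.
parallel_tasks = chord(
    [
        signature("app.tasks.upload_processor", kwargs={"chunk": 1, "in_parallel": True}, immutable=True),
        signature("app.tasks.upload_processor", kwargs={"chunk": 2, "in_parallel": True}, immutable=True),
    ],
    signature("app.tasks.upload_finisher", kwargs={"in_parallel": True}, immutable=True),
)

# Chaining runs the serial flow first, then the parallel flow as a shadow
# copy; with a broker configured, apply_async() would enqueue the whole canvas.
parallel_shadow_experiment = serial_tasks | parallel_tasks
print(parallel_shadow_experiment)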
2 changes: 1 addition & 1 deletion tasks/upload_finisher.py
@@ -93,7 +93,7 @@ def run_impl(
         repository = commit.repository

         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repository.repoid)
             and in_parallel
         ):
             actual_processing_results = {
24 changes: 18 additions & 6 deletions tasks/upload_processor.py
@@ -91,7 +91,10 @@ def run_impl(
             extra=dict(repoid=repoid, commit=commitid),
         )

-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repoid) and in_parallel:
+        if (
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repoid)
+            and in_parallel
+        ):
             log.info(
                 "Using parallel upload processing, skip acquiring upload processing lock",
                 extra=dict(
@@ -179,7 +182,10 @@ def process_impl_within_lock(
         in_parallel=False,
         **kwargs,
     ):
-        if not PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repoid) and in_parallel:
+        if (
+            not PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repoid)
+            and in_parallel
+        ):
             log.info(
                 "Obtained upload processing lock, starting",
                 extra=dict(
@@ -206,7 +212,7 @@ def process_impl_within_lock(
         report_service = ReportService(commit_yaml)

         if (
-            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+            PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repo_id=repository.repoid)
             and in_parallel
         ):
             log.info(
@@ -299,7 +305,9 @@ def process_impl_within_lock(
             parallel_incremental_result = None
             results_dict = {}
             if (
-                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                    repo_id=repository.repoid
+                )
                 and in_parallel
             ):
                 with metrics.timer(
@@ -328,7 +336,9 @@ def process_impl_within_lock(
             # the parallel flow runs second, it parses the human readable artifacts instead
             # since the serial flow already rewrote it
             if not (
-                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                    repo_id=repository.repoid
+                )
                 and in_parallel
             ):
                 deleted_archive = self._possibly_delete_archive(
@@ -357,7 +367,9 @@ def process_impl_within_lock(
             }

             if (
-                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(repository.repoid)
+                PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
+                    repo_id=repository.repoid
+                )
                 and in_parallel
             ):
                 result["parallel_incremental_result"] = parallel_incremental_result