From 3da93431e0b1bd18d27e9e0089d77e572b566860 Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Fri, 6 Dec 2024 14:42:53 -0500 Subject: [PATCH 1/8] implement queue chunking in s3::download_input_files --- api/scpca_portal/s3.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 1154b782..536eaf34 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -13,6 +13,8 @@ logger = get_and_configure_logger(__name__) aws_s3 = boto3.client("s3", config=Config(signature_version="s3v4")) +MAX_QUEUE_CHUNK_SIZE = 250 + def list_input_paths( relative_path: Path, @@ -89,17 +91,28 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: if not download_queue: return True - command_parts.append("--exclude=*") - command_parts.extend([f"--include={file_path}" for file_path in download_queue]) + while download_queue: + chunk_size = ( + MAX_QUEUE_CHUNK_SIZE + if len(download_queue) > MAX_QUEUE_CHUNK_SIZE + else len(download_queue) + ) - if "public-test" in bucket_name: - command_parts.append("--no-sign-request") + command_parts.append("--exclude=*") + command_parts.extend( + [f"--include={file_path}" for file_path in download_queue[:chunk_size]] + ) - try: - subprocess.check_call(command_parts) - except subprocess.CalledProcessError as error: - logger.error(f"Data files failed to download due to the following error:\n\t{error}") - return False + if "public-test" in bucket_name: + command_parts.append("--no-sign-request") + + try: + subprocess.check_call(command_parts) + except subprocess.CalledProcessError as error: + logger.error(f"Data files failed to download due to the following error:\n\t{error}") + return False + + download_queue = download_queue[chunk_size:] return True From fca811ad8a781b1644bcecfbf3e1eca1b0542851 Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Fri, 6 Dec 2024 15:38:58 -0500 Subject: [PATCH 2/8] debugging aws s3 sync command --- api/scpca_portal/config/logging.py | 2 +- api/scpca_portal/loader.py | 5 +++- .../commands/download_input_files.py | 29 +++++++++++++++++++ api/scpca_portal/s3.py | 12 ++++---- 4 files changed, 41 insertions(+), 7 deletions(-) create mode 100644 api/scpca_portal/management/commands/download_input_files.py diff --git a/api/scpca_portal/config/logging.py b/api/scpca_portal/config/logging.py index ea69b0ed..7c4f98b9 100644 --- a/api/scpca_portal/config/logging.py +++ b/api/scpca_portal/config/logging.py @@ -21,7 +21,7 @@ def get_thread_id() -> str: "%(asctime)s {0} %(name)s %(color)s%(levelname)s%(extras)s" ": %(message)s%(color_stop)s" ).format(get_thread_id()) LOG_LEVEL = None -LOG_RUNTIMES = os.getenv("LOG_RUNTIMES", False) +LOG_RUNTIMES = os.getenv("LOG_RUNTIMES", True) def unconfigure_root_logger(): diff --git a/api/scpca_portal/loader.py b/api/scpca_portal/loader.py index f5abc638..c8248412 100644 --- a/api/scpca_portal/loader.py +++ b/api/scpca_portal/loader.py @@ -8,7 +8,7 @@ from django.template.defaultfilters import pluralize from scpca_portal import metadata_file, s3 -from scpca_portal.config.logging import get_and_configure_logger +from scpca_portal.config.logging import configure_runtime_logging, get_and_configure_logger from scpca_portal.models import ( ComputedFile, Contact, @@ -19,6 +19,7 @@ ) logger = get_and_configure_logger(__name__) +log_runtime = configure_runtime_logging(logger) def prep_data_dirs(wipe_input_dir: bool = False, wipe_output_dir: bool = True) -> None: @@ -142,6 +143,7 @@ def create_project( return project +@log_runtime def _create_computed_file( computed_file: ComputedFile, update_s3: bool, clean_up_output_data: bool ) -> None: @@ -172,6 +174,7 @@ def _create_computed_file_callback(future, *, update_s3: bool, clean_up_output_d connection.close() +@log_runtime def generate_computed_file( *, download_config: Dict, diff --git a/api/scpca_portal/management/commands/download_input_files.py b/api/scpca_portal/management/commands/download_input_files.py new file mode 100644 index 00000000..d2416683 --- /dev/null +++ b/api/scpca_portal/management/commands/download_input_files.py @@ -0,0 +1,29 @@ +import logging + +from django.core.management.base import BaseCommand + +from scpca_portal import common, loader, s3 +from scpca_portal.models import Project + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler()) + + +class Command(BaseCommand): + def handle(self, *args, **kwargs): + loader.prep_data_dirs() + + project = Project.objects.filter(scpca_id="SCPCP000006").first() + + download_config = common.PROJECT_DOWNLOAD_CONFIGS["SPATIAL_SINGLE_CELL_EXPERIMENT"] + libraries = project.get_libraries(download_config) + + library_data_file_paths = [ + fp for lib in libraries for fp in lib.get_download_config_file_paths(download_config) + ] + project_data_file_paths = project.get_download_config_file_paths(download_config) + + s3.download_input_files( + library_data_file_paths + project_data_file_paths, project.s3_input_bucket + ) diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 536eaf34..9bb3bd88 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -8,9 +8,11 @@ import boto3 from botocore.client import Config -from scpca_portal.config.logging import get_and_configure_logger +from scpca_portal.config.logging import configure_runtime_logging, get_and_configure_logger logger = get_and_configure_logger(__name__) +log_runtime = configure_runtime_logging(logger) + aws_s3 = boto3.client("s3", config=Config(signature_version="s3v4")) MAX_QUEUE_CHUNK_SIZE = 250 @@ -82,11 +84,12 @@ def list_input_paths( return file_paths +@log_runtime def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: """Download all passed data file paths which have not previously been downloaded.'""" - command_parts = ["aws", "s3", "sync", f"s3://{bucket_name}", settings.INPUT_DATA_PATH] download_queue = [fp for fp in file_paths if not fp.exists()] + # If download_queue is empty, exit early if not download_queue: return True @@ -98,10 +101,9 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: else len(download_queue) ) + command_parts = ["aws", "s3", "sync", f"s3://{bucket_name}", settings.INPUT_DATA_PATH] command_parts.append("--exclude=*") - command_parts.extend( - [f"--include={file_path}" for file_path in download_queue[:chunk_size]] - ) + command_parts.extend([f"--include={file_path}" for file_path in download_queue]) if "public-test" in bucket_name: command_parts.append("--no-sign-request") From abfbd24a911b2c144752f110bb20018142fc588a Mon Sep 17 00:00:00 2001 From: David Date: Fri, 6 Dec 2024 20:33:51 -0500 Subject: [PATCH 3/8] sync once per projects child directory --- .../commands/download_input_files.py | 2 +- api/scpca_portal/s3.py | 35 ++++++++++--------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/api/scpca_portal/management/commands/download_input_files.py b/api/scpca_portal/management/commands/download_input_files.py index d2416683..1627f47a 100644 --- a/api/scpca_portal/management/commands/download_input_files.py +++ b/api/scpca_portal/management/commands/download_input_files.py @@ -12,7 +12,7 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): - loader.prep_data_dirs() + loader.prep_data_dirs(True) project = Project.objects.filter(scpca_id="SCPCP000006").first() diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 9bb3bd88..024bebfa 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -1,5 +1,5 @@ import subprocess -from collections import namedtuple +from collections import namedtuple, defaultdict from pathlib import Path from typing import List @@ -15,8 +15,6 @@ aws_s3 = boto3.client("s3", config=Config(signature_version="s3v4")) -MAX_QUEUE_CHUNK_SIZE = 250 - def list_input_paths( relative_path: Path, @@ -88,22 +86,27 @@ def list_input_paths( def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: """Download all passed data file paths which have not previously been downloaded.'""" - download_queue = [fp for fp in file_paths if not fp.exists()] + # NOTE: AWS Sync does one iteration per include flag. + # This causes a tremendous slowdown when trying to sync a long list of specific files. + # In order to overcome this we should sync once per project folder's immediate child subdirectory or file. + download_queue = defaultdict(list) - # If download_queue is empty, exit early - if not download_queue: - return True + for file_path in file_paths: + if not file_path.exists(): - while download_queue: - chunk_size = ( - MAX_QUEUE_CHUNK_SIZE - if len(download_queue) > MAX_QUEUE_CHUNK_SIZE - else len(download_queue) - ) + # default to parent for immediately nested files + bucket_path = file_path.parent + + if len(file_path.parents) > 2: + bucket_path = file_path.parents[-3] - command_parts = ["aws", "s3", "sync", f"s3://{bucket_name}", settings.INPUT_DATA_PATH] + download_queue[bucket_path].append(file_path.relative_to(bucket_path)) + + + for bucket_path, project_file_paths in download_queue.items(): + command_parts = ["aws", "s3", "sync", f"s3://{bucket_name}/{bucket_path}", settings.INPUT_DATA_PATH / bucket_path] command_parts.append("--exclude=*") - command_parts.extend([f"--include={file_path}" for file_path in download_queue]) + command_parts.extend([f"--include={file_path}" for file_path in project_file_paths]) if "public-test" in bucket_name: command_parts.append("--no-sign-request") @@ -114,8 +117,6 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: logger.error(f"Data files failed to download due to the following error:\n\t{error}") return False - download_queue = download_queue[chunk_size:] - return True From a3b7951a7cb5e01a0bd09b6a2da730d79dca757b Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Sun, 8 Dec 2024 12:13:43 -0500 Subject: [PATCH 4/8] replace file_path.parents with file_path.parts --- api/scpca_portal/config/logging.py | 2 +- api/scpca_portal/s3.py | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/api/scpca_portal/config/logging.py b/api/scpca_portal/config/logging.py index 7c4f98b9..ea69b0ed 100644 --- a/api/scpca_portal/config/logging.py +++ b/api/scpca_portal/config/logging.py @@ -21,7 +21,7 @@ def get_thread_id() -> str: "%(asctime)s {0} %(name)s %(color)s%(levelname)s%(extras)s" ": %(message)s%(color_stop)s" ).format(get_thread_id()) LOG_LEVEL = None -LOG_RUNTIMES = os.getenv("LOG_RUNTIMES", True) +LOG_RUNTIMES = os.getenv("LOG_RUNTIMES", False) def unconfigure_root_logger(): diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 024bebfa..73ee5ee6 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -1,5 +1,5 @@ import subprocess -from collections import namedtuple, defaultdict +from collections import defaultdict, namedtuple from pathlib import Path from typing import List @@ -88,23 +88,29 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: # NOTE: AWS Sync does one iteration per include flag. # This causes a tremendous slowdown when trying to sync a long list of specific files. - # In order to overcome this we should sync once per project folder's immediate child subdirectory or file. + # In order to overcome this we should sync once + # per project folder's immediate child subdirectory or file. download_queue = defaultdict(list) for file_path in file_paths: if not file_path.exists(): - # default to parent for immediately nested files - bucket_path = file_path.parent + # default to project folder for immediately nested files + bucket_path = Path(file_path.parts[0]) - if len(file_path.parents) > 2: - bucket_path = file_path.parents[-3] + if len(file_path.parts) > 2: + bucket_path = bucket_path / file_path.parts[1] download_queue[bucket_path].append(file_path.relative_to(bucket_path)) - for bucket_path, project_file_paths in download_queue.items(): - command_parts = ["aws", "s3", "sync", f"s3://{bucket_name}/{bucket_path}", settings.INPUT_DATA_PATH / bucket_path] + command_parts = [ + "aws", + "s3", + "sync", + f"s3://{bucket_name}/{bucket_path}", + settings.INPUT_DATA_PATH / bucket_path, + ] command_parts.append("--exclude=*") command_parts.extend([f"--include={file_path}" for file_path in project_file_paths]) From 785ded509c8bfde1c0d1e2b7ad22184f0f873b62 Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Sun, 8 Dec 2024 12:50:15 -0500 Subject: [PATCH 5/8] remove trailing slash from test input bucket --- api/scpca_portal/config/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/scpca_portal/config/test.py b/api/scpca_portal/config/test.py index a559965e..677a14a3 100644 --- a/api/scpca_portal/config/test.py +++ b/api/scpca_portal/config/test.py @@ -6,7 +6,7 @@ class Test(Local): # AWS S3 # Note: Data must be resynced when test bucket is updated - AWS_S3_INPUT_BUCKET_NAME = "scpca-portal-public-test-inputs/2024-09-10/" + AWS_S3_INPUT_BUCKET_NAME = "scpca-portal-public-test-inputs/2024-09-10" # Code Paths INPUT_DATA_PATH = Path("/home/user/code/test_data/input") From 69e8a790e9d435dc4b9c9e8bffd9767b08c1422d Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Sun, 8 Dec 2024 13:06:32 -0500 Subject: [PATCH 6/8] remove logging --- api/scpca_portal/loader.py | 5 +--- .../commands/download_input_files.py | 29 ------------------- api/scpca_portal/s3.py | 7 ++--- 3 files changed, 3 insertions(+), 38 deletions(-) delete mode 100644 api/scpca_portal/management/commands/download_input_files.py diff --git a/api/scpca_portal/loader.py b/api/scpca_portal/loader.py index c8248412..f5abc638 100644 --- a/api/scpca_portal/loader.py +++ b/api/scpca_portal/loader.py @@ -8,7 +8,7 @@ from django.template.defaultfilters import pluralize from scpca_portal import metadata_file, s3 -from scpca_portal.config.logging import configure_runtime_logging, get_and_configure_logger +from scpca_portal.config.logging import get_and_configure_logger from scpca_portal.models import ( ComputedFile, Contact, @@ -19,7 +19,6 @@ ) logger = get_and_configure_logger(__name__) -log_runtime = configure_runtime_logging(logger) def prep_data_dirs(wipe_input_dir: bool = False, wipe_output_dir: bool = True) -> None: @@ -143,7 +142,6 @@ def create_project( return project -@log_runtime def _create_computed_file( computed_file: ComputedFile, update_s3: bool, clean_up_output_data: bool ) -> None: @@ -174,7 +172,6 @@ def _create_computed_file_callback(future, *, update_s3: bool, clean_up_output_d connection.close() -@log_runtime def generate_computed_file( *, download_config: Dict, diff --git a/api/scpca_portal/management/commands/download_input_files.py b/api/scpca_portal/management/commands/download_input_files.py deleted file mode 100644 index 1627f47a..00000000 --- a/api/scpca_portal/management/commands/download_input_files.py +++ /dev/null @@ -1,29 +0,0 @@ -import logging - -from django.core.management.base import BaseCommand - -from scpca_portal import common, loader, s3 -from scpca_portal.models import Project - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -logger.addHandler(logging.StreamHandler()) - - -class Command(BaseCommand): - def handle(self, *args, **kwargs): - loader.prep_data_dirs(True) - - project = Project.objects.filter(scpca_id="SCPCP000006").first() - - download_config = common.PROJECT_DOWNLOAD_CONFIGS["SPATIAL_SINGLE_CELL_EXPERIMENT"] - libraries = project.get_libraries(download_config) - - library_data_file_paths = [ - fp for lib in libraries for fp in lib.get_download_config_file_paths(download_config) - ] - project_data_file_paths = project.get_download_config_file_paths(download_config) - - s3.download_input_files( - library_data_file_paths + project_data_file_paths, project.s3_input_bucket - ) diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 73ee5ee6..8664829b 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -8,11 +8,9 @@ import boto3 from botocore.client import Config -from scpca_portal.config.logging import configure_runtime_logging, get_and_configure_logger +from scpca_portal.config.logging import get_and_configure_logger logger = get_and_configure_logger(__name__) -log_runtime = configure_runtime_logging(logger) - aws_s3 = boto3.client("s3", config=Config(signature_version="s3v4")) @@ -82,7 +80,6 @@ def list_input_paths( return file_paths -@log_runtime def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: """Download all passed data file paths which have not previously been downloaded.'""" @@ -99,7 +96,7 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: bucket_path = Path(file_path.parts[0]) if len(file_path.parts) > 2: - bucket_path = bucket_path / file_path.parts[1] + bucket_path /= file_path.parts[1] download_queue[bucket_path].append(file_path.relative_to(bucket_path)) From 6b64aff807ed4dcdbafdd05372f52b610fd311c3 Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Mon, 9 Dec 2024 12:30:09 -0500 Subject: [PATCH 7/8] add comment explaining subdirectory appending to bucket path --- api/scpca_portal/s3.py | 1 + 1 file changed, 1 insertion(+) diff --git a/api/scpca_portal/s3.py b/api/scpca_portal/s3.py index 8664829b..7866f886 100644 --- a/api/scpca_portal/s3.py +++ b/api/scpca_portal/s3.py @@ -96,6 +96,7 @@ def download_input_files(file_paths: List[Path], bucket_name: str) -> bool: bucket_path = Path(file_path.parts[0]) if len(file_path.parts) > 2: + # append the subdirectory to the parent directory to form the bucket_path bucket_path /= file_path.parts[1] download_queue[bucket_path].append(file_path.relative_to(bucket_path)) From accdbfd896991b3733027ddfea874da6277a7b23 Mon Sep 17 00:00:00 2001 From: Avrohom Gottlieb Date: Mon, 9 Dec 2024 12:33:39 -0500 Subject: [PATCH 8/8] update wording in batch revision tag --- infrastructure/batch.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infrastructure/batch.tf b/infrastructure/batch.tf index e7414de5..43b7c3d1 100644 --- a/infrastructure/batch.tf +++ b/infrastructure/batch.tf @@ -22,6 +22,6 @@ stage = var.stage batch_tags = { module = "batch", - revision = "initial - 16 vCPU compute environment and 1 queue" + revision = "first - 16 vCPU compute environment with 1 vCPU per job" } }