From cfa65c37da1363c6a4b6f48d3162463777c85f62 Mon Sep 17 00:00:00 2001
From: Christopher Tomkins-Tinch
Date: Thu, 12 Dec 2024 12:40:35 -0500
Subject: [PATCH] unpack_archive_to_bucket_path: allow additional opts to be passed to `gcloud storage cp`; reorder of inputs and their descriptions

---
 pipes/WDL/tasks/tasks_terra.wdl              | 13 +---
 pipes/WDL/tasks/tasks_utils.wdl              | 59 ++++++++++++-------
 .../WDL/workflows/sarscov2_illumina_full.wdl |  2 -
 .../workflows/unpack_archive_to_bucket.wdl   | 31 +++++++++-
 4 files changed, 69 insertions(+), 36 deletions(-)

diff --git a/pipes/WDL/tasks/tasks_terra.wdl b/pipes/WDL/tasks/tasks_terra.wdl
index 1e421dcb1..ba34ea84d 100644
--- a/pipes/WDL/tasks/tasks_terra.wdl
+++ b/pipes/WDL/tasks/tasks_terra.wdl
@@ -48,6 +48,7 @@ task check_terra_env {
 
         # create Terra-related output files
         touch user_email.txt
+        touch workspace_id.txt
         touch workspace_name.txt
         touch workspace_namespace.txt
         touch workspace_bucket_path.txt
@@ -56,6 +57,7 @@ task check_terra_env {
         touch method_version.txt
         touch method_source.txt
         touch method_path.txt
+        touch top_level_submission_id.txt
 
         # disable the version update alert messages gcloud sometimes emits when executing any command
         gcloud config set component_manager/disable_update_check true
@@ -134,7 +136,7 @@ task check_terra_env {
             WORKSPACE_NAME="$(jq -cr '.workspace.name | select (.!=null)' workspace_info.json | tee workspace_name.txt)"
             WORKSPACE_NAME_URL_ENCODED="$(jq -rn --arg x "${WORKSPACE_NAME}" '$x|@uri')"
             WORKSPACE_NAMESPACE="$(jq -cr '.workspace.namespace | select (.!=null)' workspace_info.json | tee workspace_namespace.txt)"
-            WORKSPACE_BUCKET="$(echo gs://${WORKSPACE_ID} | tee workspace_bucket_path.txt)"
+            WORKSPACE_BUCKET="$(echo "gs://${WORKSPACE_ID}" | tee workspace_bucket_path.txt)"
 
             echo "WORKSPACE_NAME: ${WORKSPACE_NAME}"
             echo "WORKSPACE_NAMESPACE: ${WORKSPACE_NAMESPACE}"
@@ -194,15 +196,6 @@ task check_terra_env {
         else
             echo "Not running on Terra+GCP"
         fi
-        ls -1 /sys
-        echo "--"
-        ls -1 /sys/fs
-        echo "--"
-        ls -1 /sys/fs/cgroup
-        echo "-- memory.peak:"
-        cat /sys/fs/cgroup/memory.peak
-        echo "--"
-        #ls -1 /sys/fs/cgroup/memory
 
         echo -n'' "MEM_BYTES: "; { if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } | tee MEM_BYTES
     >>>
 
     output {
diff --git a/pipes/WDL/tasks/tasks_utils.wdl b/pipes/WDL/tasks/tasks_utils.wdl
index a47209dcf..742e94727 100644
--- a/pipes/WDL/tasks/tasks_utils.wdl
+++ b/pipes/WDL/tasks/tasks_utils.wdl
@@ -39,22 +39,24 @@ task unpack_archive_to_bucket_path {
         String  bucket_path_prefix
         String? out_dir_name
 
-        # gcloud storage options
-        Boolean clobber_existing = false
-        String? gcloud_access_token
-
         # tar options
         Boolean bypass_disk_and_unpack_directly_to_bucket = false
         Int?    archive_wrapper_directories_to_strip
         String  tar_opts = "-v --ignore-zeros --no-ignore-command-error"
+
+        # gcloud storage options
+        Boolean clobber_existing = false
+        String? gcloud_access_token
+        String  gcloud_storage_cp_opts = ""
 
-        # resource requirements
+        # execution and resource requirements
         Int disk_size      = 500
         Int machine_mem_gb = 128
         String docker      = "quay.io/broadinstitute/viral-core:2.4.0"
     }
 
     parameter_meta {
+        # data I/O inputs
         input_archive_files: {
             description: "List of input archive files to unpack.",
             patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
@@ -65,27 +67,40 @@ task unpack_archive_to_bucket_path {
         out_dir_name: {
             description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
         }
-        gcloud_access_token: {
-            description: "Access token for the Google Cloud Storage bucket, if needed to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be used (service/pet account on Terra)."
+
+        # tar params
+        bypass_disk_and_unpack_directly_to_bucket: {
+            description: "(tar) If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
         }
         archive_wrapper_directories_to_strip: {
-            description: "If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar."
+            description: "(tar) If specified, tar extraction excludes this many top-level directories (i.e. if all files of a tarball are contained within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files will be extracted without being placed into a corresponding output sub-directory). Equivalent to the parameter '--strip-components' of GNU tar."
+        }
+        tar_opts: {
+            description: "(tar) Options to pass to GNU tar during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
         }
+
+        # 'gcloud storage cp' params
         clobber_existing: {
-            description: "If true, overwrite files in the target directory of the bucket if they already exist."
+            description: "(gcloud storage cp) If true, overwrite files in the target directory of the bucket if they already exist."
         }
-        bypass_disk_and_unpack_directly_to_bucket: {
-            description: "If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
+        gcloud_access_token: {
+            description: "(gcloud storage cp) Access token for the Google Cloud Storage bucket, for an account authorized to write to the bucket specified by 'bucket_path_prefix'. If not provided, an access token will be obtained via 'gcloud auth print-access-token' for the active authenticated user of the execution environment (on Terra, the workspace's 'pet' service account)."
         }
-        tar_opts: {
-            description: "Options to pass to the tar command during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
+        gcloud_storage_cp_opts: {
+            description: "(gcloud storage cp) Additional options to pass to the 'gcloud storage cp' command at the time of upload."
         }
+
+
+        # execution and resource requirements
         disk_size: {
             description: "Size of the disk to allocate for the task, in GB. Note that if multiple files are provided to 'input_archive_files', and extracted data is written to the disk (bypass_disk_and_unpack_directly_to_bucket=false), the extracted data from one archive will be removed before extracting and uploading data from the next input archive."
         }
         machine_mem_gb: {
             description: "Memory to allocate for the task, in GB."
         }
+        docker: {
+            description: "Docker image to use for the task. The image must provide GNU tar and the google-cloud-cli ('gcloud' command)."
+        }
     }
 
     command <<<
@@ -98,10 +113,11 @@ task unpack_archive_to_bucket_path {
         if ~{if(defined(gcloud_access_token)) then 'true' else 'false'}; then
            # set access token env var expected by gcloud,
            # if provided by the user
-           export CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
+           CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
         else
-           export CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
+           CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
         fi
+        export CLOUDSDK_AUTH_ACCESS_TOKEN
 
         # check that the gcloud access token is populated
         if [ -z "${CLOUDSDK_AUTH_ACCESS_TOKEN}" ]; then
@@ -124,7 +140,7 @@ task unpack_archive_to_bucket_path {
         # by trying a simple write action, since we cannot rely on
         # the user having the permissions needed to view the IAM policies
         # that determine their (write) access to the bucket
-        if ! echo "write_test" | gcloud storage cp - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
+        if ! echo "write_test" | gcloud storage cp --verbosity error - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
            echo "ERROR: user does not have write access to the target bucket: ~{bucket_path_prefix}" >&2
            exit 1
         else
@@ -152,12 +168,12 @@ task unpack_archive_to_bucket_path {
                # https://www.gnu.org/software/tar/manual/html_section/extract-options.html#Writing-to-an-External-Program
                tar ~{tar_opts} -x \
                ~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
-               --to-command='gcloud storage cp ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
+               --to-command='gcloud storage cp ~{gcloud_storage_cp_opts} ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
                -f "${input_archive}"
 
            # otherwise extract to disk and then upload to the bucket
            else
-               echo 'Extracting archive '$(basename "${input_archive}")' to disk before upload...'
+               echo 'Extracting archive '"$(basename "${input_archive}")"' to disk before upload...'
 
                # create a temporary directory to extract the archive contents to
                mkdir -p extracted_tmp
@@ -177,6 +193,7 @@ task unpack_archive_to_bucket_path {
                    --recursive \
                    ~{if clobber_existing then "" else "--no-clobber"} \
                    --verbosity warning \
+                   ~{gcloud_storage_cp_opts} \
                    ./ "${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}"
                popd
 
@@ -452,7 +469,7 @@ task download_from_url {
            mv response01 "${downloaded_file_name}" && \
            rm "tmp/$downloaded_file_name"
        else
-           mv "tmp/${downloaded_file_name} ${downloaded_file_name}"
+           mv "tmp/${downloaded_file_name}" "${downloaded_file_name}"
        fi
        # alternative python implementation to split response headers from body
        # via https://stackoverflow.com/a/75483099
@@ -490,11 +507,11 @@ task download_from_url {
 
        if ~{if defined(md5_hash_expected) then 'true' else 'false'}; then
            md5_hash_expected="~{md5_hash_expected}"
-           check_md5_sum $md5_hash_expected $md5sum_of_downloaded
+           check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
        fi
        if ~{if defined(md5_hash_expected_file_url) then 'true' else 'false'}; then
            md5_hash_expected="$(curl --silent ~{md5_hash_expected_file_url} | cut -f1 -d' ')"
-           check_md5_sum $md5_hash_expected $md5sum_of_downloaded
+           check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
        fi
 
        # report the file size, in bytes
diff --git a/pipes/WDL/workflows/sarscov2_illumina_full.wdl b/pipes/WDL/workflows/sarscov2_illumina_full.wdl
index a0ac5fe36..25265de0e 100644
--- a/pipes/WDL/workflows/sarscov2_illumina_full.wdl
+++ b/pipes/WDL/workflows/sarscov2_illumina_full.wdl
@@ -32,12 +32,10 @@ workflow sarscov2_illumina_full {
            description: "amplicon primers to trim in reference coordinate space (0-based BED format)",
            patterns: ["*.bed"]
        }
-
        biosample_attributes: {
            description: "A post-submission attributes file from NCBI BioSample, which is available at https://submit.ncbi.nlm.nih.gov/subs/ and clicking on 'Download attributes file with BioSample accessions'. The 'sample_name' column must match the external_ids used in sample_rename_map (or internal ids if sample_rename_map is omitted).",
            patterns: ["*.txt", "*.tsv"]
        }
-
    }
 
    input {
diff --git a/pipes/WDL/workflows/unpack_archive_to_bucket.wdl b/pipes/WDL/workflows/unpack_archive_to_bucket.wdl
index d0d59d84a..30e2f65d8 100644
--- a/pipes/WDL/workflows/unpack_archive_to_bucket.wdl
+++ b/pipes/WDL/workflows/unpack_archive_to_bucket.wdl
@@ -13,15 +13,40 @@ workflow unpack_archive_to_bucket {
     }
 
     input {
-        String? gcloud_auth_token
+        Array[File] input_archive_files
+        String?     bucket_path_prefix
+        String?     out_dir_name
+
+        String?     gcloud_access_token
+    }
+
+    parameter_meta {
+        input_archive_files: {
+            description: "List of input archive files to unpack.",
+            patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
+        }
+        bucket_path_prefix: {
+            description: "Path within the Google Storage bucket to unpack the archive files. If not provided, the root of the Terra workspace bucket will be used (when running on Terra)."
+        }
+        out_dir_name: {
+            description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
+        }
+        gcloud_access_token: {
+            description: "Access token for the Google Cloud Storage bucket, for an account authorized to write to the bucket specified by 'bucket_path_prefix'. If not provided, an access token will be obtained via 'gcloud auth print-access-token' for the active authenticated user of the execution environment (on Terra, the workspace's 'pet' service account)."
+        }
     }
 
     call tasks_terra.check_terra_env
 
-    if( (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) || defined(gcloud_auth_token) ) {
+    # only run the task if we are running on GCP, or if the user provides an auth token to interact with GCP
+    # if needed, we can also inspect 'check_terra_env.is_running_on_terra'
+    if( check_terra_env.is_backed_by_gcp || defined(gcloud_access_token) ) {
         call tasks_utils.unpack_archive_to_bucket_path {
             input:
-                gcloud_access_token = gcloud_auth_token
+                input_archive_files = input_archive_files,
+                gcloud_access_token = gcloud_access_token,
+                bucket_path_prefix  = if (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) then select_first([bucket_path_prefix,check_terra_env.workspace_bucket_path]) else select_first([bucket_path_prefix]),
+                out_dir_name        = out_dir_name
         }
     }
 }
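
Note: the following is a minimal standalone sketch (not part of the patch) of the streaming unpack-to-bucket pattern that the '--to-command' change above relies on, assuming GNU tar and an authenticated gcloud CLI with write access to the destination. The archive name, destination path, and extra 'gcloud storage cp' options are placeholders, not values from this repository.

    #!/usr/bin/env bash
    # Sketch: stream each archive member straight into a GCS object,
    # without writing the extracted files to local disk first.
    set -euo pipefail

    input_archive="example.tar.gz"                 # placeholder archive
    bucket_path_prefix="gs://example-bucket/dest"  # placeholder destination prefix
    gcloud_storage_cp_opts="--no-clobber"          # placeholder extra options forwarded to 'gcloud storage cp'

    # GNU tar runs the --to-command string through a shell once per extracted file,
    # with the file's bytes on stdin and its name exported as $TAR_REALNAME;
    # 'gcloud storage cp -' reads the object contents from stdin.
    tar -v --ignore-zeros --no-ignore-command-error -x \
        --to-command="gcloud storage cp ${gcloud_storage_cp_opts} --verbosity error - \"${bucket_path_prefix}/\${TAR_REALNAME}\"" \
        -f "${input_archive}"

Because each extracted file triggers its own 'gcloud storage cp' invocation, this mode can be noticeably slower than extracting to disk and uploading once with '--recursive', as the bypass_disk_and_unpack_directly_to_bucket description above already notes.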