diff --git a/ingest/vendored/.cramrc b/ingest/vendored/.cramrc new file mode 100644 index 0000000..153d20f --- /dev/null +++ b/ingest/vendored/.cramrc @@ -0,0 +1,3 @@ +[cram] +shell = /bin/bash +indent = 2 diff --git a/ingest/vendored/.github/pull_request_template.md b/ingest/vendored/.github/pull_request_template.md new file mode 100644 index 0000000..ed4a5b2 --- /dev/null +++ b/ingest/vendored/.github/pull_request_template.md @@ -0,0 +1,16 @@ +### Description of proposed changes + + + +### Related issue(s) + + + +### Checklist + + + +- [ ] Checks pass +- [ ] If adding a script, add an entry for it in the README. + + diff --git a/ingest/vendored/.github/workflows/ci.yaml b/ingest/vendored/.github/workflows/ci.yaml new file mode 100644 index 0000000..c6a218a --- /dev/null +++ b/ingest/vendored/.github/workflows/ci.yaml @@ -0,0 +1,23 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + workflow_dispatch: + +jobs: + shellcheck: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: nextstrain/.github/actions/shellcheck@master + + cram: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + - run: pip install cram + - run: cram tests/ \ No newline at end of file diff --git a/ingest/vendored/.gitrepo b/ingest/vendored/.gitrepo new file mode 100644 index 0000000..541e130 --- /dev/null +++ b/ingest/vendored/.gitrepo @@ -0,0 +1,12 @@ +; DO NOT EDIT (unless you know what you are doing) +; +; This subdirectory is a git "subrepo", and this file is maintained by the +; git-subrepo command. See https://github.com/ingydotnet/git-subrepo#readme +; +[subrepo] + remote = https://github.com/nextstrain/ingest + branch = main + commit = a0faef53a0c6e7cc4057209454ef0852875dc3a9 + parent = 9fba460d1db4d677615d3826cdf061aeffde71a7 + method = merge + cmdver = 0.4.6 diff --git a/ingest/vendored/.shellcheckrc b/ingest/vendored/.shellcheckrc new file mode 100644 index 0000000..ebed438 --- /dev/null +++ b/ingest/vendored/.shellcheckrc @@ -0,0 +1,6 @@ +# Use of this file requires Shellcheck v0.7.0 or newer. +# +# SC2064 - We intentionally want variables to expand immediately within traps +# so the trap can not fail due to variable interpolation later. +# +disable=SC2064 diff --git a/ingest/vendored/README.md b/ingest/vendored/README.md new file mode 100644 index 0000000..0ad83f4 --- /dev/null +++ b/ingest/vendored/README.md @@ -0,0 +1,140 @@ +# ingest + +Shared internal tooling for pathogen data ingest. Used by our individual +pathogen repos which produce Nextstrain builds. Expected to be vendored by +each pathogen repo using `git subtree`. + +Some tools may only live here temporarily before finding a permanent home in +`augur curate` or Nextstrain CLI. Others may happily live out their days here. + +## Vendoring + +Nextstrain maintained pathogen repos will use [`git subrepo`](https://github.com/ingydotnet/git-subrepo) to vendor ingest scripts. +(See discussion on this decision in https://github.com/nextstrain/ingest/issues/3) + +For a list of Nextstrain repos that are currently using this method, use [this +GitHub code search](https://github.com/search?type=code&q=org%3Anextstrain+subrepo+%22remote+%3D+https%3A%2F%2Fgithub.com%2Fnextstrain%2Fingest%22). + +If you don't already have `git subrepo` installed, follow the [git subrepo installation instructions](https://github.com/ingydotnet/git-subrepo#installation). +Then add the latest ingest scripts to the pathogen repo by running: + +``` +git subrepo clone https://github.com/nextstrain/ingest ingest/vendored +``` + +Any future updates of ingest scripts can be pulled in with: + +``` +git subrepo pull ingest/vendored +``` + +If you run into merge conflicts and would like to pull in a fresh copy of the +latest ingest scripts, pull with the `--force` flag: + +``` +git subrepo pull ingest/vendored --force +``` + +> **Warning** +> Beware of rebasing/dropping the parent commit of a `git subrepo` update + +`git subrepo` relies on metadata in the `ingest/vendored/.gitrepo` file, +which includes the hash for the parent commit in the pathogen repos. +If this hash no longer exists in the commit history, there will be errors when +running future `git subrepo pull` commands. + +If you run into an error similar to the following: +``` +$ git subrepo pull ingest/vendored +git-subrepo: Command failed: 'git branch subrepo/ingest/vendored '. +fatal: not a valid object name: '' +``` +Check the parent commit hash in the `ingest/vendored/.gitrepo` file and make +sure the commit exists in the commit history. Update to the appropriate parent +commit hash if needed. + +## History + +Much of this tooling originated in +[ncov-ingest](https://github.com/nextstrain/ncov-ingest) and was passaged thru +[mpox's ingest/](https://github.com/nextstrain/mpox/tree/@/ingest/). It +subsequently proliferated from [mpox][] to other pathogen repos ([rsv][], +[zika][], [dengue][], [hepatitisB][], [forecasts-ncov][]) primarily thru +copying. To [counter that +proliferation](https://bedfordlab.slack.com/archives/C7SDVPBLZ/p1688577879947079), +this repo was made. + +[mpox]: https://github.com/nextstrain/mpox +[rsv]: https://github.com/nextstrain/rsv +[zika]: https://github.com/nextstrain/zika/pull/24 +[dengue]: https://github.com/nextstrain/dengue/pull/10 +[hepatitisB]: https://github.com/nextstrain/hepatitisB +[forecasts-ncov]: https://github.com/nextstrain/forecasts-ncov + +## Elsewhere + +The creation of this repo, in both the abstract and concrete, and the general +approach to "ingest" has been discussed in various internal places, including: + +- https://github.com/nextstrain/private/issues/59 +- @joverlee521's [workflows document](https://docs.google.com/document/d/1rLWPvEuj0Ayc8MR0O1lfRJZfj9av53xU38f20g8nU_E/edit#heading=h.4g0d3mjvb89i) +- [5 July 2023 Slack thread](https://bedfordlab.slack.com/archives/C7SDVPBLZ/p1688577879947079) +- [6 July 2023 team meeting](https://docs.google.com/document/d/1FPfx-ON5RdqL2wyvODhkrCcjgOVX3nlXgBwCPhIEsco/edit) +- _…many others_ + +## Scripts + +Scripts for supporting ingest workflow automation that don’t really belong in any of our existing tools. + +- [notify-on-diff](notify-on-diff) - Send Slack message with diff of a local file and an S3 object +- [notify-on-job-fail](notify-on-job-fail) - Send Slack message with details about failed workflow job on GitHub Actions and/or AWS Batch +- [notify-on-job-start](notify-on-job-start) - Send Slack message with details about workflow job on GitHub Actions and/or AWS Batch +- [notify-on-record-change](notify-on-recod-change) - Send Slack message with details about line count changes for a file compared to an S3 object's metadata `recordcount`. + If the S3 object's metadata does not have `recordcount`, then will attempt to download S3 object to count lines locally, which only supports `xz` compressed S3 objects. +- [notify-slack](notify-slack) - Send message or file to Slack +- [s3-object-exists](s3-object-exists) - Used to prevent 404 errors during S3 file comparisons in the notify-* scripts +- [trigger](trigger) - Triggers downstream GitHub Actions via the GitHub API using repository_dispatch events. +- [trigger-on-new-data](trigger-on-new-data) - Triggers downstream GitHub Actions if the provided `upload-to-s3` outputs do not contain the `identical_file_message` + A hacky way to ensure that we only trigger downstream phylogenetic builds if the S3 objects have been updated. + +NCBI interaction scripts that are useful for fetching public metadata and sequences. + +- [fetch-from-ncbi-entrez](fetch-from-ncbi-entrez) - Fetch metadata and nucleotide sequences from [NCBI Entrez](https://www.ncbi.nlm.nih.gov/books/NBK25501/) and output to a GenBank file. + Useful for pathogens with metadata and annotations in custom fields that are not part of the standard [NCBI Datasets](https://www.ncbi.nlm.nih.gov/datasets/) outputs. + +Historically, some pathogen repos used the undocumented NCBI Virus API through [fetch-from-ncbi-virus](https://github.com/nextstrain/ingest/blob/c97df238518171c2b1574bec0349a55855d1e7a7/fetch-from-ncbi-virus) to fetch data. However we've opted to drop the NCBI Virus scripts due to https://github.com/nextstrain/ingest/issues/18. + +Potential Nextstrain CLI scripts + +- [sha256sum](sha256sum) - Used to check if files are identical in upload-to-s3 and download-from-s3 scripts. +- [cloudfront-invalidate](cloudfront-invalidate) - CloudFront invalidation is already supported in the [nextstrain remote command for S3 files](https://github.com/nextstrain/cli/blob/a5dda9c0579ece7acbd8e2c32a4bbe95df7c0bce/nextstrain/cli/remote/s3.py#L104). + This exists as a separate script to support CloudFront invalidation when using the upload-to-s3 script. +- [upload-to-s3](upload-to-s3) - Upload file to AWS S3 bucket with compression based on file extension in S3 URL. + Skips upload if the local file's hash is identical to the S3 object's metadata `sha256sum`. + Adds the following user defined metadata to uploaded S3 object: + - `sha256sum` - hash of the file generated by [sha256sum](sha256sum) + - `recordcount` - the line count of the file +- [download-from-s3](download-from-s3) - Download file from AWS S3 bucket with decompression based on file extension in S3 URL. + Skips download if the local file already exists and has a hash identical to the S3 object's metadata `sha256sum`. + +Potential augur curate scripts + +- [apply-geolocation-rules](apply-geolocation-rules) - Applies user curated geolocation rules to NDJSON records +- [merge-user-metadata](merge-user-metadata) - Merges user annotations with NDJSON records +- [transform-authors](transform-authors) - Abbreviates full author lists to ' et al.' +- [transform-field-names](transform-field-names) - Rename fields of NDJSON records +- [transform-genbank-location](transform-genbank-location) - Parses `location` field with the expected pattern `"[:][, ]"` based on [GenBank's country field](https://www.ncbi.nlm.nih.gov/genbank/collab/country/) +- [transform-strain-names](transform-strain-names) - Ordered search for strain names across several fields. + +## Software requirements + +Some scripts may require Bash ≥4. If you are running these scripts on macOS, the builtin Bash (`/bin/bash`) does not meet this requirement. You can install [Homebrew's Bash](https://formulae.brew.sh/formula/bash) which is more up to date. + +## Testing + +Most scripts are untested within this repo, relying on "testing in production". That is the only practical testing option for some scripts such as the ones interacting with S3 and Slack. + +For more locally testable scripts, Cram-style functional tests live in `tests` and are run as part of CI. To run these locally, + +1. Download Cram: `pip install cram` +2. Run the tests: `cram tests/` diff --git a/ingest/vendored/apply-geolocation-rules b/ingest/vendored/apply-geolocation-rules new file mode 100755 index 0000000..776cf16 --- /dev/null +++ b/ingest/vendored/apply-geolocation-rules @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +""" +Applies user curated geolocation rules to the geolocation fields in the NDJSON +records from stdin. The modified records are output to stdout. This does not do +any additional transformations on top of the user curations. +""" +import argparse +import json +from collections import defaultdict +from sys import exit, stderr, stdin, stdout + + +class CyclicGeolocationRulesError(Exception): + pass + + +def load_geolocation_rules(geolocation_rules_file): + """ + Loads the geolocation rules from the provided *geolocation_rules_file*. + Returns the rules as a dict: + { + regions: { + countries: { + divisions: { + locations: corrected_geolocations_tuple + } + } + } + } + """ + geolocation_rules = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) + with open(geolocation_rules_file, 'r') as rules_fh: + for line in rules_fh: + # ignore comments + if line.strip()=="" or line.lstrip()[0] == '#': + continue + + row = line.strip('\n').split('\t') + # Skip lines that cannot be split into raw and annotated geolocations + if len(row) != 2: + print( + f"WARNING: Could not decode geolocation rule {line!r}.", + "Please make sure rules are formatted as", + "'region/country/division/locationregion/country/division/location'.", + file=stderr) + continue + + # remove trailing comments + row[-1] = row[-1].partition('#')[0].rstrip() + raw , annot = tuple( row[0].split('/') ) , tuple( row[1].split('/') ) + + # Skip lines where raw or annotated geolocations cannot be split into 4 fields + if len(raw) != 4: + print( + f"WARNING: Could not decode the raw geolocation {row[0]!r}.", + "Please make sure it is formatted as 'region/country/division/location'.", + file=stderr + ) + continue + + if len(annot) != 4: + print( + f"WARNING: Could not decode the annotated geolocation {row[1]!r}.", + "Please make sure it is formatted as 'region/country/division/location'.", + file=stderr + ) + continue + + + geolocation_rules[raw[0]][raw[1]][raw[2]][raw[3]] = annot + + return geolocation_rules + + +def get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal = None): + """ + Gets the annotated geolocation for the *raw_geolocation* in the provided + *geolocation_rules*. + + Recursively traverses the *geolocation_rules* until we get the annotated + geolocation, which must be a Tuple. Returns `None` if there are no + applicable rules for the provided *raw_geolocation*. + + Rules are applied in the order of region, country, division, location. + First checks the provided raw values for geolocation fields, then if there + are not matches, tries to use general rules marked with '*'. + """ + # Always instantiate the rule traversal as an empty list if not provided, + # e.g. the first call of this recursive function + if rule_traversal is None: + rule_traversal = [] + + current_rules = geolocation_rules + # Traverse the geolocation rules based using the rule_traversal values + for field_value in rule_traversal: + current_rules = current_rules.get(field_value) + # If we hit `None`, then we know there are no matching rules, so stop the rule traversal + if current_rules is None: + break + + # We've found the tuple of the annotated geolocation + if isinstance(current_rules, tuple): + return current_rules + + # We've reach the next level of geolocation rules, + # so try to traverse the rules with the next target in raw_geolocation + if isinstance(current_rules, dict): + next_traversal_target = raw_geolocation[len(rule_traversal)] + rule_traversal.append(next_traversal_target) + return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) + + # We did not find any matching rule for the last traversal target + if current_rules is None: + # If we've used all general rules and we still haven't found a match, + # then there are no applicable rules for this geolocation + if all(value == '*' for value in rule_traversal): + return None + + # If we failed to find matching rule with a general rule as the last + # traversal target, then delete all trailing '*'s to reset rule_traversal + # to end with the last index that is currently NOT a '*' + # [A, *, B, *] => [A, *, B] + # [A, B, *, *] => [A, B] + # [A, *, *, *] => [A] + if rule_traversal[-1] == '*': + # Find the index of the first of the consecutive '*' from the + # end of the rule_traversal + # [A, *, B, *] => first_consecutive_general_rule_index = 3 + # [A, B, *, *] => first_consecutive_general_rule_index = 2 + # [A, *, *, *] => first_consecutive_general_rule_index = 1 + for index, field_value in reversed(list(enumerate(rule_traversal))): + if field_value == '*': + first_consecutive_general_rule_index = index + else: + break + + rule_traversal = rule_traversal[:first_consecutive_general_rule_index] + + # Set the final value to '*' in hopes that by moving to a general rule, + # we can find a matching rule. + rule_traversal[-1] = '*' + + return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal) + + +def transform_geolocations(geolocation_rules, geolocation): + """ + Transform the provided *geolocation* by looking it up in the provided + *geolocation_rules*. + + This will use all rules that apply to the geolocation and rules will + be applied in the order of region, country, division, location. + + Returns the original geolocation if no geolocation rules apply. + + Raises a `CyclicGeolocationRulesError` if more than 1000 rules have + been applied to the raw geolocation. + """ + transformed_values = geolocation + rules_applied = 0 + continue_to_apply = True + + while continue_to_apply: + annotated_values = get_annotated_geolocation(geolocation_rules, transformed_values) + + # Stop applying rules if no annotated values were found + if annotated_values is None: + continue_to_apply = False + else: + rules_applied += 1 + + if rules_applied > 1000: + raise CyclicGeolocationRulesError( + "ERROR: More than 1000 geolocation rules applied on the same entry {geolocation!r}." + ) + + # Create a new list of values for comparison to previous values + new_values = list(transformed_values) + for index, value in enumerate(annotated_values): + # Keep original value if annotated value is '*' + if value != '*': + new_values[index] = value + + # Stop applying rules if this rule did not change the values, + # since this means we've reach rules with '*' that no longer change values + if new_values == transformed_values: + continue_to_apply = False + + transformed_values = new_values + + return transformed_values + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--region-field", default="region", + help="Field that contains regions in NDJSON records.") + parser.add_argument("--country-field", default="country", + help="Field that contains countries in NDJSON records.") + parser.add_argument("--division-field", default="division", + help="Field that contains divisions in NDJSON records.") + parser.add_argument("--location-field", default="location", + help="Field that contains location in NDJSON records.") + parser.add_argument("--geolocation-rules", metavar="TSV", required=True, + help="TSV file of geolocation rules with the format: " + + "'' where the raw and annotated geolocations " + + "are formatted as '///'. " + + "If creating a general rule, then the raw field value can be substituted with '*'." + + "Lines starting with '#' will be ignored as comments." + + "Trailing '#' will be ignored as comments.") + + args = parser.parse_args() + + location_fields = [args.region_field, args.country_field, args.division_field, args.location_field] + + geolocation_rules = load_geolocation_rules(args.geolocation_rules) + + for record in stdin: + record = json.loads(record) + + try: + annotated_values = transform_geolocations(geolocation_rules, [record.get(field, '') for field in location_fields]) + except CyclicGeolocationRulesError as e: + print(e, file=stderr) + exit(1) + + for index, field in enumerate(location_fields): + record[field] = annotated_values[index] + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/cloudfront-invalidate b/ingest/vendored/cloudfront-invalidate new file mode 100755 index 0000000..dbea398 --- /dev/null +++ b/ingest/vendored/cloudfront-invalidate @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Originally from @tsibley's gist: https://gist.github.com/tsibley/a66262d341dedbea39b02f27e2837ea8 +set -euo pipefail + +main() { + local domain="$1" + shift + local paths=("$@") + local distribution invalidation + + echo "-> Finding CloudFront distribution" + distribution=$( + aws cloudfront list-distributions \ + --query "DistributionList.Items[?contains(Aliases.Items, \`$domain\`)] | [0].Id" \ + --output text + ) + + if [[ -z $distribution || $distribution == None ]]; then + exec >&2 + echo "Unable to find CloudFront distribution id for $domain" + echo + echo "Are your AWS CLI credentials for the right account?" + exit 1 + fi + + echo "-> Creating CloudFront invalidation for distribution $distribution" + invalidation=$( + aws cloudfront create-invalidation \ + --distribution-id "$distribution" \ + --paths "${paths[@]}" \ + --query Invalidation.Id \ + --output text + ) + + echo "-> Waiting for CloudFront invalidation $invalidation to complete" + echo " Ctrl-C to stop waiting." + aws cloudfront wait invalidation-completed \ + --distribution-id "$distribution" \ + --id "$invalidation" +} + +main "$@" diff --git a/ingest/vendored/download-from-s3 b/ingest/vendored/download-from-s3 new file mode 100755 index 0000000..4981186 --- /dev/null +++ b/ingest/vendored/download-from-s3 @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +set -euo pipefail + +bin="$(dirname "$0")" + +main() { + local src="${1:?A source s3:// URL is required as the first argument.}" + local dst="${2:?A destination file path is required as the second argument.}" + # How many lines to subsample to. 0 means no subsampling. Optional. + # It is not advised to use this for actual subsampling! This is intended to be + # used for debugging workflows with large datasets such as ncov-ingest as + # described in https://github.com/nextstrain/ncov-ingest/pull/367 + + # Uses `tsv-sample` to subsample, so it will not work as expected with files + # that have a single record split across multiple lines (i.e. FASTA sequences) + local n="${3:-0}" + + local s3path="${src#s3://}" + local bucket="${s3path%%/*}" + local key="${s3path#*/}" + + local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 + dst_hash="$("$bin/sha256sum" < "$dst" || true)" + src_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" + + echo "[ INFO] Downloading $src → $dst" + if [[ $src_hash != "$dst_hash" ]]; then + aws s3 cp --no-progress "$src" - | + if [[ "$src" == *.gz ]]; then + gunzip -cfq + elif [[ "$src" == *.xz ]]; then + xz -T0 -dcq + elif [[ "$src" == *.zst ]]; then + zstd -T0 -dcq + else + cat + fi | + if [[ "$n" -gt 0 ]]; then + tsv-sample -H -i -n "$n" + else + cat + fi >"$dst" + else + echo "[ INFO] Files are identical, skipping download" + fi +} + +main "$@" diff --git a/ingest/vendored/fetch-from-ncbi-entrez b/ingest/vendored/fetch-from-ncbi-entrez new file mode 100755 index 0000000..194a0c8 --- /dev/null +++ b/ingest/vendored/fetch-from-ncbi-entrez @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +""" +Fetch metadata and nucleotide sequences from NCBI Entrez and output to a GenBank file. +""" +import json +import argparse +from Bio import SeqIO, Entrez + +# To use the efetch API, the docs indicate only around 10,000 records should be fetched per request +# https://www.ncbi.nlm.nih.gov/books/NBK25499/#chapter4.EFetch +# However, in my testing with HepB, the max records returned was 9,999 +# - Jover, 16 August 2023 +BATCH_SIZE = 9999 + +Entrez.email = "hello@nextstrain.org" + +def parse_args(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--term', required=True, type=str, + help='Genbank search term. Replace spaces with "+", e.g. "Hepatitis+B+virus[All+Fields]complete+genome[All+Fields]"') + parser.add_argument('--output', required=True, type=str, help='Output file (Genbank)') + return parser.parse_args() + + +def get_esearch_history(term): + """ + Search for the provided *term* via ESearch and store the results using the + Entrez history server.¹ + + Returns the total count of returned records, query key, and web env needed + to access the records from the server. + + ¹ https://www.ncbi.nlm.nih.gov/books/NBK25497/#chapter2.Using_the_Entrez_History_Server + """ + handle = Entrez.esearch(db="nucleotide", term=term, retmode="json", usehistory="y", retmax=0) + esearch_result = json.loads(handle.read())['esearchresult'] + print(f"Search term {term!r} returned {esearch_result['count']} IDs.") + return { + "count": int(esearch_result["count"]), + "query_key": esearch_result["querykey"], + "web_env": esearch_result["webenv"] + } + + +def fetch_from_esearch_history(count, query_key, web_env): + """ + Fetch records in batches from Entrez history server using the provided + *query_key* and *web_env* and yields them as a BioPython SeqRecord iterator. + """ + print(f"Fetching GenBank records in batches of n={BATCH_SIZE}") + + for start in range(0, count, BATCH_SIZE): + handle = Entrez.efetch( + db="nucleotide", + query_key=query_key, + webenv=web_env, + retstart=start, + retmax=BATCH_SIZE, + rettype="gb", + retmode="text") + + yield SeqIO.parse(handle, "genbank") + + +if __name__=="__main__": + args = parse_args() + + with open(args.output, "w") as output_handle: + for batch_results in fetch_from_esearch_history(**get_esearch_history(args.term)): + SeqIO.write(batch_results, output_handle, "genbank") diff --git a/ingest/vendored/merge-user-metadata b/ingest/vendored/merge-user-metadata new file mode 100755 index 0000000..341c2df --- /dev/null +++ b/ingest/vendored/merge-user-metadata @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +Merges user curated annotations with the NDJSON records from stdin, with the user +curations overwriting the existing fields. The modified records are output +to stdout. This does not do any additional transformations on top of the user +curations. +""" +import argparse +import csv +import json +from collections import defaultdict +from sys import exit, stdin, stderr, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--annotations", metavar="TSV", required=True, + help="Manually curated annotations TSV file. " + + "The TSV should not have a header and should have exactly three columns: " + + "id to match existing metadata, field name, and field value. " + + "If there are multiple annotations for the same id and field, then the last value is used. " + + "Lines starting with '#' are treated as comments. " + + "Any '#' after the field value are treated as comments.") + parser.add_argument("--id-field", default="accession", + help="The ID field in the metadata to use to merge with the annotations.") + + args = parser.parse_args() + + annotations = defaultdict(dict) + with open(args.annotations, 'r') as annotations_fh: + csv_reader = csv.reader(annotations_fh, delimiter='\t') + for row in csv_reader: + if not row or row[0].lstrip()[0] == '#': + continue + elif len(row) != 3: + print("WARNING: Could not decode annotation line " + "\t".join(row), file=stderr) + continue + id, field, value = row + annotations[id][field] = value.partition('#')[0].rstrip() + + for record in stdin: + record = json.loads(record) + + record_id = record.get(args.id_field) + if record_id is None: + print(f"ERROR: ID field {args.id_field!r} does not exist in record", file=stderr) + exit(1) + + record.update(annotations.get(record_id, {})) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/notify-on-diff b/ingest/vendored/notify-on-diff new file mode 100755 index 0000000..ddbe7da --- /dev/null +++ b/ingest/vendored/notify-on-diff @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +bin="$(dirname "$0")" + +src="${1:?A source file is required as the first argument.}" +dst="${2:?A destination s3:// URL is required as the second argument.}" + +dst_local="$(mktemp -t s3-file-XXXXXX)" +diff="$(mktemp -t diff-XXXXXX)" + +trap "rm -f '$dst_local' '$diff'" EXIT + +# if the file is not already present, just exit +"$bin"/s3-object-exists "$dst" || exit 0 + +"$bin"/download-from-s3 "$dst" "$dst_local" + +# diff's exit code is 0 for no differences, 1 for differences found, and >1 for errors +diff_exit_code=0 +diff "$dst_local" "$src" > "$diff" || diff_exit_code=$? + +if [[ "$diff_exit_code" -eq 1 ]]; then + echo "Notifying Slack about diff." + "$bin"/notify-slack --upload "$src.diff" < "$diff" +elif [[ "$diff_exit_code" -gt 1 ]]; then + echo "Notifying Slack about diff failure" + "$bin"/notify-slack "Diff failed for $src" +else + echo "No change in $src." +fi diff --git a/ingest/vendored/notify-on-job-fail b/ingest/vendored/notify-on-job-fail new file mode 100755 index 0000000..7dd2409 --- /dev/null +++ b/ingest/vendored/notify-on-job-fail @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +: "${AWS_BATCH_JOB_ID:=}" +: "${GITHUB_RUN_ID:=}" + +bin="$(dirname "$0")" +job_name="${1:?A job name is required as the first argument}" +github_repo="${2:?A GitHub repository with owner and repository name is required as the second argument}" + +echo "Notifying Slack about failed ${job_name} job." +message="❌ ${job_name} job has FAILED 😞 " + +if [[ -n "${AWS_BATCH_JOB_ID}" ]]; then + message+="See AWS Batch job \`${AWS_BATCH_JOB_ID}\` () for error details. " +elif [[ -n "${GITHUB_RUN_ID}" ]]; then + message+="See GitHub Action for error details. " +fi + +"$bin"/notify-slack "$message" diff --git a/ingest/vendored/notify-on-job-start b/ingest/vendored/notify-on-job-start new file mode 100755 index 0000000..1c8ce7d --- /dev/null +++ b/ingest/vendored/notify-on-job-start @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +: "${AWS_BATCH_JOB_ID:=}" +: "${GITHUB_RUN_ID:=}" + +bin="$(dirname "$0")" +job_name="${1:?A job name is required as the first argument}" +github_repo="${2:?A GitHub repository with owner and repository name is required as the second argument}" +build_dir="${3:-ingest}" + +echo "Notifying Slack about started ${job_name} job." +message="${job_name} job has started." + +if [[ -n "${GITHUB_RUN_ID}" ]]; then + message+=" The job was submitted by GitHub Action ." +fi + +if [[ -n "${AWS_BATCH_JOB_ID}" ]]; then + message+=" The job was launched as AWS Batch job \`${AWS_BATCH_JOB_ID}\` ()." + message+=" Follow along in your local clone of ${github_repo} with: "'```'"nextstrain build --aws-batch --no-download --attach ${AWS_BATCH_JOB_ID} ${build_dir}"'```' +fi + +"$bin"/notify-slack "$message" diff --git a/ingest/vendored/notify-on-record-change b/ingest/vendored/notify-on-record-change new file mode 100755 index 0000000..f424252 --- /dev/null +++ b/ingest/vendored/notify-on-record-change @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +bin="$(dirname "$0")" + +src="${1:?A source ndjson file is required as the first argument.}" +dst="${2:?A destination ndjson s3:// URL is required as the second argument.}" +source_name=${3:?A record source name is required as the third argument.} + +# if the file is not already present, just exit +"$bin"/s3-object-exists "$dst" || exit 0 + +s3path="${dst#s3://}" +bucket="${s3path%%/*}" +key="${s3path#*/}" + +src_record_count="$(wc -l < "$src")" + +# Try getting record count from S3 object metadata +dst_record_count="$(aws s3api head-object --bucket "$bucket" --key "$key" --query "Metadata.recordcount || ''" --output text 2>/dev/null || true)" +if [[ -z "$dst_record_count" ]]; then + # This object doesn't have the record count stored as metadata + # We have to download it and count the lines locally + dst_record_count="$(wc -l < <(aws s3 cp --no-progress "$dst" - | xz -T0 -dcfq))" +fi + +added_records="$(( src_record_count - dst_record_count ))" + +printf "%'4d %s\n" "$src_record_count" "$src" +printf "%'4d %s\n" "$dst_record_count" "$dst" +printf "%'4d added records\n" "$added_records" + +slack_message="" + +if [[ $added_records -gt 0 ]]; then + echo "Notifying Slack about added records (n=$added_records)" + slack_message="📈 New records (n=$added_records) found on $source_name." + +elif [[ $added_records -lt 0 ]]; then + echo "Notifying Slack about fewer records (n=$added_records)" + slack_message="📉 Fewer records (n=$added_records) found on $source_name." + +else + echo "Notifying Slack about same number of records" + slack_message="⛔ No new records found on $source_name." +fi + +slack_message+=" (Total record count: $src_record_count)" + +"$bin"/notify-slack "$slack_message" diff --git a/ingest/vendored/notify-slack b/ingest/vendored/notify-slack new file mode 100755 index 0000000..a343435 --- /dev/null +++ b/ingest/vendored/notify-slack @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${SLACK_TOKEN:?The SLACK_TOKEN environment variable is required.}" +: "${SLACK_CHANNELS:?The SLACK_CHANNELS environment variable is required.}" + +upload=0 +output=/dev/null +thread_ts="" +broadcast=0 +args=() + +for arg; do + case "$arg" in + --upload) + upload=1;; + --output=*) + output="${arg#*=}";; + --thread-ts=*) + thread_ts="${arg#*=}";; + --broadcast) + broadcast=1;; + *) + args+=("$arg");; + esac +done + +set -- "${args[@]}" + +text="${1:?Some message text is required.}" + +if [[ "$upload" == 1 ]]; then + echo "Uploading data to Slack with the message: $text" + curl https://slack.com/api/files.upload \ + --header "Authorization: Bearer $SLACK_TOKEN" \ + --form-string channels="$SLACK_CHANNELS" \ + --form-string title="$text" \ + --form-string filename="$text" \ + --form-string thread_ts="$thread_ts" \ + --form file=@/dev/stdin \ + --form filetype=text \ + --fail --silent --show-error \ + --http1.1 \ + --output "$output" +else + echo "Posting Slack message: $text" + curl https://slack.com/api/chat.postMessage \ + --header "Authorization: Bearer $SLACK_TOKEN" \ + --form-string channel="$SLACK_CHANNELS" \ + --form-string text="$text" \ + --form-string thread_ts="$thread_ts" \ + --form-string reply_broadcast="$broadcast" \ + --fail --silent --show-error \ + --http1.1 \ + --output "$output" +fi diff --git a/ingest/vendored/s3-object-exists b/ingest/vendored/s3-object-exists new file mode 100755 index 0000000..679c20a --- /dev/null +++ b/ingest/vendored/s3-object-exists @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +set -euo pipefail + +url="${1#s3://}" +bucket="${url%%/*}" +key="${url#*/}" + +aws s3api head-object --bucket "$bucket" --key "$key" &>/dev/null diff --git a/ingest/vendored/sha256sum b/ingest/vendored/sha256sum new file mode 100755 index 0000000..32d7ef8 --- /dev/null +++ b/ingest/vendored/sha256sum @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +""" +Portable sha256sum utility. +""" +from hashlib import sha256 +from sys import stdin + +chunk_size = 5 * 1024**2 # 5 MiB + +h = sha256() + +for chunk in iter(lambda: stdin.buffer.read(chunk_size), b""): + h.update(chunk) + +print(h.hexdigest()) diff --git a/ingest/vendored/tests/transform-strain-names/transform-strain-names.t b/ingest/vendored/tests/transform-strain-names/transform-strain-names.t new file mode 100644 index 0000000..1c05df7 --- /dev/null +++ b/ingest/vendored/tests/transform-strain-names/transform-strain-names.t @@ -0,0 +1,17 @@ +Look for strain name in "strain" or a list of backup fields. + +If strain entry exists, do not do anything. + + $ echo '{"strain": "i/am/a/strain", "strain_s": "other"}' \ + > | $TESTDIR/../../transform-strain-names \ + > --strain-regex '^.+$' \ + > --backup-fields strain_s accession + {"strain":"i/am/a/strain","strain_s":"other"} + +If strain entry does not exists, search the backup fields + + $ echo '{"strain_s": "other"}' \ + > | $TESTDIR/../../transform-strain-names \ + > --strain-regex '^.+$' \ + > --backup-fields accession strain_s + {"strain_s":"other","strain":"other"} \ No newline at end of file diff --git a/ingest/vendored/transform-authors b/ingest/vendored/transform-authors new file mode 100755 index 0000000..0bade20 --- /dev/null +++ b/ingest/vendored/transform-authors @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +""" +Abbreviates a full list of authors to be ' et al.' of the NDJSON +record from stdin and outputs modified records to stdout. + +Note: This is a "best effort" approach and can potentially mangle the author name. +""" +import argparse +import json +import re +from sys import stderr, stdin, stdout + + +def parse_authors(record: dict, authors_field: str, default_value: str, + index: int, abbr_authors_field: str = None) -> dict: + # Strip and normalize whitespace + new_authors = re.sub(r'\s+', ' ', record[authors_field]) + + if new_authors == "": + new_authors = default_value + else: + # Split authors list on comma/semicolon + # OR "and"/"&" with at least one space before and after + new_authors = re.split(r'(?:\s*[,,;;]\s*|\s+(?:and|&)\s+)', new_authors)[0] + + # if it does not already end with " et al.", add it + if not new_authors.strip('. ').endswith(" et al"): + new_authors += ' et al' + + if abbr_authors_field: + if record.get(abbr_authors_field): + print( + f"WARNING: the {abbr_authors_field!r} field already exists", + f"in record {index} and will be overwritten!", + file=stderr + ) + + record[abbr_authors_field] = new_authors + else: + record[authors_field] = new_authors + + return record + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--authors-field", default="authors", + help="The field containing list of authors.") + parser.add_argument("--default-value", default="?", + help="Default value to use if authors list is empty.") + parser.add_argument("--abbr-authors-field", + help="The field for the generated abbreviated authors. " + + "If not provided, the original authors field will be modified.") + + args = parser.parse_args() + + for index, record in enumerate(stdin): + record = json.loads(record) + + parse_authors(record, args.authors_field, args.default_value, index, args.abbr_authors_field) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-field-names b/ingest/vendored/transform-field-names new file mode 100755 index 0000000..fde223f --- /dev/null +++ b/ingest/vendored/transform-field-names @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +""" +Renames fields of the NDJSON record from stdin and outputs modified records +to stdout. +""" +import argparse +import json +from sys import stderr, stdin, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--field-map", nargs="+", + help="Fields names in the NDJSON record mapped to new field names, " + + "formatted as '{old_field_name}={new_field_name}'. " + + "If the old field does not exist in record, the new field will be added with an empty string value." + + "If the new field already exists in record, then the renaming of the old field will be skipped.") + parser.add_argument("--force", action="store_true", + help="Force renaming of old field even if the new field already exists. " + + "Please keep in mind this will overwrite the value of the new field.") + + args = parser.parse_args() + + field_map = {} + for field in args.field_map: + old_name, new_name = field.split('=') + field_map[old_name] = new_name + + for record in stdin: + record = json.loads(record) + + for old_field, new_field in field_map.items(): + + if record.get(new_field) and not args.force: + print( + f"WARNING: skipping rename of {old_field} because record", + f"already has a field named {new_field}.", + file=stderr + ) + continue + + record[new_field] = record.pop(old_field, '') + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-genbank-location b/ingest/vendored/transform-genbank-location new file mode 100755 index 0000000..70ba56f --- /dev/null +++ b/ingest/vendored/transform-genbank-location @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +""" +Parses GenBank's 'location' field of the NDJSON record from stdin to 3 separate +fields: 'country', 'division', and 'location'. Checks that a record is from +GenBank by verifying that the 'database' field has a value of "GenBank" or "RefSeq". + +Outputs the modified record to stdout. +""" +import json +from sys import stdin, stdout + + +def parse_location(record: dict) -> dict: + # Expected pattern for the location field is "[:][, ]" + # See GenBank docs for their "country" field: + # https://www.ncbi.nlm.nih.gov/genbank/collab/country/ + geographic_data = record['location'].split(':') + + country = geographic_data[0] + division = '' + location = '' + + if len(geographic_data) == 2: + division , _ , location = geographic_data[1].partition(',') + + record['country'] = country.strip() + record['division'] = division.strip() + record['location'] = location.strip() + + return record + + +if __name__ == '__main__': + + for record in stdin: + record = json.loads(record) + + database = record.get('database', '') + if database in {'GenBank', 'RefSeq'}: + parse_location(record) + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/transform-strain-names b/ingest/vendored/transform-strain-names new file mode 100755 index 0000000..d86c0e4 --- /dev/null +++ b/ingest/vendored/transform-strain-names @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Verifies strain name pattern in the 'strain' field of the NDJSON record from +stdin. Adds a 'strain' field to the record if it does not already exist. + +Outputs the modified records to stdout. +""" +import argparse +import json +import re +from sys import stderr, stdin, stdout + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument("--strain-regex", default="^.+$", + help="Regex pattern for strain names. " + + "Strain names that do not match the pattern will be dropped.") + parser.add_argument("--backup-fields", nargs="*", + help="List of backup fields to use as strain name if the value in 'strain' " + + "does not match the strain regex pattern. " + + "If multiple fields are provided, will use the first field that has a non-empty string.") + + args = parser.parse_args() + + strain_name_pattern = re.compile(args.strain_regex) + + for index, record in enumerate(stdin): + record = json.loads(record) + + # Verify strain name matches the strain regex pattern + if strain_name_pattern.match(record.get('strain', '')) is None: + # Default to empty string if not matching pattern + record['strain'] = '' + # Use non-empty value of backup fields if provided + if args.backup_fields: + for field in args.backup_fields: + if record.get(field): + record['strain'] = str(record[field]) + break + + if record['strain'] == '': + print(f"WARNING: Record number {index} has an empty string as the strain name.", file=stderr) + + + json.dump(record, stdout, allow_nan=False, indent=None, separators=',:') + print() diff --git a/ingest/vendored/trigger b/ingest/vendored/trigger new file mode 100755 index 0000000..586f9cc --- /dev/null +++ b/ingest/vendored/trigger @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${PAT_GITHUB_DISPATCH:=}" + +github_repo="${1:?A GitHub repository with owner and repository name is required as the first argument.}" +event_type="${2:?An event type is required as the second argument.}" +shift 2 + +if [[ $# -eq 0 && -z $PAT_GITHUB_DISPATCH ]]; then + cat >&2 <<. +You must specify options to curl for your GitHub credentials. For example, you +can specify your GitHub username, and will be prompted for your password: + + $0 $github_repo $event_type --user + +Be sure to enter a personal access token¹ as your password since GitHub has +discontinued password authentication to the API starting on November 13, 2020². + +You can also store your credentials or a personal access token in a netrc +file³: + + machine api.github.com + login + password + +and then tell curl to use it: + + $0 $github_repo $event_type --netrc + +which will then not require you to type your password every time. + +¹ https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line +² https://docs.github.com/en/rest/overview/other-authentication-methods#via-username-and-password +³ https://ec.haxx.se/usingcurl/usingcurl-netrc +. + exit 1 +fi + +auth=':' +if [[ -n $PAT_GITHUB_DISPATCH ]]; then + auth="Authorization: Bearer ${PAT_GITHUB_DISPATCH}" +fi + +if curl -fsS "https://api.github.com/repos/${github_repo}/dispatches" \ + -H 'Accept: application/vnd.github.v3+json' \ + -H 'Content-Type: application/json' \ + -H "$auth" \ + -d '{"event_type":"'"$event_type"'"}' \ + "$@" +then + echo "Successfully triggered $event_type" +else + echo "Request failed" >&2 + exit 1 +fi diff --git a/ingest/vendored/trigger-on-new-data b/ingest/vendored/trigger-on-new-data new file mode 100755 index 0000000..470d2f4 --- /dev/null +++ b/ingest/vendored/trigger-on-new-data @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +set -euo pipefail + +: "${PAT_GITHUB_DISPATCH:?The PAT_GITHUB_DISPATCH environment variable is required.}" + +bin="$(dirname "$0")" + +github_repo="${1:?A GitHub repository with owner and repository name is required as the first argument.}" +event_type="${2:?An event type is required as the second argument.}" +metadata="${3:?A metadata upload output file is required as the third argument.}" +sequences="${4:?An sequence FASTA upload output file is required as the fourth argument.}" +identical_file_message="${5:-files are identical}" + +new_metadata=$(grep "$identical_file_message" "$metadata" >/dev/null; echo $?) +new_sequences=$(grep "$identical_file_message" "$sequences" >/dev/null; echo $?) + +slack_message="" + +# grep exit status 0 for found match, 1 for no match, 2 if an error occurred +if [[ $new_metadata -eq 1 || $new_sequences -eq 1 ]]; then + slack_message="Triggering new builds due to updated metadata and/or sequences" + "$bin"/trigger "$github_repo" "$event_type" +elif [[ $new_metadata -eq 0 && $new_sequences -eq 0 ]]; then + slack_message="Skipping trigger of rebuild: Both metadata TSV and sequences FASTA are identical to S3 files." +else + slack_message="Skipping trigger of rebuild: Unable to determine if data has been updated." +fi + + +if ! "$bin"/notify-slack "$slack_message"; then + echo "Notifying Slack failed, but exiting with success anyway." +fi diff --git a/ingest/vendored/upload-to-s3 b/ingest/vendored/upload-to-s3 new file mode 100755 index 0000000..36d171c --- /dev/null +++ b/ingest/vendored/upload-to-s3 @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +set -euo pipefail + +bin="$(dirname "$0")" + +main() { + local quiet=0 + + for arg; do + case "$arg" in + --quiet) + quiet=1 + shift;; + *) + break;; + esac + done + + local src="${1:?A source file is required as the first argument.}" + local dst="${2:?A destination s3:// URL is required as the second argument.}" + local cloudfront_domain="${3:-}" + + local s3path="${dst#s3://}" + local bucket="${s3path%%/*}" + local key="${s3path#*/}" + + local src_hash dst_hash no_hash=0000000000000000000000000000000000000000000000000000000000000000 + src_hash="$("$bin/sha256sum" < "$src")" + dst_hash="$(aws s3api head-object --bucket "$bucket" --key "$key" --query Metadata.sha256sum --output text 2>/dev/null || echo "$no_hash")" + + if [[ $src_hash != "$dst_hash" ]]; then + # The record count may have changed + src_record_count="$(wc -l < "$src")" + + echo "Uploading $src → $dst" + if [[ "$dst" == *.gz ]]; then + gzip -c "$src" + elif [[ "$dst" == *.xz ]]; then + xz -2 -T0 -c "$src" + elif [[ "$dst" == *.zst ]]; then + zstd -T0 -c "$src" + else + cat "$src" + fi | aws s3 cp --no-progress - "$dst" --metadata sha256sum="$src_hash",recordcount="$src_record_count" "$(content-type "$dst")" + + if [[ -n $cloudfront_domain ]]; then + echo "Creating CloudFront invalidation for $cloudfront_domain/$key" + if ! "$bin"/cloudfront-invalidate "$cloudfront_domain" "/$key"; then + echo "CloudFront invalidation failed, but exiting with success anyway." + fi + fi + + if [[ $quiet == 1 ]]; then + echo "Quiet mode. No Slack notification sent." + exit 0 + fi + + if ! "$bin"/notify-slack "Updated $dst available."; then + echo "Notifying Slack failed, but exiting with success anyway." + fi + else + echo "Uploading $src → $dst: files are identical, skipping upload" + fi +} + +content-type() { + case "$1" in + *.tsv) echo --content-type=text/tab-separated-values;; + *.csv) echo --content-type=text/comma-separated-values;; + *.ndjson) echo --content-type=application/x-ndjson;; + *.gz) echo --content-type=application/gzip;; + *.xz) echo --content-type=application/x-xz;; + *.zst) echo --content-type=application/zstd;; + *) echo --content-type=text/plain;; + esac +} + +main "$@"