Skip to content

Commit

Permalink
Merge pull request #222 from nextstrain/ingest-updates
Browse files Browse the repository at this point in the history
Ingest updates
  • Loading branch information
joverlee521 authored Nov 6, 2023
2 parents 59d3f4b + 302273c commit c9b8282
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 57 deletions.
4 changes: 2 additions & 2 deletions ingest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ nextstrain build .

This will produce two files (within the `ingest` directory):

- `data/metadata.tsv`
- `data/sequences.fasta`
- `results/metadata.tsv`
- `results/sequences.fasta`

Run the complete ingest pipeline and upload results to AWS S3 with

Expand Down
25 changes: 6 additions & 19 deletions ingest/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,24 @@ send_slack_notifications = config.get("send_slack_notifications", False)

def _get_all_targets(wildcards):
# Default targets are the metadata TSV and sequences FASTA files
all_targets = ["data/sequences.fasta", "data/metadata.tsv"]
all_targets = ["results/sequences.fasta", "results/metadata.tsv"]

# Add additional targets based on upload config
upload_config = config.get("upload", {})

for target, params in upload_config.items():
files_to_upload = params.get("files_to_upload", [])
remote_file_names = params.get("remote_file_names", [])
files_to_upload = params.get("files_to_upload", {})

if len(files_to_upload) != len(remote_file_names):
if not params.get("dst"):
print(
f"Skipping file upload for {target!r} because the number of",
"files to upload does not match the number of remote file names.",
)
elif len(remote_file_names) != len(set(remote_file_names)):
print(
f"Skipping file upload for {target!r} because there are duplicate remote file names."
)
elif not params.get("dst"):
print(
f"Skipping file upload for {target!r} because the destintion was not defined."
f"Skipping file upload for {target!r} because the destination was not defined."
)
else:
all_targets.extend(
expand(
[
f"data/upload/{target}/{{file_to_upload}}-to-{{remote_file_name}}.done"
],
[f"data/upload/{target}/{{remote_file_name}}.done"],
zip,
file_to_upload=files_to_upload,
remote_file_name=remote_file_names,
remote_file_name=files_to_upload.keys(),
)
)

Expand Down
31 changes: 11 additions & 20 deletions ingest/config/optional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,17 @@ upload:
s3:
# AWS S3 Bucket with prefix
dst: 's3://nextstrain-data/files/workflows/mpox'
# Files to upload to S3 that are in the `data` directory
files_to_upload: [
'genbank.ndjson',
'sequences.ndjson',
'metadata.tsv',
'sequences.fasta',
'alignment.fasta',
'insertions.csv',
'translations.zip'
]
# Remote file names for the files to upload, must be in the same order as local files above
remote_file_names: [
'genbank.ndjson.xz',
'all_sequences.ndjson.xz',
'metadata.tsv.gz',
'sequences.fasta.xz',
'alignment.fasta.xz',
'insertions.csv.gz',
'translations.zip'
]
# Mapping of files to upload, with key as remote file name and the value
# the local file path relative to the ingest directory.
files_to_upload:
genbank.ndjson.xz: data/genbank.ndjson
all_sequences.ndjson.xz: data/sequences.ndjson
metadata.tsv.gz: results/metadata.tsv
sequences.fasta.xz: results/sequences.fasta
alignment.fasta.xz: data/alignment.fasta
insertions.csv.gz: data/insertions.csv
translations.zip: data/translations.zip

cloudfront_domain: 'data.nextstrain.org'

# Toggle for Slack notifications
Expand Down
6 changes: 3 additions & 3 deletions ingest/workflow/snakemake_rules/nextclade.smk
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ rule nextclade_dataset_hMPXV:

rule align:
input:
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
dataset="hmpxv.zip",
output:
alignment="data/alignment.fasta",
Expand All @@ -41,7 +41,7 @@ rule align:

rule nextclade:
input:
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
dataset="mpxv.zip",
output:
"data/nextclade.tsv",
Expand All @@ -58,7 +58,7 @@ rule join_metadata_clades:
metadata="data/metadata_raw.tsv",
nextclade_field_map=config["nextclade"]["field_map"],
output:
metadata="data/metadata.tsv",
metadata="results/metadata.tsv",
params:
id_field=config["transform"]["id_field"],
nextclade_id_field=config["nextclade"]["id_field"],
Expand Down
2 changes: 1 addition & 1 deletion ingest/workflow/snakemake_rules/slack_notifications.smk
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rule notify_on_genbank_record_change:

rule notify_on_metadata_diff:
input:
metadata="data/metadata.tsv",
metadata="results/metadata.tsv",
output:
touch("data/notify/metadata-diff.done"),
params:
Expand Down
6 changes: 3 additions & 3 deletions ingest/workflow/snakemake_rules/transform.smk
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ formats and expects input file
This will produce output files as
metadata = "data/metadata.tsv"
sequences = "data/sequences.fasta"
metadata = "data/metadata_raw.tsv"
sequences = "results/sequences.fasta"
Parameters are expected to be defined in `config.transform`.
"""
Expand Down Expand Up @@ -43,7 +43,7 @@ rule transform:
annotations=config["transform"]["annotations"],
output:
metadata="data/metadata_raw.tsv",
sequences="data/sequences.fasta",
sequences="results/sequences.fasta",
log:
"logs/transform.txt",
params:
Expand Down
4 changes: 2 additions & 2 deletions ingest/workflow/snakemake_rules/trigger_rebuild.smk
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ rule trigger_build:
Triggering monkeypox builds via repository action type `rebuild`.
"""
input:
metadata_upload="data/upload/s3/metadata.tsv-to-metadata.tsv.gz.done",
fasta_upload="data/upload/s3/sequences.fasta-to-sequences.fasta.xz.done",
metadata_upload="data/upload/s3/metadata.tsv.gz.done",
fasta_upload="data/upload/s3/sequences.fasta.xz.done",
output:
touch("data/trigger/rebuild.done"),
shell:
Expand Down
14 changes: 7 additions & 7 deletions ingest/workflow/snakemake_rules/upload.smk
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This part of the workflow handles uploading files to a specified destination.
Uses predefined wildcard `file_to_upload` to determine input and predefined
wildcard `remote_file_name` as the remote file name in the specified destination.
Produces output files as `data/upload/{upload_target_name}/{file_to_upload}-to-{remote_file_name}.done`.
Produces output files as `data/upload/{upload_target_name}/{remote_file_name}.done`.
Currently only supports uploads to AWS S3, but additional upload rules can
be easily added as long as they follow the output pattern described above.
Expand All @@ -26,18 +26,18 @@ def _get_upload_inputs(wildcards):
the rules in `slack_notifications.smk`, so it only includes flag files if
`send_notifications` is True.
"""
file_to_upload = wildcards.file_to_upload

inputs = {
"file_to_upload": f"data/{file_to_upload}",
"file_to_upload": config["upload"]["s3"]["files_to_upload"][
wildcards.remote_file_name
],
}

if send_notifications:
flag_file = []

if file_to_upload == "genbank.ndjson":
if file_to_upload == "data/genbank.ndjson":
flag_file = "data/notify/genbank-record-change.done"
elif file_to_upload == "metadata.tsv":
elif file_to_upload == "results/metadata.tsv":
flag_file = "data/notify/metadata-diff.done"

inputs["notify_flag_file"] = flag_file
Expand All @@ -49,7 +49,7 @@ rule upload_to_s3:
input:
unpack(_get_upload_inputs),
output:
"data/upload/s3/{file_to_upload}-to-{remote_file_name}.done",
"data/upload/s3/{remote_file_name}.done",
params:
quiet="" if send_notifications else "--quiet",
s3_dst=config["upload"].get("s3", {}).get("dst", ""),
Expand Down

0 comments on commit c9b8282

Please sign in to comment.