From 82abda37719640d5c566160094695bd10134d868 Mon Sep 17 00:00:00 2001
From: james hadfield
Date: Mon, 2 Dec 2024 17:10:56 +1300
Subject: [PATCH] WIP - allow different input sources

---
 README.md          |  57 ++++++++++++++++++++-
 Snakefile          | 125 +++++++++++++++++++++++++++++++++------------
 gisaid/config.yaml |  12 ++---
 3 files changed, 153 insertions(+), 41 deletions(-)

diff --git a/README.md b/README.md
index be7b1d2..31cf8e8 100755
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ The Snakemake pipeline is parameterised by two config files, one for the A/H5N1,
 * [Running H5N1 Cattle Outbreak (2024) builds](#running-h5n1-cattle-outbreak-2024-builds)
 * [Creating a custom build via config overlays](#creating-a-custom-build-via-config-overlays)
 * [Running builds in a separate working directory](#running-builds-in-a-separate-working-directory)
+* [Adding private data](#adding-private-data)
 * [Cleavage Site Annotations](#cleavage-side-annotations)
 * [H5 Clade Labeling](#h5-clade-labeling)
 * [Other build customisations](#other-build-customisations)
@@ -181,6 +182,37 @@ Depending on how you run builds this can be very liberating; for instance if you
 
 > You don't have to name it `config.yaml`, but if you use a different filename you'll have to specify it via `--configfile `.
 
+## Adding private data
+
+By default each workflow config defines a single metadata input and one FASTA input per segment.
+Private metadata and/or sequences can be supplied in addition by defining `additional_inputs` in a config overlay YAML:
+
+```yaml
+additional_inputs:
+  - name: secret
+    metadata: secret.tsv
+    sequences:
+      ha: secret_ha.fasta
+```
+
+The filenames can be S3 URIs (make sure you have credentials available in your environment) or local files.
+Local files should be specified relative to the analysis directory (typically the directory you run the command from).
+
+If you have data for all segments you can use a more concise syntax:
+
+```yaml
+additional_inputs:
+  - name: secret
+    metadata: secret.tsv
+    sequences: secret_{segment}.fasta
+```
+
+> These added data will be subject to the same filtering rules as the starting data.
+  At a minimum, ensure new sequences have metadata defining their subtype and date, as the filter steps will prune out sequences without valid values for these.
+
+> Metadata merging is via `augur merge` and we add a one-hot column for each input source named `input_{NAME}`; for instance, the example above would result in an `input_secret` column with a value of `1` for metadata rows which came from `secret.tsv` and `0` otherwise.
+  You can use this for additional filtering commands as needed.
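+
+As an example of such a filtering command, the merged metadata (produced whenever more than one metadata input is defined) can be queried on this column with `augur filter`.
+This is illustrative only; it is not part of the workflow, the output filename is arbitrary, and depending on how the column is typed the comparison may need to be `== 1` rather than `== '1'`:
+
+```sh
+augur filter \
+  --metadata results/metadata_merged.tsv \
+  --query "input_secret == '1'" \
+  --output-metadata secret_rows.tsv
+```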
+
 
 ## Cleavage Site Annotations
 
@@ -224,8 +256,29 @@ Note that you may need to remove any existing data in `results/` in order for snakemake
 
 ### Using locally ingested data (instead of downloading from S3)
 
-Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run).
-Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`.
+Each workflow defines its input(s) via `config['inputs']`; for example, the GISAID build uses:
+```yaml
+inputs:
+  - name: gisaid
+    metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
+    sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
+```
+
+Using an approach analogous to [adding private data](#adding-private-data) above, you can replace this array in your config overlay so that it points to local files instead.
+For instance, if you have run the default ingest pipeline, a config overlay of
+
+```yaml
+inputs:
+  - name: ingest
+    metadata: ingest/fauna/results/metadata.tsv
+    sequences: ingest/fauna/results/sequences_{segment}.fasta
+```
+
+will result in the default inputs being replaced by these local ingest files.
+The search order for local files is:
+  1. Relative to the analysis directory
+  2. Relative to the entry snakefile
+  3. Relative to the `avian-flu` directory
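+
+If needed, the two mechanisms can be combined in a single overlay. As a sketch (the private filenames are hypothetical), the following would switch the default input to local ingest files while also layering in extra metadata and sequences:
+
+```yaml
+inputs:
+  - name: ingest
+    metadata: ingest/fauna/results/metadata.tsv
+    sequences: ingest/fauna/results/sequences_{segment}.fasta
+additional_inputs:
+  - name: secret
+    metadata: secret.tsv
+    sequences: secret_{segment}.fasta
+```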
 
 ### To modify this build to work with your own data
 
diff --git a/Snakefile b/Snakefile
index 3f7accc..b6fe123 100755
--- a/Snakefile
+++ b/Snakefile
@@ -148,14 +148,6 @@ def sanity_check_config():
         print("-"*80, file=sys.stderr)
         raise InvalidConfigError("No config")
 
-    assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'"
-    # NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST
-    # if we want to use `--config local_ingest=gisaid` overrides.
-    assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive"
-    if S3_SRC:
-        assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \
-            "Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys"
-
 sanity_check_config()
 
 def as_list(x):
@@ -233,48 +225,115 @@ rule files:
 
 files = rules.files.params
 
-rule download_sequences:
+rule download_s3_sequences:
     output:
-        sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta",
+        sequences = "data/{input_name}/sequences_{segment}.fasta",
     params:
-        address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment),
-        no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
+        address=lambda w: input_by_name(w.input_name, segment=w.segment).format(segment=w.segment),
+        no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, segment=w.segment).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
     shell:
         """
         aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
         """
 
-rule download_metadata:
+rule download_s3_metadata:
     output:
-        metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv",
+        metadata = "data/{input_name}/metadata.tsv",
     params:
-        address=S3_SRC.get('metadata', None),
-        no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
+        address=lambda w: input_by_name(w.input_name, metadata=True),
+        no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, metadata=True).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
     shell:
         """
        aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
        """
 
+def _segment_input(info, segment):
+    address = info.get('sequences', None)
+    if address is None:
+        return address
+    elif isinstance(address, str):
+        return address
+    elif isinstance(address, dict):
+        return address.get(segment, None)
+    raise InvalidConfigError("Invalid structure for one or more provided (additional) input sequences")
+
-def input_metadata(wildcards):
-    if S3_SRC:
-        return f"data/{S3_SRC['name']}/metadata.tsv",
-    elif LOCAL_INGEST:
-        return f"ingest/{LOCAL_INGEST}/results/metadata.tsv",
-    raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
+def input_by_name(name, metadata=False, segment=False):
+    assert (metadata and not segment) or (segment and not metadata), "Workflow bug"
+    info = next((c for c in [*config['inputs'], *config.get('additional_inputs', [])] if c['name']==name))
+    return info.get('metadata', None) if metadata else _segment_input(info, segment)
 
-def input_sequences(wildcards):
-    if S3_SRC:
-        return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta",
-    elif LOCAL_INGEST:
-        return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta"
-    raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
+def collect_inputs(metadata=None, segment=None, augur_merge=False):
+    """
+    This function is pure - subsequent calls will return the same results.
+    """
+    assert (metadata and not segment) or (segment and not metadata), "Workflow bug"
+
+    inputs = []
+    for source in [*config['inputs'], *config.get('additional_inputs', [])]:
+        name = source['name']
+
+        address = source.get('metadata', None) if metadata else _segment_input(source, segment)
+        if address is None:
+            continue # inputs can define (e.g.) only metadata, or only sequences for some segments etc
+
+        # addresses may be a remote filepath or a local file
+        if address.startswith('s3://'):
+            download_path = f"data/{source['name']}/metadata.tsv" \
+                if metadata \
+                else f"data/{source['name']}/sequences_{segment}.fasta"
+            inputs.append((name, download_path))
+            continue
+        elif address.startswith(('http://', 'https://')):
+            raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
+        inputs.append((name, resolve_config_path(address, {'segment':segment})))
+
+    if not inputs:
+        raise InvalidConfigError("No inputs provided the requested metadata / sequences")
+
+    if augur_merge:
+        return " ".join([f"{x[0]}={x[1]}" for x in inputs])
+    return [x[1] for x in inputs]
+
+rule merge_metadata:
+    """
+    This rule should only be invoked if there are multiple defined metadata inputs
+    (config.inputs + config.additional_inputs)
+    """
+    input:
+        metadata = collect_inputs(metadata=True)
+    params:
+        metadata = collect_inputs(metadata=True, augur_merge=True)
+    output:
+        metadata = "results/metadata_merged.tsv"
+    shell:
+        r"""
+        augur merge \
+            --metadata {params.metadata} \
+            --source-columns 'input_{{NAME}}' \
+            --output-metadata {output.metadata}
+        """
+
+rule merge_sequences:
+    """
+    Placeholder rule while we wait for `augur merge` to support sequences
+    """
+    input:
+        sequences = lambda w: collect_inputs(segment=w.segment)
+    output:
+        sequences = "results/sequences_merged_{segment}.fasta"
+    shell:
+        r"""
+        cat {input.sequences} > {output.sequences}
+        """
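+
+# NOTE: the two merge rules above are only requested when more than one input provides the
+# relevant metadata / sequences; with a single input the downstream rules point directly at
+# that input's file (see the conditional `input:` expressions below) and no merging happens.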
 
 rule filter_sequences_by_subtype:
     input:
-        sequences = input_sequences,
-        metadata = input_metadata,
+        sequences = lambda w: collect_inputs(segment=w.segment)[0]
+            if len(collect_inputs(segment=w.segment))==1
+            else f"results/sequences_merged_{w.segment}.fasta",
+        metadata = collect_inputs(metadata=True)[0]
+            if len(collect_inputs(metadata=True))==1
+            else "results/metadata_merged.tsv"
     output:
         sequences = "results/{subtype}/{segment}/sequences.fasta",
     params:
@@ -290,7 +349,9 @@ rule filter_metadata_by_subtype:
     input:
-        metadata = input_metadata,
+        metadata = collect_inputs(metadata=True)[0]
+            if len(collect_inputs(metadata=True))==1
+            else "results/metadata_merged.tsv"
     output:
         metadata = "results/{subtype}/metadata.tsv",
     params:
diff --git a/gisaid/config.yaml b/gisaid/config.yaml
index 779d2df..cb7f0c6 100644
--- a/gisaid/config.yaml
+++ b/gisaid/config.yaml
@@ -29,13 +29,11 @@ builds:
       target_patterns:
         - "auspice/avian-flu_{subtype}_{segment}_{time}.json"
 
-#### Parameters which define the input source ####
-s3_src:
-  name: gisaid
-  metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
-  sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
-local_ingest: false
-# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna
+# Input source(s)
+inputs:
+  - name: gisaid
+    metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
+    sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
 
 # For subtypes defined as build wildcards (e.g. "h5n1", "h5nx"), list out the subtype values
 # that we'll use to filter the starting metadata's 'subtype' column