Skip to content

Commit

Permalink
WIP - allow different input sources
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshadfield committed Dec 2, 2024
1 parent fb99903 commit 82abda3
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 41 deletions.
57 changes: 55 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The Snakemake pipeline is parameterised by two config files, one for the A/H5N1,
* [Running H5N1 Cattle Outbreak (2024) builds](#running-h5n1-cattle-outbreak-2024-builds)
* [Creating a custom build via config overlays](#creating-a-custom-build-via-config-overlays)
* [Running builds in a separate working directory](#running-builds-in-a-separate-working-directory)
* [Adding private data](#adding-private-data)
* [Cleavage Site Annotations](#cleavage-side-annotations)
* [H5 Clade Labeling](#h5-clade-labeling)
* [Other build customisations](#other-build-customisations)
Expand Down Expand Up @@ -181,6 +182,37 @@ Depending on how you run builds this can be very liberating; for instance if you

> You don't have to name it `config.yaml`, but if you use a different filename you'll have to specify it via `--configfile <filename>`.

## Adding private data

Private metadata and/or sequences can be supplied by defining `additional_inputs` in a config overlay YAML.

```yaml
additional_inputs:
- name: secret
metadata: secret.tsv
sequences:
ha: secret_ha.fasta
```

The filenames here can be S3 URIs (ensure you have credentials set in your environment) or local files.
In this case local files should be specified relative to the analysis directory (typically where you run the command from).

If you have data for all segments you can use a slightly different and more concise syntax:
```yaml
additional_inputs:
- name: secret
metadata: secret.tsv
sequencs: secret_{segment}.fasta
```

> These added data will be subject to the same filtering rules as the starting data.
At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here.

> Metadata merging is via `augur merge` and we add one-hot columns for each input source with the column name `input_{NAME}`, for instance the above example would have a `input_secret` column with values of `1` for metadata rows which were included in `secret.tsv` and `0` otherwise.
You can use this for additional filtering commands as needed.

By default each workflow config defines a single metadata input and one FASTA per segment.


## Cleavage Site Annotations

Expand Down Expand Up @@ -224,8 +256,29 @@ Note that you may need to remove any existing data in `results/` in order for sn

### Using locally ingested data (instead of downloading from S3)

Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run).
Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`.
Each workflow defines an input via `config['inputs']`, e.g. the GISAID build uses:
```yaml
inputs:
- name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
```

You can use an approach analogous to the [addition of private data](#adding-private-data) above to replace this array in your config overlay to point to local files instead.
If you have run the default ingest pipeline a config overlay of

```yaml
inputs:
- name: ingest
metadata: ingest/fauna/results/metadata.tsv
sequences: ingest/fauna/results/sequences_{segment}.fasta
```

Will result in the default inputs being replaced by paths to these local ingest files.
The search order for local files is:
1. Relative to the analysis directory
2. Relative to the entry snakefile
3. Relative to the `avian-flu` directory


### To modify this build to work with your own data
Expand Down
125 changes: 93 additions & 32 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,6 @@ def sanity_check_config():
print("-"*80, file=sys.stderr)
raise InvalidConfigError("No config")

assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'"
# NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST
# if we want to use `--config local_ingest=gisaid` overrides.
assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive"
if S3_SRC:
assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \
"Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys"

sanity_check_config()

def as_list(x):
Expand Down Expand Up @@ -233,48 +225,115 @@ rule files:
files = rules.files.params


rule download_sequences:
rule download_s3_sequences:
output:
sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta",
metadata = "data/{input_name}/sequences_{segment}.fasta",
params:
address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address=lambda w: input_by_name(w.input_name, segment=w.segment).format(segment=w.segment),
no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, segment=w.segment).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
"""

rule download_metadata:
rule download_s3_metadata:
output:
metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv",
metadata = "data/{input_name}/metadata.tsv",
params:
address=S3_SRC.get('metadata', None),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address=lambda w: input_by_name(w.input_name, metadata=True),
no_sign_request=lambda w: "--no-sign-request" if input_by_name(w.input_name, metadata=True).startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
"""

def _segment_input(info, segment):
address = info.get('sequences', None)
if address is None:
return address
elif isinstance(address, str):
return address
elif isinstance(address, dict):
return address.get(segment, None)
raise InvalidConfigError(f"Invalid structure for one or more provided (additional) input sequences")

def input_metadata(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/metadata.tsv",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/metadata.tsv",
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
def input_by_name(name, metadata=False, segment=False):
assert (metadata and not segment) or (segment and not metadata), "Workflow bug"
info = next((c for c in [*config['inputs'], *config.get('additional_inputs', [])] if c['name']==name))
return info.get('metadata', None) if metadata else _segment_input(info, segment)

def input_sequences(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta"
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
def collect_inputs(metadata=None, segment=None, augur_merge=False):
"""
This function is pure - subsequent calls will return the same results.
"""
assert (metadata and not segment) or (segment and not metadata), "Workflow bug"

inputs = []
for source in [*config['inputs'], *config.get('additional_inputs', [])]:
name = source['name']

address = source.get('metadata', None) if metadata else _segment_input(source, segment)
if address is None:
continue # inputs can define (e.g.) only metadata, or only sequences for some segments etc

# addresses may be a remote filepath or a local file
if address.startswith('s3://'):
download_path = f"data/{source['name']}/metadata.tsv" \
if metadata \
else f"data/{source['name']}/sequences_{segment}.fasta"
inputs.append((name, download_path))
continue
elif address.startswith(r'http[s]:\/\/'):
raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
inputs.append((name, resolve_config_path(address, {'segment':segment})))

if not inputs:
raise InvalidConfigError("No inputs provided with defined metadata")

if augur_merge:
return " ".join([f"{x[0]}={x[1]}" for x in inputs])
return [x[1] for x in inputs]

rule merge_metadata:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs)
"""
input:
metadata = collect_inputs(metadata=True)
params:
metadata = collect_inputs(metadata=True, augur_merge=True)
output:
metadata = "results/metadata_merged.tsv"
shell:
r"""
augur merge \
--metadata {params.metadata} \
--source-columns 'input_{{NAME}}' \
--output-metadata {output.metadata}
"""

rule merge_sequences:
"""
Placeholder rule while we want for `augur merge` to support sequences
"""
input:
metadata = lambda w: collect_inputs(segment=w.segment)
output:
metadata = "results/sequences_merged_{segment}.fasta"
shell:
r"""
cat {input.metadata} > {output.metadata}
"""

rule filter_sequences_by_subtype:
input:
sequences = input_sequences,
metadata = input_metadata,
sequences = lambda w: collect_inputs(segment=w.segment)[0]
if len(collect_inputs(segment=w.segment))==1
else f"results/sequences_merged_{w.segment}.fasta",
metadata = collect_inputs(metadata=True)[0]
if len(collect_inputs(metadata=True))==1
else "results/metadata_merged.tsv"
output:
sequences = "results/{subtype}/{segment}/sequences.fasta",
params:
Expand All @@ -290,7 +349,9 @@ rule filter_sequences_by_subtype:

rule filter_metadata_by_subtype:
input:
metadata = input_metadata,
metadata = collect_inputs(metadata=True)[0]
if len(collect_inputs(metadata=True))==1
else "results/metadata_merged.tsv"
output:
metadata = "results/{subtype}/metadata.tsv",
params:
Expand Down
12 changes: 5 additions & 7 deletions gisaid/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ builds:
target_patterns:
- "auspice/avian-flu_{subtype}_{segment}_{time}.json"

#### Parameters which define the input source ####
s3_src:
name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
local_ingest: false
# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna
# Input source(s)
inputs:
- name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst

# For subtypes defined as build wildcards (e.g. "h5n1", "h5nx"), list out the subtype values
# that we'll use to filter the starting metadata's 'subtype' column
Expand Down

0 comments on commit 82abda3

Please sign in to comment.