diff --git a/defaults/parameters.yaml b/defaults/parameters.yaml
index 0a7221633..f4d8dd441 100644
--- a/defaults/parameters.yaml
+++ b/defaults/parameters.yaml
@@ -10,6 +10,32 @@ strip_strain_prefixes:
   - hCoV-19/
   - SARS-CoV-2/
 
+sanitize_metadata:
+  rename_fields:
+    - "Virus name=strain"
+    - "Type=type"
+    - "Accession ID=gisaid_epi_isl"
+    - "Collection date=date"
+    - "Additional location information=additional_location_information"
+    - "Sequence length=length"
+    - "Host=host"
+    - "Patient age=patient_age"
+    - "Gender=sex"
+    - "Clade=GISAID_clade"
+    - "Pango lineage=pango_lineage"
+    - "Pangolin version=pangolin_version"
+    - "Variant=variant"
+    - "AA Substitutions=aa_substitutions"
+    - "aaSubtitutions=aa_substitutions"
+    - "Submission date=date_submitted"
+    - "Is reference?=is_reference"
+    - "Is complete?=is_complete"
+    - "Is high coverage?=is_high_coverage"
+    - "Is low coverage?=is_low_coverage"
+    - "N-Content=n_content"
+    - "GC-Content=gc_content"
+  parse_location_field: Location
+
 reference_node_name: "USA/WA1/2020"
 
 # Define files used for external configuration. Common examples consist of a
diff --git a/docs/change_log.md b/docs/change_log.md
index 0be56ec0f..a84c41a7c 100644
--- a/docs/change_log.md
+++ b/docs/change_log.md
@@ -3,6 +3,19 @@
 As of April 2021, we use major version numbers (e.g. v2) to reflect backward incompatible changes to the workflow that likely require you to update your Nextstrain installation.
 We also use this change log to document new features that maintain backward compatibility, indicating these features by the date they were added.
 
+## v7 (27 May 2021)
+
+For more details about this release, see [the configuration reference for the new "sanitize metadata" parameters](https://nextstrain.github.io/ncov/configuration.html#sanitize_metadata) and [the corresponding pull request](https://github.com/nextstrain/ncov/pull/640).
+
+### Major changes
+
+- Deduplicate metadata and sequences from each `inputs` dataset at the beginning of the workflow.
+
+### Features
+
+- Support full GISAID metadata and sequences from the "Download packages" interface by converting this default format into Nextstrain-compatible metadata and sequences.
+- Support reading metadata and sequences directly from GISAID's tar archives. For example, you can now define `inputs` as `metadata: data/ncov_north-america.tar.gz` and `sequences: data/ncov_north-america.tar.gz` to decompress and read the corresponding data from the archive.
+
 ## New features since last version update
 
 - 25 May 2021: Support custom Auspice JSON prefixes with a new configuration parameter, `auspice_json_prefix`. [See the configuration reference for more details](https://nextstrain.github.io/ncov/configuration.html#auspice_json_prefix). ([#643](https://github.com/nextstrain/ncov/pull/643))
diff --git a/docs/configuration.md b/docs/configuration.md
index 5d0e012cc..d28462535 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -497,6 +497,44 @@ Valid attributes for list entries in `inputs` are provided below.
 * description: A list of prefixes to strip from strain names in metadata and sequence records to maintain consistent strain names when analyzing data from multiple sources.
 * default: `["hCoV-19/", "SARS-CoV-2/"]`
 
+## sanitize_metadata
+* type: object
+* description: Parameters to configure how to sanitize metadata to a Nextstrain-compatible format.
+
+### parse_location_field
+* type: string
+* description: Field in the metadata that stores GISAID-formatted location details (e.g., `North America / USA / Washington`) to be parsed into `region`, `country`, `division`, and `location` fields.
+* default: `Location`
+
+### rename_fields
+* type: array
+* description: List of key/value pairs mapping field names in the input metadata to new field names in the sanitized metadata.
+* default:
+```yaml
+  - "Virus name=strain"
+  - "Type=type"
+  - "Accession ID=gisaid_epi_isl"
+  - "Collection date=date"
+  - "Additional location information=additional_location_information"
+  - "Sequence length=length"
+  - "Host=host"
+  - "Patient age=patient_age"
+  - "Gender=sex"
+  - "Clade=GISAID_clade"
+  - "Pango lineage=pango_lineage"
+  - "Pangolin version=pangolin_version"
+  - "Variant=variant"
+  - "AA Substitutions=aa_substitutions"
+  - "aaSubtitutions=aa_substitutions"
+  - "Submission date=date_submitted"
+  - "Is reference?=is_reference"
+  - "Is complete?=is_complete"
+  - "Is high coverage?=is_high_coverage"
+  - "Is low coverage?=is_low_coverage"
+  - "N-Content=n_content"
+  - "GC-Content=gc_content"
+```
+
 ## subsampling
 * type: object
 * description: Schemes for subsampling data prior to phylogenetic inference to avoid sampling bias or focus an analysis on specific spatial and/or temporal scales. [See the SARS-CoV-2 tutorial for more details on defining subsampling schemes](https://docs.nextstrain.org/en/latest/tutorials/SARS-CoV-2/steps/customizing-analysis.html#subsampling).
diff --git a/scripts/adjust_regional_meta.py b/scripts/adjust_regional_meta.py
index f7bba1b29..e6460d625 100644
--- a/scripts/adjust_regional_meta.py
+++ b/scripts/adjust_regional_meta.py
@@ -41,11 +41,17 @@
     metadata.loc[metadata.region != focal_region, 'location'] = ""
     metadata.loc[metadata.region != focal_region, 'division'] = metadata.region
     metadata.loc[metadata.region != focal_region, 'country'] = metadata.region
-    metadata.loc[metadata.region != focal_region, 'division_exposure'] = metadata.region_exposure
-    metadata.loc[metadata.region != focal_region, 'country_exposure'] = metadata.region_exposure
-    metadata.loc[(metadata.region == focal_region) & (metadata.region_exposure != focal_region), 'division_exposure'] = metadata.region_exposure
-    metadata.loc[(metadata.region == focal_region) & (metadata.region_exposure != focal_region), 'country_exposure'] = metadata.region_exposure
-    metadata.loc[(metadata.region == focal_region) & (metadata.division_exposure.isna()), 'division_exposure'] = metadata.division
-    metadata.loc[(metadata.region == focal_region) & (metadata.country_exposure.isna()), 'country_exposure'] = metadata.country
+
+    if "region_exposure" in metadata.columns:
+        metadata.loc[metadata.region != focal_region, 'division_exposure'] = metadata.region_exposure
+        metadata.loc[metadata.region != focal_region, 'country_exposure'] = metadata.region_exposure
+        metadata.loc[(metadata.region == focal_region) & (metadata.region_exposure != focal_region), 'division_exposure'] = metadata.region_exposure
+        metadata.loc[(metadata.region == focal_region) & (metadata.region_exposure != focal_region), 'country_exposure'] = metadata.region_exposure
+
+    if "division_exposure" in metadata.columns:
+        metadata.loc[(metadata.region == focal_region) & (metadata.division_exposure.isna()), 'division_exposure'] = metadata.division
+
+    if "country_exposure" in metadata.columns:
+        metadata.loc[(metadata.region == focal_region) & (metadata.country_exposure.isna()), 'country_exposure'] = metadata.country
 
     metadata.to_csv(args.output, index=False, sep="\t")
diff --git a/scripts/combine-and-dedup-fastas.py b/scripts/combine-and-dedup-fastas.py
deleted file mode 100644
index 179618c28..000000000
--- a/scripts/combine-and-dedup-fastas.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import argparse
-from augur.io import open_file, read_sequences, write_sequences
-from Bio import SeqIO
-import hashlib
-import sys
-import textwrap
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(
-        description="Combine and dedup FASTAs",
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter
-    )
-
-    parser.add_argument('--input', type=str, nargs="+", metavar="FASTA", required=True, help="input FASTAs")
-    parser.add_argument('--warn-about-duplicates', action="store_true", help="warn the user about duplicate strains instead of exiting with an error. The output will include the first occurrence of a duplicate sequence.")
-    parser.add_argument('--output', type=str, metavar="FASTA", required=True, help="output FASTA")
-    args = parser.parse_args()
-
-    sequence_hash_by_name = {}
-    duplicate_strains = set()
-
-    counter = 0
-    with open_file(args.output, "w") as output_handle:
-        # Stream sequences from all input files into a single output file,
-        # skipping duplicate records (same strain and sequence) and noting
-        # mismatched sequences for the same strain name. In order to
-        # prefer the latter files, we have to reverse the order of the
-        # files.
-        for record in read_sequences(*reversed(args.input)):
-            counter += 1
-            if counter % 10000 == 0:
-                print(f"Processed {counter} records")
-
-            # Hash each sequence and check whether another sequence with the
-            # same name already exists and if the hash is different.
-            sequence_hash = hashlib.sha256(str(record.seq).encode("utf-8")).hexdigest()
-            if record.name in sequence_hash_by_name:
-                # If the hashes differ (multiple entries with the same
-                # strain name but different sequences), we keep the first
-                # sequence and add the strain to a list of duplicates to
-                # report at the end.
-                if sequence_hash_by_name.get(record.name) != sequence_hash:
-                    duplicate_strains.add(record.name)
-
-                # If the current strain has been seen before, don't write
-                # out its sequence again.
-                continue
-
-            sequence_hash_by_name[record.name] = sequence_hash
-            write_sequences(record, output_handle)
-
-    if len(duplicate_strains) > 0:
-        error_mode = "ERROR"
-        exit_code = 1
-
-        if args.warn_about_duplicates:
-            error_mode = "WARNING"
-            exit_code = 0
-
-        print(
-            f"{error_mode}: Detected the following duplicate input strains with different sequences:",
-            file=sys.stderr
-        )
-        for strain in duplicate_strains:
-            print(textwrap.indent(strain, "    "), file=sys.stderr)
-
-        if not args.warn_about_duplicates:
-            print(
-                "Use the `--warn-about-duplicates` flag, if you prefer to accept",
-                "the first occurrence of duplicate sequences based on the order",
-                "of the given input sequences.",
-                file=sys.stderr
-            )
-
-        sys.exit(exit_code)
diff --git a/scripts/sanitize_metadata.py b/scripts/sanitize_metadata.py
index 0217e5b83..15c7891f3 100644
--- a/scripts/sanitize_metadata.py
+++ b/scripts/sanitize_metadata.py
@@ -1,25 +1,257 @@
 import argparse
-from augur.utils import read_metadata
+from pathlib import Path
 import pandas as pd
 import re
+import sys
+
+from utils import extract_tar_file_contents
+
+# Define all possible geographic scales we could expect in the GISAID location
+# field.
+LOCATION_FIELDS = (
+    "region",
+    "country",
+    "division",
+    "location",
+)
+
+# Define possible strain name fields.
+STRAIN_FIELDS = (
+    "strain",
+    "name",
+    "Virus name",
+)
+
+ACCESSION_FIELDS = (
+    "gisaid_epi_isl",
+    "genbank_accession",
+)
+
+
+def parse_location_string(location_string, location_fields):
+    """Parse location string from GISAID into the given separate geographic scales
+    and return a dictionary of parsed values by scale.
+
+    Parameters
+    ----------
+    location_string : str
+    location_fields : list
+
+    Returns
+    -------
+    dict :
+        dictionary of geographic fields parsed from the given string
+
+    >>> location_fields = ["region", "country", "division", "location"]
+    >>> parse_location_string("Asia / Japan", location_fields)
+    {'region': 'Asia', 'country': 'Japan', 'division': '?', 'location': '?'}
+
+    >>> parse_location_string("Europe / Iceland / Reykjavik", location_fields)
+    {'region': 'Europe', 'country': 'Iceland', 'division': 'Reykjavik', 'location': '?'}
+
+    >>> parse_location_string("North America / USA / Washington / King County", location_fields)
+    {'region': 'North America', 'country': 'USA', 'division': 'Washington', 'location': 'King County'}
+
+    Additional location entries beyond what has been specified should be stripped from output.
+
+    >>> parse_location_string("North America / USA / Washington / King County / Extra field", location_fields)
+    {'region': 'North America', 'country': 'USA', 'division': 'Washington', 'location': 'King County'}
+
+    Trailing location delimiters should be stripped from the output.
+
+    >>> parse_location_string("North America / USA / Washington / King County / ", location_fields)
+    {'region': 'North America', 'country': 'USA', 'division': 'Washington', 'location': 'King County'}
+
+    Handle inconsistently delimited strings.
+
+    >>> parse_location_string("North America/USA/New York/New York", location_fields)
+    {'region': 'North America', 'country': 'USA', 'division': 'New York', 'location': 'New York'}
+    >>> parse_location_string("Europe/ Lithuania", location_fields)
+    {'region': 'Europe', 'country': 'Lithuania', 'division': '?', 'location': '?'}
+
+    """
+    # Try to extract values for specific geographic scales.
+    values = re.split(r"[ ]*/[ ]*", location_string)
+
+    # Create a default mapping of location fields to missing values and update
+    # these from the values in the location string.
+    locations = {field: "?" for field in location_fields}
+    locations.update(dict(zip(location_fields, values)))
+
+    return locations
+
+
+def resolve_duplicates(metadata, strain_field, error_on_duplicates=False):
+    """Resolve duplicate records for a given strain field and return a deduplicated
+    data frame. This approach chooses the record with the most recent database
+    accession, if accession fields exist, or the first record for a given strain
+    name. Optionally, raises an error when duplicates are detected, reporting
+    the list of those duplicate records.
+
+    Parameters
+    ----------
+    metadata : pandas.DataFrame
+        Metadata table that may or may not have duplicate records by the given strain field.
+
+    strain_field : string
+        Name of the metadata field corresponding to the strain name and which is used to identify duplicates.
+
+    error_on_duplicates : boolean
+        Whether to throw an error on detection of duplicates or not.
+
+    Returns
+    -------
+    pandas.DataFrame :
+        Metadata with no duplicate records by the given strain field.
+
+    Raises
+    ------
+    ValueError
+        If duplicates are detected and `error_on_duplicates` is `True`.
+    """
+    # Create a list of duplicate records by strain name.
+    duplicates = metadata.loc[
+        metadata.duplicated(strain_field),
+        strain_field
+    ].values
+
+    if len(duplicates) == 0:
+        # No duplicates, so return the original metadata.
+        return metadata
+
+    if error_on_duplicates:
+        # Duplicates and error requested on duplicates, so throw an exception
+        # with the list of duplicate strains.
+        raise ValueError(", ".join(duplicates))
+
+    # Try to resolve the duplicates by preferring records with the most recent
+    # database accession. First, check for standard accession fields.
+    accession_fields = [
+        field
+        for field in ACCESSION_FIELDS
+        if field in metadata.columns
+    ]
+
+    # If any of these fields exists, sort by strain name and accessions in
+    # ascending order and take the last record (the most recent accession for a
+    # given strain). Otherwise, sort and group by strain name and take the last
+    # record from each group. It is possible for metadata to contain fields for
+    # multiple database accessions and for these fields to be incomplete for
+    # some databases (for example, GISAID accession is `?` but GenBank accession
+    # is not). By sorting across all fields, we use information from all
+    # available accession fields. If all fields contain missing values (e.g.,
+    # "?"), we end up returning the last record for a given strain as a
+    # reasonable default.
+    sort_fields = [strain_field]
+    if len(accession_fields) > 0:
+        sort_fields.extend(accession_fields)
+
+    # Return the last record from each group after sorting by strain and
+    # available accessions.
+    return metadata.sort_values(sort_fields).drop_duplicates(strain_field, "last")
 
 
 if __name__ == '__main__':
-    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser = argparse.ArgumentParser(
+        usage="Sanitize metadata from different sources, applying operations in the same order they appear in the full help (-h).",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
     parser.add_argument("--metadata", required=True, help="metadata to be sanitized")
+    parser.add_argument("--parse-location-field", help="split the given GISAID location field on '/' and create new columns for region, country, etc. based on available data. Replaces missing geographic data with '?' values.")
+    parser.add_argument("--rename-fields", nargs="+", help="rename specific fields from the string on the left of the equal sign to the string on the right (e.g., 'Virus name=strain')")
     parser.add_argument("--strip-prefixes", nargs="+", help="prefixes to strip from strain names in the metadata")
+    parser.add_argument("--error-on-duplicate-strains", action="store_true", help="exit with an error if any duplicate strains are found. By default, duplicates are resolved by preferring most recent accession id or the last record.")
     parser.add_argument("--output", required=True, help="sanitized metadata")
     args = parser.parse_args()
 
-    metadata, columns = read_metadata(args.metadata)
-    metadata = pd.DataFrame.from_dict(metadata, orient="index")
+    # If the input is a tarball, try to find a metadata file inside the archive.
+    metadata_file = args.metadata
+    tar_handle = None
+    if ".tar" in Path(args.metadata).suffixes:
+        try:
+            metadata_file, tar_handle = extract_tar_file_contents(
+                args.metadata,
+                "metadata"
+            )
+        except FileNotFoundError as error:
+            print(f"ERROR: {error}", file=sys.stderr)
+            sys.exit(1)
+
+    # Read metadata with pandas because Augur's read_metadata utility does not
+    # support metadata without a "strain" or "name" field.
+    metadata = pd.read_csv(
+        metadata_file,
+        sep=None,
+        engine="python",
+        skipinitialspace=True,
+        dtype={
+            "strain": "string",
+            "name": "string",
+        }
+    ).fillna("")
+
+    # Close the tarball after reading the metadata if it is still open.
+    if tar_handle is not None:
+        tar_handle.close()
+
+    if args.parse_location_field and args.parse_location_field in metadata.columns:
+        # Parse GISAID location field into separate fields for geographic
+        # scales. Replace missing field values with "?".
+        locations = pd.DataFrame(
+            (
+                parse_location_string(location, LOCATION_FIELDS)
+                for location in metadata[args.parse_location_field].values
+            )
+        )
+
+        # Combine new location columns with original metadata and drop the
+        # original location column.
+        metadata = pd.concat(
+            [
+                metadata,
+                locations
+            ],
+            axis=1
+        ).drop(columns=[args.parse_location_field])
+
+    new_column_names = {}
+    if args.rename_fields:
+        # Rename specific columns using rules like "Virus name=strain".
+        for rule in args.rename_fields:
+            if "=" in rule:
+                old_column, new_column = rule.split("=")
+                new_column_names[old_column] = new_column
+            else:
+                print(
+                    f"WARNING: missing mapping of old to new column in form of 'Virus name=strain' for rule: '{rule}'.",
+                    file=sys.stderr
+                )
+
+    # Rename columns as needed.
+    if len(new_column_names) > 0:
+        metadata = metadata.rename(columns=new_column_names)
+
+    # Determine field for strain name.
+    strain_field = None
+    for field in STRAIN_FIELDS:
+        if field in metadata.columns:
+            strain_field = field
+            break
+
+    if strain_field is None:
+        print(
+            f"ERROR: None of the available columns match possible strain name fields ({', '.join(STRAIN_FIELDS)}).",
+            f"Available columns are: {metadata.columns.values}",
+            file=sys.stderr
+        )
+        sys.exit(1)
 
     if args.strip_prefixes:
         prefixes = "|".join(args.strip_prefixes)
         pattern = f"^({prefixes})"
 
-        metadata["strain"] = metadata["strain"].apply(
+        metadata[strain_field] = metadata[strain_field].apply(
             lambda strain: re.sub(
                 pattern,
                 "",
@@ -27,6 +259,21 @@
             )
         )
 
+    # Remove whitespaces from strain names since they are not allowed in FASTA
+    # record names.
+    metadata[strain_field] = metadata[strain_field].str.replace(" ", "")
+
+    # Check for duplicates and try to resolve these by default.
+    try:
+        metadata = resolve_duplicates(
+            metadata,
+            strain_field,
+            error_on_duplicates=args.error_on_duplicate_strains
+        )
+    except ValueError as e:
+        print(f"ERROR: The following strains have duplicate records: {e}", file=sys.stderr)
+        sys.exit(1)
+
     metadata.to_csv(
         args.output,
         sep="\t",
diff --git a/scripts/sanitize_sequences.py b/scripts/sanitize_sequences.py
index 21f66d554..c94bc9c2b 100644
--- a/scripts/sanitize_sequences.py
+++ b/scripts/sanitize_sequences.py
@@ -1,25 +1,152 @@
 import argparse
 from augur.io import open_file, read_sequences, write_sequences
+import hashlib
+from pathlib import Path
 import re
+import sys
+
+from utils import extract_tar_file_contents
+
+
+class DuplicateSequenceError(ValueError):
+    pass
+
+
+def rename_sequences(sequences, pattern):
+    """Rename the given sequences' ids by replacing the given patterns with the
+    empty string.
+
+    """
+    for sequence in sequences:
+        # Replace the given patterns in the sequence description with the empty
+        # string. For a simple FASTA record with only an identifier in the
+        # defline, the description is identical to the `id` and `name` fields.
+        # For a complex FASTA record that has spaces in the identifier or other
+        # additional information, we need to parse the description to get any
+        # trailing components of the strain name.
+        sequence.id = re.sub(pattern, "", sequence.description)
+
+        # The name field stores the same information for a simple FASTA input, so we need to override its value, too.
+        sequence.name = sequence.id
+
+        # Do not keep additional information that follows the sequence identifier.
+        sequence.description = ""
+
+        yield sequence
+
+
+def drop_duplicate_sequences(sequences, error_on_duplicates=False):
+    """Identify and drop duplicate sequences from the given iterator.
+
+    Parameters
+    ----------
+    sequences : Iterator
+
+    Yields
+    ------
+    Bio.SeqIO.Seq :
+        Unique sequence records
+
+    Raises
+    ------
+    DuplicateSequenceError :
+        If `error_on_duplicates` is True and any duplicate records are found,
+        raises an exception with a comma-delimited list of duplicates as the
+        message.
+
+    """
+    sequence_hash_by_name = {}
+    duplicate_strains = set()
+
+    for sequence in sequences:
+        # Hash each sequence and check whether another sequence with the same
+        # name already exists and if the hash is different.
+        sequence_hash = hashlib.sha256(str(sequence.seq).encode("utf-8")).hexdigest()
+        if sequence.name in sequence_hash_by_name:
+            # If the hashes differ (multiple entries with the same strain name
+            # but different sequences), we keep the first sequence and add the
+            # strain to a list of duplicates to report at the end.
+            if sequence_hash_by_name.get(sequence.name) != sequence_hash:
+                duplicate_strains.add(sequence.name)
+
+            # If the current strain has been seen before, don't write
+            # out its sequence again.
+            continue
+
+        sequence_hash_by_name[sequence.name] = sequence_hash
+        yield sequence
+
+    # Report names of duplicate strains with different sequences when requested.
+    if len(duplicate_strains) > 0 and error_on_duplicates:
+        raise DuplicateSequenceError(", ".join(duplicate_strains))
 
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument("--sequences", nargs="+", required=True, help="sequences to be sanitized")
     parser.add_argument("--strip-prefixes", nargs="+", help="prefixes to strip from strain names in the sequences")
+    parser.add_argument('--error-on-duplicate-strains', action="store_true", help="exit with an error when the same strain is detected multiple times with different sequences. By default, use the first occurrence of each duplicated sequence.")
     parser.add_argument("--output", required=True, help="sanitized sequences")
     args = parser.parse_args()
 
+    sequence_files = []
+    tar_handles = []
+    for sequence_filename in args.sequences:
+        # If the input is a tarball, try to find a sequence file inside the
+        # archive.
+        if ".tar" in Path(sequence_filename).suffixes:
+            try:
+                sequence_file, tar_handle = extract_tar_file_contents(
+                    sequence_filename,
+                    "sequences"
+                )
+
+                # The extracted tar file is an io.BufferedReader which provides
+                # a binary stream. BioPython's SeqIO reader expects a text
+                # stream. We decode each line of the BufferedReader in a
+                # generator such that decoding happens on the fly per line
+                # downstream. For more details see:
+                # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile.extractfile
+                sequence_files.append(sequence_file)
+                tar_handles.append(tar_handle)
+            except FileNotFoundError as error:
+                print(f"ERROR: {error}", file=sys.stderr)
+                sys.exit(1)
+        else:
+            sequence_files.append(sequence_filename)
+
+    # Replace whitespace and everything following pipes with nothing.
+    pattern = "( )|(\|.*)"
     if args.strip_prefixes:
         prefixes = "|".join(args.strip_prefixes)
-        pattern = f"^({prefixes})"
-    else:
-        pattern = ""
+        pattern = f"^({prefixes})|{pattern}"
 
     with open_file(args.output, "w") as output_handle:
         # In order to prefer the latter files, we have to reverse the order of
         # the files.
-        for sequence in read_sequences(*reversed(args.sequences)):
-            sequence.id = re.sub(pattern, "", sequence.id)
-            write_sequences(sequence, output_handle)
+        sequences = read_sequences(*reversed(sequence_files))
+        renamed_sequences = rename_sequences(sequences, pattern)
+        deduplicated_sequences = drop_duplicate_sequences(
+            renamed_sequences,
+            args.error_on_duplicate_strains
+        )
+
+        try:
+            for sequence in deduplicated_sequences:
+                write_sequences(sequence, output_handle)
+        except DuplicateSequenceError as error:
+            print(
+                f"ERROR: The following strains have duplicate sequences: {error}",
+                file=sys.stderr
+            )
+            sys.exit(1)
+
+    # Clean up any open sequence files.
+    for sequence_file in sequence_files:
+        if hasattr(sequence_file, "close"):
+            sequence_file.close()
+
+    # Clean up any open tarballs.
+    for tar_handle in tar_handles:
+        tar_handle.close()
diff --git a/scripts/utils.py b/scripts/utils.py
new file mode 100644
index 000000000..d4f7cf48f
--- /dev/null
+++ b/scripts/utils.py
@@ -0,0 +1,43 @@
+import lzma
+from pathlib import Path
+import tarfile
+
+
+EXTENSION_BY_FILETYPE = {
+    "metadata": ".tsv",
+    "sequences": ".fasta",
+}
+
+
+def extract_tar_file_contents(filename, filetype):
+    """Try to extract the contents of a given file type (e.g., metadata or
+    sequences) from the given tar filename.
+
+    """
+    extension = EXTENSION_BY_FILETYPE[filetype]
+
+    tar = tarfile.open(filename)
+    internal_file = None
+    for member in tar.getmembers():
+        suffixes = Path(member.name).suffixes
+
+        if extension in suffixes:
+            # By default, return the binary stream for the member file.
+            internal_file = tar.extractfile(member.name)
+
+            if ".xz" in suffixes:
+                # Check for LZMA-compressed data and open these with the
+                # corresponding library.
+                internal_file = lzma.open(internal_file, "rt")
+            elif extension == ".fasta":
+                # For sequence data, handle decoding of the binary stream prior
+                # to passing the data back to the caller.
+                internal_file = (line.decode("utf-8") for line in internal_file)
+
+            break
+
+    if internal_file is None:
+        tar.close()
+        raise FileNotFoundError(f"Could not find a {filetype} file in '{filename}'")
+
+    return internal_file, tar
diff --git a/workflow/snakemake_rules/main_workflow.smk b/workflow/snakemake_rules/main_workflow.smk
index d1f9e628a..0b4a05ccc 100644
--- a/workflow/snakemake_rules/main_workflow.smk
+++ b/workflow/snakemake_rules/main_workflow.smk
@@ -10,13 +10,17 @@ rule sanitize_metadata:
     log:
         "logs/sanitize_metadata_{origin}.txt"
     params:
+        parse_location_field=f"--parse-location-field {config['sanitize_metadata']['parse_location_field']}" if config["sanitize_metadata"].get("parse_location_field") else "",
+        rename_fields=config["sanitize_metadata"]["rename_fields"],
        strain_prefixes=config["strip_strain_prefixes"],
     shell:
         """
         python3 scripts/sanitize_metadata.py \
            --metadata {input.metadata} \
+            {params.parse_location_field} \
+            --rename-fields {params.rename_fields:q} \
            --strip-prefixes {params.strain_prefixes:q} \
-            --output {output.metadata}
+            --output {output.metadata} 2>&1 | tee {log}
         """
 
 
@@ -345,19 +349,16 @@
         "benchmarks/combine_sequences_for_subsampling.txt"
     conda: config["conda_environment"]
     params:
-        warn_about_duplicates="--warn-about-duplicates" if config.get("combine_sequences_for_subsampling", {}).get("warn_about_duplicates") else "",
+        error_on_duplicate_strains="--error-on-duplicate-strains" if not config.get("combine_sequences_for_subsampling", {}).get("warn_about_duplicates") else "",
        strain_prefixes=config["strip_strain_prefixes"],
     shell:
         """
         python3 scripts/sanitize_sequences.py \
            --sequences {input} \
            --strip-prefixes {params.strain_prefixes:q} \
+            {params.error_on_duplicate_strains} \
            --output /dev/stdout \
-            | python3 scripts/combine-and-dedup-fastas.py \
-                --input /dev/stdin \
-                {params.warn_about_duplicates} \
-                --output /dev/stdout \
-            | xz -2 > {output}
+            | xz -c -2 > {output}
         """
 
 
@@ -638,7 +639,7 @@ rule adjust_metadata_regions:
     input:
         metadata = _get_unified_metadata
    output:
-        metadata = "results/{build_name}/metadata_adjusted.tsv"
+        metadata = "results/{build_name}/metadata_adjusted.tsv.xz"
    params:
        region = lambda wildcards: config["builds"][wildcards.build_name]["region"]
    log:
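
Editor's note: the following is a minimal, hypothetical sketch of how a build configuration might point both `inputs` entries at a single GISAID "Download packages" tarball, based on the `inputs` example in the change log and the new `sanitize_metadata` parameters documented above. The input name, file path, and the decision to override any defaults are illustrative assumptions, not part of this pull request.

```yaml
# Hypothetical builds.yaml snippet: read metadata and sequences from the same
# GISAID tar archive (the input name and path are placeholders).
inputs:
  - name: gisaid-regional
    metadata: data/ncov_north-america.tar.gz
    sequences: data/ncov_north-america.tar.gz

# Optional override of the defaults from defaults/parameters.yaml, e.g., if the
# location column in a particular download uses a different name.
sanitize_metadata:
  parse_location_field: Location
```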
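The workflow runs both sanitizer scripts automatically, but for debugging they can in principle be invoked by hand with the same flags the rules above pass. This sketch mirrors the shell commands in `sanitize_metadata` and `combine_sequences_for_subsampling`; the input archive, the subset of rename rules shown, and the output paths are placeholders.

```bash
# Sketch only: standalone invocation mirroring the Snakemake rules above.
python3 scripts/sanitize_metadata.py \
    --metadata data/ncov_north-america.tar.gz \
    --parse-location-field Location \
    --rename-fields "Virus name=strain" "Accession ID=gisaid_epi_isl" "Collection date=date" \
    --strip-prefixes "hCoV-19/" "SARS-CoV-2/" \
    --output results/sanitized_metadata_gisaid.tsv

python3 scripts/sanitize_sequences.py \
    --sequences data/ncov_north-america.tar.gz \
    --strip-prefixes "hCoV-19/" "SARS-CoV-2/" \
    --output results/sanitized_sequences_gisaid.fasta
```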