diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 012f104cf..e052db7bd 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -69,7 +69,7 @@ jobs:
       - name: Install dependencies from Conda
         uses: mamba-org/setup-micromamba@v2
         with:
-          create-args: mafft raxml fasttree iqtree vcftools sqlite tsv-utils biopython=${{ matrix.biopython-version }} python=${{ matrix.python-version }}
+          create-args: mafft raxml fasttree iqtree vcftools sqlite seqkit tsv-utils biopython=${{ matrix.biopython-version }} python=${{ matrix.python-version }}
           condarc: |
             channels:
               - conda-forge
diff --git a/augur/merge.py b/augur/merge.py
index 8a66123cb..23d6c9914 100644
--- a/augur/merge.py
+++ b/augur/merge.py
@@ -1,8 +1,12 @@
 """
-Merge two or more metadata tables into one.
+Merge two or more datasets into one.
 
-Tables must be given unique names to identify them in the output and are
-merged in the order given.
+Datasets can consist of metadata and/or sequence files.
+
+**Metadata**
+
+Metadata tables must be given unique names to identify them in the output and
+are merged in the order given.
 
 Rows are joined by id (e.g. "strain" or "name" or other
 --metadata-id-columns), and ids must be unique within an input table (i.e.
@@ -34,7 +38,21 @@ future.
 The SQLite 3 CLI, sqlite3, must be available. If it's not on PATH (or you
 want to use a version different from what's on PATH), set the SQLITE3
 environment variable to path of the desired sqlite3 executable.
+
+**Sequences**
+
+Sequence files are merged in the order given. Naming files is optional. If
+names are provided, they will be checked against any named metadata for order
+and IDs.
+
+SeqKit is used behind the scenes to implement the merge, but, at least for now,
+this should be considered an implementation detail that may change in the
+future. The CLI program seqkit must be available. If it's not on PATH (or
+you want to use a version different from what's on PATH), set the SEQKIT
+environment variable to the path of the desired seqkit executable.
 """
+import argparse
 import os
 import re
 import subprocess
@@ -43,27 +61,108 @@ from itertools import starmap
 from shlex import quote as shquote
 from shutil import which
-from tempfile import mkstemp
+from tempfile import mkstemp, NamedTemporaryFile
 from textwrap import dedent
-from typing import Iterable, Tuple, TypeVar
+from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple, TypeVar, Union
 
 from augur.argparse_ import ExtendOverwriteDefault, SKIP_AUTO_DEFAULT_IN_HELP
 from augur.errors import AugurError
 from augur.io.metadata import DEFAULT_DELIMITERS, DEFAULT_ID_COLUMNS, Metadata
 from augur.io.print import print_err, print_debug, _n
+from augur.io.sequences import read_sequences
 from augur.utils import first_line
 
 T = TypeVar('T')
 
+# Tuple is (table name, column name)
+Columns = Dict[str, List[Tuple[str, str]]]
+
+
+print_info = print_err
+
+
+# Locate how to re-invoke ourselves (_this_ specific Augur).
+augur = ""
+if sys.executable:
+    augur = f"{shquote(sys.executable)} -m augur"
+else:
+    # A bit unusual we don't know our own Python executable, but assume we
+    # can access ourselves as the ``augur`` command.
+    augur = "augur"
+
+
+SEQUENCE_ID_COLUMN = 'id'
+
+
+class Database:
+    fd: int
+    """Database file descriptor"""
+
+    path: str
+    """Database file path"""
+
+    def __init__(self):
+        # Work with a temporary, on-disk SQLite database under a name we
+        # control so we can access it from multiple (serial) processes.
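+        # (mkstemp() creates the file and an open descriptor for us; we close
+        # our copy of the descriptor right away since only the sqlite3 CLI,
+        # running in child processes, will open the database, by path.)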
+        self.fd, self.path = mkstemp(prefix="augur-merge-", suffix=".sqlite")
+        os.close(self.fd)
+
+    def run(self, *args, **kwargs):
+        error = False
+
+        try:
+            sqlite3(self.path, *args, **kwargs)
+
+        except SQLiteError as err:
+            error = True
+            raise AugurError(str(err)) from err
+
+        finally:
+            if error:
+                print_info(f"WARNING: Skipped deletion of {self.path} due to error, but you may want to clean it up yourself (e.g. if it's large).")
+
+    def cleanup(self):
+        os.unlink(self.path)
+
+
+class UnnamedFile:
+    table_name: str
+    """Generated SQLite table name for this file, based on *path*."""
+
+    path: str
+
 
-class NamedMetadata(Metadata):
+class NamedFile:
     name: str
-    """User-provided descriptive name for this metadata file."""
+    """User-provided descriptive name for this file."""
 
     table_name: str
-    """Generated SQLite table name for this metadata file, based on *name*."""
+    """Generated SQLite table name for this file, based on *name*."""
+
+    path: str
+
+
+class NamedSequenceFile(NamedFile):
+    def __init__(self, name: str, path: str):
+        self.name = name
+        self.path = path
+        self.table_name = f"sequences_{self.name}"
+
+    def __repr__(self):
+        return f"<NamedSequenceFile {self.name}={self.path}>"
+
+
+class UnnamedSequenceFile(UnnamedFile):
+    def __init__(self, path: str):
+        self.path = path
+        self.table_name = f"sequences_{re.sub(r'[^a-zA-Z0-9]', '_', os.path.basename(self.path))}"
+
+    def __repr__(self):
+        return f"<UnnamedSequenceFile {self.path}>"
+
 
+class NamedMetadata(Metadata, NamedFile):
     def __init__(self, name: str, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.name = name
@@ -77,55 +176,178 @@ def register_parser(parent_subparsers):
     parser = parent_subparsers.add_parser("merge", help=first_line(__doc__))
 
     input_group = parser.add_argument_group("inputs", "options related to input")
-    input_group.add_argument("--metadata", nargs="+", action="extend", required=True, metavar="NAME=FILE", help="Required. Metadata table names and file paths. Names are arbitrary monikers used solely for referring to the associated input file in other arguments and in output column names. Paths must be to seekable files, not unseekable streams. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
+    input_group.add_argument("--metadata", nargs="+", action="extend", metavar="NAME=FILE", help="Metadata table names and file paths. Names are arbitrary monikers used solely for referring to the associated input file in other arguments and in output column names. Paths must be to seekable files, not unseekable streams. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
+    input_group.add_argument("--sequences", nargs="+", action="extend", metavar="[NAME=]FILE", help="Sequence files, optionally named for validation with named metadata. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
     input_group.add_argument("--metadata-id-columns", default=DEFAULT_ID_COLUMNS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]COLUMN", help=f"Possible metadata column names containing identifiers, considered in the order given. Columns will be considered for all metadata tables by default. Table-specific column names may be given using the same names assigned in --metadata. Only one ID column will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_ID_COLUMNS))})" + SKIP_AUTO_DEFAULT_IN_HELP)
     input_group.add_argument("--metadata-delimiters", default=DEFAULT_DELIMITERS, nargs="+", action=ExtendOverwriteDefault, metavar="[TABLE=]CHARACTER", help=f"Possible field delimiters to use for reading metadata tables, considered in the order given. Delimiters will be considered for all metadata tables by default. Table-specific delimiters may be given using the same names assigned in --metadata. Only one delimiter will be inferred for each table. (default: {' '.join(map(shquote_humanized, DEFAULT_DELIMITERS))})" + SKIP_AUTO_DEFAULT_IN_HELP)
 
     output_group = parser.add_argument_group("outputs", "options related to output")
-    output_group.add_argument('--output-metadata', required=True, metavar="FILE", help="Required. Merged metadata as TSV. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
+    output_group.add_argument('--output-metadata', metavar="FILE", help="Merged metadata as TSV. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
     output_group.add_argument('--source-columns', metavar="TEMPLATE", help=f"Template with which to generate names for the columns (described above) identifying the source of each row's data. Must contain a literal placeholder, {{NAME}}, which stands in for the metadata table names assigned in --metadata. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)
     output_group.add_argument('--no-source-columns', dest="source_columns", action="store_const", const=None, help=f"Suppress generated columns (described above) identifying the source of each row's data. This is the default behaviour, but it may be made explicit or used to override a previous --source-columns." + SKIP_AUTO_DEFAULT_IN_HELP)
+    output_group.add_argument('--output-sequences', metavar="FILE", help="Merged sequences as FASTA. Compressed files are supported." + SKIP_AUTO_DEFAULT_IN_HELP)
     output_group.add_argument('--quiet', action="store_true", default=False, help="Suppress informational and warning messages normally written to stderr. (default: disabled)" + SKIP_AUTO_DEFAULT_IN_HELP)
 
     return parser
 
 
-def run(args):
-    print_info = print_err if not args.quiet else lambda *_: None
+def validate_arguments(args):
+    if not any((args.metadata, args.sequences)):
+        raise AugurError("At least one input must be specified.")
+    if not any((args.output_metadata, args.output_sequences)):
+        raise AugurError("At least one output must be specified.")
 
-    # Parse --metadata arguments
-    if not len(args.metadata) >= 2:
+    if args.metadata and len(args.metadata) < 2:
         raise AugurError(f"At least two metadata inputs are required for merging.")
 
-    if unnamed := [repr(x) for x in args.metadata if "=" not in x or x.startswith("=")]:
-        raise AugurError(dedent(f"""\
-            All metadata inputs must be assigned a name, e.g. with NAME=FILE.
+    if args.output_metadata and not args.metadata:
+        raise AugurError("--output-metadata requires --metadata.")
+    if args.output_sequences and not args.sequences:
+        raise AugurError("--output-sequences requires --sequences.")
 
-            The following {_n("input was", "inputs were", len(unnamed))} missing a name:
-
-                {indented_list(unnamed, '            ' + '    ')}
-            """))
+
+def run(args: argparse.Namespace):
+    global print_info
+
+    if args.quiet:
+        print_info = lambda *_: None
+
+    # Catch user errors early to avoid unnecessary computation.
+    validate_arguments(args)
+
+    db = Database()
+    metadata: Optional[List[NamedMetadata]] = None
+    output_columns: Optional[Columns] = None
+    output_source_column: Optional[Callable[[str], str]] = None
+    sequences: Optional[List[Union[NamedSequenceFile, UnnamedSequenceFile]]] = None
+    named_sequences: Optional[List[NamedSequenceFile]] = None
+
+    if args.metadata:
+        metadata = get_metadata(args.metadata, args.metadata_id_columns, args.metadata_delimiters)
+
+    if args.sequences:
+        sequences = list(get_sequences(args.sequences))
+
+
+    # Perform checks on file names.
+
+    if sequences:
+        named_sequences = [s for s in sequences if isinstance(s, NamedSequenceFile)]
+
+        if metadata and (unnamed_sequences := [s for s in sequences if isinstance(s, UnnamedSequenceFile)]):
+            for x in unnamed_sequences:
+                print_info(f"WARNING: Sequence file {x.path!r} is unnamed. Skipping validation with metadata.")
 
-    metadata = pairs(args.metadata)
+    if metadata and named_sequences:
+        metadata_names = [m.name for m in metadata]
+        sequences_names = [s.name for s in named_sequences]
 
-    if duplicate_names := [repr(name) for name, count
-                           in count_unique(name for name, _ in metadata)
-                           if count > 1]:
+        if sequences_missing_metadata := sorted(set(sequences_names) - set(metadata_names)):
+            raise AugurError(dedent(f"""\
+                Named inputs must be paired.
+
+                The following sequence {_n("input does", "inputs do", len(sequences_missing_metadata))} not have a corresponding metadata input:
+
+                    {indented_list([repr(x.path) for x in named_sequences if x.name in sequences_missing_metadata], '                ' + '    ')}
+                """))
+
+        if metadata_missing_sequences := sorted(set(metadata_names) - set(sequences_names)):
+            raise AugurError(dedent(f"""\
+                Named inputs must be paired.
+
+                The following metadata {_n("input does", "inputs do", len(metadata_missing_sequences))} not have a corresponding sequence input:
+
+                    {indented_list([repr(x.path) for x in metadata if x.name in metadata_missing_sequences], '                ' + '    ')}
+                """))
+
+        if metadata_names != sequences_names:
+            raise AugurError(dedent(f"""\
+                Named inputs must be paired with the same ordering.
+
+                Order of inputs differs between named metadata {metadata_names!r} and named sequences {sequences_names!r}.
+                """))
+
+
+    # Load data.
+
+    if metadata:
+        load_metadata(db, metadata)
+    if sequences:
+        load_sequences(db, sequences)
+
+
+    # Perform checks on file contents.
+
+    if metadata and named_sequences:
+        metadata_by_name = {m.name: m for m in metadata}
+        sequences_by_name = {s.name: s for s in named_sequences}
+
+        for name in sorted(metadata_by_name.keys() & sequences_by_name.keys()):
+            m = metadata_by_name[name]
+            s = sequences_by_name[name]
+
+            # FIXME: import this at the top-level, taking into account the
+            # existing sqlite3 function at that level
+            import sqlite3
+            with sqlite3.connect(db.path) as connection:
+                connection.row_factory = sqlite3.Row
+
+                metadata_ids = {x[m.id_column] for x in
+                                connection.execute(f"""select {sqlite_quote_id(m.id_column)}
+                                                         from {sqlite_quote_id(m.table_name)}
+                                                    """)}
+
+                sequence_ids = {x[SEQUENCE_ID_COLUMN] for x in
+                                connection.execute(f"""select {sqlite_quote_id(SEQUENCE_ID_COLUMN)}
+                                                         from {sqlite_quote_id(s.table_name)}
+                                                    """)}
+
+            for x in sorted(metadata_ids - sequence_ids):
+                print_info(f"WARNING: Sequence {x!r} in {m.path!r} is missing from {s.path!r}. Outputs may continue to be mismatched.")
+            for x in sorted(sequence_ids - metadata_ids):
+                print_info(f"WARNING: Sequence {x!r} in {s.path!r} is missing from {m.path!r}. Outputs may continue to be mismatched.")
+
+
+    # Write outputs.
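+    # (Metadata is merged entirely inside SQLite via merge_metadata();
+    # sequences are merged in a separate pass over the input files by seqkit
+    # via merge_sequences(). See those functions below.)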
+
+    if args.output_metadata:
+        output_source_column = get_output_source_column(args.source_columns, metadata)
+        output_columns = get_output_columns(metadata)
+        merge_metadata(db, metadata, output_columns, args.output_metadata, output_source_column)
+
+    if args.output_sequences:
+        merge_sequences(sequences, args.output_sequences)
+
+
+def get_metadata(
+        input_metadata: Sequence[str],
+        input_metadata_id_columns: Sequence[str],
+        input_metadata_delimiters: Sequence[str],
+    ) -> List[NamedMetadata]:
+    # Validate --metadata arguments
+    try:
+        metadata = parse_inputs(input_metadata, require_names=True)
+    except UnnamedInputError as e:
         raise AugurError(dedent(f"""\
-            Metadata input names must be unique.
+            All metadata inputs must be assigned a name, e.g. with NAME=FILE.
 
-            The following {_n("name was", "names were", len(duplicate_names))} used more than once:
+            The following {_n("input was", "inputs were", len(e.unnamed))} missing a name:
 
-                {indented_list(duplicate_names, '            ' + '    ')}
-            """))
+                {indented_list([repr(x) for x in e.unnamed], '            ' + '    ')}
+            """)) from e
+    except DuplicateInputNameError as e:
+        raise AugurError(dedent(f"""\
+            Metadata input names must be unique.
+
+            The following {_n("name was", "names were", len(e.duplicates))} used more than once:
+
+                {indented_list([repr(x) for x in e.duplicates], '            ' + '    ')}
+            """)) from e
 
     # Parse --metadata-id-columns and --metadata-delimiters
     metadata_names = set(name for name, _ in metadata)
-    metadata_id_columns = pairs(args.metadata_id_columns)
-    metadata_delimiters = pairs(args.metadata_delimiters)
+    metadata_id_columns = pairs(input_metadata_id_columns)
+    metadata_delimiters = pairs(input_metadata_delimiters)
 
     if unknown_names := [repr(name) for name, _ in metadata_id_columns if name and name not in metadata_names]:
         raise AugurError(dedent(f"""\
@@ -146,50 +368,60 @@ def run(args):
             """))
 
+    # Infer delimiters and id columns
+    return [
+        NamedMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS,
+                                  [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS)
+        for name, path in metadata]
+
+
+def get_output_source_column(source_columns: Optional[str], metadata: List[NamedMetadata]):
     # Validate --source-columns template and convert to template function
     output_source_column = None
-    if args.source_columns is not None:
-        if "{NAME}" not in args.source_columns:
+    if source_columns is not None:
+        if "{NAME}" not in source_columns:
             raise AugurError(dedent(f"""\
                 The --source-columns template must contain the literal
-                placeholder {{NAME}} but the given value ({args.source_columns!r}) does not.
+                placeholder {{NAME}} but the given value ({source_columns!r}) does not.
 
                 You may need to quote the whole template value to prevent your
                 shell from interpreting the placeholder before Augur sees it.
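+
+                For example: --source-columns 'source_{{NAME}}'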
""")) - output_source_column = lambda name: args.source_columns.replace('{NAME}', name) + output_source_column = lambda name: source_columns.replace('{NAME}', name) + output_source_columns = set( + output_source_column(m.name) + for m in metadata + if output_source_column) - # Infer delimiters and id columns - metadata = [ - NamedMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS, - [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS) - for name, path in metadata] - + if conflicting_columns := [f"{c!r} in metadata table {m.name!r}" + for m in metadata + for c in m.columns + if c in output_source_columns]: + raise AugurError(dedent(f"""\ + Generated source column names may not conflict with any column + names in metadata inputs. - # Locate how to re-invoke ourselves (_this_ specific Augur). - if sys.executable: - augur = f"{shquote(sys.executable)} -m augur" - else: - # A bit unusual we don't know our own Python executable, but assume we - # can access ourselves as the ``augur`` command. - augur = f"augur" + The given source column template ({source_columns!r}) with the + given metadata table names would conflict with the following input + {_n("column", "columns", len(conflicting_columns))}: + {indented_list(conflicting_columns, ' ' + ' ')} - # Work with a temporary, on-disk SQLite database under a name we control so - # we can access it from multiple (serial) processes. - db_fd, db_path = mkstemp(prefix="augur-merge-", suffix=".sqlite") - os.close(db_fd) + Please adjust the source column template with --source-columns + and/or adjust the metadata table names to avoid conflicts. + """)) + + return output_source_column - # Clean up database file by default - delete_db = True +def get_output_columns(metadata: List[NamedMetadata]): # Track columns as we see them, in order. The first metadata's id column # is always the first output column of the merge, so insert it now. output_id_column = metadata[0].id_column - output_columns = { output_id_column: [] } + output_columns: Columns = { output_id_column: [] } if conflicting_columns := [f"{c!r} in metadata table {m.name!r} (id column: {m.id_column!r})" for m in metadata @@ -208,125 +440,204 @@ def run(args): Renaming may be done with `augur curate rename`. """)) - output_source_columns = set( - output_source_column(m.name) - for m in metadata - if output_source_column) + for m in metadata: + # Track which columns appear in which metadata inputs, preserving + # the order of both. + for column in m.columns: + # Match different id column names in different metadata files + # since they're logically equivalent. Any non-id columns that + # match the output_id_column (i.e. first table's id column) and + # would thus overwrite it with this logic are already a fatal + # error above. + output_column = output_id_column if column == m.id_column else column + + output_columns.setdefault(output_column, []) + output_columns[output_column] += [(m.table_name, column)] + + return output_columns + + +def load_metadata( + db: Database, + metadata: List[NamedMetadata], + ): + + # Read all metadata files into a SQLite db + for m in metadata: + # All other metadata reading in Augur (i.e. via the csv module) + # uses Python's "universal newlines"¹ definition and accepts \n, + # \r\n, and \r as newlines interchangably (even mixed within the + # same file!). 
We accomplish the same behaviour here with SQLite's + # less flexible newline handling by relying on the universal + # newline translation of `augur read-file`. + # -trs, 24 July 2024 + # + # ¹ + newline = os.linesep - if conflicting_columns := [f"{c!r} in metadata table {m.name!r}" - for m in metadata - for c in m.columns - if c in output_source_columns]: + print_info(f"Reading {m.name!r} metadata from {m.path!r}…") + db.run( + f'.mode csv', + f'.separator {sqlite_quote_dot(m.delimiter)} {sqlite_quote_dot(newline)}', + f'.import {sqlite_quote_dot(f"|{augur} read-file {shquote(m.path)}")} {sqlite_quote_dot(m.table_name)}', + + f'create unique index {sqlite_quote_id(f"{m.table_name}_id")} on {sqlite_quote_id(m.table_name)}({sqlite_quote_id(m.id_column)});', + + # + f'pragma optimize;') + + # We're going to use Metadata.columns to generate the select + # statement, so ensure it matches what SQLite's .import created. + assert m.columns == (table_columns := sqlite3_table_columns(db.path, m.table_name)), \ + f"{m.columns!r} == {table_columns!r}" + + return metadata + + +def merge_metadata( + db: Database, + metadata: List[NamedMetadata], + output_columns: Columns, + output_metadata: str, + output_source_column: Optional[Callable[[str], str]], + ): + # Construct query to produce merged metadata. + select_list = [ + # Output metadata columns coalesced across input metadata columns + *(f"""coalesce({', '.join(f"nullif({x}, '')" for x in starmap(sqlite_quote_id, reversed(input_columns)))}, null) as {sqlite_quote_id(output_column)}""" + for output_column, input_columns in output_columns.items()), + + # Source columns. Select expressions generated here instead of + # earlier to stay adjacent to the join conditions below, upon which + # these rely. + *(f"""{sqlite_quote_id(m.table_name, m.id_column)} is not null as {sqlite_quote_id(output_source_column(m.name))}""" + for m in metadata if output_source_column)] + + from_list = [ + sqlite_quote_id(metadata[0].table_name), + *(f"full outer join {sqlite_quote_id(m.table_name)} on {sqlite_quote_id(m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.table_name, m.id_column) for m in reversed(preceding))})" + for m, preceding in [(m, metadata[:i]) for i, m in enumerate(metadata[1:], 1)])] + + # Take some small pains to make the query readable since it makes + # debugging and development easier. Note that backslashes aren't + # allowed inside f-string expressions, hence the *newline* variable. + newline = '\n' + query = dedent(f"""\ + select + {(',' + newline + ' ').join(select_list)} + from + {(newline + ' ').join(from_list)} + ; + """) + + + # Write merged metadata as export from SQLite db. + # + # Assume TSV like nearly all other extant --output-metadata options. + print_info(f"Merging metadata and writing to {output_metadata!r}…") + print_debug(query) + db.run( + f'.mode csv', + f'.separator "\\t" "\\n"', + f'.headers on', + f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(output_metadata)}")}', + query) + + db.cleanup() + + +def get_sequences(input_sequences: List[str]): + try: + sequences = parse_inputs(input_sequences) + except InvalidNamedInputError as e: raise AugurError(dedent(f"""\ - Generated source column names may not conflict with any column - names in metadata inputs. + Input filenames cannot start with '='. 
-            The given source column template ({args.source_columns!r}) with the
-            given metadata table names would conflict with the following input
-            {_n("column", "columns", len(conflicting_columns))}:
+            The following {_n("input starts", "inputs start", len(e.invalid))} with '=':
 
-                {indented_list(conflicting_columns, '            ' + '    ')}
+                {indented_list([repr(x) for x in e.invalid], '            ' + '    ')}
+            """)) from e
+    except DuplicateInputNameError as e:
+        raise AugurError(dedent(f"""\
+            Sequence input names must be unique.
 
-            Please adjust the source column template with --source-columns
-            and/or adjust the metadata table names to avoid conflicts.
-            """))
+            The following {_n("name was", "names were", len(e.duplicates))} used more than once:
+
+                {indented_list([repr(x) for x in e.duplicates], '            ' + '    ')}
+            """)) from e
 
-    try:
-        # Read all metadata files into a SQLite db
-        for m in metadata:
-            # All other metadata reading in Augur (i.e. via the csv module)
-            # uses Python's "universal newlines"¹ definition and accepts \n,
-            # \r\n, and \r as newlines interchangably (even mixed within the
-            # same file!). We accomplish the same behaviour here with SQLite's
-            # less flexible newline handling by relying on the universal
-            # newline translation of `augur read-file`.
-            #   -trs, 24 July 2024
-            #
-            # ¹ <https://docs.python.org/3/glossary.html#term-universal-newlines>
-            newline = os.linesep
-
-            print_info(f"Reading {m.name!r} metadata from {m.path!r}…")
-            sqlite3(db_path,
-                f'.mode csv',
-                f'.separator {sqlite_quote_dot(m.delimiter)} {sqlite_quote_dot(newline)}',
-                f'.import {sqlite_quote_dot(f"|{augur} read-file {shquote(m.path)}")} {sqlite_quote_dot(m.table_name)}',
-
-                f'create unique index {sqlite_quote_id(f"{m.table_name}_id")} on {sqlite_quote_id(m.table_name)}({sqlite_quote_id(m.id_column)});',
-
-                # <https://sqlite.org/pragma.html#pragma_optimize>
-                f'pragma optimize;')
-
-            # We're going to use Metadata.columns to generate the select
-            # statement, so ensure it matches what SQLite's .import created.
-            assert m.columns == (table_columns := sqlite3_table_columns(db_path, m.table_name)), \
-                f"{m.columns!r} == {table_columns!r}"
-
-            # Track which columns appear in which metadata inputs, preserving
-            # the order of both.
-            for column in m.columns:
-                # Match different id column names in different metadata files
-                # since they're logically equivalent. Any non-id columns that
-                # match the output_id_column (i.e. first table's id column) and
-                # would thus overwrite it with this logic are already a fatal
-                # error above.
-                output_column = output_id_column if column == m.id_column else column
-
-                output_columns.setdefault(output_column, [])
-                output_columns[output_column] += [(m.table_name, column)]
-
-
-        # Construct query to produce merged metadata.
-        select_list = [
-            # Output metadata columns coalesced across input metadata columns
-            *(f"""coalesce({', '.join(f"nullif({x}, '')" for x in starmap(sqlite_quote_id, reversed(input_columns)))}, null) as {sqlite_quote_id(output_column)}"""
-              for output_column, input_columns in output_columns.items()),
-
-            # Source columns. Select expressions generated here instead of
-            # earlier to stay adjacent to the join conditions below, upon which
-            # these rely.
-            *(f"""{sqlite_quote_id(m.table_name, m.id_column)} is not null as {sqlite_quote_id(output_source_column(m.name))}"""
-              for m in metadata if output_source_column)]
-
-        from_list = [
-            sqlite_quote_id(metadata[0].table_name),
-            *(f"full outer join {sqlite_quote_id(m.table_name)} on {sqlite_quote_id(m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.table_name, m.id_column) for m in reversed(preceding))})"
-              for m, preceding in [(m, metadata[:i]) for i, m in enumerate(metadata[1:], 1)])]
-
-        # Take some small pains to make the query readable since it makes
-        # debugging and development easier. Note that backslashes aren't
-        # allowed inside f-string expressions, hence the *newline* variable.
-        newline = '\n'
-        query = dedent(f"""\
-            select
-                {(',' + newline + '                ').join(select_list)}
-            from
-                {(newline + '                ').join(from_list)}
-            ;
-            """)
-
-
-        # Write merged metadata as export from SQLite db.
-        #
-        # Assume TSV like nearly all other extant --output-metadata options.
-        print_info(f"Merging metadata and writing to {args.output_metadata!r}…")
-        print_debug(query)
-        sqlite3(db_path,
-            f'.mode csv',
-            f'.separator "\\t" "\\n"',
-            f'.headers on',
-            f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(args.output_metadata)}")}',
-            query)
-
-    except SQLiteError as err:
-        delete_db = False
-        raise AugurError(str(err)) from err
-
-    finally:
-        if delete_db:
-            os.unlink(db_path)
-        else:
-            print_info(f"WARNING: Skipped deletion of {db_path} due to error, but you may want to clean it up yourself (e.g. if it's large).")
+    for name, path in sequences:
+        if name == "":
+            yield UnnamedSequenceFile(path)
+        else:
+            yield NamedSequenceFile(name, path)
+
+
+def load_sequences(db: Database, sequences: List[Union[NamedSequenceFile, UnnamedSequenceFile]]):
+    for s in sequences:
+        print_info(f"Reading sequence IDs from {s.path!r}…")
+        ids = [seq.id for seq in read_sequences(s.path)]
+
+        if duplicates := [item for item, count in count_unique(ids) if count > 1]:
+            raise AugurError(dedent(f"""\
+                IDs must be unique within a sequence input file.
+
+                The following {_n("entry is", "entries are", len(duplicates))} duplicated in {s.path!r}:
+
+                    {indented_list([repr(x) for x in duplicates], '                ' + '    ')}
+                """))
+
+        # FIXME: Skip for unnamed sequences? This is only used for named
+        # sequences to check against paired metadata.
+        db.run(f"create table {sqlite_quote_id(s.table_name)} ({sqlite_quote_id(SEQUENCE_ID_COLUMN)} text);")
+        values = ", ".join([f"({sqlite_quote_string(seq_id)})" for seq_id in ids])
+        db.run(f"insert into {sqlite_quote_id(s.table_name)} ({sqlite_quote_id(SEQUENCE_ID_COLUMN)}) values {values};")
+
+        db.run(f'create unique index {sqlite_quote_id(f"{s.table_name}_id")} on {sqlite_quote_id(s.table_name)}({sqlite_quote_id(SEQUENCE_ID_COLUMN)});')
+
+
+def cat(filepath: str):
+    cat = "cat"
+
+    if filepath.endswith(".gz"):
+        cat = "gzcat"
+    if filepath.endswith(".xz"):
+        cat = "xzcat"
+    if filepath.endswith(".zst"):
+        cat = "zstdcat"
+
+    return cat, filepath
+
+
+def merge_sequences(
+        sequences: List[Union[NamedSequenceFile, UnnamedSequenceFile]],
+        output_sequences: str,
+    ):
+    # Confirm that seqkit is installed before doing any work. (The seqkit()
+    # helper below would also catch this, but only after the inputs have
+    # already been concatenated.)
+    if not os.environ.get("SEQKIT", which("seqkit")):
+        raise AugurError("'seqkit' is not installed! This is required to merge sequences.")
+
+    with NamedTemporaryFile() as temp_file:
+        with open(temp_file.name, 'w') as f:
+            # Reversed because seqkit rmdup keeps the first entry but this command
+            # should keep the last entry.
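+            # (e.g. for inputs x, y, z the concatenation order is z, y, x, so
+            # the copy kept for a duplicated id is the one from the latest
+            # input, matching the metadata merge where later inputs win.)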
+            for s in reversed(sequences):
+                print_info(f"Reading sequences from {s.path!r}…")
+                subprocess.run(cat(s.path), stdout=f, check=True)
+
+        print_info(f"Merging sequences and writing to {output_sequences!r}…")
+        process = seqkit('rmdup', temp_file.name, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+        # FIXME: do this unbuffered
+        if process.stderr:
+            for line in process.stderr.splitlines():
+                print_info(f"[SeqKit] {line}")
+
+        # FIXME: handle `-` better
+        if output_sequences == "-":
+            sys.stdout.write(process.stdout)
+        else:
+            with open(output_sequences, "w") as f:
+                f.write(process.stdout)
 
 
 def sqlite3(*args, **kwargs):
@@ -399,6 +710,37 @@ def sqlite_quote_string(x):
     return "'" + x.replace("'", "''") + "'"
 
 
+def seqkit(*args, **kwargs):
+    """
+    Internal helper for invoking ``seqkit``, the SeqKit CLI program.
+    """
+    seqkit = os.environ.get("SEQKIT", which("seqkit"))
+
+    if not seqkit:
+        raise AugurError(dedent(f"""\
+            Unable to find the program `seqkit`. Is it installed?
+
+            In order to use `augur merge`, the SeqKit CLI must be installed
+            separately. It is typically provided by a Nextstrain runtime.
+            """))
+
+    argv = [seqkit, *args]
+
+    print_debug(f"running {argv!r}")
+    proc = subprocess.run(argv, encoding="utf-8", text=True, **kwargs)
+
+    try:
+        proc.check_returncode()
+    except subprocess.CalledProcessError as err:
+        raise SeqKitError("seqkit invocation failed") from err
+
+    return proc
+
+
+class SeqKitError(Exception):
+    pass
+
+
 def pairs(xs: Iterable[str]) -> Iterable[Tuple[str, str]]:
     """
     Split an iterable of ``k=v`` strings into an iterable of ``(k,v)`` tuples.
@@ -478,3 +820,55 @@ def quote(s):
     # …and instead quote a final empty string down here if we're still empty
     # after joining all our parts together.
     return quoted if quoted else shquote('')
+
+
+def parse_inputs(inputs: Sequence[str], require_names: bool = False):
+    """
+    Parse inputs into tuples of (name, file). name is an empty string for
+    unnamed inputs.
+
+    If names are required, this function can raise UnnamedInputError or
+    DuplicateInputNameError. If names are optional, this function can raise
+    InvalidNamedInputError or DuplicateInputNameError.
+    """
+    # These are only used for error checking.
+    # The original order of inputs should still be used at the end.
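+    # (Below, pairs() maps "X=x.fasta" to ("X", "x.fasta") and a bare
+    # "x.fasta" to ("", "x.fasta").)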
+    invalid_named_inputs: List[str] = []
+    named_inputs: List[str] = []
+    unnamed_inputs: List[str] = []
+    for x in inputs:
+        if x.startswith("="):
+            invalid_named_inputs.append(x)
+        elif "=" in x:
+            named_inputs.append(x)
+        else:
+            unnamed_inputs.append(x)
+
+    if require_names:
+        if bad_inputs := [*invalid_named_inputs, *unnamed_inputs]:
+            raise UnnamedInputError(bad_inputs)
+    elif invalid_named_inputs:
+        raise InvalidNamedInputError(invalid_named_inputs)
+
+    input_pairs = pairs(inputs)
+
+    if duplicate_names := [name for name, count
+                           in count_unique(name for name, _ in input_pairs)
+                           if name != "" and count > 1]:
+        raise DuplicateInputNameError(duplicate_names)
+
+    return input_pairs
+
+
+class UnnamedInputError(Exception):
+    def __init__(self, unnamed: Sequence[str]):
+        self.unnamed = unnamed
+
+
+class InvalidNamedInputError(Exception):
+    def __init__(self, invalid: Sequence[str]):
+        self.invalid = invalid
+
+
+class DuplicateInputNameError(Exception):
+    def __init__(self, duplicates: Sequence[str]):
+        self.duplicates = duplicates
diff --git a/docs/installation/non-python-dependencies.rst b/docs/installation/non-python-dependencies.rst
index 30f520b10..f6ae621df 100644
--- a/docs/installation/non-python-dependencies.rst
+++ b/docs/installation/non-python-dependencies.rst
@@ -8,7 +8,10 @@ Augur uses some external bioinformatics programs that are not available on PyPI
 - `RAxML <https://cme.h-its.org/exelixis/web/software/raxml/>`__ (optional alternative)
 - `FastTree <http://www.microbesonline.org/fasttree/>`__ (optional alternative)
 
-- ``augur merge`` requires ``sqlite3``, the `SQLite <https://sqlite.org>`__ CLI (version ≥3.39).
+- ``augur merge`` requires:
+
+  - ``sqlite3``, the `SQLite <https://sqlite.org>`__ CLI (version ≥3.39) for metadata
+  - ``seqkit``, the `SeqKit program <https://bioinf.shenwei.me/seqkit/>`__, for sequences
 
 - Bacterial data (or any VCF usage) requires `vcftools <https://vcftools.github.io/>`__
 
@@ -20,14 +23,14 @@ If you use Conda, you can install them in an active environment:
 
 .. code:: bash
 
-   conda install -c conda-forge -c bioconda mafft raxml fasttree iqtree vcftools sqlite --yes
+   conda install -c conda-forge -c bioconda mafft raxml fasttree iqtree vcftools sqlite seqkit --yes
 
 On macOS using `Homebrew <https://brew.sh/>`__:
 
 .. code:: bash
 
    brew tap brewsci/bio
-   brew install mafft iqtree raxml fasttree vcftools sqlite
+   brew install mafft iqtree raxml fasttree vcftools sqlite seqkit
 
 On Debian/Ubuntu:
 
@@ -34,5 +37,7 @@ On Debian/Ubuntu:
 .. code:: bash
 
    sudo apt install mafft iqtree raxml fasttree vcftools sqlite3
 
 Other Linux distributions will likely have the same packages available, although the names may differ slightly.
+
+The `SeqKit download page <https://bioinf.shenwei.me/seqkit/download/>`__ provides Linux binaries.
diff --git a/tests/functional/merge/cram/merge-metadata-and-sequences.t b/tests/functional/merge/cram/merge-metadata-and-sequences.t
new file mode 100644
index 000000000..3b189cd0c
--- /dev/null
+++ b/tests/functional/merge/cram/merge-metadata-and-sequences.t
@@ -0,0 +1,168 @@
+SETUP
+
+  $ source "$TESTDIR"/_setup.sh
+
+
+BASIC USAGE
+
+Both metadata and sequences are merged with duplicate ID handling.
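+For ids present in more than one input, later inputs take precedence in both
+outputs.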
+
+  $ cat >x.tsv <<~~
+  > strain	a	b	c
+  > one	X1a	X1b	X1c
+  > two	X2a	X2b	X2c
+  > ~~
+
+  $ cat >y.tsv <<~~
+  > strain	b	c	f	e	d
+  > two		Y2c	Y2f	Y2e	Y2d
+  > three			Y3f	Y3e	Y3d
+  > ~~
+
+  $ cat >z.tsv <<~~
+  > strain	g	c
+  > one	Z1g	
+  > two	Z2g	Z2c
+  > three	Z3g	
+  > ~~
+
+  $ cat >x.fasta <<~~
+  > >one
+  > ATCG
+  > >two
+  > GCTA
+  > ~~
+
+  $ cat >y.fasta <<~~
+  > >two
+  > ATCG
+  > >three
+  > GCTA
+  > ~~
+
+  $ cat >z.fasta <<~~
+  > >one
+  > ATCG
+  > >two
+  > GCTA
+  > >three
+  > GACT
+  > ~~
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv \
+  >   --sequences X=x.fasta Y=y.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta
+  Reading 'X' metadata from 'x.tsv'…
+  Reading 'Y' metadata from 'y.tsv'…
+  Reading sequence IDs from 'x.fasta'…
+  Reading sequence IDs from 'y.fasta'…
+  Merging metadata and writing to 'merged.tsv'…
+  Reading sequences from 'y.fasta'…
+  Reading sequences from 'x.fasta'…
+  Merging sequences and writing to 'merged.fasta'…
+  [SeqKit] [INFO]\x1b[0m 1 duplicated records removed (esc)
+
+  $ tsv-pretty merged.tsv
+  strain  a    b    c    f    e    d
+  one     X1a  X1b  X1c
+  two     X2a  X2b  Y2c  Y2f  Y2e  Y2d
+  three                  Y3f  Y3e  Y3d
+
+  $ cat merged.fasta
+  >two
+  ATCG
+  >three
+  GCTA
+  >one
+  ATCG
+
+Sequence names are optional.
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv Z=z.tsv \
+  >   --sequences x.fasta y.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta \
+  >   --quiet
+
+If sequence names are provided,
+
+(1) they must have matching metadata inputs.
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv \
+  >   --sequences X=x.fasta Y=y.fasta Z=z.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta \
+  >   --quiet
+  ERROR: Named inputs must be paired.
+  
+  The following sequence input does not have a corresponding metadata input:
+  
+      'z.fasta'
+  
+  [2]
+
+(2) they must exhaustively cover all metadata inputs.
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv Z=z.tsv \
+  >   --sequences X=x.fasta Y=y.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta \
+  >   --quiet
+  ERROR: Named inputs must be paired.
+  
+  The following metadata input does not have a corresponding sequence input:
+  
+      'z.tsv'
+  
+  [2]
+
+(3) they must be in the same order as metadata inputs.
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv \
+  >   --sequences Y=y.fasta X=x.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta \
+  >   --quiet
+  ERROR: Named inputs must be paired with the same ordering.
+  
+  Order of inputs differs between named metadata ['X', 'Y'] and named sequences ['Y', 'X'].
+  
+  [2]
+
+(4) IDs are cross-checked between paired input files, but only a warning is emitted.
+
+  $ cat >>y.fasta <<~~
+  > >four
+  > ATCG
+  > ~~
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv \
+  >   --sequences X=x.fasta Y=y.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta
+  Reading 'X' metadata from 'x.tsv'\xe2\x80\xa6 (esc)
+  Reading 'Y' metadata from 'y.tsv'\xe2\x80\xa6 (esc)
+  Reading sequence IDs from 'x.fasta'\xe2\x80\xa6 (esc)
+  Reading sequence IDs from 'y.fasta'\xe2\x80\xa6 (esc)
+  WARNING: Sequence 'four' in 'y.fasta' is missing from 'y.tsv'. Outputs may continue to be mismatched.
+  Merging metadata and writing to 'merged.tsv'\xe2\x80\xa6 (esc)
+  Reading sequences from 'y.fasta'\xe2\x80\xa6 (esc)
+  Reading sequences from 'x.fasta'\xe2\x80\xa6 (esc)
+  Merging sequences and writing to 'merged.fasta'\xe2\x80\xa6 (esc)
+  [SeqKit] [INFO]\x1b[0m 1 duplicated records removed (esc)
+
+(5) Unnamed files can still be present anywhere in the list.
+
+  $ ${AUGUR} merge \
+  >   --metadata X=x.tsv Y=y.tsv \
+  >   --sequences X=x.fasta z.fasta Y=y.fasta \
+  >   --output-metadata merged.tsv \
+  >   --output-sequences merged.fasta \
+  >   --quiet
diff --git a/tests/functional/merge/cram/merge.t b/tests/functional/merge/cram/merge-metadata.t
similarity index 99%
rename from tests/functional/merge/cram/merge.t
rename to tests/functional/merge/cram/merge-metadata.t
index 87b29ad79..c2270f235 100644
--- a/tests/functional/merge/cram/merge.t
+++ b/tests/functional/merge/cram/merge-metadata.t
@@ -246,8 +246,8 @@ Metadata names are required.
   ERROR: All metadata inputs must be assigned a name, e.g. with NAME=FILE.
 
   The following inputs were missing a name:
 
-      'x.tsv'
       '=y.tsv'
+      'x.tsv'
 
   [2]
diff --git a/tests/functional/merge/cram/merge-sequences.t b/tests/functional/merge/cram/merge-sequences.t
new file mode 100644
index 000000000..efad50115
--- /dev/null
+++ b/tests/functional/merge/cram/merge-sequences.t
@@ -0,0 +1,60 @@
+SETUP
+
+  $ source "$TESTDIR"/_setup.sh
+
+BASIC USAGE
+
+Sequence inputs are merged with duplicate ID handling.
+
+  $ cat >x.fasta <<~~
+  > >one
+  > ATCG
+  > >two
+  > GCTA
+  > ~~
+
+  $ cat >y.fasta <<~~
+  > >two
+  > ATCG
+  > >three
+  > GCTA
+  > ~~
+
+  $ ${AUGUR} merge \
+  >   --sequences x.fasta y.fasta \
+  >   --output-sequences - > merged.fasta
+  Reading sequence IDs from 'x.fasta'…
+  Reading sequence IDs from 'y.fasta'…
+  Reading sequences from 'y.fasta'…
+  Reading sequences from 'x.fasta'…
+  Merging sequences and writing to '-'…
+  [SeqKit] [INFO]\x1b[0m 1 duplicated records removed (esc)
+
+'two' is in both x.fasta and y.fasta. It is taken from the latter.
+
+  $ cat merged.fasta
+  >two
+  ATCG
+  >three
+  GCTA
+  >one
+  ATCG
+
+Duplicates are not allowed within individual sequence inputs.
+
+  $ cat >>y.fasta <<~~
+  > >three
+  > ATCG
+  > ~~
+
+  $ ${AUGUR} merge \
+  >   --sequences x.fasta y.fasta \
+  >   --output-sequences - > merged.fasta \
+  >   --quiet
+  ERROR: IDs must be unique within a sequence input file.
+  
+  The following entry is duplicated in 'y.fasta':
+  
+      'three'
+  
+  [2]