
feat: v-pipe to silo transformer script #24

Merged: 20 commits, Nov 18, 2024

Changes from all commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -133,3 +133,6 @@ dmypy.json

# poetry
poetry.lock

# Output folder
output
2 changes: 1 addition & 1 deletion README.md
@@ -33,7 +33,7 @@ In particular, it's good to install it and become familiar with its basic functionality
conda env create -f environment.yml
conda activate sr2silo
```


2. Set up the environment with development tools:
```bash
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -9,6 +9,7 @@ packages = [{ include = "sr2silo", from = "src" }]
[tool.poetry.dependencies]
python = "^3.10"
pysam = "^0.22.1"
click = "^8.0.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.1"
@@ -37,7 +38,7 @@ ignore-private = true
ignore-property-decorators = false
ignore-module = false
fail-under = 95
exclude = ["setup.py", "docs", "build", "scripts"]
exclude = ["setup.py", "docs", "build", "scripts/dgicev"]
ignore-regex = ["^get$", "^mock_.*", ".*BaseClass.*"]
verbose = 2
quiet = false
58 changes: 58 additions & 0 deletions resources/reference_genomes.json

Large diffs are not rendered by default.

244 changes: 244 additions & 0 deletions scripts/vp_transformer.py
@@ -0,0 +1,244 @@
"""Converts V-PIPE's outputs of nucleotide read sequencs output to
ready to import nuclotides and amino acid sequences to the SILO database."""

from __future__ import annotations

import csv
import json
import logging
from pathlib import Path

import click

from sr2silo.convert import bam_to_sam
from sr2silo.process import pair_normalize_reads
from sr2silo.translation import translate

logging.basicConfig(level=logging.INFO)


def load_config(config_file: Path) -> dict:
    """Load configuration from a JSON file."""
    try:
        with config_file.open() as f:
            config = json.load(f)
            logging.debug(f"Loaded config: {config}")
            return config
    except FileNotFoundError:
        logging.error(f"Config file {config_file} not found.")
        raise
    except json.JSONDecodeError as e:
        logging.error(f"Error decoding JSON from config file {config_file}: {e}")
        raise
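
# The config JSON is expected to provide at least the keys used in main() below:
# "base_dir", "result_dir", and "timeline_file"
# (see scripts/vp_transformer_config.json in this PR for the production values).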


def sample_id_decoder(sample_id: str) -> dict:
    """Decode the sample ID into its individual components.

    Args:
        sample_id (str): The sample ID to decode.

    Returns:
        dict: A dictionary with the decoded components, under the keys:
            - sequencing_well_position (str): sequencing well position
            - location_code (str): code of the location
            - sampling_date (str): date of the sampling
    """
    components = sample_id.split("_")
    # Assign components to meaningful variable names
    well_position = components[0]  # A1
    location_code = components[1]  # 10
    sampling_date = f"{components[2]}-{components[3]}-{components[4]}"  # 2024-09-30
    return {
        "sequencing_well_position": well_position,
        "location_code": location_code,
        "sampling_date": sampling_date,
    }
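
# Example, using the sample ID from the test fixture in tests/data/samples:
# sample_id_decoder("A1_05_2024_10_08")
# -> {"sequencing_well_position": "A1", "location_code": "05",
#     "sampling_date": "2024-10-08"}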


def batch_id_decoder(batch_id: str) -> dict:
    """Decode the batch ID into its individual components.

    Args:
        batch_id (str): The batch ID to decode.

    Returns:
        dict: A dictionary with the decoded components, under the keys:
            - sequencing_date (str): date of the sequencing
            - flow_cell_serial_number (str): serial number of the flow cell
    """
    components = batch_id.split("_")
    # Assign components to meaningful variable names
    sequencing_date = (
        f"{components[0][:4]}-{components[0][4:6]}-{components[0][6:]}"  # 2024-10-18
    )
    flow_cell_serial_number = components[1]  # AAG55WNM5
    return {
        "sequencing_date": sequencing_date,
        "flow_cell_serial_number": flow_cell_serial_number,
    }
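
# Example, using the batch ID from the test fixture:
# batch_id_decoder("20241024_2411515907")
# -> {"sequencing_date": "2024-10-24", "flow_cell_serial_number": "2411515907"}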


def get_metadata(directory: Path, timeline: Path) -> dict:
    """Get metadata for a given sample and batch directory.

    Cross-references the directory with the timeline file to get the metadata.

    Args:
        directory (Path): The directory to extract metadata from.
        timeline (Path): The timeline file to cross-reference the metadata.

    Returns:
        dict: A dictionary containing the metadata.
    """

    # Extract sample and batch IDs from the directory name
    # samples/{sample_id}/{batch_id}/alignments/REF_aln_trim.bam
    metadata = {}
    metadata["sample_id"] = directory.parent.name
    metadata["batch_id"] = directory.name

    # Decompose the IDs into individual components
    logging.info(f"Decoding sample_id: {metadata['sample_id']}")
    sample_id = metadata["sample_id"]
    metadata.update(sample_id_decoder(sample_id))
    logging.info(f"Decoding batch_id: {metadata['batch_id']}")
    batch_id = metadata["batch_id"]
    metadata.update(batch_id_decoder(batch_id))

    # Read the timeline file to get additional metadata:
    # find the row with matching sample_id and batch_id. The timeline has headers:
    # sample_id batch_id read_length primer_protocol location_code sampling_date location_name
    # From it, take read_length, primer_protocol, and location_name, and
    # double-check that location_code and sampling_date match the decoded IDs.
    with timeline.open() as f:
        reader = csv.reader(f, delimiter="\t")
        for row in reader:
            if row[0] == metadata["sample_id"] and row[1] == metadata["batch_id"]:
                logging.info(
                    "Enriching metadata with timeline data, e.g. read_length, "
                    "primer_protocol, location_name"
                )
                metadata["read_length"] = row[2]
                metadata["primer_protocol"] = row[3]
                metadata["location_name"] = row[6]
                # Convert sampling_date to ISO format for comparison
                # (assumes the timeline stores dates as YYYYMMDD; the test
                # fixture already uses YYYY-MM-DD, which trips the warning below)
                timeline_sampling_date = f"{row[5][:4]}-{row[5][4:6]}-{row[5][6:]}"
                if (
                    metadata["location_code"] != row[4]
                    or metadata["sampling_date"] != timeline_sampling_date
                ):
                    logging.warning(
                        f"Mismatch in location code or sampling date for sample_id "
                        f"{metadata['sample_id']} and batch_id {metadata['batch_id']}"
                    )
                break
        else:
            raise ValueError(
                f"No matching entry found in timeline for sample_id "
                f"{metadata['sample_id']} and batch_id {metadata['batch_id']}"
            )
    return metadata
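
# Example timeline row (tab-separated), from the fixture
# tests/data/samples/timeline_A1_05_2024_10_08.tsv:
# A1_05_2024_10_08  20241024_2411515907  250  v532  5  2024-10-08  Lugano (TI)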


def process_directory(
    input_dir: Path,
    result_dir: Path,
    nextclade_reference: str,
    timeline_file: Path,
    file_name: str = "REF_aln_trim.bam",
) -> None:
    """Process all files in a given directory.

    Args:
        input_dir (Path): The directory to process.
        result_dir (Path): The directory to save the results.
        nextclade_reference (str): The reference to use for nextclade.
        timeline_file (Path): The timeline file to cross-reference the metadata.
        file_name (str): The name of the file to process.

    Returns:
        None (writes results to the result_dir)
    """

    # Check that we were given a directory, not a file, and that it exists
    if not input_dir.is_dir():
        logging.error(f"Input directory not found, is it a directory?: {input_dir}")
        return

    logging.info(f"Processing directory: {input_dir}")
    logging.info(f"Assuming the input file is: {file_name}")

    # Get sample and batch metadata and write them to a file
    metadata = get_metadata(input_dir, timeline_file)
    # Add the nextclade reference to the metadata
    metadata["nextclade_reference"] = nextclade_reference
    metadata_file = result_dir / "metadata.json"
    result_dir.mkdir(parents=True, exist_ok=True)
    with metadata_file.open("w") as f:
        json.dump(metadata, f, indent=4)
    logging.info(f"Metadata saved to: {metadata_file}")

    # Convert BAM to SAM
    logging.info("Converting BAM to SAM")
    bam_file = input_dir / file_name
    sam_data = bam_to_sam(bam_file)

    # Process SAM to FASTA
    logging.info("Processing SAM to FASTA (pair, merge, and normalize reads)")
    fasta_file = result_dir / "reads.fasta"
    insertions_file = result_dir / "insertions.txt"
    pair_normalize_reads(sam_data, fasta_file, insertions_file)

    # Translate nucleotides to amino acids
    logging.info("Aligning and translating sequences")
    translate([fasta_file], result_dir, nextclade_reference)

    logging.info(f"Results saved to: {result_dir}")


# TODO: Implement the read_timeline function
def read_timeline(timeline_file: Path) -> list[Path]:
    """Read the timeline.tsv file and return a list of directory paths to process."""
    raise NotImplementedError
    # Sketch of the intended implementation:
    directories = []
    with timeline_file.open() as f:
        reader = csv.DictReader(f, delimiter="\t")
        for row in reader:
            # Assuming the directory path is constructed from metadata in the row
            directory = Path(row["base_dir"]) / row["sub_dir"]
            directories.append(directory)
    return directories


# TODO: Implement the main function
@click.command()
@click.option(
    "--config",
    "config_file",
    default="vp_transformer_config.json",
    type=click.Path(path_type=Path),
    help="Path to the config file.",
)
def main(config_file: Path) -> None:
    """Main function to process all subdirectories."""
    raise NotImplementedError
    # Sketch of the intended implementation:
    config = load_config(config_file)
    base_dir = Path(config["base_dir"])
    result_dir = Path(config["result_dir"])
    timeline_file = Path(config["timeline_file"])

    directories = read_timeline(timeline_file)
    for subdir in directories:
        if subdir.is_dir():
            logging.debug(f"Processing directory: {subdir}")
            # process_directory(subdir, result_dir)


if __name__ == "__main__":
    config_file = Path("vp_transformer_config.json")
    # main(config_file)

    # Process a single directory: batch / sample.
    # process_directory requires a timeline file as its fourth argument; the
    # path below is a placeholder and must point at the real timeline.tsv.
    process_directory(
        Path("../../../data/sr2silo/samples/A1_05_2024_10_08/20241024_2411515907"),
        Path("results"),
        "nextstrain/sars-cov-2/wuhan-hu-1/orfs",
        Path("timeline.tsv"),  # placeholder, see comment above
    )
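
For reviewers who want to exercise the new script against the fixtures shipped in this PR, a minimal sketch (run from the repository root; it uses only functions defined in scripts/vp_transformer.py and the fixture paths from tests/test_scripts.py):

```python
import sys
from pathlib import Path

# Make scripts/vp_transformer.py importable, as the tests do
sys.path.insert(0, "scripts")
from vp_transformer import process_directory

# Process the sample/batch fixture and write results to tests/output
process_directory(
    Path("tests/data/samples/A1_05_2024_10_08/20241024_2411515907"),
    Path("tests/output"),
    "nextstrain/sars-cov-2/wuhan-hu-1/orfs",
    Path("tests/data/samples/timeline_A1_05_2024_10_08.tsv"),
)
```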
5 changes: 5 additions & 0 deletions scripts/vp_transformer_config.json
@@ -0,0 +1,5 @@
{
    "base_dir": "/cluster/project/pangolin/gordon_samples",
    "result_dir": "/cluster/project/pangolin/sr2silo/",
    "timeline_file": "/cluster/project/pangolin/work-vp-test/variants/timeline.tsv"
}
Binary file not shown.
1 change: 1 addition & 0 deletions tests/data/samples/timeline_A1_05_2024_10_08.tsv
@@ -0,0 +1 @@
A1_05_2024_10_08 20241024_2411515907 250 v532 5 2024-10-08 Lugano (TI)
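
For reference, the columns in this one-line fixture follow the order the transformer script expects: sample_id, batch_id, read_length, primer_protocol, location_code, sampling_date, location_name.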
72 changes: 41 additions & 31 deletions tests/test_scripts.py
@@ -1,40 +1,50 @@
"""This is a sample python file for testing scripts from the source code."""
"""Tests for the scripts in the scripts directory."""

from __future__ import annotations

import subprocess
import sys
from pathlib import Path

import pysam
# Add the scripts directory to the Python path
scripts_dir = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(scripts_dir))

# Import the process_directory function from vp_transformer.py
from vp_transformer import get_metadata # noqa: E402 # pyright: ignore
from vp_transformer import process_directory # noqa: E402 # pyright: ignore

gordonkoehn marked this conversation as resolved.
Show resolved Hide resolved
def bam_to_sam(bam_file):
"""Converts a BAM file to SAM format and returns it as a string.

Args:
bam_file: Path to the input BAM file.
"""
import tempfile

with tempfile.NamedTemporaryFile(delete=False) as temp_sam:
with pysam.AlignmentFile(bam_file, "rb") as in_bam, pysam.AlignmentFile(
temp_sam.name, "w", template=in_bam
) as out_sam:
for read in in_bam:
out_sam.write(read)
temp_sam.seek(0)
return temp_sam.read().decode()


def test_read_run_error_free():
"""Test the read.py script by piping SAM data to it."""
input_bam = "tests/data/REF_aln_trim_subsample.bam"
sam_data = bam_to_sam(input_bam)
def test_get_metadata():
"""Test the get_metadata function."""
metadata = get_metadata(
directory=Path("tests/data/samples/A1_05_2024_10_08/20241024_2411515907"),
timeline=Path("tests/data/samples/timeline_A1_05_2024_10_08.tsv"),
)

# Pipe the SAM data to the script as stdin
result = subprocess.run(
["python", "scripts/dgicev/read.py"],
input=sam_data,
capture_output=True,
text=True,
print(metadata)

expected_metadata = {
"sample_id": "A1_05_2024_10_08",
"batch_id": "20241024_2411515907",
"sequencing_well_position": "A1",
"location_code": "05",
"sampling_date": "2024-10-08",
"sequencing_date": "2024-10-24",
"flow_cell_serial_number": "2411515907",
"read_length": "250",
"primer_protocol": "v532",
"location_name": "Lugano (TI)",
}

assert metadata == expected_metadata


def test_process_directory():
"""Test the process_directory function."""
process_directory(
Path("tests/data/samples/A1_05_2024_10_08/20241024_2411515907"),
Path("tests/output"),
"nextstrain/sars-cov-2/wuhan-hu-1/orfs",
Path("tests/data/samples/timeline_A1_05_2024_10_08.tsv"),
)
assert result.returncode == 0
assert True