refactor: moving V-pipe specifics into module #69

Merged (10 commits) on Dec 20, 2024
Changes from all commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -136,3 +136,6 @@ poetry.lock
 
 # Output folder
 output
+
+# Secret files
+secrets
12 changes: 7 additions & 5 deletions README.md
@@ -94,13 +94,15 @@ To upload the processed outputs S3 storage is required.
 
 For sensitive information like AWS credentials, use Docker secrets. Create the following files in the secrets directory:
 
-- secrets/aws_access_key_id.txt:
-```YourAWSAccessKeyId````
+- `secrets/aws_access_key_id.txt`:
+
+```YourAWSAccessKeyId```
 
-- secrets/aws_secret_access_key.txt:
-```YourAWSSecretAccessKey````
+- `secrets/aws_secret_access_key.txt`:
+
+```YourAWSSecretAccessKey```
 
-- secrets/aws_default_region.txt:
+- `secrets/aws_default_region.txt`:
 ```YourAWSRegion```
 
 #### Run Transformation
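Not part of this PR, but for context: a minimal sketch of how a consumer could read these Docker secrets at runtime. It assumes the conventional `/run/secrets/<name>` mount path and adds an environment-variable fallback for local runs; the helper name `read_secret` is hypothetical, not an API of this repository.

```python
# Hypothetical helper (not in this PR): read AWS credentials from Docker
# secrets, falling back to environment variables for local development.
from __future__ import annotations

import os
from pathlib import Path


def read_secret(name: str, secrets_dir: Path = Path("/run/secrets")) -> str | None:
    """Return the secret file's contents, else the matching env var, else None."""
    secret_file = secrets_dir / name
    if secret_file.is_file():
        return secret_file.read_text().strip()
    return os.environ.get(name.upper())


aws_access_key_id = read_secret("aws_access_key_id")
aws_secret_access_key = read_secret("aws_secret_access_key")
aws_default_region = read_secret("aws_default_region")
```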
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -27,7 +27,7 @@ mkdocstrings = { extras = ["python"], version = "^0.21.2" }
 mkdocs-gen-files = "^0.4.0"
 mkdocs-literate-nav = "^0.6.0"
 setuptools = "^74.1.2"
-moto = {extras = ["all", "ec2", "s3"], version = "^5.0.22"}
+moto = { extras = ["all", "ec2", "s3"], version = "^5.0.22" }
 
 [tool.coverage.report]
 fail_under = 85.0
@@ -71,3 +71,6 @@ exclude = '''
 [build-system]
 requires = ["poetry-core"]
 build-backend = "poetry.core.masonry.api"
+
+[tool.isort]
+profile = "black"
163 changes: 4 additions & 159 deletions scripts/vp_transformer.py
@@ -18,6 +18,7 @@
 from sr2silo.process import pair_normalize_reads
 from sr2silo.s3 import compress_bz2, upload_file_to_s3
 from sr2silo.translation import translate
+from sr2silo.vpipe import Sample
 
 logging.basicConfig(
     level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -37,164 +38,6 @@ def load_config(config_file: Path) -> dict:
         raise
 
 
-def sample_id_decoder(sample_id: str) -> dict:
-    """Decode the sample ID into individual components.
-
-    Args:
-        sample_id (str): The sample ID to decode.
-
-    Returns:
-        dict: A dictionary containing the decoded components.
-            containing the following keys:
-            - sequencing_well_position (str : sequencing well position)
-            - location_code (int : code of the location)
-            - sampling_date (str : date of the sampling)
-    """
-    components = sample_id.split("_")
-    # Assign components to meaningful variable names
-    well_position = components[0]  # A1
-    location_code = components[1]  # 10
-    sampling_date = f"{components[2]}-{components[3]}-{components[4]}"  # 2024-09-30
-    return {
-        "sequencing_well_position": well_position,
-        "location_code": location_code,
-        "sampling_date": sampling_date,
-    }
-
-
-def batch_id_decoder(batch_id: str) -> dict:
-    """Decode the batch ID into individual components.
-
-    Args:
-        batch_id (str): The batch ID to decode.
-
-    Returns:
-        dict: A dictionary containing the decoded components.
-            containing the following keys:
-            - sequencing_date (str : date of the sequencing)
-            - flow_cell_serial_number (str : serial number of the flow cell)
-    """
-    components = batch_id.split("_")
-    # Assign components to meaningful variable names
-    sequencing_date = (
-        f"{components[0][:4]}-{components[0][4:6]}-{components[0][6:]}"  # 2024-10-18
-    )
-    flow_cell_serial_number = components[1]  # AAG55WNM5
-    return {
-        "sequencing_date": sequencing_date,
-        "flow_cell_serial_number": flow_cell_serial_number,
-    }
-
-
-def convert_to_iso_date(date: str) -> str:
-    """Convert a date string to ISO 8601 format (date only)."""
-    # Parse the date string
-    date_obj = datetime.datetime.strptime(date, "%Y-%m-%d")
-    # Format the date as ISO 8601 (date only)
-    return date_obj.date().isoformat()
-
-
-def get_metadata(sample_id: str, batch_id: str, timeline: Path, primers: Path) -> dict:
-    """
-    Get metadata for a given sample and batch directory.
-    Cross-references the directory with the timeline file to get the metadata.
-
-    Args:
-        sample_id (str): The sample ID to use for metadata.
-        batch_id (str): The batch ID to use for metadata.
-        timeline (Path): The timeline file to cross-reference the metadata.
-        primers (Path): The primers file to cross-reference the metadata.
-
-    Returns:
-        dict: A dictionary containing the metadata.
-
-    """
-
-    metadata = {}
-    metadata["sample_id"] = sample_id
-    metadata["batch_id"] = batch_id
-
-    # Decompose the ids into individual components
-    logging.info(f"Decoding sample_id: {metadata['sample_id']}")
-    sample_id = metadata["sample_id"]
-    metadata.update(sample_id_decoder(sample_id))
-    logging.info(f"Decoding batch_id: {metadata['batch_id']}")
-    batch_id = metadata["batch_id"]
-    metadata.update(batch_id_decoder(batch_id))
-
-    # Read the timeline file to get additional metadata
-    # find row with matching sample_id and batch_id
-    # timline has headers:
-    # sample_id batch_id read_length primer_protocol location_code sampling_date location_name
-    # get read length, primer protocol, location name
-    # double check if location code and location code are the same
-    if not timeline.is_file():
-        logging.error(f"Timeline file not found or is not a file: {timeline}")
-        raise FileNotFoundError(f"Timeline file not found or is not a file: {timeline}")
-    with timeline.open() as f:
-        reader = csv.reader(f, delimiter="\t")
-        for row in reader:
-            if row[0] == metadata["sample_id"] and row[1] == metadata["batch_id"]:
-                logging.info(
-                    f"Enriching metadata with timeline data e.g. read_length, primer_protocol, location_name"
-                )
-                metadata["read_length"] = row[2]
-                metadata["primer_protocol"] = row[3]
-                metadata["location_name"] = row[6]
-                # Convert sampling_date to ISO format for comparison
-                timeline_sampling_date = convert_to_iso_date(row[5])
-                if int(metadata["location_code"]) != int(row[4]):
-                    # output both location codes for comparison and their types for debugging
-                    logging.warning(
-                        f"Mismatch in location code for sample_id {metadata['sample_id']} and batch_id {metadata['batch_id']}"
-                    )
-                    logging.debug(
-                        f"Location code mismatch: {metadata['location_code']} (sample_id) vs {row[4]} (timeline)"
-                    )
-                    logging.debug(
-                        f"Location code types: {type(metadata['location_code'])} (sample_id) vs {type(row[4])} (timeline)"
-                    )
-                if metadata["sampling_date"] != timeline_sampling_date:
-                    # output both sampling dates for comparison and their types for debugging
-                    logging.warning(
-                        f"Mismatch in sampling date for sample_id {metadata['sample_id']} and batch_id {metadata['batch_id']}"
-                    )
-                    logging.debug(
-                        f"Sampling date mismatch: {metadata['sampling_date']} (sample_id) vs {timeline_sampling_date} (timeline)"
-                    )
-                    logging.debug(
-                        f"Sampling date types: {type(metadata['sampling_date'])} (sample_id) vs {type(timeline_sampling_date)} (timeline)"
-                    )
-                break
-        else:
-            raise ValueError(
-                f"No matching entry found in timeline for sample_id {metadata['sample_id']} and batch_id {metadata['batch_id']}"
-            )
-    # Read the primers yaml to get additional metadata
-    # find the key with matching primer_protocol and get the "name" value
-    # as the canonical name of the primer protocol
-    if not primers.is_file():
-        logging.error(f"Primers file not found or is not a file: {primers}")
-        raise FileNotFoundError(f"Primers file not found or is not a file: {primers}")
-    # Load YAML file
-    with open(primers, "r") as file:
-        primers_conf = yaml.safe_load(file)
-    logging.debug(f"Primers: {primers_conf}")
-    logging.debug(f" Type of primers: {type(primers_conf)}")
-    for primer in primers_conf.keys():
-        if primer == metadata["primer_protocol"]:
-            logging.info(
-                f"Enriching metadata with primer data e.g. primer_protocol_name"
-            )
-            metadata["primer_protocol_name"] = primers_conf[primer]["name"]
-            break
-    else:
-        raise ValueError(
-            f"No matching entry found in primers for primer_protocol {metadata['primer_protocol']}"
-        )
-    return metadata
-
-
 def wrangle_for_transformer(
     input_dir: Path,
     output_dir: Path,
@@ -485,7 +328,9 @@ def process_directory(
         raise FileNotFoundError(f"Input file not found: {sample_fp}")
 
     ##### Get Sample and Batch metadata and write to a file #####
-    metadata = get_metadata(sample_id, batch_id, timeline_file, primers_file)
+    sample_to_process = Sample(sample_id, batch_id)
+    sample_to_process.enrich_metadata(timeline_file, primers_file)
+    metadata = sample_to_process.get_metadata()
     # add nextclade reference to metadata
    metadata["nextclade_reference"] = nextclade_reference
    metadata_file = result_dir / "metadata.json"
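The script now delegates to `sr2silo.vpipe.Sample`, whose source file (`src/sr2silo/vpipe/sample.py`) is not shown in this diff view. As a reading aid, here is a minimal sketch of what such a class could look like, reconstructed from the deleted helpers and the three call sites above; the attribute names and the elided validation are assumptions, not the repository's actual implementation.

```python
# Sketch only: the real sample.py is not shown in this diff. Reconstructed
# from the deleted vp_transformer.py helpers and the new call sites
# (Sample(...), enrich_metadata(...), get_metadata()).
from __future__ import annotations

import csv
from pathlib import Path

import yaml


class Sample:
    """Bundles a V-Pipe sample/batch ID pair and its decoded metadata."""

    def __init__(self, sample_id: str, batch_id: str) -> None:
        self.sample_id = sample_id
        self.batch_id = batch_id
        self.metadata: dict = {"sample_id": sample_id, "batch_id": batch_id}
        # Decode e.g. "A1_10_2024_09_30" into well, location, and date.
        well, location, *date = sample_id.split("_")
        self.metadata.update(
            sequencing_well_position=well,
            location_code=location,
            sampling_date="-".join(date),
        )
        # Decode e.g. "20241018_AAG55WNM5" into date and flow cell serial.
        seq_date, flow_cell = batch_id.split("_")
        self.metadata.update(
            sequencing_date=f"{seq_date[:4]}-{seq_date[4:6]}-{seq_date[6:]}",
            flow_cell_serial_number=flow_cell,
        )

    def enrich_metadata(self, timeline: Path, primers: Path) -> None:
        """Cross-reference the timeline TSV and primers YAML, as the deleted
        get_metadata() did (consistency checks elided in this sketch)."""
        with timeline.open() as f:
            for row in csv.reader(f, delimiter="\t"):
                if row[0] == self.sample_id and row[1] == self.batch_id:
                    self.metadata["read_length"] = row[2]
                    self.metadata["primer_protocol"] = row[3]
                    self.metadata["location_name"] = row[6]
                    break
        with primers.open() as f:
            primers_conf = yaml.safe_load(f)
        protocol = self.metadata.get("primer_protocol")
        if protocol in primers_conf:
            self.metadata["primer_protocol_name"] = primers_conf[protocol]["name"]

    def get_metadata(self) -> dict:
        return self.metadata
```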
4 changes: 4 additions & 0 deletions src/sr2silo/__init__.py
@@ -5,4 +5,8 @@
 """sr2silo connects pairs, normalizes reads, and converts BAM to SAM files."""
 from __future__ import annotations
 
+import sr2silo.vpipe as vpipe
+
 __version__ = "0.0.2"
+
+__all__ = ["vpipe"]
10 changes: 10 additions & 0 deletions src/sr2silo/vpipe/__init__.py
@@ -0,0 +1,10 @@
+"""Implements V-Pipe specific utilities.
+
+i.e. extracting metadata from V-Pipe Filenaming Conventions.
+"""
+
+from __future__ import annotations
+
+from sr2silo.vpipe.sample import Sample
+
+__all__ = ["Sample"]
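Assuming the sketch above, the package-level export would be used like this; the example IDs are taken from the inline comments of the deleted decoders (`A1`, `10`, `2024-09-30`, `20241018`, `AAG55WNM5`):

```python
# Usage sketch (hypothetical): decode V-Pipe IDs via the new public API.
from sr2silo.vpipe import Sample

sample = Sample("A1_10_2024_09_30", "20241018_AAG55WNM5")
# Expected decoded fields, per the deleted helpers:
#   sequencing_well_position="A1", location_code="10",
#   sampling_date="2024-09-30", sequencing_date="2024-10-18",
#   flow_cell_serial_number="AAG55WNM5"
print(sample.get_metadata())
```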