shm and se changes (#116)
* shm and se changes

* install bioconda::fastq-pair

* package_data

* bin -> scripts

* flake8

* new nuqc_job.sh

* rm test

* Alter test to reflect changes in demux.

* update nuqc_job.sh

* add module load

* updates based on qiita-rc testing

---------

Co-authored-by: Charles Cowart <[email protected]>
antgonza and charles-cowart authored Jan 6, 2024
1 parent 006095b commit 1fb3e71
Showing 10 changed files with 267 additions and 273 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
@@ -37,12 +37,13 @@ jobs:
- name: Main install
shell: bash -l {0}
run: |
conda install bioconda::fastq-pair
pip install -e ".[all]"
- name: Python linter
shell: bash -l {0}
run: flake8 sequence_processing_pipeline/ setup.py

- name: Run tests and measure coverage
shell: bash -l {0}
run: |
2 changes: 1 addition & 1 deletion README.rst
@@ -14,7 +14,7 @@ git clone https://github.com/biocore/mg-scripts.git
Create a Python3 Conda environment in which to run the notebook:

```bash
conda create -n sp_pipeline 'python==3.9' numpy pandas click scipy matplotlib
conda create -n sp_pipeline 'python==3.9' numpy pandas click scipy matplotlib fastq-pair
```

Activate the Conda environment:
70 changes: 32 additions & 38 deletions sequence_processing_pipeline/Commands.py
@@ -1,5 +1,5 @@
import glob
import pgzip
import gzip
import os
from sequence_processing_pipeline.util import iter_paired_files

@@ -57,50 +57,41 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
return split_offset


def demux_cmd(id_map_fp, fp_fp, out_d, encoded, threads):
def demux_cmd(id_map_fp, fp_fp, out_d, task, maxtask):
with open(id_map_fp, 'r') as f:
id_map = f.readlines()
id_map = [line.rstrip() for line in id_map]
id_map = [line.strip().split('\t') for line in id_map]

# fp needs to be an open file handle.
# ensure task and maxtask are proper ints when coming from cmd-line.
with open(fp_fp, 'r') as fp:
demux(id_map, fp, out_d, encoded, threads)
demux(id_map, fp, out_d, int(task), int(maxtask))


def demux(id_map, fp, out_d, encoded, threads):
def demux(id_map, fp, out_d, task, maxtask):
"""Split infile data based in provided map"""
delimiter = '::MUX::'
mode = 'wt'
ext = '.fastq.gz'
sep = '/'
rec = '@'

# load mapping information
fpmap = {}
for tmp in id_map:
idx, r1, r2, outbase = tmp.strip().split('\t')
fpmap[rec + idx] = (r1, r2, outbase)
openfps = {}

# this is our encoded file, and the one we care about
idx = rec + encoded
fname_r1, fname_r2, outbase = fpmap[idx]
for offset, (idx, r1, r2, outbase) in enumerate(id_map):
if offset % maxtask == task:
idx = rec + idx

# setup output locations
outdir = out_d + sep + outbase
# setup output locations
outdir = out_d + sep + outbase
fullname_r1 = outdir + sep + r1 + ext
fullname_r2 = outdir + sep + r2 + ext

fullname_r1 = outdir + sep + fname_r1 + '.fastq.gz'
fullname_r2 = outdir + sep + fname_r2 + '.fastq.gz'

os.makedirs(outdir, exist_ok=True)
current_fp_r1 = pgzip.open(fullname_r1, mode, thread=threads,
blocksize=2 * 10 ** 8)
current_fp_r2 = pgzip.open(fullname_r2, mode, thread=threads,
blocksize=2 * 10 ** 8)

current_fp = (current_fp_r1, current_fp_r2)

# we assume R1 comes first... which is safe if data are coming
# from samtools
orientation_offset = 0
os.makedirs(outdir, exist_ok=True)
current_fp_r1 = gzip.open(fullname_r1, mode)
current_fp_r2 = gzip.open(fullname_r2, mode)
current_fp = {'1': current_fp_r1, '2': current_fp_r2}
openfps[idx] = current_fp

# setup a parser
id_ = iter(fp)
@@ -111,14 +102,17 @@ def demux(id_map, fp, out_d, encoded, threads):
for i, s, d, q in zip(id_, seq, dumb, qual):
fname_encoded, id_ = i.split(delimiter, 1)

if fname_encoded == idx:
id_ = rec + id_
if fname_encoded not in openfps:
continue

current_fp[orientation_offset].write(id_)
current_fp[orientation_offset].write(s)
current_fp[orientation_offset].write(d)
current_fp[orientation_offset].write(q)
orientation_offset = 1 - orientation_offset # switch between 0 / 1
orientation = id_[-2] # -1 is \n
current_fp = openfps[fname_encoded]
id_ = rec + id_
current_fp[orientation].write(id_)
current_fp[orientation].write(s)
current_fp[orientation].write(d)
current_fp[orientation].write(q)

for outfp in current_fp:
outfp.close()
for d in openfps.values():
for f in d.values():
f.close()
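
The new `demux` signature drops the single `--encoded-id` worker model: each invocation now receives a `task` index and the total task count `maxtask`, opens output handles only for the id-map rows where `offset % maxtask == task`, and skips records whose encoded name it does not own. A minimal sketch of that round-robin partitioning (sample names are hypothetical, not taken from the repository) is below:

```python
# Hedged sketch, not repository code: how task/maxtask partition the
# id_map so that maxtask concurrent demux jobs cover every sample
# exactly once. Rows follow the id-map layout: idx, r1, r2, outbase.
id_map = [
    ['s1', 's1_R1', 's1_R2', 'ProjectA/s1'],
    ['s2', 's2_R1', 's2_R2', 'ProjectA/s2'],
    ['s3', 's3_R1', 's3_R2', 'ProjectB/s3'],
    ['s4', 's4_R1', 's4_R2', 'ProjectB/s4'],
]
maxtask = 2  # e.g. the size of the job array

for task in range(maxtask):
    claimed = [row[0] for offset, row in enumerate(id_map)
               if offset % maxtask == task]
    print(f'task {task} handles: {claimed}')
# task 0 handles: ['s1', 's3']
# task 1 handles: ['s2', 's4']
```

Because every task streams the whole multiplexed file but writes only its own samples, the `maxtask` jobs together cover the id map exactly once without coordinating with each other.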
15 changes: 14 additions & 1 deletion sequence_processing_pipeline/NuQCJob.py
@@ -1,6 +1,6 @@
from metapool import load_sample_sheet
from os import stat, makedirs, rename
from os.path import join, basename, dirname, exists
from os.path import join, basename, dirname, exists, abspath
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.Pipeline import Pipeline
@@ -351,8 +351,19 @@ def _generate_job_script(self):
if not exists(demux_path):
raise ValueError(f"{demux_path} does not exist.")

# get this file location and add splitter as it should live there
splitter_binary = join(
dirname(abspath(__file__)), 'scripts', 'splitter')
if not exists(splitter_binary):
raise ValueError(f'{splitter_binary} does not exist.')

with open(job_script_path, mode="w", encoding="utf-8") as f:
# the job resources should come from a configuration file

# generate a string of linux system modules to load before
# processing begins.
mtl = ' '.join(self.modules_to_load)

f.write(template.render(job_name=job_name,
queue_name=self.queue_name,
# should be 4 * 24 * 60 = 4 days
@@ -371,6 +382,8 @@
json_path=json_path,
demux_path=demux_path,
temp_dir=self.temp_dir,
splitter_binary=splitter_binary,
modules_to_load=mtl,
length_limit=self.length_limit))

return job_script_path
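
`_generate_job_script` now resolves the bundled `splitter` binary relative to the installed module (via `abspath(__file__)`) rather than the current working directory, and collapses `self.modules_to_load` into one space-separated string before rendering the template. A hedged sketch of the same path-resolution pattern, with a hypothetical helper name, follows:

```python
# Illustrative sketch only (the helper name is not from the repository):
# resolve a file that ships inside the package by anchoring on this
# module's own path, mirroring the splitter_binary lookup above.
from os.path import abspath, dirname, exists, join


def locate_bundled(name, anchor=__file__):
    """Return the path of a file shipped under <module dir>/scripts/."""
    path = join(dirname(abspath(anchor)), 'scripts', name)
    if not exists(path):
        raise ValueError(f'{path} does not exist.')
    return path


# e.g. locate_bundled('splitter') works regardless of the launch directory.
```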
8 changes: 4 additions & 4 deletions sequence_processing_pipeline/scripts/cli.py
@@ -11,10 +11,10 @@ def cli():
@click.option('--id-map', type=click.Path(exists=True), required=True)
@click.option('--infile', type=click.Path(exists=True), required=True)
@click.option('--output', type=click.Path(exists=True), required=True)
@click.option('--encoded-id', type=str, required=True)
@click.option('--threads', type=int, required=True)
def demux(id_map, infile, output, encoded_id, threads):
demux_cmd(id_map, infile, output, encoded_id, threads)
@click.option('--task', type=int, required=True)
@click.option('--maxtask', type=int, required=True)
def demux(id_map, infile, output, task, maxtask):
demux_cmd(id_map, infile, output, task, maxtask)


if __name__ == '__main__':
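
With `--encoded-id`/`--threads` replaced by `--task`/`--maxtask`, each member of a job array calls the same command with its own index and the shared array size. A usage sketch driving the command in-process with click's `CliRunner` follows; the file names, the array size, and the assumption that `scripts` is importable as a package are all illustrative:

```python
# Usage sketch only: exercise the updated demux CLI in-process. Paths
# are placeholders; in a scheduler, --task would typically come from
# the array task id and --maxtask from the array size.
from click.testing import CliRunner
from sequence_processing_pipeline.scripts.cli import demux

runner = CliRunner()
result = runner.invoke(demux, [
    '--id-map', 'id_map.tsv',          # tab-separated: idx, r1, r2, outbase
    '--infile', 'multiplexed.fastq',   # ::MUX::-tagged records
    '--output', 'out_dir',
    '--task', '0',                     # this job's index in the array
    '--maxtask', '16',                 # total number of array jobs
])
print(result.exit_code)
```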
Binary file added sequence_processing_pipeline/scripts/splitter