-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
375 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import pysam | ||
import pathlib | ||
import sys | ||
import re | ||
|
||
def extractfasta(bed: pathlib.Path, fasta: pathlib.Path, maxout: int = 10000, returninfo: bool = False): | ||
""" | ||
Extract sequences from a FASTA file using a BED file. | ||
:param bed: Path to the BED file. | ||
:param fasta: Path to the FASTA file. | ||
:param maxout: Maximum region size to output (smaller ones skipped). | ||
:param returninfo: Return the locus information as well as the sequence. | ||
:return: DNA sequence as a string. | ||
""" | ||
# read fasta file | ||
ref = pysam.FastaFile(fasta) | ||
|
||
with open(bed) as f: | ||
for line in f: | ||
locus = line.strip().split() | ||
if maxout is not None and int(locus[2]) - int(locus[1]) > maxout: | ||
continue | ||
try: | ||
sequence = ref.fetch(locus[0], int(locus[1]), int(locus[2])) | ||
except KeyError as e: | ||
sys.stderr.write(f'Error: {e}\n') | ||
continue | ||
if returninfo: | ||
yield (locus[0], locus[1], locus[2], locus[3]), sequence | ||
else: | ||
yield (locus[0], locus[1], locus[2]), [sequence] | ||
|
||
def longesthomopolymer(sequence): | ||
""" | ||
Find the longest homopolymer in a sequence. | ||
If there are multiple longest homopolymers, the last one is returned. | ||
Split on Ns | ||
:param sequence: DNA sequence as a string. | ||
:return: length of the longest homopolymer. | ||
>>> longesthomopolymer('ATTTTGGGGGCCCCC') | ||
5 | ||
>>> longesthomopolymer('ATTTTGGGGGNNNNNNNCCCCC') | ||
5 | ||
>>> longesthomopolymer('ATTNTTGGGNGGNNNNNNNCCCNCC') | ||
3 | ||
>>> longesthomopolymer('') is None | ||
True | ||
>>> longesthomopolymer('NNNNNNNNNNNNNN') is None | ||
True | ||
""" | ||
# if sequence is all Ns, return None | ||
if len(sequence) == sequence.count('N'): | ||
return None | ||
maximum = count = 0 | ||
for s in filter(None, re.split('N', sequence, flags=re.IGNORECASE)): | ||
current = '' | ||
for c in s: | ||
if c == current: | ||
count += 1 | ||
else: | ||
count = 1 | ||
current = c | ||
maximum = max(count,maximum) | ||
return maximum | ||
|
||
def GCcontent(sequence): | ||
""" | ||
Calculate the GC content of a sequence. | ||
:param sequence: DNA sequence as a string. | ||
:return: GC content as a float. | ||
>>> GCcontent('ATTTTGGGGGCCCCCATTTT') | ||
0.5 | ||
>>> GCcontent('ATTT') | ||
0.0 | ||
>>> GCcontent('ATTTTGGGGGNNNNNNNNNNNCCCCCATTTT') | ||
0.5 | ||
>>> GCcontent('GGGGCCC') | ||
1.0 | ||
>>> GCcontent('') is None | ||
True | ||
>>> GCcontent('NNNNNNNNNNN') is None | ||
True | ||
""" | ||
if len(sequence) == sequence.count('N'): | ||
return None | ||
sequence = sequence.upper() | ||
return (sequence.count('G') + sequence.count('C')) / (len(sequence) - sequence.count('N')) | ||
|
||
if __name__ == "__main__": | ||
import doctest | ||
doctest.testmod() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import pandas as pd | ||
import bioframe as bf | ||
import os | ||
import sys | ||
import TRlib | ||
|
||
def annotateFasta(trgt_definitions: str, fasta: str): | ||
"""Extract the sequence from the FASTA file using the locus coordinates | ||
and annotate the TRGT definitions with the sequence and other information.""" | ||
for (chrom, start, end, info), sequence in TRlib.extractfasta(trgt_definitions, fasta, maxout=None, returninfo=True): | ||
|
||
TRid = info.split(';')[0].split('=')[1] | ||
motifs = info.split(';')[1].split('=')[1] | ||
motifslist = motifs.split(',') | ||
struc = info.split(';')[2].split('=')[1] | ||
longest_homopolymer = TRlib.longesthomopolymer(sequence) | ||
if longest_homopolymer is None: | ||
longest_homopolymer = 'NA' | ||
gc_content = TRlib.GCcontent(sequence) | ||
if gc_content is None: | ||
gc_content = 'NA' | ||
n_motifs = len(motifslist) | ||
min_motiflen = len(min(motifslist, key=len)) | ||
max_motiflen = len(max(motifslist, key=len)) | ||
|
||
result = [chrom, int(start), int(end), TRid, | ||
longest_homopolymer, gc_content, | ||
n_motifs, min_motiflen, max_motiflen, | ||
motifs, struc, | ||
] | ||
yield result | ||
|
||
def main(trgt_definitions: str, fasta: str, *, output: str = 'stdout', bed_files: list[str] = None, bed_names: list[str] = None): | ||
""" | ||
Outputs a bed file with annotated TRGT definitions including length of longest homopolymer, | ||
GC content, number of motifs, min and max motif length, and motifs. | ||
If annotation bed files are provided, number of bp overlap is reported. | ||
:param trgt_definitions: Path to the TRGT definitions bed file | ||
:param fasta: Path to the reference FASTA file | ||
:param bed_files: Paths to one or more bed files | ||
:param bed_names: Column names for the bed files (must be the same length as bed_files) | ||
:output: bed files with annotated TRGT definitions | ||
""" | ||
if output != 'stdout': | ||
outstream = open(output, 'w') | ||
sys.stdout = outstream | ||
else: | ||
outstream = sys.stdout | ||
|
||
header = ['chrom', 'start', 'end', 'TRid', | ||
'longest_homopolymer', 'gc_content', | ||
'n_motifs', 'min_motiflen', 'max_motiflen', | ||
'motifs', 'struc', | ||
] | ||
|
||
if bed_files: | ||
if bed_names: | ||
if len(bed_names) != len(bed_files): | ||
raise ValueError('bed_names must be the same length as bed_files') | ||
else: | ||
bed_names = [os.path.splitext(os.path.basename(bed_file))[0] for bed_file in bed_files] | ||
|
||
trgt_df = pd.DataFrame(columns=header, data=annotateFasta(trgt_definitions, fasta)) | ||
|
||
# Read the bed files into a dictionary of pandas DataFrames | ||
bed_dfs = {} | ||
if bed_files: | ||
for bed_name, bed_file in zip(bed_names,bed_files): | ||
bed_dfs[bed_name] = pd.read_csv(bed_file, sep='\t', header=None) | ||
colnames = [str(x) for x in bed_dfs[bed_name].columns] | ||
colnames[:3] = ['chrom', 'start', 'end'] | ||
bed_dfs[bed_name].columns = colnames | ||
|
||
# Overlap the bed files with the TRGT DataFrame | ||
for bed_name, bed_df in bed_dfs.items(): | ||
trgt_df = bf.coverage(trgt_df, bed_df) | ||
trgt_df = trgt_df.rename(columns={'coverage':bed_name}) | ||
|
||
# Write the TRGT definitions with annotations to the output file | ||
trgt_df = trgt_df.rename(columns={'chrom':'#chrom'}) | ||
trgt_df.to_csv(outstream, sep='\t', header=True, index=False) | ||
|
||
|
||
if __name__ == "__main__": | ||
import doctest | ||
doctest.testmod() | ||
import defopt | ||
defopt.run(main) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
#!/usr/bin/env python | ||
|
||
import sys | ||
import cyvcf2 | ||
import pathlib | ||
import subprocess | ||
import re | ||
import pysam | ||
from itertools import groupby | ||
from TRlib import extractfasta | ||
|
||
def extractvcf(infile: pathlib.Path, minins: int = 8): | ||
cyvcf2_vcf = cyvcf2.VCF(infile) | ||
for locus in cyvcf2_vcf: | ||
if locus.INFO['SVTYPE'] == 'INS' and locus.INFO['SVLEN'] >= minins: | ||
yield (locus.CHROM, locus.POS, locus.POS), locus.ALT | ||
|
||
def parsetrsolve(motif_string: str): | ||
""" | ||
Parse the output of tr-solve version 0.2.1+ | ||
Parse the motifs and their start/end positions | ||
e.g. 'CAG_0_18,A_18_38' -> ['CAG', 'A'], [0, 38] | ||
>>> parsetrsolve('AGAGGCGCGGCGCGCCGGCGCAGGCGCAG_0_597') | ||
('AGAGGCGCGGCGCGCCGGCGCAGGCGCAG', 0, 597) | ||
>>> parsetrsolve('CAG_0_18') | ||
('CAG', 0, 18) | ||
>>> parsetrsolve('A_18_38') | ||
('A', 18, 38) | ||
""" | ||
|
||
motif, start, end = motif_string.split('_') | ||
# except ValueError as e: | ||
# sys.stderr.write(f'Error: Cannot parse {motif_list} - {e}\n') | ||
# raise | ||
assert 'N' not in motif, f'N found in motif: {motif}' | ||
start = int(start) | ||
end = int(end) | ||
|
||
return motif, start, end | ||
|
||
def runtrsolve(sequence: str, trsolve: pathlib.Path): | ||
""" | ||
Run tr-solve on a sequence by calling the binary. | ||
Example command: | ||
echo input | tr-solve > output | ||
Report the bases of the TRGT motifs. | ||
DNA sequence as a string. | ||
Return a list of TRGT motifs and the start/end of the left and rightmost motifs. | ||
>>> runtrsolve('ATATATATATATATATATATATAT', trsolve='~/tools/tr-solve-v0.2.1-linux_x86_64') | ||
(['AT'], [0, 24]) | ||
>>> runtrsolve('CAGCAGCAGCAGCAGCAGAAAAAAAAAAAAAAAAAAAA', trsolve='~/tools/tr-solve-v0.2.1-linux_x86_64') | ||
(['CAG', 'A'], [0, 38]) | ||
>>> runtrsolve('GGCACGGCATATATATATATATATATATATAT', trsolve='~/tools/tr-solve-v0.2.1-linux_x86_64') | ||
(['AT'], [8, 32]) | ||
""" | ||
sequence = sequence.upper() | ||
if 'N' in sequence: | ||
sys.stderr.write(f'N found in sequence: {sequence}\n') | ||
# Remove Ns from the sequence | ||
sequence = re.sub('N', '', sequence) | ||
sys.stderr.write(f'Removed Ns from sequence to produce: {sequence}\n') | ||
command = f'echo {sequence} | {trsolve}' | ||
#sys.stderr.write(f'Running: {command}\n') | ||
try: | ||
result = subprocess.run(command, shell=True, check=True, | ||
stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
except subprocess.CalledProcessError as e: | ||
sys.stderr.write(f'Error: {command}\n') | ||
sys.stderr.write(f'{e.stderr.decode("utf-8")}\n') | ||
return [], [None, None] #XXX: Should this be an error? | ||
|
||
motif_string_list = result.stdout.decode('utf-8').strip().split('\t') | ||
if len(motif_string_list) < 3: # No motif structure reported | ||
return [], [None, None] | ||
|
||
# Trim the positions to the bases covered by the motifs | ||
minpos = None | ||
maxpos = None | ||
motifs = [] | ||
|
||
for motif_string in motif_string_list[2].split(','): | ||
motif, start, end = parsetrsolve(motif_string) | ||
|
||
motifs.append(motif) | ||
if minpos is None: | ||
minpos = int(start) | ||
if maxpos is None: | ||
maxpos = int(end) | ||
if int(start) < minpos: | ||
minpos = int(start) | ||
if int(end) > maxpos: | ||
maxpos = int(end) | ||
|
||
return motifs, [minpos, maxpos] | ||
|
||
def rmdup(seq): | ||
"""remove adjacent duplicates from a list | ||
>>> rmdup([1, 1, 2, 3, 3, 3, 4, 5, 5]) | ||
[1, 2, 3, 4, 5] | ||
>>> rmdup(['a', 'a', 'b', 'c', 'c', 'c', 'a', 'd', 'e', 'e']) | ||
['a', 'b', 'c', 'a', 'd', 'e'] | ||
""" | ||
return [x[0] for x in groupby(seq)] | ||
|
||
def main(infile: pathlib.Path, outfile: str = 'stdout', *, | ||
fasta: pathlib.Path = None, minins: int = 8, maxout: int = 10000, | ||
trtools: pathlib.Path = pathlib.Path('tr-solve'), | ||
seqout: bool = False, trim: bool = False): | ||
""" | ||
Convert a VCF or BED file to TRGT repeat definitions. | ||
:param infile: Path to the input VCF or BED file. | ||
:param outfile: Path to the output TRGT repeat definitions file. | ||
:param fasta: Path to the reference FASTA file (required for BED inputs). | ||
:param minins: Minimum insertion size to use from VCF. | ||
:param maxout: Maximum region size to output (smaller ones skipped). | ||
:param trtools: Path to the tr-solve binary. | ||
:param seqout: Output all input sequences. When false, sequences with no motif found are skipped. | ||
:param trim: Trim the output coordinates to the bases covered by the motifs. | ||
""" | ||
if infile.suffix == '.vcf' or infile.suffixes[-2:] == ['.vcf', '.gz']: | ||
sequences = extractvcf(infile, minins=minins) | ||
elif infile.suffix == '.bed': | ||
if fasta is None: | ||
raise ValueError('BED input requires FASTA file') | ||
sequences = extractfasta(infile, fasta, maxout=maxout) | ||
else: | ||
raise ValueError(f'Unknown input file type: {infile.suffix}') | ||
|
||
if outfile == 'stdout': | ||
f = sys.stdout | ||
else: | ||
f = open(outfile, 'w') | ||
|
||
for (chrom, start_orig, end_orig), alts in sequences: | ||
for alt in alts: | ||
motifs, bounds = runtrsolve(alt, trsolve=trtools) | ||
#sys.stderr.write(f'{motifs}\t{bounds}\n') | ||
|
||
if len(motifs) == 0: | ||
if seqout: | ||
f.write(f'{alt}\tNone\n') | ||
continue | ||
start = int(start_orig) | ||
end = int(end_orig) | ||
|
||
if trim: | ||
|
||
# Trim the bounds to the bases covered by the motifs | ||
if bounds[0] is not None: | ||
start += bounds[0] | ||
if bounds[1] is not None: | ||
end = start + bounds[1] - bounds[0] | ||
|
||
if start > int(end_orig): | ||
start = int(end_orig) | ||
if end < int(start_orig): | ||
end = int(start_orig) | ||
|
||
unique_motifs = dict.fromkeys(motifs) # can use set(motifs) if order doesn't matter. Assume it does for now | ||
motifs_str = ','.join(unique_motifs) | ||
struc = ''.join([f'({motif})n' for motif in rmdup(motifs)]) | ||
if seqout: | ||
outstring = f'{alt}\t' | ||
else: | ||
outstring = '' | ||
outstring += f'{chrom}\t{start}\t{end}\tID={chrom}_{start}_{end};MOTIFS={motifs_str};STRUC={struc}\n' | ||
f.write(outstring) | ||
|
||
f.flush() | ||
|
||
if __name__ == "__main__": | ||
import doctest | ||
doctest.testmod() | ||
import defopt | ||
defopt.run(main) |