-
Notifications
You must be signed in to change notification settings - Fork 2
/
find_duplicate_markers.py
72 lines (64 loc) · 2.47 KB
/
find_duplicate_markers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
from __future__ import print_function
import sys
from optparse import OptionParser
duplicates = {}
def add_duplicates(current_marker, previous_marker):
sequence = "{}.{}".format(previous_marker[1], previous_marker[3])
if sequence in duplicates:
duplicates[sequence].add(current_marker[0])
duplicates[sequence].add(current_marker[2])
else:
duplicates[sequence] = set([current_marker[0], current_marker[2]])
def process_fasta(fasta_file):
previous_marker = []
current_marker = []
with open(fasta_file, "r") as fa:
lines = []
for line in fa:
lines.append(str(line.strip()))
if len(lines) >= 4:
if current_marker == []:
current_marker = lines
else:
previous_marker = current_marker
current_marker = lines
print("{} {}".format(current_marker[3], previous_marker[3]), file=sys.stderr)
if current_marker[1] == previous_marker[1] and current_marker[3] == previous_marker[3]:
add_duplicates(current_marker, previous_marker)
lines = []
if len(lines) > 0:
raise Exception("Leftover lines in fasta, make sure ref and alt fastas are paired for each marker")
fasta_to_remove = set()
for duplicate_set in duplicates.values():
for duplicate in duplicate_set:
fasta_to_remove.add(duplicate)
print("{} duplicate sequences removed".format(str(len(fasta_to_remove))), file=sys.stderr)
with open(fasta_file, "r") as fa:
lines = []
for line in fa:
lines.append(str(line.strip()))
if len(lines) >= 2:
if not (lines[0] in fasta_to_remove):
print(lines[0])
print(lines[1])
lines = []
if len(lines) > 0:
raise Exception("Leftover lines in fasta, make sure fastas are complete")
def main():
usage = """%prog -i <fasta>
Given a fasta file of paired marker sequences, sorted by sequence, remove duplicated sequences.
Author: Allison Regier
"""
parser = OptionParser(usage)
parser.add_option("-i", dest="fasta",
help="a fasta file",
metavar="FILE")
(opts, args) = parser.parse_args()
if opts.fasta is None:
parser.print_help()
print()
else:
process_fasta(opts.fasta)
if __name__ == "__main__":
sys.exit(main())