forked from ubiome-opensource/microbiome-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ubiomeMultiSample.py
142 lines (106 loc) · 5.29 KB
/
ubiomeMultiSample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sys
import csv
class UbiomeMultiSample(object):
    """Merge several samples into one table with a column per sample.

    State is held in two parallel structures:

    * ``fullTaxList`` -- a list of ``[tax_name, tax_rank]`` entries.
      ``fullTaxList[0]`` is always the header ``["tax_name", "tax_rank"]``.
    * ``samples`` -- one list per merged sample.  ``sample[0]`` is the
      sample's name (e.g. ``"July"``) and ``sample[i]`` for ``i >= 1`` is
      that sample's ``count_norm`` for the taxon at ``fullTaxList[i]``.

    A sample object passed to this class must provide ``name``,
    ``taxaList`` (items exposing a ``count_norm`` attribute), ``taxnames()``
    (entries shaped like the ``fullTaxList`` entries) and ``taxonOf(name)``
    (returning something indexable by ``"count_norm"``, or a falsy value
    when the taxon is absent).
    NOTE(review): ``count_norm`` is read as an attribute in one place and
    as a subscript in another, exactly as before -- confirm against
    UbiomeSample that both forms are supported.

    usage: (assuming sample1 and sample2 are of class UbiomeSample)
        x = UbiomeMultiSample()   # initializing
        x.merge(sample1)
        x.merge(sample2)
    """

    # How many leading entries showContents() and __str__() echo.
    _PREVIEW_LEN = 10

    def __init__(self, newSample=None):
        """Create an empty multi-sample, optionally seeded with one sample.

        :param newSample: optional first sample; any falsy value (the
            default ``None``, or the legacy ``[]``) means "start empty".
        :rtype: UbiomeMultiSample
        """
        self.fullTaxList = [["tax_name", "tax_rank"]]
        self.samples = []
        if newSample:
            # With no samples yet, merge() takes its first-sample path,
            # which is exactly the seeding the constructor needs.
            self.merge(newSample)

    def alltaxa(self):
        """Return the first field of every fullTaxList entry.

        The header entry is included, so the first element is "tax_name".

        :return: list
        """
        return [entry[0] for entry in self.fullTaxList]

    def _latest_preview(self):
        """Return up to _PREVIEW_LEN leading items of the newest sample row."""
        if not self.samples:
            return []
        return self.samples[-1][:self._PREVIEW_LEN]

    def showContents(self):
        """Print a short summary of the taxa list and every sample row.

        Slicing is used for the previews so that tables with fewer than
        ten entries (or no samples at all) no longer raise IndexError.
        """
        print("length of fullTaxList=", len(self.fullTaxList))
        print(self.fullTaxList[:self._PREVIEW_LEN])
        print("length of samples=", len(self.samples))
        for sample in self.samples:
            print(sample[0], "--->", len(sample))
        print("latest sample:", self._latest_preview())

    def __str__(self):
        """Return the taxa count plus a preview of the newest sample row."""
        return (
            "UbiomeMultiSample: len=" + str(len(self.fullTaxList)) + "\n"
            + "latest sample:" + str(self._latest_preview())
        )

    def merge(self, sample2):
        """Merge sample2 into this multi-sample, modifying it in place.

        Taxa of sample2 that are not yet listed are appended to
        ``fullTaxList``; every previously merged sample gets a 0 count for
        them, and sample2 gets a 0 count for every listed taxon it lacks.

        :param sample2: UbiomeSample
        :return: True (on every path; previously only the first merge
            returned True and later merges returned None)
        """
        if not self.samples:
            # First sample: adopt its taxa and counts wholesale.  Testing
            # ``self.samples`` rather than the taxa-list length keeps this
            # correct when an earlier sample contributed no taxa.
            # Assumes taxnames() and taxaList are in the same order --
            # TODO confirm against UbiomeSample.
            self.fullTaxList += sample2.taxnames()
            self.samples.append(
                [sample2.name] + [taxon.count_norm for taxon in sample2.taxaList]
            )
            return True

        # Every existing row has the same length, so the newest one tells
        # us how many taxa were known before this merge.
        known_count = len(self.samples[-1]) - 1

        new_entries = [
            entry for entry in sample2.taxnames()
            if entry not in self.fullTaxList
        ]

        # sample2's counts for the taxa that were already listed
        # (0 when sample2 does not contain the taxon).
        counts_for_known = []
        for entry in self.fullTaxList[1:known_count + 1]:
            taxon = sample2.taxonOf(entry[0])
            counts_for_known.append(taxon["count_norm"] if taxon else 0)

        # sample2's counts for the taxa it introduces.
        counts_for_new = [
            sample2.taxonOf(entry[0])["count_norm"] for entry in new_entries
        ]

        self.fullTaxList += new_entries
        new_row = [sample2.name] + counts_for_known + counts_for_new
        self.samples.append(new_row)

        # Back-fill earlier samples with zeros for the newly added taxa so
        # every row lines up with fullTaxList again.
        for index, row in enumerate(self.samples):
            missing = len(new_row) - len(row)
            if missing > 0:
                self.samples[index] = row + [0] * missing
        return True

    def writeCSV(self, filename):
        """Write the merged samples as CSV to a path or an open stream.

        The output has columns ``tax_name``, ``tax_rank`` and then one
        column per sample, in merge order.

        NOTE(review): because ``fullTaxList[0]`` is itself the header
        entry, the header line is emitted twice (once as the CSV header and
        once as the first data row).  This is kept as-is so existing
        consumers of the file are not broken -- confirm whether it is
        intended.

        :param filename: a file path, or any object with a ``write``
            method such as ``sys.stdout``
        :return:
        """
        if hasattr(filename, "write"):
            self._write_rows(filename)
        else:
            # newline='' is what the csv module requires of files it
            # writes to; without it Windows output gets "\r\r\n" endings.
            with open(filename, "w", newline="") as csv_file:
                self._write_rows(csv_file)

    def _write_rows(self, stream):
        """Emit the header and one row per fullTaxList entry to stream."""
        # Rows are written positionally rather than as dicts keyed by
        # sample name, so two samples sharing a name keep separate values.
        writer = csv.writer(stream, dialect="excel")
        writer.writerow(
            ["tax_name", "tax_rank"] + [sample[0] for sample in self.samples]
        )
        for index, (tax_name, tax_rank) in enumerate(self.fullTaxList):
            writer.writerow(
                [tax_name, tax_rank] + [sample[index] for sample in self.samples]
            )