forked from broadinstitute/pyro-cov
-
Notifications
You must be signed in to change notification settings - Fork 0
/
moran.py
95 lines (78 loc) · 3.79 KB
/
moran.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright Contributors to the Pyro-Cov project.
# SPDX-License-Identifier: Apache-2.0
import argparse
from collections import defaultdict
import numpy as np
import pandas as pd
import torch
from pyrocov.sarscov2 import aa_mutation_to_position
# compute moran statistic
def moran(values, distances, lengthscale):
assert values.size(-1) == distances.size(-1)
weights = (distances.unsqueeze(-1) - distances.unsqueeze(-2)) / lengthscale
weights = torch.exp(-weights.pow(2.0))
weights *= 1.0 - torch.eye(weights.size(-1))
weights /= weights.sum(-1, keepdim=True)
output = torch.einsum("...ij,...i,...j->...", weights, values, values)
return output / values.pow(2.0).sum(-1)
# compute moran statistic and do permutation test with given number of permutations
def permutation_test(values, distances, lengthscale, num_perm=9999):
values = values - values.mean()
moran_given = moran(values, distances, lengthscale).item()
idx = [torch.randperm(distances.size(-1)) for _ in range(num_perm)]
idx = torch.stack(idx)
moran_perm = moran(values[idx], distances, lengthscale)
p_value = (moran_perm >= moran_given).sum().item() + 1
p_value /= float(num_perm + 1)
return moran_given, p_value
def main(args):
# read in inferred mutations
df = pd.read_csv("paper/mutations.tsv", sep="\t", index_col=0)
df = df[["mutation", "Δ log R"]]
mutations = df.values[:, 0]
assert mutations.shape == (2337,)
coefficients = df.values[:, 1] if not args.magnitude else np.abs(df.values[:, 1])
gene_map = defaultdict(list)
distance_map = defaultdict(list)
results = []
# collect indices and nucleotide positions corresponding to each mutation
for i, m in enumerate(mutations):
gene = m.split(":")[0]
gene_map[gene].append(i)
distance_map[gene].append(aa_mutation_to_position(m))
# map over each gene
for gene, idx in gene_map.items():
values = torch.from_numpy(np.array(coefficients[idx], dtype=np.float32))
distances = distance_map[gene]
distances = torch.from_numpy(np.array(distances) - min(distances))
gene_size = distances.max().item()
lengthscale = min(gene_size / 20, 50.0)
_, p_value = permutation_test(values, distances, lengthscale, num_perm=999999)
s = "Gene: {} \t #Mut: {} Size: {} \t p-value: {:.6f} Lengthscale: {:.1f}"
print(s.format(gene, distances.size(0), gene_size, p_value, lengthscale))
results.append([distances.size(0), gene_size, p_value, lengthscale])
# compute moran statistic for entire genome for mulitple lengthscales
for global_lengthscale in [100.0, 500.0]:
distances_ = [aa_mutation_to_position(m) for m in mutations]
distances = torch.from_numpy(
np.array(distances_, dtype=np.float32) - min(distances_)
)
values = torch.tensor(np.array(coefficients, dtype=np.float32)).float()
_, p_value = permutation_test(
values, distances, global_lengthscale, num_perm=999999
)
genome_size = distances.max().item()
s = "Entire Genome (#Mut = {}; Size = {}): \t p-value: {:.6f} Lengthscale: {:.1f}"
print(s.format(distances.size(0), genome_size, p_value, global_lengthscale))
results.append([distances.size(0), genome_size, p_value, global_lengthscale])
# save results as csv
results = np.stack(results)
columns = ["NumMutations", "GeneSize", "PValue", "Lengthscale"]
index = list(gene_map.keys()) + ["EntireGenome"] * 2
result = pd.DataFrame(data=results, index=index, columns=columns)
result.to_csv("paper/moran.csv")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Compute moran statistics")
parser.add_argument("--magnitude", action="store_true")
args = parser.parse_args()
main(args)