# parse.py

def write_network_file(G, output_file_handle):
    """Write the edges of a weighted graph G as tab-separated lines."""
    output_file_handle.write("# Tail Node\tHead Node\tWeight\n")
    for tail, head, data in G.edges(data=True):
        line = str(tail) + "\t" + str(head) + "\t" + str(data['weight']) + "\n"
        output_file_handle.write(line)
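
# Illustrative usage sketch, not part of the original module: it assumes G is a
# networkx graph whose edges carry a 'weight' attribute, and the file name is
# hypothetical.
#
#   import networkx as nx
#   G = nx.DiGraph()
#   G.add_edge("A", "B", weight=0.75)
#   with open("example-network.txt", "w") as handle:
#       write_network_file(G, handle)
#
# This writes the "# Tail Node\tHead Node\tWeight" header followed by one
# tab-separated line per edge.
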
def get_source_set(nodes_file_handle):
    """Return the set of node names labeled 'source' or 'receptor'."""
    sources = set()
    for line in nodes_file_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            if tokens[1].strip() in ['source', 'receptor']:
                sources.add(tokens[0])
    return sources


def get_target_set(nodes_file_handle):
    """Return the set of node names labeled 'target', 'tr', or 'tf'."""
    targets = set()
    for line in nodes_file_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            if tokens[1].strip() in ['target', 'tr', 'tf']:
                targets.add(tokens[0])
    return targets
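
# Note on the node file format assumed by the two functions above (inferred
# from their token indexing, so treat it as an assumption): each non-comment
# line is tab-separated with the node name in column 1 and its type in
# column 2, e.g.
#
#   NODE_A\tsource
#   NODE_B\ttf
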
def get_node_set(network_file_handle):
    """Return the set of all tail and head nodes in a network file."""
    nodes = set()
    for line in network_file_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            nodes.add(tokens[0])
            nodes.add(tokens[1])
    return nodes


def get_edge_set(network_file_handle):
    """Return the set of (tail, head) edges in a network file."""
    edges = set()
    for line in network_file_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            edge = get_edge(tokens)
            edges.add(edge)
    return edges


def get_ranked_nodes(ranked_edges):
    """
    Given: a list of sets of (edge, weight) pairs, where all edges in
        a set share the same rank
    Outputs: a list of sets of nodes, where all nodes in a set first
        appear at that rank
    """
    ranked_nodes = []
    overall_set = set()
    current_set = set()
    for edge_set in ranked_edges:
        for edge in edge_set:
            # edge[0] is the (tail, head) tuple of the (edge, weight) pair
            for node in edge[0]:
                if node not in overall_set:
                    overall_set.add(node)
                    current_set.add(node)
        ranked_nodes.append(current_set)
        current_set = set()
    return ranked_nodes
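
# Illustrative example (assumed data; set ordering may vary): given the
# output of parse_ranked_edges
#   [{(("A", "B"), 0.9), (("B", "C"), 0.9)}, {(("C", "D"), 0.4)}]
# get_ranked_nodes returns
#   [{"A", "B", "C"}, {"D"}]
# because "D" is the only node that first appears at the second rank.
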
def parse_ranked_edges(ranked_edges_handle):
    """Parse ranked edges into a list of sets of (edge, weight) pairs;
    each set holds the edges that share a rank."""
    ranked_edges = []
    current_set = set()
    prev_rank = None
    for line in ranked_edges_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            edge = get_edge(tokens)
            rank = int(get_edge_rank(tokens))
            weight = float(get_edge_weight(tokens))
            # Start a new set whenever the rank changes
            if prev_rank is not None and rank != prev_rank:
                ranked_edges.append(current_set)
                current_set = set()
            current_set.add((edge, weight))
            prev_rank = rank
    if current_set:
        ranked_edges.append(current_set)
    return ranked_edges
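
# Note on the ranked-edges file format assumed here (inferred from get_edge,
# get_edge_rank, and get_edge_weight, so treat it as an assumption): each
# non-comment line is tab-separated as tail, head, rank, weight, e.g.
#
#   A\tB\t1\t0.9
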
def parse_ranked_paths(ranked_paths_handle):
    """Parse ranked paths, returning one parsed path per non-comment
    line, in file order."""
    ranked_paths = []
    for line in ranked_paths_handle:
        if not is_comment_line(line):
            tokens = tokenize(line)
            path = get_path(tokens)
            ranked_paths.append(path)
    return ranked_paths


def is_comment_line(line):
    return line.lstrip().startswith("#")


def tokenize(line):
    return line.split("\t")


def get_edge(tokens):
    return (tokens[0], tokens[1])


def get_edge_rank(tokens):
    return tokens[2]


def get_edge_weight(tokens):
    return tokens[3]


def get_path(tokens):
    return (tokens[2].strip(), tokens[1])


def get_path_rank(tokens):
    return tokens[1]
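

# A minimal end-to-end sketch of the helpers above; it is not part of the
# original module. The in-memory ranked-edges text and the toy graph are
# assumptions standing in for real input files, and networkx is only needed
# for write_network_file.
if __name__ == "__main__":
    import io
    import sys

    import networkx as nx

    # Hypothetical ranked-edges content: tail, head, rank, weight.
    ranked_edges_text = (
        "#tail\thead\trank\tweight\n"
        "A\tB\t1\t0.9\n"
        "B\tC\t1\t0.9\n"
        "C\tD\t2\t0.4\n"
    )
    ranked_edges = parse_ranked_edges(io.StringIO(ranked_edges_text))
    ranked_nodes = get_ranked_nodes(ranked_edges)
    print(ranked_edges)  # e.g. [{(('A', 'B'), 0.9), (('B', 'C'), 0.9)}, {(('C', 'D'), 0.4)}]
    print(ranked_nodes)  # e.g. [{'A', 'B', 'C'}, {'D'}]

    # Write a toy weighted graph in the tab-separated network format.
    G = nx.DiGraph()
    G.add_edge("A", "B", weight=0.9)
    G.add_edge("B", "C", weight=0.9)
    write_network_file(G, sys.stdout)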