-
Notifications
You must be signed in to change notification settings - Fork 0
/
mid_man.py
executable file
·110 lines (103 loc) · 4.38 KB
/
mid_man.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
import sys
import time
import threading
from conf import parse_config
from db_man import DBConnector
from time_util import get_time_now, get_sleep_time, get_time_isoformat
from model_server import ServerPostModel, PointDataWithTS
from diffsum import diffsum_timespan_sharp
def worker(server, config):
db_agent = DBConnector(config.AgentDB, config)
db_que = DBConnector(config.QueDB, config)
tag = f"Mid:{server.ServerID}: "
while True:
# since db is set to None when the connection has been rejected.
# see below.
if db_agent is None:
db_agent = DBConnector(config.AgentDB, config)
if db_que is None:
db_que = DBConnector(config.QueDB, config)
#
config.logger.debug(f"{tag}resumed: {get_time_isoformat()}")
cur_ts = get_time_now().timestamp()
# server.IntegrationDeferTime delays to start this process.
# So, cur_ts is enough to get the number of sampling data.
try:
psvt = db_agent.get_data_raw(server.Points, cur_ts)
except db_agent.ConnectionError as e:
db_agent = None
# through here internal loop and try to connect DB.
else:
out_data = []
del_data = []
for pid,svt in psvt.items():
config.logger.debug(f"{tag}{pid}: {len(svt)} data exist.")
if len(svt) > 0:
pinfo = config.Points[pid]
if pinfo.IntegrationSpan > 0:
sumspan = pinfo.IntegrationSpan
ret = diffsum_timespan_sharp(
svt, sumspan=sumspan,
margin=server.IntegrationMarginTime,
ulimit=pinfo.ULimitRotationNumber,
config=config)
values = ret["vsum_list"]
rest_offset = ret["rest_offset"]
else:
values = [svt[-1][1]]
rest_offset = 0
# make data to post the queue db.
tslast = get_time_isoformat(ts=svt[rest_offset][0])
vx = PointDataWithTS(PointID=pid, Values=values,
TSLastValue=tslast)
config.logger.debug(f"{tag}{pid}: {vx.Values} {vx.TSLastValue}")
out_data.append(vx.dict())
if rest_offset > 0:
# Note: score - 1 is to leave the latest data.
last_score = svt[rest_offset-1][0] - 1
else:
last_score = svt[-1][0]
del_data.append((server.Points, last_score))
# post data to the queue db.
if len(out_data) > 0:
config.logger.debug(f"{tag}{len(out_data)} data queued.")
config.logger.debug(f"{tag}{out_data}")
try:
db_que.post_data_ready(server.ServerID, out_data)
except db_que.ConnectionError as e:
db_que = None
# try to connect DB.
else:
# delete data if posting data is successful.
try:
for p,s in del_data:
db_agent.del_data_raw(p,s)
except db_agent.ConnectionError as e:
db_agent = None
# through here and try to connedt to the db next time.
pass
# sleep
sleep_time = get_sleep_time(server.PostInterval,
server.IntegrationDeferTime)
config.logger.debug(f"{tag}sleep: {get_time_isoformat()}, "
f"resume in {sleep_time}.")
time.sleep(sleep_time)
def start_wokers(config):
workers = []
for x in config.Servers:
config.logger.debug(f"starting worker for {x.ServerID}")
workers.append(threading.Thread(target=worker,
args=(x, config),
daemon=False))
# start threads.
for x in workers:
x.start()
#
# Main
#
PROG_NAME = "middle_man"
if __name__ == "__main__":
config = parse_config(PROG_NAME, sys.argv[1:])
config.logger.info(f"Starting {PROG_NAME}")
start_wokers(config)