This repository has been archived by the owner on Jun 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 170
/
schedule.py
90 lines (79 loc) · 4.01 KB
/
schedule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
import time
from printer import Printer
sec_calc = lambda h, m, s: 3600 * int(h) + 60 * int(m) + float(s)
time_minus = lambda t2, t1: (t2 - t1) % 86400
time_str_calc = lambda sec: f'{sec//3600:02.0f}:{sec%3600//60:02.0f}:{sec%60:02.0f}'
def sec_now():
time_tuple = time.localtime()
return sec_calc(time_tuple.tm_hour, time_tuple.tm_min, time_tuple.tm_sec)
class Schedule:
instance = None
def __new__(cls, *args, **kw):
if not cls.instance:
cls.instance = super(Schedule, cls).__new__(cls)
cls.instance.scheduled_sleep = False
return cls.instance
async def run(self, schedule_str):
if schedule_str == '':
Printer().printer("请填入定时休眠时间段", "Warning", "red")
self.scheduled_sleep = False
return
second_array = sorted([[sec_calc(*time_str.split(':')) for time_str in
time_str_pair.split('-')] for time_str_pair in schedule_str.split(';')])
second_array = [[start, end] for (start, end) in second_array if start != end]
if not len(second_array):
Printer().printer("请填入有效时间段", "Warning", "red")
self.scheduled_sleep = False
return
# 按顺序合并有overlap的时间段
second_rearrng = [second_array[0]]
pos = 1
while pos < len(second_array):
if time_minus(second_array[pos][0], second_rearrng[-1][0]) <= \
time_minus(second_rearrng[-1][1], second_rearrng[-1][0]):
if time_minus(second_rearrng[-1][1], second_rearrng[-1][0]) < \
time_minus(second_array[pos][1], second_rearrng[-1][0]):
second_rearrng[-1][1] = second_array[pos][1]
else:
second_rearrng.append(second_array[pos])
pos += 1
# 考虑最后一个跨0点时间段覆盖最开始几个时间段端点的情况
if second_rearrng[-1][1] < second_rearrng[-1][0]:
while len(second_rearrng) > 1:
if second_rearrng[-1][1] > second_rearrng[0][0]:
if second_rearrng[-1][1] < second_rearrng[0][1]:
second_rearrng[-1][1] = second_rearrng[0][1]
del second_rearrng[0]
else:
break
sec_sequence = __import__('functools').reduce(lambda x, y: x+y, second_rearrng)
sec_init = sec_now()
for i in range(len(sec_sequence)):
if sec_sequence[i] > sec_init:
stage = i
break
else:
stage = len(sec_sequence)-1 if sec_sequence[-1] < sec_sequence[-2] else 0
# 当前时间在0时后且在最后一个包含0时的时间段内
if stage == 0 and sec_init < sec_sequence[-1] < sec_sequence[-2]:
stage = len(sec_sequence)-1
if stage % 2 == 1:
self.scheduled_sleep = True
Printer().printer(f"当前处于定时休眠时间段内,下一次取消休眠时间为 {time_str_calc(sec_sequence[stage])}", "Info", "green")
else:
self.scheduled_sleep = False
Printer().printer(f"当前处于定时休眠时间段外,下一次开始休眠时间为 {time_str_calc(sec_sequence[stage])}", "Info", "green")
while True:
sleep_time = (sec_sequence[stage] - sec_now()) % 86400
# 避免因误差刚好过了下个时间点
sleep_time = 0 if sleep_time > 86395 else sleep_time
await asyncio.sleep(sleep_time)
stage += 1
stage = stage % len(sec_sequence)
if stage % 2 == 0:
Printer().printer(f"结束定时休眠,下一次开始休眠时间为 {time_str_calc(sec_sequence[stage])}", "Info", "green")
self.scheduled_sleep = False
else:
Printer().printer(f"开始定时休眠,本次结束休眠时间为 {time_str_calc(sec_sequence[stage])}", "Info", "green")
self.scheduled_sleep = True