-
Notifications
You must be signed in to change notification settings - Fork 0
/
__main__.py
190 lines (149 loc) · 6.02 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from sys import argv
from os import system
from pathlib import Path
from argparse import ArgumentParser
from .cfex import CFEX
from time import time, sleep
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import DirModifiedEvent, DirCreatedEvent, DirDeletedEvent
from threading import Thread
def _call(cmd, file=Path('temp')):
    """Run ``cmd`` through the shell and return what it wrote to stdout.

    The command's stdout is redirected into ``file`` and the file is then
    read back as text.

    Args:
        cmd: Shell command line to execute (passed verbatim to ``os.system``).
        file: ``Path`` of the capture file. Defaults to ``temp`` in the
            current working directory.

    Returns:
        The text content of ``file`` after the command has finished.
    """
    # Redirect into the requested capture file; previously the target was
    # hard-coded to "temp", so a custom ``file`` was read but never written.
    system(f'{cmd} > {file}')
    return file.read_text()


def call(cmd, file=Path('temp')):
    """Print the shell line about to be executed, then run it via ``_call``.

    Args:
        cmd: Shell command line to execute.
        file: ``Path`` of the capture file (see ``_call``).

    Returns:
        The captured stdout of ``cmd`` as text.
    """
    # Echo exactly the command line that ``_call`` is going to run.
    print(f'{cmd} > {file}')
    return _call(cmd, file)
class SSH:
    """Thin wrapper that runs ``ssh``/``scp`` command lines for one remote host.

    Connection parameters and directories are read from a mapping of
    project settings (keys ``SSH_KEY``, ``SSH_USER``, ``SSH_HOST``,
    ``LOCAL_DIR``, ``REMOTE_DIR`` and the optional ``SRC_DIR``).
    """

    def __init__(self, configs):
        """Read connection settings and derive the local mirror directory.

        Args:
            configs: Mapping with the keys listed in the class docstring.

        Raises:
            KeyError: If a required key is missing from ``configs``.
        """
        self.key = configs['SSH_KEY']
        self.user = configs['SSH_USER']
        self.host = configs['SSH_HOST']
        self.local_dir = Path(
            configs['LOCAL_DIR'].replace('~', str(Path.home())))
        self.remote_dir = Path(configs['REMOTE_DIR'])
        # SRC_DIR is optional. Path(None) raises TypeError, so test for the
        # missing value *before* building the Path and fall back to the
        # remote directory.
        src_dir = configs.get('SRC_DIR')
        self.src_dir = self.remote_dir if src_dir is None else Path(src_dir)
        # Local mirror directory: <LOCAL_DIR>/<last component of src_dir>.
        self.local_store = self.local_dir / self.src_dir.name

    def _ssh(self, command):
        """Build the ``ssh`` command line that runs ``command`` remotely."""
        return f'ssh -i {self.key} {self.user}@{self.host} "{command}"'

    def cmd(self, cmd):
        """Run ``cmd`` on the remote host (without echoing) and return stdout."""
        return _call(self._ssh(cmd))

    def check(self):
        """Return True if the remote dir's name appears in its parent's ``ls``."""
        answer = self.cmd(f'ls {self.remote_dir.parent}')
        # Substring match on the raw listing text, not an exact entry match.
        return self.remote_dir.name in answer

    def download(self, path, r=False):
        """Copy remote ``path`` into ``local_dir`` with ``scp``.

        Args:
            path: Remote path to fetch.
            r: When true, pass ``-r`` to ``scp`` (recursive copy).
        """
        r_flag = '-r' if r else ''
        # Adjacent f-strings instead of a backslash continuation, so source
        # indentation can never leak into the command between user and host.
        call(
            f'scp -i {self.key} {r_flag} '
            f'{self.user}@{self.host}:{path} {self.local_dir}'
        )

    def upload(self, _from, _to):
        """Copy local ``_from`` to remote ``_to`` with ``scp``."""
        call(f'scp -i {self.key} {_from} {self.user}@{self.host}:{_to}')

    def create_file(self, _to):
        """``touch`` the path ``_to`` on the remote host."""
        call(self._ssh(f'touch {_to}'))

    def create_dir(self, _to):
        """``mkdir`` the path ``_to`` on the remote host."""
        call(self._ssh(f'mkdir {_to}'))

    def delete_file(self, _to):
        """``rm`` the path ``_to`` on the remote host."""
        call(self._ssh(f'rm {_to}'))

    def delete_dir(self, _to):
        """``rm -r`` the path ``_to`` on the remote host."""
        call(self._ssh(f'rm -r {_to}'))

    def move(self, _from, _to):
        """``mv`` remote ``_from`` to remote ``_to``."""
        call(self._ssh(f'mv {_from} {_to}'))
class EventHandler(FileSystemEventHandler):
    """Watchdog handler that replays local file-system events over SSH.

    Events are de-duplicated through a short-lived cache and skipped when
    their source path contains any of the configured ``IGNORE`` substrings.
    """

    def __init__(self, ssh, configs):
        """Store the SSH helper and the ignore list.

        Args:
            ssh: Object providing ``upload``, ``create_*``, ``delete_*``,
                ``move`` plus the ``local_store`` and ``remote_dir`` paths.
            configs: Mapping of project settings; ``IGNORE`` is optional.
        """
        super().__init__()
        self.ssh = ssh
        self.ignore = configs.get('IGNORE', [])
        # Per-instance state: a class-level set would be shared (and mutated)
        # by every handler, and a class-level deadline would be computed at
        # import time rather than when the handler is created.
        self.file_cache = set()
        self.next_clear = time() + 300

    def dispatch(self, event):
        """Forward ``event`` to the ``on_*`` handlers unless it is filtered.

        The cache check runs first, so an event is recorded in the cache even
        when it is subsequently dropped by the ignore list.
        """
        if self.use_cache(event) and not any(
            pattern in event.src_path for pattern in self.ignore
        ):
            super().dispatch(event)

    def use_cache(self, event):
        """Return True if ``event`` is new, False if it was seen very recently.

        Cache keys are ``(whole_second, event)`` tuples. An event counts as a
        duplicate when a key for the current or the previous second exists;
        a fresh event is recorded under the current and the following second.
        The whole cache is dropped once the clear deadline (300 s) has passed.
        """
        seconds = int(time())
        if seconds > self.next_clear:
            self.file_cache = set()
            self.next_clear = seconds + 300
        key = (seconds, event)
        if key in self.file_cache or (seconds - 1, event) in self.file_cache:
            return False
        self.file_cache.add(key)
        self.file_cache.add((seconds + 1, event))
        return True

    def get_path(self, event, attr='src_path'):
        """Map an event path to ``(local_path, remote_path)``.

        Args:
            event: Watchdog event carrying the path attribute.
            attr: Name of the attribute to read (``src_path`` or
                ``dest_path``).

        Returns:
            Tuple of the local ``Path`` and the matching path under
            ``ssh.remote_dir``.
        """
        raw_path = getattr(event, attr)
        absolute_path = Path(raw_path)
        # Use the handler's own SSH object; relying on a module-global ``ssh``
        # only worked when the file was executed as a script.
        relative_path = raw_path.replace(
            str(self.ssh.local_store) + '/', '')
        # NOTE(review): if raw_path does not contain the local_store prefix
        # the replace is a no-op and the local path is joined as-is — confirm
        # watched paths always carry that prefix.
        dist_path = self.ssh.remote_dir / relative_path
        return absolute_path, dist_path

    def on_modified(self, event):
        """Upload a modified file; directory modifications are ignored."""
        if not isinstance(event, DirModifiedEvent):
            self.ssh.upload(*self.get_path(event))

    def on_created(self, event):
        """Create the matching remote directory or (empty) remote file."""
        _, remote_path = self.get_path(event)
        if isinstance(event, DirCreatedEvent):
            self.ssh.create_dir(remote_path)
        else:
            self.ssh.create_file(remote_path)

    def on_deleted(self, event):
        """Delete the matching remote directory or file."""
        _, remote_path = self.get_path(event)
        if isinstance(event, DirDeletedEvent):
            self.ssh.delete_dir(remote_path)
        else:
            self.ssh.delete_file(remote_path)

    def on_moved(self, event):
        """Move/rename the matching remote path."""
        _, src = self.get_path(event)
        _, dist = self.get_path(event, 'dest_path')
        self.ssh.move(src, dist)
def load_project_config(dir_path, config_file='.colossos.cfex'):
    """Load the project settings file found inside ``dir_path``.

    Args:
        dir_path: ``Path`` of the project directory.
        config_file: File name of the settings file inside ``dir_path``.

    Returns:
        Whatever ``CFEX(<settings path>).load()`` returns.

    Raises:
        Exception: If the settings file does not exist in ``dir_path``.
    """
    settings_path = dir_path / config_file
    if settings_path.is_file():
        return CFEX(settings_path).load()
    raise Exception('This folder don\'t have a colossos project!')
class Subscriber(Thread):
    """Background thread that periodically pulls files and runs remote commands.

    Once per second, while the ``life`` object reports it is alive, the
    thread downloads every ``SUBSCRIBE`` entry and prints the output of
    every ``EXECUTE`` command whenever the current whole second is a
    multiple of the respective ``*_UPDATE`` interval.
    """

    def __init__(self, *args, ssh=None, configs=None, life=None, **kwargs):
        """Set up the polling lists and intervals.

        Args:
            *args: Forwarded to ``threading.Thread``.
            ssh: Object providing ``download``, ``cmd`` and ``remote_dir``.
            configs: Mapping of project settings; every key is optional.
                ``None`` is treated as an empty mapping.
            life: Object with ``is_alive()``; the loop runs while it is true.
            **kwargs: Forwarded to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        # The signature advertises ``configs=None``; do not crash on it.
        configs = {} if configs is None else configs
        self.life = life
        self.subscribe = configs.get('SUBSCRIBE', [])
        self.execute = configs.get('EXECUTE', [])
        self.subscribe_update = configs.get('SUBSCRIBE_UPDATE', 1)
        self.execute_update = configs.get('EXECUTE_UPDATE', 1)
        self.ssh = ssh

    def run(self):
        """Poll once per second until ``life`` is no longer alive."""
        while self.life.is_alive():
            # Sample the clock once so both interval checks see the same
            # second, even if the downloads take a while.
            now = int(time())
            if now % self.subscribe_update == 0:
                for file in self.subscribe:
                    self.ssh.download(self.ssh.remote_dir / file)
            if now % self.execute_update == 0:
                for cmd in self.execute:
                    print(self.ssh.cmd(cmd))
            sleep(1)
if __name__ == "__main__":
    # Command-line entry point: load the project settings, optionally pull
    # the remote directory, then mirror local changes until interrupted.
    parser = ArgumentParser()
    parser.add_argument("-d", "--dir", required=True,
                        help="directory for sync")
    parser.add_argument("-s", "--sync", default=False,
                        action='store_true', help="sync remote directory")
    # parse_args() reads sys.argv[1:] by default.
    args = parser.parse_args()

    dir_path = Path(args.dir)
    CONFIGS = load_project_config(dir_path)

    # ``ssh`` stays a module-level name on purpose: other code in this file
    # may look it up as a global.
    ssh = SSH(CONFIGS)
    if not ssh.check():
        raise Exception('Remote dir don\'t exist!')
    if args.sync:
        # Initial recursive pull of the whole remote directory.
        ssh.download(CONFIGS['REMOTE_DIR'], True)

    print('Observer started')
    observer = Observer()
    event = EventHandler(ssh, CONFIGS)
    observer.schedule(event, ssh.local_store, recursive=True)

    threads = [observer]
    # Only start the polling thread when there is something to poll.
    if CONFIGS.get('SUBSCRIBE', []) or CONFIGS.get('EXECUTE', []):
        threads.append(Subscriber(ssh=ssh, configs=CONFIGS, life=observer))
    for thread in threads:
        thread.start()

    try:
        # Join with a timeout so the main thread stays responsive to Ctrl-C.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to end the session; fall through to the
        # orderly shutdown below instead of dying with a traceback.
        pass
    finally:
        # Stopping the observer makes ``observer.is_alive()`` false, which is
        # the exit condition of the Subscriber loop; without this the
        # Subscriber thread would keep the process running.
        observer.stop()
        observer.join()