-
-
Notifications
You must be signed in to change notification settings - Fork 100
/
terminal_daemon.py
129 lines (105 loc) · 3.6 KB
/
terminal_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
#
# gdb-frontend is an easy, flexible and extensible GUI debugger
#
# https://github.com/rohanrhu/gdb-frontend
# https://oguzhaneroglu.com/projects/gdb-frontend/
#
# Licensed under GNU/GPLv3
# Copyright (C) 2019, Oğuzhan Eroğlu (https://oguzhaneroglu.com/) <[email protected]>
"""
GDBFrontend's VT100 Terminal Sharing
This handler is used by the WebSocket handler and provides
terminal sharing fundamentals over WebSocket.
"""
import atexit
import codecs
import fcntl
import json
import os
import pty
import select
import signal
import struct
import subprocess
import termios
import threading
import time
import traceback

import util
class TerminalDaemon:
    """
    Shares one PTY-backed terminal process with a WebSocket client.

    start() forks a PTY child that runs terminal_command, then a background
    thread (syncTerm) forwards everything the PTY prints to the client as
    "terminal_data" events. handleMessage() applies "terminal_resize" and
    "terminal_data" events coming from the client to the PTY.
    """

    # Class-level defaults; real values are assigned in __init__() and start().
    ws = False
    # Kept for compatibility; this class never assigns it on the instance.
    pty_proc = False
    pty_fd = False
    pty_pid = False
    terminal_command = False

    # Maximum number of bytes fetched from the PTY per read.
    READ_CHUNK_SIZE = 4094
    # Seconds slept between two polls of the PTY.
    POLL_INTERVAL = 0.01

    # True once the PTY child has been reaped; after that its PID may be
    # recycled by the OS, so it must never be signalled again.
    _pty_reaped = False

    def __init__(self, ws, terminal_command):
        """
        ws: WebSocket handler object. This class reads its client_id,
            ws_connected and message attributes and calls its wsSend().
        terminal_command: Command to run in the terminal, as a sequence of
            argument strings (it is passed to subprocess.Popen as-is).
        """

        self.ws = ws
        self.terminal_command = terminal_command

    def start(self):
        """
        Forks the PTY child running terminal_command and starts the
        background thread that streams its output to the client.
        """

        util.verbose("Spawning terminal process for client#%d: \"%s\"" % (self.ws.client_id, " ".join(self.terminal_command)))

        self._pty_reaped = False
        (self.pty_pid, self.pty_fd) = pty.fork()

        if self.pty_pid == 0:
            # Child side of the fork; _runChild() ends with os._exit().
            self._runChild()
            return

        # A bound method is registered (instead of a closure) so that stop()
        # can unregister it again.
        atexit.register(self._terminatePTYProcAtExit)

        th = threading.Thread(target=self.syncTerm)
        th.daemon = True
        th.start()

    def stop(self):
        """
        Kills the PTY child together with its process group and reaps it.
        Safe to call more than once and before start().
        """

        util.verbose("Sending SIGKILL on TerminalDaemon.stop() to PTY process. (%s)" % " ".join(self.terminal_command))

        self._killPTYProc(reap=True)
        # Nothing is left to clean up at interpreter exit; dropping the
        # handler also releases its reference to this instance.
        atexit.unregister(self._terminatePTYProcAtExit)
        # NOTE(review): the PTY master fd (self.pty_fd) is never closed by
        # this class. Closing it here could race with syncTerm() and
        # handleMessage(), so it is left as-is; confirm who owns the fd.

    def syncTerm(self):
        """
        Thread body: polls the PTY while the client is connected and sends
        its output to the client as "terminal_data" events. Ends when the
        client disconnects or the PTY reports EOF / a read error, and then
        kills the PTY child.
        """

        # Incremental decoding keeps a multi-byte character that is split
        # across two reads intact instead of dropping it.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")

        while self.ws.ws_connected:
            time.sleep(self.POLL_INTERVAL)

            try:
                (ready_fds, _, _) = select.select([self.pty_fd], [], [], 0)
                if not ready_fds:
                    continue
                output = os.read(self.pty_fd, self.READ_CHUNK_SIZE)
            except OSError:
                # The other end of the PTY is gone; retrying would only spin.
                break

            if not output:
                # EOF.
                break

            text = decoder.decode(output)
            if not text:
                # Only an incomplete multi-byte sequence so far.
                continue

            message = {
                "event": "terminal_data",
                "data": text
            }
            self.wsSend(json.dumps(message))

        # Reaping is left to stop() / the atexit handler.
        self._killPTYProc(reap=False)

    def wsSend(self, message):
        """
        Sends message to the client; failures are ignored on purpose
        (best-effort delivery).
        """

        try:
            self.ws.wsSend(message)
        except Exception:
            pass

    def handleMessage(self):
        """
        Handles the JSON message currently stored in self.ws.message.

        Returns True when the message is a terminal event ("terminal_resize"
        or "terminal_data"), even if applying it failed, and False for any
        other message. Raises whatever json.loads() raises for invalid JSON.
        """

        message = json.loads(self.ws.message)
        event = message.get("event") if isinstance(message, dict) else None

        if event == "terminal_resize":
            try:
                # struct winsize: rows, cols, x pixels, y pixels. The pixel
                # sizes are optional and default to 0.
                size = struct.pack("HHHH", message["rows"], message["cols"], message.get("width", 0), message.get("height", 0))
                fcntl.ioctl(self.pty_fd, termios.TIOCSWINSZ, size)
            except Exception as e:
                util.verbose(e, traceback.format_exc())
            return True

        if event == "terminal_data":
            try:
                data = message["data"].encode("utf-8")
                # os.write() may accept only part of the buffer.
                while data:
                    data = data[os.write(self.pty_fd, data):]
            except Exception as e:
                util.verbose(e, traceback.format_exc())
            return True

        return False

    def _runChild(self):
        """
        Runs terminal_command inside the forked PTY child and waits for it.
        Never returns: the child always leaves through os._exit().
        """

        status = 1
        try:
            pty_proc = subprocess.Popen(self.terminal_command)
            try:
                # NOTE(review): self.pty_pid is 0 in the child, so this asks
                # for the command to be moved into a process group of its
                # own. It presumably fails once the command has exec'd (the
                # error is only logged); confirm the intent.
                os.setpgid(pty_proc.pid, self.pty_pid)
            except Exception as e:
                util.verbose(e, traceback.format_exc())
            pty_proc.wait()
            status = 0
        except Exception as e:
            util.verbose(e, traceback.format_exc())
        finally:
            # os._exit() skips the atexit handlers and exception handlers
            # inherited from the parent; running them in the child would
            # kill other terminals and execute server code twice.
            os._exit(status)

    def _terminatePTYProcAtExit(self):
        """atexit handler: kills and reaps the PTY child on interpreter exit."""

        util.verbose("Sending SIGKILL on terminatePTYProc() to PTY process. (%s)" % " ".join(self.terminal_command))
        self._killPTYProc(reap=True)

    def _killPTYProc(self, reap):
        """
        Sends SIGKILL to the PTY child's process group and to the child
        itself; waits for the child when reap is true. Does nothing when no
        child was started or when it has already been reaped.
        """

        pid = self.pty_pid

        # pty_pid is False before start() and 0 inside the child; signalling
        # PID 0 would hit the caller's own process group.
        if not pid or self._pty_reaped:
            return

        for send_signal in (os.killpg, os.kill):
            try:
                send_signal(pid, signal.SIGKILL)
            except OSError:
                # Already gone; the remaining steps must still run.
                pass

        if reap:
            try:
                os.waitpid(pid, 0)
            except OSError:
                # Already reaped elsewhere.
                pass
            self._pty_reaped = True