-
Notifications
You must be signed in to change notification settings - Fork 0
/
editor.py
195 lines (166 loc) · 6.08 KB
/
editor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
terminal_emulator
~~~~~~~~~~~~~~~~~
An example showing how to use :mod:`pyte` to implement a basic
terminal emulator using Textual.
To exit the application, hit Ctrl-C.
.. note:: This example requires the ``textual`` library, at least v0.6.0.
:copyright: (c) 2022 by pyte authors and contributors,
see AUTHORS for details.
:license: LGPL, see LICENSE for more details.
"""
import asyncio
import fcntl
import os
import pty
import shlex
import struct
import termios
import pyte
from rich.text import Text
from textual import events
from textual.containers import Vertical
from textual.app import App
from textual.widget import Widget
from textual.widgets import Button, Markdown
from textual import on
from textual.events import Resize
class PyteDisplay:
    """Renderable wrapper around a prepared sequence of display lines.

    Instances are handed to Rich through the ``__rich_console__`` hook, which
    emits the stored lines one after another, unchanged.
    """

    def __init__(self, lines):
        """Remember *lines*; they are yielded as-is when rendered."""
        self.lines = lines

    def __rich_console__(self, console, options):
        """Yield every stored line in order.

        ``console`` and ``options`` are accepted for the hook's signature but
        are not consulted.
        """
        yield from self.lines
class Terminal(Widget, can_focus=True):
    """Focusable widget that renders a :mod:`pyte` screen and forwards keys.

    The widget exchanges list-shaped messages with its owner over two asyncio
    queues: it puts ``["stdin", text]`` and ``["set_size", ...]`` on
    ``send_queue`` and reacts to ``["setup", ...]`` and ``["stdout", text]``
    read from ``recv_queue``.
    """

    def __init__(self, send_queue, recv_queue, ncol, nrow):
        """Build the pyte screen/stream pair and start the receive loop.

        Args:
            send_queue: Queue this widget puts outgoing messages on.
            recv_queue: Queue this widget reads incoming messages from.
            ncol: Initial number of screen columns.
            nrow: Initial number of screen rows.
        """
        # Escape sequences sent for keys that have no printable character.
        self.ctrl_keys = {
            "left": "\u001b[D",
            "right": "\u001b[C",
            "up": "\u001b[A",
            "down": "\u001b[B",
        }
        self.recv_queue = recv_queue
        self.send_queue = send_queue
        self.nrow = nrow
        self.ncol = ncol
        self._display = PyteDisplay([Text()])
        self._screen = pyte.Screen(self.ncol, self.nrow)
        self.stream = pyte.Stream(self._screen)
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be collected mid-execution.
        self._recv_task = asyncio.create_task(self.recv())
        super().__init__()
        self.focus()

    def render(self):
        """Return the most recently built display object."""
        return self._display

    @on(Resize)
    def set_term_size(self, event: Resize):
        """Resize the pyte screen from the event's container height and width."""
        self._screen.resize(event.container_size.height, event.container_size.width)
        self.log(event)

    async def on_key(self, event: events.Key) -> None:
        """Queue the pressed key as a ``"stdin"`` message.

        Mapped keys (arrows) are translated to their escape sequence; other
        keys use ``event.character``. Keys that yield neither are not queued,
        because a ``None`` payload cannot be encoded by the consumer.
        """
        char = self.ctrl_keys.get(event.key) or event.character
        if char:
            await self.send_queue.put(["stdin", char])
        self.log(self._screen)

    async def recv(self):
        """Consume incoming messages forever and update the display.

        ``"setup"`` is answered with a ``"set_size"`` message; ``"stdout"``
        text is fed to the pyte stream and the widget is refreshed. Any other
        command is ignored.
        """
        while True:
            message = await self.recv_queue.get()
            cmd = message[0]
            if cmd == "setup":
                # 567 and 573 are presumably pixel dimensions for the window
                # size request — TODO confirm against the consumer.
                await self.send_queue.put(["set_size", self.nrow, self.ncol, 567, 573])
            elif cmd == "stdout":
                self.stream.feed(message[1])
                self._display = PyteDisplay(self._render_lines())
                self.refresh()

    def _render_lines(self):
        """Convert the pyte screen into Text lines, reversing the cursor cell."""
        cursor = self._screen.cursor
        lines = []
        for row, line in enumerate(self._screen.display):
            text = Text.from_ansi(line)
            # Only highlight when the cursor column falls inside the line.
            if row == cursor.y and cursor.x < len(text):
                text.stylize("reverse", cursor.x, cursor.x + 1)
            lines.append(text)
        return lines
class TerminalController:
    """Bundle a terminal file descriptor with the queues and event around it.

    NOTE(review): ``open_terminal`` is called during construction but is not
    defined on this class in the visible source — confirm where it is meant to
    come from before instantiating this class directly.
    """

    def __init__(self, ncol, nrow) -> None:
        """Record the geometry, open the terminal and create the asyncio state.

        Args:
            ncol: Number of terminal columns.
            nrow: Number of terminal rows.
        """
        self.ncol, self.nrow = ncol, nrow

        # Single-slot hand-off for the latest chunk of output; starts empty.
        self.data_or_disconnect = None

        # Descriptor first, then an unbuffered binary read/write file over it.
        term_fd = self.open_terminal()
        self.fd = term_fd
        self.p_out = os.fdopen(term_fd, "w+b", 0)

        # Message queues in both directions plus the event paired with
        # ``data_or_disconnect``.
        self.recv_queue = asyncio.Queue()
        self.send_queue = asyncio.Queue()
        self.event = asyncio.Event()
class TerminalEmulator(App):
    """Textual application that runs ``zsh`` in a pty behind a Terminal widget.

    The app owns the pty. Shell output is pushed to the widget through
    ``send_queue``; keystrokes and size requests come back on ``recv_queue``.
    """

    CSS_PATH = "terminal.tcss"

    def __init__(self, ncol, nrow):
        """Fork the shell and set up the queues used to talk to the widget.

        Args:
            ncol: Number of terminal columns.
            nrow: Number of terminal rows.
        """
        self.ncol = ncol
        self.nrow = nrow
        # Latest decoded output chunk, or None to signal a disconnect.
        self.data_or_disconnect = None
        self.fd = self.open_terminal()
        # Unbuffered binary read/write file object over the pty descriptor.
        self.p_out = os.fdopen(self.fd, "w+b", 0)
        self.recv_queue = asyncio.Queue()
        self.send_queue = asyncio.Queue()
        self.event = asyncio.Event()
        # Strong references to background tasks (filled in by ``compose``).
        self._pty_tasks = []
        super().__init__()

    def compose(self):
        """Start the pty pump tasks and yield the widget tree."""
        # The event loop only keeps weak references to tasks, so hold on to
        # them to prevent collection while they are still running.
        self._pty_tasks = [
            asyncio.create_task(self._run()),
            asyncio.create_task(self._send_data()),
        ]
        with Vertical():
            # The queues are deliberately swapped: what the app receives on is
            # what the widget sends on, and vice versa.
            yield Terminal(self.recv_queue, self.send_queue, self.ncol, self.nrow)
            yield Button("heyther")
            yield Button("heyther2", id="tel")
            yield Markdown("heyther")

    def on_mount(self) -> None:
        """Log the terminal widget size and resize its screen to 5 lines."""
        term = self.query_one(Terminal)
        # term.styles.background = "darkblue"
        # term.styles.border = ("heavy", "white")
        # term.styles.height = 5 + 2
        self.log("term size", term.size)
        term._screen.resize(lines=5)

    def on_button_pressed(self, event) -> None:
        """Reset the screen for the ``tel`` button; otherwise toggle visibility."""
        # self.query_one(Markdown).focus()
        self.log(event)
        term = self.query_one(Terminal)
        if event.button.id == "tel":
            term._screen.reset()
            return
        term.display = not term.display

    def open_terminal(self):
        """Fork a pty, exec ``zsh`` in the child and return the parent's fd."""
        pid, fd = pty.fork()
        if pid == 0:
            # Child: replace this process with the shell.
            argv = shlex.split("zsh")
            env = dict(TERM="linux", LC_ALL="en_GB.UTF-8", COLUMNS=str(self.ncol), LINES=str(self.nrow))
            os.execvpe(argv[0], argv, env)
        return fd

    async def _run(self):
        """Pump data between the pty and the queues until cancelled.

        Registers a reader callback that publishes shell output through
        ``data_or_disconnect`` and ``event``, announces ``"setup"`` to the
        widget, then applies ``"stdin"`` and ``"set_size"`` messages to the pty.
        """
        # Local import keeps this block self-contained.
        import codecs

        loop = asyncio.get_running_loop()
        # A read may end in the middle of a multi-byte UTF-8 sequence; an
        # incremental decoder carries the partial bytes over to the next chunk
        # instead of raising.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        def on_output():
            try:
                data = self.p_out.read(65536)
            except (OSError, ValueError):
                data = b""
            if data:
                self.data_or_disconnect = decoder.decode(data)
            else:
                # Read error or end of file: stop watching the descriptor so
                # the callback does not fire forever, and signal disconnect.
                loop.remove_reader(self.p_out)
                self.data_or_disconnect = None
            self.event.set()

        loop.add_reader(self.p_out, on_output)
        await self.send_queue.put(["setup", {}])
        while True:
            msg = await self.recv_queue.get()
            if msg[0] == "stdin":
                # Skip empty/None payloads; they cannot be encoded.
                if msg[1]:
                    self.p_out.write(msg[1].encode())
            elif msg[0] == "set_size":
                # struct winsize holds four unsigned shorts: rows, columns and
                # the two pixel dimensions. Missing pixel values default to 0.
                xpixel = msg[3] if len(msg) > 3 else 0
                ypixel = msg[4] if len(msg) > 4 else 0
                winsize = struct.pack("HHHH", msg[1], msg[2], xpixel, ypixel)
                fcntl.ioctl(self.fd, termios.TIOCSWINSZ, winsize)

    async def _send_data(self):
        """Forward each published chunk (or disconnect) to ``send_queue``."""
        while True:
            await self.event.wait()
            self.event.clear()
            if self.data_or_disconnect is None:
                await self.send_queue.put(["disconnect", 1])
            else:
                await self.send_queue.put(["stdout", self.data_or_disconnect])
if __name__ == "__main__":
    # Launch the emulator with an 80-column by 10-row terminal.
    app = TerminalEmulator(ncol=80, nrow=10)
    app.run()