Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix output glitch with inline driver #5015

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions src/textual/drivers/linux_inline_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import rich.repr

from textual import events
from textual._loop import loop_last
from textual._parser import ParseError
from textual._xterm_parser import XTermParser
from textual.driver import Driver
from textual.geometry import Size
Expand Down Expand Up @@ -132,23 +134,47 @@ def run_input_thread(self) -> None:
decode = utf8_decoder
read = os.read

def process_selector_events(
selector_events: list[tuple[selectors.SelectorKey, int]],
final: bool = False,
) -> None:
"""Process events from selector.

Args:
selector_events: List of selector events.
final: True if this is the last call.

"""
for last, (_selector_key, mask) in loop_last(selector_events):
if mask & EVENT_READ:
unicode_data = decode(read(fileno, 1024 * 4), final=final and last)
if not unicode_data:
# This can occur if the stdin is piped
break
for event in feed(unicode_data):
if isinstance(event, events.CursorPosition):
self.cursor_origin = (event.x, event.y)
else:
self.process_event(event)
for event in tick():
if isinstance(event, events.CursorPosition):
self.cursor_origin = (event.x, event.y)
else:
self.process_event(event)
self.process_event(event)

try:
while not self.exit_event.is_set():
selector_events = selector.select(0.1)
for event in tick():
self.process_event(event)
for _selector_key, mask in selector_events:
if mask & EVENT_READ:
unicode_data = decode(
read(fileno, 1024), final=self.exit_event.is_set()
)
for event in feed(unicode_data):
if isinstance(event, events.CursorPosition):
self.cursor_origin = (event.x, event.y)
else:
self.process_event(event)
process_selector_events(selector.select(0.1))
process_selector_events(selector.select(0.1), final=True)

finally:
selector.close()
try:
for event in feed(""):
pass
except ParseError:
pass

def start_application_mode(self) -> None:
loop = asyncio.get_running_loop()
Expand Down
Loading