diff --git a/CHANGELOG.md b/CHANGELOG.md index db3489d805..908cbe1b2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Input cursor blink effect will now restart correctly when any action is performed on the input https://github.com/Textualize/textual/pull/4773 +### Added + +- Textual will use the `ESCDELAY` env var when detecting escape keys https://github.com/Textualize/textual/pull/4848 + ## [0.75.1] - 2024-08-02 ### Fixed diff --git a/src/textual/_parser.py b/src/textual/_parser.py index 812e063882..e723139b2d 100644 --- a/src/textual/_parser.py +++ b/src/textual/_parser.py @@ -1,172 +1,126 @@ from __future__ import annotations -import io from collections import deque -from typing import Callable, Deque, Generator, Generic, Iterable, TypeVar, Union +from typing import Callable, Deque, Generator, Generic, Iterable, NamedTuple, TypeVar + +from ._time import get_time class ParseError(Exception): - pass + """Base class for parse related errors.""" class ParseEOF(ParseError): """End of Stream.""" -class Awaitable: - __slots__: list[str] = [] - - -class _Read(Awaitable): - __slots__ = ["remaining"] - - def __init__(self, count: int) -> None: - self.remaining = count +class ParseTimeout(ParseError): + """Read has timed out.""" - def __repr__(self) -> str: - return f"_ReadBytes({self.remaining})" +class Read1(NamedTuple): + """Reads a single character.""" -class _Read1(Awaitable): - __slots__: list[str] = [] + timeout: float | None = None + """Optional timeout in seconds.""" -class _ReadUntil(Awaitable): - __slots__ = ["sep", "max_bytes"] +class Peek1(NamedTuple): + """Reads a single character, but does not advance the parser position.""" - def __init__(self, sep: str, max_bytes: int | None = None) -> None: - self.sep = sep - self.max_bytes = max_bytes - - -class _PeekBuffer(Awaitable): - __slots__: list[str] = [] + timeout: float | None = None + """Optional timeout in seconds.""" T = TypeVar("T") - - TokenCallback = Callable[[T], None] class Parser(Generic[T]): - read = _Read - read1 = _Read1 - read_until = _ReadUntil - peek_buffer = _PeekBuffer + """Base class for a simple parser.""" + + read1 = Read1 + peek1 = Peek1 def __init__(self) -> None: - self._buffer = io.StringIO() self._eof = False self._tokens: Deque[T] = deque() self._gen = self.parse(self._tokens.append) - self._awaiting: Union[Awaitable, T] = next(self._gen) + self._awaiting: Read1 | Peek1 = next(self._gen) + self._timeout_time: float | None = None @property def is_eof(self) -> bool: + """Is the parser at the end of the file (i.e. exhausted)?""" return self._eof - def reset(self) -> None: - self._gen = self.parse(self._tokens.append) - self._awaiting = next(self._gen) + def tick(self) -> Iterable[T]: + """Call at regular intervals to check for timeouts.""" + if self._timeout_time is not None and get_time() >= self._timeout_time: + self._timeout_time = None + self._awaiting = self._gen.throw(ParseTimeout()) + while self._tokens: + yield self._tokens.popleft() def feed(self, data: str) -> Iterable[T]: + """Feed data to be parsed. + + Args: + data: Data to parser. + + Raises: + ParseError: If the data could not be parsed. + + Yields: + T: A generic data type. + """ if self._eof: raise ParseError("end of file reached") from None + + tokens = self._tokens + popleft = tokens.popleft + if not data: self._eof = True try: - self._gen.send(self._buffer.getvalue()) + self._gen.throw(EOFError()) except StopIteration: - raise ParseError("end of file reached") from None - while self._tokens: - yield self._tokens.popleft() - - self._buffer.truncate(0) + pass + while tokens: + yield popleft() return - _buffer = self._buffer pos = 0 - tokens = self._tokens - popleft = tokens.popleft data_size = len(data) while tokens: yield popleft() - while pos < data_size or isinstance(self._awaiting, _PeekBuffer): + while pos < data_size: _awaiting = self._awaiting - if isinstance(_awaiting, _Read1): - self._awaiting = self._gen.send(data[pos : pos + 1]) + if isinstance(_awaiting, Read1): + self._timeout_time = None + self._awaiting = self._gen.send(data[pos]) pos += 1 + elif isinstance(_awaiting, Peek1): + self._timeout_time = None + self._awaiting = self._gen.send(data[pos]) - elif isinstance(_awaiting, _PeekBuffer): - self._awaiting = self._gen.send(data[pos:]) - - elif isinstance(_awaiting, _Read): - remaining = _awaiting.remaining - chunk = data[pos : pos + remaining] - chunk_size = len(chunk) - pos += chunk_size - _buffer.write(chunk) - remaining -= chunk_size - if remaining: - _awaiting.remaining = remaining - else: - _awaiting = self._gen.send(_buffer.getvalue()) - _buffer.seek(0) - _buffer.truncate() - - elif isinstance(_awaiting, _ReadUntil): - chunk = data[pos:] - _buffer.write(chunk) - sep = _awaiting.sep - sep_index = _buffer.getvalue().find(sep) - - if sep_index == -1: - pos += len(chunk) - if ( - _awaiting.max_bytes is not None - and _buffer.tell() > _awaiting.max_bytes - ): - self._gen.throw(ParseError(f"expected {sep}")) - else: - sep_index += len(sep) - if ( - _awaiting.max_bytes is not None - and sep_index > _awaiting.max_bytes - ): - self._gen.throw(ParseError(f"expected {sep}")) - data = _buffer.getvalue()[sep_index:] - pos = 0 - self._awaiting = self._gen.send(_buffer.getvalue()[:sep_index]) - _buffer.seek(0) - _buffer.truncate() + if self._awaiting.timeout is not None: + self._timeout_time = get_time() + self._awaiting.timeout while tokens: yield popleft() - def parse(self, on_token: Callable[[T], None]) -> Generator[Awaitable, str, None]: - yield from () - + def parse( + self, token_callback: TokenCallback + ) -> Generator[Read1 | Peek1, str, None]: + """Implement to parse a stream of text. -if __name__ == "__main__": - data = "Where there is a Will there is a way!" + Args: + token_callback: Callable to report a successful parsed data type. - class TestParser(Parser[str]): - def parse( - self, on_token: Callable[[str], None] - ) -> Generator[Awaitable, str, None]: - while True: - data = yield self.read1() - if not data: - break - on_token(data) - - test_parser = TestParser() - - for n in range(0, len(data), 5): - for token in test_parser.feed(data[n : n + 5]): - print(token) - for token in test_parser.feed(""): - print(token) + Yields: + ParseAwaitable: One of `self.read1` or `self.peek1` + """ + yield from () diff --git a/src/textual/_xterm_parser.py b/src/textual/_xterm_parser.py index 5b810273fa..23eaeb4832 100644 --- a/src/textual/_xterm_parser.py +++ b/src/textual/_xterm_parser.py @@ -1,15 +1,16 @@ from __future__ import annotations import re -from typing import Any, Callable, Generator, Iterable +from typing import Any, Generator, Iterable from typing_extensions import Final -from . import events, messages +from . import constants, events, messages from ._ansi_sequences import ANSI_SEQUENCES_KEYS, IGNORE_SEQUENCE from ._keyboard_protocol import FUNCTIONAL_KEYS -from ._parser import Awaitable, Parser, TokenCallback +from ._parser import Parser, ParseTimeout, Peek1, Read1, TokenCallback from .keys import KEY_NAME_REPLACEMENTS, Keys, _character_to_key +from .message import Message # When trying to determine whether the current sequence is a supported/valid # escape sequence, at which length should we give up and consider our search @@ -32,21 +33,20 @@ FOCUSOUT: Final[str] = "\x1b[O" """Sequence received when focus is lost from the terminal.""" +SPECIAL_SEQUENCES = {BRACKETED_PASTE_START, BRACKETED_PASTE_END, FOCUSIN, FOCUSOUT} +"""Set of special sequences.""" + _re_extended_key: Final = re.compile(r"\x1b\[(?:(\d+)(?:;(\d+))?)?([u~ABCDEFHPQRS])") -class XTermParser(Parser[events.Event]): +class XTermParser(Parser[Message]): _re_sgr_mouse = re.compile(r"\x1b\[<(\d+);(\d+);(\d+)([Mm])") - def __init__(self, more_data: Callable[[], bool], debug: bool = False) -> None: - self.more_data = more_data + def __init__(self, debug: bool = False) -> None: self.last_x = 0 self.last_y = 0 - self._debug_log_file = open("keys.log", "at") if debug else None - super().__init__() - self.debug_log("---") def debug_log(self, *args: Any) -> None: # pragma: no cover @@ -54,11 +54,11 @@ def debug_log(self, *args: Any) -> None: # pragma: no cover self._debug_log_file.write(" ".join(args) + "\n") self._debug_log_file.flush() - def feed(self, data: str) -> Iterable[events.Event]: + def feed(self, data: str) -> Iterable[Message]: self.debug_log(f"FEED {data!r}") return super().feed(data) - def parse_mouse_code(self, code: str) -> events.Event | None: + def parse_mouse_code(self, code: str) -> Message | None: sgr_match = self._re_sgr_mouse.match(code) if sgr_match: _buttons, _x, _y, state = sgr_match.groups() @@ -100,26 +100,19 @@ def parse_mouse_code(self, code: str) -> events.Event | None: return event return None - _reissued_sequence_debug_book: Callable[[str], None] | None = None - """INTERNAL USE ONLY! - - If this property is set to a callable, it will be called *instead* of - the reissued sequence being emitted as key events. - """ - - def parse(self, _on_token: TokenCallback) -> Generator[Awaitable, str, None]: + def parse( + self, token_callback: TokenCallback + ) -> Generator[Read1 | Peek1, str, None]: ESC = "\x1b" read1 = self.read1 sequence_to_key_events = self._sequence_to_key_events - more_data = self.more_data paste_buffer: list[str] = [] bracketed_paste = False - use_prior_escape = False - def on_token(token: events.Event) -> None: + def on_token(token: Message) -> None: """Hook to log events.""" self.debug_log(str(token)) - _on_token(token) + token_callback(token) def on_key_token(event: events.Key) -> None: """Token callback wrapper for handling keys. @@ -136,9 +129,12 @@ def on_key_token(event: events.Key) -> None: on_token(event) def reissue_sequence_as_keys(reissue_sequence: str) -> None: - if self._reissued_sequence_debug_book is not None: - self._reissued_sequence_debug_book(reissue_sequence) - return + """Called when an escape sequence hasn't been understood. + + Args: + reissue_sequence: Key sequence to report to the app. + """ + self.debug_log("REISSUE", repr(reissue_sequence)) for character in reissue_sequence: key_events = sequence_to_key_events(character) for event in key_events: @@ -159,134 +155,100 @@ def reissue_sequence_as_keys(reissue_sequence: str) -> None: on_token(events.Paste(pasted_text.replace("\x00", ""))) paste_buffer.clear() - character = ESC if use_prior_escape else (yield read1()) - use_prior_escape = False + try: + character = yield read1() + except EOFError: + return if bracketed_paste: paste_buffer.append(character) self.debug_log(f"character={character!r}") - if character == ESC: - # Could be the escape key was pressed OR the start of an escape sequence - sequence: str = character + if character != ESC: if not bracketed_paste: - # TODO: There's nothing left in the buffer at the moment, - # but since we're on an escape, how can we be sure that the - # data that next gets fed to the parser isn't an escape sequence? - - # This problem arises when an ESC falls at the end of a chunk. - # We'll be at an escape, but peek_buffer will return an empty - # string because there's nothing in the buffer yet. - - # This code makes an assumption that an escape sequence will never be - # "chopped up", so buffers would never contain partial escape sequences. - peek_buffer = yield self.peek_buffer() - if not peek_buffer: - # An escape arrived without any following characters - on_token(events.Key("escape", "\x1b")) - continue - if peek_buffer and peek_buffer[0] == ESC: - # There is an escape in the buffer, so ESC ESC has arrived - yield read1() - on_token(events.Key("escape", "\x1b")) - # If there is no further data, it is not part of a sequence, - # So we don't need to go in to the loop - if len(peek_buffer) == 1 and not more_data(): - continue - - # Look ahead through the suspected escape sequence for a match - while True: - # If we run into another ESC at this point, then we've failed - # to find a match, and should issue everything we've seen within - # the suspected sequence as Key events instead. - sequence_character = yield read1() - new_sequence = sequence + sequence_character - - threshold_exceeded = len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD - found_escape = sequence_character and sequence_character == ESC - - if threshold_exceeded: - # We exceeded the sequence length threshold, so reissue all the - # characters in that sequence as key-presses. - reissue_sequence_as_keys(new_sequence) - break + for event in sequence_to_key_events(character): + on_key_token(event) + if not character: + return + continue - if found_escape: - # We've hit an escape, so we need to reissue all the keys - # up to but not including it, since this escape could be - # part of an upcoming control sequence. - use_prior_escape = True - reissue_sequence_as_keys(sequence) - break + # # Could be the escape key was pressed OR the start of an escape sequence + sequence: str = ESC - sequence = new_sequence + def send_escape() -> None: + """Send escape key and reissue sequence.""" + on_token(events.Key("escape", "\x1b")) + reissue_sequence_as_keys(sequence[1:]) - self.debug_log(f"sequence={sequence!r}") + while True: + try: + new_character = yield read1(constants.ESCAPE_DELAY) + except ParseTimeout: + send_escape() + break + except EOFError: + send_escape() + return + + if new_character == ESC: + send_escape() + sequence = character + continue + else: + sequence += new_character + if len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD: + reissue_sequence_as_keys(sequence) + break + self.debug_log(f"sequence={sequence!r}") + if sequence in SPECIAL_SEQUENCES: if sequence == FOCUSIN: on_token(events.AppFocus()) - break - - if sequence == FOCUSOUT: + elif sequence == FOCUSOUT: on_token(events.AppBlur()) - break - - if sequence == BRACKETED_PASTE_START: + elif sequence == BRACKETED_PASTE_START: bracketed_paste = True - break - - if sequence == BRACKETED_PASTE_END: + elif sequence == BRACKETED_PASTE_END: bracketed_paste = False - break + break - if not bracketed_paste: - # Check cursor position report - if ( - cursor_position_match := _re_cursor_position.match(sequence) - ) is not None: - row, column = cursor_position_match.groups() - # Cursor position report conflicts with f3 key - # If it is a keypress, "row" will be 1, so ignore - if int(row) != 1: - on_token( - events.CursorPosition( - x=int(column) - 1, y=int(row) - 1 - ) - ) - break - - # Was it a pressed key event that we received? - key_events = list(sequence_to_key_events(sequence)) - for key_event in key_events: - on_key_token(key_event) - if key_events: - break - # Or a mouse event? - if (mouse_match := _re_mouse_event.match(sequence)) is not None: - mouse_code = mouse_match.group(0) - event = self.parse_mouse_code(mouse_code) - if event: - on_token(event) + if not bracketed_paste: + # Check cursor position report + cursor_position_match = _re_cursor_position.match(sequence) + if cursor_position_match is not None: + row, column = cursor_position_match.groups() + # Cursor position report conflicts with f3 key + # If it is a keypress, "row" will be 1, so ignore + if int(row) != 1: + x = int(column) - 1 + y = int(row) - 1 + on_token(events.CursorPosition(x, y)) break - # Or a mode report? - # (i.e. the terminal saying it supports a mode we requested) - if ( - mode_report_match := _re_terminal_mode_response.match( - sequence - ) - ) is not None: - if ( - mode_report_match["mode_id"] == "2026" - and int(mode_report_match["setting_parameter"]) > 0 - ): - on_token(messages.TerminalSupportsSynchronizedOutput()) - break + # Was it a pressed key event that we received? + key_events = list(sequence_to_key_events(sequence)) + for key_event in key_events: + on_key_token(key_event) + if key_events: + break + # Or a mouse event? + mouse_match = _re_mouse_event.match(sequence) + if mouse_match is not None: + mouse_code = mouse_match.group(0) + mouse_event = self.parse_mouse_code(mouse_code) + if mouse_event is not None: + on_token(mouse_event) + break - else: - if not bracketed_paste: - for event in sequence_to_key_events(character): - on_key_token(event) + # Or a mode report? + # (i.e. the terminal saying it supports a mode we requested) + mode_report_match = _re_terminal_mode_response.match(sequence) + if mode_report_match is not None: + mode_id = mode_report_match["mode_id"] + setting_parameter = mode_report_match["setting_parameter"] + if mode_id == "2026" and int(setting_parameter) > 0: + on_token(messages.TerminalSupportsSynchronizedOutput()) + break if self._debug_log_file is not None: self._debug_log_file.close() @@ -328,7 +290,7 @@ def _sequence_to_key_events(self, sequence: str) -> Iterable[events.Key]: key_tokens.sort() key_tokens.append(key) yield events.Key( - f'{"+".join(key_tokens)}', sequence if len(sequence) == 1 else None + "+".join(key_tokens), sequence if len(sequence) == 1 else None ) return diff --git a/src/textual/constants.py b/src/textual/constants.py index 4730410e64..3dbbc17988 100644 --- a/src/textual/constants.py +++ b/src/textual/constants.py @@ -115,3 +115,6 @@ def _get_textual_animations() -> AnimationLevel: TEXTUAL_ANIMATIONS: AnimationLevel = _get_textual_animations() """Determines whether animations run or not.""" + +ESCAPE_DELAY: float = _get_environ_int("ESCDELAY", 100) / 1000.0 +"""The delay (in seconds) before reporting an escape key (not used if the extend key protocol is available).""" diff --git a/src/textual/drivers/_input_reader_linux.py b/src/textual/drivers/_input_reader_linux.py index 04604d820b..a7e0a65b10 100644 --- a/src/textual/drivers/_input_reader_linux.py +++ b/src/textual/drivers/_input_reader_linux.py @@ -20,14 +20,6 @@ def __init__(self, timeout: float = 0.1) -> None: self._selector.register(self._fileno, selectors.EVENT_READ) self._exit_event = Event() - def more_data(self) -> bool: - """Check if there is data pending.""" - EVENT_READ = selectors.EVENT_READ - for _key, events in self._selector.select(0.01): - if events & EVENT_READ: - return True - return False - def close(self) -> None: """Close the reader (will exit the iterator).""" self._exit_event.set() diff --git a/src/textual/drivers/_input_reader_windows.py b/src/textual/drivers/_input_reader_windows.py index 7f9aeb1ebb..c001c728e2 100644 --- a/src/textual/drivers/_input_reader_windows.py +++ b/src/textual/drivers/_input_reader_windows.py @@ -17,10 +17,6 @@ def __init__(self, timeout: float = 0.1) -> None: self.timeout = timeout self._exit_event = Event() - def more_data(self) -> bool: - """Check if there is data pending.""" - return True - def close(self) -> None: """Close the reader (will exit the iterator).""" self._exit_event.set() diff --git a/src/textual/drivers/linux_driver.py b/src/textual/drivers/linux_driver.py index b633eaf4b1..4445d43f9a 100644 --- a/src/textual/drivers/linux_driver.py +++ b/src/textual/drivers/linux_driver.py @@ -246,6 +246,8 @@ def on_terminal_resize(signum, stack) -> None: self.write("\x1b[?25l") # Hide cursor self.write("\x1b[?1004h") # Enable FocusIn/FocusOut. self.write("\x1b[>1u") # https://sw.kovidgoyal.net/kitty/keyboard-protocol/ + # Disambiguate escape codes https://sw.kovidgoyal.net/kitty/keyboard-protocol/#progressive-enhancement + self.write("\x1b[1;u") self.flush() self._key_thread = Thread(target=self._run_input_thread) send_size_event() @@ -357,7 +359,7 @@ def _run_input_thread(self) -> None: """ try: self.run_input_thread() - except BaseException as error: + except BaseException: import rich.traceback self._app.call_later( @@ -373,16 +375,9 @@ def run_input_thread(self) -> None: fileno = self.fileno EVENT_READ = selectors.EVENT_READ - def more_data() -> bool: - """Check if there is more data to parse.""" - - for _key, selector_events in selector.select(0.1): - if selector_events & EVENT_READ: - return True - return False - - parser = XTermParser(more_data, self._debug) + parser = XTermParser(self._debug) feed = parser.feed + tick = parser.tick utf8_decoder = getincrementaldecoder("utf-8")().decode decode = utf8_decoder @@ -407,11 +402,12 @@ def process_selector_events( break for event in feed(unicode_data): self.process_event(event) + for event in tick(): + self.process_event(event) try: while not self.exit_event.is_set(): process_selector_events(selector.select(0.1)) - process_selector_events(selector.select(0.1), final=True) finally: diff --git a/src/textual/drivers/linux_inline_driver.py b/src/textual/drivers/linux_inline_driver.py index 66bde69260..06cf874306 100644 --- a/src/textual/drivers/linux_inline_driver.py +++ b/src/textual/drivers/linux_inline_driver.py @@ -124,16 +124,9 @@ def run_input_thread(self) -> None: fileno = self.fileno EVENT_READ = selectors.EVENT_READ - def more_data() -> bool: - """Check if there is more data to parse.""" - - for _key, events in selector.select(0.1): - if events & EVENT_READ: - return True - return False - - parser = XTermParser(more_data, self._debug) + parser = XTermParser(self._debug) feed = parser.feed + tick = parser.tick utf8_decoder = getincrementaldecoder("utf-8")().decode decode = utf8_decoder @@ -142,6 +135,8 @@ def more_data() -> bool: try: while not self.exit_event.is_set(): selector_events = selector.select(0.1) + for event in tick(): + self.process_event(event) for _selector_key, mask in selector_events: if mask & EVENT_READ: unicode_data = decode( diff --git a/src/textual/drivers/web_driver.py b/src/textual/drivers/web_driver.py index 18898b82a8..570e283f1a 100644 --- a/src/textual/drivers/web_driver.py +++ b/src/textual/drivers/web_driver.py @@ -163,7 +163,7 @@ def stop_application_mode(self) -> None: def run_input_thread(self) -> None: """Wait for input and dispatch events.""" input_reader = self._input_reader - parser = XTermParser(input_reader.more_data, debug=self._debug) + parser = XTermParser(debug=self._debug) utf8_decoder = getincrementaldecoder("utf-8")().decode decode = utf8_decoder # The server sends us a stream of bytes, which contains the equivalent of stdin, plus diff --git a/src/textual/drivers/win32.py b/src/textual/drivers/win32.py index bfa4a11fec..b2f707f3f1 100644 --- a/src/textual/drivers/win32.py +++ b/src/textual/drivers/win32.py @@ -9,6 +9,7 @@ from ctypes.wintypes import BOOL, CHAR, DWORD, HANDLE, SHORT, UINT, WCHAR, WORD from typing import IO, TYPE_CHECKING, Callable, List, Optional +from .. import constants from .._xterm_parser import XTermParser from ..events import Event, Resize from ..geometry import Size @@ -226,7 +227,7 @@ def __init__( def run(self) -> None: exit_requested = self.exit_event.is_set - parser = XTermParser(lambda: False) + parser = XTermParser(debug=constants.DEBUG) try: read_count = wintypes.DWORD(0) @@ -243,8 +244,12 @@ def run(self) -> None: append_key = keys.append while not exit_requested(): + + for event in parser.tick(): + self.process_event(event) + # Wait for new events - if wait_for_handles([hIn], 200) is None: + if wait_for_handles([hIn], 100) is None: # No new events continue diff --git a/tests/test_xterm_parser.py b/tests/test_xterm_parser.py index 2995738897..4e0168b921 100644 --- a/tests/test_xterm_parser.py +++ b/tests/test_xterm_parser.py @@ -33,7 +33,7 @@ def chunks(data, size): @pytest.fixture def parser(): - return XTermParser(more_data=lambda: False) + return XTermParser() @pytest.mark.parametrize("chunk_size", [2, 3, 4, 5, 6]) @@ -110,13 +110,9 @@ def test_cant_match_escape_sequence_too_long(parser): @pytest.mark.parametrize( "chunk_size", [ - pytest.param( - 2, marks=pytest.mark.xfail(reason="Fails when ESC at end of chunk") - ), + 2, 3, - pytest.param( - 4, marks=pytest.mark.xfail(reason="Fails when ESC at end of chunk") - ), + 4, 5, 6, ], @@ -132,14 +128,15 @@ def test_unknown_sequence_followed_by_known_sequence(parser, chunk_size): sequence = unknown_sequence + known_sequence events = [] - parser.more_data = lambda: True + for chunk in chunks(sequence, chunk_size): events.append(parser.feed(chunk)) events = list(itertools.chain.from_iterable(list(event) for event in events)) + print(repr([event.key for event in events])) assert [event.key for event in events] == [ - "circumflex_accent", + "escape", "left_square_bracket", "question_mark", "end", @@ -169,14 +166,17 @@ def test_key_presses_and_escape_sequence_mixed(parser): def test_single_escape(parser): """A single \x1b should be interpreted as a single press of the Escape key""" - events = parser.feed("\x1b") + events = list(parser.feed("\x1b")) + events.extend(parser.feed("")) assert [event.key for event in events] == ["escape"] def test_double_escape(parser): - """Windows Terminal writes double ESC when the user presses the Escape key once.""" - events = parser.feed("\x1b\x1b") - assert [event.key for event in events] == ["escape"] + """Test double escape.""" + events = list(parser.feed("\x1b\x1b")) + events.extend(parser.feed("")) + print(events) + assert [event.key for event in events] == ["escape", "escape"] @pytest.mark.parametrize(