Skip to content

Commit

Permalink
Merge pull request #4848 from Textualize/esc-delay
Browse files Browse the repository at this point in the history
add ESCDELAY environment var
  • Loading branch information
willmcgugan authored Aug 8, 2024
2 parents 3bdd363 + 1a400cc commit b2af20c
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 296 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

- Input cursor blink effect will now restart correctly when any action is performed on the input https://github.com/Textualize/textual/pull/4773

### Added

- Textual will use the `ESCDELAY` env var when detecting escape keys https://github.com/Textualize/textual/pull/4848

## [0.75.1] - 2024-08-02

### Fixed
Expand Down
178 changes: 66 additions & 112 deletions src/textual/_parser.py
Original file line number Diff line number Diff line change
@@ -1,172 +1,126 @@
from __future__ import annotations

import io
from collections import deque
from typing import Callable, Deque, Generator, Generic, Iterable, TypeVar, Union
from typing import Callable, Deque, Generator, Generic, Iterable, NamedTuple, TypeVar

from ._time import get_time


class ParseError(Exception):
pass
"""Base class for parse related errors."""


class ParseEOF(ParseError):
"""End of Stream."""


class Awaitable:
__slots__: list[str] = []


class _Read(Awaitable):
__slots__ = ["remaining"]

def __init__(self, count: int) -> None:
self.remaining = count
class ParseTimeout(ParseError):
"""Read has timed out."""

def __repr__(self) -> str:
return f"_ReadBytes({self.remaining})"

class Read1(NamedTuple):
"""Reads a single character."""

class _Read1(Awaitable):
__slots__: list[str] = []
timeout: float | None = None
"""Optional timeout in seconds."""


class _ReadUntil(Awaitable):
__slots__ = ["sep", "max_bytes"]
class Peek1(NamedTuple):
"""Reads a single character, but does not advance the parser position."""

def __init__(self, sep: str, max_bytes: int | None = None) -> None:
self.sep = sep
self.max_bytes = max_bytes


class _PeekBuffer(Awaitable):
__slots__: list[str] = []
timeout: float | None = None
"""Optional timeout in seconds."""


T = TypeVar("T")


TokenCallback = Callable[[T], None]


class Parser(Generic[T]):
read = _Read
read1 = _Read1
read_until = _ReadUntil
peek_buffer = _PeekBuffer
"""Base class for a simple parser."""

read1 = Read1
peek1 = Peek1

def __init__(self) -> None:
self._buffer = io.StringIO()
self._eof = False
self._tokens: Deque[T] = deque()
self._gen = self.parse(self._tokens.append)
self._awaiting: Union[Awaitable, T] = next(self._gen)
self._awaiting: Read1 | Peek1 = next(self._gen)
self._timeout_time: float | None = None

@property
def is_eof(self) -> bool:
"""Is the parser at the end of the file (i.e. exhausted)?"""
return self._eof

def reset(self) -> None:
self._gen = self.parse(self._tokens.append)
self._awaiting = next(self._gen)
def tick(self) -> Iterable[T]:
"""Call at regular intervals to check for timeouts."""
if self._timeout_time is not None and get_time() >= self._timeout_time:
self._timeout_time = None
self._awaiting = self._gen.throw(ParseTimeout())
while self._tokens:
yield self._tokens.popleft()

def feed(self, data: str) -> Iterable[T]:
"""Feed data to be parsed.
Args:
data: Data to parser.
Raises:
ParseError: If the data could not be parsed.
Yields:
T: A generic data type.
"""
if self._eof:
raise ParseError("end of file reached") from None

tokens = self._tokens
popleft = tokens.popleft

if not data:
self._eof = True
try:
self._gen.send(self._buffer.getvalue())
self._gen.throw(EOFError())
except StopIteration:
raise ParseError("end of file reached") from None
while self._tokens:
yield self._tokens.popleft()

self._buffer.truncate(0)
pass
while tokens:
yield popleft()
return

_buffer = self._buffer
pos = 0
tokens = self._tokens
popleft = tokens.popleft
data_size = len(data)

while tokens:
yield popleft()

while pos < data_size or isinstance(self._awaiting, _PeekBuffer):
while pos < data_size:
_awaiting = self._awaiting
if isinstance(_awaiting, _Read1):
self._awaiting = self._gen.send(data[pos : pos + 1])
if isinstance(_awaiting, Read1):
self._timeout_time = None
self._awaiting = self._gen.send(data[pos])
pos += 1
elif isinstance(_awaiting, Peek1):
self._timeout_time = None
self._awaiting = self._gen.send(data[pos])

elif isinstance(_awaiting, _PeekBuffer):
self._awaiting = self._gen.send(data[pos:])

elif isinstance(_awaiting, _Read):
remaining = _awaiting.remaining
chunk = data[pos : pos + remaining]
chunk_size = len(chunk)
pos += chunk_size
_buffer.write(chunk)
remaining -= chunk_size
if remaining:
_awaiting.remaining = remaining
else:
_awaiting = self._gen.send(_buffer.getvalue())
_buffer.seek(0)
_buffer.truncate()

elif isinstance(_awaiting, _ReadUntil):
chunk = data[pos:]
_buffer.write(chunk)
sep = _awaiting.sep
sep_index = _buffer.getvalue().find(sep)

if sep_index == -1:
pos += len(chunk)
if (
_awaiting.max_bytes is not None
and _buffer.tell() > _awaiting.max_bytes
):
self._gen.throw(ParseError(f"expected {sep}"))
else:
sep_index += len(sep)
if (
_awaiting.max_bytes is not None
and sep_index > _awaiting.max_bytes
):
self._gen.throw(ParseError(f"expected {sep}"))
data = _buffer.getvalue()[sep_index:]
pos = 0
self._awaiting = self._gen.send(_buffer.getvalue()[:sep_index])
_buffer.seek(0)
_buffer.truncate()
if self._awaiting.timeout is not None:
self._timeout_time = get_time() + self._awaiting.timeout

while tokens:
yield popleft()

def parse(self, on_token: Callable[[T], None]) -> Generator[Awaitable, str, None]:
yield from ()

def parse(
self, token_callback: TokenCallback
) -> Generator[Read1 | Peek1, str, None]:
"""Implement to parse a stream of text.
if __name__ == "__main__":
data = "Where there is a Will there is a way!"
Args:
token_callback: Callable to report a successful parsed data type.
class TestParser(Parser[str]):
def parse(
self, on_token: Callable[[str], None]
) -> Generator[Awaitable, str, None]:
while True:
data = yield self.read1()
if not data:
break
on_token(data)

test_parser = TestParser()

for n in range(0, len(data), 5):
for token in test_parser.feed(data[n : n + 5]):
print(token)
for token in test_parser.feed(""):
print(token)
Yields:
ParseAwaitable: One of `self.read1` or `self.peek1`
"""
yield from ()
Loading

0 comments on commit b2af20c

Please sign in to comment.