diff --git a/sphinx/util/_inventory_file_reader.py b/sphinx/util/_inventory_file_reader.py new file mode 100644 index 00000000000..d19faa87ea9 --- /dev/null +++ b/sphinx/util/_inventory_file_reader.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import zlib +from typing import TYPE_CHECKING + +from sphinx.util import logging + +BUFSIZE = 16 * 1024 +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + from collections.abc import Iterator + from typing import Protocol + + # Readable file stream for inventory loading + class _SupportsRead(Protocol): + def read(self, size: int = ...) -> bytes: ... + + +__all__ = ('InventoryFileReader',) + + +class InventoryFileReader: + """A file reader for an inventory file. + + This reader supports mixture of texts and compressed texts. + """ + + def __init__(self, stream: _SupportsRead) -> None: + self.stream = stream + self.buffer = b'' + self.eof = False + + def read_buffer(self) -> None: + chunk = self.stream.read(BUFSIZE) + if chunk == b'': + self.eof = True + self.buffer += chunk + + def readline(self) -> str: + pos = self.buffer.find(b'\n') + if pos != -1: + line = self.buffer[:pos].decode() + self.buffer = self.buffer[pos + 1 :] + elif self.eof: + line = self.buffer.decode() + self.buffer = b'' + else: + self.read_buffer() + line = self.readline() + + return line + + def readlines(self) -> Iterator[str]: + while not self.eof: + line = self.readline() + if line: + yield line + + def read_compressed_chunks(self) -> Iterator[bytes]: + decompressor = zlib.decompressobj() + while not self.eof: + self.read_buffer() + yield decompressor.decompress(self.buffer) + self.buffer = b'' + yield decompressor.flush() + + def read_compressed_lines(self) -> Iterator[str]: + buf = b'' + for chunk in self.read_compressed_chunks(): + buf += chunk + pos = buf.find(b'\n') + while pos != -1: + yield buf[:pos].decode() + buf = buf[pos + 1 :] + pos = buf.find(b'\n') diff --git a/sphinx/util/inventory.py b/sphinx/util/inventory.py index c8f00c86841..0b615b3382e 100644 --- a/sphinx/util/inventory.py +++ b/sphinx/util/inventory.py @@ -15,7 +15,7 @@ if TYPE_CHECKING: import os - from collections.abc import Callable, Iterator, Sequence + from collections.abc import Callable, Sequence from typing import Protocol from sphinx.builders import Builder @@ -29,60 +29,13 @@ def read(self, size: int = ...) -> bytes: ... _JoinFunc = Callable[[str, str], str] -class InventoryFileReader: - """A file reader for an inventory file. - - This reader supports mixture of texts and compressed texts. - """ - - def __init__(self, stream: _SupportsRead) -> None: - self.stream = stream - self.buffer = b'' - self.eof = False - - def read_buffer(self) -> None: - chunk = self.stream.read(BUFSIZE) - if chunk == b'': - self.eof = True - self.buffer += chunk - - def readline(self) -> str: - pos = self.buffer.find(b'\n') - if pos != -1: - line = self.buffer[:pos].decode() - self.buffer = self.buffer[pos + 1 :] - elif self.eof: - line = self.buffer.decode() - self.buffer = b'' - else: - self.read_buffer() - line = self.readline() - - return line - - def readlines(self) -> Iterator[str]: - while not self.eof: - line = self.readline() - if line: - yield line - - def read_compressed_chunks(self) -> Iterator[bytes]: - decompressor = zlib.decompressobj() - while not self.eof: - self.read_buffer() - yield decompressor.decompress(self.buffer) - self.buffer = b'' - yield decompressor.flush() - - def read_compressed_lines(self) -> Iterator[str]: - buf = b'' - for chunk in self.read_compressed_chunks(): - buf += chunk - pos = buf.find(b'\n') - while pos != -1: - yield buf[:pos].decode() - buf = buf[pos + 1 :] - pos = buf.find(b'\n') +def __getattr__(name: str) -> object: + if name == 'InventoryFileReader': + from sphinx.util._inventory_file_reader import InventoryFileReader + + return InventoryFileReader + msg = f'module {__name__!r} has no attribute {name!r}' + raise AttributeError(msg) class InventoryFile: