Skip to content

Commit

Permalink
Minor shifting around.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachaa-Thanasius committed Sep 21, 2024
1 parent d1875de commit efd328b
Showing 1 changed file with 59 additions and 36 deletions.
95 changes: 59 additions & 36 deletions src/defer_imports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,16 @@ def _decode_source(self) -> str:
return newline_decoder.decode(self.data.decode(self.encoding)) # pyright: ignore # Common buffer types have decode method.

def _get_node_context(self, node: ast.stmt): # noqa: ANN202 # Version-dependent and too verbose.
"""Get the location context for a node. That context will be used as an argument to SyntaxError."""
"""Get the location context for a node.
Notes
-----
The return value can serve as the "details" argument for SyntaxError [1]_.
References
----------
.. [1] https://docs.python.org/3.14/library/exceptions.html#SyntaxError
"""

text = ast.get_source_segment(self._decode_source(), node, padded=True)
context = (str(self.filepath), node.lineno, node.col_offset + 1, text)
Expand Down Expand Up @@ -564,39 +573,9 @@ def _check_ast_for_defer_usage(data: ast.AST) -> tuple[str, bool]:
class _DeferredFileLoader(SourceFileLoader):
"""A file loader that instruments .py files which use "with defer_imports.until_use: ..."."""

defer_module_level: bool

def create_module(self, spec: ModuleSpec) -> _tp.Optional[_tp.ModuleType]:
# In regular import flows, this method always runs before source_to_code, so defer_module_level should always
# exist.
self.defer_module_level = spec.loader_state["defer_module_level"] if (spec.loader_state is not None) else False
return super().create_module(spec)

def source_to_code(self, data: _SourceData, path: _ModulePath, *, _optimize: int = -1) -> _tp.CodeType: # pyright: ignore [reportIncompatibleMethodOverride]
# NOTE: Signature of SourceFileLoader.source_to_code at runtime and in typeshed aren't consistent.

if not data:
return super().source_to_code(data, path, _optimize=_optimize) # pyright: ignore # See note above.

# Check if the given data uses "with defer_imports.until_use".
if isinstance(data, ast.AST):
encoding, uses_defer = _check_ast_for_defer_usage(data)
else:
encoding, uses_defer = _check_source_for_defer_usage(data)

if not uses_defer:
return super().source_to_code(data, path, _optimize=_optimize) # pyright: ignore # See note above.

# Instrument the AST of the given data.
if isinstance(data, ast.AST):
orig_tree = data
else:
orig_tree = ast.parse(data, path, "exec")

transformer = _DeferredInstrumenter(data, path, encoding, module_level=self.defer_module_level)
new_tree = ast.fix_missing_locations(transformer.visit(orig_tree))

return super().source_to_code(new_tree, path, _optimize=_optimize) # pyright: ignore # See note above.
def __init__(self, *args: _tp.Any, **kwargs: _tp.Any) -> None:
super().__init__(*args, **kwargs)
self.defer_module_level: bool = False

def get_data(self, path: str) -> bytes:
"""Return the data from path as raw bytes.
Expand Down Expand Up @@ -649,8 +628,51 @@ def set_data(self, path: str, data: _tp.ReadableBuffer, *, _mode: int = 0o666) -

return super().set_data(path, data, _mode=_mode)

# NOTE: Signature of SourceFileLoader.source_to_code at runtime and in typeshed aren't consistent.
def source_to_code(self, data: _SourceData, path: _ModulePath, *, _optimize: int = -1) -> _tp.CodeType: # pyright: ignore [reportIncompatibleMethodOverride]
"""Compile 'data' into a code object, but not before potentially instrumenting it.
Parameters
----------
data: _SourceData
Anything that compile() can handle.
path: _ModulePath:
Where the data was retrieved (when applicable).
Returns
-------
_tp.CodeType
The compiled code object.
"""

if not data:
return super().source_to_code(data, path, _optimize=_optimize) # pyright: ignore # See note above.

if isinstance(data, ast.AST):
encoding, uses_defer = _check_ast_for_defer_usage(data)
else:
encoding, uses_defer = _check_source_for_defer_usage(data)

if not uses_defer:
return super().source_to_code(data, path, _optimize=_optimize) # pyright: ignore # See note above.

if isinstance(data, ast.AST):
orig_tree = data
else:
orig_tree = ast.parse(data, path, "exec")

transformer = _DeferredInstrumenter(data, path, encoding, module_level=self.defer_module_level)
new_tree = ast.fix_missing_locations(transformer.visit(orig_tree))

return super().source_to_code(new_tree, path, _optimize=_optimize) # pyright: ignore # See note above.

def exec_module(self, module: _tp.ModuleType) -> None:
"""Execute the module, but only after getting state from module.__spec__.loader_state if present."""

if (spec := module.__spec__) and spec.loader_state is not None:
self.defer_module_level = spec.loader_state["defer_module_level"]

_DEFER_LOADER_DETAILS = (_DeferredFileLoader, SOURCE_SUFFIXES)
return super().exec_module(module)


class _DeferredFileFinder(FileFinder):
Expand Down Expand Up @@ -804,8 +826,9 @@ def install_import_hook(
automatically by using it as a context manager.
"""

loader_details = (_DeferredFileLoader, SOURCE_SUFFIXES)
path_hook = _DeferredFileFinder.path_hook(
_DEFER_LOADER_DETAILS,
loader_details,
defer_all=apply_all,
deferred_modules=module_names,
recursive=recursive,
Expand Down

0 comments on commit efd328b

Please sign in to comment.