From 6182b2aeed36cee7b925904ed1606a5c6f844b7d Mon Sep 17 00:00:00 2001 From: Tian Gao Date: Fri, 8 Nov 2024 18:42:43 -0500 Subject: [PATCH] Update executing and PEP 667 --- src/objprint/executing/__init__.py | 2 + .../executing/_position_node_finder.py | 420 +++++++++++++++++- src/objprint/executing/executing.py | 151 ++----- src/objprint/executing/py.typed | 0 4 files changed, 426 insertions(+), 147 deletions(-) create mode 100644 src/objprint/executing/py.typed diff --git a/src/objprint/executing/__init__.py b/src/objprint/executing/__init__.py index 1b157e7..b645197 100644 --- a/src/objprint/executing/__init__.py +++ b/src/objprint/executing/__init__.py @@ -1,6 +1,8 @@ """ Get information about what a frame is currently doing. Typical usage: + import executing + node = executing.Source.executing(frame).node # node will be an AST node or None """ diff --git a/src/objprint/executing/_position_node_finder.py b/src/objprint/executing/_position_node_finder.py index e5cddc3..0f83441 100644 --- a/src/objprint/executing/_position_node_finder.py +++ b/src/objprint/executing/_position_node_finder.py @@ -46,8 +46,10 @@ def mangled_name(node: EnhancedAST) -> str: elif isinstance(node, ast.ExceptHandler): assert node.name name = node.name + elif sys.version_info >= (3,12) and isinstance(node,ast.TypeVar): + name=node.name else: - raise TypeError("no node to mangle") + raise TypeError("no node to mangle for type "+repr(type(node))) if name.startswith("__") and not name.endswith("__"): @@ -70,7 +72,7 @@ def mangled_name(node: EnhancedAST) -> str: @lru_cache(128) # pragma: no mutate def get_instructions(code: CodeType) -> list[dis.Instruction]: - return list(dis.get_instructions(code, show_caches=True)) + return list(dis.get_instructions(code)) types_cmp_issue_fix = ( @@ -112,7 +114,7 @@ class PositionNodeFinder(object): """ def __init__(self, frame: FrameType, stmts: Set[EnhancedAST], tree: ast.Module, lasti: int, source: Source): - self.bc_list = get_instructions(frame.f_code) + self.bc_dict={bc.offset:bc for bc in get_instructions(frame.f_code) } self.source = source self.decorator: Optional[EnhancedAST] = None @@ -139,7 +141,7 @@ def __init__(self, frame: FrameType, stmts: Set[EnhancedAST], tree: ast.Module, # we ignore here the start position and try to find the ast-node just by end position and expected node type # This is save, because there can only be one attribute ending at a specific point in the source code. typ = (ast.Attribute,) - elif self.opname(lasti) == "CALL": + elif self.opname(lasti) in ("CALL", "CALL_KW"): # A CALL instruction can be a method call, in which case the lineno and col_offset gets changed by the compiler. # Therefore we ignoring here this attributes and searchnig for a Call-node only by end_col_offset and end_lineno. # This is save, because there can only be one method ending at a specific point in the source code. @@ -154,13 +156,18 @@ def __init__(self, frame: FrameType, stmts: Set[EnhancedAST], tree: ast.Module, typ=typ, ) - self.known_issues(self.result, self.instruction(lasti)) + instruction = self.instruction(lasti) + assert instruction is not None + + self.result = self.fix_result(self.result, instruction) + + self.known_issues(self.result, instruction) self.test_for_decorator(self.result, lasti) # verify if self.decorator is None: - self.verify(self.result, self.instruction(lasti)) + self.verify(self.result, instruction) else: assert_(self.decorator in self.result.decorator_list) @@ -176,21 +183,24 @@ def test_for_decorator(self, node: EnhancedAST, index: int) -> None: # index opname # ------------------ - # index-4 PRECALL + # index-4 PRECALL (only in 3.11) # index-2 CACHE # index CALL <- the call instruction # ... CACHE some CACHE instructions # maybe multiple other bytecode blocks for other decorators - # index-4 PRECALL + # index-4 PRECALL (only in 3.11) # index-2 CACHE # index CALL <- index of the next loop # ... CACHE some CACHE instructions # index+x STORE_* the ast-node of this instruction points to the decorated thing - if self.opname(index - 4) != "PRECALL" or self.opname(index) != "CALL": # pragma: no mutate - break # pragma: no mutate + if not ( + (self.opname(index - 4) == "PRECALL" or sys.version_info >= (3, 12)) + and self.opname(index) == "CALL" + ): # pragma: no mutate + break # pragma: no mutate index += 2 @@ -205,7 +215,94 @@ def test_for_decorator(self, node: EnhancedAST, index: int) -> None: self.decorator = node return - index += 4 + if sys.version_info < (3, 12): + index += 4 + + def fix_result( + self, node: EnhancedAST, instruction: dis.Instruction + ) -> EnhancedAST: + if ( + sys.version_info >= (3, 12, 5) + and instruction.opname in ("GET_ITER", "FOR_ITER") + and isinstance(node.parent, ast.For) + and node is node.parent.iter + ): + # node positions have changed in 3.12.5 + # https://github.com/python/cpython/issues/93691 + # `for` calls __iter__ and __next__ during execution, the calling + # expression of these calls was the ast.For node since cpython 3.11 (see test_iter). + # cpython 3.12.5 changed this to the `iter` node of the loop, to make tracebacks easier to read. + # This keeps backward compatibility with older executing versions. + + # there are also cases like: + # + # for a in iter(l): pass + # + # where `iter(l)` would be otherwise the resulting node for the `iter()` call and the __iter__ call of the for implementation. + # keeping the old behaviour makes it possible to distinguish both cases. + + return node.parent + + if ( + sys.version_info >= (3, 12, 6) + and instruction.opname in ("GET_ITER", "FOR_ITER") + and isinstance( + node.parent.parent, + (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp), + ) + and isinstance(node.parent,ast.comprehension) + and node is node.parent.iter + ): + # same as above but only for comprehensions, see: + # https://github.com/python/cpython/issues/123142 + + return node.parent.parent + + if sys.version_info >= (3, 12,6) and instruction.opname == "CALL": + before = self.instruction_before(instruction) + if ( + before is not None + and before.opname == "LOAD_CONST" + and before.positions == instruction.positions + and isinstance(node.parent, ast.withitem) + and node is node.parent.context_expr + ): + # node positions for with-statements have change + # and is now equal to the expression which created the context-manager + # https://github.com/python/cpython/pull/120763 + + # with context_manager: + # ... + + # but there is one problem to distinguish call-expressions from __exit__() + + # with context_manager(): + # ... + + # the call for __exit__ + + # 20 1:5 1:22 LOAD_CONST(None) + # 22 1:5 1:22 LOAD_CONST(None) + # 24 1:5 1:22 LOAD_CONST(None) + # 26 1:5 1:22 CALL() # <-- same source range as context_manager() + + # but we can use the fact that the previous load for None + # has the same source range as the call, wich can not happen for normal calls + + # we return the same ast.With statement at the and to preserve backward compatibility + + return node.parent.parent + + if ( + sys.version_info >= (3, 12,6) + and instruction.opname == "BEFORE_WITH" + and isinstance(node.parent, ast.withitem) + and node is node.parent.context_expr + ): + # handle positions changes for __enter__ + return node.parent.parent + + return node def known_issues(self, node: EnhancedAST, instruction: dis.Instruction) -> None: if instruction.opname in ("COMPARE_OP", "IS_OP", "CONTAINS_OP") and isinstance( @@ -259,6 +356,37 @@ def known_issues(self, node: EnhancedAST, instruction: dis.Instruction) -> None: # TODO: investigate raise KnownIssue("pattern matching ranges seems to be wrong") + if ( + sys.version_info >= (3, 12) + and isinstance(node, ast.Call) + and isinstance(node.func, ast.Name) + and node.func.id == "super" + ): + # super is optimized to some instructions which do not map nicely to a Call + + # find the enclosing function + func = node.parent + while hasattr(func, "parent") and not isinstance( + func, (ast.AsyncFunctionDef, ast.FunctionDef) + ): + + func = func.parent + + # get the first function argument (self/cls) + first_arg = None + + if hasattr(func, "args"): + args = [*func.args.posonlyargs, *func.args.args] + if args: + first_arg = args[0].arg + + if (instruction.opname, instruction.argval) in [ + ("LOAD_DEREF", "__class__"), + ("LOAD_FAST", first_arg), + ("LOAD_DEREF", first_arg), + ]: + raise KnownIssue("super optimization") + if self.is_except_cleanup(instruction, node): raise KnownIssue("exeption cleanup does not belong to the last node in a except block") @@ -279,6 +407,43 @@ def known_issues(self, node: EnhancedAST, instruction: dis.Instruction) -> None: raise KnownIssue("store __classcell__") + if ( + instruction.opname == "CALL" + and not isinstance(node,ast.Call) + and any(isinstance(p, ast.Assert) for p in parents(node)) + and sys.version_info >= (3, 11, 2) + ): + raise KnownIssue("exception generation maps to condition") + + if sys.version_info >= (3, 13): + if instruction.opname in ( + "STORE_FAST_STORE_FAST", + "STORE_FAST_LOAD_FAST", + "LOAD_FAST_LOAD_FAST", + ): + raise KnownIssue(f"can not map {instruction.opname} to two ast nodes") + + if instruction.opname == "LOAD_FAST" and instruction.argval == "__class__": + # example: + # class T: + # def a(): + # super() + # some_node # <- there is a LOAD_FAST for this node because we use super() + + raise KnownIssue( + f"loading of __class__ is accociated with a random node at the end of a class if you use super()" + ) + + if ( + instruction.opname == "COMPARE_OP" + and isinstance(node, ast.UnaryOp) + and isinstance(node.operand,ast.Compare) + and isinstance(node.op, ast.Not) + ): + # work around for + # https://github.com/python/cpython/issues/114671 + self.result = node.operand + @staticmethod def is_except_cleanup(inst: dis.Instruction, node: EnhancedAST) -> bool: if inst.opname not in ( @@ -392,6 +557,13 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: # call to the generator function return + if ( + sys.version_info >= (3, 12) + and inst_match(("LOAD_FAST_AND_CLEAR", "STORE_FAST")) + and node_match((ast.ListComp, ast.SetComp, ast.DictComp)) + ): + return + if inst_match(("CALL", "CALL_FUNCTION_EX")) and node_match( (ast.ClassDef, ast.Call) ): @@ -410,6 +582,7 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: if ( ( inst_match("LOAD_METHOD", argval="join") + or inst_match("LOAD_ATTR", argval="join") # 3.12 or inst_match(("CALL", "BUILD_STRING")) ) and node_match(ast.BinOp, left=ast.Constant, op=ast.Mod) @@ -495,9 +668,14 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: ): return - if inst_match(("JUMP_IF_TRUE_OR_POP", "JUMP_IF_FALSE_OR_POP")) and node_match( - ast.BoolOp - ): + if inst_match( + ( + "JUMP_IF_TRUE_OR_POP", + "JUMP_IF_FALSE_OR_POP", + "POP_JUMP_IF_TRUE", + "POP_JUMP_IF_FALSE", + ) + ) and node_match(ast.BoolOp): # and/or short circuit return @@ -511,7 +689,15 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: and isinstance(node.parent, ast.AugAssign) ) ) and inst_match( - ("LOAD_NAME", "LOAD_FAST", "LOAD_GLOBAL", "LOAD_DEREF"), argval=mangled_name(node) + ( + "LOAD_NAME", + "LOAD_FAST", + "LOAD_FAST_CHECK", + "LOAD_GLOBAL", + "LOAD_DEREF", + "LOAD_FROM_DICT_OR_DEREF", + ), + argval=mangled_name(node), ): return @@ -520,6 +706,170 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: ): return + if node_match(ast.Constant) and inst_match( + "LOAD_CONST", argval=cast(ast.Constant, node).value + ): + return + + if node_match( + (ast.ListComp, ast.SetComp, ast.DictComp, ast.GeneratorExp, ast.For) + ) and inst_match(("GET_ITER", "FOR_ITER")): + return + + if sys.version_info >= (3, 12): + if node_match(ast.UnaryOp, op=ast.UAdd) and inst_match( + "CALL_INTRINSIC_1", argrepr="INTRINSIC_UNARY_POSITIVE" + ): + return + + if node_match(ast.Subscript) and inst_match("BINARY_SLICE"): + return + + if node_match(ast.ImportFrom) and inst_match( + "CALL_INTRINSIC_1", argrepr="INTRINSIC_IMPORT_STAR" + ): + return + + if ( + node_match(ast.Yield) or isinstance(node.parent, ast.GeneratorExp) + ) and inst_match("CALL_INTRINSIC_1", argrepr="INTRINSIC_ASYNC_GEN_WRAP"): + return + + if node_match(ast.Name) and inst_match("LOAD_DEREF",argval="__classdict__"): + return + + if node_match(ast.TypeVar) and ( + inst_match("CALL_INTRINSIC_1", argrepr="INTRINSIC_TYPEVAR") + or inst_match( + "CALL_INTRINSIC_2", argrepr="INTRINSIC_TYPEVAR_WITH_BOUND" + ) + or inst_match( + "CALL_INTRINSIC_2", argrepr="INTRINSIC_TYPEVAR_WITH_CONSTRAINTS" + ) + or inst_match(("STORE_FAST", "STORE_DEREF"), argrepr=mangled_name(node)) + ): + return + + if node_match(ast.TypeVarTuple) and ( + inst_match("CALL_INTRINSIC_1", argrepr="INTRINSIC_TYPEVARTUPLE") + or inst_match(("STORE_FAST", "STORE_DEREF"), argrepr=node.name) + ): + return + + if node_match(ast.ParamSpec) and ( + inst_match("CALL_INTRINSIC_1", argrepr="INTRINSIC_PARAMSPEC") + + or inst_match(("STORE_FAST", "STORE_DEREF"), argrepr=node.name)): + return + + + if node_match(ast.TypeAlias): + if( + inst_match("CALL_INTRINSIC_1", argrepr="INTRINSIC_TYPEALIAS") + or inst_match( + ("STORE_NAME", "STORE_FAST", "STORE_DEREF"), argrepr=node.name.id + ) + or inst_match("CALL") + ): + return + + + if node_match(ast.ClassDef) and node.type_params: + if inst_match( + ("STORE_DEREF", "LOAD_DEREF", "LOAD_FROM_DICT_OR_DEREF"), + argrepr=".type_params", + ): + return + + if inst_match(("STORE_FAST", "LOAD_FAST"), argrepr=".generic_base"): + return + + if inst_match( + "CALL_INTRINSIC_1", argrepr="INTRINSIC_SUBSCRIPT_GENERIC" + ): + return + + if inst_match("LOAD_DEREF",argval="__classdict__"): + return + + if node_match((ast.FunctionDef,ast.AsyncFunctionDef)) and node.type_params: + if inst_match("CALL"): + return + + if inst_match( + "CALL_INTRINSIC_2", argrepr="INTRINSIC_SET_FUNCTION_TYPE_PARAMS" + ): + return + + if inst_match("LOAD_FAST",argval=".defaults"): + return + + if inst_match("LOAD_FAST",argval=".kwdefaults"): + return + + if inst_match("STORE_NAME", argval="__classdictcell__"): + # this is a general thing + return + + + # f-strings + + if node_match(ast.JoinedStr) and ( + inst_match("LOAD_ATTR", argval="join") + or inst_match(("LIST_APPEND", "CALL")) + ): + return + + if node_match(ast.FormattedValue) and inst_match("FORMAT_VALUE"): + return + + if sys.version_info >= (3, 13): + + if inst_match("NOP"): + return + + if inst_match("TO_BOOL") and node_match(ast.BoolOp): + return + + if inst_match("CALL_KW") and node_match((ast.Call, ast.ClassDef)): + return + + if inst_match("LOAD_FAST", argval=".type_params"): + return + + if inst_match("LOAD_FAST", argval="__classdict__"): + return + + if inst_match("LOAD_FAST") and node_match( + ( + ast.FunctionDef, + ast.ClassDef, + ast.TypeAlias, + ast.TypeVar, + ast.Lambda, + ast.AsyncFunctionDef, + ) + ): + # These are loads for closure variables. + # It is difficult to check that this is actually closure variable, see: + # https://github.com/alexmojaki/executing/pull/80#discussion_r1716027317 + return + + if ( + inst_match("LOAD_FAST") + and node_match(ast.TypeAlias) + and node.name.id == instruction.argval + ): + return + + if inst_match("STORE_NAME",argval="__static_attributes__"): + # the node is the first node in the body + return + + if inst_match("LOAD_FAST") and isinstance(node.parent,ast.TypeVar): + return + + # old verifier typ: Type = type(None) @@ -541,7 +891,7 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: UNARY_INVERT=ast.Invert, )[op_name] extra_filter = lambda e: isinstance(cast(ast.UnaryOp, e).op, op_type) - elif op_name in ("LOAD_ATTR", "LOAD_METHOD", "LOOKUP_METHOD"): + elif op_name in ("LOAD_ATTR", "LOAD_METHOD", "LOOKUP_METHOD","LOAD_SUPER_ATTR"): typ = ast.Attribute ctx = ast.Load extra_filter = lambda e: mangled_name(e) == instruction.argval @@ -587,19 +937,45 @@ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool: raise VerifierFailure(title, node, instruction) - def instruction(self, index: int) -> dis.Instruction: - return self.bc_list[index // 2] + def instruction(self, index: int) -> Optional[dis.Instruction]: + return self.bc_dict.get(index,None) + + def instruction_before( + self, instruction: dis.Instruction + ) -> Optional[dis.Instruction]: + return self.bc_dict.get(instruction.offset - 2, None) def opname(self, index: int) -> str: - return self.instruction(index).opname + i=self.instruction(index) + if i is None: + return "CACHE" + return i.opname + + extra_node_types=() + if sys.version_info >= (3,12): + extra_node_types = (ast.type_param,) def find_node( self, index: int, - match_positions: Sequence[str]=("lineno", "end_lineno", "col_offset", "end_col_offset"), - typ: tuple[Type, ...]=(ast.expr, ast.stmt, ast.excepthandler, ast.pattern), + match_positions: Sequence[str] = ( + "lineno", + "end_lineno", + "col_offset", + "end_col_offset", + ), + typ: tuple[Type, ...] = ( + ast.expr, + ast.stmt, + ast.excepthandler, + ast.pattern, + *extra_node_types, + ), ) -> EnhancedAST: - position = self.instruction(index).positions + instruction = self.instruction(index) + assert instruction is not None + + position = instruction.positions assert position is not None and position.lineno is not None return only( diff --git a/src/objprint/executing/executing.py b/src/objprint/executing/executing.py index 100afa5..5cf117e 100644 --- a/src/objprint/executing/executing.py +++ b/src/objprint/executing/executing.py @@ -25,115 +25,41 @@ import __future__ import ast import dis -import functools import inspect import io import linecache import re import sys import types -from collections import defaultdict, namedtuple +from collections import defaultdict from copy import deepcopy +from functools import lru_cache from itertools import islice +from itertools import zip_longest from operator import attrgetter +from pathlib import Path from threading import RLock -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast +from tokenize import detect_encoding +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, \ + Type, TypeVar, Union, cast if TYPE_CHECKING: # pragma: no cover from asttokens import ASTTokens, ASTText from asttokens.asttokens import ASTTextBase -function_node_types = (ast.FunctionDef,) # type: Tuple[Type, ...] -if sys.version_info[0] == 3: - function_node_types += (ast.AsyncFunctionDef,) +function_node_types = (ast.FunctionDef, ast.AsyncFunctionDef) # type: Tuple[Type, ...] -if sys.version_info[0] == 3: - # noinspection PyUnresolvedReferences - from functools import lru_cache - # noinspection PyUnresolvedReferences - from tokenize import detect_encoding - from itertools import zip_longest - # noinspection PyUnresolvedReferences,PyCompatibility - from pathlib import Path - - cache = lru_cache(maxsize=None) - text_type = str -else: - from lib2to3.pgen2.tokenize import detect_encoding, cookie_re as encoding_pattern # type: ignore[attr-defined] - from itertools import izip_longest as zip_longest - - class Path(object): - pass - - - def cache(func): - # type: (Callable) -> Callable - d = {} # type: Dict[Tuple, Callable] - - @functools.wraps(func) - def wrapper(*args): - # type: (Any) -> Any - if args in d: - return d[args] - result = d[args] = func(*args) - return result - - return wrapper - - - # noinspection PyUnresolvedReferences - text_type = unicode +cache = lru_cache(maxsize=None) # Type class used to expand out the definition of AST to include fields added by this library # It's not actually used for anything other than type checking though! class EnhancedAST(ast.AST): parent = None # type: EnhancedAST -if sys.version_info >= (3, 4): - # noinspection PyUnresolvedReferences - _get_instructions = dis.get_instructions - from dis import Instruction as _Instruction - - class Instruction(_Instruction): - lineno = None # type: int -else: - class Instruction(namedtuple('Instruction', 'offset argval opname starts_line')): - lineno = None # type: int - - from dis import HAVE_ARGUMENT, EXTENDED_ARG, hasconst, opname, findlinestarts, hasname - - # Based on dis.disassemble from 2.7 - # Left as similar as possible for easy diff - - def _get_instructions(co): - # type: (types.CodeType) -> Iterator[Instruction] - code = co.co_code - linestarts = dict(findlinestarts(co)) - n = len(code) - i = 0 - extended_arg = 0 - while i < n: - offset = i - c = code[i] - op = ord(c) - lineno = linestarts.get(i) - argval = None - i = i + 1 - if op >= HAVE_ARGUMENT: - oparg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg - extended_arg = 0 - i = i + 2 - if op == EXTENDED_ARG: - extended_arg = oparg * 65536 - - if op in hasconst: - argval = co.co_consts[oparg] - elif op in hasname: - argval = co.co_names[oparg] - elif opname[op] == 'LOAD_FAST': - argval = co.co_varnames[oparg] - yield Instruction(offset, argval, opname[op], lineno) + +class Instruction(dis.Instruction): + lineno = None # type: int # Type class used to expand out the definition of AST to include fields added by this library @@ -157,7 +83,7 @@ def assert_(condition, message=""): def get_instructions(co): # type: (types.CodeType) -> Iterator[EnhancedInstruction] lineno = co.co_firstlineno - for inst in _get_instructions(co): + for inst in dis.get_instructions(co): inst = cast(EnhancedInstruction, inst) lineno = inst.starts_line or lineno assert_(lineno) @@ -224,29 +150,9 @@ def __init__(self, filename, lines): """ self.filename = filename - text = ''.join(lines) - - if not isinstance(text, text_type): - encoding = self.detect_encoding(text) - # noinspection PyUnresolvedReferences - text = text.decode(encoding) - lines = [line.decode(encoding) for line in lines] - - self.text = text + self.text = ''.join(lines) self.lines = [line.rstrip('\r\n') for line in lines] - if sys.version_info[0] == 3: - ast_text = text - else: - # In python 2 it's a syntax error to parse unicode - # with an encoding declaration, so we remove it but - # leave empty lines in its place to keep line numbers the same - ast_text = ''.join([ - '\n' if i < 2 and encoding_pattern.match(line) - else line - for i, line in enumerate(lines) - ]) - self._nodes_by_line = defaultdict(list) self.tree = None self._qualnames = {} @@ -254,7 +160,7 @@ def __init__(self, filename, lines): self._asttext = None # type: Optional[ASTText] try: - self.tree = ast.parse(ast_text, filename=filename) + self.tree = ast.parse(self.text, filename=filename) except (SyntaxError, ValueError): pass else: @@ -289,7 +195,7 @@ def for_filename( def get_lines(): # type: () -> List[str] - return linecache.getlines(cast(text_type, filename), module_globals) + return linecache.getlines(cast(str, filename), module_globals) # Save the current linecache entry, then ensure the cache is up to date. entry = linecache.cache.get(filename) # type: ignore[attr-defined] @@ -320,8 +226,7 @@ def _for_filename_and_lines(cls, filename, lines): @classmethod def lazycache(cls, frame): # type: (types.FrameType) -> None - if sys.version_info >= (3, 5): - linecache.lazycache(frame.f_code.co_filename, frame.f_globals) + linecache.lazycache(frame.f_code.co_filename, frame.f_globals) @classmethod def executing(cls, frame_or_tb): @@ -368,16 +273,15 @@ def executing(cls, frame_or_tb): node_finder = NodeFinder(frame, stmts, tree, lasti, source) node = node_finder.result decorator = node_finder.decorator + + if node: + new_stmts = {statement_containing_node(node)} + assert_(new_stmts <= stmts) + stmts = new_stmts except Exception: if TESTING: raise - assert stmts is not None - if node: - new_stmts = {statement_containing_node(node)} - assert_(new_stmts <= stmts) - stmts = new_stmts - executing_cache[key] = args = source, node, stmts, decorator return Executing(frame, *args) @@ -456,7 +360,7 @@ def _asttext_base(self): @staticmethod def decode_source(source): - # type: (Union[str, bytes]) -> text_type + # type: (Union[str, bytes]) -> str if isinstance(source, bytes): encoding = Source.detect_encoding(source) return source.decode(encoding) @@ -548,10 +452,7 @@ def add_qualname(self, node, name=None): def visit_FunctionDef(self, node, name=None): # type: (ast.AST, Optional[str]) -> None - if sys.version_info[0] == 3: - assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node - else: - assert isinstance(node, (ast.FunctionDef, ast.Lambda)), node + assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node self.add_qualname(node, name) self.stack.append('') children = [] # type: Sequence[ast.AST] @@ -684,8 +585,7 @@ def __init__(self, frame, stmts, tree, lasti, source): elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'): typ = ast.Name ctx = ast.Load - if sys.version_info[0] == 3 or instruction.argval: - extra_filter = lambda e: e.id == instruction.argval + extra_filter = lambda e: e.id == instruction.argval elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'): typ = ast.Compare extra_filter = lambda e: len(e.ops) == 1 @@ -1257,3 +1157,4 @@ def node_linenos(node): from ._position_node_finder import PositionNodeFinder as NodeFinder else: NodeFinder = SentinelNodeFinder + diff --git a/src/objprint/executing/py.typed b/src/objprint/executing/py.typed new file mode 100644 index 0000000..e69de29