Skip to content

Commit

Permalink
fix(concurrent): Fix memory issue following concurrency in source-jira (
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Dec 9, 2024
1 parent e7ccf76 commit 547a4a2
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions airbyte_cdk/sources/declarative/interpolation/jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import ast
from functools import cache
from typing import Any, Mapping, Optional, Tuple, Type
from typing import Any, Mapping, Optional, Set, Tuple, Type

from jinja2 import meta
from jinja2.environment import Template
Expand All @@ -27,7 +27,35 @@ class StreamPartitionAccessEnvironment(SandboxedEnvironment):
def is_safe_attribute(self, obj: Any, attr: str, value: Any) -> bool:
if attr in ["_partition"]:
return True
return super().is_safe_attribute(obj, attr, value)
return super().is_safe_attribute(obj, attr, value) # type: ignore # for some reason, mypy says 'Returning Any from function declared to return "bool"'


# These aliases are used to deprecate existing keywords without breaking all existing connectors.
_ALIASES = {
"stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
}

# These extensions are not installed so they're not currently a problem,
# but we're still explicitly removing them from the jinja context.
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks
_RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops

# By default, these Python builtin functions are available in the Jinja context.
# We explicitly remove them because of the potential security risk.
# Please add a unit test to test_jinja.py when adding a restriction.
_RESTRICTED_BUILTIN_FUNCTIONS = [
"range"
] # The range function can cause very expensive computations

_ENVIRONMENT = StreamPartitionAccessEnvironment()
_ENVIRONMENT.filters.update(**filters)
_ENVIRONMENT.globals.update(**macros)

for extension in _RESTRICTED_EXTENSIONS:
_ENVIRONMENT.extensions.pop(extension, None)
for builtin in _RESTRICTED_BUILTIN_FUNCTIONS:
_ENVIRONMENT.globals.pop(builtin, None)


class JinjaInterpolation(Interpolation):
Expand All @@ -48,34 +76,6 @@ class JinjaInterpolation(Interpolation):
Additional information on jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/#
"""

# These aliases are used to deprecate existing keywords without breaking all existing connectors.
ALIASES = {
"stream_interval": "stream_slice", # Use stream_interval to access incremental_sync values
"stream_partition": "stream_slice", # Use stream_partition to access partition router's values
}

# These extensions are not installed so they're not currently a problem,
# but we're still explicitely removing them from the jinja context.
# At worst, this is documentation that we do NOT want to include these extensions because of the potential security risks
RESTRICTED_EXTENSIONS = ["jinja2.ext.loopcontrols"] # Adds support for break continue in loops

# By default, these Python builtin functions are available in the Jinja context.
# We explicitely remove them because of the potential security risk.
# Please add a unit test to test_jinja.py when adding a restriction.
RESTRICTED_BUILTIN_FUNCTIONS = [
"range"
] # The range function can cause very expensive computations

def __init__(self) -> None:
self._environment = StreamPartitionAccessEnvironment()
self._environment.filters.update(**filters)
self._environment.globals.update(**macros)

for extension in self.RESTRICTED_EXTENSIONS:
self._environment.extensions.pop(extension, None)
for builtin in self.RESTRICTED_BUILTIN_FUNCTIONS:
self._environment.globals.pop(builtin, None)

def eval(
self,
input_str: str,
Expand All @@ -86,7 +86,7 @@ def eval(
) -> Any:
context = {"config": config, **additional_parameters}

for alias, equivalent in self.ALIASES.items():
for alias, equivalent in _ALIASES.items():
if alias in context:
# This is unexpected. We could ignore or log a warning, but failing loudly should result in fewer surprises
raise ValueError(
Expand All @@ -105,6 +105,7 @@ def eval(
raise Exception(f"Expected a string, got {input_str}")
except UndefinedError:
pass

# If result is empty or resulted in an undefined error, evaluate and return the default string
return self._literal_eval(self._eval(default, context), valid_types)

Expand Down Expand Up @@ -132,16 +133,16 @@ def _eval(self, s: Optional[str], context: Mapping[str, Any]) -> Optional[str]:
return s

@cache
def _find_undeclared_variables(self, s: Optional[str]) -> set[str]:
def _find_undeclared_variables(self, s: Optional[str]) -> Set[str]:
"""
Find undeclared variables and cache them
"""
ast = self._environment.parse(s) # type: ignore # parse is able to handle None
ast = _ENVIRONMENT.parse(s) # type: ignore # parse is able to handle None
return meta.find_undeclared_variables(ast)

@cache
def _compile(self, s: Optional[str]) -> Template:
def _compile(self, s: str) -> Template:
"""
We must cache the Jinja Template ourselves because we're using `from_string` instead of a template loader
"""
return self._environment.from_string(s) # type: ignore [arg-type] # Expected `str | Template` but passed `str | None`
return _ENVIRONMENT.from_string(s)

0 comments on commit 547a4a2

Please sign in to comment.