diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 65b1e0ea..8d4ded84 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -987,21 +987,7 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> :param config: The custom defined connector config :return: The declarative component built from the Pydantic model to be used at runtime """ - INJECTED_COMPONENTS_PY = "__injected_components_py" - - components_module: Optional[types.ModuleType] = None - if INJECTED_COMPONENTS_PY in config: - # declares a dynamic module `components` from provided text - python_text = config[INJECTED_COMPONENTS_PY] - module_name = "components" - - # Create a new module object - components_module = types.ModuleType(module_name) - # Execute the module text in the module's namespace - exec(python_text, components_module.__dict__) - # Skip insert the module into sys.modules because we pass by reference below - # sys.modules[module_name] = components_module - + components_module = self._get_components_module_object(config=config) custom_component_class = self._get_class_from_fully_qualified_class_name( full_qualified_class_name=model.class_name, components_module=components_module, @@ -1057,9 +1043,31 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> } return custom_component_class(**kwargs) + def _get_components_module_object( + config: Config, + ) -> None: + """Get a components module object based on the provided config. + + If custom python components is provided, this will be loaded. Otherwise, we will + attempt to load from the `components` module already imported. + """ + INJECTED_COMPONENTS_PY = "__injected_components_py" + COMPONENTS_MODULE_NAME = "components" + + components_module: types.ModuleType + if INJECTED_COMPONENTS_PY in config: + # Create a new module object and execute the provided Python code text within it + components_module = types.ModuleType(name=COMPONENTS_MODULE_NAME) + python_text = config[INJECTED_COMPONENTS_PY] + exec(python_text, components_module.__dict__) + # Skip insert the module into sys.modules because we pass by reference below + # sys.modules[module_name] = components_module + else: + components_module = importlib.import_module(name=COMPONENTS_MODULE_NAME) + def _get_class_from_fully_qualified_class_name( full_qualified_class_name: str, - components_module: Optional[types.ModuleType] = None, + components_module: types.ModuleType, ) -> Any: """ Get a class from its fully qualified name, optionally using a pre-parsed module. @@ -1075,18 +1083,17 @@ def _get_class_from_fully_qualified_class_name( ValueError: If the class cannot be loaded. """ split = full_qualified_class_name.split(".") - module_name = ".".join(split[:-1]) + module_name_full = ".".join(split[:-1]) + module_name = split[:-2] class_name = split[-1] - try: - # Use the provided module if available and if module name matches - if components_module and components_module.__name__ == module_name: - return getattr(components_module, class_name) - - # Fallback to importing the module dynamically - module = importlib.import_module(module_name) - return getattr(module, class_name) + if module_name != "components": + raise ValueError( + f"Custom components must be defined in a module named `components`. Found {module_name} instead." + ) + try: + return getattr(components_module, class_name) except (AttributeError, ModuleNotFoundError) as e: raise ValueError(f"Could not load class {full_qualified_class_name}.") from e