diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 371e34a8..0d3a176c 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -206,7 +206,7 @@ def _group_streams( # so we need to treat them as synchronous if ( isinstance(declarative_stream, DeclarativeStream) - and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"] + and name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] == "SimpleRetriever" ): incremental_sync_component_definition = name_to_stream_mapping[ @@ -215,7 +215,7 @@ def _group_streams( partition_router_component_definition = ( name_to_stream_mapping[declarative_stream.name] - .get("retriever") + .get("retriever", {}) .get("partition_router") ) is_without_partition_router_or_cursor = not bool( @@ -237,7 +237,7 @@ def _group_streams( cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( state_manager=state_manager, model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, + component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, @@ -320,10 +320,11 @@ def _group_streams( def _is_datetime_incremental_without_partition_routing( self, declarative_stream: DeclarativeStream, - incremental_sync_component_definition: Mapping[str, Any], + incremental_sync_component_definition: Mapping[str, Any] | None, ) -> bool: return ( - bool(incremental_sync_component_definition) + incremental_sync_component_definition is not None + and bool(incremental_sync_component_definition) and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ and self._stream_supports_concurrent_partition_processing(