Skip to content

Commit

Permalink
feat(low-code concurrent): [ISSUE #10550] have streams without partition routers nor cursor run … (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored Dec 4, 2024
1 parent acb6630 commit ac6cf92
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 75 deletions.
67 changes: 43 additions & 24 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ def _group_streams(
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous
if isinstance(declarative_stream, DeclarativeStream):
if (
isinstance(declarative_stream, DeclarativeStream)
and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
== "SimpleRetriever"
):
incremental_sync_component_definition = name_to_stream_mapping[
declarative_stream.name
].get("incremental_sync")
Expand All @@ -210,36 +214,30 @@ def _group_streams(
.get("retriever")
.get("partition_router")
)
is_without_partition_router_or_cursor = not bool(
incremental_sync_component_definition
) and not bool(partition_router_component_definition)

is_substream_without_incremental = (
partition_router_component_definition
and not incremental_sync_component_definition
)

if (
incremental_sync_component_definition
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
if self._is_datetime_incremental_without_partition_routing(
declarative_stream, incremental_sync_component_definition
):
stream_state = state_manager.get_stream_state(
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)

cursor, connector_state_converter = (
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
)

partition_generator = StreamSlicerPartitionGenerator(
Expand All @@ -263,14 +261,19 @@ def _group_streams(
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key,
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
and hasattr(
cursor.cursor_field, "cursor_field_key"
) # FIXME this will need to be updated once we do the per partition
else None,
logger=self.logger,
cursor=cursor,
)
)
elif is_substream_without_incremental and hasattr(
declarative_stream.retriever, "stream_slicer"
):
elif (
is_substream_without_incremental or is_without_partition_router_or_cursor
) and hasattr(declarative_stream.retriever, "stream_slicer"):
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
Expand Down Expand Up @@ -310,6 +313,22 @@ def _group_streams(

return concurrent_streams, synchronous_streams

def _is_datetime_incremental_without_partition_routing(
    self,
    declarative_stream: DeclarativeStream,
    incremental_sync_component_definition: Mapping[str, Any],
) -> bool:
    """Decide whether *declarative_stream* can use the concurrent datetime cursor.

    A stream qualifies only when all of the following hold:
      * an ``incremental_sync`` component is defined and is a
        ``DatetimeBasedCursor`` (checked by manifest ``type`` name),
      * the stream passes ``_stream_supports_concurrent_partition_processing``,
      * the retriever's ``stream_slicer`` is the datetime cursor itself —
        i.e. there is no partition router wrapping it.

    Returns True when the stream is datetime-incremental with no partition
    routing, False otherwise.
    """
    # No incremental_sync component at all -> not incremental.
    if not incremental_sync_component_definition:
        return False
    # Only the datetime-based cursor model is supported concurrently here.
    if (
        incremental_sync_component_definition.get("type", "")
        != DatetimeBasedCursorModel.__name__
    ):
        return False
    # Stream-level veto (e.g. features incompatible with concurrency).
    if not self._stream_supports_concurrent_partition_processing(
        declarative_stream=declarative_stream
    ):
        return False
    # A partition router would replace the slicer; require the slicer to be
    # the DatetimeBasedCursor itself (getattr covers retrievers without one).
    slicer = getattr(declarative_stream.retriever, "stream_slicer", None)
    return isinstance(slicer, DatetimeBasedCursor)

def _stream_supports_concurrent_partition_processing(
self, declarative_stream: DeclarativeStream
) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
Mapping,
MutableMapping,
Optional,
Tuple,
Type,
Union,
get_args,
Expand Down Expand Up @@ -760,7 +759,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
config: Config,
stream_state: MutableMapping[str, Any],
**kwargs: Any,
) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]:
) -> ConcurrentCursor:
component_type = component_definition.get("type")
if component_definition.get("type") != model_type.__name__:
raise ValueError(
Expand Down Expand Up @@ -891,23 +890,20 @@ def create_concurrent_cursor_from_datetime_based_cursor(
if evaluated_step:
step_length = parse_duration(evaluated_step)

return (
ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
lookback_window=lookback_window,
slice_range=step_length,
cursor_granularity=cursor_granularity,
),
connector_state_converter,
return ConcurrentCursor(
stream_name=stream_name,
stream_namespace=stream_namespace,
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
slice_boundary_fields=slice_boundary_fields,
start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
lookback_window=lookback_window,
slice_range=step_length,
cursor_granularity=cursor_granularity,
)

@staticmethod
Expand Down
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/streams/concurrent/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ def get_primary_key_from_stream(
elif isinstance(stream_primary_key, str):
return [stream_primary_key]
elif isinstance(stream_primary_key, list):
if len(stream_primary_key) > 0 and all(isinstance(k, str) for k in stream_primary_key):
are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
are_all_elements_list_of_size_one = all(
isinstance(k, list) and len(k) == 1 for k in stream_primary_key
)

if are_all_elements_str:
return stream_primary_key # type: ignore # We verified all items in the list are strings
elif are_all_elements_list_of_size_one:
return list(map(lambda x: x[0], stream_primary_key))
else:
raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3062,7 +3062,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
"lookback_window": "P3D",
}

concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down Expand Up @@ -3094,6 +3094,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor_all_fields(
assert concurrent_cursor._end_provider() == expected_end
assert concurrent_cursor._concurrent_state == expected_concurrent_state

stream_state_converter = concurrent_cursor._connector_state_converter
assert isinstance(stream_state_converter, CustomFormatConcurrentStreamStateConverter)
assert stream_state_converter._datetime_format == expected_datetime_format
assert stream_state_converter._is_sequential_state
Expand Down Expand Up @@ -3194,7 +3195,7 @@ def test_create_concurrent_cursor_from_datetime_based_cursor(
stream_state={},
)
else:
concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down Expand Up @@ -3251,7 +3252,7 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined():
"lookback_window": "P3D",
}

concurrent_cursor, stream_state_converter = (
concurrent_cursor = (
connector_builder_factory.create_concurrent_cursor_from_datetime_based_cursor(
state_manager=connector_state_manager,
model_type=DatetimeBasedCursorModel,
Expand Down
Loading

0 comments on commit ac6cf92

Please sign in to comment.