From cd1bd1c7119a31986b85c64cd88053cd4400dd1d Mon Sep 17 00:00:00 2001
From: Brian Lai <51336873+brianjlai@users.noreply.github.com>
Date: Tue, 3 Dec 2024 09:04:53 -0500
Subject: [PATCH] feat(Low-Code Concurrent CDK): Allow non-incremental
 substreams and list-based partition router streams with parents to be
 processed by the Concurrent CDK (#89)

---
 .../concurrent_declarative_source.py          |  64 +++++-
 .../parsers/model_to_component_factory.py     |   4 +
 .../streams/concurrent/default_stream.py      |   1 +
 .../declarative/decoders/test_json_decoder.py |   2 +-
 .../test_concurrent_declarative_source.py     | 191 +++++++++++++-----
 .../scenarios/stream_facade_scenarios.py      |   8 +
 ...hread_based_concurrent_stream_scenarios.py |   7 +
 .../streams/concurrent/test_default_stream.py |   4 +
 8 files changed, 226 insertions(+), 55 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index b9405a8e..40fd23ed 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -49,6 +49,7 @@ from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
     AlwaysAvailableAvailabilityStrategy,
 )
+from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
 from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
 from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
 from airbyte_cdk.sources.types import Config, StreamState
@@ -69,6 +70,15 @@ def __init__(
         component_factory: Optional[ModelToComponentFactory] = None,
         **kwargs: Any,
     ) -> None:
+        # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
+        # cursors. We do this by no longer automatically instantiating RFR cursors when converting
+        # the declarative models into runtime components. Concurrent sources will continue to checkpoint
+        # incremental streams running in full refresh.
+ component_factory = component_factory or ModelToComponentFactory( + emit_connector_builder_messages=emit_connector_builder_messages, + disable_resumable_full_refresh=True, + ) + super().__init__( source_config=source_config, debug=debug, @@ -191,13 +201,24 @@ def _group_streams( # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous if isinstance(declarative_stream, DeclarativeStream): - datetime_based_cursor_component_definition = name_to_stream_mapping[ + incremental_sync_component_definition = name_to_stream_mapping[ declarative_stream.name ].get("incremental_sync") + partition_router_component_definition = ( + name_to_stream_mapping[declarative_stream.name] + .get("retriever") + .get("partition_router") + ) + + is_substream_without_incremental = ( + partition_router_component_definition + and not incremental_sync_component_definition + ) + if ( - datetime_based_cursor_component_definition - and datetime_based_cursor_component_definition.get("type", "") + incremental_sync_component_definition + and incremental_sync_component_definition.get("type", "") == DatetimeBasedCursorModel.__name__ and self._stream_supports_concurrent_partition_processing( declarative_stream=declarative_stream @@ -213,7 +234,7 @@ def _group_streams( self._constructor.create_concurrent_cursor_from_datetime_based_cursor( state_manager=state_manager, model_type=DatetimeBasedCursorModel, - component_definition=datetime_based_cursor_component_definition, + component_definition=incremental_sync_component_definition, stream_name=declarative_stream.name, stream_namespace=declarative_stream.namespace, config=config or {}, @@ -247,6 +268,41 @@ def _group_streams( cursor=cursor, ) ) + elif is_substream_without_incremental and hasattr( + declarative_stream.retriever, "stream_slicer" + ): + partition_generator = StreamSlicerPartitionGenerator( + DeclarativePartitionFactory( + declarative_stream.name, + declarative_stream.get_json_schema(), + self._retriever_factory( + name_to_stream_mapping[declarative_stream.name], + config, + {}, + ), + self.message_repository, + ), + declarative_stream.retriever.stream_slicer, + ) + + final_state_cursor = FinalStateCursor( + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + message_repository=self.message_repository, + ) + + concurrent_streams.append( + DefaultStream( + partition_generator=partition_generator, + name=declarative_stream.name, + json_schema=declarative_stream.get_json_schema(), + availability_strategy=AlwaysAvailableAvailabilityStrategy(), + primary_key=get_primary_key_from_stream(declarative_stream.primary_key), + cursor_field=None, + logger=self.logger, + cursor=final_state_cursor, + ) + ) else: synchronous_streams.append(declarative_stream) else: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 089b2065..49e047d3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -387,6 +387,7 @@ def __init__( emit_connector_builder_messages: bool = False, disable_retries: bool = False, disable_cache: bool = False, + disable_resumable_full_refresh: bool = False, message_repository: Optional[MessageRepository] = None, ): self._init_mappings() @@ -395,6 +396,7 @@ def __init__( self._emit_connector_builder_messages = 
emit_connector_builder_messages
         self._disable_retries = disable_retries
         self._disable_cache = disable_cache
+        self._disable_resumable_full_refresh = disable_resumable_full_refresh
         self._message_repository = message_repository or InMemoryMessageRepository(  # type: ignore
             self._evaluate_log_level(emit_connector_builder_messages)
         )
@@ -1339,6 +1341,8 @@ def _merge_stream_slicers(
             if model.incremental_sync
             else None
         )
+        elif self._disable_resumable_full_refresh:
+            return stream_slicer
         elif stream_slicer:
             # For the Full-Refresh sub-streams, we use the nested `ChildPartitionResumableFullRefreshCursor`
             return PerPartitionCursor(
diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py
index eb94ebba..7679a1eb 100644
--- a/airbyte_cdk/sources/streams/concurrent/default_stream.py
+++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py
@@ -67,6 +67,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
             name=self.name,
             json_schema=dict(self._json_schema),
             supported_sync_modes=[SyncMode.full_refresh],
+            is_resumable=False,
         )

         if self._namespace:
diff --git a/unit_tests/sources/declarative/decoders/test_json_decoder.py b/unit_tests/sources/declarative/decoders/test_json_decoder.py
index bb2dd0c9..7efaa6fb 100644
--- a/unit_tests/sources/declarative/decoders/test_json_decoder.py
+++ b/unit_tests/sources/declarative/decoders/test_json_decoder.py
@@ -54,7 +54,7 @@ def test_jsonl_decoder(requests_mock, response_body, expected_json):
 def large_event_response_fixture():
     data = {"email": "email1@example.com"}
     jsonl_string = f"{json.dumps(data)}\n"
-    lines_in_response = 2_000_000  # ≈ 58 MB of response
+    lines_in_response = 2  # reduced from 2_000_000 (≈ 58 MB of response)
     dir_path = os.path.dirname(os.path.realpath(__file__))
     file_path = f"{dir_path}/test_response.txt"
     with open(file_path, "w") as file:
diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
index f9f89fae..70a01a2c 100644
--- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py
+++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py
@@ -313,7 +313,7 @@
         "party_members_skills_stream": {
             "$ref": "#/definitions/base_stream",
             "retriever": {
-                "$ref": "#/definitions/base_incremental_stream/retriever",
+                "$ref": "#/definitions/base_stream/retriever",
                 "record_selector": {"$ref": "#/definitions/selector"},
                 "partition_router": {
                     "type": "SubstreamPartitionRouter",
@@ -350,14 +350,121 @@
                 },
             },
         },
+        "arcana_personas_stream": {
+            "$ref": "#/definitions/base_stream",
+            "retriever": {
+                "$ref": "#/definitions/base_stream/retriever",
+                "record_selector": {"$ref": "#/definitions/selector"},
+                "partition_router": {
+                    "type": "ListPartitionRouter",
+                    "cursor_field": "arcana_id",
+                    "values": [
+                        "Fool",
+                        "Magician",
+                        "Priestess",
+                        "Empress",
+                        "Emperor",
+                        "Hierophant",
+                        "Lovers",
+                        "Chariot",
+                        "Justice",
+                        "Hermit",
+                        "Fortune",
+                        "Strength",
+                        "Hanged Man",
+                        "Death",
+                        "Temperance",
+                        "Devil",
+                        "Tower",
+                        "Star",
+                        "Moon",
+                        "Sun",
+                        "Judgement",
+                        "World",
+                    ],
+                },
+            },
+            "$parameters": {
+                "name": "arcana_personas",
+                "primary_key": "id",
+                "path": "/arcanas/{{stream_slice.arcana_id}}/personas",
+            },
+            "schema_loader": {
+                "type": "InlineSchemaLoader",
+                "schema": {
+                    "$schema": "https://json-schema.org/draft-07/schema#",
+                    "type": "object",
+                    "properties": {
+                        "id": {
+                            "description": "The identifier",
+                            "type": ["null", "string"],
+                        },
+                        "name": {
+                            "description": "The 
name of the persona",
+                            "type": ["null", "string"],
+                        },
+                        "arcana_id": {
+                            "description": "The associated arcana tarot for this persona",
+                            "type": ["null", "string"],
+                        },
+                    },
+                },
+            },
+        },
+        "palace_enemies_stream": {
+            "$ref": "#/definitions/base_incremental_stream",
+            "retriever": {
+                "$ref": "#/definitions/base_incremental_stream/retriever",
+                "record_selector": {"$ref": "#/definitions/selector"},
+                "partition_router": {
+                    "type": "SubstreamPartitionRouter",
+                    "parent_stream_configs": [
+                        {
+                            "type": "ParentStreamConfig",
+                            "stream": "#/definitions/palaces_stream",
+                            "parent_key": "id",
+                            "partition_field": "palace_id",
+                        }
+                    ],
+                },
+            },
+            "$parameters": {
+                "name": "palace_enemies",
+                "primary_key": "id",
+                "path": "/palaces/{{stream_slice.palace_id}}/enemies",
+            },
+            "schema_loader": {
+                "type": "InlineSchemaLoader",
+                "schema": {
+                    "$schema": "https://json-schema.org/draft-07/schema#",
+                    "type": "object",
+                    "properties": {
+                        "id": {
+                            "description": "The identifier",
+                            "type": ["null", "string"],
+                        },
+                        "name": {
+                            "description": "The name of the enemy persona",
+                            "type": ["null", "string"],
+                        },
+                        "palace_id": {
+                            "description": "The id of the palace where this persona exists",
+                            "type": ["null", "string"],
+                        },
+                    },
+                },
+            },
+        },
     },
     "streams": [
         "#/definitions/party_members_stream",
         "#/definitions/palaces_stream",
         "#/definitions/locations_stream",
         "#/definitions/party_members_skills_stream",
+        "#/definitions/arcana_personas_stream",
+        "#/definitions/palace_enemies_stream",
     ],
-    "check": {"stream_names": ["party_members", "palaces", "locations"]},
+    "check": {"stream_names": ["party_members", "locations"]},
     "concurrency_level": {
         "type": "ConcurrencyLevel",
         "default_concurrency": "{{ config['num_workers'] or 10 }}",
@@ -373,7 +480,7 @@ class DeclarativeStreamDecorator(Stream):
     make it easier to mock behavior and test how low-code streams integrate with the Concurrent CDK.

     NOTE: We are not using that for now but the intent was to scope the tests to only testing that streams were properly instantiated and
-    interacted together properly. However in practice, we had a couple surprises like `get_cursor` and `stream_slices` needed to be
+    interacted together properly. However, in practice, we had a couple of surprises, like `get_cursor` and `stream_slices` needing to be
     re-implemented as well. Because of that, we've moved away from that in favour of doing tests that integrate up until the
     HTTP request. The drawback of that is that we are dependent on any change later (like if the DatetimeBasedCursor changes, this
     will affect those tests) but it feels less flaky than this. 
If we have new information in the future to infirm that, feel free to re-use this class as @@ -479,21 +586,27 @@ def test_group_streams(): concurrent_streams = source._concurrent_streams synchronous_streams = source._synchronous_streams - # 2 incremental streams - assert len(concurrent_streams) == 2 - concurrent_stream_0, concurrent_stream_1 = concurrent_streams + # 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental + assert len(concurrent_streams) == 4 + concurrent_stream_0, concurrent_stream_1, concurrent_stream_2, concurrent_stream_3 = ( + concurrent_streams + ) assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" assert isinstance(concurrent_stream_1, DefaultStream) assert concurrent_stream_1.name == "locations" + assert isinstance(concurrent_stream_2, DefaultStream) + assert concurrent_stream_2.name == "party_members_skills" + assert isinstance(concurrent_stream_3, DefaultStream) + assert concurrent_stream_3.name == "arcana_personas" - # 1 full refresh stream, 1 substream + # 1 full refresh stream, 1 substream w/ incremental assert len(synchronous_streams) == 2 synchronous_stream_0, synchronous_stream_1 = synchronous_streams assert isinstance(synchronous_stream_0, DeclarativeStream) assert synchronous_stream_0.name == "palaces" assert isinstance(synchronous_stream_1, DeclarativeStream) - assert synchronous_stream_1.name == "party_members_skills" + assert synchronous_stream_1.name == "palace_enemies" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -609,7 +722,14 @@ def test_discover(): """ Verifies that the ConcurrentDeclarativeSource discover command returns concurrent and synchronous catalog definitions """ - expected_stream_names = ["party_members", "palaces", "locations", "party_members_skills"] + expected_stream_names = [ + "party_members", + "palaces", + "locations", + "party_members_skills", + "arcana_personas", + "palace_enemies", + ] source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None @@ -617,11 +737,13 @@ def test_discover(): actual_catalog = source.discover(logger=source.logger, config=_CONFIG) - assert len(actual_catalog.streams) == 4 + assert len(actual_catalog.streams) == 6 assert actual_catalog.streams[0].name in expected_stream_names assert actual_catalog.streams[1].name in expected_stream_names assert actual_catalog.streams[2].name in expected_stream_names assert actual_catalog.streams[3].name in expected_stream_names + assert actual_catalog.streams[4].name in expected_stream_names + assert actual_catalog.streams[5].name in expected_stream_names def _mock_requests( @@ -755,7 +877,7 @@ def test_read_with_concurrent_and_synchronous_streams(): assert len(palaces_states) == 1 assert ( palaces_states[0].stream.stream_state.__dict__ - == AirbyteStateBlob(__ab_full_refresh_sync_complete=True).__dict__ + == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ ) # Expects 3 records, 3 slices, 3 records in slice @@ -765,43 +887,11 @@ def test_read_with_concurrent_and_synchronous_streams(): party_members_skills_states = get_states_for_stream( stream_name="party_members_skills", messages=messages ) - assert len(party_members_skills_states) == 3 - assert party_members_skills_states[0].stream.stream_state.__dict__ == { - "states": [ - { - "partition": {"parent_slice": {}, "party_member_id": "amamiya"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - ] - } - assert 
party_members_skills_states[1].stream.stream_state.__dict__ == { - "states": [ - { - "partition": {"parent_slice": {}, "party_member_id": "amamiya"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - { - "partition": {"parent_slice": {}, "party_member_id": "nijima"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - ] - } - assert party_members_skills_states[2].stream.stream_state.__dict__ == { - "states": [ - { - "partition": {"parent_slice": {}, "party_member_id": "amamiya"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - { - "partition": {"parent_slice": {}, "party_member_id": "nijima"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - { - "partition": {"parent_slice": {}, "party_member_id": "yoshizawa"}, - "cursor": {"__ab_full_refresh_sync_complete": True}, - }, - ] - } + assert len(party_members_skills_states) == 1 + assert ( + party_members_skills_states[0].stream.stream_state.__dict__ + == AirbyteStateBlob(__ab_no_cursor_state_message=True).__dict__ + ) @freezegun.freeze_time(_NOW) @@ -1276,7 +1366,7 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous(): state=None, ) - assert len(source._concurrent_streams) == 0 + assert len(source._concurrent_streams) == 2 assert len(source._synchronous_streams) == 4 @@ -1573,4 +1663,5 @@ def get_states_for_stream( def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None: for concurrent_streams in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test - concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above + if isinstance(concurrent_streams.cursor, ConcurrentCursor): + concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above diff --git a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py index 7814caef..4397392e 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py @@ -116,6 +116,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -160,6 +161,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -191,6 +193,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -223,6 +226,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, }, { "json_schema": { @@ -233,6 +237,7 @@ }, "name": "stream2", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, }, ] } @@ -263,6 +268,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -295,6 +301,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -327,6 +334,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index aa5d3dfc..c6197918 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ 
b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -310,6 +310,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -349,6 +350,7 @@ "name": "stream1", "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]], + "is_resumable": False, } ] } @@ -427,6 +429,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, }, { "json_schema": { @@ -438,6 +441,7 @@ }, "name": "stream2", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, }, ] } @@ -476,6 +480,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -516,6 +521,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } @@ -556,6 +562,7 @@ }, "name": "stream1", "supported_sync_modes": ["full_refresh"], + "is_resumable": False, } ] } diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 25b15ca2..43502f1f 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -74,6 +74,7 @@ def test_as_airbyte_stream(self): default_cursor_field=None, source_defined_primary_key=None, namespace=None, + is_resumable=False, ) actual_airbyte_stream = self._stream.as_airbyte_stream() @@ -110,6 +111,7 @@ def test_as_airbyte_stream_with_primary_key(self): default_cursor_field=None, source_defined_primary_key=[["composite_key_1"], ["composite_key_2"]], namespace=None, + is_resumable=False, ) airbyte_stream = stream.as_airbyte_stream() @@ -146,6 +148,7 @@ def test_as_airbyte_stream_with_composite_primary_key(self): default_cursor_field=None, source_defined_primary_key=[["id_a"], ["id_b"]], namespace=None, + is_resumable=False, ) airbyte_stream = stream.as_airbyte_stream() @@ -212,6 +215,7 @@ def test_as_airbyte_stream_with_namespace(self): default_cursor_field=None, source_defined_primary_key=None, namespace="test", + is_resumable=False, ) actual_airbyte_stream = stream.as_airbyte_stream()
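
The routing rule this patch adds to ConcurrentDeclarativeSource._group_streams() reduces
to the following minimal, self-contained sketch. The helper is illustrative only and is
not CDK API; the real implementation additionally requires
_stream_supports_concurrent_partition_processing() to pass and the retriever to expose a
stream_slicer before taking the concurrent path.

    # Illustrative sketch -- standard library only; names here are hypothetical.
    from typing import Any, Mapping

    def runs_on_concurrent_cdk(stream_definition: Mapping[str, Any]) -> bool:
        incremental_sync = stream_definition.get("incremental_sync")
        retriever = stream_definition.get("retriever") or {}
        partition_router = retriever.get("partition_router")

        # Pre-existing path: datetime-based incremental streams are concurrent-capable.
        if incremental_sync and incremental_sync.get("type") == "DatetimeBasedCursor":
            return True

        # New path in this patch: substreams and list-partitioned streams without
        # incremental sync carry no cross-partition state, so each partition can be
        # read concurrently.
        if partition_router and not incremental_sync:
            return True

        # Everything else stays on the synchronous path.
        return False

    # A non-incremental substream now qualifies for the concurrent path:
    assert runs_on_concurrent_cdk(
        {"retriever": {"partition_router": {"type": "SubstreamPartitionRouter"}}}
    )
    # ...while a plain full-refresh stream without a partition router does not:
    assert not runs_on_concurrent_cdk({"retriever": {}})

Because those partitions share no state, a FinalStateCursor is sufficient to emit the
terminal state message once all partitions complete, which is why the tests above now
expect a single __ab_no_cursor_state_message state per stream.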
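
The new disable_resumable_full_refresh flag can also be exercised directly. A hedged
usage sketch, assuming the airbyte_cdk version from this patch is installed (the keyword
arguments match the signature added above):

    from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
        ModelToComponentFactory,
    )

    # ConcurrentDeclarativeSource now builds its factory this way, so synthetic RFR
    # cursors are no longer instantiated when declarative models are converted into
    # runtime components.
    factory = ModelToComponentFactory(
        emit_connector_builder_messages=False,
        disable_resumable_full_refresh=True,
    )

With the flag set, _merge_stream_slicers() returns the declared stream_slicer as-is
instead of wrapping full-refresh substreams in a PerPartitionCursor.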