From 89b76ffff40eaaf0ba7d820ef8324109594fe60e Mon Sep 17 00:00:00 2001 From: Olivia Qin Date: Thu, 7 Nov 2024 10:42:16 -0500 Subject: [PATCH] fix reading to cache source name bug --- airbyte/sources/base.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index aeffc8ed..1b882b1e 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -741,14 +741,21 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments progress_tracker=progress_tracker, ) ) - cache._write_airbyte_message_stream( # noqa: SLF001 # Non-public API - stdin=airbyte_message_iterator, + + cache_record_processor = cache.get_record_processor( + source_name=self.name, catalog_provider=catalog_provider, - write_strategy=write_strategy, state_writer=state_writer, + ) + + cache_record_processor.process_airbyte_messages( + messages=airbyte_message_iterator, + write_strategy=write_strategy, progress_tracker=progress_tracker, ) + progress_tracker.log_cache_processing_complete() + # Flush the WAL, if applicable cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API