Skip to content

Commit

Permalink
fix(airbyte-cdk): Fix Record Filter Validation in ConcurrentDeclarati…
Browse files Browse the repository at this point in the history
…veSource (#45)
  • Loading branch information
tolik0 authored Nov 13, 2024
1 parent 4d16e1c commit c353761
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors import RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
Expand Down Expand Up @@ -291,6 +294,9 @@ def _stream_supports_concurrent_partition_processing(
if isinstance(record_selector, RecordSelector):
if (
record_selector.record_filter
and not isinstance(
record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
)
and "stream_state" in record_selector.record_filter.condition
):
self.logger.warning(
Expand Down

0 comments on commit c353761

Please sign in to comment.