Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Stripe: Update CDK version to prepare for low-code migration #49940

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e094cb9a-26de-4645-8761-65c0c425d1de
dockerImageTag: 5.8.2
dockerImageTag: 5.8.3
dockerRepository: airbyte/source-stripe
documentationUrl: https://docs.airbyte.com/integrations/sources/stripe
erdUrl: https://dbdocs.io/airbyteio/source-stripe?view=relationships
Expand Down
448 changes: 244 additions & 204 deletions airbyte-integrations/connectors/source-stripe/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-stripe/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "5.8.2"
version = "5.8.3"
name = "source-stripe"
description = "Source implementation for Stripe."
authors = [ "Airbyte <[email protected]>",]
Expand All @@ -18,7 +18,7 @@ include = "source_stripe"
[tool.poetry.dependencies]
python = "^3.10,<3.12"
pendulum = "==2.1.2"
airbyte-cdk = "^5"
airbyte-cdk = "^6"

[tool.poetry.scripts]
source-stripe = "source_stripe.run:run"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .parent_incremental_stripe_sub_stream_error_handler import ParentIncrementalStripeSubStreamErrorHandler
from .stripe_error_handler import StripeErrorHandler

__all__ = ['StripeErrorHandler', 'ParentIncrementalStripeSubStreamErrorHandler']
__all__ = ["StripeErrorHandler", "ParentIncrementalStripeSubStreamErrorHandler"]
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .parent_incremental_stripe_sub_stream_error_mapping import PARENT_INCREMENTAL_STRIPE_SUB_STREAM_ERROR_MAPPING

__all__ = ['PARENT_INCREMENTAL_STRIPE_SUB_STREAM_ERROR_MAPPING']
__all__ = ["PARENT_INCREMENTAL_STRIPE_SUB_STREAM_ERROR_MAPPING"]
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pendulum
import requests

from airbyte_cdk import BackoffStrategy
from airbyte_cdk import BackoffStrategy, StreamSlice
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import ExponentialBackoffStrategy
from airbyte_cdk.sources.streams.checkpoint import Cursor
Expand Down Expand Up @@ -294,7 +294,7 @@ def stream_slices(
if start_ts >= pendulum.now().int_timestamp:
return []
for start, end in self.chunk_dates(start_ts):
yield {"created[gte]": start, "created[lte]": end}
yield StreamSlice(partition={}, cursor_slice={"created[gte]": start, "created[lte]": end})

def get_start_timestamp(self, stream_state) -> int:
start_point = self.start_date
Expand Down Expand Up @@ -418,7 +418,7 @@ def stream_slices(
) -> Iterable[Optional[Mapping[str, Any]]]:
# When reading from a stream, a `read_records` is called once per slice.
# We yield a single slice here because we don't want to make duplicate calls for event based incremental syncs.
yield {}
yield StreamSlice(partition={}, cursor_slice={})

def read_event_increments(
self, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None
Expand Down Expand Up @@ -562,7 +562,11 @@ def stream_slices(
)
if incremental_slices:
parent_records = HttpSubStream.stream_slices(self, sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)
yield from (slice | rec for rec in parent_records for slice in incremental_slices)
yield from (
StreamSlice(partition=parent_record, cursor_slice=_slice)
for parent_record in parent_records
for _slice in incremental_slices
)
else:
yield from []

Expand Down Expand Up @@ -778,7 +782,7 @@ def stream_slices(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)
for record in parent_records:
yield {"parent": record}
yield StreamSlice(partition={"parent": record}, cursor_slice={})

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
return {self.cursor_field: max(current_stream_state.get(self.cursor_field, 0), latest_record[self.cursor_field])}
Expand All @@ -798,6 +802,9 @@ def stream_slices(
self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
if not stream_state:
yield from HttpSubStream.stream_slices(self, sync_mode, cursor_field, stream_state)
yield from map(
lambda stream_slice: StreamSlice(partition=stream_slice, cursor_slice={}),
HttpSubStream.stream_slices(self, sync_mode, cursor_field, stream_state),
)
else:
yield from UpdatedCursorIncrementalStripeStream.stream_slices(self, sync_mode, cursor_field, stream_state)
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def payment_methods_endpoint(cls, customer_id: str, account_id: str, client_secr
def payouts_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder":
return cls("payouts", account_id, client_secret)

@classmethod
def setup_attempts_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder":
return cls("setup_attempts", account_id, client_secret)

@classmethod
def setup_intents_endpoint(cls, account_id: str, client_secret: str) -> "StripeRequestBuilder":
return cls("setup_intents", account_id, client_secret)

@classmethod
def persons_endpoint(
cls,
Expand Down Expand Up @@ -91,6 +99,7 @@ def __init__(self, resource: str, account_id: str, client_secret: str) -> None:
self._limit: Optional[int] = None
self._object: Optional[str] = None
self._payout: Optional[str] = None
self._setup_intent: Optional[str] = None
self._starting_after_id: Optional[str] = None
self._types: List[str] = []
self._expands: List[str] = []
Expand Down Expand Up @@ -131,6 +140,10 @@ def with_payout(self, payout: str) -> "StripeRequestBuilder":
self._payout = payout
return self

def with_setup_intent(self, setup_intent: str) -> "StripeRequestBuilder":
self._setup_intent = setup_intent
return self

def build(self) -> HttpRequest:
query_params = {}
if self._created_gte:
Expand All @@ -152,6 +165,8 @@ def build(self) -> HttpRequest:
query_params["payout"] = self._payout
if self._expands:
query_params["expand[]"] = self._expands
if self._setup_intent:
query_params["setup_intent"] = self._setup_intent

if self._any_query_params:
if query_params:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def test_given_no_initial_state_when_read_then_return_state_based_on_cursor_fiel
output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE)
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(created=int(_NOW.timestamp()))
assert most_recent_state.stream_state == AirbyteStateBlob(created=int(cursor_value))

@HttpMocker()
def test_given_state_when_read_then_use_state_for_query_params(self, http_mocker: HttpMocker) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from unittest import TestCase

import freezegun
from source_stripe import SourceStripe

from airbyte_cdk.models import AirbyteStateBlob, ConfiguredAirbyteCatalog, StreamDescriptor, SyncMode
from airbyte_cdk.sources.source import TState
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read
from airbyte_cdk.test.mock_http import HttpMocker
from airbyte_cdk.test.mock_http.response_builder import (
FieldPath,
HttpResponseBuilder,
NestedPath,
RecordBuilder,
create_record_builder,
create_response_builder,
find_template,
)
from airbyte_cdk.test.state_builder import StateBuilder
from integration.config import ConfigBuilder
from integration.pagination import StripePaginationStrategy
from integration.request_builder import StripeRequestBuilder


_EVENT_TYPES = [
"setup_intent.canceled",
"setup_intent.created",
"setup_intent.requires_action",
"setup_intent.setup_failed",
"setup_intent.succeeded",
]

_DATA_FIELD = NestedPath(["data", "object"])
_STREAM_NAME = "setup_attempts"
_NOW = datetime.now(timezone.utc)
_A_START_DATE = _NOW - timedelta(days=60)
_ACCOUNT_ID = "account_id"
_CLIENT_SECRET = "client_secret"
_SETUP_INTENT_ID_1 = "setup_intent_id_1"
_SETUP_INTENT_ID_2 = "setup_intent_id_2"
_NO_STATE = StateBuilder().build()
_AVOIDING_INCLUSIVE_BOUNDARIES = timedelta(seconds=1)


def _setup_attempts_request(setup_intent: str) -> StripeRequestBuilder:
return StripeRequestBuilder.setup_attempts_endpoint(_ACCOUNT_ID, _CLIENT_SECRET).with_setup_intent(setup_intent)


def _setup_intents_request() -> StripeRequestBuilder:
return StripeRequestBuilder.setup_intents_endpoint(_ACCOUNT_ID, _CLIENT_SECRET)


def _events_request() -> StripeRequestBuilder:
return StripeRequestBuilder.events_endpoint(_ACCOUNT_ID, _CLIENT_SECRET)


def _config() -> ConfigBuilder:
return ConfigBuilder().with_start_date(_NOW - timedelta(days=75)).with_account_id(_ACCOUNT_ID).with_client_secret(_CLIENT_SECRET)


def _catalog(sync_mode: SyncMode) -> ConfiguredAirbyteCatalog:
return CatalogBuilder().with_stream(_STREAM_NAME, sync_mode).build()


def _source(catalog: ConfiguredAirbyteCatalog, config: Dict[str, Any], state: Optional[TState]) -> SourceStripe:
return SourceStripe(catalog, config, state)


def _an_event() -> RecordBuilder:
return create_record_builder(
find_template("events", __file__),
FieldPath("data"),
record_id_path=FieldPath("id"),
record_cursor_path=FieldPath("created"),
)


def _events_response() -> HttpResponseBuilder:
return create_response_builder(find_template("events", __file__), FieldPath("data"), pagination_strategy=StripePaginationStrategy())


def _a_setup_attempt() -> RecordBuilder:
return create_record_builder(
find_template(_STREAM_NAME, __file__),
FieldPath("data"),
record_id_path=FieldPath("id"),
record_cursor_path=FieldPath("created"),
)


def _setup_attempts_response() -> HttpResponseBuilder:
return create_response_builder(find_template(_STREAM_NAME, __file__), FieldPath("data"), pagination_strategy=StripePaginationStrategy())


def _a_setup_intent() -> RecordBuilder:
return create_record_builder(
find_template("setup_intents", __file__),
FieldPath("data"),
record_id_path=FieldPath("id"),
record_cursor_path=FieldPath("created"),
)


def _setup_intents_response() -> HttpResponseBuilder:
return create_response_builder(
find_template("setup_intents", __file__), FieldPath("data"), pagination_strategy=StripePaginationStrategy()
)


def _read(
config_builder: ConfigBuilder, sync_mode: SyncMode, state: Optional[Dict[str, Any]] = None, expecting_exception: bool = False
) -> EntrypointOutput:
catalog = _catalog(sync_mode)
config = config_builder.build()
return read(_source(catalog, config, state), config, catalog, state, expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
class FullRefreshTest(TestCase):
@HttpMocker()
def test_given_one_page_when_read_then_return_records(self, http_mocker: HttpMocker) -> None:
http_mocker.get(
_setup_intents_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_intents_response()
.with_record(_a_setup_intent().with_id(_SETUP_INTENT_ID_1))
.with_record(_a_setup_intent().with_id(_SETUP_INTENT_ID_2))
.build(),
)
http_mocker.get(
_setup_attempts_request(_SETUP_INTENT_ID_1).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_attempts_response().with_record(_a_setup_attempt()).build(),
)
http_mocker.get(
_setup_attempts_request(_SETUP_INTENT_ID_2).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_attempts_response().with_record(_a_setup_attempt()).with_record(_a_setup_attempt()).build(),
)

output = self._read(_config().with_start_date(_A_START_DATE))

assert len(output.records) == 3

def _read(self, config: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput:
return _read(config, SyncMode.full_refresh, expecting_exception=expecting_exception)


@freezegun.freeze_time(_NOW.isoformat())
class IncrementalTest(TestCase):
@HttpMocker()
def test_given_no_state_when_read_then_use_cards_endpoint(self, http_mocker: HttpMocker) -> None:
http_mocker.get(
_setup_intents_request().with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_intents_response()
.with_record(_a_setup_intent().with_id(_SETUP_INTENT_ID_1))
.with_record(_a_setup_intent().with_id(_SETUP_INTENT_ID_2))
.build(),
)
http_mocker.get(
_setup_attempts_request(_SETUP_INTENT_ID_1).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_attempts_response().with_record(_a_setup_attempt()).build(),
)
http_mocker.get(
_setup_attempts_request(_SETUP_INTENT_ID_2).with_created_gte(_A_START_DATE).with_created_lte(_NOW).with_limit(100).build(),
_setup_attempts_response().with_record(_a_setup_attempt()).with_record(_a_setup_attempt()).build(),
)

output = self._read(_config().with_start_date(_A_START_DATE), _NO_STATE)

assert len(output.records) == 3

@HttpMocker()
def test_given_state_when_read_then_query_events_using_types_and_state_value_plus_1(self, http_mocker: HttpMocker) -> None:
start_date = _NOW - timedelta(days=40)
state_datetime = _NOW - timedelta(days=5)
cursor_value = int(state_datetime.timestamp()) + 10
creation_datetime_of_setup_attempt = int(state_datetime.timestamp()) + 5

http_mocker.get(
_events_request()
.with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES)
.with_created_lte(_NOW)
.with_limit(100)
.with_types(_EVENT_TYPES)
.build(),
_events_response().with_record(self._a_setup_intent_event(cursor_value, _SETUP_INTENT_ID_1)).build(),
)
http_mocker.get(
_setup_attempts_request(_SETUP_INTENT_ID_1)
.with_created_gte(state_datetime + _AVOIDING_INCLUSIVE_BOUNDARIES)
.with_created_lte(_NOW)
.with_limit(100)
.build(),
_setup_attempts_response().with_record(_a_setup_attempt().with_cursor(creation_datetime_of_setup_attempt)).build(),
)

output = self._read(
_config().with_start_date(start_date),
StateBuilder().with_stream_state(_STREAM_NAME, {"created": int(state_datetime.timestamp())}).build(),
)

assert len(output.records) == 1
most_recent_state = output.most_recent_state
assert most_recent_state.stream_descriptor == StreamDescriptor(name=_STREAM_NAME)
assert most_recent_state.stream_state == AirbyteStateBlob(created=creation_datetime_of_setup_attempt)

def _a_setup_intent_event(self, cursor_value: int, setup_intent_id: str) -> RecordBuilder:
return _an_event().with_cursor(cursor_value).with_field(_DATA_FIELD, _a_setup_intent().with_id(setup_intent_id).build())

def _read(self, config: ConfigBuilder, state: Optional[Dict[str, Any]], expecting_exception: bool = False) -> EntrypointOutput:
return _read(config, SyncMode.incremental, state, expecting_exception)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"object": "list",
"url": "/v1/setup_attempts",
"has_more": false,
"data": [
{
"id": "setatt_1ErTsH2eZvKYlo2CI7ukcoF7",
"object": "setup_attempt",
"application": null,
"created": 1562004309,
"customer": null,
"flow_directions": null,
"livemode": false,
"on_behalf_of": null,
"payment_method": "pm_1ErTsG2eZvKYlo2CH0DNen59",
"payment_method_details": {
"card": {
"three_d_secure": null
},
"type": "card"
},
"setup_error": null,
"setup_intent": "seti_1ErTsG2eZvKYlo2CKaT8MITz",
"status": "succeeded",
"usage": "off_session"
}
]
}
Loading
Loading