Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(low code cdk): dynamic streams changes #132

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2dddf8e
Add component resolver and http component resolver
lazebnyi Nov 26, 2024
c12f351
Fix mypy
lazebnyi Nov 26, 2024
e2505b1
Fix formatting
lazebnyi Nov 26, 2024
c8e3509
Added dynamic stream component
lazebnyi Nov 26, 2024
6e3ecec
Fix model
lazebnyi Nov 26, 2024
b136524
Add unit tests
lazebnyi Nov 27, 2024
1d83663
Replace key with field_path and update according to review
lazebnyi Nov 29, 2024
c30b43f
Update source schema
lazebnyi Nov 29, 2024
d0d7107
Fix dynamic declarative stream schema
lazebnyi Nov 29, 2024
f6542ec
Added unittets for dyanimc stream read
lazebnyi Nov 29, 2024
6bee0aa
Merge branch 'main' into lazebnyi/add-components-resolver
lazebnyi Nov 29, 2024
13d0d0f
Fix formatting
lazebnyi Nov 29, 2024
c478df5
Update component schema
lazebnyi Nov 29, 2024
97a932a
Add caching to components resolver
lazebnyi Nov 29, 2024
ce9539c
Fix description for fields
lazebnyi Nov 29, 2024
0160353
Auto-fix lint and format issues
Nov 29, 2024
284241d
Update unit tests with Maxime comments
lazebnyi Dec 2, 2024
25e6e1f
Merge branch 'lazebnyi/add-components-resolver' of github.com:airbyte…
lazebnyi Dec 2, 2024
be478ae
Add dynamic schema loader
lazebnyi Dec 3, 2024
520998a
Revert imports
lazebnyi Dec 3, 2024
edb52e7
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
7387131
Auto-fix lint and format issues
Dec 3, 2024
c6dcbc8
Updated version for manifest in unit tests
lazebnyi Dec 3, 2024
890eec1
Added details to ComponentMappingDefinition doc string
lazebnyi Dec 3, 2024
807d23e
Fix edge case validation
lazebnyi Dec 3, 2024
59c5c7f
Fix mypy
lazebnyi Dec 3, 2024
ffee00f
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
13441ca
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
227325f
Update after review
lazebnyi Dec 3, 2024
fd44be1
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
05e4f74
Add default value for schema pointer
lazebnyi Dec 3, 2024
5a42564
Merge branch 'lazebnyi/add-dynamic-schema-loader' into daryna/test-dy…
darynaishchenko Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ def _group_streams(

state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later

name_to_stream_mapping = {
stream["name"]: stream for stream in self.resolved_manifest["streams"]
}
# Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
# and this is validated during the initialization of the source.
streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)

name_to_stream_mapping = {stream["name"]: stream for stream in streams}

for declarative_stream in self.streams(config=config):
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
Expand Down
221 changes: 202 additions & 19 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ version: 1.0.0
required:
- type
- check
- streams
- version
anyOf:
- required:
- streams
- required:
- dynamic_streams
properties:
type:
type: string
Expand All @@ -19,6 +23,10 @@ properties:
type: array
items:
"$ref": "#/definitions/DeclarativeStream"
dynamic_streams:
type: array
items:
"$ref": "#/definitions/DynamicDeclarativeStream"
version:
type: string
description: The version of the Airbyte CDK used to build and test the source.
Expand Down Expand Up @@ -1321,7 +1329,7 @@ definitions:
type: array
items:
- type: string
interpolation_content:
interpolation_context:
- config
examples:
- ["data"]
Expand Down Expand Up @@ -1676,6 +1684,92 @@ definitions:
$parameters:
type: object
additionalProperties: true
TypesMap:
title: Types Map
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
type: object
required:
- target_type
- current_type
properties:
target_type:
anyOf:
- type: string
- type: array
items:
type: string
current_type:
anyOf:
- type: string
- type: array
items:
type: string
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
type: object
required:
- key_pointer
properties:
type:
type: string
enum: [SchemaTypeIdentifier]
schema_pointer:
title: Schema Path
description: List of nested fields defining the schema field path to extract. Defaults to [].
type: array
default: []
items:
- type: string
interpolation_content:
- config
key_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the field key to extract.
type: array
items:
- type: string
interpolation_content:
- config
type_pointer:
title: Type Path
description: List of potentially nested fields describing the full path of the field type to extract.
type: array
items:
- type: string
interpolation_content:
- config
types_mapping:
type: array
items:
- "$ref": "#/definitions/TypesMap"
$parameters:
type: object
additionalProperties: true
DynamicSchemaLoader:
title: Dynamic Schema Loader
description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
type: object
required:
- type
- retriever
- schema_type_identifier
properties:
type:
type: string
enum: [DynamicSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down Expand Up @@ -2057,17 +2151,18 @@ definitions:
The DeclarativeOAuth Specific URL templated string to obtain the `access_token`, `refresh_token` etc.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- access_token_url: https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
- access_token_url: https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
access_token_headers:
title: (Optional) DeclarativeOAuth Access Token Headers
type: object
additionalProperties: true
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- access_token_headers: {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"
}
- access_token_headers:
{
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
access_token_params:
title: (Optional) DeclarativeOAuth Access Token Query Params (Json Encoded)
type: object
Expand All @@ -2076,18 +2171,19 @@ definitions:
The DeclarativeOAuth Specific optional query parameters to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
When this property is provided, the query params will be encoded as `Json` and included in the outgoing API request.
examples:
- access_token_params: {
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}"
}
- access_token_params:
{
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}",
}
extract_output:
title: DeclarativeOAuth Extract Output
type: array
items:
type: string
description: |-
The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.
The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.
examples:
- extract_output: ["access_token", "refresh_token", "other_field"]
state:
Expand All @@ -2099,17 +2195,14 @@ definitions:
- max
description: |-
The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,
including length and complexity.
including length and complexity.
properties:
min:
type: integer
max:
type: integer
examples:
- state: {
"min": 7,
"max": 128,
}
- state: { "min": 7, "max": 128 }
client_id_key:
title: (Optional) DeclarativeOAuth Client ID Key Override
type: string
Expand All @@ -2135,14 +2228,14 @@ definitions:
title: (Optional) DeclarativeOAuth State Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.
The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.
examples:
- state_key: "my_custom_state_key_key_name"
auth_code_key:
title: (Optional) DeclarativeOAuth Auth Code Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.
The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.
examples:
- auth_code_key: "my_custom_auth_code_key_name"
redirect_uri_key:
Expand Down Expand Up @@ -2896,6 +2989,96 @@ definitions:
$parameters:
type: object
additionalProperties: true
ComponentMappingDefinition:
title: Component Mapping Definition
description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts.
type: object
required:
- type
- field_path
- value
properties:
type:
type: string
enum: [ComponentMappingDefinition]
field_path:
title: Field Path
description: A list of potentially nested fields indicating the full path where value will be added or updated.
type: array
items:
- type: string
interpolation_context:
- config
- components_values
- stream_template_config
examples:
- ["data"]
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
value:
title: Value
description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.
type: string
interpolation_context:
- config
- stream_template_config
- components_values
examples:
- "{{ components_values['updates'] }}"
- "{{ components_values['MetaData']['LastUpdatedTime'] }}"
- "{{ config['segment_id'] }}"
value_type:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
HttpComponentsResolver:
type: object
description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched via an HTTP retriever.
properties:
type:
type: string
enum: [HttpComponentsResolver]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
components_mapping:
type: array
items:
"$ref": "#/definitions/ComponentMappingDefinition"
$parameters:
type: object
additionalProperties: true
required:
- type
- retriever
- components_mapping
DynamicDeclarativeStream:
type: object
description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template.
properties:
type:
type: string
enum: [DynamicDeclarativeStream]
stream_template:
title: Stream Template
description: Reference to the stream template.
"$ref": "#/definitions/DeclarativeStream"
components_resolver:
title: Components Resolver
description: Component resolve and populates stream templates with components values.
"$ref": "#/definitions/HttpComponentsResolver"
required:
- type
- stream_template
- components_resolver
interpolation:
variables:
- title: config
Expand Down
Loading
Loading