Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add dynamic schema loader #104

Merged
merged 25 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
be478ae
Add dynamic schema loader
lazebnyi Dec 3, 2024
520998a
Revert imports
lazebnyi Dec 3, 2024
edb52e7
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
7387131
Auto-fix lint and format issues
Dec 3, 2024
807d23e
Fix edge case validation
lazebnyi Dec 3, 2024
59c5c7f
Fix mypy
lazebnyi Dec 3, 2024
ffee00f
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
13441ca
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 3, 2024
227325f
Update after review
lazebnyi Dec 3, 2024
fd44be1
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 3, 2024
05e4f74
Add default value for schema pointer
lazebnyi Dec 3, 2024
d41f54c
Changed helpers var name in _extract_data and typo in schema loader
lazebnyi Dec 5, 2024
4097480
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 5, 2024
f843c3f
Replace deprecated import
lazebnyi Dec 5, 2024
6bfdff3
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 5, 2024
42357e7
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 6, 2024
dcc6233
Chenged interpolation_content to interpolation_context
lazebnyi Dec 6, 2024
1b7c63f
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 6, 2024
9b87978
Formated with ruff
lazebnyi Dec 6, 2024
3da287a
Add DynamicSchemaLoader to DeclarativeStream schema
lazebnyi Dec 12, 2024
507afb6
Add test_dynamic_schema_loader_manifest_flow
lazebnyi Dec 12, 2024
d7e4873
Merge branch 'main' into lazebnyi/add-dynamic-schema-loader
lazebnyi Dec 12, 2024
9c29cee
Fix typo
lazebnyi Dec 12, 2024
1edb25e
Updated imports
lazebnyi Dec 12, 2024
0dbea5d
Merge branch 'lazebnyi/add-dynamic-schema-loader' of github.com:airby…
lazebnyi Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 107 additions & 17 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,97 @@ definitions:
$parameters:
type: object
additionalProperties: true
TypesPair:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Types Pair
description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
type: object
required:
- target_type
- current_type
properties:
target_type:
anyOf:
- type: string
- type: array
items:
type: string
current_type:
anyOf:
- type: string
- type: array
items:
type: string
SchemaTypeIdentifier:
title: Schema Type Identifier
description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
type: object
required:
- schema_pointer
- key_pointer
properties:
type:
type: string
enum: [SchemaTypeIdentifier]
schema_pointer:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Schema Path
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
type: array
items:
- type: string
interpolation_content:
- config
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
key_pointer:
title: Key Path
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
type: array
items:
- type: string
interpolation_content:
- config
type_pointer:
title: Type Path
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
type: array
items:
- type: string
interpolation_content:
- config
is_nullable:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Is Nullable
description: Add null to defined field type. This field is automatically set by the CDK.
type: boolean
default: true
types_map:
type: array
items:
- "$ref": "#/definitions/TypesPair"
$parameters:
type: object
additionalProperties: true
DynamicSchemaLoader:
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
title: Dynamic Schema Loader
description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
type: object
required:
- type
- retriever
- schema_type_identifier
properties:
type:
type: string
enum: [DynamicSchemaLoader]
retriever:
title: Retriever
description: Component used to coordinate how records are extracted across stream slices and request pages.
anyOf:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
type: object
additionalProperties: true
InlineSchemaLoader:
title: Inline Schema Loader
description: Loads a schema that is defined directly in the manifest file.
Expand Down Expand Up @@ -2044,17 +2135,18 @@ definitions:
The DeclarativeOAuth Specific URL templated string to obtain the `access_token`, `refresh_token` etc.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- access_token_url: https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
- access_token_url: https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
access_token_headers:
title: (Optional) DeclarativeOAuth Access Token Headers
type: object
additionalProperties: true
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- access_token_headers: {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"
}
- access_token_headers:
{
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
access_token_params:
title: (Optional) DeclarativeOAuth Access Token Query Params (Json Encoded)
type: object
Expand All @@ -2063,18 +2155,19 @@ definitions:
The DeclarativeOAuth Specific optional query parameters to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
When this property is provided, the query params will be encoded as `Json` and included in the outgoing API request.
examples:
- access_token_params: {
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}"
}
- access_token_params:
{
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}",
}
extract_output:
title: DeclarativeOAuth Extract Output
type: array
items:
type: string
description: |-
The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.
The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.
examples:
- extract_output: ["access_token", "refresh_token", "other_field"]
state:
Expand All @@ -2086,17 +2179,14 @@ definitions:
- max
description: |-
The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,
including length and complexity.
including length and complexity.
properties:
min:
type: integer
max:
type: integer
examples:
- state: {
"min": 7,
"max": 128,
}
- state: { "min": 7, "max": 128 }
client_id_key:
title: (Optional) DeclarativeOAuth Client ID Key Override
type: string
Expand All @@ -2122,14 +2212,14 @@ definitions:
title: (Optional) DeclarativeOAuth State Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.
The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.
examples:
- state_key: "my_custom_state_key_key_name"
auth_code_key:
title: (Optional) DeclarativeOAuth Auth Code Key Override
type: string
description: |-
The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.
The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.
examples:
- auth_code_key: "my_custom_auth_code_key_name"
redirect_uri_key:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,37 @@ class HttpResponseFilter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class TypesPair(BaseModel):
target_type: Union[str, List[str]]
current_type: Union[str, List[str]]


class SchemaTypeIdentifier(BaseModel):
type: Optional[Literal["SchemaTypeIdentifier"]] = None
schema_pointer: List[str] = Field(
...,
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
title="Schema Path",
)
key_pointer: List[str] = Field(
...,
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
title="Key Path",
)
type_pointer: Optional[List[str]] = Field(
None,
description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).',
title="Type Path",
)
is_nullable: Optional[bool] = Field(
True,
description="Add null to defined field type. This field is automatically set by the CDK.",
title="Is Nullable",
)
types_map: Optional[List[TypesPair]] = None
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class InlineSchemaLoader(BaseModel):
type: Literal["InlineSchemaLoader"]
schema_: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -817,13 +848,13 @@ class Config:
)
extract_output: List[str] = Field(
...,
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ",
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.",
examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}],
title="DeclarativeOAuth Extract Output",
)
state: Optional[State] = Field(
None,
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ",
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.",
examples=[{"state": {"min": 7, "max": 128}}],
title="(Optional) DeclarativeOAuth Configurable State Query Param",
)
Expand All @@ -847,13 +878,13 @@ class Config:
)
state_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.",
examples=[{"state_key": "my_custom_state_key_key_name"}],
title="(Optional) DeclarativeOAuth State Key Override",
)
auth_code_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.",
examples=[{"auth_code_key": "my_custom_auth_code_key_name"}],
title="(Optional) DeclarativeOAuth Auth Code Key Override",
)
Expand Down Expand Up @@ -1700,6 +1731,17 @@ class HttpRequester(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DynamicSchemaLoader(BaseModel):
type: Literal["DynamicSchemaLoader"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_type_identifier: SchemaTypeIdentifier
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
class ParentStreamConfig(BaseModel):
type: Literal["ParentStreamConfig"]
parent_key: str = Field(
Expand Down Expand Up @@ -1866,5 +1908,6 @@ class SubstreamPartitionRouter(BaseModel):
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
SimpleRetriever.update_forward_refs()
AsyncRetriever.update_forward_refs()
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
"AddFields.fields": "AddedFieldDefinition",
# CustomPartitionRouter
"CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
# DynamicSchemaLoader
"DynamicSchemaLoader.retriever": "SimpleRetriever",
# SchemaTypeIdentifier
"SchemaTypeIdentifier.types_map": "TypesPair",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
Expand Down
Loading
Loading