diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 1531dd27..d273681e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1767,9 +1767,9 @@ definitions: - "$ref": "#/definitions/AsyncRetriever" - "$ref": "#/definitions/CustomRetriever" - "$ref": "#/definitions/SimpleRetriever" - transformations: - title: Transformations - description: A list of transformations to be applied to retrieved record. + schema_transformations: + title: Schema Transformations + description: A list of transformations to be applied to the schema. type: array items: anyOf: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 923bd77b..cc08ff7a 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -824,9 +822,7 @@ class Config: access_token_headers: Optional[Dict[str, Any]] = Field( None, description="The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.", - examples=[ - {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"} - ], + examples=[{"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}], title="Access Token Headers", ) access_token_params: Optional[Dict[str, Any]] = Field( @@ -895,28 +891,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -934,9 +926,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1647,16 +1637,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -1845,7 +1831,7 @@ class DynamicSchemaLoader(BaseModel): description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - transformations: Optional[ + schema_transformations: Optional[ List[ Union[ AddFields, @@ -1857,8 +1843,8 @@ class DynamicSchemaLoader(BaseModel): ] ] = Field( None, - description="A list of transformations to be applied to retrieved record.", - title="Transformations", + description="A list of transformations to be applied to the schema.", + title="Schema Transformations", ) schema_type_identifier: SchemaTypeIdentifier parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1922,11 +1908,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1968,9 +1950,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2000,11 +1980,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2068,12 +2044,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 781eb76f..1e960dcf 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -236,6 +236,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( KeysToLower as KeysToLowerModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + KeysToSnakeCase as KeysToSnakeCaseModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel, ) @@ -390,6 +393,9 @@ from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import ( KeysToLowerTransformation, ) +from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import ( + KeysToSnakeCaseTransformation, +) from airbyte_cdk.sources.message import ( InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, @@ -472,6 +478,7 @@ def _init_mappings(self) -> None: JsonlDecoderModel: self.create_jsonl_decoder, GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, + KeysToSnakeCaseModel: self.create_keys_to_snake_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, JsonFileSchemaLoaderModel: self.create_json_file_schema_loader, @@ -587,6 +594,11 @@ def create_keys_to_lower_transformation( ) -> KeysToLowerTransformation: return KeysToLowerTransformation() + def create_keys_to_snake_transformation( + self, model: KeysToSnakeCaseModel, config: Config, **kwargs: Any + ) -> KeysToSnakeCaseTransformation: + return KeysToSnakeCaseTransformation() + @staticmethod def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]: if not value_type: @@ -1638,10 +1650,10 @@ def create_dynamic_schema_loader( model.retriever, stream_slicer ) - transformations = [] - if model.transformations: - for transformation_model in model.transformations: - transformations.append( + schema_transformations = [] + if model.schema_transformations: + for transformation_model in model.schema_transformations: + schema_transformations.append( self._create_component_from_model(model=transformation_model, config=config) ) @@ -1651,7 +1663,7 @@ def create_dynamic_schema_loader( name="", primary_key=None, stream_slicer=combined_slicers, - transformations=transformations, + transformations=[], ) schema_type_identifier = self._create_component_from_model( model.schema_type_identifier, config=config, parameters=model.parameters or {} @@ -1659,6 +1671,7 @@ def create_dynamic_schema_loader( return DynamicSchemaLoader( retriever=retriever, config=config, + schema_transformations=schema_transformations, schema_type_identifier=schema_type_identifier, parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py index 95b5bf0a..16347a43 100644 --- a/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py +++ b/airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py @@ -4,7 +4,7 @@ from copy import deepcopy -from dataclasses import InitVar, dataclass +from dataclasses import InitVar, dataclass, field from typing import Any, List, Mapping, MutableMapping, Optional, Union import dpath @@ -13,8 +13,9 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader +from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.types import Config +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState AIRBYTE_DATA_TYPES: Mapping[str, Mapping[str, Any]] = { "string": {"type": ["null", "string"]}, @@ -103,6 +104,7 @@ class DynamicSchemaLoader(SchemaLoader): config: Config parameters: InitVar[Mapping[str, Any]] schema_type_identifier: SchemaTypeIdentifier + schema_transformations: List[RecordTransformation] = field(default_factory=lambda: []) def get_json_schema(self) -> Mapping[str, Any]: """ @@ -128,12 +130,27 @@ def get_json_schema(self) -> Mapping[str, Any]: ) properties[key] = value + transformed_properties = self._transform(properties, {}) + return { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", - "properties": properties, + "properties": transformed_properties, } + def _transform( + self, + properties: Mapping[str, Any], + stream_state: StreamState, + stream_slice: Optional[StreamSlice] = None, + ) -> Mapping[str, Any]: + for transformation in self.schema_transformations: + transformation.transform( + properties, # type: ignore # properties has type Mapping[str, Any], but Dict[str, Any] expected + config=self.config, + ) + return properties + def _get_key( self, raw_schema: MutableMapping[str, Any], diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index c35f917c..3e960884 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -64,6 +64,11 @@ }, "paginator": {"type": "NoPagination"}, }, + "schema_transformations": [ + { + "type": "KeysToSnakeCase", + } + ], "schema_type_identifier": { "schema_pointer": ["fields"], "key_pointer": ["name"], @@ -230,7 +235,7 @@ def test_dynamic_schema_loader_manifest_flow(): "type": "object", "properties": { "id": {"type": ["null", "integer"]}, - "name": {"type": ["null", "string"]}, + "first_name": {"type": ["null", "string"]}, "description": {"type": ["null", "string"]}, }, } @@ -245,8 +250,8 @@ def test_dynamic_schema_loader_manifest_flow(): HttpResponse( body=json.dumps( [ - {"id": 1, "name": "member_1", "description": "First member"}, - {"id": 2, "name": "member_2", "description": "Second member"}, + {"id": 1, "first_name": "member_1", "description": "First member"}, + {"id": 2, "first_name": "member_2", "description": "Second member"}, ] ) ), @@ -257,9 +262,9 @@ def test_dynamic_schema_loader_manifest_flow(): body=json.dumps( { "fields": [ - {"name": "id", "type": "integer"}, - {"name": "name", "type": "string"}, - {"name": "description", "type": "singleLineText"}, + {"name": "Id", "type": "integer"}, + {"name": "FirstName", "type": "string"}, + {"name": "Description", "type": "singleLineText"}, ] } )