Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add transformation to dynamic schema loader #176

Merged
merged 10 commits into from
Dec 18, 2024
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,16 @@ definitions:
schema_normalization:
"$ref": "#/definitions/SchemaNormalization"
default: None
transformations:
title: Transformations
description: A list of transformations to be applied to each output record.
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,13 @@ class RecordSelector(BaseModel):
title="Record Filter",
)
schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
] = Field(
None,
description="A list of transformations to be applied to each output record.",
title="Transformations",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,12 @@ def create_record_selector(
SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
)

if model.transformations:
for transformation_model in model.transformations:
transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

return RecordSelector(
extractor=extractor,
name=name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ def test_full_config_stream():
record_filter:
type: RecordFilter
condition: "{{ record['id'] > stream_state['id'] }}"
transformations:
- type: RemoveFields
field_pointers:
- ["extra"]
metadata_paginator:
type: DefaultPaginator
page_size_option:
Expand Down Expand Up @@ -295,11 +299,14 @@ def test_full_config_stream():
assert isinstance(stream.schema_loader, JsonFileSchemaLoader)
assert stream.schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.json"

assert len(stream.retriever.record_selector.transformations) == 1
assert len(stream.retriever.record_selector.transformations) == 2
add_fields = stream.retriever.record_selector.transformations[0]
assert isinstance(add_fields, AddFields)
assert add_fields.fields[0].path == ["extra"]
assert add_fields.fields[0].value.string == "{{ response.to_add }}"
remove_fields = stream.retriever.record_selector.transformations[1]
assert isinstance(remove_fields, RemoveFields)
assert remove_fields.field_pointers[0] == ["extra"]
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved

assert isinstance(stream.retriever, SimpleRetriever)
assert stream.retriever.primary_key == stream.primary_key
Expand Down Expand Up @@ -1305,6 +1312,10 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
extractor:
$ref: "#/extractor"
field_path: ["{record_selector}"]
transformations:
- type: RemoveFields
field_pointers:
- ["extra"]
"""
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
Expand All @@ -1326,6 +1337,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
assert [fp.eval(input_config) for fp in selector.extractor._field_path] == [
expected_runtime_selector
]
assert isinstance(selector.transformations[0], RemoveFields)
assert isinstance(selector.record_filter, RecordFilter)
assert selector.record_filter.condition == "{{ record['id'] > stream_state['id'] }}"

Expand Down Expand Up @@ -2183,9 +2195,10 @@ def test_remove_fields(self):
)

assert isinstance(stream, DeclarativeStream)
expected = [
RemoveFields(field_pointers=[["path", "to", "field1"], ["path2"]], parameters={})
]
expected = (
[RemoveFields(field_pointers=[["path", "to", "field1"], ["path2"]], parameters={})] * 2
) # Entity will be present twice as we resolve parameters and add the component to both the record
# selector level and the stream level
assert stream.retriever.record_selector.transformations == expected

def test_add_fields_no_value_type(self):
Expand All @@ -2200,21 +2213,25 @@ def test_add_fields_no_value_type(self):
- path: ["field1"]
value: "static_value"
"""
expected = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(
string="static_value", default="static_value", parameters={}
),
value_type=None,
parameters={},
)
],
parameters={},
)
]
expected = (
[
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(
string="static_value", default="static_value", parameters={}
),
value_type=None,
parameters={},
)
],
parameters={},
)
]
* 2
) # Entity will be present twice as we resolve parameters and add the component to both the record
lazebnyi marked this conversation as resolved.
Show resolved Hide resolved
# selector level and the stream level
self._test_add_fields(content, expected)

def test_add_fields_value_type_is_string(self):
Expand All @@ -2230,21 +2247,25 @@ def test_add_fields_value_type_is_string(self):
value: "static_value"
value_type: string
"""
expected = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(
string="static_value", default="static_value", parameters={}
),
value_type=str,
parameters={},
)
],
parameters={},
)
]
expected = (
[
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(
string="static_value", default="static_value", parameters={}
),
value_type=str,
parameters={},
)
],
parameters={},
)
]
* 2
) # Entity will be present twice as we resolve parameters and add the component to both the record
# selector level and the stream level
self._test_add_fields(content, expected)

def test_add_fields_value_type_is_number(self):
Expand All @@ -2260,19 +2281,23 @@ def test_add_fields_value_type_is_number(self):
value: "1"
value_type: number
"""
expected = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(string="1", default="1", parameters={}),
value_type=float,
parameters={},
)
],
parameters={},
)
]
expected = (
[
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(string="1", default="1", parameters={}),
value_type=float,
parameters={},
)
],
parameters={},
)
]
* 2
) # Entity will be present twice as we resolve parameters and add the component to both the record
# selector level and the stream level
self._test_add_fields(content, expected)

def test_add_fields_value_type_is_integer(self):
Expand All @@ -2288,19 +2313,23 @@ def test_add_fields_value_type_is_integer(self):
value: "1"
value_type: integer
"""
expected = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(string="1", default="1", parameters={}),
value_type=int,
parameters={},
)
],
parameters={},
)
]
expected = (
[
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"],
value=InterpolatedString(string="1", default="1", parameters={}),
value_type=int,
parameters={},
)
],
parameters={},
)
]
* 2
) # Entity will be present twice as we resolve parameters and add the component to both the record
# selector level and the stream level
self._test_add_fields(content, expected)

def test_add_fields_value_type_is_boolean(self):
Expand Down Expand Up @@ -2328,7 +2357,7 @@ def test_add_fields_value_type_is_boolean(self):
],
parameters={},
)
]
] * 2
self._test_add_fields(content, expected)

def _test_add_fields(self, content, expected):
Expand Down
Loading