From b5c98dc066e31667340f9a9e5dfe64cf7891a955 Mon Sep 17 00:00:00 2001
From: Artem Inzhyyants
Date: Mon, 23 Dec 2024 15:52:23 +0100
Subject: [PATCH] DateTimeNormalizerFields: add datetimenormalizer

Signed-off-by: Artem Inzhyyants
---
 .../transformations/datetime_transformer.py   | 74 +++++++++++++++++++
 1 file changed, 74 insertions(+)
 create mode 100644 airbyte_cdk/sources/declarative/transformations/datetime_transformer.py

diff --git a/airbyte_cdk/sources/declarative/transformations/datetime_transformer.py b/airbyte_cdk/sources/declarative/transformations/datetime_transformer.py
new file mode 100644
index 00000000..ecf3d2d8
--- /dev/null
+++ b/airbyte_cdk/sources/declarative/transformations/datetime_transformer.py
@@ -0,0 +1,74 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import InitVar, dataclass
+from typing import Any, Dict, List, Mapping, Optional
+
+import dpath
+import dpath.exceptions
+
+from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
+from airbyte_cdk.sources.declarative.transformations import RecordTransformation
+from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
+
+
+@dataclass
+class DateTimeNormalizerFields(RecordTransformation):
+    """
+    A transformation which normalizes datetime fields of a record. The fields are designated using FieldPointers.
+    Each value is parsed with `datetime_format` and replaced by the `isoformat()` string of the parsed value.
+    During transformation, if a field or any of its parents does not exist in the record, no error is thrown.
+
+    Usage syntax:
+
+    ```yaml
+        my_stream:
+
+            transformations:
+                - type: DateTimeNormalizer
+                  datetime_format: "%Y-%m-%d"
+                  field_pointers:
+                    - ["path", "to", "field1"]
+                    - ["path2"]
+    ```
+
+    Attributes:
+        field_pointers (List[FieldPointer]): pointers to the fields that should be normalized
+        datetime_format (str): format handed to the parser to read the current field values
+    """
+
+    field_pointers: List[FieldPointer]
+    parameters: InitVar[Mapping[str, Any]]
+    datetime_format: str = ""
+    # Left unannotated on purpose: the dataclass then treats it as a shared class attribute, not a field.
+    _parser = DatetimeParser()
+
+    def transform(
+        self,
+        record: Dict[str, Any],
+        config: Optional[Config] = None,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+    ) -> None:
+        """
+        Normalize the designated fields of the record in place.
+
+        Only missing paths are skipped; errors raised by the parser for a present value are propagated.
+
+        :param record: The record to be transformed (mutated in place)
+        :param config: not used by this transformation
+        :param stream_state: not used by this transformation
+        :param stream_slice: not used by this transformation
+        :return: None, the normalized values are written back into the input record
+        """
+        for pointer in self.field_pointers:
+            try:
+                current_date_time_value: str | int = dpath.get(record, pointer)
+            except dpath.exceptions.PathNotFound:
+                # if the (potentially nested) property does not exist, silently skip
+                continue
+            new_val = self._parser.parse(
+                date=current_date_time_value, format=self.datetime_format
+            ).isoformat()
+            dpath.set(record, pointer, new_val)