diff --git a/README.md b/README.md
index 97cf0ae..4475ac6 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
     "include_process_date": true|false,
     "append_date_to_prefix": true|false,
     "partition_name_enabled": true|false,
+    "use_raw_stream_name": true|false,
     "append_date_to_prefix_grain": "day",
     "append_date_to_filename": true|false,
     "append_date_to_filename_grain": "microsecond",
diff --git a/target_s3/formats/format_base.py b/target_s3/formats/format_base.py
index df9caf3..163fc1c 100644
--- a/target_s3/formats/format_base.py
+++ b/target_s3/formats/format_base.py
@@ -67,9 +67,14 @@ def __init__(self, config: dict, context: dict, extension: str) -> None:
             endpoint_url=aws_config.get("aws_endpoint_override", None),
         )

+        stream_name: str = self.context["stream_name"]
         self.prefix = config.get("prefix", None)
         self.logger = context["logger"]
-        self.fully_qualified_key = self.create_key()
+        self.fully_qualified_key = (
+            f"{self.bucket}/{self.prefix}/{stream_name}"
+            if config.get("use_raw_stream_name")
+            else self.create_key()
+        )
         self.logger.info(f"key: {self.fully_qualified_key}")

     @abstractmethod
@@ -78,7 +83,7 @@ def _write(self, contents: str = None) -> None:
         # TODO: create dynamic cloud
         # TODO: is there a better way to handle write contents ?
         with open(
-            f"s3://{self.fully_qualified_key}.{self.extension}.{self.compression}",
+            f"s3://{self.fully_qualified_key}",
             "w",
             transport_params={"client": self.client},
         ) as f:
@@ -120,7 +125,7 @@ def create_key(self) -> str:
                 grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()]
                 file_name += f"{self.create_file_structure(batch_start, grain)}"

-        return f"{folder_path}{file_name}"
+        return f"{folder_path}{file_name}.{self.extension}.{self.compression}"

     def create_folder_structure(
         self, batch_start: datetime, grain: int, partition_name_enabled: bool
diff --git a/target_s3/formats/format_jsonl.py b/target_s3/formats/format_jsonl.py
new file mode 100644
index 0000000..d324a21
--- /dev/null
+++ b/target_s3/formats/format_jsonl.py
@@ -0,0 +1,33 @@
+from datetime import datetime
+
+from bson import ObjectId
+from simplejson import JSONEncoder, dumps
+
+from target_s3.formats.format_base import FormatBase
+
+
+class JsonSerialize(JSONEncoder):
+    def default(self, obj: any) -> any:
+        if isinstance(obj, ObjectId):
+            return str(obj)
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+        else:
+            raise TypeError(f"Type {type(obj)} not serializable")
+
+
+class FormatJsonl(FormatBase):
+    def __init__(self, config, context) -> None:
+        super().__init__(config, context, "jsonl")
+
+    def _prepare_records(self):
+        # use default behavior, no additional prep needed
+        return super()._prepare_records()
+
+    def _write(self) -> None:
+        # serialize with JsonSerialize so ObjectId/datetime values are handled
+        return super()._write("\n".join(dumps(r, cls=JsonSerialize) for r in self.records))
+
+    def run(self) -> None:
+        # use default behavior, no additional run steps needed
+        return super().run(self.context["records"])
diff --git a/target_s3/sinks.py b/target_s3/sinks.py
index f84ecb4..c1fd7c3 100644
--- a/target_s3/sinks.py
+++ b/target_s3/sinks.py
@@ -9,10 +9,11 @@
 from target_s3.formats.format_parquet import FormatParquet
 from target_s3.formats.format_csv import FormatCsv
 from target_s3.formats.format_json import FormatJson
+from target_s3.formats.format_jsonl import FormatJsonl

 LOGGER = logging.getLogger("target-s3")

-FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson}
+FORMAT_TYPE = {"parquet": FormatParquet, "csv": FormatCsv, "json": FormatJson, "jsonl": FormatJsonl}


 class s3Sink(BatchSink):
diff --git a/target_s3/target.py b/target_s3/target.py
index dc19623..66e6f0f 100644
--- a/target_s3/target.py
+++ b/target_s3/target.py
@@ -29,6 +29,7 @@ class Targets3(Target):
             allowed_values=[
                 "parquet",
                 "json",
+                "jsonl",
             ],  # TODO: configure this from class
         ),
         th.Property(
@@ -137,6 +138,12 @@ class Targets3(Target):
             description="A flag indicating whether to append _process_date to record.",
             default=False,
         ),
+        th.Property(
+            "use_raw_stream_name",
+            th.BooleanType,
+            description="A flag to force the filename to be identical to the stream name.",
+            default=False,
+        ),
         th.Property(
             "append_date_to_prefix",
             th.BooleanType,
@@ -215,4 +222,4 @@ def deserialize_json(self, line: str) -> dict:


 if __name__ == "__main__":
-    Targets3.cli()
\ No newline at end of file
+    Targets3.cli()