diff --git a/export_proto_to_parquet/LICENSE b/export_proto_to_parquet/LICENSE new file mode 100644 index 00000000..7307f580 --- /dev/null +++ b/export_proto_to_parquet/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 temporal.io + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/export_proto_to_parquet/README.md b/export_proto_to_parquet/README.md new file mode 100644 index 00000000..22236085 --- /dev/null +++ b/export_proto_to_parquet/README.md @@ -0,0 +1,59 @@ +# Temporal proto to parquet sample + +This is an example workflow that converts exported workflow history files from proto to parquet format. The workflow runs on an hourly schedule. + +To use this code, make sure you have a [Temporal Cluster running](https://docs.temporal.io/docs/server/quick-install/) first. + +Create a virtual environment and activate it. 
class ExportS3Activities:
    """Activities that convert exported workflow-history files in S3 from proto to parquet.

    NOTE(review): the boto3 calls and the pandas conversion below are
    synchronous but run inside ``async def`` activities, so they occupy the
    worker's event loop while in flight. Consider ``asyncio.to_thread`` or
    plain ``def`` activities with an ``activity_executor`` -- confirm against
    the Temporal Python SDK guidance before changing.
    """

    def __init__(self):
        # Make sure you have the AWS credentials set up
        self.s3 = boto3.client("s3")

    @activity.defn
    async def get_object_keys(
        self, activity_input: GetObjectKeysActivityInput
    ) -> list[str]:
        """Return the key of every object under ``activity_input.path``.

        Args:
            activity_input: bucket name and key prefix to list.

        Returns:
            All matching object keys, in listing order.

        Raises:
            FileNotFoundError: if nothing exists under the prefix.
        """
        object_keys: list[str] = []
        # A single list_objects_v2 response holds at most 1000 keys; the
        # paginator follows continuation tokens so large prefixes are not
        # silently truncated.
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(
            Bucket=activity_input.bucket, Prefix=activity_input.path
        ):
            object_keys.extend(obj["Key"] for obj in page.get("Contents", []))

        if not object_keys:
            raise FileNotFoundError(
                f"No files found in {activity_input.bucket}/{activity_input.path}"
            )

        return object_keys

    @activity.defn
    async def data_trans_and_land(
        self, activity_input: DataTransAndLandActivitiyInput
    ) -> str:
        """Convert one exported proto file to parquet and write it to the output bucket.

        Args:
            activity_input: source bucket/key plus destination bucket/prefix.

        Returns:
            The S3 key of the parquet object that was written.
        """
        key = activity_input.object_key
        data = await self.get_data_from_object_key(activity_input.export_s3_bucket, key)
        activity.logger.info("Convert proto to parquet for file: %s", key)
        parquet_data = await self.convert_proto_to_parquet_flatten(data)
        activity.logger.info("Finish transformation for file: %s", key)

        return await self.save_to_sink(
            parquet_data, activity_input.output_s3_bucket, activity_input.write_path
        )

    async def get_data_from_object_key(
        self, bucket_name: str, object_key: str
    ) -> export.WorkflowExecutions:
        """Download an object and parse it as a ``WorkflowExecutions`` message.

        Any error from the S3 read is logged and re-raised unchanged.
        """
        try:
            data = self.s3.get_object(Bucket=bucket_name, Key=object_key)["Body"].read()
        except Exception as e:
            activity.logger.error("Error reading object: %s", e)
            # Bare raise keeps the original traceback intact.
            raise

        executions = export.WorkflowExecutions()
        executions.ParseFromString(data)
        return executions

    async def convert_proto_to_parquet_flatten(
        self, wfs: export.WorkflowExecutions
    ) -> pd.DataFrame:
        """Flatten every history event of every workflow into one row.

        Each event is converted to JSON and flattened with ``_`` as the
        nesting separator. Columns whose name contains ``payloads`` or ``.``
        are dropped, and ``WorkflowId`` / ``RunId`` (taken from the first
        event's started attributes) are inserted as the two leading columns.

        Returns:
            One DataFrame with a row per history event. If the export holds no
            events at all, an empty frame with just the two id columns.
        """
        skip_tokens = ("payloads", ".")
        frames = []
        for wf in wfs.items:
            events = json.loads(MessageToJson(wf.history)).get("events", [])
            if not events:
                # Nothing to flatten; indexing events[0] below would fail.
                continue

            start_attributes = wf.history.events[
                0
            ].workflow_execution_started_event_attributes

            # Normalise all events of a workflow in one call instead of
            # building a one-row DataFrame per event.
            flat = pd.json_normalize(events, sep="_")
            columns_to_drop = [
                col
                for col in flat.columns
                if any(token in col for token in skip_tokens)
            ]
            flat = flat.drop(columns=columns_to_drop)

            flat.insert(0, "WorkflowId", start_attributes.workflow_id)
            flat.insert(1, "RunId", start_attributes.original_execution_run_id)
            frames.append(flat)

        if not frames:
            # pd.concat([]) raises ValueError; return a well-formed empty frame.
            return pd.DataFrame(columns=["WorkflowId", "RunId"])

        return pd.concat(frames, ignore_index=True)

    async def save_to_sink(
        self, data: pd.DataFrame, s3_bucket: str, write_path: str
    ) -> str:
        """Serialise ``data`` as snappy-compressed parquet and upload it.

        The object is written under ``write_path`` with a freshly generated
        ``<uuid>.parquet`` name.

        Returns:
            The full S3 key that was written.
        """
        # path=None makes to_parquet return the serialised bytes.
        write_bytes = data.to_parquet(None, compression="snappy", index=False)
        # uuid4 is random; uuid1 would embed the host MAC address and a
        # timestamp in every object name.
        # NOTE(review): a new name is generated on every call, so a retried
        # activity writes an additional object -- confirm that is acceptable.
        file_name = f"{uuid.uuid4()}.parquet"
        key = f"{write_path}/{file_name}"
        activity.logger.info("Writing to S3 bucket: %s", file_name)
        try:
            # Reuse the client created in __init__ instead of building a new one.
            self.s3.put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
        except Exception as e:
            activity.logger.error("Error saving to sink: %s", e)
            raise
        return key
async def main() -> None:
    """Connect to the local Temporal server and run the proto-to-parquet worker."""
    # Client for the "default" namespace of the server at localhost:7233.
    temporal_client = await Client.connect("localhost:7233", namespace="default")

    export_activities = ExportS3Activities()
    registered_activities = [
        export_activities.get_object_keys,
        export_activities.data_trans_and_land,
    ]

    # Default sandbox restrictions, with boto3 marked as a passthrough module.
    sandbox_restrictions = SandboxRestrictions.default.with_passthrough_modules(
        "boto3"
    )
    runner = SandboxedWorkflowRunner(restrictions=sandbox_restrictions)

    proto_to_parquet_worker = Worker(
        temporal_client,
        task_queue=DATA_TRANSFORMATION_TASK_QUEUE_NAME,
        workflows=[ProtoToParquet],
        activities=registered_activities,
        workflow_runner=runner,
    )
    await proto_to_parquet_worker.run()
@workflow.defn
class ProtoToParquet:
    """Workflow that converts one hour of exported history files from proto to parquet."""

    @workflow.run
    async def run(self, workflow_input: ProtoToParquetWorkflowInput) -> str:
        """Convert every export file of the delayed hour and return the parquet prefix.

        The hour to process is ``workflow.now()`` minus
        ``workflow_input.num_delay_hour`` hours. Each object found under that
        hour's export prefix is converted by its own activity call, one after
        another. An ``ActivityError`` from a conversion is logged and re-raised.
        """
        activity_retry = RetryPolicy(
            maximum_interval=timedelta(seconds=5), maximum_attempts=10
        )

        # Look back by the configured delay so the export for that hour has
        # had time to be uploaded.
        target_hour = workflow.now() - timedelta(hours=workflow_input.num_delay_hour)
        hour_prefix = f"{workflow_input.namespace}/{target_hour.year}/{target_hour.month:02}/{target_hour.day:02}/{target_hour.hour:02}/00"
        source_prefix = f"temporal-workflow-history/export/{hour_prefix}"
        write_path = f"temporal-workflow-history/parquet/{hour_prefix}"

        # List the exported files for the target hour.
        export_keys = await workflow.execute_activity_method(
            ExportS3Activities.get_object_keys,
            GetObjectKeysActivityInput(
                bucket=workflow_input.export_s3_bucket, path=source_prefix
            ),
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=activity_retry,
        )

        conversion_inputs = [
            DataTransAndLandActivitiyInput(
                export_s3_bucket=workflow_input.export_s3_bucket,
                object_key=export_key,
                output_s3_bucket=workflow_input.output_s3_bucket,
                write_path=write_path,
            )
            for export_key in export_keys
        ]

        try:
            # Files are converted sequentially, one activity per file.
            for conversion_input in conversion_inputs:
                await workflow.execute_activity_method(
                    ExportS3Activities.data_trans_and_land,
                    conversion_input,
                    start_to_close_timeout=timedelta(minutes=10),
                    retry_policy=activity_retry,
                )
        except ActivityError as err:
            workflow.logger.error(f"Data transformation failed: {err}")
            raise err

        return write_path