Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
alice-yin committed Apr 19, 2024
1 parent 9128e4c commit 6c737b5
Show file tree
Hide file tree
Showing 15 changed files with 1,030 additions and 787 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [sentry](sentry) - Report errors to Sentry.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported workflow on an hourly basis

## Test

Expand Down
23 changes: 23 additions & 0 deletions cloud_export_to_parquet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Cloud Export to parquet sample

This is an example workflow to convert exported file from proto to parquet file. The workflow is an hourly schedule.

For this sample, the optional `cloud_export_to_parquet` dependency group must be included. To include, run:

poetry install --with cloud_export_to_parquet

Before you start, please modify workflow input in `create_schedule.py` with your s3 bucket and namespace. Also make sure you've the right AWS permission set up in your environment to allow this workflow read and write to your s3 bucket.

To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:

```bash
poetry run python run_worker.py
```

This will start the worker. Then, in another terminal, run the following to execute the schedule:

```bash
poetry run python create_schedule.py
```

The workflow should convert exported file in your input s3 bucket to parquet in your specified location.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
"""Module defines run temporal workflow."""

import asyncio
import traceback
from datetime import datetime, timedelta

from dataobject import ProtoToParquetWorkflowInput
from shared import DATA_TRANSFORMATION_TASK_QUEUE_NAME, WORKFLOW_ID_PREFIX
from temporalio.client import (
Client,
Schedule,
Expand All @@ -16,28 +12,42 @@
)
from workflows import ProtoToParquet

from cloud_export_to_parquet.workflows import ProtoToParquetWorkflowInput


async def main() -> None:
"""Main function to run temporal workflow."""
# Create client connected to server at the given address
client: Client = await Client.connect("localhost:7233", namespace="default")
# TODO: update s3_bucket and namespace to the actual name
client = await Client.connect("localhost:7233")
# TODO: update s3_bucket and namespace to the actual usecase
wf_input = ProtoToParquetWorkflowInput(
num_delay_hour=2,
export_s3_bucket="test-input-bucket",
namespace="test.namespace",
output_s3_bucket="test-output-bucket",
)

# Run the workflow
# try:
# await client.start_workflow(
# ProtoToParquet.run,
# wf_input,
# id = f"proto-to-parquet-{datetime.now()}",
# task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
# )
# except WorkflowFailureError:
# print("Got exception: ", traceback.format_exc())

# Create the schedule
try:
await client.create_schedule(
"hourly-proto-to-parquet-wf-schedule",
Schedule(
action=ScheduleActionStartWorkflow(
ProtoToParquet.run,
wf_input,
id=f"{WORKFLOW_ID_PREFIX}-{datetime.now()}",
task_queue=DATA_TRANSFORMATION_TASK_QUEUE_NAME,
id=f"proto-to-parquet-{datetime.now()}",
task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))]
Expand Down
127 changes: 127 additions & 0 deletions cloud_export_to_parquet/data_trans_activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import json
import uuid
from dataclasses import dataclass
from typing import List

import aioboto3
import pandas as pd
import temporalio.api.export.v1 as export
from google.protobuf.json_format import MessageToJson
from temporalio import activity


@dataclass
class GetObjectKeysActivityInput:
bucket: str
path: str


@dataclass
class DataTransAndLandActivityInput:
export_s3_bucket: str
object_key: str
output_s3_bucket: str
write_path: str


@activity.defn
async def get_object_keys(activity_input: GetObjectKeysActivityInput) -> List[str]:
"""Function that list objects by key."""
session = aioboto3.Session()
async with session.client("s3") as s3:
response = await s3.list_objects_v2(
Bucket=activity_input.bucket, Prefix=activity_input.path
)
object_keys = []
for obj in response.get("Contents", []):
object_keys.append(obj["Key"])
if len(object_keys) == 0:
raise FileNotFoundError(
f"No files found in {activity_input.bucket}/{activity_input.path}"
)
return object_keys


@activity.defn
async def data_trans_and_land(activity_input: DataTransAndLandActivityInput) -> str:
"""Function that convert proto to parquet and save to S3."""
key = activity_input.object_key
data = await get_data_from_object_key(activity_input.export_s3_bucket, key)
activity.logger.info("Convert proto to parquet for file: %s", key)
parquet_data = convert_proto_to_parquet_flatten(data)
activity.logger.info("Finish transformation for file: %s", key)
return await save_to_sink(
parquet_data, activity_input.output_s3_bucket, activity_input.write_path
)


async def get_data_from_object_key(
bucket_name: str, object_key: str
) -> export.WorkflowExecutions:
"""Function that get object by key."""
v = export.WorkflowExecutions()

session = aioboto3.Session()
async with session.client("s3") as s3:
try:
get_object = await s3.get_object(Bucket=bucket_name, Key=object_key)
data = await get_object["Body"].read()
except Exception as e:
activity.logger.error(f"Error reading object: {e}")
raise e

v.ParseFromString(data)
return v


def convert_proto_to_parquet_flatten(wfs: export.WorkflowExecutions) -> pd.DataFrame:
"""Function that convert flatten proto data to parquet."""
dfs = []
for wf in wfs.items:
start_attributes = wf.history.events[
0
].workflow_execution_started_event_attributes
histories = wf.history
json_str = MessageToJson(histories)
row = {
"WorkflowID": start_attributes.workflow_id,
"RunID": start_attributes.original_execution_run_id,
"Histories": json.loads(json_str),
}
dfs.append(pd.DataFrame([row]))
df = pd.concat(dfs, ignore_index=True)
rows_flatten = []
for _, row in df.iterrows():
wf_histories_raw = row["Histories"]["events"]
worfkow_id = row["WorkflowID"]
run_id = row["RunID"]
for history_event in wf_histories_raw:
row_flatten = pd.json_normalize(history_event, sep="_")
skip_name = ["payloads", "."]
columns_to_drop = [
col for col in row_flatten.columns for skip in skip_name if skip in col
]
row_flatten.drop(columns_to_drop, axis=1, inplace=True)
row_flatten.insert(0, "WorkflowId", worfkow_id)
row_flatten.insert(1, "RunId", run_id)
rows_flatten.append(row_flatten)
df_flatten = pd.concat(rows_flatten, ignore_index=True)
return df_flatten


async def save_to_sink(data: pd.DataFrame, s3_bucket: str, write_path: str) -> str:
"""Function that save object to s3 bucket."""
write_bytes = data.to_parquet(None, compression="snappy", index=False)
uuid_name = uuid.uuid1()
file_name = f"{uuid_name}.parquet"
activity.logger.info("Writing to S3 bucket: %s", file_name)

session = aioboto3.Session()
async with session.client("s3") as s3:
try:
key = f"{write_path}/{file_name}"
await s3.put_object(Bucket=s3_bucket, Key=key, Body=write_bytes)
return key
except Exception as e:
activity.logger.error(f"Error saving to sink: {e}")
raise e
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
"""Module defines temporal worker."""

import asyncio

from shared import DATA_TRANSFORMATION_TASK_QUEUE_NAME
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import (
Expand All @@ -11,21 +8,23 @@
)
from workflows import ProtoToParquet

from export_proto_to_parquet.activities import ExportS3Activities
from cloud_export_to_parquet.data_trans_activities import (
data_trans_and_land,
get_object_keys,
)


async def main() -> None:
"""Main worker function."""
# Create client connected to server at the given address
client: Client = await Client.connect("localhost:7233", namespace="default")
client = await Client.connect("localhost:7233")

# Run the worker
s3_activities = ExportS3Activities()
worker: Worker = Worker(
client,
task_queue=DATA_TRANSFORMATION_TASK_QUEUE_NAME,
task_queue="DATA_TRANSFORMATION_TASK_QUEUE",
workflows=[ProtoToParquet],
activities=[s3_activities.get_object_keys, s3_activities.data_trans_and_land],
activities=[get_object_keys, data_trans_and_land],
workflow_runner=SandboxedWorkflowRunner(
restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3")
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
"""Module defines workflows convert exported workflow history file from proto to parquet format."""

from datetime import timedelta

from dataobject import ProtoToParquetWorkflowInput
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

with workflow.unsafe.imports_passed_through():
from export_proto_to_parquet.data_trans_activities import (
DataTransAndLandActivitiyInput,
ExportS3Activities,
from cloud_export_to_parquet.data_trans_activities import (
DataTransAndLandActivityInput,
data_trans_and_land,
get_object_keys,
GetObjectKeysActivityInput,
)
from dataclasses import dataclass


@dataclass
class ProtoToParquetWorkflowInput:
num_delay_hour: int
export_s3_bucket: str
namespace: str
output_s3_bucket: str


@workflow.defn
Expand All @@ -36,7 +43,7 @@ async def run(self, workflow_input: ProtoToParquetWorkflowInput) -> str:

# Read Input File
object_keys_output = await workflow.execute_activity_method(
ExportS3Activities.get_object_keys,
get_object_keys,
get_object_keys_input,
start_to_close_timeout=timedelta(minutes=5),
retry_policy=retry_policy,
Expand All @@ -47,15 +54,15 @@ async def run(self, workflow_input: ProtoToParquetWorkflowInput) -> str:
try:
# Could spin up multiple threads to process files in parallel
for key in object_keys_output:
data_trans_and_land_input = DataTransAndLandActivitiyInput(
data_trans_and_land_input = DataTransAndLandActivityInput(
workflow_input.export_s3_bucket,
key,
workflow_input.output_s3_bucket,
write_path,
)
# Convert proto to parquet and save to S3
await workflow.execute_activity_method(
ExportS3Activities.data_trans_and_land,
data_trans_and_land,
data_trans_and_land_input,
start_to_close_timeout=timedelta(minutes=10),
retry_policy=retry_policy,
Expand Down
21 changes: 0 additions & 21 deletions export_proto_to_parquet/LICENSE

This file was deleted.

59 changes: 0 additions & 59 deletions export_proto_to_parquet/README.md

This file was deleted.

Loading

0 comments on commit 6c737b5

Please sign in to comment.