From be0f748685e6c19d845d06bfa63e7d925ad6f1a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Thu, 25 Apr 2024 16:01:06 +0200 Subject: [PATCH 1/3] batch_daily example --- batch_daily/README.md | 22 +++++++++ batch_daily/__init__.py | 0 batch_daily/activities.py | 32 +++++++++++++ batch_daily/create_schedule.py | 51 +++++++++++++++++++++ batch_daily/run_worker.py | 38 +++++++++++++++ batch_daily/workflows.py | 84 ++++++++++++++++++++++++++++++++++ 6 files changed, 227 insertions(+) create mode 100644 batch_daily/README.md create mode 100644 batch_daily/__init__.py create mode 100644 batch_daily/activities.py create mode 100644 batch_daily/create_schedule.py create mode 100644 batch_daily/run_worker.py create mode 100644 batch_daily/workflows.py diff --git a/batch_daily/README.md b/batch_daily/README.md new file mode 100644 index 00000000..e21fa571 --- /dev/null +++ b/batch_daily/README.md @@ -0,0 +1,22 @@ +# Batch sample + +This is an example workflow to process batches of records matching a particular +query criteria, in daily windows of time. + +Please make sure your python is 3.9 above. For this sample, run: + +``` +poetry install --with batch_daily +``` + +To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker: + +```bash +poetry run python run_worker.py +``` + +This will start the worker. Then, in another terminal, run the following to execute the schedule: + +```bash +poetry run python create_schedule.py +``` diff --git a/batch_daily/__init__.py b/batch_daily/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/batch_daily/activities.py b/batch_daily/activities.py new file mode 100644 index 00000000..1d32b2e2 --- /dev/null +++ b/batch_daily/activities.py @@ -0,0 +1,32 @@ +import logging +from typing import List +from temporalio import activity + +from dataclasses import dataclass + +log = logging.getLogger(__name__) + + +@dataclass +class ListRecordActivityInput: + record_filter: str + day: str + + +@dataclass +class ProcessRecordActivityInput: + uri: str + + +@activity.defn +def list_records(activity_input: ListRecordActivityInput) -> List[str]: + log.info( + f"filtering records on {activity_input.day} based on filter: {activity_input.record_filter}" + ) + return [f"uri://record-id{idx}" for idx in range(10)] + + +@activity.defn +def process_record(activity_input: ProcessRecordActivityInput) -> str: + log.info(f"this record is yummy: {activity_input.uri}") + return activity_input.uri diff --git a/batch_daily/create_schedule.py b/batch_daily/create_schedule.py new file mode 100644 index 00000000..52974070 --- /dev/null +++ b/batch_daily/create_schedule.py @@ -0,0 +1,51 @@ +import asyncio +import traceback +from datetime import datetime, timedelta + +from temporalio.client import ( + Client, + Schedule, + ScheduleActionStartWorkflow, + ScheduleIntervalSpec, + ScheduleSpec, + WorkflowFailureError, +) + +from batch.workflows import ( + DailyBatch, + DailyBatchWorkflowInput, +) + + +async def main() -> None: + """Main function to run temporal workflow.""" + client = await Client.connect("localhost:7233") + + wf_input = DailyBatchWorkflowInput( + record_filter="taste=yummy", + # XXX: how do we get the current day in a way that works with the schedule? + start_day=datetime.now().date().strftime("%Y-%m-%d"), + end_day=((datetime.now().date()) + timedelta(days=1)).strftime("%Y-%m-%d"), + ) + + try: + await client.create_schedule( + "daily-batch-wf-schedule", + Schedule( + action=ScheduleActionStartWorkflow( + DailyBatch.run, + wf_input, + id=f"daily-batch-{wf_input.record_filter}", + task_queue="TASK_QUEUE", + ), + spec=ScheduleSpec( + intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))] + ), + ), + ) + except WorkflowFailureError: + print("Got exception: ", traceback.format_exc()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/batch_daily/run_worker.py b/batch_daily/run_worker.py new file mode 100644 index 00000000..df02de11 --- /dev/null +++ b/batch_daily/run_worker.py @@ -0,0 +1,38 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +from temporalio.client import Client +from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, +) + +from cloud_export_to_parquet.data_trans_activities import ( + data_trans_and_land, + get_object_keys, +) +from cloud_export_to_parquet.workflows import ProtoToParquet + + +async def main() -> None: + """Main worker function.""" + # Create client connected to server at the given address + client = await Client.connect("localhost:7233") + + # Run the worker + worker: Worker = Worker( + client, + task_queue="DATA_TRANSFORMATION_TASK_QUEUE", + workflows=[ProtoToParquet], + activities=[get_object_keys, data_trans_and_land], + workflow_runner=SandboxedWorkflowRunner( + restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3") + ), + activity_executor=ThreadPoolExecutor(100), + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/batch_daily/workflows.py b/batch_daily/workflows.py new file mode 100644 index 00000000..bace78e1 --- /dev/null +++ b/batch_daily/workflows.py @@ -0,0 +1,84 @@ +from datetime import datetime, timedelta + +from dataclasses import dataclass + +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.exceptions import ActivityError + +with workflow.unsafe.imports_passed_through(): + from batch.activities import ( + ListRecordActivityInput, + list_records, + ProcessRecordActivityInput, + process_record, + ) + + +@dataclass +class RecordProcessorWorkflowInput: + day: str + record_uri: str + + +@workflow.defn +class RecordProcessor: + @workflow.run + async def run(self, workflow_input: RecordProcessorWorkflowInput) -> str: + list_records_input = ListRecordActivityInput( + record_filter="taste=yummy", day=workflow_input.day + ) + + record_uri_list = await workflow.execute_activity( + list_records, + list_records_input, + start_to_close_timeout=timedelta(minutes=5), + ) + try: + for key in record_uri_list: + process_record_input = ProcessRecordActivityInput(uri=key) + await workflow.execute_activity( + process_record, + process_record_input, + start_to_close_timeout=timedelta(minutes=1), + ) + + except ActivityError as output_err: + workflow.logger.error(f"failed: {output_err}") + raise output_err + + +@dataclass +class DailyBatchWorkflowInput: + start_day: str + end_day: str + record_filter: str + + +@workflow.defn +class DailyBatch: + """DailyBatch workflow""" + + @workflow.run + async def run(self, workflow_input: DailyBatchWorkflowInput) -> str: + if workflow_input.start_day == workflow_input.end_day: + return "" + + await workflow.execute_child_workflow( + RecordProcessor.run, + RecordProcessorWorkflowInput( + day=workflow_input.start_day, record_uri=workflow_input.record_filter + ), + ) + + next_start_day = ( + datetime.strptime(workflow_input.start_day, "%Y-%m-%d") + timedelta(days=1) + ).strftime("%Y-%m-%d") + + return workflow.continue_as_new( + DailyBatchWorkflowInput( + start_day=next_start_day, + end_day=workflow_input.end_day, + record_filter=workflow_input.record_filter, + ) + ) From 1a70e3a17ac713567414d11049bcdb20cbe6687a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Thu, 25 Apr 2024 18:59:58 +0200 Subject: [PATCH 2/3] Improvements to the scheduling and workflow --- batch_daily/activities.py | 29 ++++++---- batch_daily/create_schedule.py | 21 +++----- batch_daily/run_worker.py | 23 +++----- batch_daily/starter.py | 28 ++++++++++ batch_daily/workflows.py | 98 +++++++++++++++++++++------------- 5 files changed, 123 insertions(+), 76 deletions(-) create mode 100644 batch_daily/starter.py diff --git a/batch_daily/activities.py b/batch_daily/activities.py index 1d32b2e2..3ca9a3a4 100644 --- a/batch_daily/activities.py +++ b/batch_daily/activities.py @@ -1,11 +1,11 @@ -import logging -from typing import List +import asyncio +import time +import random +from typing import Any, Dict, List from temporalio import activity from dataclasses import dataclass -log = logging.getLogger(__name__) - @dataclass class ListRecordActivityInput: @@ -18,15 +18,26 @@ class ProcessRecordActivityInput: uri: str +async def random_sleep(): + """ + simulate a long running operation with a random sleep. + """ + sleep_s = 1 / random.randint(1, 100) + await asyncio.sleep(sleep_s) + + @activity.defn -def list_records(activity_input: ListRecordActivityInput) -> List[str]: - log.info( +async def list_records(activity_input: ListRecordActivityInput) -> List[str]: + print( f"filtering records on {activity_input.day} based on filter: {activity_input.record_filter}" ) + await random_sleep() return [f"uri://record-id{idx}" for idx in range(10)] @activity.defn -def process_record(activity_input: ProcessRecordActivityInput) -> str: - log.info(f"this record is yummy: {activity_input.uri}") - return activity_input.uri +async def process_record(activity_input: ProcessRecordActivityInput) -> Dict[str, Any]: + t0 = time.monotonic() + print(f"this record is yummy: {activity_input.uri}") + await random_sleep() + return {"runtime": time.monotonic() - t0} diff --git a/batch_daily/create_schedule.py b/batch_daily/create_schedule.py index 52974070..72b1d28f 100644 --- a/batch_daily/create_schedule.py +++ b/batch_daily/create_schedule.py @@ -11,9 +11,10 @@ WorkflowFailureError, ) -from batch.workflows import ( - DailyBatch, - DailyBatchWorkflowInput, +from batch_daily.workflows import ( + RecordBatchProcessor, + RecordBatchProcessorWorkflowInput, + TASK_QUEUE_NAME, ) @@ -21,22 +22,16 @@ async def main() -> None: """Main function to run temporal workflow.""" client = await Client.connect("localhost:7233") - wf_input = DailyBatchWorkflowInput( - record_filter="taste=yummy", - # XXX: how do we get the current day in a way that works with the schedule? - start_day=datetime.now().date().strftime("%Y-%m-%d"), - end_day=((datetime.now().date()) + timedelta(days=1)).strftime("%Y-%m-%d"), - ) - try: + wf_input = RecordBatchProcessorWorkflowInput(record_filter="taste=yummy") await client.create_schedule( "daily-batch-wf-schedule", Schedule( action=ScheduleActionStartWorkflow( - DailyBatch.run, + RecordBatchProcessor.run, wf_input, - id=f"daily-batch-{wf_input.record_filter}", - task_queue="TASK_QUEUE", + id=f"record-filter-{wf_input.record_filter}", + task_queue=TASK_QUEUE_NAME, ), spec=ScheduleSpec( intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))] diff --git a/batch_daily/run_worker.py b/batch_daily/run_worker.py index df02de11..356962d9 100644 --- a/batch_daily/run_worker.py +++ b/batch_daily/run_worker.py @@ -3,32 +3,23 @@ from temporalio.client import Client from temporalio.worker import Worker -from temporalio.worker.workflow_sandbox import ( - SandboxedWorkflowRunner, - SandboxRestrictions, -) -from cloud_export_to_parquet.data_trans_activities import ( - data_trans_and_land, - get_object_keys, +from batch_daily.activities import ( + list_records, + process_record, ) -from cloud_export_to_parquet.workflows import ProtoToParquet +from batch_daily.workflows import DailyBatch, RecordBatchProcessor, TASK_QUEUE_NAME async def main() -> None: """Main worker function.""" - # Create client connected to server at the given address client = await Client.connect("localhost:7233") - # Run the worker worker: Worker = Worker( client, - task_queue="DATA_TRANSFORMATION_TASK_QUEUE", - workflows=[ProtoToParquet], - activities=[get_object_keys, data_trans_and_land], - workflow_runner=SandboxedWorkflowRunner( - restrictions=SandboxRestrictions.default.with_passthrough_modules("boto3") - ), + task_queue=TASK_QUEUE_NAME, + workflows=[DailyBatch, RecordBatchProcessor], + activities=[list_records, process_record], activity_executor=ThreadPoolExecutor(100), ) await worker.run() diff --git a/batch_daily/starter.py b/batch_daily/starter.py new file mode 100644 index 00000000..4f834665 --- /dev/null +++ b/batch_daily/starter.py @@ -0,0 +1,28 @@ +import asyncio + +from temporalio.client import Client + +# from batch_daily.activity import +from batch_daily.workflows import DailyBatchWorkflowInput, TASK_QUEUE_NAME, DailyBatch + + +async def main(): + client = await Client.connect( + "localhost:7233", + ) + + result = await client.execute_workflow( + DailyBatch.run, + DailyBatchWorkflowInput( + start_day="2024-01-01", + end_day="2024-03-01", + record_filter="taste=yummy", + ), + id=f"daily_batch-workflow-id", + task_queue=TASK_QUEUE_NAME, + ) + print(f"Workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/batch_daily/workflows.py b/batch_daily/workflows.py index bace78e1..e9a1c037 100644 --- a/batch_daily/workflows.py +++ b/batch_daily/workflows.py @@ -1,32 +1,49 @@ +import asyncio from datetime import datetime, timedelta from dataclasses import dataclass +import time +from typing import Any, Dict, Optional from temporalio import workflow from temporalio.common import RetryPolicy from temporalio.exceptions import ActivityError +from temporalio.common import SearchAttributeKey with workflow.unsafe.imports_passed_through(): - from batch.activities import ( + from batch_daily.activities import ( ListRecordActivityInput, list_records, ProcessRecordActivityInput, process_record, ) +TASK_QUEUE_NAME = "MY_TASK_QUEUE" + @dataclass -class RecordProcessorWorkflowInput: - day: str - record_uri: str +class RecordBatchProcessorWorkflowInput: + record_filter: str + day: Optional[str] = None @workflow.defn -class RecordProcessor: +class RecordBatchProcessor: @workflow.run - async def run(self, workflow_input: RecordProcessorWorkflowInput) -> str: + async def run( + self, workflow_input: RecordBatchProcessorWorkflowInput + ) -> Dict[str, Any]: + if workflow_input.day is None: + schedule_time = workflow.info().typed_search_attributes.get( + SearchAttributeKey.for_datetime("TemporalScheduledStartTime") + ) + assert schedule_time is not None, "when not scheduled, day must be provided" + workflow_input.day = schedule_time.strftime("%Y-%m-%d") + + print(f"starting RecordProcessor with {workflow_input}") + list_records_input = ListRecordActivityInput( - record_filter="taste=yummy", day=workflow_input.day + record_filter=workflow_input.record_filter, day=workflow_input.day ) record_uri_list = await workflow.execute_activity( @@ -34,18 +51,22 @@ async def run(self, workflow_input: RecordProcessorWorkflowInput) -> str: list_records_input, start_to_close_timeout=timedelta(minutes=5), ) - try: + + task_list = [] + async with asyncio.TaskGroup() as tg: for key in record_uri_list: process_record_input = ProcessRecordActivityInput(uri=key) - await workflow.execute_activity( - process_record, - process_record_input, - start_to_close_timeout=timedelta(minutes=1), + task_list.append( + tg.create_task( + workflow.execute_activity( + process_record, + process_record_input, + start_to_close_timeout=timedelta(minutes=1), + ) + ) ) - - except ActivityError as output_err: - workflow.logger.error(f"failed: {output_err}") - raise output_err + total_runtime = sum(map(lambda task: task.result()["runtime"], task_list)) + return {"runtime": total_runtime} @dataclass @@ -60,25 +81,26 @@ class DailyBatch: """DailyBatch workflow""" @workflow.run - async def run(self, workflow_input: DailyBatchWorkflowInput) -> str: - if workflow_input.start_day == workflow_input.end_day: - return "" - - await workflow.execute_child_workflow( - RecordProcessor.run, - RecordProcessorWorkflowInput( - day=workflow_input.start_day, record_uri=workflow_input.record_filter - ), - ) - - next_start_day = ( - datetime.strptime(workflow_input.start_day, "%Y-%m-%d") + timedelta(days=1) - ).strftime("%Y-%m-%d") - - return workflow.continue_as_new( - DailyBatchWorkflowInput( - start_day=next_start_day, - end_day=workflow_input.end_day, - record_filter=workflow_input.record_filter, - ) - ) + async def run(self, workflow_input: DailyBatchWorkflowInput) -> Dict[str, Any]: + print(f"starting DailyBatch with {workflow_input}") + + start = datetime.strptime(workflow_input.start_day, "%Y-%m-%d") + end = datetime.strptime(workflow_input.end_day, "%Y-%m-%d") + task_list = [] + async with asyncio.TaskGroup() as tg: + for day in [ + start + timedelta(days=x) for x in range(0, (end - start).days) + ]: + task_list.append( + tg.create_task( + workflow.execute_child_workflow( + RecordBatchProcessor.run, + RecordBatchProcessorWorkflowInput( + day=day.strftime("%Y-%m-%d"), + record_filter=workflow_input.record_filter, + ), + ) + ) + ) + total_runtime = sum(map(lambda task: task.result()["runtime"], task_list)) + return {"runtime": total_runtime} From b64040ab95a573d85e26431c3d9cb57120857710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Thu, 25 Apr 2024 19:03:14 +0200 Subject: [PATCH 3/3] Update Readme --- batch_daily/README.md | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/batch_daily/README.md b/batch_daily/README.md index e21fa571..2c75dbb6 100644 --- a/batch_daily/README.md +++ b/batch_daily/README.md @@ -1,7 +1,15 @@ # Batch sample -This is an example workflow to process batches of records matching a particular -query criteria, in daily windows of time. +This is an example workflow that solves the following use-case. + +You have a series of records that are divided into daily batches (think a days +worth of telemetry coming from an application). +Every day you would like to run a batch to process a days worth of records, but +you would also like to have the ability to backfill the records from a previous +window of time. + +Backfilling might be run as a schedule or it might be run as a directly +triggered workflow. Please make sure your python is 3.9 above. For this sample, run: @@ -15,7 +23,13 @@ To run, first see [README.md](../README.md) for prerequisites. Then, run the fol poetry run python run_worker.py ``` -This will start the worker. Then, in another terminal, run the following to execute the schedule: +This will start the worker. Then, in another terminal, run the following to start the workflow: + +```bash +poetry run python starter.py +``` + +Optionally, you can schedule the workflow with: ```bash poetry run python create_schedule.py