From 6fcc52c1b31b61d0e2e65d05a75d91f0175affb7 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 25 Nov 2024 12:16:39 -0500 Subject: [PATCH] Isolate counters associated with different workflows --- polling/frequent/activities.py | 5 ++--- polling/infrequent/activities.py | 5 ++--- polling/test_service.py | 26 +++++++++++++------------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/polling/frequent/activities.py b/polling/frequent/activities.py index 96fed1a..a112b41 100644 --- a/polling/frequent/activities.py +++ b/polling/frequent/activities.py @@ -2,16 +2,15 @@ from temporalio import activity -from polling.test_service import ComposeGreetingInput, TestService +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() while True: try: try: - result = await test_service.get_service_result(input) + result = await get_service_result(input) activity.logger.info(f"Exiting activity ${result}") return result except Exception: diff --git a/polling/infrequent/activities.py b/polling/infrequent/activities.py index cbed702..b3db1ae 100644 --- a/polling/infrequent/activities.py +++ b/polling/infrequent/activities.py @@ -1,11 +1,10 @@ from temporalio import activity -from polling.test_service import ComposeGreetingInput, TestService +from polling.test_service import ComposeGreetingInput, get_service_result @activity.defn async def compose_greeting(input: ComposeGreetingInput) -> str: - test_service = TestService() # If this raises an exception because it's not done yet, the activity will # continually be scheduled for retry - return await test_service.get_service_result(input) + return await get_service_result(input) diff --git a/polling/test_service.py b/polling/test_service.py index 42a54ab..79d8ea7 100644 --- a/polling/test_service.py +++ b/polling/test_service.py @@ -1,5 +1,11 @@ +from collections import Counter from dataclasses import dataclass +from temporalio import activity + +attempts = Counter() +ERROR_ATTEMPTS = 5 + @dataclass class ComposeGreetingInput: @@ -7,17 +13,11 @@ class ComposeGreetingInput: name: str -try_attempts = 0 - - -class TestService: - def __init__(self): - self.error_attempts = 5 +async def get_service_result(input): + attempts[activity.info().workflow_id] += 1 + attempt = attempts[activity.info().workflow_id] - async def get_service_result(self, input): - global try_attempts - print(f"Attempt {try_attempts} of {self.error_attempts} to invoke service") - try_attempts += 1 - if try_attempts % self.error_attempts == 0: - return f"{input.greeting}, {input.name}!" - raise Exception("service is down") + print(f"Attempt {attempt} of {ERROR_ATTEMPTS} to invoke service") + if attempt == ERROR_ATTEMPTS: + return f"{input.greeting}, {input.name}!" + raise Exception("service is down")