Skip to content

Commit

Permalink
Isolate counters associated with different workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
dandavison committed Nov 25, 2024
1 parent ddd7b83 commit 6fcc52c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
5 changes: 2 additions & 3 deletions polling/frequent/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

from temporalio import activity

from polling.test_service import ComposeGreetingInput, TestService
from polling.test_service import ComposeGreetingInput, get_service_result


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
while True:
try:
try:
result = await test_service.get_service_result(input)
result = await get_service_result(input)
activity.logger.info(f"Exiting activity ${result}")
return result
except Exception:
Expand Down
5 changes: 2 additions & 3 deletions polling/infrequent/activities.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from temporalio import activity

from polling.test_service import ComposeGreetingInput, TestService
from polling.test_service import ComposeGreetingInput, get_service_result


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
test_service = TestService()
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await test_service.get_service_result(input)
return await get_service_result(input)
26 changes: 13 additions & 13 deletions polling/test_service.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
from collections import Counter
from dataclasses import dataclass

from temporalio import activity

attempts = Counter()
ERROR_ATTEMPTS = 5


@dataclass
class ComposeGreetingInput:
greeting: str
name: str


try_attempts = 0


class TestService:
def __init__(self):
self.error_attempts = 5
async def get_service_result(input):
attempts[activity.info().workflow_id] += 1
attempt = attempts[activity.info().workflow_id]

async def get_service_result(self, input):
global try_attempts
print(f"Attempt {try_attempts} of {self.error_attempts} to invoke service")
try_attempts += 1
if try_attempts % self.error_attempts == 0:
return f"{input.greeting}, {input.name}!"
raise Exception("service is down")
print(f"Attempt {attempt} of {ERROR_ATTEMPTS} to invoke service")
if attempt == ERROR_ATTEMPTS:
return f"{input.greeting}, {input.name}!"
raise Exception("service is down")

0 comments on commit 6fcc52c

Please sign in to comment.