-
Notifications
You must be signed in to change notification settings - Fork 2
/
cam.py
147 lines (127 loc) · 5.79 KB
/
cam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import inspect
import json
import logging
import threading
import time
import uuid

import requests
# Configure the root logger at INFO level as soon as this module is imported,
# so the progress messages emitted through ``logging.info`` below are shown.
# NOTE(review): this runs at import time and therefore also affects the
# importing application's root logger - confirm that this is intended.
logging.basicConfig(level=logging.INFO)
class Client:
    """Worker client for an external-task REST API.

    The endpoints used (``/external-task/fetchAndLock``, ``/complete``,
    ``/bpmnError``, ``/failure``, ``/extendLock``) match the Camunda 7
    engine REST API.

    Typical use: call :meth:`subscribe` once per topic with a callback, then
    :meth:`polling` to run all subscriptions until they are stopped. A
    callback can stop the client by setting ``worker.stop_event``.

    Attributes:
        url: Base URL of the REST API, without a trailing slash.
        workerid: Worker id sent with every request (always a ``str``).
        threads: Polling threads created by :meth:`subscribe`.
        stop_event: ``threading.Event`` that ends all polling loops when set.
    """

    def __init__(self, url, workerid=None):
        """Create a client.

        Args:
            url: Base URL of the REST API.
            workerid: Identifier of this worker. When omitted (``None`` or an
                empty string) a ``uuid.uuid1()`` value is generated.
        """
        self.url = url
        # Only a missing id triggers generation; falsy-but-valid ids such as
        # 0 must be kept instead of being silently replaced by a UUID.
        has_workerid = workerid is not None and workerid != ""
        self.workerid = str(workerid) if has_workerid else str(uuid.uuid1())
        self.threads = []
        self.stop_event = threading.Event()

    @staticmethod
    def _parse_tasks(status_code, body):
        """Return the task list of a fetchAndLock response, or ``[]``.

        Anything that is not a successful (2xx) response carrying a JSON list
        is logged and treated as "no task available", so an error payload
        cannot crash the polling loop.
        """
        if not 200 <= status_code < 300:
            logging.error(f"fetchAndLock failed with status {status_code}: {body}")
            return []
        try:
            payload = json.loads(body)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            logging.error(f"fetchAndLock returned a non-JSON body: {body!r}")
            return []
        if not isinstance(payload, list):
            logging.error(f"fetchAndLock returned an unexpected payload: {payload!r}")
            return []
        return payload

    def _invoke_callback(self, callback, taskid, response_data):
        """Call ``callback`` with the richest argument list it accepts.

        Tries ``(taskid, response_data, worker)`` first, then
        ``(taskid, response_data)``. The arity is decided by binding against
        the callback's signature rather than by catching ``TypeError`` from
        the call itself, so the callback runs exactly once and a ``TypeError``
        raised inside it propagates unchanged.

        Raises:
            TypeError: If the callback accepts neither argument list.
        """
        try:
            signature = inspect.signature(callback)
        except (TypeError, ValueError):
            # Signature not introspectable (e.g. some C callables): fall back
            # to the full argument list and let the call speak for itself.
            callback(taskid, response_data, self)
            return

        first_error = None
        for args in ((taskid, response_data, self), (taskid, response_data)):
            try:
                signature.bind(*args)
            except TypeError as exc:
                if first_error is None:
                    first_error = exc
                continue
            callback(*args)
            return

        raise TypeError(
            "The callback function must accept at least 2 parameters (taskid, response_data) and optionally a 3rd parameter (worker). "
            f"Original error: {first_error}"
        ) from first_error

    def __fetch_and_lock(self, endpoint, task, callback=None, interval=300):
        """Poll ``endpoint`` for tasks until stopped.

        Args:
            endpoint: Full fetchAndLock URL.
            task: Request body as built by :meth:`subscribe`.
            callback: Handler invoked for every fetched batch. When falsy the
                first non-empty batch is returned instead.
            interval: Pause in milliseconds between polls that yield no task.

        Returns:
            The fetched task list when no callback is given; otherwise
            ``None`` once polling ends.

        A ``requests`` failure is logged and ends polling for this
        subscription. Exceptions raised by the callback propagate.
        """
        topic_name = task['topics'][0]['topicName']
        try:
            while not self.stop_event.is_set():
                logging.info(f"Polling subscription: {topic_name}")
                response = requests.post(endpoint, json=task)
                logging.info(f"Response status: {response.status_code}")
                response_data = self._parse_tasks(response.status_code, response.text)
                if not response_data:
                    # Event.wait sleeps like time.sleep but returns at once
                    # when the client is stopped.
                    self.stop_event.wait(interval / 1000)
                    continue
                taskid = str(response_data[0]['id'])
                task_variables = response_data[0].get('variables', {})
                logging.info(f"Task fetched with ID: {taskid} and data: {task_variables}")
                if not callback:
                    return response_data
                self._invoke_callback(callback, taskid, response_data)
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed for subscription: {topic_name}, error: {e}")
        return None

    def _build_fetch_request(self, topic, tenantId, lockDuration, longPolling):
        """Build the fetchAndLock request body for a single topic."""
        topic_spec = {
            "topicName": topic,
            "lockDuration": lockDuration,
        }
        if tenantId:
            topic_spec["tenantIdIn"] = [tenantId]
        return {
            "workerId": self.workerid,
            "maxTasks": 1,
            # Sent as a real JSON boolean rather than the string "true".
            "usePriority": True,
            "asyncResponseTimeout": longPolling,
            "topics": [topic_spec],
        }

    def subscribe(self, topic, callback=None, tenantId=None, lockDuration=20000, longPolling=29000):
        """Subscribe to ``topic``.

        Args:
            topic: Topic name to fetch tasks for.
            callback: ``callback(taskid, response_data[, worker])``. When
                given, a polling thread is registered (started later by
                :meth:`polling`). When omitted, this call blocks until a task
                is fetched and returns it.
            tenantId: Optional tenant id; sent as a one-element ``tenantIdIn``.
            lockDuration: Value sent as the topic's ``lockDuration``.
            longPolling: Value sent as ``asyncResponseTimeout``.

        Returns:
            ``None`` when a callback is registered; otherwise the fetched task
            list (or ``None`` if the request failed or the client was stopped).
        """
        endpoint = f"{self.url}/external-task/fetchAndLock"
        task = self._build_fetch_request(topic, tenantId, lockDuration, longPolling)
        if callback:
            worker_thread = threading.Thread(
                target=self.__fetch_and_lock,
                args=(endpoint, task, callback),
            )
            self.threads.append(worker_thread)
            return None
        return self.__fetch_and_lock(endpoint, task)

    def polling(self):
        """Start all registered subscriptions and block until they finish.

        When the run ended because ``stop_event`` was set, the registered
        threads are discarded and the event is cleared, so new subscriptions
        are needed before polling again. ``KeyboardInterrupt`` sets
        ``stop_event`` so the worker threads end after their current request.
        """
        try:
            for thread in self.threads:
                # ident stays None until start(); a thread can only be started
                # once, so skip any that already ran in an earlier call.
                if thread.ident is None and not self.stop_event.is_set():
                    thread.start()
            for thread in self.threads:
                if thread.is_alive():
                    thread.join()
            if self.stop_event.is_set():
                self.threads = []
                self.stop_event.clear()
                logging.info("Stopped - you may need to subscribe again")
        except KeyboardInterrupt:
            self.stop_event.set()

    @staticmethod
    def _to_variables(values):
        """Wrap plain values as ``{name: {"value": value}}`` for the API."""
        return {key: {"value": val} for key, val in values.items()}

    @staticmethod
    def _post(endpoint, payload):
        """POST ``payload`` as JSON; return the response or ``None`` on failure.

        Transport-level failures are logged, not raised.
        """
        try:
            return requests.post(endpoint, json=payload)
        except requests.exceptions.RequestException as e:
            logging.error(f"Request failed: {e}")
            return None

    def complete(self, taskid, **kwargs) -> bool:
        """Complete task ``taskid``; keyword arguments become variables.

        Returns:
            ``True`` if the server answered 204, ``False`` otherwise.
        """
        endpoint = f"{self.url}/external-task/{taskid}/complete"
        request = {
            "workerId": self.workerid,
            "variables": self._to_variables(kwargs),
        }
        response = self._post(endpoint, request)
        if response is None:
            return False
        if response.status_code == 204:
            logging.info(f"Task {taskid} completed successfully with data: {request}")
            return True
        # A rejected completion is a failure, so report it at error level.
        logging.error(f"Response: {response.text}, Status code: {response.status_code}")
        return False

    def error(self, taskid, error_code, error_message="not defined", **kwargs) -> bool:
        """Report a BPMN error for ``taskid``; keyword arguments become variables.

        Returns:
            ``True`` if the server answered 204, ``False`` otherwise.
        """
        endpoint = f"{self.url}/external-task/{taskid}/bpmnError"
        request = {
            "workerId": self.workerid,
            "errorCode": error_code,
            "errorMessage": error_message,
            "variables": self._to_variables(kwargs),
        }
        response = self._post(endpoint, request)
        if response is None:
            return False
        logging.info(f"Error response: {response.text}, Status code: {response.status_code}")
        return response.status_code == 204

    def failure(self, taskid, error_message="not defined", retries=0, retry_timeout=0) -> bool:
        """Report a failure for ``taskid``.

        Args:
            taskid: Id of the task.
            error_message: Sent as ``errorMessage``.
            retries: Sent as ``retries``.
            retry_timeout: Sent as ``retryTimeout``.

        Returns:
            ``True`` if the server answered 204, ``False`` otherwise.
        """
        endpoint = f"{self.url}/external-task/{taskid}/failure"
        request = {
            "workerId": self.workerid,
            "errorMessage": error_message,
            "retries": retries,
            "retryTimeout": retry_timeout,
        }
        response = self._post(endpoint, request)
        if response is None:
            return False
        logging.info(f"Failure response: {response.status_code}")
        return response.status_code == 204

    def extend_lock(self, taskid, new_duration) -> bool:
        """Extend the lock on ``taskid``; ``new_duration`` is sent as ``newDuration``.

        Returns:
            ``True`` if the server answered 204, ``False`` otherwise.
        """
        endpoint = f"{self.url}/external-task/{taskid}/extendLock"
        request = {
            "workerId": self.workerid,
            "newDuration": new_duration,
        }
        response = self._post(endpoint, request)
        if response is None:
            return False
        logging.info(f"Extended lock for task {taskid} with response: {response.status_code}")
        return response.status_code == 204