-
Notifications
You must be signed in to change notification settings - Fork 0
/
watchdog.py
75 lines (63 loc) · 2.67 KB
/
watchdog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
import os
import sys
import time
import requests
import slackweb
from kafka import KafkaConsumer
from logstash import TCPLogstashHandler
# Logging setup: every record goes to stdout and to a Logstash TCP endpoint.
# The logger level is INFO, so the logger.debug(...) calls in this script are
# dropped; lower the level here (e.g. logging.DEBUG) when troubleshooting.
logger = logging.getLogger('kafka-watchdog.logging')
logger.setLevel(logging.INFO)

# Console handler using the standard library's basic record format.
console_logger = logging.StreamHandler(stream=sys.stdout)
console_logger.setFormatter(logging.Formatter(logging.BASIC_FORMAT))

# Logstash endpoint is configurable through LOGSTASH_HOST / LOGSTASH_PORT and
# falls back to localhost:5000.
logstash_handler = TCPLogstashHandler(
    host=os.getenv('LOGSTASH_HOST', 'localhost'),
    port=int(os.getenv('LOGSTASH_PORT', 5000)),
    version=1,
)

# Plain loop instead of a list comprehension: addHandler() is called purely
# for its side effect, so there is no list worth building.
for handler in (console_logger, logstash_handler):
    logger.addHandler(handler)
# --- Watchdog configuration -------------------------------------------------

# Seconds to wait for Kafka messages on a topic before raising a warning.
TIMEOUT = 60

# Maximum number of HTTP attempts per service before it is reported.
RETRIES = 5

# Name of the machine running the watchdog, taken from the environment
# (None when HOST_NAME is not set).
HOST = os.getenv('HOST_NAME')

# (display name, "host:port") pairs of the HTTP services to probe.
SERVICES = [
    ('db-adapter', 'il060:3030'),
    ('Elastic-Stack', 'il060:9600'),
    ('SensorThings', 'il060:8082'),
]

# Kafka topics on which traffic is expected.
OBSERVED_TOPICS = [
    'SensorData',
    'node-red-message',
]

# Kafka brokers used for bootstrapping the consumers.
BOOTSTRAP_SERVERS = [
    'il061',
    'il062',
    'il063',
]
# Slack client bound to the webhook URL given in the SLACK_URL environment
# variable; a start-up notice is posted immediately.
slack_webhook_url = os.getenv('SLACK_URL')
slack = slackweb.Slack(url=slack_webhook_url)
startup_text = 'Started Kafka watchdog on host {}'.format(HOST)
slack.notify(text=startup_text)

# One dedicated consumer per observed topic, so each topic is polled and
# reported on independently.
consumer_options = {
    'bootstrap_servers': BOOTSTRAP_SERVERS,
    'api_version': (0, 9),
}
consumers = [
    KafkaConsumer(topic, **consumer_options)
    for topic in OBSERVED_TOPICS
]
def _is_service_up(url, retries=RETRIES, pause=3, request_timeout=10):
    """Return True as soon as ``http://<url>`` answers with HTTP 200.

    At most *retries* GET requests are made, with *pause* seconds between
    consecutive attempts (no pause after a success or after the last attempt).
    Each request is bounded by *request_timeout* seconds so that a hanging
    service cannot stall the watchdog. Connection problems and timeouts count
    as a failed attempt; any non-200 status does too.

    Returns False when every attempt failed.
    """
    for attempt in range(1, retries + 1):
        try:
            response = requests.get("http://" + url, timeout=request_timeout)
        except requests.RequestException as err:
            # Only network/HTTP-level failures are treated as "not reachable";
            # KeyboardInterrupt and programming errors propagate.
            logger.debug('Attempt %d/%d on %s failed: %s', attempt, retries, url, err)
        else:
            if response.status_code == 200:
                return True
        if attempt < retries:
            time.sleep(pause)
    return False


# Main monitoring loop: runs until the process is stopped.
while True:
    # 1) Kafka: every observed topic must deliver something within TIMEOUT.
    for consumer in consumers:
        topics = consumer.subscription()
        logger.debug('Checking topics %s', topics)
        messages = consumer.poll(timeout_ms=TIMEOUT * 1000)
        if not messages:
            text = 'No messages in {} received within {} seconds.'.format(topics, TIMEOUT)
            logger.error(text)
            slack.notify(attachments=[{'title': 'Kafka Warning', 'text': text, 'color': 'warning'}])
        else:
            # poll() returns a mapping of partition -> list of records, so the
            # record count is the sum over all partitions, not len(messages).
            received = sum(len(records) for records in messages.values())
            logger.debug('Received %d messages in topic %s', received, topics)

    # 2) HTTP services: each one must answer with 200 within RETRIES attempts.
    for name, url in SERVICES:
        logger.debug('Checking service %s on %s', name, url)
        if _is_service_up(url):
            logger.debug('Reached service %s on %s', name, url)
            continue
        text = 'Service {} on {} not reachable within {} trials.'.format(name, url, RETRIES)
        logger.warning(text)
        slack.notify(attachments=[{'title': 'Datastack Warning', 'text': text, 'color': 'warning'}])

    # Pause before the next monitoring round.
    time.sleep(15)