-
Notifications
You must be signed in to change notification settings - Fork 171
/
alerta_msteamswebhook.py
82 lines (69 loc) · 3.35 KB
/
alerta_msteamswebhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from uuid import UUID
from alerta.models.alert import Alert
from alerta.models.blackout import Blackout
from alerta.utils.audit import write_audit_trail
from alerta.webhooks import WebhookBase
from flask import current_app, g, jsonify, make_response, request
class MsteamsWebhook(WebhookBase):
def incoming(self, query_string, payload):
# Note: This doesn't validate MS Teams Authorization: Bearer JWT
# instead we're relying on alerta to validate X-API-Key header
action = payload.get('action', 'missing')
if action not in ['ack', 'close', 'blackout']:
resp = make_response(
jsonify(status='error', message='Invalid action'), 400)
return resp
if action in ['ack', 'close']:
alert_id = payload.get('alert_id', None)
err = make_response(
jsonify(status='error', message='Missing/invalid alert_id'), 400)
if not alert_id:
return err
try:
# check that alert_id looks like uuid
uuidval = UUID(alert_id, version=4)
if str(uuidval) != alert_id.lower():
return err
except Exception:
return err
alert = Alert.find_by_id(
alert_id, customers=g.get('customers', None))
if not alert:
return err
else:
alert.from_action(
action, text='status changed via MS Teams webhook')
resp = make_response(
jsonify(status='ok', message='status changed'), 200)
resp.headers['CARD-ACTION-STATUS'] = 'Alert {}d'.format(
action.capitalize())
text = 'alert updated via msteams webhook'
write_audit_trail.send(current_app._get_current_object(), event='webhook-updated', message=text,
user=g.login, customers=g.customers, scopes=g.scopes, resource_id=alert.id,
type='alert', request=request)
elif action == 'blackout':
environment = payload.get('environment', None)
resource = payload.get('resource', None)
event = payload.get('event', None)
if environment and resource and event:
duration = payload.get(
'duration', None) or current_app.config['BLACKOUT_DURATION']
try:
if not duration or float(duration) < 0.0:
# Should not happen: set default duration
duration = 3600
except ValueError:
# Should not happen: set default duration
duration = 3600
blackout = Blackout(
environment, resource=resource, event=event, duration=duration)
blackout.create()
resp = make_response(
jsonify(status='ok', message='blackout created'), 201)
resp.headers['CARD-ACTION-STATUS'] = 'Blackout created for {:.1f} hours'.format(
float(duration) / 3600)
else:
# Missging env, resource or event
resp = make_response(
jsonify(status='error', message='Missing blackout params'), 412)
return resp