forked from elastic/detection-rules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connector.py
224 lines (180 loc) · 8.49 KB
/
connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Wrapper around requests.Session for HTTP requests to Kibana."""
import atexit
import base64
import json
import sys
import threading
import uuid
import requests
from elasticsearch import Elasticsearch
# Per-thread stack of Kibana connectors bound through the ``with`` statement.
_context = threading.local()


class Kibana(object):
    """Wrapper around the Kibana SIEM APIs.

    Holds a ``requests.Session`` used for every call, optionally scopes URLs to
    a Kibana space, and can be used as a context manager so that
    ``Kibana.current()`` returns the innermost active connector of the thread.
    """

    CACHED = False

    def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=None, space=None):
        """Open a session to the platform.

        :param cloud_id: optional ``<name>:<base64>`` cloud id; when given, the Kibana
            and Elasticsearch URLs are derived from it.
        :param kibana_url: optional base URL of Kibana (a trailing ``/`` is stripped).
        :param verify: value assigned to ``session.verify`` and reused as
            ``verify_certs`` for the Elasticsearch client created by ``login``.
        :param elasticsearch: optional pre-built Elasticsearch client.
        :param space: optional Kibana space; ``None``/empty/``default`` means no space prefix.
        :raises ValueError: if both ``cloud_id`` and ``kibana_url`` are given and disagree,
            or if ``cloud_id`` cannot be unpacked into the expected parts.
        """
        self.authenticated = False
        self.verify = verify
        self.session = self._new_session()

        self.cloud_id = cloud_id
        self.kibana_url = kibana_url.rstrip('/') if kibana_url else None
        self.elastic_url = None
        self.space = space if space and space.lower() != 'default' else None
        self.status = None

        self.provider_name = None
        self.provider_type = None

        if self.cloud_id:
            self.cluster_name, cloud_info = self.cloud_id.split(":")
            # the encoded part decodes to "<domain>$<es uuid>$<kibana uuid>"
            self.domain, self.es_uuid, self.kibana_uuid = \
                base64.b64decode(cloud_info.encode("utf-8")).decode("utf-8").split("$")

            # drop an explicit ":443" suffix; port 9243 is appended to both URLs below
            if self.domain.endswith(':443'):
                self.domain = self.domain[:-4]

            kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243"
            if self.kibana_url and self.kibana_url != kibana_url_from_cloud:
                raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id '
                                 f'{kibana_url_from_cloud}')
            self.kibana_url = kibana_url_from_cloud
            self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243"

            self.provider_name = 'cloud-basic'
            self.provider_type = 'basic'

        self.elasticsearch = elasticsearch

        if not verify:
            # certificate checks were explicitly disabled, so silence the per-request warning
            from requests.packages.urllib3.exceptions import \
                InsecureRequestWarning
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        # log out at interpreter exit if the connector is still authenticated
        atexit.register(self.__close)

    def _new_session(self):
        """Build a ``requests.Session`` carrying this connector's ``verify`` flag and default headers."""
        session = requests.Session()
        session.verify = self.verify
        session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())})
        return session

    @property
    def version(self):
        """Get the semantic version from the cached status, or ``None`` if no status is loaded."""
        if self.status:
            return self.status.get("version", {}).get("number")

    def url(self, uri):
        """Get the full URL given a URI.

        Leading slashes on ``uri`` are dropped; when a space is configured the
        path is prefixed with ``s/<space>/``.
        """
        assert self.kibana_url is not None, "kibana_url is not set"
        # If a space is defined update the URL accordingly
        uri = uri.lstrip('/')
        if self.space:
            uri = f"s/{self.space}/{uri}"
        return f"{self.kibana_url}/{uri}"

    def request(self, method, uri, params=None, data=None, error=True, verbose=True, raw=False, **kwargs):
        """Perform a RESTful HTTP request with JSON responses.

        :param method: HTTP verb handed to the session.
        :param uri: path that is expanded with ``url()``.
        :param params: optional mapping of query parameters.
        :param data: optional object serialized with ``json.dumps`` as the request body.
        :param error: when true, call ``raise_for_status()`` on the response.
        :param verbose: when true, print the error body to stderr before re-raising.
        :param raw: when true, return the response bytes instead of parsed JSON.
        :param kwargs: forwarded to ``session.request``.
        :return: ``None`` for an empty body, otherwise the bytes or the decoded JSON.
        :raises requests.exceptions.HTTPError: on an error status when ``error`` is true.
        """
        url = self.url(uri)
        # copy so the caller's mapping is never shared with the session
        params = dict(params or {})
        body = json.dumps(data) if data is not None else None

        response = self.session.request(method, url, params=params, data=body, **kwargs)
        if error:
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError:
                if verbose:
                    # tolerate undecodable bytes so the HTTPError is the exception that propagates
                    print(response.content.decode("utf-8", errors="replace"), file=sys.stderr)
                raise

        if not response.content:
            return

        return response.content if raw else response.json()

    def get(self, uri, params=None, data=None, error=True, **kwargs):
        """Perform an HTTP GET."""
        return self.request('GET', uri, data=data, params=params, error=error, **kwargs)

    def put(self, uri, params=None, data=None, error=True, **kwargs):
        """Perform an HTTP PUT."""
        return self.request('PUT', uri, params=params, data=data, error=error, **kwargs)

    def post(self, uri, params=None, data=None, error=True, **kwargs):
        """Perform an HTTP POST."""
        return self.request('POST', uri, params=params, data=data, error=error, **kwargs)

    def patch(self, uri, params=None, data=None, error=True, **kwargs):
        """Perform an HTTP PATCH."""
        return self.request('PATCH', uri, params=params, data=data, error=error, **kwargs)

    def delete(self, uri, params=None, error=True, **kwargs):
        """Perform an HTTP DELETE."""
        return self.request('DELETE', uri, params=params, error=error, **kwargs)

    def login(self, kibana_username, kibana_password, provider_type=None, provider_name=None):
        """Authenticate to Kibana using the API to update our cookies.

        Posts the credentials to the internal login endpoint, retrying once with the
        provider-based payload when the first attempt is rejected with a 400 whose
        body contains ``[undefined]``. On success the space (if any) is verified,
        the status is cached and, when an Elasticsearch URL is known and no client
        was supplied, an Elasticsearch client is created and queried.

        :return: this connector, to make chaining easier.
        :raises requests.HTTPError: if authentication fails.
        :raises ValueError: if the configured space is not known to Kibana.
        """
        payload = {'username': kibana_username, 'password': kibana_password}
        path = '/internal/security/login'

        try:
            self.post(path, data=payload, error=True, verbose=False)
        except requests.HTTPError as e:
            # 7.10 changed the structure of the auth data
            # providers dictated by Kibana configs in:
            # https://www.elastic.co/guide/en/kibana/current/security-settings-kb.html#authentication-security-settings
            # more details: https://discuss.elastic.co/t/kibana-7-10-login-issues/255201/2
            if e.response.status_code == 400 and '[undefined]' in e.response.text:
                provider_type = provider_type or self.provider_type or 'basic'
                provider_name = provider_name or self.provider_name or 'basic'

                payload = {
                    'params': payload,
                    'currentURL': '',
                    'providerType': provider_type,
                    'providerName': provider_name
                }
                self.post(path, data=payload, error=True)
            else:
                raise

        # Kibana will authenticate against URLs which contain invalid spaces
        if self.space:
            self.verify_space(self.space)

        self.authenticated = True
        self.status = self.get("/api/status")

        # create ES and force authentication
        if self.elasticsearch is None and self.elastic_url is not None:
            self.elasticsearch = Elasticsearch(hosts=[self.elastic_url], http_auth=(kibana_username, kibana_password),
                                               verify_certs=self.verify)
            self.elasticsearch.info()

        # make chaining easier
        return self

    def add_cookie(self, cookie):
        """Add cookie to be used for auth (such as from an SSO session)."""
        # the request to /api/status will also add the cookie to the cookie jar upon a successful response
        self.session.headers['cookie'] = cookie
        self.status = self.get('/api/status')
        self.authenticated = True

    def logout(self):
        """Quit the current session.

        Calls the logout endpoint (connection errors are ignored), clears the cached
        state and replaces the HTTP session with a fresh one that keeps the same
        ``verify`` flag and default headers, so the connector can log in again.
        """
        try:
            self.get('/logout', raw=True, error=False)
        except requests.exceptions.ConnectionError:
            # for really short scoping from buildup to teardown, ES will cause a Max retry error
            pass

        self.status = None
        self.authenticated = False

        # release the old session's connections and start over with the same settings
        stale_session = self.session
        self.session = self._new_session()
        stale_session.close()

        self.elasticsearch = None

    def __close(self):
        """Log out if still authenticated; registered with ``atexit`` by ``__init__``."""
        if self.authenticated:
            self.logout()

    def __enter__(self):
        """Use the current Kibana instance for ``with`` syntax."""
        if not hasattr(_context, "stack"):
            _context.stack = []

        # Backup the previous Kibana instance and bind the current one
        _context.stack.append(self)
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Unbind the innermost Kibana instance when leaving a ``with`` block."""
        _context.stack.pop()

    @classmethod
    def current(cls) -> 'Kibana':
        """Get the currently used Kibana stack.

        :raises RuntimeError: if no connector is bound on the calling thread.
        """
        stack = getattr(_context, "stack", [])
        if not stack:
            raise RuntimeError("No Kibana connector in scope!")

        return stack[-1]

    def verify_space(self, space):
        """Verify a space is valid.

        The value is accepted when it matches the ``id`` or the ``name`` of any
        space returned by the spaces API.

        :raises ValueError: if ``space`` matches no known space.
        """
        # an empty response body yields None from get(); treat that as "no spaces"
        spaces = self.get('/api/spaces/space') or []

        # url() places the configured space in the "s/<space>/" path, so match on the
        # identifier as well as on the display name that was checked historically
        known_spaces = set()
        for entry in spaces:
            known_spaces.update((entry.get('id'), entry.get('name')))

        if space not in known_spaces:
            raise ValueError(f'Unknown Kibana space: {space}')

    def current_user(self):
        """Retrieve info for currently authenticated user, or ``None`` when not authenticated."""
        if self.authenticated:
            return self.get('/internal/security/me')