Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ElasticSearch Queries in Rule Types #883

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/source/ruletypes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ Rule Configuration Cheat Sheet
+----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+
| ``ignore_null`` (boolean, no default) | | | Req | Req | | | | | |
+----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+
| ``key_indexed`` (boolean, default False) | | Opt | Opt | | | | | | |
+----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+
| ``query_key`` (string, no default) | Opt | | | Req | Opt | Opt | Opt | Req | Opt |
+----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+
| ``aggregation_key`` (string, no default) | Opt | | | | | | | | |
Expand Down Expand Up @@ -637,6 +639,9 @@ This rule requires two additional options:

``blacklist``: A list of blacklisted values. The ``compare_key`` term must be equal to one of these values for it to match.

``key_indexed``: If true, Elastalert will craft an ElasticSearch query using the provided blacklist to only return documents that match.
This reduces the amount of documents ElastAlert must verify locally, but requires the data being searched to be adequately indexed/analyzed.

Whitelist
~~~~~~~~~

Expand All @@ -651,6 +656,9 @@ This rule requires three additional options:

``whitelist``: A list of whitelisted values. The ``compare_key`` term must be in this list or else it will match.

``key_indexed``: If true, Elastalert will craft an ElasticSearch query using the provided whitelist to only return documents that do not match.
This reduces the amount of documents ElastAlert must verify locally, but requires the data being searched to be adequately indexed/analyzed.

Change
~~~~~~

Expand Down
11 changes: 7 additions & 4 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def get_index(rule, starttime=None, endtime=None):
return index

@staticmethod
def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False):
def get_query(rule_type, filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False):
""" Returns a query dict that will apply a list of filters, filter by
start and end time, and sort results by timestamp.

Expand All @@ -159,6 +159,9 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field=
es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime,
'lte': endtime}}})
query = {'query': {'filtered': es_filters}}
rule_query = rule_type.generate_query()
if rule_query is not None:
query['query']['filtered']['query'] = rule_query
if sort:
query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
return query
Expand Down Expand Up @@ -238,7 +241,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False):
:param endtime: The latest time to query.
:return: A list of hits, bounded by rule['max_query_size'].
"""
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'])
query = self.get_query(rule['type'], rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts'])
extra_args = {'_source_include': rule['include']}
scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive)
if not rule.get('_source_enabled'):
Expand Down Expand Up @@ -287,7 +290,7 @@ def get_hits_count(self, rule, starttime, endtime, index):
:param endtime: The latest time to query.
:return: A dictionary mapping timestamps to number of hits for that time period.
"""
query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
query = self.get_query(rule['type'], rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])

try:
res = self.current_es.count(index=index, doc_type=rule['doc_type'], body=query, ignore_unavailable=True)
Expand All @@ -311,7 +314,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
if rule.get('raw_count_keys', True) and not rule['query_key'].endswith('.raw'):
filter_key = add_raw_postfix(filter_key)
rule_filter.extend([{'term': {filter_key: qk}}])
base_query = self.get_query(rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
base_query = self.get_query(rule['type'], rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts'])
if size is None:
size = rule.get('terms_size', 50)
query = self.get_terms_query(base_query, size, key)
Expand Down
32 changes: 32 additions & 0 deletions elastalert/ruletypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ def add_terms_data(self, terms):
:param terms: A list of buckets with a key, corresponding to query_key, and the count """
raise NotImplementedError()

def generate_query(self):
""" Gets called when forming ElasticSearch queries, expects a DSL 'query'.

Gives an opportunity to the rule to search for more specific things, enabling more efficient searches.
"""
return None


class CompareRule(RuleType):
""" A base class for matching a specific term by passing it to a compare function """
Expand All @@ -103,6 +110,19 @@ class BlacklistRule(CompareRule):
""" A CompareRule where the compare function checks a given key against a blacklist """
required_options = frozenset(['compare_key', 'blacklist'])

def generate_query(self):
if self.rules.get('key_indexed', False):
query = {'bool': {'should': [],
'minimum_should_match': 1}}

for blacklist_item in self.rules['blacklist']:
should = {'match': {self.rules['compare_key']: blacklist_item}}
query['bool']['should'].append(should)

return query
else:
return None

def compare(self, event):
term = lookup_es_key(event, self.rules['compare_key'])
if term in self.rules['blacklist']:
Expand All @@ -114,6 +134,18 @@ class WhitelistRule(CompareRule):
""" A CompareRule where the compare function checks a given term against a whitelist """
required_options = frozenset(['compare_key', 'whitelist', 'ignore_null'])

def generate_query(self):
if self.rules.get('key_indexed', False):
query = {'bool': {'must_not': []}}

for whitelist_item in self.rules['whitelist']:
must_not = {'match': {self.rules['compare_key']: whitelist_item}}
query['bool']['must_not'].append(must_not)

return query
else:
return None

def compare(self, event):
term = lookup_es_key(event, self.rules['compare_key'])
if term is None:
Expand Down
2 changes: 2 additions & 0 deletions elastalert/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ oneOf:
type: {enum: [blacklist]}
compare_key: {type: string}
blacklist: {type: array, items: {type: string}}
key_indexed: {type: boolean}

- title: Whitelist
required: [whitelist, compare_key, ignore_null]
Expand All @@ -48,6 +49,7 @@ oneOf:
compare_key: {type: string}
whitelist: {type: array, items: {type: string}}
ignore_null: {type: boolean}
key_indexed: {type: boolean}

- title: Change
required: [query_key, compare_key, ignore_null]
Expand Down
6 changes: 4 additions & 2 deletions elastalert/test_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ def test_file(self, conf, args):
if args.schema_only:
return []

load_modules(conf)

# Set up Elasticsearch client and query
es_client = elasticsearch_client(conf)
start_time = ts_now() - datetime.timedelta(days=args.days)
end_time = ts_now()
ts = conf.get('timestamp_field', '@timestamp')
query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts)
query = ElastAlerter.get_query(conf['type'], conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts)
index = ElastAlerter.get_index(conf, start_time, end_time)

# Get one document for schema
Expand All @@ -72,7 +74,7 @@ def test_file(self, conf, args):
doc_type = res['hits']['hits'][0]['_type']

# Get a count of all docs
count_query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False)
count_query = ElastAlerter.get_query(conf['type'], conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False)
count_query = {'query': {'filtered': count_query}}
try:
res = es_client.count(index, doc_type=doc_type, body=count_query, ignore_unavailable=True)
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self):
self.get_match_data = lambda x: x
self.get_match_str = lambda x: "some stuff happened"
self.garbage_collect = mock.Mock()
self.generate_query = mock.Mock(return_value=None)


class mock_alert(object):
Expand Down