From 4fb9eaebac2ec1bbe8ff8c43ed404be3977706f0 Mon Sep 17 00:00:00 2001 From: Murad Khan Date: Wed, 25 Jan 2017 21:16:36 +0000 Subject: [PATCH 1/3] Update base rule type to allow queries to generate elasticsearch queries to potentially optimize searches. Both the blacklist and whitelist rule types have been updated to take advantage of this enabling them to only receive data that should match their given criteria. --- docs/source/ruletypes.rst | 8 ++++++++ elastalert/elastalert.py | 11 +++++++---- elastalert/ruletypes.py | 38 ++++++++++++++++++++++++++++++++++++++ elastalert/schema.yaml | 2 ++ elastalert/test_rule.py | 6 ++++-- 5 files changed, 59 insertions(+), 6 deletions(-) diff --git a/docs/source/ruletypes.rst b/docs/source/ruletypes.rst index 6e3d3dafa..5a5f1340e 100644 --- a/docs/source/ruletypes.rst +++ b/docs/source/ruletypes.rst @@ -108,6 +108,8 @@ Rule Configuration Cheat Sheet +----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+ | ``ignore_null`` (boolean, no default) | | | Req | Req | | | | | | +----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+ +| ``key_indexed`` (boolean, default False) | | Opt | Opt | | | | | | | ++----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+ | ``query_key`` (string, no default) | Opt | | | Req | Opt | Opt | Opt | Req | Opt | +----------------------------------------------------+--------+-----------+-----------+--------+-----------+-------+----------+--------+-----------+ | ``aggregation_key`` (string, no default) | Opt | | | | | | | | | @@ -637,6 +639,9 @@ This rule requires two additional options: ``blacklist``: A list of blacklisted values. The ``compare_key`` term must be equal to one of these values for it to match. +``key_indexed``: If true, Elastalert will craft an ElasticSearch query using the provided blacklist to only return documents that match. +This reduces the amount of documents ElastAlert must verify locally, but requires the data being searched to be adequately indexed/analyzed. + Whitelist ~~~~~~~~~ @@ -651,6 +656,9 @@ This rule requires three additional options: ``whitelist``: A list of whitelisted values. The ``compare_key`` term must be in this list or else it will match. +``key_indexed``: If true, Elastalert will craft an ElasticSearch query using the provided whitelist to only return documents that do not match. +This reduces the amount of documents ElastAlert must verify locally, but requires the data being searched to be adequately indexed/analyzed. + Change ~~~~~~ diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index 6d9bcff72..a083a7990 100644 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -141,7 +141,7 @@ def get_index(rule, starttime=None, endtime=None): return index @staticmethod - def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False): + def get_query(rule_type, filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False): """ Returns a query dict that will apply a list of filters, filter by start and end time, and sort results by timestamp. @@ -159,6 +159,9 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field= es_filters['filter']['bool']['must'].insert(0, {'range': {timestamp_field: {'gt': starttime, 'lte': endtime}}}) query = {'query': {'filtered': es_filters}} + rule_query = rule_type.generate_query() + if rule_query is not None: + query['query']['filtered']['query'] = rule_query if sort: query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}] return query @@ -238,7 +241,7 @@ def get_hits(self, rule, starttime, endtime, index, scroll=False): :param endtime: The latest time to query. :return: A list of hits, bounded by rule['max_query_size']. """ - query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts']) + query = self.get_query(rule['type'], rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], to_ts_func=rule['dt_to_ts']) extra_args = {'_source_include': rule['include']} scroll_keepalive = rule.get('scroll_keepalive', self.scroll_keepalive) if not rule.get('_source_enabled'): @@ -287,7 +290,7 @@ def get_hits_count(self, rule, starttime, endtime, index): :param endtime: The latest time to query. :return: A dictionary mapping timestamps to number of hits for that time period. """ - query = self.get_query(rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) + query = self.get_query(rule['type'], rule['filter'], rule['query'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) try: res = self.current_es.count(index=index, doc_type=rule['doc_type'], body=query, ignore_unavailable=True) @@ -311,7 +314,7 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non if rule.get('raw_count_keys', True) and not rule['query_key'].endswith('.raw'): filter_key = add_raw_postfix(filter_key) rule_filter.extend([{'term': {filter_key: qk}}]) - base_query = self.get_query(rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) + base_query = self.get_query(rule['type'], rule_filter, starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) if size is None: size = rule.get('terms_size', 50) query = self.get_terms_query(base_query, size, key) diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py index f53d2f4d2..1d86989ff 100644 --- a/elastalert/ruletypes.py +++ b/elastalert/ruletypes.py @@ -83,6 +83,13 @@ def add_terms_data(self, terms): :param terms: A list of buckets with a key, corresponding to query_key, and the count """ raise NotImplementedError() + def generate_query(self): + """ Gets called when forming ElasticSearch queries, expects a DSL 'query'. + + Gives an opportunity to the rule to search for more specific things, enabling more efficient searches. + """ + return None + class CompareRule(RuleType): """ A base class for matching a specific term by passing it to a compare function """ @@ -103,6 +110,22 @@ class BlacklistRule(CompareRule): """ A CompareRule where the compare function checks a given key against a blacklist """ required_options = frozenset(['compare_key', 'blacklist']) + def generate_query(self): + if self.rules.get('key_indexed', False): + query = {'bool': { + 'should': [], + 'minimum_should_match': 1 + } + } + + for blacklist_item in self.rules['blacklist']: + should = {'match': {self.rules['compare_key']: blacklist_item}} + query['bool']['should'].append(should) + + return query + else: + return None + def compare(self, event): term = lookup_es_key(event, self.rules['compare_key']) if term in self.rules['blacklist']: @@ -114,6 +137,21 @@ class WhitelistRule(CompareRule): """ A CompareRule where the compare function checks a given term against a whitelist """ required_options = frozenset(['compare_key', 'whitelist', 'ignore_null']) + def generate_query(self): + if self.rules.get('key_indexed', False): + query = {'bool': { + 'must_not': [], + } + } + + for whitelist_item in self.rules['whitelist']: + must_not = {'match': {self.rules['compare_key']: whitelist_item}} + query['bool']['must_not'].append(must_not) + + return query + else: + return None + def compare(self, event): term = lookup_es_key(event, self.rules['compare_key']) if term is None: diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml index 4aee5a1a0..9f42f929f 100644 --- a/elastalert/schema.yaml +++ b/elastalert/schema.yaml @@ -40,6 +40,7 @@ oneOf: type: {enum: [blacklist]} compare_key: {type: string} blacklist: {type: array, items: {type: string}} + key_indexed: {type: boolean} - title: Whitelist required: [whitelist, compare_key, ignore_null] @@ -48,6 +49,7 @@ oneOf: compare_key: {type: string} whitelist: {type: array, items: {type: string}} ignore_null: {type: boolean} + key_indexed: {type: boolean} - title: Change required: [query_key, compare_key, ignore_null] diff --git a/elastalert/test_rule.py b/elastalert/test_rule.py index e05441a03..6dfb0e9df 100644 --- a/elastalert/test_rule.py +++ b/elastalert/test_rule.py @@ -49,12 +49,14 @@ def test_file(self, conf, args): if args.schema_only: return [] + load_modules(conf) + # Set up Elasticsearch client and query es_client = elasticsearch_client(conf) start_time = ts_now() - datetime.timedelta(days=args.days) end_time = ts_now() ts = conf.get('timestamp_field', '@timestamp') - query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts) + query = ElastAlerter.get_query(conf['type'], conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts) index = ElastAlerter.get_index(conf, start_time, end_time) # Get one document for schema @@ -72,7 +74,7 @@ def test_file(self, conf, args): doc_type = res['hits']['hits'][0]['_type'] # Get a count of all docs - count_query = ElastAlerter.get_query(conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False) + count_query = ElastAlerter.get_query(conf['type'], conf['filter'], starttime=start_time, endtime=end_time, timestamp_field=ts, sort=False) count_query = {'query': {'filtered': count_query}} try: res = es_client.count(index, doc_type=doc_type, body=count_query, ignore_unavailable=True) From 579549c5a10bbef55460075e67c4e54bb342a2bb Mon Sep 17 00:00:00 2001 From: Murad Khan Date: Wed, 25 Jan 2017 22:06:02 +0000 Subject: [PATCH 2/3] Fix artifact from previous mod --- elastalert/elastalert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py index a083a7990..5038a3fbb 100644 --- a/elastalert/elastalert.py +++ b/elastalert/elastalert.py @@ -290,7 +290,7 @@ def get_hits_count(self, rule, starttime, endtime, index): :param endtime: The latest time to query. :return: A dictionary mapping timestamps to number of hits for that time period. """ - query = self.get_query(rule['type'], rule['filter'], rule['query'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) + query = self.get_query(rule['type'], rule['filter'], starttime, endtime, timestamp_field=rule['timestamp_field'], sort=False, to_ts_func=rule['dt_to_ts']) try: res = self.current_es.count(index=index, doc_type=rule['doc_type'], body=query, ignore_unavailable=True) From f30a9c50071c29cf23795db1d1b193bf8b77566b Mon Sep 17 00:00:00 2001 From: Murad Khan Date: Wed, 25 Jan 2017 22:19:47 +0000 Subject: [PATCH 3/3] Updated code style to match PEP8. Also added generate_query function call to mock setup which returns None. Might want to add test that checks return syntax ... --- elastalert/ruletypes.py | 12 +++--------- tests/conftest.py | 1 + 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/elastalert/ruletypes.py b/elastalert/ruletypes.py index 1d86989ff..af1689991 100644 --- a/elastalert/ruletypes.py +++ b/elastalert/ruletypes.py @@ -112,11 +112,8 @@ class BlacklistRule(CompareRule): def generate_query(self): if self.rules.get('key_indexed', False): - query = {'bool': { - 'should': [], - 'minimum_should_match': 1 - } - } + query = {'bool': {'should': [], + 'minimum_should_match': 1}} for blacklist_item in self.rules['blacklist']: should = {'match': {self.rules['compare_key']: blacklist_item}} @@ -139,10 +136,7 @@ class WhitelistRule(CompareRule): def generate_query(self): if self.rules.get('key_indexed', False): - query = {'bool': { - 'must_not': [], - } - } + query = {'bool': {'must_not': []}} for whitelist_item in self.rules['whitelist']: must_not = {'match': {self.rules['compare_key']: whitelist_item}} diff --git a/tests/conftest.py b/tests/conftest.py index 0a03b9421..d5c2d8f5a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,6 +29,7 @@ def __init__(self): self.get_match_data = lambda x: x self.get_match_str = lambda x: "some stuff happened" self.garbage_collect = mock.Mock() + self.generate_query = mock.Mock(return_value=None) class mock_alert(object):