diff --git a/timesketch/lib/analyzers/yetiindicators.py b/timesketch/lib/analyzers/yetiindicators.py index 843cf5eeb7..147c131597 100644 --- a/timesketch/lib/analyzers/yetiindicators.py +++ b/timesketch/lib/analyzers/yetiindicators.py @@ -3,7 +3,7 @@ import json import re -from typing import Dict, List +from typing import Dict, List, Union from flask import current_app import requests @@ -13,15 +13,51 @@ from timesketch.lib import emojis -class YetiIndicators(interface.BaseAnalyzer): - """Analyzer for Yeti threat intel indicators.""" +TYPE_TO_EMOJI = { + "malware": "SPIDER", + "threat-actor": "SKULL", + "intrusion-set": "SKULL", + "tool": "HAMMER", + "investigation": "FIRE", + "campaign": "BOMB", + "vulnerability": "SHIELD", + "attack-pattern": "HIGH_VOLTAGE", +} - NAME = "yetiindicators" - DISPLAY_NAME = "Yeti CTI indicators" - DESCRIPTION = "Mark events using CTI indicators from Yeti" +NEIGHBOR_CACHE = {} + +HIGH_SEVERITY_TYPES = { + "malware", + "threat-actor", + "intrusion-set", + "campaign", + "vulnerability", +} + + +def slugify(text: str) -> str: + """Converts a string to a slug.""" + text = text.lower() + text = re.sub(r"[^a-z0-9]", "-", text) + text = re.sub(r"-+", "-", text) + return text + + +class YetiBaseAnalyzer(interface.BaseAnalyzer): + """Base class for Yeti indicator analyzers.""" DEPENDENCIES = frozenset(["domain"]) + # Entities with these tags will be fetched from Yeti + _TAG_SELECTOR: List[str] = [] + # Entities of this type will be fetched from Yeti + _TYPE_SELECTOR: Union[str, None] = None + # Graph will be traversed from the entities looking for these types + # of neighbors + _TARGET_NEIGHBOR_TYPE: List[str] = [] + # If True, will save intelligence to the sketch + _SAVE_INTELLIGENCE: bool = False + def __init__(self, index_name, sketch_id, timeline_id=None): """Initialize the Analyzer. @@ -38,11 +74,15 @@ def __init__(self, index_name, sketch_id, timeline_id=None): self.yeti_api_root = root self.yeti_web_root = root.replace("/api/v2", "") self.yeti_api_key = current_app.config.get("YETI_API_KEY") + self._yeti_session = requests.Session() tls_cert = current_app.config.get("YETI_TLS_CERTIFICATE") if tls_cert and self.yeti_web_root.startswith("https://"): self._yeti_session.verify = tls_cert + self._intelligence_refs = set() + self._intelligence_attribute = {"data": []} + @property def authenticated_session(self) -> requests.Session: """Returns a requests.Session object with an authenticated Yeti session. @@ -69,56 +109,87 @@ def authenticate_session(self) -> None: access_token = response.json()["access_token"] self._yeti_session.headers.update({"authorization": f"Bearer {access_token}"}) - def get_neighbors(self, yeti_object: Dict) -> List[Dict]: + def _get_neighbors_request(self, params): + """Simple wrapper around requests call to make testing easier.""" + results = self.authenticated_session.post( + f"{self.yeti_api_root}/graph/search", + json=params, + ) + if results.status_code != 200: + raise RuntimeError( + f"Error {results.status_code} retrieving neighbors for " + f"{params['source']} from Yeti:" + str(results.json()) + ) + return results.json() + + def get_neighbors( + self, yeti_object: Dict, max_hops: int, neighbor_types: List[str] + ) -> List[Dict]: """Retrieves a list of neighbors associated to a given entity. Args: yeti_object: The Yeti object to get neighbors from. + max_hops: The number of hops to traverse from the entity. + neighbor_types: A list of Yeti object types to filter by. Returns: - A list of JSON objects describing a Yeti entity. + A list of dictionaries describing a Yeti object. """ + if yeti_object["id"] in NEIGHBOR_CACHE: + return NEIGHBOR_CACHE[yeti_object["id"]] + extended_id = f"{yeti_object['root_type']}/{yeti_object['id']}" - results = self.authenticated_session.post( - f"{self.yeti_api_root}/graph/search", - json={ + results = self._get_neighbors_request( + { + "count": 0, "source": extended_id, "graph": "links", - "hops": 1, - "direction": "any", + "min_hops": 1, + "max_hops": max_hops, + "direction": "outbound", "include_original": False, - }, + "target_types": neighbor_types, + } ) - if results.status_code != 200: - return [] - neighbors = [] - for neighbor in results.json().get("vertices", {}).values(): - if neighbor["root_type"] == "entity": - neighbors.append(neighbor) + neighbors = {} + for neighbor in results.get("vertices", {}).values(): + # Yeti will return all vertices in the graph's path, not just + # the ones that are fo type target_types. We still want these + # in the cache. + if neighbor["type"] in neighbor_types: + neighbors[neighbor["id"]] = neighbor + NEIGHBOR_CACHE[yeti_object["id"]] = neighbors return neighbors - def get_indicators(self, indicator_type: str) -> Dict[str, dict]: - """Populates the intel attribute with entities from Yeti. - - Returns: - A dictionary of indicators obtained from yeti, keyed by indicator - ID. - """ - response = self.authenticated_session.post( - self.yeti_api_root + "/indicators/search", - json={"query": {"name": ""}, "type": indicator_type}, + def _get_entities_request(self, params): + """Simple wrapper around requests call to make testing easier.""" + results = self.authenticated_session.post( + f"{self.yeti_api_root}/entities/search", + json=params, ) - data = response.json() - if response.status_code != 200: + if results.status_code != 200: raise RuntimeError( - f"Error {response.status_code} retrieving indicators from Yeti:" - + str(data) + f"Error {results.status_code} retrieving entities from Yeti:" + + str(results.json()) ) - indicators = {item["id"]: item for item in data["indicators"]} - for _id, indicator in indicators.items(): - indicator["compiled_regexp"] = re.compile(indicator["pattern"]) - indicators[_id] = indicator - return indicators + return results.json() + + def get_entities(self, _type: str, tags: List[str]) -> Dict[str, dict]: + """Fetches Entities with a certain tag on Yeti. + + Args: + _type: Search entities of this type + tags: A list of tags to filter entities with. + + Returns: + A dictionary of entities obtained from Yeti, keyed by entity ID. + """ + query = {"name": "", "tags": tags} + if _type: + query["type"] = _type + data = self._get_entities_request({"query": query, "count": 0}) + entities = {item["id"]: item for item in data["entities"]} + return entities def mark_event( self, indicator: Dict, event: interface.Event, neighbors: List[Dict] @@ -131,26 +202,69 @@ def mark_event( event: a Timesketch sketch Event object. neighbors: a list of Yeti entities related to the indicator. """ - event.add_emojis([emojis.get_emoji("SKULL")]) - tags = [] - for n in neighbors: - slug = re.sub(r"[^a-z0-9]", "-", n["name"].lower()) - slug = re.sub(r"-+", "-", slug) - tags.append(slug) - for tag in indicator["relevant_tags"]: - slug = re.sub(r"[^a-z0-9]", "-", tag.lower()) - slug = re.sub(r"-+", "-", slug) - tags.append(slug) - event.add_tags(tags) + tags = {slugify(tag) for tag in indicator["relevant_tags"]} + for neighbor in neighbors: + tags.add(slugify(neighbor["name"])) + emoji_name = TYPE_TO_EMOJI[neighbor["type"]] + event.add_emojis([emojis.get_emoji(emoji_name)]) + + event.add_tags(list(tags)) event.commit() - msg = f'Indicator match: "{indicator["name"]}" ({indicator["id"]})\n' - msg += f'Related entities: {[n["name"] for n in neighbors]}' - comments = {c.comment for c in event.get_comments()} + msg = f'Indicator match: "{indicator["name"]}" (ID: {indicator["id"]})\n' + if neighbors: + msg += f'Related entities: {[neighbor["name"] for neighbor in neighbors]}' + + comments = {comment.comment for comment in event.get_comments()} if msg not in comments: event.add_comment(msg) event.commit() + def add_intelligence_entry(self, indicator, event, entity): + uri = f"{self.yeti_web_root}/indicators/{indicator['id']}" + + if "compiled_regexp" not in indicator: + indicator["compiled_regexp"] = re.compile(indicator["pattern"]) + regexp = indicator["compiled_regexp"] + match = regexp.search(event.source.get("message")) + if match: + match_in_sketch = match.group(0) + if not match: + return + + if (match_in_sketch, uri) in self._intelligence_refs: + return + + intel = { + "externalURI": uri, + "ioc": match_in_sketch, + "tags": indicator["relevant_tags"] + [entity["name"]], + "type": "other", + } + + self._intelligence_attribute["data"].append(intel) + self._intelligence_refs.add((match_in_sketch, uri)) + + def get_intelligence_attribute(self): + try: + self._intelligence_attribute = self.sketch.get_sketch_attributes( + "intelligence" + ) + self._intelligence_refs = { + (ioc["ioc"], ioc["externalURI"]) + for ioc in self._intelligence_attribute["data"] + } + except ValueError: + print("Intelligence not set on sketch, will be created from scratch.") + + def save_intelligence(self): + self.sketch.add_sketch_attribute( + "intelligence", + [json.dumps(self._intelligence_attribute)], + ontology="intelligence", + overwrite=True, + ) + def run(self): """Entry point for the analyzer. @@ -160,66 +274,49 @@ def run(self): if not self.yeti_api_root or not self.yeti_api_key: return "No Yeti configuration settings found, aborting." - indicators = self.get_indicators("regex") - - entities_found = set() total_matches = 0 - new_indicators = set() + total_processed = 0 + entities_found = set() + matching_indicators = set() + priority = "NOTE" - intelligence_attribute = {"data": []} - existing_refs = set() + if self._SAVE_INTELLIGENCE: + self.get_intelligence_attribute() - try: - intelligence_attribute = self.sketch.get_sketch_attributes("intelligence") - existing_refs = { - (ioc["ioc"], ioc["externalURI"]) - for ioc in intelligence_attribute["data"] - } - except ValueError: - print("Intelligence not set on sketch, will create from scratch.") + entities = self.get_entities(_type=self._TYPE_SELECTOR, tags=self._TAG_SELECTOR) + for entity in entities.values(): + indicators = self.get_neighbors( + entity, max_hops=5, neighbor_types=self._TARGET_NEIGHBOR_TYPE + ) + for indicator in indicators.values(): + query_dsl = { + "query": { + "regexp": { + "message.keyword": ".*" + indicator["pattern"] + ".*" + } + } + } + events = self.event_stream( + query_dsl=query_dsl, return_fields=["message"], scroll=False + ) - intelligence_items = [] + for event in events: + total_matches += 1 + self.mark_event(indicator, event, [entity]) + matching_indicators.add(indicator["id"]) + if entity["type"] in HIGH_SEVERITY_TYPES: + priority = "HIGH" + if self._SAVE_INTELLIGENCE: + self.add_intelligence_entry(indicator, event, entity) - for indicator in indicators.values(): - query_dsl = { - "query": { - "regexp": {"message.keyword": ".*" + indicator["pattern"] + ".*"} - } - } + entities_found.add(f"{entity['name']}:{entity['type']}") - events = self.event_stream(query_dsl=query_dsl, return_fields=["message"]) - neighbors = self.get_neighbors(indicator) - for n in neighbors: - if n["root_type"] == "entity": - entities_found.add(f"{n['name']}:{n['type']}") - - uri = f"{self.yeti_web_root}/indicators/{indicator['id']}" - - for event in events: - total_matches += 1 - self.mark_event(indicator, event, neighbors) - - regex = indicator["compiled_regexp"] - match = regex.search(event.source.get("message")) - if match: - match_in_sketch = match.group() - else: - match_in_sketch = indicator["pattern"] - - intel = { - "externalURI": uri, - "ioc": match_in_sketch, - "tags": [n["name"] for n in neighbors], - "type": "other", - } - if (match_in_sketch, uri) not in existing_refs: - intelligence_items.append(intel) - existing_refs.add((match_in_sketch, uri)) - new_indicators.add(indicator["id"]) + total_processed += 1 + + self.output.result_status = "SUCCESS" + self.output.result_priority = priority if not total_matches: - self.output.result_status = "SUCCESS" - self.output.result_priority = "NOTE" note = "No indicators were found in the timeline." self.output.result_summary = note return str(self.output) @@ -232,23 +329,57 @@ def run(self): query_string=f'tag:"{name}"', ) - all_iocs = intelligence_attribute["data"] + intelligence_items - self.sketch.add_sketch_attribute( - "intelligence", - [json.dumps({"data": all_iocs})], - ontology="intelligence", - overwrite=True, - ) + if self._SAVE_INTELLIGENCE: + self.save_intelligence() success_note = ( - f"{total_matches} events matched {len(new_indicators)} " - f"new indicators. Found: {', '.join(entities_found)}" + f"{total_matches} events matched {len(matching_indicators)} " + f"indicators (out of {total_processed} processed).\n\n" + f"Entities found: {', '.join(entities_found)}" ) - self.output.result_status = "SUCCESS" - self.output.result_priority = "HIGH" self.output.result_summary = success_note return str(self.output) -manager.AnalysisManager.register_analyzer(YetiIndicators) +class YetiTriageIndicators(YetiBaseAnalyzer): + """Analyzer for Yeti triage indicators.""" + + NAME = "yetitriageindicators" + DISPLAY_NAME = "Yeti forensics triage indicators" + DESCRIPTION = ( + "Mark triage events using forensics indicators from Yeti. Will fetch" + ' all attack-patterns tagged with the "triage" tag, and traverse the' + " graph searching for regex indicators." + ) + + DEPENDENCIES = frozenset(["domain"]) + + _TAG_SELECTOR = ["triage"] + _TYPE_SELECTOR = "attack-pattern" + _TARGET_NEIGHBOR_TYPE = ["regex"] + _SAVE_INTELLIGENCE = False + + +class YetiMalwareIndicators(YetiBaseAnalyzer): + """Analyzer for Yeti malware indicators.""" + + NAME = "yetimalwareindicators" + DISPLAY_NAME = "Yeti CTI malware indicators" + DESCRIPTION = ( + "Mark malware-related events using forensic indicators from " + "Yeti. Will fetch all malware entities and traverse the" + " graph searching for regex indicators, and save matches to the " + " sketch's intelligence attribute." + ) + + DEPENDENCIES = frozenset(["domain"]) + + _TAG_SELECTOR = [] + _TYPE_SELECTOR = "malware" + _TARGET_NEIGHBOR_TYPE = ["regex"] + _SAVE_INTELLIGENCE = True + + +manager.AnalysisManager.register_analyzer(YetiTriageIndicators) +manager.AnalysisManager.register_analyzer(YetiMalwareIndicators) diff --git a/timesketch/lib/analyzers/yetiindicators_test.py b/timesketch/lib/analyzers/yetiindicators_test.py index 653aced605..967a02ac52 100644 --- a/timesketch/lib/analyzers/yetiindicators_test.py +++ b/timesketch/lib/analyzers/yetiindicators_test.py @@ -1,7 +1,6 @@ """Tests for ThreatintelPlugin.""" import json -import re import mock from flask import current_app @@ -10,28 +9,63 @@ from timesketch.lib.testlib import BaseTest from timesketch.lib.testlib import MockDataStore -MOCK_YETI_INTEL = { - "12345": { - "id": "12345", - "name": "Random regex", - "pattern": "[0-9a-f]", - "relevant_tags": ["relevant-tag-1"], - "compiled_regexp": re.compile(r"[0-9a-f]+\.com"), - "type": "regex", - "root_type": "indicator", - } + +MOCK_YETI_ENTITY_REQUEST = { + "entities": [ + { + "type": "malware", + "name": "xmrig", + "description": "coin miner", + "created": "2024-02-16T12:10:48.670039Z", + "modified": "2024-02-16T12:10:48.670040Z", + "kill_chain_phases": [], + "aliases": [], + "family": "miner", + "id": "2152646", + "tags": {}, + "root_type": "entity", + } + ], + "total": 1, +} + +MOCK_YETI_NEIGHBORS_RESPONSE = { + "vertices": { + "indicators/2152802": { + "name": "typo'd dhcpd", + "type": "regex", + "description": "Random description", + "created": "2024-02-16T12:12:14.564723Z", + "modified": "2024-02-16T12:12:14.564729Z", + "valid_from": "2024-02-16T12:12:14.564730Z", + "valid_until": "2024-03-17T12:12:14.564758Z", + "pattern": "/usr/bin/dhpcd", + "location": "filesystem", + "diamond": "victim", + "kill_chain_phases": [], + "relevant_tags": ["xmrig", "malware"], + "id": "2152802", + "root_type": "indicator", + } + }, + "paths": [ + [ + { + "source": "entities/2152646", + "target": "indicators/2152802", + "type": "dropped in", + "description": "", + "created": "2024-02-16T12:28:52.731740Z", + "modified": "2024-02-16T12:28:52.731747Z", + "id": "2153330", + } + ] + ], + "total": 1, } -MOCK_YETI_NEIGHBORS = [ - { - "id": "98765", - "name": "Bad malware", - "type": "malware", - "root_type": "entity", - } -] -MATCHING_DOMAIN_MESSAGE = { +MATCHING_PATH_MESSAGE = { "__ts_timeline_id": 1, "es_index": "", "es_id": "", @@ -41,7 +75,7 @@ "datetime": "2014-09-16T19:23:40+00:00", "source_short": "", "source_long": "", - "message": "c0ffeebabe.com", + "message": "/usr/bin/dhpcd", } @@ -56,63 +90,73 @@ def setUp(self): # Mock the OpenSearch datastore. @mock.patch("timesketch.lib.analyzers.interface.OpenSearchDataStore", MockDataStore) @mock.patch( - "timesketch.lib.analyzers.yetiindicators." "YetiIndicators.get_neighbors" + "timesketch.lib.analyzers.yetiindicators." + "YetiBaseAnalyzer._get_neighbors_request" ) @mock.patch( - "timesketch.lib.analyzers.yetiindicators." "YetiIndicators.get_indicators" + "timesketch.lib.analyzers.yetiindicators." + "YetiBaseAnalyzer._get_entities_request" ) - def test_indicator_match(self, mock_get_indicators, mock_get_neighbors): + def test_indicator_match(self, mock_get_entities, mock_get_neighbors): """Test that ES queries for indicators are correctly built.""" - analyzer = yetiindicators.YetiIndicators("test_index", 1, 123) + analyzer = yetiindicators.YetiMalwareIndicators("test_index", 1, 123) analyzer.datastore.client = mock.Mock() - mock_get_indicators.return_value = MOCK_YETI_INTEL - mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS + mock_get_entities.return_value = MOCK_YETI_ENTITY_REQUEST + mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS_RESPONSE - analyzer.datastore.import_event("test_index", MATCHING_DOMAIN_MESSAGE, "0") + analyzer.datastore.import_event("test_index", MATCHING_PATH_MESSAGE, "0") message = json.loads(analyzer.run()) self.assertEqual( message["result_summary"], - "1 events matched 1 new indicators. Found: Bad malware:malware", + ( + "1 events matched 1 indicators (out of 1 processed).\n\n" + "Entities found: xmrig:malware" + ), ) - mock_get_indicators.assert_called_once() + mock_get_entities.assert_called_once() mock_get_neighbors.assert_called_once() self.assertEqual( sorted(analyzer.tagged_events["0"]["tags"]), - sorted(["bad-malware", "relevant-tag-1"]), + sorted(["malware", "xmrig"]), ) # Mock the OpenSearch datastore. @mock.patch("timesketch.lib.analyzers.interface.OpenSearchDataStore", MockDataStore) @mock.patch( - "timesketch.lib.analyzers.yetiindicators." "YetiIndicators.get_neighbors" + "timesketch.lib.analyzers.yetiindicators." + "YetiBaseAnalyzer._get_neighbors_request" ) @mock.patch( - "timesketch.lib.analyzers.yetiindicators." "YetiIndicators.get_indicators" + "timesketch.lib.analyzers.yetiindicators." + "YetiBaseAnalyzer._get_entities_request" ) - def test_indicator_nomatch(self, mock_get_indicators, mock_get_neighbors): + def test_indicator_nomatch(self, mock_get_entities, mock_get_neighbors): """Test that ES queries for indicators are correctly built.""" - analyzer = yetiindicators.YetiIndicators("test_index", 1, 123) + analyzer = yetiindicators.YetiMalwareIndicators("test_index", 1, 123) analyzer.datastore.client = mock.Mock() - mock_get_indicators.return_value = MOCK_YETI_INTEL - mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS + mock_get_entities.return_value = MOCK_YETI_ENTITY_REQUEST + mock_get_neighbors.return_value = MOCK_YETI_NEIGHBORS_RESPONSE message = json.loads(analyzer.run()) self.assertEqual( message["result_summary"], "No indicators were found in the timeline." ) - mock_get_indicators.assert_called_once() + mock_get_entities.assert_called_once() mock_get_neighbors.asset_called_once() @mock.patch("timesketch.lib.analyzers.interface.OpenSearchDataStore", MockDataStore) def test_slug(self): - analyzer = yetiindicators.YetiIndicators("test_index", 1, 123) + analyzer = yetiindicators.YetiMalwareIndicators("test_index", 1, 123) mock_event = mock.Mock() mock_event.get_comments.return_value = [] analyzer.mark_event( - MOCK_YETI_INTEL["12345"], + MOCK_YETI_NEIGHBORS_RESPONSE["vertices"]["indicators/2152802"], mock_event, - MOCK_YETI_NEIGHBORS, + MOCK_YETI_ENTITY_REQUEST["entities"], + ) + mock_event.add_tags.assert_called_once() + self.assertIn( + sorted(["xmrig", "malware"]), + [sorted(x) for x in mock_event.add_tags.call_args[0]], ) - # The name of the entity is "Random incident" - mock_event.add_tags.assert_called_once_with(["bad-malware", "relevant-tag-1"]) diff --git a/timesketch/lib/emojis.py b/timesketch/lib/emojis.py index 6234b8a617..8454158857 100644 --- a/timesketch/lib/emojis.py +++ b/timesketch/lib/emojis.py @@ -26,9 +26,13 @@ EMOJI_MAP = { "BUCKET": emoji("🪣", "Storage bucket"), + "BOMB": emoji("💣", "Malicious campaign activity"), "CAMERA": emoji("📷", "Screenshot activity"), + "HIGH_VOLTAGE": emoji("⚡", "Attack pattern"), "FISHING_POLE": emoji("🎣", "Phishing"), + "FIRE": emoji("🔥", "Investigation activity"), "GLOBE": emoji("🌏", "The world"), + "HAMMER": emoji("🔨", "Tool match"), "ID_BUTTON": emoji("🆔", "Account ID"), "LINK": emoji("🔗", "Events Linked"), "LOCK": emoji("🔒", "Logon activity"), @@ -38,10 +42,12 @@ "RAT": emoji("🐀", "Remote access"), "SATELLITE": emoji("📡", "Domain activity"), "SCREEN": emoji("🖵", "Screensaver activity"), + "SHIELD": emoji("🛡", "Vulnerability"), "SKULL_CROSSBONE": emoji("☠", "Suspicious entry"), - "SKULL": emoji("💀", "Threat intel match"), + "SKULL": emoji("💀", "Threat intel match"), "SLEEPING_FACE": emoji("😴", "Activity outside of regular hours"), "SPARKLES": emoji("✨", "New entity created"), + "SPIDER": emoji("🕷", "Malware activity"), "UNLOCK": emoji("🔓", "Logoff activity"), "VALIDATE": emoji("✅", "Validate"), "WASTEBASKET": emoji("🗑", "Deletion activity"),