From e903945c2bd3bd3e531f0198e45510545401c9ff Mon Sep 17 00:00:00 2001 From: Stephen Lincoln Date: Mon, 21 Oct 2024 11:12:23 -0400 Subject: [PATCH] Black formatting --- sigma/processing/transformations.py | 35 +++++----- tests/test_processing_transformations.py | 85 +++++++++++++++++------- 2 files changed, 78 insertions(+), 42 deletions(-) diff --git a/sigma/processing/transformations.py b/sigma/processing/transformations.py index 36f7414d..c30a4d22 100644 --- a/sigma/processing/transformations.py +++ b/sigma/processing/transformations.py @@ -299,7 +299,8 @@ def apply_value( The type annotation of the val argument is used to skip incompatible values. """ - + + class HashesFieldsDetectionItemTransformation(DetectionItemTransformation): """ Transforms the 'Hashes' field in Sigma rules by creating separate detection items for each hash type. @@ -324,7 +325,9 @@ class HashesFieldsDetectionItemTransformation(DetectionItemTransformation): FileMD5: '987B65CD9B9F4E9A1AFD8F8B48CF64A7' """ - def __init__(self, valid_hash_algos: List[str], field_prefix: str = None, drop_algo_prefix: bool = False): + def __init__( + self, valid_hash_algos: List[str], field_prefix: str = None, drop_algo_prefix: bool = False + ): """ Initializes the HashesDetectionItemTransformation. @@ -336,12 +339,7 @@ def __init__(self, valid_hash_algos: List[str], field_prefix: str = None, drop_a self.valid_hash_algos = valid_hash_algos self.field_prefix = field_prefix or "" self.drop_algo_prefix = drop_algo_prefix - self.hash_lengths = { - 32: "MD5", - 40: "SHA1", - 64: "SHA256", - 128: "SHA512" - } + self.hash_lengths = {32: "MD5", 40: "SHA1", 64: "SHA256", 128: "SHA512"} def apply_detection_item( self, detection_item: SigmaDetectionItem @@ -360,15 +358,17 @@ def apply_detection_item( Exception: If no valid hash algorithms were found in the detection item. """ algo_dict = self._parse_hash_values(detection_item.value) - + if not algo_dict: raise Exception( f"No valid hash algo found in Hashes field. Please use one of the following: {', '.join(self.valid_hash_algos)}" ) - + return self._create_new_detection_items(algo_dict) - def _parse_hash_values(self, values: Union[SigmaString, List[SigmaString]]) -> Dict[str, List[str]]: + def _parse_hash_values( + self, values: Union[SigmaString, List[SigmaString]] + ) -> Dict[str, List[str]]: """ Parses the hash values from the detection item. @@ -381,13 +381,13 @@ def _parse_hash_values(self, values: Union[SigmaString, List[SigmaString]]) -> D algo_dict = defaultdict(list) if not isinstance(values, list): values = [values] - + for value in values: hash_algo, hash_value = self._extract_hash_algo_and_value(value.to_plain()) if hash_algo: field_name = self._get_field_name(hash_algo) algo_dict[field_name].append(hash_value) - + return algo_dict def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]: @@ -407,7 +407,7 @@ def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]: else: hash_value = parts[0] hash_algo = self._determine_hash_algo_by_length(hash_value) - + return (hash_algo, hash_value) if hash_algo in self.valid_hash_algos else ("", hash_value) def _determine_hash_algo_by_length(self, hash_value: str) -> str: @@ -449,11 +449,12 @@ def _create_new_detection_items(self, algo_dict: Dict[str, List[str]]) -> SigmaD SigmaDetectionItem( field=k if k != "keyword" else None, modifiers=[], - value=[SigmaString(x) for x in v] + value=[SigmaString(x) for x in v], ) - for k, v in algo_dict.items() if k + for k, v in algo_dict.items() + if k ], - item_linking=ConditionOR + item_linking=ConditionOR, ) diff --git a/tests/test_processing_transformations.py b/tests/test_processing_transformations.py index d82b77f6..fc0a4330 100644 --- a/tests/test_processing_transformations.py +++ b/tests/test_processing_transformations.py @@ -1795,70 +1795,100 @@ def hashes_transformation(): return HashesFieldsDetectionItemTransformation( valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"], field_prefix="File", - drop_algo_prefix=False + drop_algo_prefix=False, ) + def test_hashes_transformation_single_hash(hashes_transformation): - detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]) + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")] + ) result = hashes_transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 1 assert result.detection_items[0].field == "FileSHA1" - assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")] + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + def test_hashes_transformation_multiple_hashes(hashes_transformation): - detection_item = SigmaDetectionItem("Hashes", [], [ - SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), - SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7") - ]) + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), + SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"), + ], + ) result = hashes_transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 2 assert result.detection_items[0].field == "FileSHA1" - assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")] + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] assert result.detection_items[1].field == "FileMD5" assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")] assert result.item_linking == ConditionOR + def test_hashes_transformation_drop_algo_prefix(): transformation = HashesFieldsDetectionItemTransformation( valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"], field_prefix="File", - drop_algo_prefix=True + drop_algo_prefix=True, + ) + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")] ) - detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]) result = transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 1 assert result.detection_items[0].field == "File" - assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")] + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] + def test_hashes_transformation_invalid_hash(hashes_transformation): detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("INVALID=123456")]) with pytest.raises(Exception, match="No valid hash algo found"): hashes_transformation.apply_detection_item(detection_item) + def test_hashes_transformation_mixed_valid_invalid(hashes_transformation): - detection_item = SigmaDetectionItem("Hashes", [], [ - SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), - SigmaString("INVALID=123456"), - SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7") - ]) + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"), + SigmaString("INVALID=123456"), + SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"), + ], + ) result = hashes_transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 2 assert result.detection_items[0].field == "FileSHA1" - assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")] + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ] assert result.detection_items[1].field == "FileMD5" assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")] + def test_hashes_transformation_auto_detect_hash_type(hashes_transformation): - detection_item = SigmaDetectionItem("Hashes", [], [ - SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1 - SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5 - SigmaString("A" * 64), # SHA256 - SigmaString("B" * 128), # SHA512 - ]) + detection_item = SigmaDetectionItem( + "Hashes", + [], + [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1 + SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5 + SigmaString("A" * 64), # SHA256 + SigmaString("B" * 128), # SHA512 + ], + ) result = hashes_transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 4 @@ -1867,10 +1897,15 @@ def test_hashes_transformation_auto_detect_hash_type(hashes_transformation): assert result.detection_items[2].field == "FileSHA256" assert result.detection_items[3].field == "FileSHA512" + def test_hashes_transformation_pipe_separator(hashes_transformation): - detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")]) + detection_item = SigmaDetectionItem( + "Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")] + ) result = hashes_transformation.apply_detection_item(detection_item) assert isinstance(result, SigmaDetection) assert len(result.detection_items) == 1 assert result.detection_items[0].field == "FileSHA1" - assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")] \ No newline at end of file + assert result.detection_items[0].value == [ + SigmaString("5F1CBC3D99558307BC1250D084FA968521482025") + ]