Skip to content

Commit

Permalink
Black formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
slincoln-aiq committed Oct 21, 2024
1 parent 48c1716 commit e903945
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 42 deletions.
35 changes: 18 additions & 17 deletions sigma/processing/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ def apply_value(
The type annotation of the val argument is used to skip incompatible values.
"""



class HashesFieldsDetectionItemTransformation(DetectionItemTransformation):
"""
Transforms the 'Hashes' field in Sigma rules by creating separate detection items for each hash type.
Expand All @@ -324,7 +325,9 @@ class HashesFieldsDetectionItemTransformation(DetectionItemTransformation):
FileMD5: '987B65CD9B9F4E9A1AFD8F8B48CF64A7'
"""

def __init__(self, valid_hash_algos: List[str], field_prefix: str = None, drop_algo_prefix: bool = False):
def __init__(
self, valid_hash_algos: List[str], field_prefix: str = None, drop_algo_prefix: bool = False
):
"""
Initializes the HashesDetectionItemTransformation.
Expand All @@ -336,12 +339,7 @@ def __init__(self, valid_hash_algos: List[str], field_prefix: str = None, drop_a
self.valid_hash_algos = valid_hash_algos
self.field_prefix = field_prefix or ""
self.drop_algo_prefix = drop_algo_prefix
self.hash_lengths = {
32: "MD5",
40: "SHA1",
64: "SHA256",
128: "SHA512"
}
self.hash_lengths = {32: "MD5", 40: "SHA1", 64: "SHA256", 128: "SHA512"}

def apply_detection_item(
self, detection_item: SigmaDetectionItem
Expand All @@ -360,15 +358,17 @@ def apply_detection_item(
Exception: If no valid hash algorithms were found in the detection item.
"""
algo_dict = self._parse_hash_values(detection_item.value)

if not algo_dict:
raise Exception(
f"No valid hash algo found in Hashes field. Please use one of the following: {', '.join(self.valid_hash_algos)}"
)

return self._create_new_detection_items(algo_dict)

def _parse_hash_values(self, values: Union[SigmaString, List[SigmaString]]) -> Dict[str, List[str]]:
def _parse_hash_values(
self, values: Union[SigmaString, List[SigmaString]]
) -> Dict[str, List[str]]:
"""
Parses the hash values from the detection item.
Expand All @@ -381,13 +381,13 @@ def _parse_hash_values(self, values: Union[SigmaString, List[SigmaString]]) -> D
algo_dict = defaultdict(list)
if not isinstance(values, list):
values = [values]

for value in values:
hash_algo, hash_value = self._extract_hash_algo_and_value(value.to_plain())
if hash_algo:
field_name = self._get_field_name(hash_algo)
algo_dict[field_name].append(hash_value)

return algo_dict

def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]:
Expand All @@ -407,7 +407,7 @@ def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]:
else:
hash_value = parts[0]
hash_algo = self._determine_hash_algo_by_length(hash_value)

return (hash_algo, hash_value) if hash_algo in self.valid_hash_algos else ("", hash_value)

def _determine_hash_algo_by_length(self, hash_value: str) -> str:
Expand Down Expand Up @@ -449,11 +449,12 @@ def _create_new_detection_items(self, algo_dict: Dict[str, List[str]]) -> SigmaD
SigmaDetectionItem(
field=k if k != "keyword" else None,
modifiers=[],
value=[SigmaString(x) for x in v]
value=[SigmaString(x) for x in v],
)
for k, v in algo_dict.items() if k
for k, v in algo_dict.items()
if k
],
item_linking=ConditionOR
item_linking=ConditionOR,
)


Expand Down
85 changes: 60 additions & 25 deletions tests/test_processing_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1795,70 +1795,100 @@ def hashes_transformation():
return HashesFieldsDetectionItemTransformation(
valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"],
field_prefix="File",
drop_algo_prefix=False
drop_algo_prefix=False,
)


def test_hashes_transformation_single_hash(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")])
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]


def test_hashes_transformation_multiple_hashes(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7")
])
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 2
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]
assert result.detection_items[1].field == "FileMD5"
assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]
assert result.item_linking == ConditionOR


def test_hashes_transformation_drop_algo_prefix():
transformation = HashesFieldsDetectionItemTransformation(
valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"],
field_prefix="File",
drop_algo_prefix=True
drop_algo_prefix=True,
)
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
)
detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")])
result = transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "File"
assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]


def test_hashes_transformation_invalid_hash(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("INVALID=123456")])
with pytest.raises(Exception, match="No valid hash algo found"):
hashes_transformation.apply_detection_item(detection_item)


def test_hashes_transformation_mixed_valid_invalid(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("INVALID=123456"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7")
])
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("INVALID=123456"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 2
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]
assert result.detection_items[1].field == "FileMD5"
assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]


def test_hashes_transformation_auto_detect_hash_type(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1
SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5
SigmaString("A" * 64), # SHA256
SigmaString("B" * 128), # SHA512
])
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1
SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5
SigmaString("A" * 64), # SHA256
SigmaString("B" * 128), # SHA512
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 4
Expand All @@ -1867,10 +1897,15 @@ def test_hashes_transformation_auto_detect_hash_type(hashes_transformation):
assert result.detection_items[2].field == "FileSHA256"
assert result.detection_items[3].field == "FileSHA512"


def test_hashes_transformation_pipe_separator(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")])
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")]
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]

0 comments on commit e903945

Please sign in to comment.