diff --git a/sigma/conversion/base.py b/sigma/conversion/base.py index 5de71179..c55e0ec3 100644 --- a/sigma/conversion/base.py +++ b/sigma/conversion/base.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from collections import ChainMap, defaultdict +from contextlib import contextmanager import re from sigma.correlations import ( @@ -139,6 +140,9 @@ class Backend(ABC): # not exists: convert as "not exists-expression" or as dedicated expression explicit_not_exists_expression: ClassVar[bool] = False + # use not_eq_token, not_eq_expression, etc. to implement != as a separate expression instead of not_token in ConditionNOT + convert_not_as_not_eq: ClassVar[bool] = False + def __init__( self, processing_pipeline: Optional[ProcessingPipeline] = None, @@ -745,9 +749,15 @@ class variables. If this is not sufficient, the respective methods can be implem eq_token: ClassVar[Optional[str]] = ( None # Token inserted between field and value (without separator) ) + not_eq_token: ClassVar[Optional[str]] = ( + None # Token inserted between field and value (without separator) if using not_eq_expression over not_token + ) eq_expression: ClassVar[str] = ( "{field}{backend.eq_token}{value}" # Expression for field = value ) + not_eq_expression: ClassVar[str] = ( + "{field}{backend.not_eq_token}{value}" # Expression for field != value + ) # Query structure # The generated query can be embedded into further structures. One common example are data @@ -812,12 +822,15 @@ class variables. If this is not sufficient, the respective methods can be implem } ) - # String matching operators. if none is appropriate eq_token is used. + # String matching operators. if none is appropriate eq_token (or not_eq_token) is used. startswith_expression: ClassVar[Optional[str]] = None + not_startswith_expression: ClassVar[Optional[str]] = None startswith_expression_allow_special: ClassVar[bool] = False endswith_expression: ClassVar[Optional[str]] = None + not_endswith_expression: ClassVar[Optional[str]] = None endswith_expression_allow_special: ClassVar[bool] = False contains_expression: ClassVar[Optional[str]] = None + not_contains_expression: ClassVar[Optional[str]] = None contains_expression_allow_special: ClassVar[bool] = False wildcard_match_expression: ClassVar[Optional[str]] = ( None # Special expression if wildcards can't be matched with the eq_token operator. @@ -828,6 +841,7 @@ class variables. If this is not sufficient, the respective methods can be implem # is one of the flags shortcuts supported by Sigma (currently i, m and s) and refers to the # token stored in the class variable re_flags. re_expression: ClassVar[Optional[str]] = None + not_re_expression: ClassVar[Optional[str]] = None re_escape_char: ClassVar[Optional[str]] = ( None # Character used for escaping in regular expressions ) @@ -849,10 +863,13 @@ class variables. If this is not sufficient, the respective methods can be implem # Case sensitive string matching operators similar to standard string matching. If not provided, # case_sensitive_match_expression is used. case_sensitive_startswith_expression: ClassVar[Optional[str]] = None + case_sensitive_not_startswith_expression: ClassVar[Optional[str]] = None case_sensitive_startswith_expression_allow_special: ClassVar[bool] = False case_sensitive_endswith_expression: ClassVar[Optional[str]] = None + case_sensitive_not_endswith_expression: ClassVar[Optional[str]] = None case_sensitive_endswith_expression_allow_special: ClassVar[bool] = False case_sensitive_contains_expression: ClassVar[Optional[str]] = None + case_sensitive_not_contains_expression: ClassVar[Optional[str]] = None case_sensitive_contains_expression_allow_special: ClassVar[bool] = False # CIDR expressions: define CIDR matching if backend has native support. Else pySigma expands @@ -860,6 +877,7 @@ class variables. If this is not sufficient, the respective methods can be implem cidr_expression: ClassVar[Optional[str]] = ( None # CIDR expression query as format string with placeholders {field}, {value} (the whole CIDR value), {network} (network part only), {prefixlen} (length of network mask prefix) and {netmask} (CIDR network mask only) ) + not_cidr_expression: ClassVar[Optional[str]] = None # Numeric comparison operators compare_op_expression: ClassVar[Optional[str]] = ( @@ -1087,6 +1105,58 @@ def __new__(cls, *args, **kwargs): c.explicit_not_exists_expression = c.field_not_exists_expression is not None return c + @contextmanager + def not_equals_context_manager(self, use_negated_expressions: bool = False): + """Context manager to temporarily swap expressions with their negated versions.""" + if not use_negated_expressions: + yield + return + + # Store original expressions + original_expressions = { + "eq_expression": self.eq_expression, + "re_expression": self.re_expression, + "cidr_expression": self.cidr_expression, + "startswith_expression": self.startswith_expression, + "case_sensitive_startswith_expression": self.case_sensitive_startswith_expression, + "endswith_expression": self.endswith_expression, + "case_sensitive_endswith_expression": self.case_sensitive_endswith_expression, + "contains_expression": self.contains_expression, + "case_sensitive_contains_expression": self.case_sensitive_contains_expression, + } + + # Swap to negated versions + try: + self.eq_expression = self.not_eq_expression + self.re_expression = self.not_re_expression + self.cidr_expression = self.not_cidr_expression + self.startswith_expression = self.not_startswith_expression + self.case_sensitive_startswith_expression = ( + self.case_sensitive_not_startswith_expression + ) + self.endswith_expression = self.not_endswith_expression + self.case_sensitive_endswith_expression = self.case_sensitive_not_endswith_expression + self.contains_expression = self.not_contains_expression + self.case_sensitive_contains_expression = self.case_sensitive_not_contains_expression + yield + finally: + # Restore original expressions + self.eq_expression = original_expressions["eq_expression"] + self.re_expression = original_expressions["re_expression"] + self.cidr_expression = original_expressions["cidr_expression"] + self.startswith_expression = original_expressions["startswith_expression"] + self.case_sensitive_startswith_expression = original_expressions[ + "case_sensitive_startswith_expression" + ] + self.endswith_expression = original_expressions["endswith_expression"] + self.case_sensitive_endswith_expression = original_expressions[ + "case_sensitive_endswith_expression" + ] + self.contains_expression = original_expressions["contains_expression"] + self.case_sensitive_contains_expression = original_expressions[ + "case_sensitive_contains_expression" + ] + def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool: """ Compare precedence of outer and inner condition items. Return True if precedence of @@ -1209,9 +1279,11 @@ def convert_condition_not( arg = cond.args[0] try: if arg.__class__ in self.precedence: # group if AND or OR condition is negated - return ( - self.not_token + self.token_separator + self.convert_condition_group(arg, state) - ) + converted_group = self.convert_condition_group(arg, state) + if self.convert_not_as_not_eq: + return converted_group + else: + return self.not_token + self.token_separator + converted_group else: expr = self.convert_condition(arg, state) if isinstance( @@ -1219,7 +1291,10 @@ def convert_condition_not( ): # negate deferred expression and pass it to parent return expr.negate() else: # convert negated expression to string - return self.not_token + self.token_separator + expr + if self.convert_not_as_not_eq: + return expr + else: + return self.not_token + self.token_separator + expr except TypeError: # pragma: no cover raise NotImplementedError("Operator 'not' not supported by the backend") @@ -1313,6 +1388,27 @@ def convert_value_str(self, s: SigmaString, state: ConversionState) -> str: else: return converted + def convert_condition_field_eq_val( + self, cond: ConditionFieldEqualsValueExpression, state: ConversionState + ) -> Union[str, DeferredQueryExpression]: + """Uses context manager with parent class method to swap expressions with their negated versions + if convert_not_as_not_eq is set and the parent of the condition is a ConditionNOT.""" + + # Determine if negation is needed + + def is_parent_not(cond): + if cond.parent is None: + return False + if isinstance(cond.parent, ConditionNOT): + return True + return is_parent_not(cond.parent) + + negation = is_parent_not(cond) and self.convert_not_as_not_eq + + # Use context manager to handle negation + with self.not_equals_context_manager(use_negated_expressions=negation): + return super().convert_condition_field_eq_val(cond, state) + def convert_condition_field_eq_val_str( self, cond: ConditionFieldEqualsValueExpression, state: ConversionState ) -> Union[str, DeferredQueryExpression]: diff --git a/tests/test_conversion_base.py b/tests/test_conversion_base.py index 535d86f7..79cbab90 100644 --- a/tests/test_conversion_base.py +++ b/tests/test_conversion_base.py @@ -2803,3 +2803,178 @@ def test_convert_without_output(test_backend): assert rule._conversion_result == ['mappedA="value" and \'field A\'="value"'] assert result == [] + + +def test_convert_not_as_not_eq(test_backend, monkeypatch): + """Test that NOT conditions are converted using not_eq expressions when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_eq_token", "!=") + monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA: value1 + condition: not sel + """ + ) + ) + == ['mappedA!="value1"'] + ) + + +def test_convert_not_startswith(test_backend, monkeypatch): + """Test negated startswith expression when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_startswith_expression", "{field} not_startswith {value}") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|startswith: "val" + condition: not sel + """ + ) + ) + == ['mappedA not_startswith "val"'] + ) + + +def test_convert_not_contains(test_backend, monkeypatch): + """Test negated contains expression when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_contains_expression", "{field} not_contains {value}") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|contains: "val" + condition: not sel + """ + ) + ) + == ['mappedA not_contains "val"'] + ) + + +def test_convert_not_re(test_backend, monkeypatch): + """Test negated regular expression when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_re_expression", "{field}!=/{regex}/") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|re: "val.*" + condition: not sel + """ + ) + ) + == ["mappedA!=/val.*/"] + ) + + +def test_convert_not_cidr(test_backend, monkeypatch): + """Test negated CIDR expression when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_cidr_expression", "cidrnotmatch('{field}', \"{value}\")") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel: + fieldA|cidr: "192.168.1.0/24" + condition: not sel + """ + ) + ) + == ["cidrnotmatch('mappedA', \"192.168.1.0/24\")"] + ) + + +def test_convert_not_and_group(test_backend, monkeypatch): + """Test that NOT with AND group is handled correctly when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_eq_token", "!=") + monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: value1 + sel2: + fieldB: value2 + condition: not (sel1 and sel2) + """ + ) + ) + == ['(mappedA!="value1" and mappedB!="value2")'] + ) + + +def test_convert_not_or_group(test_backend, monkeypatch): + """Test that NOT with OR group is handled correctly when convert_not_as_not_eq is True""" + monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True) + monkeypatch.setattr(test_backend, "not_eq_token", "!=") + monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}") + assert ( + test_backend.convert( + SigmaCollection.from_yaml( + """ + title: Test + status: test + logsource: + category: test_category + product: test_product + detection: + sel1: + fieldA: value1 + sel2: + fieldB: value2 + condition: not (sel1 or sel2) + """ + ) + ) + == ['(mappedA!="value1" or mappedB!="value2")'] + )