Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/SigmaHQ/pySigma
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaspatzke committed Nov 10, 2024
2 parents cc82ea9 + 380ec89 commit d6fdf46
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 56 deletions.
106 changes: 101 additions & 5 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from collections import ChainMap, defaultdict
from contextlib import contextmanager
import re

from sigma.correlations import (
Expand Down Expand Up @@ -139,6 +140,9 @@ class Backend(ABC):
# not exists: convert as "not exists-expression" or as dedicated expression
explicit_not_exists_expression: ClassVar[bool] = False

# use not_eq_token, not_eq_expression, etc. to implement != as a separate expression instead of not_token in ConditionNOT
convert_not_as_not_eq: ClassVar[bool] = False

def __init__(
self,
processing_pipeline: Optional[ProcessingPipeline] = None,
Expand Down Expand Up @@ -745,9 +749,15 @@ class variables. If this is not sufficient, the respective methods can be implem
eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator)
)
not_eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator) if using not_eq_expression over not_token
)
eq_expression: ClassVar[str] = (
"{field}{backend.eq_token}{value}" # Expression for field = value
)
not_eq_expression: ClassVar[str] = (
"{field}{backend.not_eq_token}{value}" # Expression for field != value
)

# Query structure
# The generated query can be embedded into further structures. One common example are data
Expand Down Expand Up @@ -812,12 +822,15 @@ class variables. If this is not sufficient, the respective methods can be implem
}
)

# String matching operators. if none is appropriate eq_token is used.
# String matching operators. if none is appropriate eq_token (or not_eq_token) is used.
startswith_expression: ClassVar[Optional[str]] = None
not_startswith_expression: ClassVar[Optional[str]] = None
startswith_expression_allow_special: ClassVar[bool] = False
endswith_expression: ClassVar[Optional[str]] = None
not_endswith_expression: ClassVar[Optional[str]] = None
endswith_expression_allow_special: ClassVar[bool] = False
contains_expression: ClassVar[Optional[str]] = None
not_contains_expression: ClassVar[Optional[str]] = None
contains_expression_allow_special: ClassVar[bool] = False
wildcard_match_expression: ClassVar[Optional[str]] = (
None # Special expression if wildcards can't be matched with the eq_token operator.
Expand All @@ -828,6 +841,7 @@ class variables. If this is not sufficient, the respective methods can be implem
# is one of the flags shortcuts supported by Sigma (currently i, m and s) and refers to the
# token stored in the class variable re_flags.
re_expression: ClassVar[Optional[str]] = None
not_re_expression: ClassVar[Optional[str]] = None
re_escape_char: ClassVar[Optional[str]] = (
None # Character used for escaping in regular expressions
)
Expand All @@ -849,17 +863,21 @@ class variables. If this is not sufficient, the respective methods can be implem
# Case sensitive string matching operators similar to standard string matching. If not provided,
# case_sensitive_match_expression is used.
case_sensitive_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_startswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_endswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_not_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_contains_expression_allow_special: ClassVar[bool] = False

# CIDR expressions: define CIDR matching if backend has native support. Else pySigma expands
# CIDR values into string wildcard matches.
cidr_expression: ClassVar[Optional[str]] = (
None # CIDR expression query as format string with placeholders {field}, {value} (the whole CIDR value), {network} (network part only), {prefixlen} (length of network mask prefix) and {netmask} (CIDR network mask only)
)
not_cidr_expression: ClassVar[Optional[str]] = None

# Numeric comparison operators
compare_op_expression: ClassVar[Optional[str]] = (
Expand Down Expand Up @@ -1087,6 +1105,58 @@ def __new__(cls, *args, **kwargs):
c.explicit_not_exists_expression = c.field_not_exists_expression is not None
return c

@contextmanager
def not_equals_context_manager(self, use_negated_expressions: bool = False):
"""Context manager to temporarily swap expressions with their negated versions."""
if not use_negated_expressions:
yield
return

# Store original expressions
original_expressions = {
"eq_expression": self.eq_expression,
"re_expression": self.re_expression,
"cidr_expression": self.cidr_expression,
"startswith_expression": self.startswith_expression,
"case_sensitive_startswith_expression": self.case_sensitive_startswith_expression,
"endswith_expression": self.endswith_expression,
"case_sensitive_endswith_expression": self.case_sensitive_endswith_expression,
"contains_expression": self.contains_expression,
"case_sensitive_contains_expression": self.case_sensitive_contains_expression,
}

# Swap to negated versions
try:
self.eq_expression = self.not_eq_expression
self.re_expression = self.not_re_expression
self.cidr_expression = self.not_cidr_expression
self.startswith_expression = self.not_startswith_expression
self.case_sensitive_startswith_expression = (
self.case_sensitive_not_startswith_expression
)
self.endswith_expression = self.not_endswith_expression
self.case_sensitive_endswith_expression = self.case_sensitive_not_endswith_expression
self.contains_expression = self.not_contains_expression
self.case_sensitive_contains_expression = self.case_sensitive_not_contains_expression
yield
finally:
# Restore original expressions
self.eq_expression = original_expressions["eq_expression"]
self.re_expression = original_expressions["re_expression"]
self.cidr_expression = original_expressions["cidr_expression"]
self.startswith_expression = original_expressions["startswith_expression"]
self.case_sensitive_startswith_expression = original_expressions[
"case_sensitive_startswith_expression"
]
self.endswith_expression = original_expressions["endswith_expression"]
self.case_sensitive_endswith_expression = original_expressions[
"case_sensitive_endswith_expression"
]
self.contains_expression = original_expressions["contains_expression"]
self.case_sensitive_contains_expression = original_expressions[
"case_sensitive_contains_expression"
]

def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool:
"""
Compare precedence of outer and inner condition items. Return True if precedence of
Expand Down Expand Up @@ -1209,17 +1279,22 @@ def convert_condition_not(
arg = cond.args[0]
try:
if arg.__class__ in self.precedence: # group if AND or OR condition is negated
return (
self.not_token + self.token_separator + self.convert_condition_group(arg, state)
)
converted_group = self.convert_condition_group(arg, state)
if self.convert_not_as_not_eq:
return converted_group
else:
return self.not_token + self.token_separator + converted_group
else:
expr = self.convert_condition(arg, state)
if isinstance(
expr, DeferredQueryExpression
): # negate deferred expression and pass it to parent
return expr.negate()
else: # convert negated expression to string
return self.not_token + self.token_separator + expr
if self.convert_not_as_not_eq:
return expr
else:
return self.not_token + self.token_separator + expr
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'not' not supported by the backend")

Expand Down Expand Up @@ -1313,6 +1388,27 @@ def convert_value_str(self, s: SigmaString, state: ConversionState) -> str:
else:
return converted

def convert_condition_field_eq_val(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
"""Uses context manager with parent class method to swap expressions with their negated versions
if convert_not_as_not_eq is set and the parent of the condition is a ConditionNOT."""

# Determine if negation is needed

def is_parent_not(cond):
if cond.parent is None:
return False
if isinstance(cond.parent, ConditionNOT):
return True
return is_parent_not(cond.parent)

negation = is_parent_not(cond) and self.convert_not_as_not_eq

# Use context manager to handle negation
with self.not_equals_context_manager(use_negated_expressions=negation):
return super().convert_condition_field_eq_val(cond, state)

def convert_condition_field_eq_val_str(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
Expand Down
9 changes: 7 additions & 2 deletions sigma/processing/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,11 @@ def __post_init__(self):

def apply_string_value(self, field: str, val: SigmaString) -> Optional[SigmaString]:
regex = ""

# empty string can not be convert into a simple regex
if val == "":
return val

for sc in val.s: # iterate over all SigmaString components (strings and special chars)
if isinstance(sc, str): # if component is a string
if (
Expand Down Expand Up @@ -1117,7 +1122,7 @@ class RuleFailureTransformation(Transformation):
def apply(
self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule
) -> None:
raise SigmaTransformationError(self.message)
raise SigmaTransformationError(self.message, source=rule.source)


@dataclass
Expand All @@ -1131,7 +1136,7 @@ class DetectionItemFailureTransformation(DetectionItemTransformation):
message: str

def apply_detection_item(self, detection_item: SigmaDetectionItem) -> None:
raise SigmaTransformationError(self.message)
raise SigmaTransformationError(self.message, source=detection_item.source)


@dataclass
Expand Down
66 changes: 42 additions & 24 deletions sigma/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from enum import Enum, auto
from datetime import date, datetime
import yaml
import re
import sigma
from sigma.types import SigmaType, SigmaNull, SigmaString, SigmaNumber, sigma_type
from sigma.modifiers import (
Expand Down Expand Up @@ -847,6 +848,45 @@ class instantiation of an object derived from the SigmaRuleBase class and the er
SigmaRule object. Else the first recognized error is raised as exception.
"""
errors = []

def get_rule_as_date(name: str, exception_class) -> Optional[date]:
"""
Accepted string based date formats are in range 1000-01-01 .. 3999-12-31:
* XXXX-XX-XX -- fully corresponds to yaml date format
* XXXX/XX/XX, XXXX/XX/X, XXXX/X/XX, XXXX/X/X -- often occurs in the US-based sigmas
Not accepted are ambiguous dates such as:
2024-01-1, 24-1-24, 24/1/1, ...
"""
nonlocal errors, rule, source
result = rule.get(name)
if (
result is not None
and not isinstance(result, date)
and not isinstance(result, datetime)
):
error = True
try:
result = str(result) # forcifully convert whatever the type is into string
accepted_regexps = (
"([1-3][0-9][0-9][0-9])-([01][0-9])-([0-3][0-9])", # 1000-01-01 .. 3999-12-31
"([1-3][0-9][0-9][0-9])/([01]?[0-9])/([0-3]?[0-9])", # 1000/1/1, 1000/01/01 .. 3999/12/31
)
for date_regexp in accepted_regexps:
matcher = re.fullmatch(date_regexp, result)
if matcher:
result = date(int(matcher[1]), int(matcher[2]), int(matcher[3]))
error = False
break
except Exception:
pass
if error:
errors.append(
exception_class(
f"Rule {name} '{ result }' is invalid, use yyyy-mm-dd", source=source
)
)
return result

# Rule identifier may be empty or must be valid UUID
rule_id = rule.get("id")
if rule_id is not None:
Expand Down Expand Up @@ -944,32 +984,10 @@ class instantiation of an object derived from the SigmaRuleBase class and the er
)

# parse rule date if existing
rule_date = rule.get("date")
if rule_date is not None:
if not isinstance(rule_date, date) and not isinstance(rule_date, datetime):
try:
rule_date = date(*(int(i) for i in rule_date.split("-")))
except ValueError:
errors.append(
sigma_exceptions.SigmaDateError(
f"Rule date '{ rule_date }' is invalid, must be yyyy-mm-dd",
source=source,
)
)
rule_date = get_rule_as_date("date", sigma_exceptions.SigmaDateError)

# parse rule modified if existing
rule_modified = rule.get("modified")
if rule_modified is not None:
if not isinstance(rule_modified, date) and not isinstance(rule_modified, datetime):
try:
rule_modified = date(*(int(i) for i in rule_modified.split("-")))
except ValueError:
errors.append(
sigma_exceptions.SigmaModifiedError(
f"Rule modified '{ rule_modified }' is invalid, must be yyyy-mm-dd",
source=source,
)
)
rule_modified = get_rule_as_date("modified", sigma_exceptions.SigmaModifiedError)

# Rule fields validation
rule_fields = rule.get("fields")
Expand Down
Loading

0 comments on commit d6fdf46

Please sign in to comment.