diff --git a/sigma/exceptions.py b/sigma/exceptions.py
index ed3e912d..f428c469 100644
--- a/sigma/exceptions.py
+++ b/sigma/exceptions.py
@@ -145,6 +145,12 @@ class SigmaPipelineParsingError(SigmaError):
     pass
 
 
+class SigmaProcessingItemError(SigmaError):
+    """Error in definition or state of processing item."""
+
+    pass
+
+
 class SigmaPlaceholderError(SigmaValueError):
     """Attempted to convert an unhandled Placeholder into a query"""
 
diff --git a/sigma/processing/conditions/base.py b/sigma/processing/conditions/base.py
index 41bb66f7..eb08bbe6 100644
--- a/sigma/processing/conditions/base.py
+++ b/sigma/processing/conditions/base.py
@@ -4,18 +4,36 @@
 import sigma
 from sigma.correlations import SigmaCorrelationRule
 from sigma.types import SigmaFieldReference, SigmaType
-from typing import Literal, Union
+from typing import Literal, Optional, Union
 from sigma.rule import (
     SigmaDetection,
     SigmaRule,
     SigmaDetectionItem,
 )
-from sigma.exceptions import SigmaConfigurationError, SigmaRegularExpressionError
+from sigma.exceptions import (
+    SigmaConfigurationError,
+    SigmaProcessingItemError,
+    SigmaRegularExpressionError,
+)
 
 
+@dataclass
 class ProcessingCondition(ABC):
     """Anchor base class for all processing condition types."""
 
+    _pipeline: Optional["sigma.processing.pipeline.ProcessingPipeline"] = field(
+        init=False, compare=False, default=None
+    )
+
+    def set_pipeline(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline"):
+        if self._pipeline is None:
+            self._pipeline = pipeline
+        else:
+            raise SigmaProcessingItemError(f"Pipeline for condition was already set.")
+
+    def _clear_pipeline(self) -> None:
+        self._pipeline = None
+
 
 @dataclass
 class RuleProcessingCondition(ProcessingCondition, ABC):
@@ -26,7 +44,6 @@ class RuleProcessingCondition(ProcessingCondition, ABC):
     @abstractmethod
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         """Match condition on Sigma rule."""
@@ -39,14 +56,11 @@ class FieldNameProcessingCondition(ProcessingCondition, ABC):
     """
 
     @abstractmethod
-    def match_field_name(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: str
-    ) -> bool:
+    def match_field_name(self, field: str) -> bool:
         "The method match is called for each field name and must return a bool result."
 
     def match_detection_item(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
         """
@@ -60,37 +74,33 @@ def match_detection_item(
         * `match_detection_item_value` for the whole value list of a detection item and
         * `match_value` for single detection items values.
         """
-        return self.match_detection_item_field(
-            pipeline, detection_item
-        ) or self.match_detection_item_value(pipeline, detection_item)
+        return self.match_detection_item_field(detection_item) or self.match_detection_item_value(
+            detection_item
+        )
 
     def match_detection_item_field(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
         """Returns True if the field of the detection item matches the implemented field name condition."""
-        return self.match_field_name(pipeline, detection_item.field)
+        return self.match_field_name(detection_item.field)
 
     def match_detection_item_value(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
         """Returns True if any value of a detection item contains a field reference to a field name matching the implemented field name condition.
         Processing actions must only be applied to matching individual values determined by `match_value`."""
-        return any((self.match_value(pipeline, value) for value in detection_item.value))
+        return any((self.match_value(value) for value in detection_item.value))
 
-    def match_value(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType
-    ) -> bool:
+    def match_value(self, value: SigmaType) -> bool:
         """
         Checks if a detection item value matches the field name condition implemented in
         `match_field_name` if it is a field reference. For all other types the method returns
         False.
         """
         if isinstance(value, SigmaFieldReference):
-            return self.match_field_name(pipeline, value.field)
+            return self.match_field_name(value.field)
         else:
             return False
@@ -104,7 +114,6 @@ class DetectionItemProcessingCondition(ProcessingCondition, ABC):
     @abstractmethod
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
         """Match condition on Sigma rule."""
@@ -134,17 +143,12 @@ def __post_init__(self):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
-        return self.match_func(
-            (self.match_value(pipeline, value) for value in detection_item.value)
-        )
+        return self.match_func((self.match_value(value) for value in detection_item.value))
 
     @abstractmethod
-    def match_value(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType
-    ) -> bool:
+    def match_value(self, value: SigmaType) -> bool:
         """Match condition on detection item values."""
 
@@ -154,7 +158,6 @@ class RuleDetectionItemCondition(RuleProcessingCondition, ABC):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         if isinstance(rule, SigmaRule):
diff --git a/sigma/processing/conditions/fields.py b/sigma/processing/conditions/fields.py
index 85aae2a8..dc86468f 100644
--- a/sigma/processing/conditions/fields.py
+++ b/sigma/processing/conditions/fields.py
@@ -34,7 +34,6 @@ def __post_init__(self):
 
     def match_field_name(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         field: Optional[str],
     ) -> bool:
         if field is None:
@@ -59,7 +58,6 @@ class ExcludeFieldCondition(IncludeFieldCondition):
 
     def match_field_name(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
-        return not super().match_field_name(pipeline, detection_item)
+        return not super().match_field_name(detection_item)
diff --git a/sigma/processing/conditions/rule.py b/sigma/processing/conditions/rule.py
index 3d2ae693..81c13785 100644
--- a/sigma/processing/conditions/rule.py
+++ b/sigma/processing/conditions/rule.py
@@ -38,7 +38,6 @@ def __post_init__(self):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         if isinstance(rule, SigmaRule):
@@ -47,7 +46,7 @@ def match(
             # Will only return true if the rules have been resolved in advance
             for ref in rule.rules:
                 if hasattr(ref, "rule") and isinstance(ref.rule, (SigmaRule, SigmaCorrelationRule)):
-                    if self.match(pipeline, ref.rule):
+                    if self.match(ref.rule):
                         return True
             return False
 
@@ -109,7 +108,6 @@ class IsSigmaRuleCondition(RuleProcessingCondition):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         return isinstance(rule, SigmaRule)
@@ -123,7 +121,6 @@ class IsSigmaCorrelationRuleCondition(RuleProcessingCondition):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         return isinstance(rule, SigmaCorrelationRule)
@@ -166,7 +163,6 @@ def __post_init__(self):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         try:  # first try to get built-in attribute
@@ -239,7 +235,6 @@ def __post_init__(self):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         return self.match_tag in rule.tags
diff --git a/sigma/processing/conditions/state.py b/sigma/processing/conditions/state.py
index f8146b56..ac1a89fd 100644
--- a/sigma/processing/conditions/state.py
+++ b/sigma/processing/conditions/state.py
@@ -12,7 +12,7 @@
     SigmaRule,
     SigmaDetectionItem,
 )
-from sigma.exceptions import SigmaConfigurationError
+from sigma.exceptions import SigmaConfigurationError, SigmaProcessingItemError
 
 
 @dataclass
@@ -25,7 +25,6 @@ class RuleProcessingItemAppliedCondition(RuleProcessingCondition):
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
         return rule.was_processed_by(self.processing_item_id)
@@ -42,11 +41,15 @@ class ProcessingStateConditionBase:
     val: Union[str, int, float, bool]
     op: Literal["eq", "ne", "gte", "gt", "lte", "lt"] = field(default="eq")
 
-    def match_state(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline") -> bool:
+    def match_state(self) -> bool:
         try:
-            state_val = pipeline.state[self.key]
+            state_val = self._pipeline.state[self.key]
         except KeyError:
             return False
+        except AttributeError:
+            raise SigmaProcessingItemError(
+                "No processing pipeline was passed to condition, but required by it"
+            )
 
         if self.op == "eq":
             return state_val == self.val
@@ -74,10 +77,9 @@ class RuleProcessingStateCondition(RuleProcessingCondition, ProcessingStateCondi
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         rule: Union[SigmaRule, SigmaCorrelationRule],
     ) -> bool:
-        return self.match_state(pipeline)
+        return self.match_state()
 
 
 @dataclass
@@ -88,10 +90,9 @@ class FieldNameProcessingStateCondition(FieldNameProcessingCondition, Processing
 
     def match_field_name(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         field: str,
     ) -> bool:
-        return self.match_state(pipeline)
+        return self.match_state()
 
 
 @dataclass
@@ -104,7 +105,6 @@ class DetectionItemProcessingItemAppliedCondition(DetectionItemProcessingConditi
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
         return detection_item.was_processed_by(self.processing_item_id)
@@ -120,10 +120,9 @@ class DetectionItemProcessingStateCondition(
 
     def match(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ) -> bool:
-        return self.match_state(pipeline)
+        return self.match_state()
 
 
 @dataclass
@@ -134,14 +133,11 @@ class FieldNameProcessingItemAppliedCondition(FieldNameProcessingCondition):
     processing_item_id: str
 
-    def match_field_name(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: str
-    ) -> bool:
-        return pipeline.field_was_processed_by(field, self.processing_item_id)
+    def match_field_name(self, field: str) -> bool:
+        return self._pipeline.field_was_processed_by(field, self.processing_item_id)
 
     def match_detection_item(
         self,
-        pipeline: "sigma.processing.pipeline.ProcessingPipeline",
         detection_item: SigmaDetectionItem,
     ):
         return detection_item.was_processed_by(self.processing_item_id)
diff --git a/sigma/processing/conditions/values.py b/sigma/processing/conditions/values.py
index 52d0e31c..fa829711 100644
--- a/sigma/processing/conditions/values.py
+++ b/sigma/processing/conditions/values.py
@@ -29,9 +29,7 @@ def __post_init__(self):
                 f"Regular expression '{self.pattern}' is invalid: {str(e)}"
             ) from e
 
-    def match_value(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType
-    ) -> bool:
+    def match_value(self, value: SigmaType) -> bool:
         if isinstance(value, SigmaString):
             result = self.re.match(str(value))
         else:
@@ -48,9 +46,7 @@ class ContainsWildcardCondition(ValueProcessingCondition):
     Evaluates to True if the value contains a wildcard character.
     """
 
-    def match_value(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType
-    ) -> bool:
+    def match_value(self, value: SigmaType) -> bool:
         if isinstance(value, SigmaString):
             return value.contains_special()
         else:
@@ -65,7 +61,5 @@ class IsNullCondition(ValueProcessingCondition):
     false result in all match mode.
     """
 
-    def match_value(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", value: SigmaType
-    ) -> bool:
+    def match_value(self, value: SigmaType) -> bool:
         return isinstance(value, SigmaNull)
diff --git a/sigma/processing/finalization.py b/sigma/processing/finalization.py
index 25db774e..74d83985 100644
--- a/sigma/processing/finalization.py
+++ b/sigma/processing/finalization.py
@@ -5,7 +5,7 @@
 import yaml
 
 import sigma
-from sigma.exceptions import SigmaConfigurationError
+from sigma.exceptions import SigmaConfigurationError, SigmaTransformationError
 from sigma.processing.templates import TemplateBase
 
 
@@ -14,6 +14,10 @@ class Finalizer:
     """Conversion output transformation base class."""
 
+    _pipeline: Optional["sigma.processing.pipeline.ProcessingPipeline"] = field(
+        init=False, compare=False, default=None
+    )
+
     @classmethod
     def from_dict(cls, d: dict) -> "Finalizer":
         try:
@@ -21,14 +25,16 @@ def from_dict(cls, d: dict) -> "Finalizer":
         except TypeError as e:
             raise SigmaConfigurationError("Error in instantiation of finalizer: " + str(e))
 
+    def set_pipeline(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline") -> None:
+        if self._pipeline is None:
+            self._pipeline = pipeline
+        else:
+            raise SigmaTransformationError("Pipeline for finalizer was already set.")
+
     @abstractmethod
-    def apply(
-        self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any]
-    ) -> Any:
+    def apply(self, queries: List[Any]) -> Any:
         """Finalize output by applying a transformation to the list of generated and postprocessed queries.
-        :param pipeline: Processing pipeline this transformation was contained.
-        :type pipeline: sigma.processing.pipeline.ProcessingPipeline
         :param queries: List of converted and postprocessed queries that should be finalized.
         :type queries: List[Any]
         :return: Output that can be used in further processing of the conversion result.
@@ -45,9 +51,7 @@ class ConcatenateQueriesFinalizer(Finalizer): prefix: str = "" suffix: str = "" - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[str] - ) -> str: + def apply(self, queries: List[str]) -> str: return self.prefix + self.separator.join(queries) + self.suffix @@ -55,9 +59,7 @@ def apply( class JSONFinalizer(Finalizer): indent: Optional[int] = None - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any] - ) -> str: + def apply(self, queries: List[Any]) -> str: return json.dumps(queries, indent=self.indent) @@ -65,9 +67,7 @@ def apply( class YAMLFinalizer(Finalizer): indent: Optional[int] = None - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any] - ) -> str: + def apply(self, queries: List[Any]) -> str: yaml.safe_dump(queries, indent=self.indent) @@ -85,10 +85,8 @@ class TemplateFinalizer(Finalizer, TemplateBase): controls the Jinja2 HTML/XML auto-escaping. """ - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any] - ) -> str: - return self.j2template.render(queries=queries, pipeline=pipeline) + def apply(self, queries: List[Any]) -> str: + return self.j2template.render(queries=queries, pipeline=self._pipeline) @dataclass @@ -120,9 +118,7 @@ def from_dict(cls, d: Dict) -> "NestedFinalizer": fs.append(finalizers[finalizer_type].from_dict(finalizer)) return cls(finalizers=fs) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", queries: List[Any] - ) -> Any: + def apply(self, queries: List[Any]) -> Any: return self._nested_pipeline.finalize(queries) diff --git a/sigma/processing/pipeline.py b/sigma/processing/pipeline.py index 1c2b307e..d889c0e0 100644 --- a/sigma/processing/pipeline.py +++ b/sigma/processing/pipeline.py @@ -31,7 +31,12 @@ field_name_conditions, FieldNameProcessingCondition, ) -from sigma.exceptions import SigmaConfigurationError, SigmaTypeError, SigmaPipelineParsingError +from sigma.exceptions import ( + SigmaConfigurationError, + SigmaProcessingItemError, + SigmaTypeError, + SigmaPipelineParsingError, +) import yaml from sigma.types import SigmaFieldReference, SigmaType @@ -44,6 +49,7 @@ class ProcessingItemBase: rule_condition_negation: bool = False rule_conditions: List[RuleProcessingCondition] = field(default_factory=list) identifier: Optional[str] = None + _pipeline: Optional["ProcessingPipeline"] = field(init=False, compare=False, default=None) @classmethod def _parse_conditions( @@ -108,16 +114,30 @@ def _instantiate_transformation(cls, d: dict, transformations: Dict[str, Type[Tr f"Unknown transformation type '{ transformation_class_name }'" ) - def match_rule_conditions( - self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule] - ): + def match_rule_conditions(self, rule: Union[SigmaRule, SigmaCorrelationRule]): cond_result = self.rule_condition_linking( - [condition.match(pipeline, rule) for condition in self.rule_conditions] + [condition.match(rule) for condition in self.rule_conditions] ) if self.rule_condition_negation: cond_result = not cond_result return not self.rule_conditions or cond_result + def set_pipeline(self, pipeline: "ProcessingPipeline") -> None: + if self._pipeline is None: + self._pipeline = pipeline + else: + raise SigmaProcessingItemError(f"Pipeline for processing item was already set.") + + self.transformation.set_pipeline(pipeline) + for rule_condition in self.rule_conditions: + 
rule_condition.set_pipeline(self._pipeline) + + def _clear_pipeline(self) -> None: + self._pipeline = None + self.transformation._clear_pipeline() + for rule_condition in self.rule_conditions: + rule_condition._clear_pipeline() + @dataclass class ProcessingItem(ProcessingItemBase): @@ -235,40 +255,47 @@ def __post_init__(self): f"Detection item processing condition '{str(field_name_condition)}' is not a FieldNameProcessingCondition" ) - def apply( - self, pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule] - ) -> bool: + def set_pipeline(self, pipeline: "ProcessingPipeline") -> None: + super().set_pipeline(pipeline) + for detection_item_condition in self.detection_item_conditions: + detection_item_condition.set_pipeline(self._pipeline) + for field_name_condition in self.field_name_conditions: + field_name_condition.set_pipeline(self._pipeline) + + def _clear_pipeline(self) -> None: + super()._clear_pipeline() + for detection_item_condition in self.detection_item_conditions: + detection_item_condition._clear_pipeline() + for field_name_condition in self.field_name_conditions: + field_name_condition._clear_pipeline() + + def apply(self, rule: Union[SigmaRule, SigmaCorrelationRule]) -> bool: """ Matches condition against rule and performs transformation if condition is true or not present. Returns Sigma rule and bool if transformation was applied. """ if self.match_rule_conditions( - pipeline, rule + rule ): # apply transformation if conditions match or no condition defined - self.transformation.apply(pipeline, rule) + self.transformation.apply(rule) return True else: # just pass rule through return False - def match_detection_item( - self, pipeline: "ProcessingPipeline", detection_item: SigmaDetectionItem - ) -> bool: + def match_detection_item(self, detection_item: SigmaDetectionItem) -> bool: """ Evalutates detection item and field name conditions from processing item to detection item and returns result. """ detection_item_cond_result = self.detection_item_condition_linking( - [ - condition.match(pipeline, detection_item) - for condition in self.detection_item_conditions - ] + [condition.match(detection_item) for condition in self.detection_item_conditions] ) if self.detection_item_condition_negation: detection_item_cond_result = not detection_item_cond_result field_name_cond_result = self.field_name_condition_linking( [ - condition.match_detection_item(pipeline, detection_item) + condition.match_detection_item(detection_item) for condition in self.field_name_conditions ] ) @@ -277,28 +304,25 @@ def match_detection_item( return detection_item_cond_result and field_name_cond_result - def match_field_name(self, pipeline: "ProcessingPipeline", field: Optional[str]) -> bool: + def match_field_name(self, field: Optional[str]) -> bool: """ Evaluate field name conditions on field names and return result. """ field_name_cond_result = self.field_name_condition_linking( - [ - condition.match_field_name(pipeline, field) - for condition in self.field_name_conditions - ] + [condition.match_field_name(field) for condition in self.field_name_conditions] ) if self.field_name_condition_negation: field_name_cond_result = not field_name_cond_result return field_name_cond_result - def match_field_in_value(self, pipeline: "ProcessingPipeline", value: SigmaType) -> bool: + def match_field_in_value(self, value: SigmaType) -> bool: """ Evaluate field name conditions in field reference values and return result. 
""" if isinstance(value, SigmaFieldReference): field_name_cond_result = self.field_name_condition_linking( - [condition.match_value(pipeline, value) for condition in self.field_name_conditions] + [condition.match_value(value) for condition in self.field_name_conditions] ) if self.field_name_condition_negation: field_name_cond_result = not field_name_cond_result @@ -370,7 +394,6 @@ def __post_init__(self): def apply( self, - pipeline: "ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule], query: str, ) -> Tuple[str, bool]: @@ -379,9 +402,9 @@ def apply( Returns query and bool if transformation was applied. """ if self.match_rule_conditions( - pipeline, rule + rule ): # apply transformation if conditions match or no condition defined - result = self.transformation.apply(pipeline, rule, query) + result = self.transformation.apply(rule, query) return (result, True) else: # just pass rule through return (query, False) @@ -443,6 +466,25 @@ def __post_init__(self): if not all((isinstance(finalizer, Finalizer) for finalizer in self.finalizers)): raise TypeError("Each item in a finalizer pipeline must be a Finalizer") + # Initialize contained items with just instantiated processing pipeline as context. + self.set_pipeline() + + def set_pipeline(self): + for processing_item in self.items: + processing_item.set_pipeline(self) + for postprocessing_item in self.postprocessing_items: + postprocessing_item.set_pipeline(self) + for finalizer in self.finalizers: + finalizer.set_pipeline(self) + + def _clear_pipeline(self): + for processing_item in self.items: + processing_item._clear_pipeline() + for postprocessing_item in self.postprocessing_items: + postprocessing_item._clear_pipeline() + for finalizer in self.finalizers: + finalizer._pipeline = None + @classmethod def from_dict(cls, d: dict) -> "ProcessingPipeline": """Instantiate processing pipeline from a parsed processing item description.""" @@ -452,7 +494,8 @@ def from_dict(cls, d: dict) -> "ProcessingPipeline": processing_items = list() for i, item in enumerate(items): try: - processing_items.append(ProcessingItem.from_dict(item)) + processing_item = ProcessingItem.from_dict(item) + processing_items.append(processing_item) except SigmaConfigurationError as e: raise SigmaConfigurationError( f"Error in processing rule { i + 1 }: { str(e) }" @@ -516,7 +559,7 @@ def apply( self.field_mappings = FieldMappingTracking() self.state = dict() for item in self.items: - applied = item.apply(self, rule) + applied = item.apply(rule) self.applied.append(applied) if applied and (itid := item.identifier): self.applied_ids.add(itid) @@ -525,14 +568,14 @@ def apply( def postprocess_query(self, rule: Union[SigmaRule, SigmaCorrelationRule], query: Any) -> Any: """Post-process queries with postprocessing_items.""" for item in self.postprocessing_items: - query, applied = item.apply(self, rule, query) + query, applied = item.apply(rule, query) if applied and (itid := item.identifier): self.applied_ids.add(itid) return query def finalize(self, output: Any) -> Any: for finalizer in self.finalizers: - output = finalizer.apply(self, output) + output = finalizer.apply(output) return output def track_field_processing_items( @@ -566,6 +609,9 @@ def __add__(self, other: Optional["ProcessingPipeline"]) -> "ProcessingPipeline" if not isinstance(other, self.__class__): raise TypeError("Processing pipeline must be merged with another one.") + self._clear_pipeline() + other._clear_pipeline() + return self.__class__( items=self.items + other.items, 
postprocessing_items=self.postprocessing_items + other.postprocessing_items, diff --git a/sigma/processing/postprocessing.py b/sigma/processing/postprocessing.py index ec36be38..82dccdc2 100644 --- a/sigma/processing/postprocessing.py +++ b/sigma/processing/postprocessing.py @@ -20,9 +20,7 @@ class QueryPostprocessingTransformation(Transformation): ) @abstractmethod - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: Any - ) -> Any: + def apply(self, rule: SigmaRule, query: Any) -> Any: """Applies post-processing transformation to arbitrary typed query. :param pipeline: Processing pipeline this transformation was contained. @@ -34,7 +32,7 @@ def apply( :return: Transformed query. :rtype: Any """ - super().apply(pipeline, rule) # tracking of applied rules + super().apply(rule) # tracking of applied rules @dataclass @@ -48,10 +46,8 @@ def __post_init__(self): self.prefix = self.prefix or "" self.suffix = self.suffix or "" - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str - ) -> str: - super().apply(pipeline, rule, query) + def apply(self, rule: SigmaRule, query: str) -> str: + super().apply(rule, query) return self.prefix + query + self.suffix @@ -70,13 +66,11 @@ class QuerySimpleTemplateTransformation(QueryPostprocessingTransformation): template: str - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str - ) -> str: + def apply(self, rule: SigmaRule, query: str) -> str: return self.template.format( query=query, rule=rule, - pipeline=pipeline, + pipeline=self._pipeline, ) @@ -95,10 +89,8 @@ class QueryTemplateTransformation(QueryPostprocessingTransformation, TemplateBas controls the Jinja2 HTML/XML auto-escaping. """ - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str - ) -> str: - return self.j2template.render(query=query, rule=rule, pipeline=pipeline) + def apply(self, rule: SigmaRule, query: str) -> str: + return self.j2template.render(query=query, rule=rule, pipeline=self._pipeline) @dataclass @@ -123,10 +115,8 @@ def _replace_placeholder( def __post_init__(self): self.parsed_json = json.loads(self.json_template) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str - ): - super().apply(pipeline, rule, query) + def apply(self, rule: SigmaRule, query: str): + super().apply(rule, query) return json.dumps(self._replace_placeholder(self.parsed_json, query)) @@ -140,10 +130,8 @@ class ReplaceQueryTransformation(QueryPostprocessingTransformation): def __post_init__(self): self.re = re.compile(self.pattern) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: str - ): - super().apply(pipeline, rule, query) + def apply(self, rule: SigmaRule, query: str): + super().apply(rule, query) return self.re.sub(self.replacement, query) @@ -177,12 +165,10 @@ def from_dict(cls, d: Dict[str, Any]) -> "NestedQueryPostprocessingTransformatio "Nested post-processing transformation requires an 'items' key." 
) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule, query: Any - ) -> Any: - super().apply(pipeline, rule, query) + def apply(self, rule: SigmaRule, query: Any) -> Any: + super().apply(rule, query) query = self._nested_pipeline.postprocess_query(rule, query) - pipeline.applied_ids.update(self._nested_pipeline.applied_ids) + self._pipeline.applied_ids.update(self._nested_pipeline.applied_ids) return query diff --git a/sigma/processing/transformations/base.py b/sigma/processing/transformations/base.py index 972a04c5..5de785b3 100644 --- a/sigma/processing/transformations/base.py +++ b/sigma/processing/transformations/base.py @@ -15,6 +15,7 @@ from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem from sigma.exceptions import ( SigmaConfigurationError, + SigmaTransformationError, ) from sigma.types import ( SigmaString, @@ -34,6 +35,9 @@ class Transformation(ABC): processing_item: Optional["sigma.processing.pipeline.ProcessingItem"] = field( init=False, compare=False, default=None ) + _pipeline: Optional["sigma.processing.pipeline.ProcessingPipeline"] = field( + init=False, compare=False, default=None + ) @classmethod def from_dict(cls, d: dict) -> "Transformation": @@ -45,15 +49,20 @@ def from_dict(cls, d: dict) -> "Transformation": @abstractmethod def apply( self, - pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule], ) -> None: """Apply transformation on Sigma rule.""" - self._pipeline: "sigma.processing.pipeline.ProcessingPipeline" = ( - pipeline # make pipeline accessible from all further options in class property - ) self.processing_item_applied(rule) + def set_pipeline(self, pipeline: "sigma.processing.pipeline.ProcessingPipeline"): + if self._pipeline is None: + self._pipeline = pipeline + else: + raise SigmaTransformationError(f"Pipeline for transformation was already set.") + + def _clear_pipeline(self) -> None: + self._pipeline = None + def set_processing_item(self, processing_item: "sigma.processing.pipeline.ProcessingItem"): self.processing_item = processing_item @@ -98,17 +107,15 @@ def apply_detection(self, detection: SigmaDetection): else: if ( self.processing_item is None - or self.processing_item.match_detection_item(self._pipeline, detection_item) + or self.processing_item.match_detection_item(detection_item) ) and (r := self.apply_detection_item(detection_item)) is not None: if isinstance(r, SigmaDetectionItem): r.disable_conversion_to_plain() detection.detection_items[i] = r self.processing_item_applied(r) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) if isinstance(rule, SigmaRule): for detection in rule.detection.detections.values(): self.apply_detection(detection) @@ -128,17 +135,15 @@ def apply_field_name(self, field: str) -> List[str]: a list of strings that are expanded into a new field list. """ - def _apply_field_name( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", field: str - ) -> List[str]: + def _apply_field_name(self, field: str) -> List[str]: """ Evaluate field name conditions and perform transformation with apply_field_name() method if condition matches, else return original value. 
""" - if self.processing_item is None or self.processing_item.match_field_name(pipeline, field): + if self.processing_item is None or self.processing_item.match_field_name(field): result = self.apply_field_name(field) if self.processing_item is not None: - pipeline.track_field_processing_items( + self._pipeline.track_field_processing_items( field, result, self.processing_item.identifier ) return result @@ -147,12 +152,12 @@ def _apply_field_name( def apply( self, - pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: Union[SigmaRule, SigmaCorrelationRule], ) -> None: """Apply field name transformations to Sigma rule field names listed in 'fields' attribute.""" - _apply_field_name = partial(self._apply_field_name, pipeline) - rule.fields = [item for mapping in map(_apply_field_name, rule.fields) for item in mapping] + rule.fields = [ + item for mapping in map(self._apply_field_name, rule.fields) for item in mapping + ] if isinstance(rule, SigmaCorrelationRule): if rule.group_by is not None: # first iterate over aliases, map the field names contained in them and keep track @@ -161,7 +166,7 @@ def apply( for alias in rule.aliases: aliases.add(alias.alias) for rule_reference, field_name in alias.mapping.items(): - mapped_field_name = _apply_field_name(field_name) + mapped_field_name = self._apply_field_name(field_name) if len(mapped_field_name) > 1: raise SigmaConfigurationError( "Field name mapping transformation can't be applied to correlation rule alias mapping because it results in multiple field names." @@ -173,20 +178,22 @@ def apply( item for field_name in rule.group_by for item in ( - _apply_field_name(field_name) if field_name not in aliases else [field_name] + self._apply_field_name(field_name) + if field_name not in aliases + else [field_name] ) ] # finally map the field name in the condition if rule.condition is not None and (fieldref := rule.condition.fieldref) is not None: - mapped_field = _apply_field_name(fieldref) + mapped_field = self._apply_field_name(fieldref) if len(mapped_field) > 1: raise SigmaConfigurationError( "Field name mapping transformation can't be applied to correlation rule condition field reference because it results in multiple field names." ) rule.condition.fieldref = mapped_field[0] - return super().apply(pipeline, rule) + return super().apply(rule) def apply_detection_item( self, detection_item: SigmaDetectionItem @@ -196,12 +203,12 @@ def apply_detection_item( match = False for value in detection_item.value: if self.processing_item is not None and self.processing_item.match_field_in_value( - self._pipeline, value + value ): new_values.extend( ( SigmaFieldReference(mapped_field) - for mapped_field in self._apply_field_name(self._pipeline, value.field) + for mapped_field in self._apply_field_name(value.field) ) ) match = True @@ -303,10 +310,8 @@ class ConditionTransformation(Transformation): takes care of marking condition as applied by processing item. 
""" - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) if isinstance(rule, SigmaRule): for i, condition in enumerate(rule.detection.parsed_condition): condition_before = condition.condition diff --git a/sigma/processing/transformations/condition.py b/sigma/processing/transformations/condition.py index 98f547a8..c24f7670 100644 --- a/sigma/processing/transformations/condition.py +++ b/sigma/processing/transformations/condition.py @@ -37,9 +37,7 @@ def __post_init__(self): if self.name is None: # generate random detection item name if none is given self.name = "_cond_" + ("".join(random.choices(string.ascii_lowercase, k=10))) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: + def apply(self, rule: SigmaRule) -> None: if isinstance(rule, SigmaRule): if self.template: conditions = { @@ -66,7 +64,7 @@ def apply( rule.detection.detections[self.name] = SigmaDetection.from_definition(conditions) self.processing_item_applied(rule.detection.detections[self.name]) - super().apply(pipeline, rule) + super().apply(rule) def apply_condition(self, cond: SigmaCondition) -> None: cond.condition = ("not " if self.negated else "") + f"{self.name} and ({cond.condition})" diff --git a/sigma/processing/transformations/failure.py b/sigma/processing/transformations/failure.py index e98cec32..30b997eb 100644 --- a/sigma/processing/transformations/failure.py +++ b/sigma/processing/transformations/failure.py @@ -14,13 +14,14 @@ class RuleFailureTransformation(Transformation): Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signalize that a certain situation can't be handled, e.g. only a subset of values is allowed because the target data model doesn't offers all possibilities. + + This is a rule transformation. Detection item and field name conditions are not evaluated if + this is used. """ message: str - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: + def apply(self, rule: SigmaRule) -> None: raise SigmaTransformationError(self.message) @@ -30,6 +31,9 @@ class DetectionItemFailureTransformation(DetectionItemTransformation): Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signalize that a certain situation can't be handled, e.g. only a subset of values is allowed because the target data model doesn't offers all possibilities. + + This is a detection item transformation that should be used if detection item or field name + conditions are used. 
""" message: str diff --git a/sigma/processing/transformations/fields.py b/sigma/processing/transformations/fields.py index 279ef906..d30c09b7 100644 --- a/sigma/processing/transformations/fields.py +++ b/sigma/processing/transformations/fields.py @@ -28,7 +28,7 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem): super().apply_detection_item(detection_item) field = detection_item.field mapping = self.get_mapping(field) - if mapping is not None and self.processing_item.match_field_name(self._pipeline, field): + if mapping is not None and self.processing_item.match_field_name(field): self._pipeline.field_mappings.add_mapping(field, mapping) if isinstance(mapping, str): # 1:1 mapping, map field name of detection item directly detection_item.field = mapping @@ -80,7 +80,7 @@ def apply_detection_item(self, detection_item: SigmaDetectionItem): super().apply_detection_item(detection_item) f = detection_item.field mapping = self._transform_name(f) - if self.processing_item.match_field_name(self._pipeline, f): + if self.processing_item.match_field_name(f): self._pipeline.field_mappings.add_mapping(f, mapping) detection_item.field = mapping self.processing_item_applied(detection_item) @@ -100,8 +100,7 @@ class AddFieldnameSuffixTransformation(FieldMappingTransformationBase): def apply_detection_item(self, detection_item: SigmaDetectionItem): super().apply_detection_item(detection_item) if type(orig_field := detection_item.field) is str and ( - self.processing_item is None - or self.processing_item.match_field_name(self._pipeline, orig_field) + self.processing_item is None or self.processing_item.match_field_name(orig_field) ): detection_item.field += self.suffix self._pipeline.field_mappings.add_mapping(orig_field, detection_item.field) @@ -122,8 +121,7 @@ class AddFieldnamePrefixTransformation(FieldMappingTransformationBase): def apply_detection_item(self, detection_item: SigmaDetectionItem): super().apply_detection_item(detection_item) if type(orig_field := detection_item.field) is str and ( - self.processing_item is None - or self.processing_item.match_field_name(self._pipeline, orig_field) + self.processing_item is None or self.processing_item.match_field_name(orig_field) ): detection_item.field = self.prefix + detection_item.field self._pipeline.field_mappings.add_mapping(orig_field, detection_item.field) @@ -141,10 +139,8 @@ class AddFieldTransformation(Transformation): field: Union[str, List[str]] - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) if isinstance(self.field, str): rule.fields.append(self.field) elif isinstance(self.field, list): @@ -160,10 +156,8 @@ class RemoveFieldTransformation(Transformation): field: Union[str, List[str]] - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) if isinstance(self.field, str): try: rule.fields.remove(self.field) @@ -185,8 +179,6 @@ class SetFieldTransformation(Transformation): fields: List[str] - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) rule.fields = self.fields diff --git a/sigma/processing/transformations/meta.py b/sigma/processing/transformations/meta.py index 
85e5175c..4a598e35 100644 --- a/sigma/processing/transformations/meta.py +++ b/sigma/processing/transformations/meta.py @@ -42,15 +42,11 @@ def from_dict(cls, d: Dict) -> "NestedProcessingTransformation": "Nested processing transformation requires an 'items' key." ) - def apply( - self, - pipeline: "sigma.processing.pipeline.ProcessingPipeline", - rule: Union[SigmaRule, SigmaCorrelationRule], - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: Union[SigmaRule, SigmaCorrelationRule]) -> None: + super().apply(rule) self._nested_pipeline.apply(rule) - pipeline.applied.extend(self._nested_pipeline.applied) - pipeline.applied_ids.update(self._nested_pipeline.applied_ids) - pipeline.field_name_applied_ids.update(self._nested_pipeline.field_name_applied_ids) - pipeline.field_mappings.merge(self._nested_pipeline.field_mappings) - pipeline.state.update(self._nested_pipeline.state) + self._pipeline.applied.extend(self._nested_pipeline.applied) + self._pipeline.applied_ids.update(self._nested_pipeline.applied_ids) + self._pipeline.field_name_applied_ids.update(self._nested_pipeline.field_name_applied_ids) + self._pipeline.field_mappings.merge(self._nested_pipeline.field_mappings) + self._pipeline.state.update(self._nested_pipeline.state) diff --git a/sigma/processing/transformations/rule.py b/sigma/processing/transformations/rule.py index 9d23bf28..d13e66f5 100644 --- a/sigma/processing/transformations/rule.py +++ b/sigma/processing/transformations/rule.py @@ -23,10 +23,8 @@ class ChangeLogsourceTransformation(Transformation): product: Optional[str] = field(default=None) service: Optional[str] = field(default=None) - def apply( - self, pipeline: "sigma.processing.pipeline.ProcessingPipeline", rule: SigmaRule - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) logsource = SigmaLogSource(self.category, self.product, self.service) rule.logsource = logsource @@ -40,10 +38,6 @@ class SetCustomAttributeTransformation(Transformation): attribute: str value: Any - def apply( - self, - pipeline: "sigma.processing.pipeline.ProcessingPipeline", - rule: Union[SigmaRule, SigmaCorrelationRule], - ) -> None: - super().apply(pipeline, rule) + def apply(self, rule: Union[SigmaRule, SigmaCorrelationRule]) -> None: + super().apply(rule) rule.custom_attributes[self.attribute] = self.value diff --git a/sigma/processing/transformations/state.py b/sigma/processing/transformations/state.py index 1fe719b6..d15ee93e 100644 --- a/sigma/processing/transformations/state.py +++ b/sigma/processing/transformations/state.py @@ -12,6 +12,6 @@ class SetStateTransformation(Transformation): key: str val: Any - def apply(self, pipeline: "sigma.processing.pipeline.Proces", rule: SigmaRule) -> None: - super().apply(pipeline, rule) - pipeline.state[self.key] = self.val + def apply(self, rule: SigmaRule) -> None: + super().apply(rule) + self._pipeline.state[self.key] = self.val diff --git a/tests/test_finalization_tranformations.py b/tests/test_finalization_tranformations.py index ec9e4149..64103ceb 100644 --- a/tests/test_finalization_tranformations.py +++ b/tests/test_finalization_tranformations.py @@ -1,5 +1,5 @@ import pytest -from sigma.exceptions import SigmaConfigurationError +from sigma.exceptions import SigmaConfigurationError, SigmaTransformationError from sigma.processing.finalization import ( ConcatenateQueriesFinalizer, NestedFinalizer, @@ -8,10 +8,18 @@ from .test_processing_transformations import dummy_pipeline, sigma_rule +def 
test_finalization_multiple_pipeline_set(dummy_pipeline): + finalizer = ConcatenateQueriesFinalizer() + finalizer.set_pipeline(dummy_pipeline) + with pytest.raises(SigmaTransformationError, match="Pipeline.*already set"): + finalizer.set_pipeline(dummy_pipeline) + + def test_concatenate_queries_tranformation(dummy_pipeline): transformation = ConcatenateQueriesFinalizer(separator="', '", prefix="('", suffix="')") + transformation.set_pipeline(dummy_pipeline) assert ( - transformation.apply(dummy_pipeline, ['field1="value1"', 'field2="value2"']) + transformation.apply(['field1="value1"', 'field2="value2"']) == """('field1="value1"', 'field2="value2"')""" ) @@ -27,9 +35,9 @@ def test_template_transformation(dummy_pipeline): query{{ loop.index }} = {{ query }}{% endfor %} """ ) + transformation.set_pipeline(dummy_pipeline) assert ( transformation.apply( - dummy_pipeline, [ "fieldA=val1", "fieldB=val2", @@ -50,9 +58,9 @@ def test_template_transformation(dummy_pipeline): def test_template_transformation_from_file(dummy_pipeline): dummy_pipeline.state["setting"] = "value" transformation = TemplateFinalizer(template="finalize.j2", path="tests/files") + transformation.set_pipeline(dummy_pipeline) assert ( transformation.apply( - dummy_pipeline, [ "fieldA=val1", "fieldB=val2", @@ -71,12 +79,14 @@ def test_template_transformation_from_file(dummy_pipeline): @pytest.fixture def nested_finalizer(dummy_pipeline): - return NestedFinalizer( + nested_finalizer = NestedFinalizer( finalizers=[ ConcatenateQueriesFinalizer(separator="', '", prefix="('", suffix="')"), TemplateFinalizer("allOf({{ queries }})"), ] ) + nested_finalizer.set_pipeline(dummy_pipeline) + return nested_finalizer def test_nested_finalizer_from_dict(nested_finalizer): @@ -113,7 +123,6 @@ def test_nested_finalizer_no_type(): def test_nested_finalizer_apply(nested_finalizer): assert ( nested_finalizer.apply( - dummy_pipeline, [ "fieldA=val1", "fieldB=val2", diff --git a/tests/test_postprocessing_transformations.py b/tests/test_postprocessing_transformations.py index af8e74e4..e63cb92f 100644 --- a/tests/test_postprocessing_transformations.py +++ b/tests/test_postprocessing_transformations.py @@ -15,12 +15,14 @@ def test_embed_query_transformation(dummy_pipeline, sigma_rule): transformation = EmbedQueryTransformation("[ ", " ]") - assert transformation.apply(dummy_pipeline, sigma_rule, "field=value") == "[ field=value ]" + transformation.set_pipeline(dummy_pipeline) + assert transformation.apply(sigma_rule, "field=value") == "[ field=value ]" def test_embed_query_transformation_none(dummy_pipeline, sigma_rule): transformation = EmbedQueryTransformation() - assert transformation.apply(dummy_pipeline, sigma_rule, "field=value") == "field=value" + transformation.set_pipeline(dummy_pipeline) + assert transformation.apply(sigma_rule, "field=value") == "field=value" def test_query_simple_template_transformation( @@ -33,9 +35,10 @@ def test_query_simple_template_transformation( state = {pipeline.state[test]} """ ) + transformation.set_pipeline(dummy_pipeline) dummy_pipeline.state["test"] = "teststate" assert ( - transformation.apply(dummy_pipeline, sigma_rule, 'field="value"') + transformation.apply(sigma_rule, 'field="value"') == """ title = Test query = field="value" @@ -52,9 +55,10 @@ def test_query_template_transformation(dummy_pipeline: ProcessingPipeline, sigma state = {{ pipeline.state.test }} """ ) + transformation.set_pipeline(dummy_pipeline) dummy_pipeline.state["test"] = "teststate" assert ( - transformation.apply(dummy_pipeline, 
sigma_rule, 'field="value"') + transformation.apply(sigma_rule, 'field="value"') == """ title = Test query = field="value" @@ -65,8 +69,9 @@ def test_query_template_transformation(dummy_pipeline: ProcessingPipeline, sigma def test_embed_query_in_json_transformation_dict(dummy_pipeline, sigma_rule): transformation = EmbedQueryInJSONTransformation('{ "field": "value", "query": "%QUERY%" }') + transformation.set_pipeline(dummy_pipeline) assert ( - transformation.apply(dummy_pipeline, sigma_rule, 'field="value"') + transformation.apply(sigma_rule, 'field="value"') == '{"field": "value", "query": "field=\\"value\\""}' ) @@ -75,20 +80,22 @@ def test_embed_query_in_json_transformation_list(dummy_pipeline, sigma_rule): transformation = EmbedQueryInJSONTransformation( '{ "field": "value", "query": ["foo", "%QUERY%", "bar"] }' ) + transformation.set_pipeline(dummy_pipeline) assert ( - transformation.apply(dummy_pipeline, sigma_rule, 'field="value"') + transformation.apply(sigma_rule, 'field="value"') == '{"field": "value", "query": ["foo", "field=\\"value\\"", "bar"]}' ) def test_replace_query_transformation(dummy_pipeline, sigma_rule): transformation = ReplaceQueryTransformation("v\\w+e", "replaced") - assert transformation.apply(dummy_pipeline, sigma_rule, 'field="value"') == 'field="replaced"' + transformation.set_pipeline(dummy_pipeline) + assert transformation.apply(sigma_rule, 'field="value"') == 'field="replaced"' @pytest.fixture -def nested_query_postprocessing_transformation(): - return NestedQueryPostprocessingTransformation( +def nested_query_postprocessing_transformation(dummy_pipeline): + transformation = NestedQueryPostprocessingTransformation( items=[ QueryPostprocessingItem(ReplaceQueryTransformation("foo", "bar")), QueryPostprocessingItem(EmbedQueryTransformation("[", "]"), identifier="test"), @@ -97,6 +104,8 @@ def nested_query_postprocessing_transformation(): ), ] ) + transformation.set_pipeline(dummy_pipeline) + return transformation def test_nested_query_postprocessing_transformation_from_dict( @@ -128,10 +137,8 @@ def test_nested_query_postprocessing_transformation_no_items(): def test_nested_query_postprocessing_transformation( - nested_query_postprocessing_transformation, dummy_pipeline, sigma_rule + nested_query_postprocessing_transformation, sigma_rule ): - result = nested_query_postprocessing_transformation.apply( - dummy_pipeline, sigma_rule, 'field="foobar"' - ) + result = nested_query_postprocessing_transformation.apply(sigma_rule, 'field="foobar"') assert result == 'title = Test\nquery = [field="barbar"]' assert sigma_rule.was_processed_by("test") diff --git a/tests/test_processing_conditions.py b/tests/test_processing_conditions.py index 9483f365..caa95748 100644 --- a/tests/test_processing_conditions.py +++ b/tests/test_processing_conditions.py @@ -4,7 +4,11 @@ from sigma.correlations import SigmaCorrelationRule from sigma.types import SigmaNull, SigmaNumber, SigmaString from sigma import processing -from sigma.exceptions import SigmaConfigurationError, SigmaRegularExpressionError +from sigma.exceptions import ( + SigmaConfigurationError, + SigmaProcessingItemError, + SigmaRegularExpressionError, +) import pytest from sigma.processing.pipeline import ProcessingItem, ProcessingPipeline from sigma.processing.conditions import ( @@ -90,57 +94,55 @@ def sigma_rule(): ) -def test_logsource_match(dummy_processing_pipeline, sigma_rule): +def test_processing_condition_multiple_pipelines_set(dummy_processing_pipeline): + condition = IsSigmaRuleCondition() + 
condition.set_pipeline(dummy_processing_pipeline) + with pytest.raises(SigmaProcessingItemError, match="Pipeline.*was already set"): + condition.set_pipeline(dummy_processing_pipeline) + + +def test_logsource_match(sigma_rule): assert LogsourceCondition(category="test_category").match( - dummy_processing_pipeline, sigma_rule, ) -def test_logsource_no_match(dummy_processing_pipeline, sigma_rule): +def test_logsource_no_match(sigma_rule): assert not LogsourceCondition(category="test_category", product="other_product").match( - dummy_processing_pipeline, sigma_rule, ) -def test_logsource_match_correlation_rule_cat(dummy_processing_pipeline, sigma_correlated_rules): +def test_logsource_match_correlation_rule_cat(sigma_correlated_rules): sigma_correlated_rules.resolve_rule_references() assert LogsourceCondition(category="test_category").match( - dummy_processing_pipeline, cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]), ) -def test_logsource_match_correlation_rule_prod(dummy_processing_pipeline, sigma_correlated_rules): +def test_logsource_match_correlation_rule_prod(sigma_correlated_rules): sigma_correlated_rules.resolve_rule_references() assert LogsourceCondition(product="test_product").match( - dummy_processing_pipeline, cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]), ) -def test_logsource_no_match_correlation_rule_both( - dummy_processing_pipeline, sigma_correlated_rules -): +def test_logsource_no_match_correlation_rule_both(sigma_correlated_rules): sigma_correlated_rules.resolve_rule_references() assert not LogsourceCondition(category="test_category", product="test_product").match( - dummy_processing_pipeline, cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]), ) -def test_logsource_no_match_correlation_rule(dummy_processing_pipeline, sigma_correlated_rules): +def test_logsource_no_match_correlation_rule(sigma_correlated_rules): sigma_correlated_rules.resolve_rule_references() assert not LogsourceCondition(service="test_service").match( - dummy_processing_pipeline, cast(SigmaCorrelationRule, sigma_correlated_rules.rules[-1]), ) -def test_logsource_no_rule_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule): +def test_logsource_no_rule_correlation_rule(sigma_correlation_rule): assert not LogsourceCondition(category="test_category", product="other_product").match( - dummy_processing_pipeline, sigma_correlation_rule, ) @@ -148,19 +150,15 @@ def test_logsource_no_rule_correlation_rule(dummy_processing_pipeline, sigma_cor from tests.test_processing_pipeline import processing_item -def test_rule_processing_item_applied( - dummy_processing_pipeline, processing_item, sigma_rule: SigmaRule -): +def test_rule_processing_item_applied(processing_item, sigma_rule: SigmaRule): sigma_rule.add_applied_processing_item(processing_item) assert RuleProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, sigma_rule, ) -def test_rule_processing_item_not_applied(dummy_processing_pipeline, sigma_rule: SigmaRule): +def test_rule_processing_item_not_applied(sigma_rule: SigmaRule): assert not RuleProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, sigma_rule, ) @@ -168,280 +166,217 @@ def test_rule_processing_item_not_applied(dummy_processing_pipeline, sigma_rule: def test_rule_state_match(dummy_processing_pipeline, sigma_rule): dummy_processing_pipeline.state["key"] = "value" dummy_processing_pipeline.state["number"] = 123 - assert RuleProcessingStateCondition("key", 
"value").match(dummy_processing_pipeline, sigma_rule) - assert RuleProcessingStateCondition("key", "other_value", "ne").match( - dummy_processing_pipeline, sigma_rule - ) - assert RuleProcessingStateCondition("number", 123, "gte").match( - dummy_processing_pipeline, sigma_rule - ) - assert RuleProcessingStateCondition("number", 123, "lte").match( - dummy_processing_pipeline, sigma_rule - ) - assert RuleProcessingStateCondition("number", 122, "gt").match( - dummy_processing_pipeline, sigma_rule - ) - assert RuleProcessingStateCondition("number", 124, "lt").match( - dummy_processing_pipeline, sigma_rule - ) + condition = RuleProcessingStateCondition("key", "value") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) + + condition = RuleProcessingStateCondition("key", "other_value", "ne") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) + + condition = RuleProcessingStateCondition("number", 123, "gte") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) + + condition = RuleProcessingStateCondition("number", 123, "lte") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) + + condition = RuleProcessingStateCondition("number", 122, "gt") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) -def test_rule_state_nomatch(dummy_processing_pipeline, sigma_rule): + condition = RuleProcessingStateCondition("number", 124, "lt") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(sigma_rule) + + +def test_rule_state_nomatch(sigma_rule, dummy_processing_pipeline): dummy_processing_pipeline.state["key"] = "value" - assert not RuleProcessingStateCondition("key", "other_value").match( - dummy_processing_pipeline, sigma_rule - ) + condition = RuleProcessingStateCondition("key", "other_value") + condition.set_pipeline(dummy_processing_pipeline) + assert not condition.match(sigma_rule) -def test_rule_processing_item_applied_correlation_rule( - dummy_processing_pipeline, processing_item, sigma_correlation_rule -): +def test_rule_processing_item_applied_correlation_rule(processing_item, sigma_correlation_rule): assert not RuleProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, sigma_correlation_rule, ) sigma_correlation_rule.add_applied_processing_item(processing_item) assert RuleProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, sigma_correlation_rule, ) -def test_rule_contains_detection_item_match(sigma_rule, dummy_processing_pipeline): - assert RuleContainsDetectionItemCondition(field="fieldA", value="value").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_contains_detection_item_match(sigma_rule): + assert RuleContainsDetectionItemCondition(field="fieldA", value="value").match(sigma_rule) -def test_rule_contains_detection_item_nomatch_field(sigma_rule, dummy_processing_pipeline): - assert not RuleContainsDetectionItemCondition(field="fieldB", value="value").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_contains_detection_item_nomatch_field(sigma_rule): + assert not RuleContainsDetectionItemCondition(field="fieldB", value="value").match(sigma_rule) -def test_rule_contains_detection_item_nomatch_value(sigma_rule, dummy_processing_pipeline): - assert not RuleContainsDetectionItemCondition(field="fieldA", value="valuex").match( - dummy_processing_pipeline, sigma_rule - ) +def 
test_rule_contains_detection_item_nomatch_value(sigma_rule): + assert not RuleContainsDetectionItemCondition(field="fieldA", value="valuex").match(sigma_rule) -def test_rule_contains_detection_item_correlation_rule( - sigma_correlation_rule, dummy_processing_pipeline -): +def test_rule_contains_detection_item_correlation_rule(sigma_correlation_rule): assert not RuleContainsDetectionItemCondition(field="fieldA", value="value").match( - dummy_processing_pipeline, sigma_correlation_rule + sigma_correlation_rule ) -def test_rule_contains_field_match(dummy_processing_pipeline, sigma_rule): - assert RuleContainsFieldCondition("fieldA").match(dummy_processing_pipeline, sigma_rule) +def test_rule_contains_field_match(sigma_rule): + assert RuleContainsFieldCondition("fieldA").match(sigma_rule) -def test_rule_contains_field_nomatch(dummy_processing_pipeline, sigma_rule): - assert not RuleContainsFieldCondition("non_existing").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_contains_field_nomatch(sigma_rule): + assert not RuleContainsFieldCondition("non_existing").match(sigma_rule) -def test_rule_contains_field_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule): - assert not RuleContainsFieldCondition("fieldA").match( - dummy_processing_pipeline, sigma_correlation_rule - ) +def test_rule_contains_field_correlation_rule(sigma_correlation_rule): + assert not RuleContainsFieldCondition("fieldA").match(sigma_correlation_rule) -def test_is_sigma_rule_with_rule(dummy_processing_pipeline, sigma_rule): - assert IsSigmaRuleCondition().match(dummy_processing_pipeline, sigma_rule) +def test_is_sigma_rule_with_rule(sigma_rule): + assert IsSigmaRuleCondition().match(sigma_rule) -def test_is_sigma_rule_with_correlation_rule(dummy_processing_pipeline, sigma_correlation_rule): - assert not IsSigmaRuleCondition().match(dummy_processing_pipeline, sigma_correlation_rule) +def test_is_sigma_rule_with_correlation_rule(sigma_correlation_rule): + assert not IsSigmaRuleCondition().match(sigma_correlation_rule) -def test_is_sigma_correlation_rule_with_correlation_rule( - dummy_processing_pipeline, sigma_correlation_rule -): - assert IsSigmaCorrelationRuleCondition().match( - dummy_processing_pipeline, sigma_correlation_rule - ) +def test_is_sigma_correlation_rule_with_correlation_rule(sigma_correlation_rule): + assert IsSigmaCorrelationRuleCondition().match(sigma_correlation_rule) -def test_is_sigma_correlation_rule_with_rule(dummy_processing_pipeline, sigma_rule): - assert not IsSigmaCorrelationRuleCondition().match(dummy_processing_pipeline, sigma_rule) +def test_is_sigma_correlation_rule_with_rule(sigma_rule): + assert not IsSigmaCorrelationRuleCondition().match(sigma_rule) -def test_rule_attribute_condition_str_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("taxonomy", "test").match(dummy_processing_pipeline, sigma_rule) +def test_rule_attribute_condition_str_match(sigma_rule): + assert RuleAttributeCondition("taxonomy", "test").match(sigma_rule) -def test_rule_attribute_condition_invalid_str_op(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_str_op(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid operation.*for string"): - RuleAttributeCondition("taxonomy", "test", "gte").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("taxonomy", "test", "gte").match(sigma_rule) -def test_rule_attribute_condition_invalid_op(dummy_processing_pipeline, sigma_rule): +def 
test_rule_attribute_condition_invalid_op(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid operation"): RuleAttributeCondition("custom", "123.4", "invalid") -def test_rule_attribute_condition_uuid_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("id", "809718e3-f7f5-46f1-931e-d036f0ffb0af").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_attribute_condition_uuid_match(sigma_rule): + assert RuleAttributeCondition("id", "809718e3-f7f5-46f1-931e-d036f0ffb0af").match(sigma_rule) -def test_rule_attribute_condition_custom_field_numeric_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("custom", "123.4", "lte").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_attribute_condition_custom_field_numeric_match(sigma_rule): + assert RuleAttributeCondition("custom", "123.4", "lte").match(sigma_rule) -def test_rule_attribute_condition_invalid_numeric_value(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_numeric_value(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid number"): - RuleAttributeCondition("custom", "something", "lte").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("custom", "something", "lte").match(sigma_rule) -def test_rule_attribute_condition_date_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("date", "2022-02-23", "lt").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_attribute_condition_date_match(sigma_rule): + assert RuleAttributeCondition("date", "2022-02-23", "lt").match(sigma_rule) -def test_rule_attribute_condition_invalid_date(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_date(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid date"): - RuleAttributeCondition("date", "2022-02-23T00:00:00", "lt").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("date", "2022-02-23T00:00:00", "lt").match(sigma_rule) -def test_rule_attribute_condition_sigmalevel_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("level", "high", "lt").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_attribute_condition_sigmalevel_match(sigma_rule): + assert RuleAttributeCondition("level", "high", "lt").match(sigma_rule) -def test_rule_attribute_condition_invalid_sigmalevel(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_sigmalevel(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid Sigma severity level"): - RuleAttributeCondition("level", "invalid", "lt").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("level", "invalid", "lt").match(sigma_rule) -def test_rule_attribute_condition_sigmastatus_match(dummy_processing_pipeline, sigma_rule): - assert RuleAttributeCondition("status", "stable", "lt").match( - dummy_processing_pipeline, sigma_rule - ) +def test_rule_attribute_condition_sigmastatus_match(sigma_rule): + assert RuleAttributeCondition("status", "stable", "lt").match(sigma_rule) -def test_rule_attribute_condition_invalid_sigmastatus(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_sigmastatus(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Invalid Sigma status"): - RuleAttributeCondition("status", "invalid", "lt").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("status", "invalid", 
"lt").match(sigma_rule) -def test_rule_attribute_condition_invalid_rule_field_type(dummy_processing_pipeline, sigma_rule): +def test_rule_attribute_condition_invalid_rule_field_type(sigma_rule): with pytest.raises(SigmaConfigurationError, match="Unsupported type"): - RuleAttributeCondition("related", "08fbc97d-0a2f-491c-ae21-8ffcfd3174e9").match( - dummy_processing_pipeline, sigma_rule - ) + RuleAttributeCondition("related", "08fbc97d-0a2f-491c-ae21-8ffcfd3174e9").match(sigma_rule) -def test_rule_tag_condition_match(dummy_processing_pipeline, sigma_rule): - assert RuleTagCondition("test.tag").match(dummy_processing_pipeline, sigma_rule) +def test_rule_tag_condition_match(sigma_rule): + assert RuleTagCondition("test.tag").match(sigma_rule) -def test_rule_tag_condition_nomatch(dummy_processing_pipeline, sigma_rule): - assert not RuleTagCondition("test.notag").match(dummy_processing_pipeline, sigma_rule) +def test_rule_tag_condition_nomatch(sigma_rule): + assert not RuleTagCondition("test.notag").match(sigma_rule) -def test_include_field_condition_match(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["field", "otherfield"]).match_field_name( - dummy_processing_pipeline, "field" - ) +def test_include_field_condition_match(): + assert IncludeFieldCondition(["field", "otherfield"]).match_field_name("field") -def test_include_field_condition_match_nofield(dummy_processing_pipeline, detection_item_nofield): - assert not IncludeFieldCondition(["field", "otherfield"]).match_field_name( - dummy_processing_pipeline, None - ) +def test_include_field_condition_match_nofield(): + assert not IncludeFieldCondition(["field", "otherfield"]).match_field_name(None) -def test_include_field_condition_nomatch(dummy_processing_pipeline, detection_item): - assert not IncludeFieldCondition(["testfield", "otherfield"]).match_field_name( - dummy_processing_pipeline, "field" - ) +def test_include_field_condition_nomatch(): + assert not IncludeFieldCondition(["testfield", "otherfield"]).match_field_name("field") -def test_include_field_condition_re_match(dummy_processing_pipeline, detection_item): - assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name( - dummy_processing_pipeline, "field" - ) +def test_include_field_condition_re_match(): + assert IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name("field") -def test_include_field_condition_re_match_nofield( - dummy_processing_pipeline, detection_item_nofield -): - assert not IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name( - dummy_processing_pipeline, None - ) +def test_include_field_condition_re_match_nofield(): + assert not IncludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name(None) -def test_include_field_condition_re_nomatch(dummy_processing_pipeline, detection_item): - assert not IncludeFieldCondition(["o[0-9]+", "x.*"], "re").match_field_name( - dummy_processing_pipeline, "field" - ) +def test_include_field_condition_re_nomatch(): + assert not IncludeFieldCondition(["o[0-9]+", "x.*"], "re").match_field_name("field") -def test_include_field_condition_wrong_type(dummy_processing_pipeline, detection_item): +def test_include_field_condition_wrong_type(): with pytest.raises(SigmaConfigurationError, match="Invalid.*type"): IncludeFieldCondition(["field", "otherfield"], "invalid") -def test_exclude_field_condition_match(dummy_processing_pipeline, detection_item): - assert ( - ExcludeFieldCondition(["field", "otherfield"]).match_field_name( - dummy_processing_pipeline, "field" - ) - == 
False - ) +def test_exclude_field_condition_match(): + assert ExcludeFieldCondition(["field", "otherfield"]).match_field_name("field") == False -def test_exclude_field_condition_nomatch(dummy_processing_pipeline, detection_item): - assert ( - ExcludeFieldCondition(["testfield", "otherfield"]).match_field_name( - dummy_processing_pipeline, "field" - ) - == True - ) +def test_exclude_field_condition_nomatch(): + assert ExcludeFieldCondition(["testfield", "otherfield"]).match_field_name("field") == True -def test_exclude_field_condition_re_match(dummy_processing_pipeline, detection_item): - assert ( - ExcludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name( - dummy_processing_pipeline, "field" - ) - == False - ) +def test_exclude_field_condition_re_match(): + assert ExcludeFieldCondition(["o[0-9]+", "f.*"], "re").match_field_name("field") == False -def test_exclude_field_condition_re_nomatch(dummy_processing_pipeline, detection_item): - assert ( - ExcludeFieldCondition(["o[0-9]+", "x.*"], "re").match_field_name( - dummy_processing_pipeline, "field" - ) - == True - ) +def test_exclude_field_condition_re_nomatch(): + assert ExcludeFieldCondition(["o[0-9]+", "x.*"], "re").match_field_name("field") == True def test_field_state_condition_match(dummy_processing_pipeline): dummy_processing_pipeline.state["field"] = "value" - assert FieldNameProcessingStateCondition("field", "value").match_field_name( - dummy_processing_pipeline, "field" - ) + condition = FieldNameProcessingStateCondition("field", "value") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match_field_name("field") + + +def test_processing_state_condition_base_no_pipeline(): + with pytest.raises(SigmaProcessingItemError, match="No processing pipeline"): + FieldNameProcessingStateCondition("field", "value").match_field_name("field") @pytest.fixture @@ -449,42 +384,31 @@ def multivalued_detection_item(): return SigmaDetectionItem("field", [], [SigmaString("value"), SigmaNumber(123)]) -def test_match_string_condition_any( - dummy_processing_pipeline, multivalued_detection_item: SigmaDetectionItem -): +def test_match_string_condition_any(multivalued_detection_item: SigmaDetectionItem): assert ( - MatchStringCondition(pattern="^val.*", cond="any").match( - dummy_processing_pipeline, multivalued_detection_item - ) - == True + MatchStringCondition(pattern="^val.*", cond="any").match(multivalued_detection_item) == True ) -def test_match_string_condition_all( - dummy_processing_pipeline, multivalued_detection_item: SigmaDetectionItem -): +def test_match_string_condition_all(multivalued_detection_item: SigmaDetectionItem): assert ( - MatchStringCondition(pattern="^val.*", cond="all").match( - dummy_processing_pipeline, multivalued_detection_item - ) + MatchStringCondition(pattern="^val.*", cond="all").match(multivalued_detection_item) == False ) -def test_match_string_condition_all_sametype(dummy_processing_pipeline): +def test_match_string_condition_all_sametype(): assert ( MatchStringCondition(pattern="^val.*", cond="all").match( - dummy_processing_pipeline, SigmaDetectionItem("field", [], [SigmaString("val1"), SigmaString("val2")]), ) == True ) -def test_match_string_condition_all_negated(dummy_processing_pipeline): +def test_match_string_condition_all_negated(): assert ( MatchStringCondition(pattern="^val.*", cond="all", negate=True).match( - dummy_processing_pipeline, SigmaDetectionItem("field", [], [SigmaString("val1"), SigmaString("val2")]), ) == False @@ -501,30 +425,30 @@ def 
test_match_string_condition_error_mode(): MatchStringCondition(pattern="*", cond="any") -def test_contains_wildcard_condition_match(dummy_processing_pipeline): +def test_contains_wildcard_condition_match(): assert ContainsWildcardCondition(cond="any").match( - dummy_processing_pipeline, SigmaDetectionItem("field", [], [SigmaString("*")]) + SigmaDetectionItem("field", [], [SigmaString("*")]) ) -def test_contains_wildcard_condition_nomatch(dummy_processing_pipeline): +def test_contains_wildcard_condition_nomatch(): assert not ContainsWildcardCondition(cond="any").match( - dummy_processing_pipeline, SigmaDetectionItem("field", [], [SigmaString("value")]) + SigmaDetectionItem("field", [], [SigmaString("value")]) ) -def test_contains_wildcard_condition_nostring(dummy_processing_pipeline): +def test_contains_wildcard_condition_nostring(): assert not ContainsWildcardCondition(cond="any").match( - dummy_processing_pipeline, SigmaDetectionItem("field", [], [SigmaNumber(123)]) + SigmaDetectionItem("field", [], [SigmaNumber(123)]) ) -def test_isnull_condition_match(dummy_processing_pipeline, detection_item_null_value): - assert IsNullCondition(cond="all").match(dummy_processing_pipeline, detection_item_null_value) +def test_isnull_condition_match(detection_item_null_value): + assert IsNullCondition(cond="all").match(detection_item_null_value) -def test_isnull_condition_nomatch(dummy_processing_pipeline, detection_item): - assert not IsNullCondition(cond="all").match(dummy_processing_pipeline, detection_item) +def test_isnull_condition_nomatch(detection_item): + assert not IsNullCondition(cond="all").match(detection_item) def test_value_processing_invalid_cond(): @@ -533,20 +457,16 @@ def test_value_processing_invalid_cond(): def test_detection_item_processing_item_applied( - dummy_processing_pipeline, processing_item, detection_item: SigmaDetectionItem + processing_item, detection_item: SigmaDetectionItem ): detection_item.add_applied_processing_item(processing_item) assert DetectionItemProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, detection_item, ) -def test_detection_item_processing_item_not_applied( - dummy_processing_pipeline, processing_item, detection_item: SigmaDetectionItem -): +def test_detection_item_processing_item_not_applied(detection_item: SigmaDetectionItem): assert not DetectionItemProcessingItemAppliedCondition(processing_item_id="test").match( - dummy_processing_pipeline, detection_item, ) @@ -559,22 +479,22 @@ def pipeline_field_tracking(): def test_field_name_processing_item_applied(pipeline_field_tracking): - assert FieldNameProcessingItemAppliedCondition( - processing_item_id="processing_item" - ).match_field_name(pipeline_field_tracking, "fieldA") + condition = FieldNameProcessingItemAppliedCondition(processing_item_id="processing_item") + condition.set_pipeline(pipeline_field_tracking) + assert condition.match_field_name("fieldA") def test_field_name_processing_item_not_applied(pipeline_field_tracking): - assert not FieldNameProcessingItemAppliedCondition( - processing_item_id="processing_item" - ).match_field_name(pipeline_field_tracking, "fieldC") + condition = FieldNameProcessingItemAppliedCondition(processing_item_id="processing_item") + condition.set_pipeline(pipeline_field_tracking) + assert not condition.match_field_name("fieldC") -def test_detection_item_state_match(dummy_processing_pipeline, detection_item): +def test_detection_item_state_match(detection_item, dummy_processing_pipeline): 
dummy_processing_pipeline.state["field"] = "value" - assert DetectionItemProcessingStateCondition("field", "value").match( - dummy_processing_pipeline, detection_item - ) + condition = DetectionItemProcessingStateCondition("field", "value") + condition.set_pipeline(dummy_processing_pipeline) + assert condition.match(detection_item) def test_condition_identifiers_completeness(): diff --git a/tests/test_processing_pipeline.py b/tests/test_processing_pipeline.py index d76d1f91..c39d891a 100644 --- a/tests/test_processing_pipeline.py +++ b/tests/test_processing_pipeline.py @@ -34,7 +34,7 @@ class RuleConditionTrue(RuleProcessingCondition): dummy: str - def match(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> bool: + def match(self, rule: SigmaRule) -> bool: return True @@ -42,7 +42,7 @@ def match(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> bool: class RuleConditionFalse(RuleProcessingCondition): dummy: str - def match(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> bool: + def match(self, rule: SigmaRule) -> bool: return False @@ -50,7 +50,7 @@ def match(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> bool: class DetectionItemConditionTrue(DetectionItemProcessingCondition): dummy: str - def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem) -> bool: + def match(self, detection_item: SigmaDetectionItem) -> bool: return True @@ -58,7 +58,7 @@ def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem class DetectionItemConditionFalse(DetectionItemProcessingCondition): dummy: str - def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem) -> bool: + def match(self, detection_item: SigmaDetectionItem) -> bool: return False @@ -66,7 +66,8 @@ def match(self, pipeline: ProcessingPipeline, detection_item: SigmaDetectionItem class TransformationPrepend(Transformation): s: str - def apply(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> SigmaRule: + def apply(self, rule: SigmaRule) -> SigmaRule: + super().apply(rule) rule.title = self.s + rule.title return rule @@ -75,8 +76,8 @@ def apply(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> SigmaRule: class TransformationAppend(Transformation): s: str - def apply(self, pipeline: ProcessingPipeline, rule: SigmaRule) -> SigmaRule: - super().apply(pipeline, rule) + def apply(self, rule: SigmaRule) -> SigmaRule: + super().apply(rule) rule.title += self.s return rule @@ -314,12 +315,12 @@ def test_processingitem_fromdict_unknown_transformation_parameter(): ) -def test_processingitem_apply(processing_item, dummy_processing_pipeline, sigma_rule): - applied = processing_item.apply(dummy_processing_pipeline, sigma_rule) +def test_processingitem_apply(processing_item, sigma_rule): + applied = processing_item.apply(sigma_rule) assert applied and sigma_rule.title == "TestTest" -def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipeline, sigma_rule): +def test_processingitem_apply_notapplied_all_with_false(sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_linking=all, @@ -328,11 +329,11 @@ def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipelin RuleConditionFalse(dummy="test-false"), ], ) - applied = processing_item.apply(dummy_processing_pipeline, sigma_rule) + applied = processing_item.apply(sigma_rule) assert not applied and sigma_rule.title == "Test" -def test_processingitem_apply_negated_true(dummy_processing_pipeline, sigma_rule): +def 
test_processingitem_apply_negated_true(sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_negation=True, @@ -340,11 +341,11 @@ def test_processingitem_apply_negated_true(dummy_processing_pipeline, sigma_rule RuleConditionTrue(dummy="test-true"), ], ) - applied = processing_item.apply(dummy_processing_pipeline, sigma_rule) + applied = processing_item.apply(sigma_rule) assert not applied and sigma_rule.title == "Test" -def test_processingitem_apply_negated_false(dummy_processing_pipeline, sigma_rule): +def test_processingitem_apply_negated_false(sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_negation=True, @@ -352,11 +353,11 @@ def test_processingitem_apply_negated_false(dummy_processing_pipeline, sigma_rul RuleConditionFalse(dummy="test-false"), ], ) - applied = processing_item.apply(dummy_processing_pipeline, sigma_rule) + applied = processing_item.apply(sigma_rule) assert applied and sigma_rule.title == "TestTest" -def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipeline, sigma_rule): +def test_processingitem_apply_notapplied_all_with_false(sigma_rule): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), rule_condition_linking=all, @@ -365,11 +366,11 @@ def test_processingitem_apply_notapplied_all_with_false(dummy_processing_pipelin RuleConditionFalse(dummy="test-false"), ], ) - applied = processing_item.apply(dummy_processing_pipeline, sigma_rule) + applied = processing_item.apply(sigma_rule) assert not applied and sigma_rule.title == "Test" -def test_processingitem_match_detection_item(dummy_processing_pipeline, detection_item): +def test_processingitem_match_detection_item(detection_item): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), detection_item_condition_linking=any, @@ -378,12 +379,10 @@ def test_processingitem_match_detection_item(dummy_processing_pipeline, detectio DetectionItemConditionFalse(dummy="test-false"), ], ) - assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == True + assert processing_item.match_detection_item(detection_item) == True -def test_processingitem_match_detection_item_all_with_false( - dummy_processing_pipeline, detection_item -): +def test_processingitem_match_detection_item_all_with_false(detection_item): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), detection_item_condition_linking=all, @@ -392,12 +391,10 @@ def test_processingitem_match_detection_item_all_with_false( DetectionItemConditionFalse(dummy="test-false"), ], ) - assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False + assert processing_item.match_detection_item(detection_item) == False -def test_processingitem_match_detection_item_any_without_true( - dummy_processing_pipeline, detection_item -): +def test_processingitem_match_detection_item_any_without_true(detection_item): processing_item = ProcessingItem( transformation=TransformationAppend(s="Test"), detection_item_condition_linking=any, @@ -406,12 +403,10 @@ def test_processingitem_match_detection_item_any_without_true( DetectionItemConditionFalse(dummy="test-false"), ], ) - assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False + assert processing_item.match_detection_item(detection_item) == False -def test_processingitem_match_detection_item_negated_true( - 
dummy_processing_pipeline, detection_item
-):
+def test_processingitem_match_detection_item_negated_true(detection_item):
     processing_item = ProcessingItem(
         transformation=TransformationAppend(s="Test"),
         detection_item_condition_negation=True,
@@ -419,12 +414,10 @@ def test_processingitem_match_detection_item_negated_true(
             DetectionItemConditionTrue(dummy="test-true"),
         ],
     )
-    assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item) == False
+    assert processing_item.match_detection_item(detection_item) == False
 
 
-def test_processingitem_match_detection_item_negated_false(
-    dummy_processing_pipeline, detection_item
-):
+def test_processingitem_match_detection_item_negated_false(detection_item):
     processing_item = ProcessingItem(
         transformation=TransformationAppend(s="Test"),
         detection_item_condition_negation=True,
@@ -432,7 +425,7 @@ def test_processingitem_match_detection_item_negated_false(
             DetectionItemConditionFalse(dummy="test-false"),
         ],
     )
-    assert processing_item.match_detection_item(dummy_processing_pipeline, detection_item)
+    assert processing_item.match_detection_item(detection_item)
 
 
 def test_processingitem_rule_condition_nolist():
@@ -487,19 +480,15 @@ def test_postprocessingitem_fromdict(postprocessing_item_dict, postprocessing_it
     assert QueryPostprocessingItem.from_dict(postprocessing_item_dict) == postprocessing_item
 
 
-def test_postprocessingitem_apply(
-    postprocessing_item: QueryPostprocessingItem, dummy_processing_pipeline, sigma_rule
-):
-    postprocessing_item.apply(
-        dummy_processing_pipeline, sigma_rule, "field=value"
-    ) == "[ field=value ]"
+def test_postprocessingitem_apply(postprocessing_item: QueryPostprocessingItem, sigma_rule):
+    assert postprocessing_item.apply(sigma_rule, "field=value") == ("[ field=value ]", True)
 
 
 def test_postprocessingitem_apply_false_condition(
-    postprocessing_item: QueryPostprocessingItem, dummy_processing_pipeline, sigma_rule, monkeypatch
+    postprocessing_item: QueryPostprocessingItem, sigma_rule, monkeypatch
 ):
     monkeypatch.setattr(postprocessing_item, "rule_conditions", [RuleConditionFalse(dummy="test")])
-    assert postprocessing_item.apply(dummy_processing_pipeline, sigma_rule, "field=value") == (
+    assert postprocessing_item.apply(sigma_rule, "field=value") == (
         "field=value",
         False,
     )
diff --git a/tests/test_processing_transformations.py b/tests/test_processing_transformations.py
index 347820ff..0f342ddf 100644
--- a/tests/test_processing_transformations.py
+++ b/tests/test_processing_transformations.py
@@ -216,6 +216,13 @@ def sigma_rule_placeholders_only():
     )
 
 
+def test_transformation_multiple_pipelines_set(dummy_pipeline):
+    transformation = DropDetectionItemTransformation()
+    transformation.set_pipeline(dummy_pipeline)
+    with pytest.raises(SigmaTransformationError, match="Pipeline.*was already set"):
+        transformation.set_pipeline(dummy_pipeline)
+
+
 def test_field_mapping_from_dict():
     mapping = {
         "single": "single_mapping",
@@ -250,7 +257,8 @@ def field_mapping_transformation_sigma_rule(
             identifier="test",
        )
     )
-    field_mapping_transformation.apply(dummy_pipeline, sigma_rule)
+    field_mapping_transformation.set_pipeline(dummy_pipeline)
+    field_mapping_transformation.apply(sigma_rule)
     return (field_mapping_transformation, sigma_rule)
 
 
@@ -286,7 +294,8 @@ def test_field_mapping(field_mapping_transformation_sigma_rule):
 def test_field_mapping_correlation_rule(
     dummy_pipeline, sigma_correlation_rule, field_mapping_transformation
 ):
-    field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule)
+    
field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["testalias", "field2", "fieldC", "fieldD"] assert sigma_correlation_rule.aliases.aliases["testalias"] == SigmaCorrelationFieldAlias( alias="testalias", @@ -302,7 +311,8 @@ def test_field_mapping_correlation_rule_no_condition_fieldref( monkeypatch, dummy_pipeline, sigma_correlation_rule, field_mapping_transformation ): monkeypatch.setattr(sigma_correlation_rule.condition, "fieldref", None) - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["testalias", "field2", "fieldC", "fieldD"] assert sigma_correlation_rule.aliases.aliases["testalias"] == SigmaCorrelationFieldAlias( alias="testalias", @@ -318,7 +328,8 @@ def test_field_mapping_correlation_rule_no_condition( monkeypatch, dummy_pipeline, sigma_correlation_rule, field_mapping_transformation ): monkeypatch.setattr(sigma_correlation_rule, "condition", None) - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["testalias", "field2", "fieldC", "fieldD"] assert sigma_correlation_rule.aliases.aliases["testalias"] == SigmaCorrelationFieldAlias( alias="testalias", @@ -335,7 +346,8 @@ def test_field_mapping_correlation_rule_no_groupby( ): monkeypatch.setattr(sigma_correlation_rule, "group_by", None) monkeypatch.setattr(sigma_correlation_rule, "aliases", SigmaCorrelationFieldAliases()) - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by is None assert sigma_correlation_rule.aliases == SigmaCorrelationFieldAliases() assert sigma_correlation_rule.condition.fieldref == "fieldA" @@ -345,7 +357,8 @@ def test_field_mapping_correlation_rule_no_alias( monkeypatch, dummy_pipeline, sigma_correlation_rule, field_mapping_transformation ): monkeypatch.setattr(sigma_correlation_rule, "aliases", SigmaCorrelationFieldAliases()) - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["something_different", "field2", "fieldC", "fieldD"] assert sigma_correlation_rule.aliases == SigmaCorrelationFieldAliases() assert sigma_correlation_rule.condition.fieldref == "fieldA" @@ -360,7 +373,8 @@ def test_field_mapping_correlation_rule_multiple_alias_mappings( "field3", ) with pytest.raises(SigmaConfigurationError, match="rule alias mapping.*multiple field names"): - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) def test_field_mapping_correlation_rule_multiple_condition_mappings( @@ -368,7 +382,8 @@ def test_field_mapping_correlation_rule_multiple_condition_mappings( ): monkeypatch.setattr(sigma_correlation_rule.condition, "fieldref", "field3") with pytest.raises(SigmaConfigurationError, match="rule condition field.*multiple field 
names"): - field_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_mapping_transformation.set_pipeline(dummy_pipeline) + field_mapping_transformation.apply(sigma_correlation_rule) def test_field_mapping_tracking(field_mapping_transformation_sigma_rule): @@ -441,7 +456,8 @@ def test_field_prefix_mapping(dummy_pipeline, field_prefix_mapping_transformatio ], } ) - field_prefix_mapping_transformation.apply(dummy_pipeline, sigma_rule) + field_prefix_mapping_transformation.set_pipeline(dummy_pipeline) + field_prefix_mapping_transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -509,7 +525,8 @@ def test_field_prefix_mapping_correlation_rule( }, } ) - field_prefix_mapping_transformation.apply(dummy_pipeline, sigma_correlation_rule) + field_prefix_mapping_transformation.set_pipeline(dummy_pipeline) + field_prefix_mapping_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == [ "testalias", "mapped1.field", @@ -533,7 +550,8 @@ def test_drop_detection_item_transformation(sigma_rule: SigmaRule, dummy_pipelin transformation, field_name_conditions=[IncludeFieldCondition(fields=["field2"])], ) - processing_item.apply(dummy_pipeline, sigma_rule) + processing_item.set_pipeline(dummy_pipeline) + processing_item.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -580,7 +598,8 @@ def test_drop_detection_item_transformation(sigma_rule: SigmaRule, dummy_pipelin transformation, field_name_conditions=[IncludeFieldCondition(fields=["field2"])], ) - processing_item.apply(dummy_pipeline, sigma_rule) + processing_item.set_pipeline(dummy_pipeline) + processing_item.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -598,7 +617,8 @@ def test_drop_detection_item_transformation_correlation_rule( ): transformation = DropDetectionItemTransformation() orig_correlation_rule = deepcopy(sigma_correlation_rule) - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule @@ -608,7 +628,8 @@ def test_drop_detection_item_transformation_all(sigma_rule: SigmaRule, dummy_pip transformation, field_name_conditions=[IncludeFieldCondition(fields=["field1", "field2", "field3"])], ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"].detection_items[0].detection_items == [] @@ -622,7 +643,8 @@ def add_fieldname_suffix_transformation(): def test_add_fieldname_suffix(dummy_pipeline, sigma_rule, add_fieldname_suffix_transformation): - add_fieldname_suffix_transformation.apply(dummy_pipeline, sigma_rule) + add_fieldname_suffix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_suffix_transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -646,7 +668,8 @@ def test_add_fieldname_suffix(dummy_pipeline, sigma_rule, add_fieldname_suffix_t def test_add_fieldname_suffix_keyword( dummy_pipeline, keyword_sigma_rule, add_fieldname_suffix_transformation ): - add_fieldname_suffix_transformation.apply(dummy_pipeline, keyword_sigma_rule) + add_fieldname_suffix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_suffix_transformation.apply(keyword_sigma_rule) assert 
keyword_sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetectionItem( @@ -670,7 +693,8 @@ def test_add_fieldname_suffix_tracking( field_name_conditions=[IncludeFieldCondition("field1")], identifier="test", ) - processing_item.apply(dummy_pipeline, sigma_rule) + processing_item.set_pipeline(dummy_pipeline) + processing_item.apply(sigma_rule) detection_items = sigma_rule.detection.detections["test"].detection_items[0].detection_items assert detection_items == [ SigmaDetectionItem("field1.test", [], [SigmaString("value1")]), @@ -689,7 +713,8 @@ def test_add_fieldname_suffix_tracking( def test_add_fieldname_suffix_transformation_correlation_rule( sigma_correlation_rule, dummy_pipeline, add_fieldname_suffix_transformation ): - add_fieldname_suffix_transformation.apply(dummy_pipeline, sigma_correlation_rule) + add_fieldname_suffix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_suffix_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["testalias", "field2.test", "field3.test"] assert sigma_correlation_rule.aliases.aliases["testalias"] == SigmaCorrelationFieldAlias( alias="testalias", @@ -711,7 +736,8 @@ def add_fieldname_prefix_transformation(): def test_add_fieldname_prefix(dummy_pipeline, sigma_rule, add_fieldname_prefix_transformation): - add_fieldname_prefix_transformation.apply(dummy_pipeline, sigma_rule) + add_fieldname_prefix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_prefix_transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -735,7 +761,8 @@ def test_add_fieldname_prefix(dummy_pipeline, sigma_rule, add_fieldname_prefix_t def test_add_fieldname_prefix_keyword( dummy_pipeline, keyword_sigma_rule, add_fieldname_prefix_transformation ): - add_fieldname_prefix_transformation.apply(dummy_pipeline, keyword_sigma_rule) + add_fieldname_prefix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_prefix_transformation.apply(keyword_sigma_rule) assert keyword_sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetectionItem( @@ -759,7 +786,8 @@ def test_add_fieldname_prefix_tracking( field_name_conditions=[IncludeFieldCondition("field1")], identifier="test", ) - processing_item.apply(dummy_pipeline, sigma_rule) + processing_item.set_pipeline(dummy_pipeline) + processing_item.apply(sigma_rule) detection_items = sigma_rule.detection.detections["test"].detection_items[0].detection_items assert detection_items == [ SigmaDetectionItem("test.field1", [], [SigmaString("value1")]), @@ -778,7 +806,8 @@ def test_add_fieldname_prefix_tracking( def test_add_fieldname_prefix_correlation_rule( sigma_correlation_rule, dummy_pipeline, add_fieldname_prefix_transformation ): - add_fieldname_prefix_transformation.apply(dummy_pipeline, sigma_correlation_rule) + add_fieldname_prefix_transformation.set_pipeline(dummy_pipeline) + add_fieldname_prefix_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule.group_by == ["testalias", "test.field2", "test.field3"] assert sigma_correlation_rule.aliases.aliases["testalias"] == SigmaCorrelationFieldAlias( alias="testalias", @@ -825,7 +854,8 @@ def test_fields_list_mapping_with_detection_item_condition(sigma_rule: SigmaRule def test_wildcard_placeholders(dummy_pipeline, sigma_rule_placeholders: SigmaRule): transformation = WildcardPlaceholderTransformation() - transformation.apply(dummy_pipeline, sigma_rule_placeholders) + transformation.set_pipeline(dummy_pipeline) + 
transformation.apply(sigma_rule_placeholders) assert sigma_rule_placeholders.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -850,7 +880,8 @@ def test_wildcard_placeholders(dummy_pipeline, sigma_rule_placeholders: SigmaRul def test_wildcard_placeholders_correlation_rule(sigma_correlation_rule, dummy_pipeline): orig_correlation_rule = deepcopy(sigma_correlation_rule) transformation = WildcardPlaceholderTransformation() - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule @@ -867,7 +898,8 @@ def test_wildcard_placeholders_included(dummy_pipeline, sigma_rule_placeholders: identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule_placeholders) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule_placeholders) detection_items = ( sigma_rule_placeholders.detection.detections["test"].detection_items[0].detection_items ) @@ -900,7 +932,8 @@ def test_wildcard_placeholders_excluded(dummy_pipeline, sigma_rule_placeholders: identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule_placeholders) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule_placeholders) detection_items = ( sigma_rule_placeholders.detection.detections["test"].detection_items[0].detection_items ) @@ -927,7 +960,8 @@ def test_wildcard_placeholders_excluded(dummy_pipeline, sigma_rule_placeholders: def test_wildcard_placeholders_without_placeholders(dummy_pipeline, sigma_rule: SigmaRule): transformation = WildcardPlaceholderTransformation() - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -944,7 +978,8 @@ def test_wildcard_placeholders_without_placeholders(dummy_pipeline, sigma_rule: def test_valuelist_placeholders(sigma_rule_placeholders_simple: SigmaRule): transformation = ValueListPlaceholderTransformation() pipeline = ProcessingPipeline(vars={"var1": ["val1", 123], "var2": "val3*"}) - transformation.apply(pipeline, sigma_rule_placeholders_simple) + transformation.set_pipeline(pipeline) + transformation.apply(sigma_rule_placeholders_simple) assert sigma_rule_placeholders_simple.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -966,22 +1001,25 @@ def test_valuelist_placeholders(sigma_rule_placeholders_simple: SigmaRule): def test_valuelist_placeholders_correlation_rule(sigma_correlation_rule, dummy_pipeline): orig_correlation_rule = deepcopy(sigma_correlation_rule) transformation = ValueListPlaceholderTransformation() - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule def test_valuelist_placeholders_missing(sigma_rule_placeholders_simple: SigmaRule): transformation = ValueListPlaceholderTransformation() pipeline = ProcessingPipeline([], vars={"var1": "val1"}) + transformation.set_pipeline(pipeline) with pytest.raises(SigmaValueError, match="doesn't exist"): - transformation.apply(pipeline, sigma_rule_placeholders_simple) + transformation.apply(sigma_rule_placeholders_simple) def test_valuelist_placeholders_wrong_type(sigma_rule_placeholders_simple: SigmaRule): transformation = ValueListPlaceholderTransformation() pipeline = 
ProcessingPipeline(vars={"var1": None}) + transformation.set_pipeline(pipeline) with pytest.raises(SigmaValueError, match="not a string or number"): - transformation.apply(pipeline, sigma_rule_placeholders_simple) + transformation.apply(sigma_rule_placeholders_simple) def test_queryexpr_placeholders(dummy_pipeline, sigma_rule_placeholders_only: SigmaRule): @@ -989,7 +1027,8 @@ def test_queryexpr_placeholders(dummy_pipeline, sigma_rule_placeholders_only: Si transformation = QueryExpressionPlaceholderTransformation( expression=expr, mapping={"var2": "placeholder2"} ) - transformation.apply(dummy_pipeline, sigma_rule_placeholders_only) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule_placeholders_only) assert sigma_rule_placeholders_only.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1023,7 +1062,8 @@ def test_queryexpr_placeholders_correlation_rule(sigma_correlation_rule, dummy_p transformation = QueryExpressionPlaceholderTransformation( expression="{field} lookup {id}", mapping={"var2": "placeholder2"} ) - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule @@ -1031,7 +1071,8 @@ def test_queryexpr_placeholders_without_placeholders(dummy_pipeline, sigma_rule: transformation = QueryExpressionPlaceholderTransformation( expression="{field} lookup {id}", ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1050,7 +1091,8 @@ def test_queryexpr_placeholders_mixed_string(dummy_pipeline, sigma_rule_placehol expression="{field} lookup {id}", ) with pytest.raises(SigmaValueError, match="only allows placeholder-only strings"): - transformation.apply(dummy_pipeline, sigma_rule_placeholders) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule_placeholders) ### ConditionTransformation ### @@ -1073,7 +1115,8 @@ def test_conditiontransformation_tracking_change(dummy_pipeline, sigma_rule: Sig identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.parsed_condition[0].was_processed_by( "test" ) and sigma_rule.was_processed_by("test") @@ -1087,7 +1130,8 @@ def test_conditiontransformation_tracking_nochange(dummy_pipeline, sigma_rule: S identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert not sigma_rule.detection.parsed_condition[0].was_processed_by( "test" ) and sigma_rule.was_processed_by("test") @@ -1110,7 +1154,8 @@ def test_addconditiontransformation(dummy_pipeline, sigma_rule: SigmaRule): identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert ( sigma_rule.detection.parsed_condition[0].condition == "additional and (test)" # condition expression was added @@ -1139,7 +1184,8 @@ def test_addconditiontransformation_correlation_rule(sigma_correlation_rule, dum }, "additional", ) - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == 
orig_correlation_rule @@ -1159,7 +1205,8 @@ def test_addconditiontransformation_template(dummy_pipeline, sigma_rule: SigmaRu identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert ( sigma_rule.detection.parsed_condition[0].condition == "additional and (test)" # condition expression was added @@ -1202,7 +1249,8 @@ def test_addconditiontransformation_negated(dummy_pipeline, sigma_rule: SigmaRul identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert ( sigma_rule.detection.parsed_condition[0].condition == "not additional and (test)" # negated condition expression was added @@ -1215,7 +1263,8 @@ def test_changelogsource(dummy_pipeline, sigma_rule: SigmaRule): ChangeLogsourceTransformation("test_category", "test_product", "test_service"), identifier="test", ) - processing_item.apply(dummy_pipeline, sigma_rule) + processing_item.set_pipeline(dummy_pipeline) + processing_item.apply(sigma_rule) assert sigma_rule.logsource == SigmaLogSource( "test_category", "test_product", "test_service" @@ -1225,13 +1274,15 @@ def test_changelogsource(dummy_pipeline, sigma_rule: SigmaRule): def test_changelogsource_correlation_rule(sigma_correlation_rule, dummy_pipeline): orig_correlation_rule = deepcopy(sigma_correlation_rule) transformation = ChangeLogsourceTransformation("test_category", "test_product", "test_service") - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule def test_add_fields_transformation_single(dummy_pipeline, sigma_rule): transformation = AddFieldTransformation("added_field") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field1", @@ -1244,7 +1295,8 @@ def test_add_fields_transformation_single(dummy_pipeline, sigma_rule): def test_add_fields_transformation_multiple(dummy_pipeline, sigma_rule): transformation = AddFieldTransformation(["added_field1", "added_field2"]) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field1", @@ -1258,7 +1310,8 @@ def test_add_fields_transformation_multiple(dummy_pipeline, sigma_rule): def test_remove_fields_transformation_single(dummy_pipeline, sigma_rule): transformation = RemoveFieldTransformation("field1") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field2", @@ -1269,7 +1322,8 @@ def test_remove_fields_transformation_single(dummy_pipeline, sigma_rule): def test_remove_fields_transformation_multiple(dummy_pipeline, sigma_rule): transformation = RemoveFieldTransformation(["field1", "field3"]) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field2", @@ -1279,7 +1333,8 @@ def test_remove_fields_transformation_multiple(dummy_pipeline, sigma_rule): def test_remove_fields_transformation_single_nonexistent(dummy_pipeline, sigma_rule): transformation = 
RemoveFieldTransformation("nonexistent_field") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field1", @@ -1293,7 +1348,8 @@ def test_remove_fields_transformation_multiple_nonexistent(dummy_pipeline, sigma transformation = RemoveFieldTransformation( ["nonexistent_field1", "field1", "nonexistent_field2"] ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == [ "otherfield1", "field2", @@ -1304,13 +1360,15 @@ def test_remove_fields_transformation_multiple_nonexistent(dummy_pipeline, sigma def test_set_fields_transformation(dummy_pipeline, sigma_rule): transformation = SetFieldTransformation(["field1", "field2", "field3"]) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.fields == ["field1", "field2", "field3"] def test_replace_string_simple(dummy_pipeline, sigma_rule: SigmaRule): transformation = ReplaceStringTransformation("value", "test") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1341,7 +1399,8 @@ def test_replace_string_specials(dummy_pipeline): } ) transformation = ReplaceStringTransformation("^.*\\\\", "/") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1371,7 +1430,8 @@ def test_replace_string_placeholder(dummy_pipeline): assert s_before == SigmaString("foo%var%bar").insert_placeholders() transformation = ReplaceStringTransformation("bar", "test") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) s = sigma_rule.detection.detections["test"].detection_items[0].value[0] assert s == SigmaString("foo%var%test").insert_placeholders() @@ -1393,7 +1453,8 @@ def test_replace_string_no_placeholder(dummy_pipeline): assert s_before == SigmaString("foo%var%bar") transformation = ReplaceStringTransformation("bar", "test") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) s = sigma_rule.detection.detections["test"].detection_items[0].value[0] assert s == SigmaString("foo%var%test") @@ -1415,7 +1476,8 @@ def test_replace_string_skip_specials(dummy_pipeline): } ) transformation = ReplaceStringTransformation("^.*\\\\", "/?/", True) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1445,7 +1507,8 @@ def test_replace_string_skip_specials_with_interpret_specials(dummy_pipeline): } ) transformation = ReplaceStringTransformation("^.*\\\\", "/?/", True, True) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1476,7 +1539,8 @@ def test_replace_string_backslashes(dummy_pipeline): } ) transformation = ReplaceStringTransformation("value", 
"test") - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1498,7 +1562,8 @@ def test_replace_string_invalid(): def test_replace_string_correlation_rule(sigma_correlation_rule, dummy_pipeline): orig_correlation_rule = deepcopy(sigma_correlation_rule) transformation = ReplaceStringTransformation("value", "test") - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule @@ -1513,7 +1578,8 @@ def map_string_transformation(): def test_map_string_transformation(dummy_pipeline, sigma_rule, map_string_transformation): - map_string_transformation.apply(dummy_pipeline, sigma_rule) + map_string_transformation.set_pipeline(dummy_pipeline) + map_string_transformation.apply(sigma_rule) assert sigma_rule.detection.detections["test"] == SigmaDetection( [ SigmaDetection( @@ -1533,7 +1599,8 @@ def test_map_string_transformation_correlation_rule( dummy_pipeline, sigma_correlation_rule, map_string_transformation ): orig_correlation_rule = deepcopy(sigma_correlation_rule) - map_string_transformation.apply(dummy_pipeline, sigma_correlation_rule) + map_string_transformation.set_pipeline(dummy_pipeline) + map_string_transformation.apply(sigma_correlation_rule) assert sigma_correlation_rule == orig_correlation_rule @@ -1672,7 +1739,8 @@ def test_set_state(dummy_pipeline, sigma_rule: SigmaRule): identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert dummy_pipeline.state == {"testkey": "testvalue"} assert sigma_rule.was_processed_by("test") @@ -1685,7 +1753,8 @@ def test_set_state_correlation_rule(sigma_correlation_rule, dummy_pipeline): identifier="test", ) ) - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert dummy_pipeline.state == {"testkey": "testvalue"} assert sigma_correlation_rule.was_processed_by("test") @@ -1693,26 +1762,30 @@ def test_set_state_correlation_rule(sigma_correlation_rule, dummy_pipeline): def test_rule_failure_transformation(dummy_pipeline, sigma_rule): transformation = RuleFailureTransformation("Test") with pytest.raises(SigmaTransformationError, match="^Test$"): - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) def test_rule_failure_transformation_correlation_rule(dummy_pipeline, sigma_correlation_rule): transformation = RuleFailureTransformation("Test") with pytest.raises(SigmaTransformationError, match="^Test$"): - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) def test_detection_item_failure_transformation(dummy_pipeline, sigma_rule): transformation = DetectionItemFailureTransformation("Test") with pytest.raises(SigmaTransformationError, match="^Test$"): - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) def test_set_custom_attribute(dummy_pipeline, sigma_rule): transformation = SetCustomAttributeTransformation("custom_key", "custom_value") 
transformation.set_processing_item(ProcessingItem(transformation, identifier="test")) - transformation.apply(dummy_pipeline, sigma_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_rule) assert "custom_key" in sigma_rule.custom_attributes assert sigma_rule.custom_attributes["custom_key"] == "custom_value" assert sigma_rule.was_processed_by("test") @@ -1722,7 +1795,8 @@ def test_set_custom_attribute_correlation_rule(dummy_pipeline, sigma_correlation transformation = SetCustomAttributeTransformation("custom_key", "custom_value") transformation.set_processing_item(ProcessingItem(transformation, identifier="test")) - transformation.apply(dummy_pipeline, sigma_correlation_rule) + transformation.set_pipeline(dummy_pipeline) + transformation.apply(sigma_correlation_rule) assert "custom_key" in sigma_correlation_rule.custom_attributes assert sigma_correlation_rule.custom_attributes["custom_key"] == "custom_value" assert sigma_correlation_rule.was_processed_by("test") @@ -1770,7 +1844,8 @@ def test_nested_pipeline_transformation_from_dict(nested_pipeline_transformation def test_nested_pipeline_transformation_from_dict_apply( dummy_pipeline, sigma_rule, nested_pipeline_transformation ): - nested_pipeline_transformation.apply(dummy_pipeline, sigma_rule) + nested_pipeline_transformation.set_pipeline(dummy_pipeline) + nested_pipeline_transformation.apply(sigma_rule) assert sigma_rule.title == "TestTest" assert sigma_rule.was_processed_by("test")
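# --- Illustrative usage sketch (not part of the patch) ----------------------
# The test changes above all follow the same pattern introduced by this
# change: a transformation or condition is bound to its pipeline exactly once
# via set_pipeline(), and apply()/match() are then called without a pipeline
# argument. The minimal sketch below shows that pattern outside the test
# fixtures; the inline rule YAML and the fieldA -> mapped_field mapping are
# made-up illustrative values, not taken from the test suite.
from sigma.processing.pipeline import ProcessingPipeline
from sigma.processing.transformations import FieldMappingTransformation
from sigma.rule import SigmaRule

rule = SigmaRule.from_yaml(
    """
title: Example rule
logsource:
    category: test_category
detection:
    sel:
        fieldA: value
    condition: sel
"""
)

transformation = FieldMappingTransformation({"fieldA": "mapped_field"})
pipeline = ProcessingPipeline()        # carries state, vars and field mapping tracking
transformation.set_pipeline(pipeline)  # bind exactly once; a second call raises "was already set"
transformation.apply(rule)             # no pipeline argument anymore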