Skip to content

Commit

Permalink
Check invalid pipeline keys
Browse files Browse the repository at this point in the history
  • Loading branch information
frack113 committed Nov 15, 2024
1 parent 3f88199 commit 44e45af
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
18 changes: 18 additions & 0 deletions sigma/processing/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,24 @@ def _clear_pipeline(self):
@classmethod
def from_dict(cls, d: dict) -> "ProcessingPipeline":
"""Instantiate processing pipeline from a parsed processing item description."""

custom_keys = [
k
for k in d.keys()
if k
not in (
"vars",
"transformations",
"postprocessing",
"finalizers",
"priority",
"name",
"allowed_backends",
)
]
if custom_keys:
raise SigmaConfigurationError(f"Unkown keys {custom_keys}")

vars = d.get("vars", dict()) # default: no variables

items = d.get("transformations", list()) # default: no transformation
Expand Down
30 changes: 30 additions & 0 deletions tests/test_processing_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
ProcessingPipeline,
ProcessingItem,
QueryPostprocessingItem,
SigmaPipelineParsingError,
)
from sigma.processing.transformations import transformations
from sigma.processing.conditions import (
Expand Down Expand Up @@ -605,6 +606,35 @@ def test_processingpipeline_fromyaml(
)


def test_processingpipeline_fromyaml_invalid(
processing_item_dict, processing_item, postprocessing_item, processing_pipeline_vars
):
with pytest.raises(
SigmaPipelineParsingError, match="Error in parsing of a Sigma processing pipeline"
):
ProcessingPipeline.from_yaml(
"""
{not a yaml
"""
)


def test_processingpipeline_fromyaml_unknown(
processing_item_dict, processing_item, postprocessing_item, processing_pipeline_vars
):
with pytest.raises(SigmaConfigurationError, match="Unkown keys \['transformation'\]"):
ProcessingPipeline.from_yaml(
"""
name: unknown
priority: 10
transformation:
- id: test
type: test
method: test
"""
)


def test_processingpipeline_fromdict_error(processing_item_dict_with_error):
with pytest.raises(SigmaConfigurationError, match="Error in processing rule 1:.*2"):
ProcessingPipeline.from_dict(
Expand Down

0 comments on commit 44e45af

Please sign in to comment.