diff --git a/splunk_connect_for_snmp/inventory/tasks.py b/splunk_connect_for_snmp/inventory/tasks.py index 101690a30..2d3b45dea 100644 --- a/splunk_connect_for_snmp/inventory/tasks.py +++ b/splunk_connect_for_snmp/inventory/tasks.py @@ -304,87 +304,88 @@ def create_profile(profile_name, frequency, varbinds, records): return profile -def create_query(conditions: typing.List[dict], address: str) -> dict: - conditional_profiles_mapping = { - "equals": "$eq", - "gt": "$gt", - "lt": "$lt", - "in": "$in", - "regex": "$regex", - } +def convert_to_float(value: typing.Any, ignore_error: bool = False) -> typing.Any: + try: + return float(value) + except ValueError: + if ignore_error: + return value + raise BadlyFormattedFieldError(f"Value '{value}' should be numeric") - negative_profiles_mapping = { - "equals": "$ne", - "gt": "$lte", - "lt": "$gte", - "in": "$nin", - "regex": "$regex", + +def create_query(conditions: typing.List[dict], address: str) -> dict: + # Define mappings for conditional and negative profiles + profile_mappings = { + "positive": { + "equals": "$eq", + "gt": "$gt", + "lt": "$lt", + "in": "$in", + "regex": "$regex", + }, + "negative": { + "equals": "$ne", + "gt": "$lte", + "lt": "$gte", + "in": "$nin", + "regex": "$regex", + }, } + # Helper functions def _parse_mib_component(field: str) -> str: - mib_component = field.split("|") - if len(mib_component) < 2: + components = field.split("|") + if len(components) < 2: raise BadlyFormattedFieldError(f"Field {field} is badly formatted") - return mib_component[0] - - def _convert_to_float(value: typing.Any, ignore_error=False) -> typing.Any: - try: - return float(value) - except ValueError: - if ignore_error: - return value - else: - raise BadlyFormattedFieldError(f"Value '{value}' should be numeric") + return components[0] def _prepare_regex(value: str) -> typing.Union[list, str]: pattern = value.strip("/").split("/") - if len(pattern) > 1: - return pattern - else: - return pattern[0] + return pattern if len(pattern) > 1 else pattern[0] - def _get_value_for_operation(operation: str, value: str) -> typing.Any: - if operation in ["lt", "gt"]: - return _convert_to_float(value) - elif operation == "in": - return [_convert_to_float(v, True) for v in value] - elif operation == "regex": - return _prepare_regex(value) - return value + def _get_value_for_operation(operation: str, value: typing.Any) -> typing.Any: + operation_handlers = { + "lt": lambda v: convert_to_float(v), + "gt": lambda v: convert_to_float(v), + "in": lambda v: [convert_to_float(item, True) for item in v], + "regex": lambda v: _prepare_regex(v), + } + return operation_handlers.get(operation, lambda v: v)(value) def _prepare_query_input( - operation: str, value: typing.Any, field: str, negate_operation: bool + operation: str, value: typing.Any, field: str, negate: bool, mongo_op: str ) -> dict: - if operation == "regex" and isinstance(value, list): - query = {mongo_operation: value[0], "$options": value[1]} - else: - query = {mongo_operation: value} - if operation == "regex" and negate_operation: + query = ( + {mongo_op: value} + if not (operation == "regex" and isinstance(value, list)) + else {mongo_op: value[0], "$options": value[1]} + ) + if operation == "regex" and negate: query = {"$not": query} return {f"fields.{field}.value": query} + # Main processing loop filters = [] - field = "" for condition in conditions: - field = condition["field"] - # fields in databases are written in convention "IF-MIB|ifInOctets" - field = field.replace(".", "|") + field = condition["field"].replace(".", "|") # Standardize field format value = condition["value"] - negate_operation = human_bool( - condition.get("negate_operation", False), default=False - ) + negate = human_bool(condition.get("negate_operation", False), default=False) operation = condition["operation"].lower() - value_for_querying = _get_value_for_operation(operation, value) - mongo_operation = ( - negative_profiles_mapping.get(operation) - if negate_operation - else conditional_profiles_mapping.get(operation) + + # Determine MongoDB operator and prepare query + mongo_op = profile_mappings["negative" if negate else "positive"].get( + operation, "" ) + value_for_query = _get_value_for_operation(operation, value) query = _prepare_query_input( - operation, value_for_querying, field, negate_operation + operation, value_for_query, field, negate, mongo_op ) filters.append(query) + + # Parse MIB component for address matching mib_component = _parse_mib_component(field) + + # Construct final query return { "$and": [ {"address": address}, diff --git a/test/inventory/test_conditional_profiles.py b/test/inventory/test_conditional_profiles.py index 5754f6462..f28515a29 100644 --- a/test/inventory/test_conditional_profiles.py +++ b/test/inventory/test_conditional_profiles.py @@ -396,3 +396,28 @@ def test_create_profile_not_enough_varbinds(self, return_all_profiles): }, result, ) + + def test_convert_to_float(self, return_all_profiles): + from splunk_connect_for_snmp.inventory.tasks import convert_to_float + + val = 1 + result = convert_to_float(val) + self.assertIsInstance(result, float) + + def test_convert_to_float_ignore(self, return_all_profiles): + from splunk_connect_for_snmp.inventory.tasks import convert_to_float + + val = "up" + result = convert_to_float(val, True) + self.assertEqual(result, val) + + def test_convert_to_float_error(self, return_all_profiles): + from splunk_connect_for_snmp.inventory.tasks import ( + BadlyFormattedFieldError, + convert_to_float, + ) + + val = "up" + with self.assertRaises(BadlyFormattedFieldError) as context: + convert_to_float(val) + self.assertEqual("Value 'up' should be numeric", context.exception.args[0])