diff --git a/splunk_connect_for_snmp_poller/manager/hec_sender.py b/splunk_connect_for_snmp_poller/manager/hec_sender.py index f18d721..87c1f1d 100644 --- a/splunk_connect_for_snmp_poller/manager/hec_sender.py +++ b/splunk_connect_for_snmp_poller/manager/hec_sender.py @@ -93,8 +93,8 @@ def post_data_to_splunk_hec( host, variables_binds, index, - one_time_flag, - mib_enricher, + one_time_flag=one_time_flag, + mib_enricher=mib_enricher, ) hec_sender.send_hec_request(is_metric, data) diff --git a/splunk_connect_for_snmp_poller/manager/poller_utilities.py b/splunk_connect_for_snmp_poller/manager/poller_utilities.py index 98fe377..3a0a879 100644 --- a/splunk_connect_for_snmp_poller/manager/poller_utilities.py +++ b/splunk_connect_for_snmp_poller/manager/poller_utilities.py @@ -14,7 +14,6 @@ # limitations under the License. # import csv -import json import logging.config import threading @@ -77,7 +76,7 @@ def iterate_through_unwalked_hosts_scheduler( inventory_record, server_config, splunk_indexes, - json.dumps(OnetimeFlag.AFTER_FAIL), + OnetimeFlag.AFTER_FAIL.value, ) @@ -85,10 +84,9 @@ def onetime_task( inventory_record: InventoryRecord, server_config, splunk_indexes, - one_time_flag=json.dumps(OnetimeFlag.FIRST_WALK), + one_time_flag=OnetimeFlag.FIRST_WALK.value, ): logger.debug("Executing onetime_task for %s", inventory_record.__repr__()) - snmp_polling.delay( inventory_record.to_json(), server_config, diff --git a/splunk_connect_for_snmp_poller/manager/task_utilities.py b/splunk_connect_for_snmp_poller/manager/task_utilities.py index c80c80f..013c587 100644 --- a/splunk_connect_for_snmp_poller/manager/task_utilities.py +++ b/splunk_connect_for_snmp_poller/manager/task_utilities.py @@ -259,8 +259,8 @@ async def snmp_get_handler( index, ir, additional_metric_fields, - one_time_flag, - mib_enricher, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), + mib_enricher=mib_enricher, ) else: is_error, result = prepare_error_message( @@ -275,7 +275,7 @@ async def snmp_get_handler( index, ir, additional_metric_fields, - one_time_flag, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), is_error=is_error, ) @@ -344,7 +344,7 @@ def _any_walk_failure_happened( index, ir, additional_metric_fields, - one_time_flag, + one_time_flag=one_time_flag, is_error=is_error, ) @@ -419,8 +419,8 @@ async def snmp_bulk_handler( index, ir, additional_metric_fields, - one_time_flag, - mib_enricher, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), + mib_enricher=mib_enricher, ) else: is_error, result = prepare_error_message( @@ -435,7 +435,7 @@ async def snmp_bulk_handler( index, ir, additional_metric_fields, - one_time_flag, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), is_error=is_error, ) break @@ -479,7 +479,7 @@ async def walk_handler( errorIndex, host, index, - one_time_flag, + OnetimeFlag.is_a_walk(one_time_flag), is_metric, ir, additional_metric_fields, @@ -498,9 +498,9 @@ async def walk_handler( index, ir, additional_metric_fields, - one_time_flag, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), ) - if one_time_flag: + if OnetimeFlag.is_a_walk(one_time_flag): process_one_time_flag( one_time_flag, error_in_one_time_walk, @@ -530,12 +530,9 @@ def extract_data_to_mongo(host, port, mongo_connection, var_binds): def process_one_time_flag( one_time_flag, error_in_one_time_walk, mongo_connection, host, ir ): - if one_time_flag == json.dumps(OnetimeFlag.FIRST_WALK) and error_in_one_time_walk: + if one_time_flag == OnetimeFlag.FIRST_WALK.value and error_in_one_time_walk: mongo_connection.add_onetime_walk_result(host, ir.version, ir.community) - if ( - one_time_flag == json.dumps(OnetimeFlag.AFTER_FAIL) - and not error_in_one_time_walk - ): + if one_time_flag == OnetimeFlag.AFTER_FAIL.value and not error_in_one_time_walk: mongo_connection.delete_onetime_walk_result(host) @@ -580,13 +577,13 @@ async def walk_handler_with_enricher( errorIndex, host, index, - one_time_flag, + OnetimeFlag.is_a_walk(one_time_flag), is_metric, ir, additional_metric_fields, var_binds, ): - if one_time_flag: + if OnetimeFlag.is_a_walk(one_time_flag): error_in_one_time_walk = True break else: @@ -608,9 +605,9 @@ async def walk_handler_with_enricher( index, ir, additional_metric_fields, - one_time_flag, + one_time_flag=OnetimeFlag.is_a_walk(one_time_flag), ) - if one_time_flag: + if OnetimeFlag.is_a_walk(one_time_flag): process_one_time_flag( one_time_flag, error_in_one_time_walk, diff --git a/splunk_connect_for_snmp_poller/manager/tasks.py b/splunk_connect_for_snmp_poller/manager/tasks.py index 1336685..55daa5f 100644 --- a/splunk_connect_for_snmp_poller/manager/tasks.py +++ b/splunk_connect_for_snmp_poller/manager/tasks.py @@ -25,6 +25,7 @@ from splunk_connect_for_snmp_poller.manager.hec_sender import HecSender from splunk_connect_for_snmp_poller.manager.realtime.oid_constant import OidConstant from splunk_connect_for_snmp_poller.manager.task_utilities import ( + OnetimeFlag, VarbindCollection, build_authData, build_contextData, @@ -117,10 +118,16 @@ def sort_varbinds(varbind_list: list) -> VarbindCollection: return casted_multikey_elements -# TODO remove the debugging statement later @app.task(ignore_result=True) -def snmp_polling(ir_json: str, server_config, index, profiles, one_time_flag=False): +def snmp_polling( + ir_json: str, + server_config, + index, + profiles, + one_time_flag=OnetimeFlag.NOT_A_WALK.value, +): ir = InventoryRecord.from_json(ir_json) + logger.info(f"Got one_time_flag - {one_time_flag} with Ir - {ir.__repr__()}") async_to_sync(snmp_polling_async)(ir, server_config, index, profiles, one_time_flag) @@ -128,7 +135,7 @@ def snmp_polling(ir_json: str, server_config, index, profiles, one_time_flag=Fal async def snmp_polling_async( - ir: InventoryRecord, server_config, index, profiles, one_time_flag=False + ir: InventoryRecord, server_config, index, profiles, one_time_flag: str ): hec_sender = HecSender( os.environ["OTEL_SERVER_METRICS_URL"], os.environ["OTEL_SERVER_LOGS_URL"] diff --git a/splunk_connect_for_snmp_poller/utilities.py b/splunk_connect_for_snmp_poller/utilities.py index b80fd6b..cc39626 100644 --- a/splunk_connect_for_snmp_poller/utilities.py +++ b/splunk_connect_for_snmp_poller/utilities.py @@ -156,3 +156,17 @@ def multi_key_lookup(dictionary, tuple_of_keys): class OnetimeFlag(str, Enum): FIRST_WALK = "first_time" AFTER_FAIL = "after_fail" + NOT_A_WALK = "not_a_walk" + + @staticmethod + def is_a_walk(value: str) -> bool: + if ( + value == OnetimeFlag.FIRST_WALK.value + or value == OnetimeFlag.AFTER_FAIL.value + ): + return True + elif value == OnetimeFlag.NOT_A_WALK.value: + return False + else: + logger.error(f"Unexpected OneTimeFlag value - {value}") + return False diff --git a/tests/test_task_utilities.py b/tests/test_task_utilities.py index 5cd05fc..dd43999 100644 --- a/tests/test_task_utilities.py +++ b/tests/test_task_utilities.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json from unittest import TestCase from unittest.mock import MagicMock @@ -186,7 +185,7 @@ def test_process_one_time_flag_first_walk_error(self): mongo.add_onetime_walk_result.return_value = "add_onetime" mongo.delete_onetime_walk_result.return_value = "delete_onetime" process_one_time_flag( - json.dumps(OnetimeFlag.FIRST_WALK), True, mongo, "127.0.0.1:161", ir + OnetimeFlag.FIRST_WALK.value, True, mongo, "127.0.0.1:161", ir ) self.assertTrue(mongo.add_onetime_walk_result.called) self.assertFalse(mongo.delete_onetime_walk_result.called) @@ -197,7 +196,7 @@ def test_process_one_time_first_walk_success(self): mongo.add_onetime_walk_result.return_value = "add_onetime" mongo.delete_onetime_walk_result.return_value = "delete_onetime" process_one_time_flag( - json.dumps(OnetimeFlag.FIRST_WALK), False, mongo, "127.0.0.1:161", ir + OnetimeFlag.FIRST_WALK.value, False, mongo, "127.0.0.1:161", ir ) self.assertFalse(mongo.add_onetime_walk_result.called) self.assertFalse(mongo.delete_onetime_walk_result.called) @@ -208,7 +207,7 @@ def test_process_one_time_flag_after_failure_walk_error(self): mongo.add_onetime_walk_result.return_value = "add_onetime" mongo.delete_onetime_walk_result.return_value = "delete_onetime" process_one_time_flag( - json.dumps(OnetimeFlag.AFTER_FAIL), True, mongo, "127.0.0.1:161", ir + OnetimeFlag.AFTER_FAIL.value, True, mongo, "127.0.0.1:161", ir ) self.assertFalse(mongo.delete_onetime_walk_result.called) self.assertFalse(mongo.add_onetime_walk_result.called) @@ -219,7 +218,7 @@ def test_process_one_time_flag_after_failure_walk_success(self): mongo.add_onetime_walk_result.return_value = "add_onetime" mongo.delete_onetime_walk_result.return_value = "delete_onetime" process_one_time_flag( - json.dumps(OnetimeFlag.AFTER_FAIL), False, mongo, "127.0.0.1:161", ir + OnetimeFlag.AFTER_FAIL.value, False, mongo, "127.0.0.1:161", ir ) self.assertTrue(mongo.delete_onetime_walk_result.called) self.assertFalse(mongo.add_onetime_walk_result.called)