Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
Feature/redo walk (#95)
Browse files Browse the repository at this point in the history
* feat: ADDON-37033 (add sysUpTimeInstance initial support)
- added some code for handling sysUpTimeInstance and a general
  function that should decide -based on some SNMP data- when a walk
  should be executed.

* feat: ADDON-37033 (add support for mongodb real-time data storage)

* feat: ADDON-37033 (add real-time task that handles SNMP WALK)
- added a new task that will potentially redo a SNMPWALK based on sysUpTimeInstance
- general refactoring (Poller)
- fixed some issues with mongo when retrieving REAL-TIME-DATA-COLLECTION

* feat: ADDON-37033 (add sync SNMPGET)
- added a way to get 1.3.6.1.2.1.1.3.0 before checking whether a WALK is needed

* feat: ADDON-37033 (refactoring)
- created a named method for creating scheduler map keys
  (create_poller_scheduler_entry_key)

* feat: ADDON-37033 (refactoring)
- scheduler was set to one second (fixed that)

* fox: ADDON-37033 (sanitizers #1)

* fox: ADDON-37033 (sanitizers #2)

* fox: ADDON-37033 (sanitizers #3)

* fox: ADDON-37033 (sanitizers #4)

* fox: ADDON-37033 (code-review #1)

* fox: ADDON-37033 (code-review #2)
- again lint was complaining about non-sorted imports

* fox: ADDON-37033 (code-review #3)
- we were calling int(frequency) even if frequency was already an int.
  • Loading branch information
lstoppa authored Jul 21, 2021
1 parent d5879c5 commit ebe3ebf
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 133 deletions.
207 changes: 76 additions & 131 deletions splunk_connect_for_snmp_poller/manager/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import csv
import functools
import logging.config
import time

import schedule
from splunk_connect_for_snmp_poller.manager.tasks import snmp_polling
from splunk_connect_for_snmp_poller.manager.validator.inventory_validator import (
is_valid_inventory_line_from_dict,
should_process_inventory_line,
from pysnmp.hlapi import SnmpEngine
from splunk_connect_for_snmp_poller.manager.poller_utilities import (
automatic_realtime_task,
create_poller_scheduler_entry_key,
parse_inventory_file,
)
from splunk_connect_for_snmp_poller.manager.tasks import snmp_polling
from splunk_connect_for_snmp_poller.mongo import WalkedHostsRepository
from splunk_connect_for_snmp_poller.utilities import (
file_was_modified,
Expand All @@ -46,133 +47,102 @@ def __init__(self, args, server_config):
self._mongo_walked_hosts_coll = WalkedHostsRepository(
self._server_config["mongo"]
)
self._local_snmp_engine = SnmpEngine()

def get_splunk_indexes(self):
index = {
def __get_splunk_indexes(self):
return {
"event_index": self._args.event_index,
"metric_index": self._args.metric_index,
}
return index

def __get_realtime_task_frequency(self):
return self._args.realtime_task_frequency

def run(self):
self.__start_realtime_scheduler_task()
counter = 0
while True:
if counter == 0:
self.check_inventory()
self.__check_inventory()
counter = int(self._args.refresh_interval)

schedule.run_pending()
time.sleep(1)
counter -= 1

def should_process_current_line(self, host, version, community, profile, frequency):
return should_process_inventory_line(
host
) and is_valid_inventory_line_from_dict(
host, version, community, profile, frequency
)

def check_inventory(self):
splunk_indexes = self.get_splunk_indexes()

# check if config was modified
def __check_inventory(self):
server_config_modified, self._config_mod_time = file_was_modified(
self._args.config, self._config_mod_time
)
if server_config_modified:
self._server_config = parse_config_file(self._args.config)

# check if inventory was modified
inventory_config_modified, self._inventory_mod_time = file_was_modified(
self._args.inventory, self._inventory_mod_time
)

# update job when either inventory changes or config changes
if server_config_modified or inventory_config_modified:
with open(self._args.inventory, newline="") as csvfile:
inventory = csv.DictReader(csvfile, delimiter=",")

inventory_hosts = set()

for agent in inventory:
host = agent["host"]
version = agent["version"]
community = agent["community"]
profile = agent["profile"]
frequency_str = agent["freqinseconds"]
if self.should_process_current_line(
host, version, community, profile, frequency_str
):
entry_key = host + "#" + profile
frequency = int(agent["freqinseconds"])

if entry_key in inventory_hosts:
logger.error(
(
f"{host},{version},{community},{profile},{frequency_str} has duplicated "
f"hostname {host} and {profile} in the inventory,"
f" cannot use the same profile twice for the same device"
)
)
continue

inventory_hosts.add(entry_key)

logger.info(
f"[-] server_config['profiles']: {self._server_config['profiles']}"
inventory_hosts = set()
for ir in parse_inventory_file(self._args.inventory):
entry_key = create_poller_scheduler_entry_key(ir.host, ir.profile)
frequency = int(ir.frequency_str)
if entry_key in inventory_hosts:
logger.error(
(
f"{ir.host},{ir.version},{ir.community},{ir.profile},{ir.frequency_str} has duplicated "
f"hostname {ir.host} and {ir.profile} in the inventory,"
f" cannot use the same profile twice for the same device"
)
# perform one-time walk for the entire tree for each un-walked host
self.one_time_walk(
host,
version,
community,
Poller.universal_base_oid,
)
continue

inventory_hosts.add(entry_key)
logger.info(
f"[-] server_config['profiles']: {self._server_config['profiles']}"
)
if entry_key not in self._jobs_map:
logger.debug(f"Adding configuration for job {entry_key}")
job_reference = schedule.every(frequency).seconds.do(
scheduled_task,
ir.host,
ir.version,
ir.community,
ir.profile,
self._server_config,
self.__get_splunk_indexes(),
)
self._jobs_map[entry_key] = job_reference
else:
old_conf = self._jobs_map.get(entry_key).job_func.args
if (
old_conf
!= (
ir.host,
ir.version,
ir.community,
ir.profile,
self._server_config,
splunk_indexes,
self.__get_splunk_indexes(),
)
or frequency != self._jobs_map.get(entry_key).interval
):
self.__update_schedule(
ir.community,
ir.frequency,
ir.host,
ir.profile,
ir.version,
self._server_config,
self.__get_splunk_indexes(),
)

if entry_key not in self._jobs_map:
logger.debug(f"Adding configuration for job {entry_key}")
job_reference = schedule.every(int(frequency)).seconds.do(
scheduled_task,
host,
version,
community,
profile,
self._server_config,
splunk_indexes,
)
self._jobs_map[entry_key] = job_reference
else:
old_conf = self._jobs_map.get(entry_key).job_func.args
if (
old_conf
!= (
host,
version,
community,
profile,
self._server_config,
splunk_indexes,
)
or frequency != self._jobs_map.get(entry_key).interval
):
self.update_schedule(
community,
frequency,
host,
profile,
version,
self._server_config,
splunk_indexes,
)
for entry_key in list(self._jobs_map):
if entry_key not in inventory_hosts:
logger.debug(f"Removing job for {entry_key}")
schedule.cancel_job(self._jobs_map.get(entry_key))
del self._jobs_map[entry_key]

def update_schedule(
def __update_schedule(
self,
community,
frequency,
Expand Down Expand Up @@ -206,25 +176,17 @@ def update_schedule(
old_next_run if new_next_run > old_next_run else new_next_run
)

def one_time_walk(
self, host, version, community, profile, server_config, splunk_indexes
):
logger.debug(
f"[-]walked flag: {self._mongo_walked_hosts_coll.contains_host(host)}"
def __start_realtime_scheduler_task(self):
# schedule.every().second.do(
# For debugging purposes better change it to "one second"
schedule.every(self._args.realtime_task_frequency).seconds.do(
automatic_realtime_task,
self._mongo_walked_hosts_coll,
self._args.inventory,
self.__get_splunk_indexes(),
self._server_config,
self._local_snmp_engine,
)
if self._mongo_walked_hosts_coll.contains_host(host) == 0:
schedule.every().second.do(
onetime_task,
host,
version,
community,
profile,
server_config,
splunk_indexes,
)
self._mongo_walked_hosts_coll.add_host(host)
else:
logger.debug(f"[-] One time walk executed for {host}!")


def scheduled_task(host, version, community, profile, server_config, splunk_indexes):
Expand All @@ -233,20 +195,3 @@ def scheduled_task(host, version, community, profile, server_config, splunk_inde
)

snmp_polling.delay(host, version, community, profile, server_config, splunk_indexes)


def onetime_task(host, version, community, profile, server_config, splunk_indexes):
logger.debug(
f"Executing onetime_task for {host} version={version} community={community} profile={profile}"
)

snmp_polling.delay(
host,
version,
community,
profile,
server_config,
splunk_indexes,
one_time_flag=True,
)
return schedule.CancelJob
Loading

0 comments on commit ebe3ebf

Please sign in to comment.