Skip to content

Commit

Permalink
Alerts REST client
Browse files Browse the repository at this point in the history
  • Loading branch information
kkedziak-splunk committed Dec 2, 2024
1 parent 1016f87 commit 53613e6
Show file tree
Hide file tree
Showing 3 changed files with 468 additions and 5 deletions.
252 changes: 252 additions & 0 deletions solnlib/alerts_rest_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
#
# Copyright 2024 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from enum import Enum
from typing import Tuple, Union, Optional

from solnlib import splunk_rest_client as rest_client


class AlertType(Enum):
CUSTOM = "custom"
NUMBER_OF_EVENTS = "number of events"
NUMBER_OF_HOSTS = "number of hosts"
NUMBER_OF_SOURCES = "number of sources"


class AlertSeverity(Enum):
DEBUG = 1
INFO = 2
WARN = 3
ERROR = 4
SEVERE = 5
FATAL = 6


class AlertComparator(Enum):
GREATER_THAN = "greater than"
LESS_THAN = "less than"
EQUAL_TO = "equal to"
RISES_BY = "rises by"
DROPS_BY = "drops by"
RISES_BY_PERC = "rises by perc"
DROPS_BY_PERC = "drops by perc"


class AlertsRestClient:
"""REST client for handling alerts."""

ENDPOINT = "/services/saved/searches"
headers = [("Content-Type", "application/json")]

def __init__(
self,
session_key: str,
app: str,
**context: dict,
):
"""Initializes AlertsRestClient.
Arguments:
session_key: Splunk access token.
app: App name of namespace.
context: Other configurations for Splunk rest client.
"""
self.session_key = session_key
self.app = app

self._rest_client = rest_client.SplunkRestClient(
self.session_key, app=self.app, **context
)

def create_search_alert(
self,
name: str,
search: str,
*,
disabled: bool = True,
description: str = "",
alert_type: AlertType = AlertType.NUMBER_OF_EVENTS,
alert_condition: str = "",
alert_comparator: AlertComparator = AlertComparator.GREATER_THAN,
alert_threshold: Union[int, float, str] = 0,
time_window: Tuple[str, str] = ("-15m", "now"),
alert_severity: AlertSeverity = AlertSeverity.WARN,
cron_schedule: str = "* * * * *",
expires: Union[int, str] = "24h",
**kwargs,
):
"""Creates a search alert in Splunk.
Arguments:
name: Name of the alert.
search: Search query for the alert.
disabled: Whether the alert is disabled. Default is True.
description: Description of the alert.
alert_type: Type of the alert (see AlertType). If it equals to CUSTOM, Splunk executes a check in
alert_condition. Otherwise, alert_comparator and alert_threshold are used.
alert_condition: Condition for the alert.
alert_comparator: Comparator for the alert. Default is GREATER_THAN.
alert_threshold: Threshold for the alert. Default is 0.
time_window: Time window for the alert. Tuple of earliest and latest time. Default is ("-1h", "now").
alert_severity: Severity level of the alert. Default is WARN.
cron_schedule: Cron schedule for the alert. Default is "* * * * *".
expires: Expiration time for the alert (i.e. how long you can access the result of triggered alert).
Default is "24h".
kwargs: Additional parameters for the alert. See Splunk documentation for more details.
"""
params = {
"output_mode": "json",
"name": name,
"search": search,
"description": description,
"alert_type": alert_type.value,
"alert_condition": alert_condition,
"alert_comparator": alert_comparator.value,
"alert_threshold": alert_threshold,
"alert.severity": str(alert_severity.value),
"is_scheduled": "1",
"cron_schedule": cron_schedule,
"dispatch.earliest_time": time_window[0],
"dispatch.latest_time": time_window[1],
"alert.digest_mode": "1",
"alert.expires": str(expires),
"disabled": "1" if disabled else "0",
"realtime_schedule": "1",
}

params.update(kwargs)

self._rest_client.post(self.ENDPOINT, body=params, headers=self.headers)

def delete_search_alert(self, name: str):
"""Deletes a search alert in Splunk.
Arguments:
name: Name of the alert to delete.
"""
self._rest_client.delete(f"{self.ENDPOINT}/{name}")

def get_search_alert(self, name: str):
"""Retrieves a specific search alert from Splunk.
Arguments:
name: Name of the alert to retrieve.
Returns:
A dictionary containing the alert details.
"""
response = (
self._rest_client.get(f"{self.ENDPOINT}/{name}", output_mode="json")
.body.read()
.decode("utf-8")
)

return json.loads(response)

def get_all_search_alerts(self):
"""Retrieves all search alerts from Splunk.
Returns:
A dictionary containing all search alerts.
"""
response = (
self._rest_client.get(self.ENDPOINT, output_mode="json")
.body.read()
.decode("utf-8")
)

return json.loads(response)

def update_search_alert(
self,
name: str,
*,
search: Optional[str] = None,
disabled: Optional[bool] = None,
description: Optional[str] = None,
alert_type: Optional[AlertType] = None,
alert_condition: Optional[str] = None,
alert_comparator: Optional[AlertComparator] = None,
alert_threshold: Optional[Union[int, float, str]] = None,
time_window: Optional[Tuple[str, str]] = None,
alert_severity: Optional[AlertSeverity] = None,
cron_schedule: Optional[str] = None,
expires: Optional[Union[int, str]] = None,
**kwargs,
):
"""Updates a search alert in Splunk.
Arguments:
name: Name of the alert to update.
search: Search query for the alert.
disabled: Whether the alert is disabled.
description: Description of the alert.
alert_type: Type of the alert (see AlertType). If it equals to CUSTOM, Splunk executes a check in
alert_condition. Otherwise, alert_comparator and alert_threshold are used.
alert_condition: Condition for the alert.
alert_comparator: Comparator for the alert.
alert_threshold: Threshold for the alert.
time_window: Time window for the alert. Tuple of earliest and latest time.
alert_severity: Severity level of the alert.
cron_schedule: Cron schedule for the alert.
expires: Expiration time for the alert.
kwargs: Additional parameters for the alert. See Splunk documentation for more details.
"""
params = {
"output_mode": "json",
}

if search:
params["search"] = search

if disabled is not None:
params["disabled"] = "1" if disabled else "0"

if description:
params["description"] = description

if alert_type:
params["alert_type"] = alert_type.value

if alert_condition:
params["alert_condition"] = alert_condition

if alert_comparator:
params["alert_comparator"] = alert_comparator.value

if alert_threshold:
params["alert_threshold"] = str(alert_threshold)

if time_window:
params["dispatch.earliest_time"] = time_window[0]
params["dispatch.latest_time"] = time_window[1]

if alert_severity:
params["alert.severity"] = str(alert_severity.value)

if cron_schedule:
params["is_scheduled"] = "1"
params["cron_schedule"] = cron_schedule

if expires:
params["alert.expires"] = str(expires)

params.update(kwargs)

self._rest_client.post(
f"{self.ENDPOINT}/{name}", body=params, headers=self.headers
)
25 changes: 20 additions & 5 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
import os
import sys

# path manipulation get the 'splunk' library for the imports while running on GH Actions
sys.path.append(
os.path.sep.join([os.environ["SPLUNK_HOME"], "lib", "python3.7", "site-packages"])
)
# TODO: 'python3.7' needs to be updated as and when Splunk has new folder for Python.
import pytest

import context


@pytest.fixture(autouse=True, scope="session")
def setup_env():
# path manipulation get the 'splunk' library for the imports while running on GH Actions
if "SPLUNK_HOME" in os.environ:
sys.path.append(
os.path.sep.join(
[os.environ["SPLUNK_HOME"], "lib", "python3.7", "site-packages"]
)
)
# TODO: 'python3.7' needs to be updated as and when Splunk has new folder for Python.


@pytest.fixture(scope="session")
def session_key():
return context.get_session_key()
Loading

0 comments on commit 53613e6

Please sign in to comment.