diff --git a/protocol/monitoring/alerting.go b/protocol/monitoring/alerting.go index 02d5c3aba..65a5fd26b 100644 --- a/protocol/monitoring/alerting.go +++ b/protocol/monitoring/alerting.go @@ -36,7 +36,8 @@ const ( ) type AlertingOptions struct { - Url string // where to send the alerts + Url string // where to send the alerts + TelegramAlertingOptions Logging bool // wether to log alerts to stdout Identifier string // a unique identifier added to all alerts SubscriptionCUPercentageAlert float64 @@ -81,6 +82,7 @@ type Alerting struct { suppressedAlerts uint64 // monitoring payload map[string]interface{} colorToggle bool + TelegramAlerting TelegramAlertingOptions } func NewAlerting(options AlertingOptions) *Alerting { @@ -161,6 +163,9 @@ func (al *Alerting) SendAlert(alert string, attributes []AlertAttribute) { if al.url != "" { go al.AppendUrlAlert(alert, attrs) } + if al.TelegramAlerting.TelegramBotToken != "" && al.TelegramAlerting.TelegramChannelID != "" { + al.SendTelegramAlert(alert, attrs) + } if al.logging { if al.identifier != "" { alert = alert + " - " + al.identifier diff --git a/protocol/monitoring/health_cmd.go b/protocol/monitoring/health_cmd.go index b0c8f9df2..f3d2532b1 100644 --- a/protocol/monitoring/health_cmd.go +++ b/protocol/monitoring/health_cmd.go @@ -52,6 +52,8 @@ const ( AllProvidersMarker = "all" ConsumerGrpcTLSFlagName = "consumer-grpc-tls" allowInsecureConsumerDialingFlagName = "allow-insecure-consumer-dialing" + telegramBotTokenFlagName = "telegram-bot-token" + telegramChannelIDFlagName = "telegram-channel-id" ) func ParseEndpoints(keyName string, viper_endpoints *viper.Viper) (endpoints []*HealthRPCEndpoint, err error) { @@ -247,6 +249,8 @@ reference_endpoints: cmdTestHealth.Flags().Bool(AllProvidersFlagName, false, "a flag to overwrite the provider addresses with all the currently staked providers") cmdTestHealth.Flags().Bool(ConsumerGrpcTLSFlagName, true, "use tls configuration for grpc connections to your consumer") cmdTestHealth.Flags().Bool(allowInsecureConsumerDialingFlagName, false, "used to test grpc, to allow insecure (self signed cert).") + cmdTestHealth.Flags().String(telegramBotTokenFlagName, "", "telegram bot token used for sending alerts to telegram channels (obtain from @BotFather)") + cmdTestHealth.Flags().String(telegramChannelIDFlagName, "", "telegram channel ID where alerts will be sent (must start with -100)") viper.BindPFlag(queryRetriesFlagName, cmdTestHealth.Flags().Lookup(queryRetriesFlagName)) // bind the flag flags.AddQueryFlagsToCmd(cmdTestHealth) common.AddRollingLogConfig(cmdTestHealth) diff --git a/protocol/monitoring/telegram_alerting.go b/protocol/monitoring/telegram_alerting.go new file mode 100644 index 000000000..23d312345 --- /dev/null +++ b/protocol/monitoring/telegram_alerting.go @@ -0,0 +1,58 @@ +package monitoring + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + + "github.com/lavanet/lava/v4/utils" +) + +type TelegramAlertingOptions struct { + TelegramBotToken string + TelegramChannelID string +} + +const TELEGRAM_URL = "https://api.telegram.org" + +func NewTelegramAlerting(options TelegramAlertingOptions) *TelegramAlertingOptions { + return &TelegramAlertingOptions{ + TelegramBotToken: options.TelegramBotToken, + TelegramChannelID: options.TelegramChannelID, + } +} + +func (al *Alerting) SendTelegramAlert(alert string, attrs []utils.Attribute) error { + if al.TelegramAlerting.TelegramBotToken == "" || al.TelegramAlerting.TelegramChannelID == "" { + return fmt.Errorf("telegram configuration missing") + } + + message := fmt.Sprintf("%s\n", alert) + for _, attr := range attrs { + message += fmt.Sprintf("%s: %v\n", attr.Key, attr.Value) + } + + payload := map[string]string{ + "chat_id": al.TelegramAlerting.TelegramChannelID, + "text": message, + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %v", err) + } + + url := fmt.Sprintf("%s/bot%s/sendMessage", TELEGRAM_URL, al.TelegramAlerting.TelegramBotToken) + resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to send telegram alert: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("telegram API returned non-200 status: %d", resp.StatusCode) + } + + return nil +} diff --git a/protocol/monitoring/telegram_alerting_test.go b/protocol/monitoring/telegram_alerting_test.go new file mode 100644 index 000000000..980662f6d --- /dev/null +++ b/protocol/monitoring/telegram_alerting_test.go @@ -0,0 +1,134 @@ +package monitoring_test + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/lavanet/lava/v4/protocol/monitoring" + "github.com/lavanet/lava/v4/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTelegramAlerting_SendTelegramAlert(t *testing.T) { + tests := []struct { + name string + botToken string + channelID string + alert string + attrs []utils.Attribute + mockResponse string + mockStatus int + expectedError bool + checkRequest func(*testing.T, *http.Request) + }{ + { + name: "successful alert", + botToken: "test_token", + channelID: "test_channel", + alert: "Test Alert", + attrs: []utils.Attribute{ + {Key: "severity", Value: "high"}, + {Key: "service", Value: "test-service"}, + }, + mockResponse: `{"ok":true}`, + mockStatus: http.StatusOK, + expectedError: false, + checkRequest: func(t *testing.T, r *http.Request) { + // Check method and content type + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + // Read and verify request body + body, err := io.ReadAll(r.Body) + require.NoError(t, err) + + // Check if body contains expected content + bodyStr := string(body) + assert.Contains(t, bodyStr, "Test Alert") + assert.Contains(t, bodyStr, "severity") + assert.Contains(t, bodyStr, "high") + assert.Contains(t, bodyStr, "service") + assert.Contains(t, bodyStr, "test-service") + }, + }, + { + name: "missing configuration", + botToken: "", + channelID: "", + alert: "Test Alert", + attrs: []utils.Attribute{}, + expectedError: true, + }, + { + name: "server error", + botToken: "test_token", + channelID: "test_channel", + alert: "Test Alert", + attrs: []utils.Attribute{}, + mockResponse: `{"ok":false}`, + mockStatus: http.StatusInternalServerError, + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test server if mockResponse is provided + var ts *httptest.Server + if tt.mockResponse != "" { + ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.checkRequest != nil { + tt.checkRequest(t, r) + } + w.WriteHeader(tt.mockStatus) + w.Write([]byte(tt.mockResponse)) + })) + defer ts.Close() + } + + // Initialize TelegramAlerting + options := monitoring.TelegramAlertingOptions{ + TelegramBotToken: tt.botToken, + TelegramChannelID: tt.channelID, + } + alerting := monitoring.NewTelegramAlerting(options) + + // Send alert + al := &monitoring.Alerting{TelegramAlerting: *alerting} + err := al.SendTelegramAlert(tt.alert, tt.attrs) + + // Check error + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +// Optional: Integration test (disabled by default) +func TestTelegramAlerting_Integration(t *testing.T) { + t.Skip("Integration test - run manually with valid credentials") + + options := monitoring.TelegramAlertingOptions{ + TelegramBotToken: "YOUR_BOT_TOKEN", // Replace with actual token + TelegramChannelID: "YOUR_CHANNEL_ID", // Replace with actual channel ID + } + + alerting := monitoring.NewTelegramAlerting(options) + + al := &monitoring.Alerting{TelegramAlerting: *alerting} + err := al.SendTelegramAlert( + "Integration Test Alert", + []utils.Attribute{ + {Key: "test_key", Value: "test_value"}, + {Key: "timestamp", Value: "2024-03-14 12:00:00"}, + }, + ) + + assert.NoError(t, err) +}