From ad9c4eb7ca62a907f0499b9774788a8528f2465a Mon Sep 17 00:00:00 2001 From: a_nackov Date: Wed, 11 Dec 2024 14:22:17 +0200 Subject: [PATCH] Reference #883: env timeout open k8s logstream (#1154) Signed-off-by: Adrian Nackov --- pkg/workceptor/kubernetes.go | 21 +++++++++++- pkg/workceptor/kubernetes_test.go | 55 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index 310af3492..c766685ff 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -229,6 +229,25 @@ func podRunningAndReady() func(event watch.Event) (bool, error) { return inner } +func GetTimeoutOpenLogstream(kw *KubeUnit) int { + // RECEPTOR_OPEN_LOGSTREAM_TIMEOUT + // default: 1 + openLogStreamTimeout := 1 + envTimeout := os.Getenv("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT") + if envTimeout != "" { + var err error + openLogStreamTimeout, err = strconv.Atoi(envTimeout) + if err != nil || openLogStreamTimeout < 1 { + // ignore error, use default + kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %s. Ignoring", envTimeout) + openLogStreamTimeout = 1 + } + } + kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %d", openLogStreamTimeout) + + return openLogStreamTimeout +} + func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time.Time) (io.ReadCloser, error) { var logStream io.ReadCloser var err error @@ -257,7 +276,7 @@ func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time retries, err, ) - time.Sleep(time.Second) + time.Sleep(time.Duration(GetTimeoutOpenLogstream(kw)) * time.Second) } if err != nil { errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index 0cd25bdbb..48c7f34e8 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -106,6 +106,61 @@ func TestShouldUseReconnect(t *testing.T) { } } +func TestGetTimeoutOpenLogstream(t *testing.T) { + const envVariable string = "RECEPTOR_OPEN_LOGSTREAM_TIMEOUT" + + kw, err := startNetceptorNodeWithWorkceptor() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + envValue string + want int + }{ + { + name: "No env value set", + envValue: "", + want: 1, + }, + { + name: "Env value set incorrectly to text", + envValue: "text instead of int", + want: 1, + }, + { + name: "Env value set incorrectly to negative", + envValue: "-1", + want: 1, + }, + { + name: "Env value set incorrectly to zero", + envValue: "0", + want: 1, + }, + { + name: "Env value set correctly", + envValue: "2", + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue != "" { + os.Setenv(envVariable, tt.envValue) + defer os.Unsetenv(envVariable) + } else { + os.Unsetenv(envVariable) + } + + if got := workceptor.GetTimeoutOpenLogstream(kw); got != tt.want { + t.Errorf("GetTimeoutOpenLogstream() = %v, want %v", got, tt.want) + } + }) + } +} + func TestParseTime(t *testing.T) { type args struct { s string