From dbb2193c3215ee86ea5a54a19efecc0985b4511b Mon Sep 17 00:00:00 2001 From: Tedi Mitiku Date: Sat, 9 Nov 2024 16:56:55 -0800 Subject: [PATCH] fix: remove closeChannelWhenEmpty busy loop (#2594) ## Description Addresses https://github.com/kurtosis-tech/kurtosis/issues/2593 by getting rid of the busy loop entirely. The loop was checking to make sure all values from the channel were read before closing the channel. When following logs, if a stream is cancelled before reading all logs in the channel, it loops infinitely. This changes the consumer to loop until it's read everything from the channel, even after its been closed. Also, adds a port for pprof to help with profiling in the future. ## Is this change user facing? YES ## References https://github.com/kurtosis-tech/kurtosis/issues/2593 --- cli/cli/commands/web/web.go | 6 ++-- .../user_support_constants.go | 2 -- .../persistent_volume_logs_database_client.go | 16 ++-------- ...istent_volume_logs_database_client_test.go | 6 ++-- engine/server/engine/main.go | 29 ++++++++++--------- .../server/engine_connect_server_service.go | 6 ++-- 6 files changed, 29 insertions(+), 36 deletions(-) diff --git a/cli/cli/commands/web/web.go b/cli/cli/commands/web/web.go index 3a4aafdfb9..2fb08b0419 100644 --- a/cli/cli/commands/web/web.go +++ b/cli/cli/commands/web/web.go @@ -7,9 +7,9 @@ import ( "github.com/kurtosis-tech/kurtosis/cli/cli/command_framework/lowlevel/flags" "github.com/kurtosis-tech/kurtosis/cli/cli/command_str_consts" "github.com/kurtosis-tech/kurtosis/cli/cli/helpers/multi_os_command_executor" - "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/user_support_constants" "github.com/kurtosis-tech/kurtosis/contexts-config-store/store" "github.com/kurtosis-tech/stacktrace" + "github.com/sirupsen/logrus" ) var WebCmd = &lowlevel.LowlevelKurtosisCommand{ @@ -35,9 +35,7 @@ func run(_ context.Context, _ *flags.ParsedFlags, _ *args.ParsedArgs) error { return stacktrace.Propagate(err, "tried fetching the current Kurtosis context but failed, we can't switch clusters without this information. This is a bug in Kurtosis") } if store.IsRemote(currentKurtosisContext) { - if err := multi_os_command_executor.OpenFile(user_support_constants.KurtosisCloudLink); err != nil { - return stacktrace.Propagate(err, "An error occurred while opening the Kurtosis Cloud Web UI") - } + logrus.Warn("Kurtosis Cloud has been deprecated. Switch to a local kurtosis context to use the local Kurtosis web UI instead.") } if err := multi_os_command_executor.OpenFile(webUILink); err != nil { diff --git a/container-engine-lib/lib/user_support_constants/user_support_constants.go b/container-engine-lib/lib/user_support_constants/user_support_constants.go index a50b590632..166b69f601 100644 --- a/container-engine-lib/lib/user_support_constants/user_support_constants.go +++ b/container-engine-lib/lib/user_support_constants/user_support_constants.go @@ -31,7 +31,6 @@ const ( FeedbackEmail = "feedback@" + OldDomain FeedbackEmailLink = "mailto:" + FeedbackEmail KurtosisTechTwitterProfileLink = "https://twitter.com/KurtosisTech" - KurtosisCloudLink = "https://cloud." + Domain // If you add new URLs above, make sure to add them to the urlsToValidateInTest below!!! // WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING @@ -56,6 +55,5 @@ var urlsToValidateInTest = []string{ KurtosisDiscordUrl, KurtosisOnBoardCalendlyUrl, HowImportWorksLink, - KurtosisCloudLink, KurtosisTechTwitterProfileLink, } diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go index 67bd58b6bb..1afdd99f30 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client.go @@ -90,14 +90,13 @@ func (client *persistentVolumeLogsDatabaseClient) StreamUserServiceLogs( // this go routine handles the stream cancellation go func() { - //wait for stream go routine to end + // wait for stream go routine to end wgSenders.Wait() - // send all buffered log lines + // flush should send remainder of logs in the buffer to the channel to be read logLineSender.Flush() - // wait until the channel has been fully read/empty before closing it - closeChannelWhenEmpty(logsByKurtosisUserServiceUuidChan) + close(logsByKurtosisUserServiceUuidChan) close(streamErrChan) //then cancel the context @@ -174,12 +173,3 @@ func (client *persistentVolumeLogsDatabaseClient) streamServiceLogLines( shouldReturnAllLogs, numLogLines) } - -func closeChannelWhenEmpty(logsChan chan map[service.ServiceUUID][]logline.LogLine) { - for { - if len(logsChan) == 0 { - close(logsChan) - return - } - } -} diff --git a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go index d245581bde..6e3d77c2a6 100644 --- a/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go +++ b/engine/server/engine/centralized_logs/client_implementations/persistent_volume/persistent_volume_logs_database_client_test.go @@ -721,11 +721,13 @@ func executeStreamCallAndGetReceivedServiceLogLines( case <-time.Tick(testTimeOut): return nil, stacktrace.NewError("Receiving stream logs in the test has reached the '%v' time out", testTimeOut) case streamErr, isChanOpen := <-errChan: - if !isChanOpen { + if !isChanOpen && len(userServiceLogsByUuidChan) == 0 { shouldReceiveStream = false break } - return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") + if isChanOpen && streamErr != nil { + return nil, stacktrace.Propagate(streamErr, "Receiving streaming error.") + } case userServiceLogsByUuid, isChanOpen := <-userServiceLogsByUuidChan: if !isChanOpen { shouldReceiveStream = false diff --git a/engine/server/engine/main.go b/engine/server/engine/main.go index 4670968c54..936bf24aa9 100644 --- a/engine/server/engine/main.go +++ b/engine/server/engine/main.go @@ -8,18 +8,6 @@ package main import ( "context" "fmt" - "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout" - "io/fs" - "math" - "net" - "net/http" - "os" - "path" - "path/filepath" - "runtime" - "strings" - "time" - "github.com/kurtosis-tech/kurtosis/api/golang/core/kurtosis_core_rpc_api_bindings" "github.com/kurtosis-tech/kurtosis/api/golang/engine/kurtosis_engine_rpc_api_bindings/kurtosis_engine_rpc_api_bindingsconnect" enclaveApi "github.com/kurtosis-tech/kurtosis/api/golang/http_rest/server/core_rest_api" @@ -39,6 +27,7 @@ import ( "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/kurtosis_backend" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume" + "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/file_layout" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/log_file_manager" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/logs_clock" "github.com/kurtosis-tech/kurtosis/engine/server/engine/centralized_logs/client_implementations/persistent_volume/stream_logs_strategy" @@ -57,6 +46,17 @@ import ( echomiddleware "github.com/labstack/echo/v4/middleware" "github.com/rs/cors" "github.com/sirupsen/logrus" + "io/fs" + "math" + "net" + "net/http" + _ "net/http/pprof" + "os" + "path" + "path/filepath" + "runtime" + "strings" + "time" ) const ( @@ -73,6 +73,7 @@ const ( functionPathSeparator = "." emptyFunctionName = "" webappPortAddr = ":9711" + pprofPath = "/debug/pprof/" remoteBackendConfigFilename = "remote_backend_config.json" pathToStaticFolder = "/run/webapp" @@ -213,8 +214,9 @@ func runMain() error { } logrus.Debugf("Created environment js file with content: \n%s", envJsFileContent) + handler := http.NewServeMux() fileServer := http.FileServer(http.Dir(pathToStaticFolder)) - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { path, err := filepath.Abs(r.URL.Path) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -236,6 +238,7 @@ func runMain() error { w.Header().Add("Cache-Control", "no-store") fileServer.ServeHTTP(w, r) }) + handler.Handle(pprofPath, http.HandlerFunc(http.DefaultServeMux.ServeHTTP)) err := http.ListenAndServe(webappPortAddr, handler) if err != nil { diff --git a/engine/server/engine/server/engine_connect_server_service.go b/engine/server/engine/server/engine_connect_server_service.go index ea2d266e9e..c19df02394 100644 --- a/engine/server/engine/server/engine_connect_server_service.go +++ b/engine/server/engine/server/engine_connect_server_service.go @@ -366,8 +366,10 @@ func (service *EngineConnectServerService) GetServiceLogs(ctx context.Context, c logrus.Debug("Exiting the stream because an error from the logs database client was received through the error chan.") return stacktrace.Propagate(err, "An error occurred streaming user service logs.") } - logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan") - return nil + if len(serviceLogsByServiceUuidChan) == 0 { + logrus.Debug("Exiting the stream loop after receiving a close signal from the error chan") + return nil + } } } }