From 2100a65cc40fb412f23dc749ef174a770fe4a081 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Fri, 27 Sep 2024 15:36:54 -0700 Subject: [PATCH] fix: improve handling of streaming error state changes/logging --- .../data_sources/streaming_data_source.cpp | 57 +++++++++++++++++-- .../data_sources/streaming_data_source.hpp | 2 + .../data_source_status_manager_base.hpp | 5 +- .../streaming/streaming_data_source.cpp | 57 +++++++++++++++++-- .../streaming/streaming_data_source.hpp | 2 + .../include/launchdarkly/sse/error.hpp | 8 +-- libs/server-sent-events/src/client.cpp | 4 +- libs/server-sent-events/src/error.cpp | 10 ++-- 8 files changed, 119 insertions(+), 26 deletions(-) diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.cpp b/libs/client-sdk/src/data_sources/streaming_data_source.cpp index d26370327..d28b36677 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.cpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.cpp @@ -140,12 +140,11 @@ void StreamingDataSource::Start() { client_builder.errors([weak_self](auto error) { if (auto self = weak_self.lock()) { - auto error_string = launchdarkly::sse::ErrorToString(error); - LD_LOG(self->logger_, LogLevel::kError) << error_string; - self->status_manager_.SetState( - DataSourceStatus::DataSourceState::kShutdown, - DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse, - std::move(error_string)); + std::string error_string = sse::ErrorToString(error); + LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug + : LogLevel::kError); + self->HandleErrorStateChange(std::move(error), + std::move(error_string)); } }); @@ -162,6 +161,52 @@ void StreamingDataSource::Start() { client_->async_connect(); } +template +inline constexpr bool always_false_v = false; + +void StreamingDataSource::HandleErrorStateChange(sse::Error error, + std::string error_string) { + auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted + : DataSourceState::kShutdown; + std::visit( + [this, state, error_string = std::move(error_string)](auto error) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + + } else if constexpr (std::is_same_v< + T, + sse::errors::UnrecoverableClientError>) { + this->status_manager_.SetState( + state, + static_cast( + error.status), + std::move(error_string)); + + } else if constexpr (std::is_same_v< + T, sse::errors::InvalidRedirectLocation>) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + + } else if constexpr (std::is_same_v) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + } else { + static_assert(always_false_v, + "non-exhaustive visitor"); + } + }, + std::move(error)); +} + void StreamingDataSource::ShutdownAsync(std::function completion) { if (client_) { status_manager_.SetState( diff --git a/libs/client-sdk/src/data_sources/streaming_data_source.hpp b/libs/client-sdk/src/data_sources/streaming_data_source.hpp index 82c5b3f3a..b644b711b 100644 --- a/libs/client-sdk/src/data_sources/streaming_data_source.hpp +++ b/libs/client-sdk/src/data_sources/streaming_data_source.hpp @@ -39,6 +39,8 @@ class StreamingDataSource final void ShutdownAsync(std::function) override; private: + void HandleErrorStateChange(sse::Error error, std::string error_string); + Context context_; boost::asio::any_io_executor exec_; DataSourceStatusManager& status_manager_; diff --git a/libs/internal/include/launchdarkly/data_sources/data_source_status_manager_base.hpp b/libs/internal/include/launchdarkly/data_sources/data_source_status_manager_base.hpp index b791d4e0c..a91215816 100644 --- a/libs/internal/include/launchdarkly/data_sources/data_source_status_manager_base.hpp +++ b/libs/internal/include/launchdarkly/data_sources/data_source_status_manager_base.hpp @@ -18,6 +18,9 @@ namespace launchdarkly::internal::data_sources { template class DataSourceStatusManagerBase : public TInterface { public: + using StatusCodeType = + typename TDataSourceStatus::ErrorInfo::StatusCodeType; + /** * Set the state. * @@ -39,7 +42,7 @@ class DataSourceStatusManagerBase : public TInterface { * @param message The message to associate with the error. */ void SetState(typename TDataSourceStatus::DataSourceState state, - typename TDataSourceStatus::ErrorInfo::StatusCodeType code, + StatusCodeType code, std::string message) { { std::lock_guard lock(status_mutex_); diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp index 576c051ea..778eef153 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.cpp @@ -137,12 +137,11 @@ void StreamingDataSource::StartAsync( client_builder.errors([weak_self](auto error) { if (auto self = weak_self.lock()) { - std::string error_string = launchdarkly::sse::ErrorToString(error); - LD_LOG(self->logger_, LogLevel::kError) << error_string; - self->status_manager_.SetState( - DataSourceStatus::DataSourceState::kOff, - DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse, - std::move(error_string)); + std::string error_string = sse::ErrorToString(error); + LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug + : LogLevel::kError); + self->HandleErrorStateChange(std::move(error), + std::move(error_string)); } }); @@ -159,6 +158,52 @@ void StreamingDataSource::StartAsync( client_->async_connect(); } +template +inline constexpr bool always_false_v = false; + +void StreamingDataSource::HandleErrorStateChange(sse::Error error, + std::string error_string) { + auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted + : DataSourceState::kOff; + std::visit( + [this, state, error_string = std::move(error_string)](auto error) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + + } else if constexpr (std::is_same_v< + T, + sse::errors::UnrecoverableClientError>) { + this->status_manager_.SetState( + state, + static_cast(error.status), + std::move(error_string)); + + } else if constexpr (std::is_same_v< + T, sse::errors::InvalidRedirectLocation>) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + + } else if constexpr (std::is_same_v) { + this->status_manager_.SetState( + state, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + std::move(error_string)); + } else { + static_assert(always_false_v, + "non-exhaustive visitor"); + } + }, + std::move(error)); +} + void StreamingDataSource::ShutdownAsync(std::function completion) { if (client_) { status_manager_.SetState( diff --git a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp index 647eb3319..f606686c1 100644 --- a/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp +++ b/libs/server-sdk/src/data_systems/background_sync/sources/streaming/streaming_data_source.hpp @@ -33,6 +33,8 @@ class StreamingDataSource final [[nodiscard]] std::string const& Identity() const override; private: + void HandleErrorStateChange(sse::Error error, std::string error_string); + boost::asio::any_io_executor io_; Logger const& logger_; diff --git a/libs/server-sent-events/include/launchdarkly/sse/error.hpp b/libs/server-sent-events/include/launchdarkly/sse/error.hpp index 1345d983f..ce56eb803 100644 --- a/libs/server-sent-events/include/launchdarkly/sse/error.hpp +++ b/libs/server-sent-events/include/launchdarkly/sse/error.hpp @@ -9,9 +9,6 @@ namespace launchdarkly::sse { namespace errors { -struct NoContent {}; -std::ostream& operator<<(std::ostream& out, NoContent const&); - struct InvalidRedirectLocation { std::string location; }; @@ -32,12 +29,13 @@ std::ostream& operator<<(std::ostream& out, UnrecoverableClientError const&); } // namespace errors -using Error = std::variant; +bool IsRecoverable(Error const& error); + std::ostream& operator<<(std::ostream& out, Error const& error); std::string ErrorToString(Error const& error); diff --git a/libs/server-sent-events/src/client.cpp b/libs/server-sent-events/src/client.cpp index aebe5b2c3..72e7c8492 100644 --- a/libs/server-sent-events/src/client.cpp +++ b/libs/server-sent-events/src/client.cpp @@ -222,7 +222,8 @@ class FoxyClient : public Client, if (status_class == beast::http::status_class::successful) { if (response.result() == beast::http::status::no_content) { - errors_(errors::NoContent{}); + errors_( + errors::UnrecoverableClientError{http::status::no_content}); return; } if (!correct_content_type(response)) { @@ -355,7 +356,6 @@ class FoxyClient : public Client, logger_("exception closing stream: " + std::string(err.what())); } - // Ideally we would call session_->async_shutdown() here to gracefully // terminate the SSL session. For unknown reasons, this call appears to // hang indefinitely and never complete until the SDK client is diff --git a/libs/server-sent-events/src/error.cpp b/libs/server-sent-events/src/error.cpp index 274641e77..b4cf1c44c 100644 --- a/libs/server-sent-events/src/error.cpp +++ b/libs/server-sent-events/src/error.cpp @@ -5,12 +5,6 @@ namespace launchdarkly::sse { namespace errors { -std::ostream& operator<<(std::ostream& out, NoContent const&) { - out << "received HTTP error 204 (no content) - giving up " - "permanently"; - return out; -} - std::ostream& operator<<(std::ostream& out, InvalidRedirectLocation const& invalid) { out << "received invalid redirect from server, cannot follow (" @@ -60,4 +54,8 @@ std::string ErrorToString(Error const& error) { return ss.str(); } +bool IsRecoverable(Error const& error) { + return std::holds_alternative(error); +} + } // namespace launchdarkly::sse