Skip to content

Commit

Permalink
fix: improve handling of streaming error state changes/logging (#439)
Browse files Browse the repository at this point in the history
The existing handling for data source state transitions when receiving
`sse` errors was incorrect. It was treating any error as a state
transition to `kOff/kShutdown`, and logging at the `error` level.

The correct behavior is to make that transition only if the error
is unrecoverable. Additionally, recoverable errors should be logged at
the debug level, so as not to overstate the impact of the issue.

This bug did not actually affect the backoff behavior of the
eventsource, but it did impact status listeners and any code that queried the existing status within the SDK.
  • Loading branch information
cwaldren-ld committed Sep 30, 2024
1 parent 1dcba43 commit 2800445
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,15 @@ enum LDDataSourceStatus_State {
LD_DATASOURCESTATUS_STATE_SHUTDOWN = 4
};

/**
* Look up the human-readable name of a data source state.
*
* The first argument is the state whose name is requested.
*
* @param default_if_unknown String to return when the enum value is not
* recognized.
* @return Returns the name of the given LDDataSourceStatus_State as a string.
* If the enum value is not recognized, the default string value is returned.
*/
LD_EXPORT(char const*)
LDDataSourceStatus_State_Name(enum LDDataSourceStatus_State,
char const* default_if_unknown);

/**
* Get an enumerated value representing the overall current state of the data
* source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,7 @@ enum class DataSourceState {
* SDK key will never become valid), or because the SDK client was
* explicitly shut down.
*/
kShutdown = 4,

// BackgroundDisabled,

// TODO: A plugin of sorts would likely be required to implement
// network availability.
// kNetworkUnavailable,
kShutdown = 4
};

using DataSourceStatus =
Expand Down Expand Up @@ -116,6 +110,14 @@ class IDataSourceStatusProvider {
IDataSourceStatusProvider() = default;
};

/**
* Look up the human-readable name of a data source state.
*
* @param state The state whose name is requested.
* @param default_if_unknown String to return when the enum value is not
* recognized.
* @return Returns the name of the given DataSourceState as a string. If
* the enum value is not recognized, the default string value is returned.
*/
char const* GetDataSourceStateName(DataSourceState state,
char const* default_if_unknown);

std::ostream& operator<<(std::ostream& out,
DataSourceStatus::DataSourceState const& state);

Expand Down
8 changes: 8 additions & 0 deletions libs/client-sdk/src/bindings/c/sdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ LDDataSourceStatus_GetState(LDDataSourceStatus status) {
TO_DATASOURCESTATUS(status)->State());
}

// C binding: converts the C enum value to the C++ DataSourceState and
// delegates the name lookup to GetDataSourceStateName.
LD_EXPORT(char const*)
LDDataSourceStatus_State_Name(enum LDDataSourceStatus_State state,
                              char const* default_if_unknown) {
    using CppState = data_sources::DataSourceStatus::DataSourceState;
    auto const cpp_state = static_cast<CppState>(state);
    return GetDataSourceStateName(cpp_state, default_if_unknown);
}

LD_EXPORT(LDDataSourceStatus_ErrorInfo)
LDDataSourceStatus_GetLastError(LDDataSourceStatus status) {
LD_ASSERT_NOT_NULL(status);
Expand Down
18 changes: 8 additions & 10 deletions libs/client-sdk/src/client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ static bool IsInitializedSuccessfully(DataSourceStatus::DataSourceState state) {
// Was any attempt made to initialize the data source (with a successful or
// permanent failure outcome?)
static bool IsInitialized(DataSourceStatus::DataSourceState state) {
return IsInitializedSuccessfully(state) ||
(state == DataSourceStatus::DataSourceState::kShutdown);
return state != DataSourceStatus::DataSourceState::kInitializing;
}

std::future<bool> ClientImpl::IdentifyAsync(Context context) {
Expand All @@ -155,7 +154,7 @@ std::future<bool> ClientImpl::IdentifyAsync(Context context) {
event_processor_->SendAsync(events::IdentifyEventParams{
std::chrono::system_clock::now(), std::move(context)});

return StartAsyncInternal(IsInitializedSuccessfully);
return StartAsyncInternal();
}

void ClientImpl::RestartDataSource() {
Expand All @@ -169,16 +168,15 @@ void ClientImpl::RestartDataSource() {
data_source_->ShutdownAsync(start_op);
}

std::future<bool> ClientImpl::StartAsyncInternal(
std::function<bool(DataSourceStatus::DataSourceState)> result_predicate) {
std::future<bool> ClientImpl::StartAsyncInternal() {
auto init_promise = std::make_shared<std::promise<bool>>();
auto init_future = init_promise->get_future();

status_manager_.OnDataSourceStatusChangeEx(
[result = std::move(result_predicate),
init_promise](data_sources::DataSourceStatus const& status) {
[init_promise](data_sources::DataSourceStatus const& status) {
if (auto const state = status.State(); IsInitialized(state)) {
init_promise->set_value(result(status.State()));
init_promise->set_value(
IsInitializedSuccessfully(status.State()));
return true; /* delete this change listener since the desired
state was reached */
}
Expand All @@ -191,11 +189,11 @@ std::future<bool> ClientImpl::StartAsyncInternal(
}

std::future<bool> ClientImpl::StartAsync() {
return StartAsyncInternal(IsInitializedSuccessfully);
return StartAsyncInternal();
}

bool ClientImpl::Initialized() const {
return IsInitializedSuccessfully(status_manager_.Status().State());
return IsInitialized(status_manager_.Status().State());
}

std::unordered_map<Client::FlagKey, Value> ClientImpl::AllFlags() const {
Expand Down
5 changes: 1 addition & 4 deletions libs/client-sdk/src/client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ class ClientImpl : public IClient {

void RestartDataSource();

std::future<bool> StartAsyncInternal(
std::function<bool(data_sources::DataSourceStatus::DataSourceState)>
predicate);

std::future<bool> StartAsyncInternal();
Config config_;
Logger logger_;

Expand Down
25 changes: 13 additions & 12 deletions libs/client-sdk/src/data_sources/data_source_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@

namespace launchdarkly::client_side::data_sources {

std::ostream& operator<<(std::ostream& out,
DataSourceStatus::DataSourceState const& state) {
char const* GetDataSourceStateName(DataSourceState state,
char const* default_if_unknown) {
switch (state) {
case DataSourceStatus::DataSourceState::kInitializing:
out << "INITIALIZING";
break;
return "INITIALIZING";
case DataSourceStatus::DataSourceState::kValid:
out << "VALID";
break;
return "VALID";
case DataSourceStatus::DataSourceState::kInterrupted:
out << "INTERRUPTED";
break;
return "INTERRUPTED";
case DataSourceStatus::DataSourceState::kSetOffline:
out << "OFFLINE";
break;
return "OFFLINE";
case DataSourceStatus::DataSourceState::kShutdown:
out << "SHUTDOWN";
break;
return "SHUTDOWN";
default:
return default_if_unknown;
}
}

// Writes the state's name to the stream; values without a known name are
// printed as "UNKNOWN".
std::ostream& operator<<(std::ostream& out,
                         DataSourceStatus::DataSourceState const& state) {
    char const* const name = GetDataSourceStateName(state, "UNKNOWN");
    return out << name;
}

Expand Down
57 changes: 51 additions & 6 deletions libs/client-sdk/src/data_sources/streaming_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ void StreamingDataSource::Start() {

// Error callback for the SSE client. The data source is held weakly, so the
// callback does nothing if the data source has already been destroyed.
client_builder.errors([weak_self](auto error) {
    if (auto self = weak_self.lock()) {
        std::string error_string = sse::ErrorToString(error);
        // Recoverable errors are logged at debug level so their impact is not
        // overstated. The message has to be streamed into the log statement,
        // otherwise the error text is never recorded.
        LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug
                                                        : LogLevel::kError)
            << error_string;
        self->HandleErrorStateChange(std::move(error),
                                     std::move(error_string));
    }
});

Expand All @@ -162,6 +161,52 @@ void StreamingDataSource::Start() {
client_->async_connect();
}

// Constant that is false for every template argument. Because the value is
// dependent on a template parameter, a static_assert on it only fires when the
// branch containing it is actually instantiated; used to flag a
// non-exhaustive visitor at compile time.
template <class>
inline constexpr bool always_false_v = false;

/**
 * Translates an error reported by the SSE client into a data source status
 * update.
 *
 * Errors that sse::IsRecoverable reports as recoverable move the data source
 * to kInterrupted; all others move it to kShutdown. An
 * UnrecoverableClientError is reported with its status code; every other
 * error kind is reported as a network error.
 *
 * @param error The error reported by the SSE client.
 * @param error_string Description of the error, stored as the status message.
 */
void StreamingDataSource::HandleErrorStateChange(sse::Error error,
                                                 std::string error_string) {
    auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted
                                                 : DataSourceState::kShutdown;
    std::visit(
        // The lambda is `mutable` so that the captured message can really be
        // moved into SetState; in a non-mutable lambda the capture is const
        // and std::move would silently fall back to a copy.
        [this, state, message = std::move(error_string)](auto err) mutable {
            using T = std::decay_t<decltype(err)>;
            if constexpr (std::is_same_v<
                              T, sse::errors::UnrecoverableClientError>) {
                this->status_manager_.SetState(
                    state,
                    static_cast<DataSourceStatusManager::StatusCodeType>(
                        err.status),
                    std::move(message));
            } else if constexpr (
                std::is_same_v<T, sse::errors::ReadTimeout> ||
                std::is_same_v<T, sse::errors::InvalidRedirectLocation> ||
                std::is_same_v<T, sse::errors::NotRedirectable>) {
                this->status_manager_.SetState(
                    state,
                    DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
                    std::move(message));
            } else {
                // Fails to compile if sse::Error gains an alternative that is
                // not handled above.
                static_assert(always_false_v<T>, "non-exhaustive visitor");
            }
        },
        std::move(error));
}

void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
Expand Down
2 changes: 2 additions & 0 deletions libs/client-sdk/src/data_sources/streaming_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class StreamingDataSource final
void ShutdownAsync(std::function<void()>) override;

private:
void HandleErrorStateChange(sse::Error error, std::string error_string);

Context context_;
boost::asio::any_io_executor exec_;
DataSourceStatusManager& status_manager_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ namespace launchdarkly::internal::data_sources {
template <typename TDataSourceStatus, typename TInterface>
class DataSourceStatusManagerBase : public TInterface {
public:
using StatusCodeType =
typename TDataSourceStatus::ErrorInfo::StatusCodeType;

/**
* Set the state.
*
Expand All @@ -39,7 +42,7 @@ class DataSourceStatusManagerBase : public TInterface {
* @param message The message to associate with the error.
*/
void SetState(typename TDataSourceStatus::DataSourceState state,
typename TDataSourceStatus::ErrorInfo::StatusCodeType code,
StatusCodeType code,
std::string message) {
{
std::lock_guard lock(status_mutex_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,11 @@ void StreamingDataSource::StartAsync(

// Error callback for the SSE client. The data source is held weakly, so the
// callback does nothing if the data source has already been destroyed.
client_builder.errors([weak_self](auto error) {
    if (auto self = weak_self.lock()) {
        std::string error_string = sse::ErrorToString(error);
        // Recoverable errors are logged at debug level so their impact is not
        // overstated. The message has to be streamed into the log statement,
        // otherwise the error text is never recorded.
        LD_LOG(self->logger_, sse::IsRecoverable(error) ? LogLevel::kDebug
                                                        : LogLevel::kError)
            << error_string;
        self->HandleErrorStateChange(std::move(error),
                                     std::move(error_string));
    }
});

Expand All @@ -159,6 +158,52 @@ void StreamingDataSource::StartAsync(
client_->async_connect();
}

// Constant that is false for every template argument. Because the value is
// dependent on a template parameter, a static_assert on it only fires when the
// branch containing it is actually instantiated; used to flag a
// non-exhaustive visitor at compile time.
template <class>
inline constexpr bool always_false_v = false;

/**
 * Translates an error reported by the SSE client into a data source status
 * update.
 *
 * Errors that sse::IsRecoverable reports as recoverable move the data source
 * to kInterrupted; all others move it to kOff. An UnrecoverableClientError is
 * reported with its status code; every other error kind is reported as a
 * network error.
 *
 * @param error The error reported by the SSE client.
 * @param error_string Description of the error, stored as the status message.
 */
void StreamingDataSource::HandleErrorStateChange(sse::Error error,
                                                 std::string error_string) {
    auto const state = sse::IsRecoverable(error) ? DataSourceState::kInterrupted
                                                 : DataSourceState::kOff;
    std::visit(
        // The lambda is `mutable` so that the captured message can really be
        // moved into SetState; in a non-mutable lambda the capture is const
        // and std::move would silently fall back to a copy.
        [this, state, message = std::move(error_string)](auto err) mutable {
            using T = std::decay_t<decltype(err)>;
            if constexpr (std::is_same_v<
                              T, sse::errors::UnrecoverableClientError>) {
                this->status_manager_.SetState(
                    state,
                    static_cast<data_components::DataSourceStatusManager::
                                    StatusCodeType>(err.status),
                    std::move(message));
            } else if constexpr (
                std::is_same_v<T, sse::errors::ReadTimeout> ||
                std::is_same_v<T, sse::errors::InvalidRedirectLocation> ||
                std::is_same_v<T, sse::errors::NotRedirectable>) {
                this->status_manager_.SetState(
                    state,
                    DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
                    std::move(message));
            } else {
                // Fails to compile if sse::Error gains an alternative that is
                // not handled above.
                static_assert(always_false_v<T>, "non-exhaustive visitor");
            }
        },
        std::move(error));
}

void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class StreamingDataSource final
[[nodiscard]] std::string const& Identity() const override;

private:
void HandleErrorStateChange(sse::Error error, std::string error_string);

boost::asio::any_io_executor io_;
Logger const& logger_;

Expand Down
8 changes: 3 additions & 5 deletions libs/server-sent-events/include/launchdarkly/sse/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
namespace launchdarkly::sse {
namespace errors {

struct NoContent {};
std::ostream& operator<<(std::ostream& out, NoContent const&);

struct InvalidRedirectLocation {
std::string location;
};
Expand All @@ -32,12 +29,13 @@ std::ostream& operator<<(std::ostream& out, UnrecoverableClientError const&);

} // namespace errors

using Error = std::variant<errors::NoContent,
errors::InvalidRedirectLocation,
using Error = std::variant<errors::InvalidRedirectLocation,
errors::NotRedirectable,
errors::ReadTimeout,
errors::UnrecoverableClientError>;

bool IsRecoverable(Error const& error);

std::ostream& operator<<(std::ostream& out, Error const& error);

std::string ErrorToString(Error const& error);
Expand Down
4 changes: 2 additions & 2 deletions libs/server-sent-events/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class FoxyClient : public Client,

if (status_class == beast::http::status_class::successful) {
if (response.result() == beast::http::status::no_content) {
errors_(errors::NoContent{});
errors_(
errors::UnrecoverableClientError{http::status::no_content});
return;
}
if (!correct_content_type(response)) {
Expand Down Expand Up @@ -355,7 +356,6 @@ class FoxyClient : public Client,
logger_("exception closing stream: " + std::string(err.what()));
}


// Ideally we would call session_->async_shutdown() here to gracefully
// terminate the SSL session. For unknown reasons, this call appears to
// hang indefinitely and never complete until the SDK client is
Expand Down
Loading

0 comments on commit 2800445

Please sign in to comment.