diff --git a/LICENSE b/LICENSE index 0e53635111..0147fe37fe 100644 --- a/LICENSE +++ b/LICENSE @@ -264,6 +264,9 @@ Meta Velox ./cpp/celeborn/utils/ProcessBase.cpp ./cpp/celeborn/utils/StackTrace.h ./cpp/celeborn/utils/StackTrace.cpp +./cpp/celeborn/utils/CelebornException.h +./cpp/celeborn/utils/CelebornException.cpp +./cpp/celeborn/utils/flags.cpp ------------------------------------------------------------------------------------ diff --git a/cpp/celeborn/utils/CMakeLists.txt b/cpp/celeborn/utils/CMakeLists.txt index f473fb5e6a..ed53401807 100644 --- a/cpp/celeborn/utils/CMakeLists.txt +++ b/cpp/celeborn/utils/CMakeLists.txt @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_library(utils ProcessBase.cpp StackTrace.cpp) +add_library(utils ProcessBase.cpp StackTrace.cpp CelebornException.cpp flags.cpp) target_link_libraries( utils diff --git a/cpp/celeborn/utils/CelebornException.cpp b/cpp/celeborn/utils/CelebornException.cpp new file mode 100644 index 0000000000..aa9c63e92d --- /dev/null +++ b/cpp/celeborn/utils/CelebornException.cpp @@ -0,0 +1,281 @@ +/* + * Based on VeloxException.cpp from Facebook Velox + * + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "celeborn/utils/CelebornException.h" + +#include +#include + +#include + +namespace celeborn { + +std::exception_ptr toCelebornException(const std::exception_ptr& exceptionPtr) { + try { + std::rethrow_exception(exceptionPtr); + } catch (const CelebornException&) { + return exceptionPtr; + } catch (const std::exception& e) { + return std::make_exception_ptr( + CelebornUserError(std::current_exception(), e.what(), false)); + } +} + +int64_t& threadNumCelebornThrow() { + thread_local int64_t numThrow; + return numThrow; +} + +ExceptionContext& getExceptionContext() { + thread_local ExceptionContext context; + return context; +} + +// Retrieves the message of the top-level ancestor of the current exception +// context. If the top-level context message is not empty and is the same as the +// current one, returns a string indicating they are the same. +std::string getTopLevelExceptionContextString( + CelebornException::Type exceptionType, + const std::string& currentMessage) { + auto* context = &getExceptionContext(); + if (context->parent && context->parent->parent) { + while (context->parent && context->parent->parent) { + context = context->parent; + } + auto topLevelMessage = context->message(exceptionType); + if (!topLevelMessage.empty() && topLevelMessage == currentMessage) { + return "Same as context."; + } else { + return topLevelMessage; + } + } + + if (!currentMessage.empty()) { + return "Same as context."; + } + return ""; +} + +CelebornException::CelebornException( + const char* file, + size_t line, + const char* function, + std::string_view failingExpression, + std::string_view message, + std::string_view errorSource, + std::string_view errorCode, + bool isRetriable, + Type exceptionType, + std::string_view exceptionName) + : CelebornException(State::make(exceptionType, [&](auto& state) { + state.exceptionType = exceptionType; + state.exceptionName = exceptionName; + state.file = file; + state.line = line; + state.function = function; + state.failingExpression = failingExpression; + state.message = message; + state.errorSource = errorSource; + state.errorCode = errorCode; + state.context = getExceptionContext().message(exceptionType); + state.topLevelContext = + getTopLevelExceptionContextString(exceptionType, state.context); + state.isRetriable = isRetriable; + })) {} + +CelebornException::CelebornException( + const std::exception_ptr& e, + std::string_view message, + std::string_view errorSource, + std::string_view errorCode, + bool isRetriable, + Type exceptionType, + std::string_view exceptionName) + : CelebornException(State::make([&](auto& state) { + state.exceptionType = exceptionType; + state.exceptionName = exceptionName; + state.file = "UNKNOWN"; + state.line = 0; + state.function = ""; + state.failingExpression = ""; + state.message = message; + state.errorSource = errorSource; + state.errorCode = errorCode; + state.context = getExceptionContext().message(exceptionType); + state.topLevelContext = + getTopLevelExceptionContextString(exceptionType, state.context); + state.isRetriable = isRetriable; + state.wrappedException = e; + })) {} + +namespace { + +/// returns whether CelebornException stacktraces are enabled and whether, if +/// they are rate-limited, whether the rate-limit check passes +bool isStackTraceEnabled(CelebornException::Type type) { + using namespace std::literals::chrono_literals; + const bool isSysException = type == CelebornException::Type::kSystem; + if ((isSysException && !FLAGS_celeborn_exception_system_stacktrace_enabled) || + (!isSysException && !FLAGS_celeborn_exception_user_stacktrace_enabled)) { + // CelebornException stacktraces are disabled. + return false; + } + + const int32_t rateLimitMs = isSysException + ? FLAGS_celeborn_exception_system_stacktrace_rate_limit_ms + : FLAGS_celeborn_exception_user_stacktrace_rate_limit_ms; + // not static so the gflag can be manipulated at runtime + if (0 == rateLimitMs) { + // CelebornException stacktraces are not rate-limited + return true; + } + static folly::AtomicStruct systemLast; + static folly::AtomicStruct userLast; + auto* last = isSysException ? &systemLast : &userLast; + + auto const now = std::chrono::steady_clock::now(); + auto latest = last->load(std::memory_order_relaxed); + if (now < latest + std::chrono::milliseconds(rateLimitMs)) { + // CelebornException stacktraces are rate-limited and the rate-limit check + // failed + return false; + } + + // CelebornException stacktraces are rate-limited and the rate-limit check + // passed + // + // the cas happens only here, so the rate-limit check in effect gates not + // only computation of the stacktrace but also contention on this atomic + // variable + return last->compare_exchange_strong(latest, now, std::memory_order_relaxed); +} + +} // namespace + +template +std::shared_ptr CelebornException::State::make( + CelebornException::Type exceptionType, + F f) { + auto state = std::make_shared(); + if (isStackTraceEnabled(exceptionType)) { + // new v.s. make_unique to avoid any extra frames from make_unique + state->stackTrace.reset(new utils::StackTrace()); + } + f(*state); + return state; +} + +/* +Not much to say. Constructs the elaborate message from the available +pieces of information. + */ +void CelebornException::State::finalize() const { + assert(elaborateMessage.empty()); + + // Fill elaborateMessage_ + if (!exceptionName.empty()) { + elaborateMessage += "Exception: "; + elaborateMessage += exceptionName; + elaborateMessage += '\n'; + } + + if (!errorSource.empty()) { + elaborateMessage += "Error Source: "; + elaborateMessage += errorSource; + elaborateMessage += '\n'; + } + + if (!errorCode.empty()) { + elaborateMessage += "Error Code: "; + elaborateMessage += errorCode; + elaborateMessage += '\n'; + } + + if (!message.empty()) { + elaborateMessage += "Reason: "; + elaborateMessage += message; + elaborateMessage += '\n'; + } + + elaborateMessage += "Retriable: "; + elaborateMessage += isRetriable ? "True" : "False"; + elaborateMessage += '\n'; + + if (!failingExpression.empty()) { + elaborateMessage += "Expression: "; + elaborateMessage += failingExpression; + elaborateMessage += '\n'; + } + + if (!context.empty()) { + elaborateMessage += "Context: " + context + "\n"; + } + + if (!topLevelContext.empty()) { + elaborateMessage += "Top-Level Context: " + topLevelContext + "\n"; + } + + if (function) { + elaborateMessage += "Function: "; + elaborateMessage += function; + elaborateMessage += '\n'; + } + + if (file) { + elaborateMessage += "File: "; + elaborateMessage += file; + elaborateMessage += '\n'; + } + + if (line) { + elaborateMessage += "Line: "; + auto len = elaborateMessage.size(); + size_t t = line; + do { + elaborateMessage += static_cast('0' + t % 10); + t /= 10; + } while (t); + reverse(elaborateMessage.begin() + len, elaborateMessage.end()); + elaborateMessage += '\n'; + } + + elaborateMessage += "Stack trace:\n"; + if (stackTrace) { + elaborateMessage += stackTrace->toString(); + } else { + elaborateMessage += "Stack trace has been disabled."; + if (exceptionType == CelebornException::Type::kSystem) { + elaborateMessage += + " Use --celeborn_exception_system_stacktrace_enabled=true to enable it.\n"; + } else { + elaborateMessage += + " Use --celeborn_exception_user_stacktrace_enabled=true to enable it.\n"; + } + } +} + +const char* CelebornException::State::what() const noexcept { + try { + folly::call_once(once, [&] { finalize(); }); + return elaborateMessage.c_str(); + } catch (...) { + return ""; + } +} + +} // namespace celeborn diff --git a/cpp/celeborn/utils/CelebornException.h b/cpp/celeborn/utils/CelebornException.h new file mode 100644 index 0000000000..a711ca78ee --- /dev/null +++ b/cpp/celeborn/utils/CelebornException.h @@ -0,0 +1,415 @@ +/* + * Based on VeloxException.h from Facebook Velox + * + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "celeborn/utils/StackTrace.h" + +DECLARE_bool(celeborn_exception_user_stacktrace_enabled); +DECLARE_bool(celeborn_exception_system_stacktrace_enabled); + +DECLARE_int32(celeborn_exception_user_stacktrace_rate_limit_ms); +DECLARE_int32(celeborn_exception_system_stacktrace_rate_limit_ms); + +namespace celeborn { + +namespace error_source { +using namespace folly::string_literals; + +// Errors where the root cause of the problem is either because of bad input +// or an unsupported pattern of use are classified with source USER. Examples +// of errors in this category include syntax errors, unavailable names or +// objects. +inline constexpr auto kErrorSourceUser = "USER"_fs; + +// Errors where the root cause of the problem is an unexpected internal state in +// the system. +inline constexpr auto kErrorSourceRuntime = "RUNTIME"_fs; + +// Errors where the root cause of the problem is some unreliable aspect of the +// system are classified with source SYSTEM. +inline constexpr auto kErrorSourceSystem = "SYSTEM"_fs; +} // namespace error_source + +namespace error_code { +using namespace folly::string_literals; + +//====================== User Error Codes ======================: + +// A generic user error code +inline constexpr auto kGenericUserError = "GENERIC_USER_ERROR"_fs; + +// An error raised when an argument verification fails +inline constexpr auto kInvalidArgument = "INVALID_ARGUMENT"_fs; + +// An error raised when a requested operation is not supported. +inline constexpr auto kUnsupported = "UNSUPPORTED"_fs; + +// Arithmetic errors - underflow, overflow, divide by zero etc. +inline constexpr auto kArithmeticError = "ARITHMETIC_ERROR"_fs; + +// Arithmetic errors - underflow, overflow, divide by zero etc. +inline constexpr auto kSchemaMismatch = "SCHEMA_MISMATCH"_fs; + +//====================== Runtime Error Codes ======================: + +// An error raised when the current state of a component is invalid. +inline constexpr auto kInvalidState = "INVALID_STATE"_fs; + +// An error raised when unreachable code point was executed. +inline constexpr auto kUnreachableCode = "UNREACHABLE_CODE"_fs; + +// An error raised when a requested operation is not yet supported. +inline constexpr auto kNotImplemented = "NOT_IMPLEMENTED"_fs; + +// An error raised when memory pool exceeds limits. +inline constexpr auto kMemCapExceeded = "MEM_CAP_EXCEEDED"_fs; + +// An error raised when memory pool is aborted. +inline constexpr auto kMemAborted = "MEM_ABORTED"_fs; + +// Error caused by memory allocation failure (inclusive of allocator memory cap +// exceeded). +inline constexpr auto kMemAllocError = "MEM_ALLOC_ERROR"_fs; + +// Error caused by failing to allocate cache buffer space for IO. +inline constexpr auto kNoCacheSpace = "NO_CACHE_SPACE"_fs; + +// An error raised when spill bytes exceeds limits. +inline constexpr auto kSpillLimitExceeded = "SPILL_LIMIT_EXCEEDED"_fs; + +// Errors indicating file read corruptions. +inline constexpr auto kFileCorruption = "FILE_CORRUPTION"_fs; + +// Errors indicating file not found. +inline constexpr auto kFileNotFound = "FILE_NOT_FOUND"_fs; + +// We do not know how to classify it yet. +inline constexpr auto kUnknown = "UNKNOWN"_fs; +} // namespace error_code + +class CelebornException : public std::exception { + public: + enum class Type { kUser = 0, kSystem = 1 }; + + CelebornException( + const char* file, + size_t line, + const char* function, + std::string_view expression, + std::string_view message, + std::string_view errorSource, + std::string_view errorCode, + bool isRetriable, + Type exceptionType = Type::kSystem, + std::string_view exceptionName = "CelebornException"); + + /// Wrap an std::exception. + CelebornException( + const std::exception_ptr& e, + std::string_view message, + std::string_view errorSource, + std::string_view errorCode, + bool isRetriable, + Type exceptionType = Type::kSystem, + std::string_view exceptionName = "CelebornException"); + + CelebornException( + const std::exception_ptr& e, + std::string_view message, + std::string_view errorSource, + bool isRetriable, + Type exceptionType = Type::kSystem, + std::string_view exceptionName = "CelebornException") + : CelebornException( + e, + message, + errorSource, + "", + isRetriable, + exceptionType, + exceptionName) {} + + // Inherited + const char* what() const noexcept override { + return state_->what(); + } + + // Introduced nonvirtuals + const utils::StackTrace* stackTrace() const { + return state_->stackTrace.get(); + } + const char* file() const { + return state_->file; + } + size_t line() const { + return state_->line; + } + const char* function() const { + return state_->function; + } + const std::string& failingExpression() const { + return state_->failingExpression; + } + const std::string& message() const { + return state_->message; + } + + const std::string& errorCode() const { + return state_->errorCode; + } + + const std::string& errorSource() const { + return state_->errorSource; + } + + Type exceptionType() const { + return state_->exceptionType; + } + + const std::string& exceptionName() const { + return state_->exceptionName; + } + + bool isRetriable() const { + return state_->isRetriable; + } + + bool isUserError() const { + return state_->errorSource == error_source::kErrorSourceUser; + } + + const std::string& context() const { + return state_->context; + } + + const std::string& topLevelContext() const { + return state_->topLevelContext; + } + + const std::exception_ptr& wrappedException() const { + return state_->wrappedException; + } + + private: + struct State { + std::unique_ptr stackTrace; + Type exceptionType = Type::kSystem; + std::string exceptionName; + const char* file = nullptr; + size_t line = 0; + const char* function = nullptr; + std::string failingExpression; + std::string message; + std::string errorSource; + std::string errorCode; + // The current exception context. + std::string context; + // The top-level ancestor of the current exception context. + std::string topLevelContext; + bool isRetriable; + // The original std::exception. + std::exception_ptr wrappedException; + + mutable folly::once_flag once; + mutable std::string elaborateMessage; + + template + static std::shared_ptr make(Type exceptionType, F); + + template + static std::shared_ptr make(F f) { + auto state = std::make_shared(); + f(*state); + return state; + } + + void finalize() const; + + const char* what() const noexcept; + }; + + explicit CelebornException(std::shared_ptr state) noexcept + : state_(std::move(state)) {} + + const std::shared_ptr state_; +}; + +class CelebornUserError : public CelebornException { + public: + CelebornUserError( + const char* file, + size_t line, + const char* function, + std::string_view expression, + std::string_view message, + std::string_view /* errorSource */, + std::string_view errorCode, + bool isRetriable, + std::string_view exceptionName = "CelebornUserError") + : CelebornException( + file, + line, + function, + expression, + message, + error_source::kErrorSourceUser, + errorCode, + isRetriable, + Type::kUser, + exceptionName) {} + + /// Wrap an std::exception. + CelebornUserError( + const std::exception_ptr& e, + std::string_view message, + bool isRetriable, + std::string_view exceptionName = "CelebornUserError") + : CelebornException( + e, + message, + error_source::kErrorSourceUser, + error_code::kInvalidArgument, + isRetriable, + Type::kUser, + exceptionName) {} +}; + +class CelebornRuntimeError final : public CelebornException { + public: + CelebornRuntimeError( + const char* file, + size_t line, + const char* function, + std::string_view expression, + std::string_view message, + std::string_view /* errorSource */, + std::string_view errorCode, + bool isRetriable, + std::string_view exceptionName = "CelebornRuntimeError") + : CelebornException( + file, + line, + function, + expression, + message, + error_source::kErrorSourceRuntime, + errorCode, + isRetriable, + Type::kSystem, + exceptionName) {} + + /// Wrap an std::exception. + CelebornRuntimeError( + const std::exception_ptr& e, + std::string_view message, + bool isRetriable, + std::string_view exceptionName = "CelebornRuntimeError") + : CelebornException( + e, + message, + error_source::kErrorSourceRuntime, + isRetriable, + Type::kSystem, + exceptionName) {} +}; + +/// Returns a reference to a thread level counter of Celeborn error throws. +int64_t& threadNumCelebornThrow(); + +/// Holds a pointer to a function that provides addition context to be +/// added to the detailed error message in case of an exception. +struct ExceptionContext { + using MessageFunction = + std::string (*)(CelebornException::Type exceptionType, void* arg); + + /// Function to call in case of an exception to get additional context. + MessageFunction messageFunc{nullptr}; + + /// Value to pass to `messageFunc`. Can be null. + void* arg{nullptr}; + + /// Pointer to the parent context when there are hierarchical exception + /// contexts. + ExceptionContext* parent{nullptr}; + + /// Calls `messageFunc(arg)` and returns the result. Returns empty string if + /// `messageFunc` is null. + std::string message(CelebornException::Type exceptionType) { + if (!messageFunc || suspended) { + return ""; + } + + std::string theMessage; + + try { + // Make sure not to call messageFunc again in case it throws. + suspended = true; + theMessage = messageFunc(exceptionType, arg); + suspended = false; + } catch (...) { + return "Failed to produce additional context."; + } + + return theMessage; + } + + bool suspended{false}; +}; + +/// If exceptionPtr represents an std::exception, convert it to +/// CelebornUserError to add useful context for debugging. +std::exception_ptr toCelebornException(const std::exception_ptr& exceptionPtr); + +/// Returns a reference to thread_local variable that holds a function that can +/// be used to get addition context to be added to the detailed error message in +/// case an exception occurs. This is to used in cases when stack trace would +/// not provide enough information, e.g. in case of hierarchical processing like +/// expression evaluation. +ExceptionContext& getExceptionContext(); + +/// RAII class to set and restore context for exceptions. Links the new +/// exception context with the previous context held by the thread_local +/// variable to allow retrieving the top-level context when there is an +/// exception context hierarchy. +class ExceptionContextSetter { + public: + explicit ExceptionContextSetter(ExceptionContext value) + : prev_{getExceptionContext()} { + value.parent = &prev_; + getExceptionContext() = std::move(value); + } + + ~ExceptionContextSetter() { + getExceptionContext() = std::move(prev_); + } + + private: + ExceptionContext prev_; +}; +} // namespace celeborn diff --git a/cpp/celeborn/utils/flags.cpp b/cpp/celeborn/utils/flags.cpp new file mode 100644 index 0000000000..ab4b9ab9ec --- /dev/null +++ b/cpp/celeborn/utils/flags.cpp @@ -0,0 +1,49 @@ +/* + * Based on flags.cpp from Facebook Velox + * + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +/* Used in utils/CelebornException.cpp */ + +DEFINE_bool( + celeborn_exception_user_stacktrace_enabled, + false, + "Enable the stacktrace for user type of CelebornException"); + +DEFINE_bool( + celeborn_exception_system_stacktrace_enabled, + true, + "Enable the stacktrace for system type of CelebornException"); + +DEFINE_int32( + celeborn_exception_user_stacktrace_rate_limit_ms, + 0, // effectively turns off rate-limiting + "Min time interval in milliseconds between stack traces captured in" + " user type of CelebornException; off when set to 0 (the default)"); + +DEFINE_int32( + celeborn_exception_system_stacktrace_rate_limit_ms, + 0, // effectively turns off rate-limiting + "Min time interval in milliseconds between stack traces captured in" + " system type of CelebornException; off when set to 0 (the default)"); + +/* Used in utils/ProcessBase.cpp */ + +DEFINE_bool(celeborn_avx2, true, "Enables use of AVX2 when available"); + +DEFINE_bool(celeborn_bmi2, true, "Enables use of BMI2 when available");