diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f6c5f5aa..be5061d5c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,12 +46,25 @@ find_package(Threads REQUIRED) find_package(OpenSSL REQUIRED COMPONENTS Crypto SSL) find_package(nlohmann_json "3.6.1" REQUIRED) -add_library(${PLUGIN} MODULE ${CMAKE_SOURCE_DIR}/src/main.cpp) +add_library( + ${PLUGIN} + MODULE + "${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/amqp_sender.cpp" + ) +set( + ${PLUGIN}_HEADERS + "${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/audit_amqp.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/audit_b64enc.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/amqp_sender.hpp" + ) +target_sources(${PLUGIN} PRIVATE ${${PLUGIN}_HEADERS}) target_compile_options(${PLUGIN} PRIVATE -Wno-write-strings) target_compile_definitions(${PLUGIN} PRIVATE ${IRODS_COMPILE_DEFINITIONS} ${IRODS_COMPILE_DEFINITIONS_PRIVATE} IRODS_ENABLE_SYSLOG) target_include_directories( ${PLUGIN} PRIVATE + $ ${IRODS_INCLUDE_DIRS} ${IRODS_EXTERNALS_FULLPATH_ARCHIVE}/include ${IRODS_EXTERNALS_FULLPATH_BOOST}/include diff --git a/include/irods/private/amqp_sender.hpp b/include/irods/private/amqp_sender.hpp new file mode 100644 index 000000000..f41ba25fb --- /dev/null +++ b/include/irods/private/amqp_sender.hpp @@ -0,0 +1,43 @@ +#ifndef IRODS_AUDIT_AMQP_SENDER_HPP +#define IRODS_AUDIT_AMQP_SENDER_HPP + +#include "irods/private/audit_amqp.hpp" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace irods::plugin::rule_engine::audit_amqp +{ + class send_handler : public proton::messaging_handler + { + public: + send_handler(const proton::message& _message, const std::string& _url); + void on_container_start(proton::container& _container) override; + void on_sendable(proton::sender& _sender) override; + void on_tracker_accept(proton::tracker& _tracker) override; + void on_tracker_reject(proton::tracker& _tracker) override; + void on_transport_error(proton::transport& _transport) override; + void on_connection_error(proton::connection& _connection) override; + void on_session_error(proton::session& _session) override; + void on_receiver_error(proton::receiver& _receiver) override; + void on_sender_error(proton::sender& _sender) override; + void on_error(const proton::error_condition& _err_cond) override; + + private: + const std::string& _amqp_url; + const proton::message& _message; + bool _message_sent; + }; +} //namespace irods::plugin::rule_engine::audit_amqp + +#endif // IRODS_AUDIT_AMQP_SENDER_HPP diff --git a/include/irods/private/audit_amqp.hpp b/include/irods/private/audit_amqp.hpp new file mode 100644 index 000000000..2ad38b8ab --- /dev/null +++ b/include/irods/private/audit_amqp.hpp @@ -0,0 +1,43 @@ +#ifndef IRODS_AUDIT_AMQP_MAIN_HPP +#define IRODS_AUDIT_AMQP_MAIN_HPP + +#include + +#include +#include + +#include + +namespace irods::plugin::rule_engine::audit_amqp +{ + static inline constexpr const char* const rule_engine_name = "audit_amqp"; + + using log_re = irods::experimental::log::rule_engine; + +#if __cpp_lib_chrono >= 201907 + // we use millisecond precision in our timestamps, so we want to use a clock + // that does not implement leap seconds as repeated non-leap seconds, if we can. + using ts_clock = std::chrono::utc_clock; +#else + // fallback to system_clock + using ts_clock = std::chrono::system_clock; +#endif + + template + static BOOST_FORCEINLINE void log_exception( + const T& exception, + const std::string& log_message, + const irods::experimental::log::key_value& context_info) + { + // clang-format off + log_re::info({ + {"rule_engine_plugin", rule_engine_name}, + {"log_message", log_message}, + context_info, + {"exception", exception.what()}, + }); + // clang-format on + } +} //namespace irods::plugin::rule_engine::audit_amqp + +#endif // IRODS_AUDIT_AMQP_MAIN_HPP diff --git a/include/irods/private/audit_b64enc.hpp b/include/irods/private/audit_b64enc.hpp new file mode 100644 index 000000000..6b26915d1 --- /dev/null +++ b/include/irods/private/audit_b64enc.hpp @@ -0,0 +1,51 @@ +#ifndef IRODS_AUDIT_AMQP_B64ENC_HPP +#define IRODS_AUDIT_AMQP_B64ENC_HPP + +#include "irods/private/audit_amqp.hpp" + +#include + +#include +#include +#include + +#include + +namespace irods::plugin::rule_engine::audit_amqp +{ + static BOOST_FORCEINLINE void insert_as_string_or_base64( + nlohmann::json& json_obj, + const std::string& key, + const std::string& val, + const std::uint64_t& time_ms) + { + try { + json_obj[key] = nlohmann::json::parse("\"" + val + "\""); + } + catch (const nlohmann::json::exception&) { + using namespace boost::archive::iterators; + using b64enc = base64_from_binary>; + + // encode into base64 string + std::string val_b64(b64enc(std::begin(val)), b64enc(std::end(val))); + val_b64.append((3 - val.length() % 3) % 3, '='); // add padding ='s + + // new key for encoded value + const std::string key_b64 = key + "_b64"; + + json_obj[key_b64] = val_b64; + + // clang-format off + log_re::debug({ + {"rule_engine_plugin", rule_engine_name}, + {"log_message", "Invalid UTF-8 encountered when adding element to message; added as base64"}, + {"element_original_key", key}, + {"element_key", key_b64}, + {"message_timestamp", std::to_string(time_ms)}, + }); + // clang-format on + } + } +} //namespace irods::plugin::rule_engine::audit_amqp + +#endif // IRODS_AUDIT_AMQP_B64ENC_HPP diff --git a/src/amqp_sender.cpp b/src/amqp_sender.cpp new file mode 100644 index 000000000..d7254bcfe --- /dev/null +++ b/src/amqp_sender.cpp @@ -0,0 +1,94 @@ +#include "irods/private/audit_amqp.hpp" +#include "irods/private/amqp_sender.hpp" + +#include + +namespace irods::plugin::rule_engine::audit_amqp +{ + namespace + { + BOOST_FORCEINLINE void log_proton_error(const proton::error_condition& err_cond, const std::string& log_message) + { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"log_message", log_message}, + {"error_condition::name", err_cond.name()}, + {"error_condition::description", err_cond.description()}, + {"error_condition::what", err_cond.what()} + }); + // clang-format on + } + } // namespace + + send_handler::send_handler(const proton::message& _message, const std::string& _url) + : _amqp_url(_url) + , _message(_message) + , _message_sent(false) + { + } + + void send_handler::on_container_start(proton::container& _container) + { + proton::connection_options conn_opts; + _container.open_sender(_amqp_url, conn_opts); + } + + void send_handler::on_sendable(proton::sender& _sender) + { + if (_sender.credit() && !_message_sent) { + _sender.send(_message); + _message_sent = true; + } + } + + void send_handler::on_tracker_accept(proton::tracker& _tracker) + { + // we're only sending one message + // so we don't care about the credit system + // or tracking confirmed messages + if (_message_sent) { + _tracker.connection().close(); + } + } + + void send_handler::on_tracker_reject([[maybe_unused]] proton::tracker& _tracker) + { + // clang-format off + log_re::error({ + {"rule_engine_plugin", rule_engine_name}, + {"log_message", "AMQP server unexpectedly rejected message"} + }); + // clang-format on + } + + void send_handler::on_transport_error(proton::transport& _transport) + { + log_proton_error(_transport.error(), "Transport error in proton messaging handler"); + } + + void send_handler::on_connection_error(proton::connection& _connection) + { + log_proton_error(_connection.error(), "Connection error in proton messaging handler"); + } + + void send_handler::on_session_error(proton::session& _session) + { + log_proton_error(_session.error(), "Session error in proton messaging handler"); + } + + void send_handler::on_receiver_error(proton::receiver& _receiver) + { + log_proton_error(_receiver.error(), "Receiver error in proton messaging handler"); + } + + void send_handler::on_sender_error(proton::sender& _sender) + { + log_proton_error(_sender.error(), "Sender error in proton messaging handler"); + } + + void send_handler::on_error(const proton::error_condition& _err_cond) + { + log_proton_error(_err_cond, "Unknown error in proton messaging handler"); + } +} //namespace irods::plugin::rule_engine::audit_amqp diff --git a/src/main.cpp b/src/main.cpp index e8e80364b..c810409b0 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,7 @@ // irods includes +#include "irods/private/audit_amqp.hpp" +#include "irods/private/audit_b64enc.hpp" +#include "irods/private/amqp_sender.hpp" #include #include #include @@ -60,194 +63,39 @@ namespace fs = boost::filesystem; #endif // clang-format on -namespace +namespace irods::plugin::rule_engine::audit_amqp { - const auto pep_regex_flavor = std::regex::ECMAScript; - - // NOLINTBEGIN(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) - const std::string_view default_pep_regex_to_match{"audit_.*"}; - const std::string_view default_amqp_url{"localhost:5672/irods_audit_messages"}; - - const fs::path default_log_path_prefix{fs::temp_directory_path()}; - const bool default_test_mode = false; - - std::string audit_pep_regex_to_match; - std::string audit_amqp_url; - - fs::path log_path_prefix; - bool test_mode; - - bool warned_amqp_options = false; - - fs::path log_file_path; - std::ofstream log_file_ofstream; - - // audit_pep_regex is initially populated with an unoptimized default, as optimization - // makes construction slower, and we don't expect it to be used before configuration is read. - std::regex audit_pep_regex{audit_pep_regex_to_match, pep_regex_flavor}; - - std::mutex audit_plugin_mutex; - // NOLINTEND(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) - - const char* const rule_engine_name = "audit_amqp"; - - using log_re = irods::experimental::log::rule_engine; - -#if __cpp_lib_chrono >= 201907 - // we use millisecond precision in our timestamps, so we want to use a clock - // that does not implement leap seconds as repeated non-leap seconds, if we can. - using ts_clock = std::chrono::utc_clock; -#else - // fallback to system_clock - using ts_clock = std::chrono::system_clock; -#endif - - BOOST_FORCEINLINE void log_proton_error(const proton::error_condition& err_cond, const std::string& log_message) + namespace { - // clang-format off - log_re::error({ - {"rule_engine_plugin", rule_engine_name}, - {"log_message", log_message}, - {"error_condition::name", err_cond.name()}, - {"error_condition::description", err_cond.description()}, - {"error_condition::what", err_cond.what()} - }); - // clang-format on - } + const auto pep_regex_flavor = std::regex::ECMAScript; - // See qpid-cpp docs - // https://qpid.apache.org/releases/qpid-proton-0.36.0/proton/cpp/api/simple_send_8cpp-example.html - class send_handler : public proton::messaging_handler - { - public: - send_handler(const proton::message& _message, const std::string& _url) - : _amqp_url(_url) - , _message(_message) - , _message_sent(false) - { - } + // NOLINTBEGIN(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) + const std::string_view default_pep_regex_to_match{"audit_.*"}; + const std::string_view default_amqp_url{"localhost:5672/irods_audit_messages"}; - void on_container_start(proton::container& container) override - { - proton::connection_options conn_opts; - container.open_sender(_amqp_url, conn_opts); - } + const fs::path default_log_path_prefix{fs::temp_directory_path()}; + const bool default_test_mode = false; - void on_sendable(proton::sender& _sender) override - { - if (_sender.credit() && !_message_sent) { - _sender.send(_message); - _message_sent = true; - } - } + std::string audit_pep_regex_to_match; + std::string audit_amqp_url; - void on_tracker_accept(proton::tracker& tracker) override - { - // we're only sending one message - // so we don't care about the credit system - // or tracking confirmed messages - if (_message_sent) { - tracker.connection().close(); - } - } + fs::path log_path_prefix; + bool test_mode; - void on_tracker_reject([[maybe_unused]] proton::tracker& _tracker) override - { - // clang-format off - log_re::error({ - {"rule_engine_plugin", rule_engine_name}, - {"log_message", "AMQP server unexpectedly rejected message"} - }); - // clang-format on - } + bool warned_amqp_options = false; - void on_transport_error(proton::transport& _transport) override - { - log_proton_error(_transport.error(), "Transport error in proton messaging handler"); - } + fs::path log_file_path; + std::ofstream log_file_ofstream; - void on_connection_error(proton::connection& _connection) override - { - log_proton_error(_connection.error(), "Connection error in proton messaging handler"); - } + // audit_pep_regex is initially populated with an unoptimized default, as optimization + // makes construction slower, and we don't expect it to be used before configuration is read. + std::regex audit_pep_regex{audit_pep_regex_to_match, pep_regex_flavor}; - void on_session_error(proton::session& _session) override - { - log_proton_error(_session.error(), "Session error in proton messaging handler"); - } - - void on_receiver_error(proton::receiver& _receiver) override - { - log_proton_error(_receiver.error(), "Receiver error in proton messaging handler"); - } - - void on_sender_error(proton::sender& _sender) override - { - log_proton_error(_sender.error(), "Sender error in proton messaging handler"); - } - - void on_error(const proton::error_condition& err_cond) override - { - log_proton_error(err_cond, "Unknown error in proton messaging handler"); - } + std::mutex audit_plugin_mutex; + // NOLINTEND(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) + } // namespace - private: - const std::string& _amqp_url; - const proton::message& _message; - bool _message_sent; - }; // class send_handler - - template - BOOST_FORCEINLINE void log_exception( - const T& exception, - const std::string& log_message, - const irods::experimental::log::key_value& context_info) - { - // clang-format off - log_re::info({ - {"rule_engine_plugin", rule_engine_name}, - {"log_message", log_message}, - context_info, - {"exception", exception.what()}, - }); - // clang-format on - } - - BOOST_FORCEINLINE void insert_as_string_or_base64( - nlohmann::json& json_obj, - const std::string& key, - const std::string& val, - const std::uint64_t& time_ms) - { - try { - json_obj[key] = nlohmann::json::parse("\"" + val + "\""); - } - catch (const nlohmann::json::exception&) { - using namespace boost::archive::iterators; - using b64enc = base64_from_binary>; - - // encode into base64 string - std::string val_b64(b64enc(std::begin(val)), b64enc(std::end(val))); - val_b64.append((3 - val.length() % 3) % 3, '='); // add padding ='s - - // new key for encoded value - const std::string key_b64 = key + "_b64"; - - json_obj[key_b64] = val_b64; - - // clang-format off - log_re::debug({ - {"rule_engine_plugin", rule_engine_name}, - {"log_message", "Invalid UTF-8 encountered when adding element to message; added as base64"}, - {"element_original_key", key}, - {"element_key", key_b64}, - {"message_timestamp", std::to_string(time_ms)}, - }); - // clang-format on - } - } - - BOOST_FORCEINLINE void set_default_configs() + static BOOST_FORCEINLINE void set_default_configs() { audit_pep_regex_to_match = default_pep_regex_to_match; audit_amqp_url = default_amqp_url; @@ -257,7 +105,7 @@ namespace audit_pep_regex = std::regex(audit_pep_regex_to_match, pep_regex_flavor | std::regex::optimize); } - auto get_re_configs(const std::string& _instance_name) -> irods::error + static auto get_re_configs(const std::string& _instance_name) -> irods::error { try { const auto& rule_engines = irods::get_server_property( @@ -343,7 +191,8 @@ namespace return ERROR(SYS_INVALID_INPUT_PARAM, "failed to find plugin configuration"); } - auto start([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _instance_name) -> irods::error + static auto start([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _instance_name) + -> irods::error { std::lock_guard lock(audit_plugin_mutex); @@ -412,7 +261,7 @@ namespace return SUCCESS(); } - auto stop([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _instance_name) -> irods::error + static auto stop([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _instance_name) -> irods::error { std::lock_guard lock(audit_plugin_mutex); @@ -468,7 +317,7 @@ namespace return SUCCESS(); } - auto rule_exists([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _rn, bool& _ret) + static auto rule_exists([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _rn, bool& _ret) -> irods::error { try { @@ -485,13 +334,14 @@ namespace return SUCCESS(); } - auto list_rules([[maybe_unused]] irods::default_re_ctx& _re_ctx, [[maybe_unused]] std::vector& _rules) - -> irods::error + static auto list_rules( + [[maybe_unused]] irods::default_re_ctx& _re_ctx, + [[maybe_unused]] std::vector& _rules) -> irods::error { return SUCCESS(); } - auto exec_rule( + static auto exec_rule( [[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _rn, std::list& _ps, @@ -607,8 +457,7 @@ namespace return err; } - -} // namespace +} // namespace irods::plugin::rule_engine::audit_amqp // // Plugin Factory @@ -618,6 +467,8 @@ using pluggable_rule_engine = irods::pluggable_rule_engine pluggable_rule_engine* { + using namespace irods::plugin::rule_engine::audit_amqp; + set_default_configs(); const auto not_supported = [](auto&&...) { return ERROR(SYS_NOT_SUPPORTED, "Not supported."); };