From f0de03e78752c8da70d4fd38d2ae502d17850872 Mon Sep 17 00:00:00 2001 From: Phillip Davis Date: Wed, 8 Jun 2022 14:01:09 -0400 Subject: [PATCH] [#42,#73,#95,#102] Refactor to use nlohmann-json and qpid-cpp Also, complete clang-tidy and clang-format refactor Also also, removed of json wrapper tokens Co-authored-by: Markus Kitsinger (SwooshyCueb) --- .gitignore | 1 + CMakeLists.txt | 6 +- libirods_rule_engine_plugin-audit_amqp.cpp | 863 +++++++++++---------- packaging/audit_plugin_message_worker.py | 5 - packaging/message_broker_consumer.py | 5 - packaging/test_plugin_audit_amqp.py | 4 - 6 files changed, 459 insertions(+), 425 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..722d5e71d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode diff --git a/CMakeLists.txt b/CMakeLists.txt index c66719101..3abf6527d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,6 @@ include(IrodsRunpathDefaults) include(IrodsExternals) -IRODS_MACRO_CHECK_DEPENDENCY_SET_FULLPATH_ADD_TO_IRODS_PACKAGE_DEPENDENCIES_LIST(JANSSON jansson2.7-0) IRODS_MACRO_CHECK_DEPENDENCY_SET_FULLPATH_ADD_TO_IRODS_PACKAGE_DEPENDENCIES_LIST(QPID_PROTON qpid-proton0.36.0-1) string(REPLACE ";" ", " ${PLUGIN}_PACKAGE_DEPENDENCIES_STRING "${IRODS_PACKAGE_DEPENDENCIES_LIST}") @@ -47,7 +46,6 @@ target_include_directories( ${IRODS_EXTERNALS_FULLPATH_ARCHIVE}/include ${IRODS_EXTERNALS_FULLPATH_BOOST}/include ${IRODS_EXTERNALS_FULLPATH_FMT}/include - ${IRODS_EXTERNALS_FULLPATH_JANSSON}/include ${IRODS_EXTERNALS_FULLPATH_QPID_PROTON}/include ) @@ -56,8 +54,8 @@ target_link_libraries( PRIVATE irods_server irods_common - ${IRODS_EXTERNALS_FULLPATH_JANSSON}/lib/libjansson.so - ${IRODS_EXTERNALS_FULLPATH_QPID_PROTON}/lib/libqpid-proton.so + nlohmann_json::nlohmann_json + ${IRODS_EXTERNALS_FULLPATH_QPID_PROTON}/lib/libqpid-proton-cpp.so ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_regex.so ${IRODS_EXTERNALS_FULLPATH_BOOST}/lib/libboost_system.so dl diff --git a/libirods_rule_engine_plugin-audit_amqp.cpp b/libirods_rule_engine_plugin-audit_amqp.cpp index e25f72a75..94527efc6 100644 --- a/libirods_rule_engine_plugin-audit_amqp.cpp +++ b/libirods_rule_engine_plugin-audit_amqp.cpp @@ -1,414 +1,454 @@ -// =-=-=-=-=-=-=- // irods includes #include #include #include -#undef LIST +// LIST is #defined in irods/reconstants.hpp +// and is an enum entry in proton/type_id.hpp +#ifdef LIST +# undef LIST +#endif -// =-=-=-=-=-=-=- -// stl includes -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// =-=-=-=-=-=-=- // boost includes #include +#include #include #include #include +#include +#include #include -// =-=-=-=-=-=-=- -// proton includes -#include -#include - -#include - -static std::string audit_pep_regex_to_match = "audit_.*"; -static std::string audit_amqp_topic = "irods_audit_messages"; -static std::string audit_amqp_location = "localhost:5672"; -static std::string audit_amqp_options = ""; -static std::string log_path_prefix = "/tmp"; -static bool test_mode = false; -static std::ofstream log_file_ofstream; - -static pn_messenger_t * messenger = nullptr; - -static std::mutex audit_plugin_mutex; - -// Insert the key arg into arg_map and storing the number of insertions of arg as the value. -// The value (number of insertions) is returned. -int insert_arg_into_counter_map(std::map& arg_map, const std::string& arg) { - std::map::iterator iter = arg_map.find(arg); - if (iter == arg_map.end()) { - arg_map.insert(std::make_pair(arg, 1)); - return 1; - } else { - iter->second = iter->second+1; - return iter->second; - } -} - -irods::error get_re_configs( - const std::string& _instance_name ) { - - try { - const auto& rule_engines = irods::get_server_property< const nlohmann::json& >(std::vector{ irods::KW_CFG_PLUGIN_CONFIGURATION, irods::KW_CFG_PLUGIN_TYPE_RULE_ENGINE } ); - for ( const auto& rule_engine : rule_engines ) { - const auto& inst_name = rule_engine.at( irods::KW_CFG_INSTANCE_NAME).get_ref(); - if ( inst_name == _instance_name ) { - if ( rule_engine.count( irods::KW_CFG_PLUGIN_SPECIFIC_CONFIGURATION) > 0 ) { - - const auto& plugin_spec_cfg = rule_engine.at(irods::KW_CFG_PLUGIN_SPECIFIC_CONFIGURATION); - - audit_pep_regex_to_match = plugin_spec_cfg.at("pep_regex_to_match").get(); - audit_amqp_topic = plugin_spec_cfg.at("amqp_topic").get(); - audit_amqp_location = plugin_spec_cfg.at("amqp_location").get(); - audit_amqp_options = plugin_spec_cfg.at("amqp_options").get(); - - // look for a test mode setting. if it doesn't exist just keep test_mode at false. - // if test_mode = true and log_path_prefix isn't set just leave the default - const auto test_mode_cfg = plugin_spec_cfg.find("test_mode"); - if (test_mode_cfg != plugin_spec_cfg.end()) { - const std::string& test_mode_str = test_mode_cfg->get_ref(); - test_mode = boost::iequals(test_mode_str, "true"); - if (test_mode) { - const auto log_path_prefix_cfg = plugin_spec_cfg.find("log_path_prefix"); - if (log_path_prefix_cfg != plugin_spec_cfg.end()) { - log_path_prefix = log_path_prefix_cfg->get(); - } - } - } - } - else { - rodsLog( - LOG_DEBUG, - "%s - using default configuration: regex - %s, topic - %s, location - %s", - audit_pep_regex_to_match.c_str(), - audit_amqp_topic.c_str(), - audit_amqp_location.c_str() ); - } - - return SUCCESS(); - } - } - } - catch (const boost::bad_any_cast& e) { - return ERROR(INVALID_ANY_CAST, e.what()); - } - catch (const std::out_of_range& e) { - return ERROR(KEY_NOT_FOUND, e.what()); - } - catch (const nlohmann::json::exception& e) { - return ERROR(SYS_LIBRARY_ERROR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); - } - catch (const std::exception& e) { - return ERROR(SYS_INTERNAL_ERR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); - } - catch (...) { - return ERROR(SYS_UNKNOWN_ERROR, fmt::format("[{}:{}] - an unknown error occurred", __func__, __LINE__)); - } - - std::stringstream msg; - msg << "failed to find configuration for audit_amqp plugin [" - << _instance_name << "]"; - rodsLog( LOG_ERROR, "%s", msg.str().c_str() ); - return ERROR( SYS_INVALID_INPUT_PARAM, msg.str() );; -} - - -irods::error start(irods::default_re_ctx& _u,const std::string& _instance_name) { - (void) _u; - - std::lock_guard lock(audit_plugin_mutex); - - irods::error ret = get_re_configs( _instance_name ); - if( !ret.ok() ) { - irods::log(PASS(ret)); - } - - messenger = pn_messenger(NULL); - pn_messenger_start(messenger); - pn_messenger_set_blocking(messenger, false); // do not block - - json_t* obj = json_object(); - if( !obj ) { - return ERROR(SYS_MALLOC_ERR, "json_object() failed"); - } - - struct timeval tv; - gettimeofday(&tv, NULL); - unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; - std::stringstream time_str; time_str << time_ms; - json_object_set(obj, "time_stamp", json_string(time_str.str().c_str())); - - char host_name[MAX_NAME_LEN]; - gethostname( host_name, MAX_NAME_LEN ); - json_object_set(obj, "hostname", json_string(host_name)); - - pid_t pid = getpid(); - std::stringstream pid_str; pid_str << pid; - json_object_set(obj, "pid", json_string(pid_str.str().c_str())); - - json_object_set(obj, "action", json_string("START")); - - std::string log_file; - if (test_mode) { - log_file = str(boost::format("%s/%06i.txt") % log_path_prefix % pid); - json_object_set(obj, "log_file", json_string(log_file.c_str())); - } - - char* tmp_buf = json_dumps( obj, JSON_INDENT( 0 ) ); - std::string msg_str = std::string("__BEGIN_JSON__") + std::string(tmp_buf) + std::string("__END_JSON__"); - - pn_message_t * message; - pn_data_t * body; - - message = pn_message(); - - std::string address = audit_amqp_location + "/" + audit_amqp_topic; - - pn_message_set_address(message, address.c_str()); - body = pn_message_body(message); - pn_data_put_string(body, pn_bytes(msg_str.length(), msg_str.c_str())); - pn_messenger_put(messenger, message); - pn_messenger_send(messenger, -1); - - pn_message_free(message); - - free(tmp_buf); - json_decref(obj); - - if (test_mode) { - //std::ofstream log_file_ofstream; - log_file_ofstream.open(log_file); - log_file_ofstream << msg_str << std::endl; - } - - return SUCCESS(); -} - -irods::error stop(irods::default_re_ctx& _u,const std::string& _instance_name) { - - std::lock_guard lock(audit_plugin_mutex); - - json_t* obj = json_object(); - if( !obj ) { - return ERROR(SYS_MALLOC_ERR, "json_object() failed"); - } - - struct timeval tv; - gettimeofday(&tv, NULL); - unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; - std::stringstream time_str; time_str << time_ms; - json_object_set(obj, "time_stamp", json_string(time_str.str().c_str())); - - char host_name[MAX_NAME_LEN]; - gethostname( host_name, MAX_NAME_LEN ); - json_object_set(obj, "hostname", json_string(host_name)); - - pid_t pid = getpid(); - std::stringstream pid_str; pid_str << pid; - json_object_set(obj, "pid", json_string(pid_str.str().c_str())); - - json_object_set(obj, "action", json_string("END")); - - std::string log_file; - if (test_mode) { - log_file = str(boost::format("%s/%06i.txt") % log_path_prefix % pid); - json_object_set(obj, "log_file", json_string(log_file.c_str())); - - } - - char* tmp_buf = json_dumps( obj, JSON_INDENT( 0 ) ); - std::string msg_str = std::string("__BEGIN_JSON__") + std::string(tmp_buf) + std::string("__END_JSON__"); - - pn_message_t * message; - pn_data_t * body; - - message = pn_message(); - std::string address = audit_amqp_location + "/" + audit_amqp_topic; - pn_message_set_address(message, address.c_str()); - - body = pn_message_body(message); - - pn_data_put_string(body, pn_bytes(msg_str.length(), msg_str.c_str())); - pn_messenger_put(messenger, message); - pn_messenger_send(messenger, -1); - - pn_message_free(message); - - free(tmp_buf); - json_decref(obj); - - pn_messenger_stop(messenger); - pn_messenger_free(messenger); +// proton-cpp includes +#include +#include +#include +#include +#include +#include - if (test_mode) { - log_file_ofstream << msg_str << std::endl; - log_file_ofstream.close(); - } +// nlohmann includes +#include - return SUCCESS(); -} +// fmt includes +#include +#include -irods::error rule_exists(irods::default_re_ctx&, const std::string& _rn, bool& _ret) { - - try { - boost::smatch matches; - boost::regex expr( audit_pep_regex_to_match ); - _ret = boost::regex_match( _rn, matches, expr ); - } - catch ( const boost::exception& _e ) { - std::string what = boost::diagnostic_information(_e); - return ERROR( - SYS_INTERNAL_ERR, - what.c_str() ); - } - - return SUCCESS(); -} +// stl includes +#include +#include +#include +#include +#include +#include +#include +#include +#include -irods::error list_rules(irods::default_re_ctx&, std::vector&) { - return SUCCESS(); -} +namespace +{ -irods::error exec_rule( - irods::default_re_ctx&, - const std::string& _rn, - std::list& _ps, - irods::callback _eff_hdlr) { - - std::lock_guard lock(audit_plugin_mutex); - - using namespace std::chrono; - - - // stores a counter of unique arg types - std::map arg_type_map; - - ruleExecInfo_t* rei = nullptr; - irods::error err = _eff_hdlr("unsafe_ms_ctx", &rei); - if(!err.ok()) { - return err; - } - - json_t* obj = json_object(); - if( !obj ) { - return ERROR( - SYS_MALLOC_ERR, - "json_object() failed"); - } - - struct timeval tv; - gettimeofday(&tv, NULL); - unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; - std::stringstream time_str; time_str << time_ms; - json_object_set( - obj, - "time_stamp", - json_string(time_str.str().c_str())); - - char host_name[MAX_NAME_LEN]; - gethostname( host_name, MAX_NAME_LEN ); - json_object_set( - obj, - "hostname", - json_string(host_name)); - - pid_t pid = getpid(); - std::stringstream pid_str; pid_str << pid; - json_object_set( - obj, - "pid", - json_string(pid_str.str().c_str())); - - json_object_set( - obj, - "rule_name", - json_string(_rn.c_str())); - - for( auto itr : _ps ) { - // The BytesBuf parameter should not be serialized because this commonly contains - // the entirety of the contents of files. These could be very big and cause the - // message broker to explode. - if (std::type_index(typeid(BytesBuf*)) == std::type_index(itr.type())) { - rodsLog(LOG_DEBUG9, "[{}:{}] - skipping serialization of BytesBuf parameter", - __FILE__, __LINE__); - continue; - } - - // serialize the parameter to a map - irods::re_serialization::serialized_parameter_t param; - irods::error ret = irods::re_serialization::serialize_parameter(itr, param); - if(!ret.ok()) { - rodsLog( - LOG_ERROR, - "unsupported argument for calling re rules from the rule language"); - continue; - } - - for( auto elem : param ) { - - size_t ctr = insert_arg_into_counter_map(arg_type_map, elem.first); - std::stringstream ctr_str; - ctr_str << ctr; - - std::string key = elem.first; - if (ctr > 1) { - key += "__"; - key += ctr_str.str(); - } - - json_object_set( - obj, - key.c_str(), - json_string(elem.second.c_str())); - - ++ctr; - ctr_str.clear(); - - } // for elem - } // for itr - - char* tmp_buf = json_dumps( obj, JSON_INDENT( 0 ) ); - std::string msg_str = std::string("__BEGIN_JSON__") + std::string(tmp_buf) + std::string("__END_JSON__"); - - pn_message_t * message; - pn_data_t * body; - - message = pn_message(); - - std::string address = audit_amqp_location + "/" + audit_amqp_topic; - - pn_message_set_address(message, address.c_str()); - body = pn_message_body(message); - pn_data_put_string(body, pn_bytes(msg_str.length(), msg_str.c_str())); - pn_messenger_put(messenger, message); - pn_messenger_send(messenger, -1); - - pn_message_free(message); - - free(tmp_buf); - json_decref(obj); - - if (test_mode) { - log_file_ofstream << msg_str << std::endl; - } - - return err; -} + // NOLINTBEGIN(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) + std::string audit_pep_regex_to_match{"audit_.*"}; + std::string audit_amqp_topic{"irods_audit_messages"}; + std::string audit_amqp_location{"localhost:5672"}; + std::string audit_amqp_options; + std::string log_path_prefix{"/tmp"}; + bool test_mode = false; + std::ofstream log_file_ofstream; + + std::mutex audit_plugin_mutex; + // NOLINTEND(cert-err58-cpp, cppcoreguidelines-avoid-non-const-global-variables) + + // See qpid-cpp docs + // https://qpid.apache.org/releases/qpid-proton-0.36.0/proton/cpp/api/simple_send_8cpp-example.html + class send_handler : public proton::messaging_handler + { + public: + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + send_handler(const std::string& _message_body, const std::string& _location, const std::string& _topic) + : _amqp_location(_location) + , _amqp_topic(_topic) + , _message(_message_body) + , _message_sent(false) + { + } + + void on_container_start(proton::container& container) override + { + proton::connection_options conn_opts; + container.open_sender( + fmt::format(FMT_COMPILE("{0:s}/{1:s}"), _amqp_location, _amqp_topic), + conn_opts); + } + + void on_sendable(proton::sender& _sender) override + { + if (_sender.credit() && !_message_sent) { + _sender.send(_message); + _message_sent = true; + } + } + + void on_tracker_accept(proton::tracker& tracker) override + { + // we're only sending one message + // so we don't care about the credit system + // or tracking confirmed messages + if (_message_sent) { + tracker.connection().close(); + } + } + + private: + const std::string& _amqp_location; + const std::string& _amqp_topic; + proton::message _message; + bool _message_sent; + }; // class send_handler + + BOOST_FORCEINLINE void + insert_as_string_or_base64(nlohmann::json& json_obj, const std::string& key, const std::string& val) + { + try { + json_obj[key] = nlohmann::json::parse("\"" + val + "\""); + } + catch (const nlohmann::json::exception&) { + using namespace boost::archive::iterators; + using b64enc = base64_from_binary>; + + // encode into base64 string + std::string val_b64(b64enc(std::begin(val)), b64enc(std::end(val))); + val_b64.append((3 - val.length() % 3) % 3, '='); // add padding ='s + + // new key for encoded value + const std::string key_b64 = key + "_b64"; + + json_obj[key_b64] = val_b64; + + irods::log( + LOG_DEBUG, + fmt::format( + "[AUDIT] - Message with timestamp:[{}] had invalid UTF-8 in key:[{}] and was stored as base64 in key:[{}].", + json_obj.at("time_stamp").get_ref(), + key, + key_b64)); // irods::log + } + } + + // NOLINTNEXTLINE(readability-function-cognitive-complexity) + auto get_re_configs(const std::string& _instance_name) -> irods::error + { + try { + const auto& rule_engines = irods::get_server_property( + std::vector{irods::KW_CFG_PLUGIN_CONFIGURATION, irods::KW_CFG_PLUGIN_TYPE_RULE_ENGINE}); + for (const auto& rule_engine : rule_engines) { + const auto& inst_name = rule_engine.at(irods::KW_CFG_INSTANCE_NAME).get_ref(); + if (inst_name == _instance_name) { + if (rule_engine.count(irods::KW_CFG_PLUGIN_SPECIFIC_CONFIGURATION) > 0) { + const auto& plugin_spec_cfg = rule_engine.at(irods::KW_CFG_PLUGIN_SPECIFIC_CONFIGURATION); + + audit_pep_regex_to_match = plugin_spec_cfg.at("pep_regex_to_match").get(); + audit_amqp_topic = plugin_spec_cfg.at("amqp_topic").get(); + audit_amqp_location = plugin_spec_cfg.at("amqp_location").get(); + audit_amqp_options = plugin_spec_cfg.at("amqp_options").get(); + + // look for a test mode setting. if it doesn't exist just keep test_mode at false. + // if test_mode = true and log_path_prefix isn't set just leave the default + const auto test_mode_cfg = plugin_spec_cfg.find("test_mode"); + if (test_mode_cfg != plugin_spec_cfg.end()) { + const auto& test_mode_str = test_mode_cfg->get_ref(); + test_mode = boost::iequals(test_mode_str, "true"); + if (test_mode) { + const auto log_path_prefix_cfg = plugin_spec_cfg.find("log_path_prefix"); + if (log_path_prefix_cfg != plugin_spec_cfg.end()) { + log_path_prefix = log_path_prefix_cfg->get(); + } + } + } + } + else { + rodsLog( + LOG_DEBUG, + "%s - using default configuration: regex - %s, topic - %s, location - %s", + audit_pep_regex_to_match.c_str(), + audit_amqp_topic.c_str(), + audit_amqp_location.c_str()); + } + + return SUCCESS(); + } + } + } + catch (const std::out_of_range& e) { + return ERROR(KEY_NOT_FOUND, e.what()); + } + catch (const nlohmann::json::exception& e) { + return ERROR(SYS_LIBRARY_ERROR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (const std::exception& e) { + return ERROR(SYS_INTERNAL_ERR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (...) { + return ERROR(SYS_UNKNOWN_ERROR, fmt::format("[{}:{}] - an unknown error occurred", __func__, __LINE__)); + } + + std::stringstream msg; + msg << "failed to find configuration for audit_amqp plugin [" << _instance_name << "]"; + rodsLog(LOG_ERROR, "%s", msg.str().c_str()); + return ERROR(SYS_INVALID_INPUT_PARAM, msg.str()); + } + + auto start([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _instance_name) -> irods::error + { + std::lock_guard lock(audit_plugin_mutex); + + irods::error ret = get_re_configs(_instance_name); + if (!ret.ok()) { + irods::log(PASS(ret)); + } + + nlohmann::json json_obj; + + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + char host_name[MAX_NAME_LEN]; + gethostname(host_name, MAX_NAME_LEN); + + pid_t pid = getpid(); + + std::string msg_str; + std::string log_file; + + try { + json_obj["time_stamp"] = std::to_string(time_ms); + json_obj["hostname"] = host_name; + json_obj["pid"] = std::to_string(pid); + json_obj["action"] = "START"; + + if (test_mode) { + log_file = str(boost::format("%s/%06i.txt") % log_path_prefix % pid); + json_obj["log_file"] = log_file; + } + } + catch (const irods::exception& e) { + rodsLog(LOG_ERROR, e.client_display_what()); + return ERROR(e.code(), fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (const nlohmann::json::exception& e) { + const std::string msg = fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what()); + rodsLog(LOG_ERROR, msg.data()); + return ERROR(SYS_LIBRARY_ERROR, msg); + } + catch (const std::exception& e) { + rodsLog(LOG_ERROR, e.what()); + return ERROR(SYS_INTERNAL_ERR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (...) { + return ERROR(SYS_UNKNOWN_ERROR, fmt::format("[{}:{}] - unknown error occurred", __func__, __LINE__)); + } + + msg_str = json_obj.dump(); + send_handler s(msg_str, audit_amqp_location, audit_amqp_topic); // NOLINT(readability-identifier-length) + proton::container(s).run(); + + if (test_mode) { + log_file_ofstream.open(log_file); + log_file_ofstream << msg_str << std::endl; + } + + return SUCCESS(); + } + + auto stop([[maybe_unused]] irods::default_re_ctx& _re_ctx, [[maybe_unused]] const std::string& _instance_name) + -> irods::error + { + std::lock_guard lock(audit_plugin_mutex); + + nlohmann::json json_obj; + + std::string msg_str; + std::string log_file; + + try { + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; + json_obj["time_stamp"] = std::to_string(time_ms); + + char host_name[MAX_NAME_LEN]; + gethostname(host_name, MAX_NAME_LEN); + json_obj["hostname"] = host_name; + + pid_t pid = getpid(); + json_obj["pid"] = std::to_string(pid); + + json_obj["action"] = "STOP"; + + if (test_mode) { + json_obj["log_file"] = str(boost::format("%s/%06i.txt") % log_path_prefix % pid); + } + } + catch (const irods::exception& e) { + rodsLog(LOG_ERROR, e.client_display_what()); + return ERROR(e.code(), fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (const nlohmann::json::exception& e) { + const std::string msg = fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what()); + rodsLog(LOG_ERROR, msg.data()); + return ERROR(SYS_LIBRARY_ERROR, msg); + } + catch (const std::exception& e) { + rodsLog(LOG_ERROR, e.what()); + return ERROR(SYS_INTERNAL_ERR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (...) { + return ERROR(SYS_UNKNOWN_ERROR, fmt::format("[{}:{}] - unknown error occurred", __func__, __LINE__)); + } + + msg_str = json_obj.dump(); + send_handler s(msg_str, audit_amqp_location, audit_amqp_topic); // NOLINT(readability-identifier-length) + proton::container(s).run(); + + if (test_mode) { + log_file_ofstream << msg_str << std::endl; + log_file_ofstream.close(); + } + + return SUCCESS(); + } + + auto rule_exists([[maybe_unused]] irods::default_re_ctx& _re_ctx, const std::string& _rn, bool& _ret) + -> irods::error + { + try { + boost::smatch matches; + boost::regex expr(audit_pep_regex_to_match); + _ret = boost::regex_match(_rn, matches, expr); + } + catch (const boost::exception& _e) { + std::string what = boost::diagnostic_information(_e); + return ERROR(SYS_INTERNAL_ERR, what); + } + + return SUCCESS(); + } + + auto list_rules([[maybe_unused]] irods::default_re_ctx& _re_ctx, [[maybe_unused]] std::vector& _rules) + -> irods::error + { + return SUCCESS(); + } + + auto exec_rule( + [[maybe_unused]] irods::default_re_ctx& _re_ctx, + const std::string& _rn, + std::list& _ps, + irods::callback _eff_hdlr) -> irods::error + { + std::lock_guard lock(audit_plugin_mutex); + + // stores a counter of unique arg types + std::map arg_type_map; + + ruleExecInfo_t* rei = nullptr; + irods::error err = _eff_hdlr("unsafe_ms_ctx", &rei); + if (!err.ok()) { + return err; + } + + nlohmann::json json_obj; + + std::string msg_str; + std::string log_file; + + try { + struct timeval tv; + gettimeofday(&tv, NULL); + unsigned long time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; + json_obj["time_stamp"] = std::to_string(time_ms); + + char host_name[MAX_NAME_LEN]; + gethostname(host_name, MAX_NAME_LEN); + json_obj["hostname"] = host_name; + + pid_t pid = getpid(); + json_obj["pid"] = std::to_string(pid); + + json_obj["rule_name"] = _rn; + + for (const auto& itr : _ps) { + // The BytesBuf parameter should not be serialized because this commonly contains + // the entirety of the contents of files. These could be very big and cause the + // message broker to explode. + if (std::type_index(typeid(BytesBuf*)) == std::type_index(itr.type())) { + rodsLog(LOG_DEBUG9, "[{}:{}] - skipping serialization of BytesBuf parameter", __FILE__, __LINE__); + continue; + } + + // serialize the parameter to a map + irods::re_serialization::serialized_parameter_t param; + irods::error ret = irods::re_serialization::serialize_parameter(itr, param); + if (!ret.ok()) { + rodsLog(LOG_ERROR, "unsupported argument for calling re rules from the rule language"); + continue; + } + + for (const auto& elem : param) { + const std::string& arg = elem.first; + + std::size_t ctr; + const auto iter = arg_type_map.find(arg); + if (iter == arg_type_map.end()) { + arg_type_map.insert(std::make_pair(arg, static_cast(1))); + ctr = 1; + } + else { + ctr = iter->second + 1; + iter->second = ctr; + } + + if (ctr > 1) { + const std::string key = fmt::format(FMT_COMPILE("{0:s}__{1:d}"), arg, ctr); + insert_as_string_or_base64(json_obj, key, elem.second); + } + else { + insert_as_string_or_base64(json_obj, arg, elem.second); + } + } + } + } + catch (const irods::exception& e) { + rodsLog(LOG_ERROR, e.client_display_what()); + return ERROR(e.code(), fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (const nlohmann::json::exception& e) { + const std::string msg = fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what()); + rodsLog(LOG_ERROR, msg.data()); + return ERROR(SYS_LIBRARY_ERROR, msg); + } + catch (const std::exception& e) { + rodsLog(LOG_ERROR, e.what()); + return ERROR(SYS_INTERNAL_ERR, fmt::format("[{}:{}] - [{}]", __func__, __LINE__, e.what())); + } + catch (...) { + return ERROR(SYS_UNKNOWN_ERROR, fmt::format("[{}:{}] - unknown error occurred", __func__, __LINE__)); + } + + msg_str = json_obj.dump(); + send_handler s(msg_str, audit_amqp_location, audit_amqp_topic); // NOLINT(readability-identifier-length) + proton::container(s).run(); + + if (test_mode) { + log_file_ofstream << msg_str << std::endl; + } + + return err; + } + +} // namespace // // Plugin Factory @@ -416,28 +456,37 @@ irods::error exec_rule( using pluggable_rule_engine = irods::pluggable_rule_engine; -extern "C" -auto plugin_factory(const std::string& _inst_name, const std::string& _context) -> pluggable_rule_engine* +extern "C" auto plugin_factory(const std::string& _inst_name, const std::string& _context) -> pluggable_rule_engine* { - const auto not_supported = [](auto&&...) { return ERROR(SYS_NOT_SUPPORTED, "Not supported."); }; + const auto not_supported = [](auto&&...) { return ERROR(SYS_NOT_SUPPORTED, "Not supported."); }; - auto* re = new irods::pluggable_rule_engine( _inst_name , _context); + auto* rule_engine = new irods::pluggable_rule_engine(_inst_name, _context); - re->add_operation("start", std::function(start)); + rule_engine->add_operation("start", std::function(start)); - re->add_operation("stop", std::function(stop)); + rule_engine->add_operation("stop", std::function(stop)); - re->add_operation("rule_exists", std::function(rule_exists)); + rule_engine->add_operation( + "rule_exists", std::function(rule_exists)); - re->add_operation("list_rules", std::function&)>(list_rules)); + rule_engine->add_operation( + "list_rules", std::function&)>(list_rules)); - re->add_operation("exec_rule", std::function&, irods::callback)>(exec_rule)); + rule_engine->add_operation( + "exec_rule", + std::function&, irods::callback)>(exec_rule)); - re->add_operation("exec_rule_text", std::function(not_supported)); + rule_engine->add_operation( + "exec_rule_text", + std::function( + not_supported)); - re->add_operation("exec_rule_expression", std::function(not_supported)); - - return re; + rule_engine->add_operation( + "exec_rule_expression", + std::function( + not_supported)); + return rule_engine; } - diff --git a/packaging/audit_plugin_message_worker.py b/packaging/audit_plugin_message_worker.py index 91d448182..1e209e457 100644 --- a/packaging/audit_plugin_message_worker.py +++ b/packaging/audit_plugin_message_worker.py @@ -28,11 +28,6 @@ def run(self): for line in pid_file: new_line = line - if "__BEGIN_JSON__" in new_line: - new_line = new_line.split("__BEGIN_JSON__")[1] - if "__END_JSON__" in new_line: - new_line = new_line.split("__END_JSON__")[0] - json_str = json.loads(new_line) next_str = json.loads(next_task[count]) diff --git a/packaging/message_broker_consumer.py b/packaging/message_broker_consumer.py index ffc849301..768f028b1 100644 --- a/packaging/message_broker_consumer.py +++ b/packaging/message_broker_consumer.py @@ -27,11 +27,6 @@ def on_message(self, event): message = event.message.body pid = 0 - if "__BEGIN_JSON__" in message: - message = message.split("__BEGIN_JSON__")[1] - if "__END_JSON__" in message: - message = message.split("__END_JSON__")[0] - json_data = json.loads(message) if 'pid' in json_data: diff --git a/packaging/test_plugin_audit_amqp.py b/packaging/test_plugin_audit_amqp.py index 871ff891e..43f712bef 100644 --- a/packaging/test_plugin_audit_amqp.py +++ b/packaging/test_plugin_audit_amqp.py @@ -26,10 +26,6 @@ def on_error(self, headers, message): print('received an error "%s"' % message) def on_message(self, headers, message): - if "__BEGIN_JSON__" in message: - message = message.split("__BEGIN_JSON__")[1] - if "__END_JSON__" in message: - message = message.split("__END_JSON__")[0] rule_name = json.loads(message)['rule_name'] # print("%s" % rule_name)