Skip to content

Commit

Permalink
[irods#28] More repo reorganization
Browse files Browse the repository at this point in the history
  • Loading branch information
SwooshyCueb committed Nov 7, 2022
1 parent 41fd851 commit 900e069
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 187 deletions.
15 changes: 14 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,25 @@ find_package(Threads REQUIRED)
find_package(OpenSSL REQUIRED COMPONENTS Crypto SSL)
find_package(nlohmann_json "3.6.1" REQUIRED)

add_library(${PLUGIN} MODULE ${CMAKE_SOURCE_DIR}/src/main.cpp)
add_library(
${PLUGIN}
MODULE
"${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/src/amqp_sender.cpp"
)
set(
${PLUGIN}_HEADERS
"${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/audit_amqp.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/audit_b64enc.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/include/irods/private/amqp_sender.hpp"
)
target_sources(${PLUGIN} PRIVATE ${${PLUGIN}_HEADERS})
target_compile_options(${PLUGIN} PRIVATE -Wno-write-strings)
target_compile_definitions(${PLUGIN} PRIVATE ${IRODS_COMPILE_DEFINITIONS} ${IRODS_COMPILE_DEFINITIONS_PRIVATE} IRODS_ENABLE_SYSLOG)
target_include_directories(
${PLUGIN}
PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
${IRODS_INCLUDE_DIRS}
${IRODS_EXTERNALS_FULLPATH_ARCHIVE}/include
${IRODS_EXTERNALS_FULLPATH_BOOST}/include
Expand Down
43 changes: 43 additions & 0 deletions include/irods/private/amqp_sender.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef IRODS_AUDIT_AMQP_SENDER_HPP
#define IRODS_AUDIT_AMQP_SENDER_HPP

#include "irods/private/audit_amqp.hpp"

#include <string>

#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/timestamp.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <proton/sender.hpp>
#include <proton/session.hpp>

namespace irods::plugin::rule_engine::audit_amqp
{
class send_handler : public proton::messaging_handler
{
public:
send_handler(const proton::message& _message, const std::string& _url);
void on_container_start(proton::container& _container) override;
void on_sendable(proton::sender& _sender) override;
void on_tracker_accept(proton::tracker& _tracker) override;
void on_tracker_reject(proton::tracker& _tracker) override;
void on_transport_error(proton::transport& _transport) override;
void on_connection_error(proton::connection& _connection) override;
void on_session_error(proton::session& _session) override;
void on_receiver_error(proton::receiver& _receiver) override;
void on_sender_error(proton::sender& _sender) override;
void on_error(const proton::error_condition& _err_cond) override;

private:
const std::string& _amqp_url;
const proton::message& _message;
bool _message_sent;
};
} //namespace irods::plugin::rule_engine::audit_amqp

#endif // IRODS_AUDIT_AMQP_SENDER_HPP
43 changes: 43 additions & 0 deletions include/irods/private/audit_amqp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef IRODS_AUDIT_AMQP_MAIN_HPP
#define IRODS_AUDIT_AMQP_MAIN_HPP

#include <irods/irods_logger.hpp>

#include <chrono>
#include <string>

#include <boost/config.hpp>

namespace irods::plugin::rule_engine::audit_amqp
{
static inline constexpr const char* const rule_engine_name = "audit_amqp";

using log_re = irods::experimental::log::rule_engine;

#if __cpp_lib_chrono >= 201907
// we use millisecond precision in our timestamps, so we want to use a clock
// that does not implement leap seconds as repeated non-leap seconds, if we can.
using ts_clock = std::chrono::utc_clock;
#else
// fallback to system_clock
using ts_clock = std::chrono::system_clock;
#endif

template <class T>
static BOOST_FORCEINLINE void log_exception(
const T& exception,
const std::string& log_message,
const irods::experimental::log::key_value& context_info)
{
// clang-format off
log_re::info({
{"rule_engine_plugin", rule_engine_name},
{"log_message", log_message},
context_info,
{"exception", exception.what()},
});
// clang-format on
}
} //namespace irods::plugin::rule_engine::audit_amqp

#endif // IRODS_AUDIT_AMQP_MAIN_HPP
51 changes: 51 additions & 0 deletions include/irods/private/audit_b64enc.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifndef IRODS_AUDIT_AMQP_B64ENC_HPP
#define IRODS_AUDIT_AMQP_B64ENC_HPP

#include "irods/private/audit_amqp.hpp"

#include <string>

#include <boost/config.hpp>
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>

#include <nlohmann/json.hpp>

namespace irods::plugin::rule_engine::audit_amqp
{
static BOOST_FORCEINLINE void insert_as_string_or_base64(
nlohmann::json& json_obj,
const std::string& key,
const std::string& val,
const std::uint64_t& time_ms)
{
try {
json_obj[key] = nlohmann::json::parse("\"" + val + "\"");
}
catch (const nlohmann::json::exception&) {
using namespace boost::archive::iterators;
using b64enc = base64_from_binary<transform_width<std::string::const_iterator, 6, 8>>;

// encode into base64 string
std::string val_b64(b64enc(std::begin(val)), b64enc(std::end(val)));
val_b64.append((3 - val.length() % 3) % 3, '='); // add padding ='s

// new key for encoded value
const std::string key_b64 = key + "_b64";

json_obj[key_b64] = val_b64;

// clang-format off
log_re::debug({
{"rule_engine_plugin", rule_engine_name},
{"log_message", "Invalid UTF-8 encountered when adding element to message; added as base64"},
{"element_original_key", key},
{"element_key", key_b64},
{"message_timestamp", std::to_string(time_ms)},
});
// clang-format on
}
}
} //namespace irods::plugin::rule_engine::audit_amqp

#endif // IRODS_AUDIT_AMQP_B64ENC_HPP
94 changes: 94 additions & 0 deletions src/amqp_sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include "irods/private/audit_amqp.hpp"
#include "irods/private/amqp_sender.hpp"

#include <boost/config.hpp>

namespace irods::plugin::rule_engine::audit_amqp
{
namespace
{
BOOST_FORCEINLINE void log_proton_error(const proton::error_condition& err_cond, const std::string& log_message)
{
// clang-format off
log_re::error({
{"rule_engine_plugin", rule_engine_name},
{"log_message", log_message},
{"error_condition::name", err_cond.name()},
{"error_condition::description", err_cond.description()},
{"error_condition::what", err_cond.what()}
});
// clang-format on
}
} // namespace

send_handler::send_handler(const proton::message& _message, const std::string& _url)
: _amqp_url(_url)
, _message(_message)
, _message_sent(false)
{
}

void send_handler::on_container_start(proton::container& _container)
{
proton::connection_options conn_opts;
_container.open_sender(_amqp_url, conn_opts);
}

void send_handler::on_sendable(proton::sender& _sender)
{
if (_sender.credit() && !_message_sent) {
_sender.send(_message);
_message_sent = true;
}
}

void send_handler::on_tracker_accept(proton::tracker& _tracker)
{
// we're only sending one message
// so we don't care about the credit system
// or tracking confirmed messages
if (_message_sent) {
_tracker.connection().close();
}
}

void send_handler::on_tracker_reject([[maybe_unused]] proton::tracker& _tracker)
{
// clang-format off
log_re::error({
{"rule_engine_plugin", rule_engine_name},
{"log_message", "AMQP server unexpectedly rejected message"}
});
// clang-format on
}

void send_handler::on_transport_error(proton::transport& _transport)
{
log_proton_error(_transport.error(), "Transport error in proton messaging handler");
}

void send_handler::on_connection_error(proton::connection& _connection)
{
log_proton_error(_connection.error(), "Connection error in proton messaging handler");
}

void send_handler::on_session_error(proton::session& _session)
{
log_proton_error(_session.error(), "Session error in proton messaging handler");
}

void send_handler::on_receiver_error(proton::receiver& _receiver)
{
log_proton_error(_receiver.error(), "Receiver error in proton messaging handler");
}

void send_handler::on_sender_error(proton::sender& _sender)
{
log_proton_error(_sender.error(), "Sender error in proton messaging handler");
}

void send_handler::on_error(const proton::error_condition& _err_cond)
{
log_proton_error(_err_cond, "Unknown error in proton messaging handler");
}
} //namespace irods::plugin::rule_engine::audit_amqp
Loading

0 comments on commit 900e069

Please sign in to comment.