Skip to content

Commit

Permalink
Merge pull request #1265 from trws/qmanager-stats
Browse files Browse the repository at this point in the history
qmanager: add get-stats and clear-stats callbacks
  • Loading branch information
mergify[bot] authored Aug 13, 2024
2 parents 8b2cb13 + fdf1038 commit 5aa005b
Show file tree
Hide file tree
Showing 20 changed files with 405 additions and 25,603 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pkg_check_modules(UUID REQUIRED IMPORTED_TARGET uuid)
set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
find_package(Boost 1.50 REQUIRED COMPONENTS
find_package(Boost 1.66 REQUIRED COMPONENTS
system
filesystem
graph
Expand Down
1 change: 1 addition & 0 deletions qmanager/modules/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ target_link_libraries(sched-fluxion-qmanager PRIVATE
flux::schedutil
PkgConfig::JANSSON
intern
cppwrappers
)
18 changes: 18 additions & 0 deletions qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class fluxion_resource_interface_t {

/*! Module context for qmanager: the callback context and the resource
 *  interface combined with the message-handler vectors owned by the module.
 */
struct qmanager_ctx_t : public qmanager_cb_ctx_t, public fluxion_resource_interface_t {
// Handler vector released with flux_msg_handler_delvec () in qmanager_destroy ().
flux_msg_handler_t **hndlr{nullptr};
// Handler vector for the statstab entries; filled by flux_msg_handler_addvec ()
// in mod_start () and released in qmanager_destroy ().
flux_msg_handler_t **stats_hndlr{nullptr};
};

fluxion_resource_interface_t::~fluxion_resource_interface_t ()
Expand Down Expand Up @@ -591,6 +592,7 @@ static void qmanager_destroy (std::shared_ptr<qmanager_ctx_t> &ctx)
flux_watcher_destroy (ctx->check);
flux_watcher_destroy (ctx->idle);
flux_msg_handler_delvec (ctx->hndlr);
flux_msg_handler_delvec (ctx->stats_hndlr);
errno = saved_errno;
}
}
Expand All @@ -601,6 +603,17 @@ static const struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_REQUEST, "*.params", params_request_cb, FLUX_ROLE_USER},
FLUX_MSGHANDLER_TABLE_END,
};
// Message handlers for the module's statistics requests. The table is
// registered separately from htab in mod_start (), with the resulting handler
// vector stored in ctx->stats_hndlr. Both topics are open to FLUX_ROLE_USER.
static const struct flux_msg_handler_spec statstab[] = {
// Report per-queue statistics as JSON.
{FLUX_MSGTYPE_REQUEST,
"sched-fluxion-qmanager.stats-get",
qmanager_safe_cb_t::jobmanager_stats_get_cb,
FLUX_ROLE_USER},
// Request to clear statistics.
{FLUX_MSGTYPE_REQUEST,
"sched-fluxion-qmanager.stats-clear",
qmanager_safe_cb_t::jobmanager_stats_clear_cb,
FLUX_ROLE_USER},
FLUX_MSGHANDLER_TABLE_END,
};

////////////////////////////////////////////////////////////////////////////////
// Module Main
Expand Down Expand Up @@ -652,6 +665,11 @@ int mod_start (flux_t *h, int argc, char **argv)
qmanager_destroy (ctx);
return rc;
}
if ((rc = flux_msg_handler_addvec (h, statstab, (void *)ctx.get (), &ctx->stats_hndlr)) < 0) {
flux_log_error (h, "%s: flux_msg_handler_addvec", __FUNCTION__);
qmanager_destroy (ctx);
return rc;
}
if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0)
flux_log_error (h, "%s: flux_reactor_run", __FUNCTION__);
qmanager_destroy (ctx);
Expand Down
57 changes: 56 additions & 1 deletion qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ extern "C" {
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <jansson.h>
}

#include "qmanager/modules/qmanager_callbacks.hpp"
#include "resource/libjobspec/jobspec.hpp"
#include "src/common/c++wrappers/eh_wrapper.hpp"

#include <jansson.hpp>

using namespace Flux;
using namespace Flux::Jobspec;
using namespace Flux::queue_manager;
Expand Down Expand Up @@ -221,6 +222,38 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const
return rc;
}

/*! Handle a "sched-fluxion-qmanager.stats-get" request.
 *
 *  Builds a JSON object of the form {"queues": {<queue name>: <stats>, ...}},
 *  where each per-queue value is produced by that queue's to_json_value (),
 *  and sends the encoded object as the response payload.
 *
 *  If the reply cannot be built (a queue fails to serialize, a jansson call
 *  fails, or encoding fails), an error response is sent so that the requester
 *  is not left waiting, and the exception is rethrown so the caller can log it.
 *
 *  \param h     flux handle used to respond and log.
 *  \param w     message handler (unused).
 *  \param msg   request message being answered.
 *  \param arg   pointer to the qmanager_cb_ctx_t registered with the handler.
 */
void qmanager_cb_t::jobmanager_stats_get_cb (flux_t *h,
                                             flux_msg_handler_t *w,
                                             const flux_msg_t *msg,
                                             void *arg)
{
    qmanager_cb_ctx_t *ctx = static_cast<qmanager_cb_ctx_t *> (arg);

    try {
        json::value stats;
        stats.emplace_object ();
        json::value queues;
        queues.emplace_object ();
        if (json_object_set (stats.get (), "queues", queues.get ()) < 0)
            throw std::runtime_error ("stats-get: unable to create queues object");
        for (const auto &[qname, queue] : ctx->queues) {
            json::value qv;
            queue->to_json_value (qv);
            if (json_object_set (queues.get (), qname.c_str (), qv.get ()) < 0)
                throw std::runtime_error ("stats-get: unable to add stats for queue " + qname);
        }
        char *resp = json_dumps (stats.get (), 0);
        // json_dumps returns NULL on failure; do not send that as an empty success reply
        if (!resp)
            throw std::runtime_error ("stats-get: unable to encode stats");
        if (flux_respond (h, msg, resp) < 0) {
            flux_log_error (h, "%s: flux_respond", __PRETTY_FUNCTION__);
        }
        free (resp);
    } catch (const std::exception &e) {
        // Only reached before any response was sent: tell the requester what
        // went wrong, then let the exception-safe wrapper record the failure.
        if (flux_respond_error (h, msg, EINVAL, e.what ()) < 0)
            flux_log_error (h, "%s: flux_respond_error", __PRETTY_FUNCTION__);
        throw;
    }
}
/*! Handle a "sched-fluxion-qmanager.stats-clear" request.
 *
 *  No statistics are reset by this handler yet. The request is answered with
 *  an ENOSYS error response so that a requester waiting on the RPC is not
 *  left blocked without a reply.
 *
 *  \param h     flux handle used to respond and log.
 *  \param w     message handler (unused).
 *  \param msg   request message being answered.
 *  \param arg   pointer to the qmanager_cb_ctx_t registered with the handler
 *               (unused until clearing is implemented).
 */
void qmanager_cb_t::jobmanager_stats_clear_cb (flux_t *h,
                                               flux_msg_handler_t *w,
                                               const flux_msg_t *msg,
                                               void *arg)
{
    if (flux_respond_error (h, msg, ENOSYS, "stats-clear is not implemented") < 0)
        flux_log_error (h, "%s: flux_respond_error", __PRETTY_FUNCTION__);
}

void qmanager_cb_t::jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg)
{
qmanager_cb_ctx_t *ctx = nullptr;
Expand Down Expand Up @@ -477,6 +510,28 @@ int qmanager_safe_cb_t::jobmanager_hello_cb (flux_t *h,
return rc;
}

/*! Entry point registered for "sched-fluxion-qmanager.stats-get".
 *
 *  Invokes qmanager_cb_t::jobmanager_stats_get_cb through an eh_wrapper_t and,
 *  if the wrapper reports a bad state afterwards, logs the wrapper's error
 *  message. (Presumably the wrapper keeps exceptions from escaping into the
 *  reactor; confirm against eh_wrapper.hpp.)
 */
void qmanager_safe_cb_t::jobmanager_stats_get_cb (flux_t *h,
                                                  flux_msg_handler_t *w,
                                                  const flux_msg_t *msg,
                                                  void *arg)
{
    eh_wrapper_t guard;
    guard (qmanager_cb_t::jobmanager_stats_get_cb, h, w, msg, arg);
    if (!guard.bad ())
        return;
    flux_log_error (h, "%s: %s", __FUNCTION__, guard.get_err_message ());
}

/*! Entry point registered for "sched-fluxion-qmanager.stats-clear".
 *
 *  Invokes qmanager_cb_t::jobmanager_stats_clear_cb through an eh_wrapper_t
 *  and, if the wrapper reports a bad state afterwards, logs the wrapper's
 *  error message. (Presumably the wrapper keeps exceptions from escaping into
 *  the reactor; confirm against eh_wrapper.hpp.)
 */
void qmanager_safe_cb_t::jobmanager_stats_clear_cb (flux_t *h,
                                                    flux_msg_handler_t *w,
                                                    const flux_msg_t *msg,
                                                    void *arg)
{
    eh_wrapper_t guard;
    guard (qmanager_cb_t::jobmanager_stats_clear_cb, h, w, msg, arg);
    if (!guard.bad ())
        return;
    flux_log_error (h, "%s: %s", __FUNCTION__, guard.get_err_message ());
}

void qmanager_safe_cb_t::jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg)
{
eh_wrapper_t exception_safe_wrapper;
Expand Down
16 changes: 16 additions & 0 deletions qmanager/modules/qmanager_callbacks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ struct qmanager_cb_ctx_t {
class qmanager_cb_t {
protected:
static int jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg);
static void jobmanager_stats_get_cb (flux_t *h,
flux_msg_handler_t *w,
const flux_msg_t *msg,
void *arg);
static void jobmanager_stats_clear_cb (flux_t *h,
flux_msg_handler_t *w,
const flux_msg_t *msg,
void *arg);
static void jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg);
static void jobmanager_free_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg);
static void jobmanager_cancel_cb (flux_t *h, const flux_msg_t *msg, void *arg);
Expand All @@ -54,6 +62,14 @@ class qmanager_cb_t {

struct qmanager_safe_cb_t : public qmanager_cb_t {
static int jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg);
static void jobmanager_stats_get_cb (flux_t *h,
flux_msg_handler_t *w,
const flux_msg_t *msg,
void *arg);
static void jobmanager_stats_clear_cb (flux_t *h,
flux_msg_handler_t *w,
const flux_msg_t *msg,
void *arg);
static void jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg, void *arg);
static void jobmanager_free_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg);
static void jobmanager_cancel_cb (flux_t *h, const flux_msg_t *msg, void *arg);
Expand Down
123 changes: 120 additions & 3 deletions qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ extern "C" {
#include <memory>
#include <cstdint>
#include <tuple>
#include <jansson.hpp>
#include <iostream>

#include "resource/reapi/bindings/c++/reapi.hpp"
#include "qmanager/config/queue_system_defaults.hpp"
Expand Down Expand Up @@ -309,7 +311,7 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
* \param p_p string to which to print queue parameters
* (e.g., "reservation-depth=1024,foo=bar")
*/
void get_params (std::string &q_p, std::string &p_p)
void get_params (std::string &q_p, std::string &p_p) const
{
std::unordered_map<std::string, std::string>::const_iterator i;
for (i = m_qparams.begin (); i != m_qparams.end (); i++) {
Expand All @@ -324,6 +326,121 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
}
}

/*! Return the name of the concrete queue policy. to_json_value () reports
 *  this string under the "policy" key.
 */
virtual const std::string_view policy () const = 0;

/*! Write this queue's statistics into the json::value parameter.
 *
 *  The resulting JSON object has these keys:
 *   - "policy": the string returned by policy ()
 *   - "queue_depth", "max_queue_depth": integers
 *   - "queue_parameters", "policy_parameters": m_qparams / m_pparams
 *     converted with to_json ()
 *   - "action_counts": object with the counters "pending", "running",
 *     "reserved", "rejected", "complete", "cancelled", "reprioritized"
 *   - "pending_queues": object with the job id arrays "pending",
 *     "pending_provisional", "blocked"
 *   - "scheduled_queues": object with the job id arrays "running",
 *     "rejected", "canceled"
 *
 *  Job ids are encoded as "f58plain" strings; an id that cannot be encoded
 *  is emitted as a plain integer instead.
 *
 *  Note that the key spellings "cancelled" (action_counts) and "canceled"
 *  (scheduled_queues) differ; both are part of the emitted output and are
 *  kept as they are.
 *
 *  \param jv    output value; overwritten with the newly built object.
 *  \throws std::runtime_error carrying jansson's error text if the object
 *          cannot be packed.
 */
virtual void to_json_value (json::value &jv) const
{
    json::value qparams;
    to_json (qparams, m_qparams);
    json::value pparams;
    to_json (pparams, m_pparams);

    // Append every job id held in `map` to the JSON array `a`.
    auto add_queue = [] (json_t *a, const auto &map) {
        char buf[128] = {};
        for (const auto &[k, jobid] : map) {
            if (flux_job_id_encode (jobid, "f58plain", buf, sizeof buf) < 0)
                json_array_append_new (a, json_integer (jobid));
            else
                json_array_append_new (a, json_string (buf));
        }
    };

    // Each array is attached to its parent object (which takes its own
    // reference) before the json::value is re-used for the next array.
    json::value pending;
    pending.emplace_object ();
    json::value pending_arr;
    pending_arr.emplace_array ();
    json_object_set (pending.get (), "pending", pending_arr.get ());
    add_queue (pending_arr.get (), m_pending);
    pending_arr.emplace_array ();
    json_object_set (pending.get (), "pending_provisional", pending_arr.get ());
    add_queue (pending_arr.get (), m_pending_provisional);
    pending_arr.emplace_array ();
    json_object_set (pending.get (), "blocked", pending_arr.get ());
    add_queue (pending_arr.get (), m_blocked);

    json::value scheduled;
    scheduled.emplace_object ();
    json::value scheduled_arr;
    scheduled_arr.emplace_array ();
    json_object_set (scheduled.get (), "running", scheduled_arr.get ());
    add_queue (scheduled_arr.get (), m_running);
    scheduled_arr.emplace_array ();
    json_object_set (scheduled.get (), "rejected", scheduled_arr.get ());
    add_queue (scheduled_arr.get (), m_rejected);
    scheduled_arr.emplace_array ();
    json_object_set (scheduled.get (), "canceled", scheduled_arr.get ());
    add_queue (scheduled_arr.get (), m_canceled);

    // The "I" format reads a json_int_t from the variadic argument list, so
    // every counter is converted explicitly rather than passed as whatever
    // integer type the member happens to have.
    json_t *action_counts = json_pack ("{s:I s:I s:I s:I s:I s:I s:I}",
                                       "pending",
                                       static_cast<json_int_t> (m_pq_cnt),
                                       "running",
                                       static_cast<json_int_t> (m_rq_cnt),
                                       "reserved",
                                       static_cast<json_int_t> (m_oq_cnt),
                                       "rejected",
                                       static_cast<json_int_t> (m_dq_cnt),
                                       "complete",
                                       static_cast<json_int_t> (m_cq_cnt),
                                       "cancelled",
                                       static_cast<json_int_t> (m_cancel_cnt),
                                       "reprioritized",
                                       static_cast<json_int_t> (m_reprio_cnt));

    json_error_t err = {0};
    jv = json::value (json::no_incref{},
                      json_pack_ex (&err,
                                    0,
                                    // begin object
                                    "{"
                                    // policy
                                    "s:s%"
                                    // queue_depth
                                    "s:I"
                                    // max_queue_depth
                                    "s:I"
                                    // queue parameters
                                    "s:O"
                                    // policy parameters
                                    "s:O"
                                    // action counts
                                    "s:o"
                                    // pending queues
                                    "s:O"
                                    // scheduled queues
                                    "s:O"
                                    // end object
                                    "}",
                                    // VALUE START
                                    // policy, str+length style
                                    "policy",
                                    this->policy ().data (),
                                    this->policy ().length (),
                                    // queue_depth
                                    "queue_depth",
                                    static_cast<json_int_t> (m_queue_depth),
                                    // max_queue_depth
                                    "max_queue_depth",
                                    static_cast<json_int_t> (m_max_queue_depth),
                                    // queue parameters
                                    "queue_parameters",
                                    qparams.get (),
                                    // policy parameters
                                    "policy_parameters",
                                    pparams.get (),
                                    // action counts ("o" hands over the reference)
                                    "action_counts",
                                    action_counts,
                                    // pending queues
                                    "pending_queues",
                                    pending.get (),
                                    // scheduled queues
                                    "scheduled_queues",
                                    scheduled.get ()));
    if (!jv.get ()) {
        throw std::runtime_error (err.text);
    }
}

/*! Return the queue depth used for this queue. The queue depth
* is the depth of its pending-job queue only upto which it
* considers for scheduling to deal with unbounded queue length.
Expand Down Expand Up @@ -616,7 +733,7 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
* so that this queue can be adapted for use within high-level
* resource API. Return true if the scheduling loop is active.
*/
virtual bool is_sched_loop_active ()
bool is_sched_loop_active () override
{
return m_sched_loop_active;
}
Expand All @@ -632,7 +749,7 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
* - ENOENT (job is not found from some queue)
* - EEXIST (enqueue fails due to an existent entry)
*/
virtual int set_sched_loop_active (bool active)
int set_sched_loop_active (bool active) override
{
int rc = 0;
bool prev = m_sched_loop_active;
Expand Down
19 changes: 9 additions & 10 deletions qmanager/policies/queue_policy_bf_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,16 @@ namespace detail {
template<class reapi_type>
class queue_policy_bf_base_t : public queue_policy_base_t {
public:
virtual ~queue_policy_bf_base_t ();
virtual int run_sched_loop (void *h, bool use_alloced_queue);
int run_sched_loop (void *h, bool use_alloced_queue) override;
int cancel_sched_loop () override;
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job, std::string &R_out);
virtual int apply_params ();
virtual int handle_match_success (flux_jobid_t jobid,
const char *status,
const char *R,
int64_t at,
double ov);
virtual int handle_match_failure (flux_jobid_t jobid, int errcode);
int reconstruct_resource (void *h, std::shared_ptr<job_t> job, std::string &R_out) override;
int apply_params () override;
int handle_match_success (flux_jobid_t jobid,
const char *status,
const char *R,
int64_t at,
double ov) override;
int handle_match_failure (flux_jobid_t jobid, int errcode) override;
int cancel (void *h,
flux_jobid_t id,
const char *R,
Expand Down
5 changes: 0 additions & 5 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ int queue_policy_bf_base_t<reapi_type>::cancel (void *h,
// Public API of Queue Policy Backfill Base
////////////////////////////////////////////////////////////////////////////////

template<class reapi_type>
queue_policy_bf_base_t<reapi_type>::~queue_policy_bf_base_t ()
{
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::apply_params ()
{
Expand Down
7 changes: 5 additions & 2 deletions qmanager/policies/queue_policy_conservative.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ namespace detail {
template<class reapi_type>
class queue_policy_conservative_t : public queue_policy_bf_base_t<reapi_type> {
public:
virtual ~queue_policy_conservative_t ();
queue_policy_conservative_t ();
queue_policy_conservative_t (const queue_policy_conservative_t &p) = default;
queue_policy_conservative_t (queue_policy_conservative_t &&p) = default;
queue_policy_conservative_t &operator= (const queue_policy_conservative_t &p) = default;
queue_policy_conservative_t &operator= (queue_policy_conservative_t &&p) = default;

virtual int apply_params ();
int apply_params () override;
// Name of this queue policy: "conservative".
const std::string_view policy () const override
{
    return std::string_view{"conservative"};
}
};

} // namespace detail
Expand Down
Loading

0 comments on commit 5aa005b

Please sign in to comment.