Skip to content

Commit

Permalink
Merge pull request #743 from SteVwonder/job-shell-staging-plugin
Browse files Browse the repository at this point in the history
Add job shell staging plugin
  • Loading branch information
dongahn authored Nov 7, 2020
2 parents 9c33175 + e33fed7 commit 894e5af
Show file tree
Hide file tree
Showing 36 changed files with 28,334 additions and 17 deletions.
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ AC_SUBST(fluxrc3dir)
fluxmod_ldflags="-Wl,--no-undefined -avoid-version -export-symbols-regex '^mod_(main|name|service)\$\$' --disable-static -shared -export-dynamic"
AC_SUBST(fluxmod_ldflags)

shellplugin_ldflags="-Wl,--no-undefined -avoid-version -export-symbols-regex '^flux_plugin_init\$\$' --disable-static -shared -export-dynamic"
AC_SUBST(shellplugin_ldflags)

fluxlib_ldflags="-shared -export-dynamic --disable-static -Wl,--no-undefined"
AC_SUBST(fluxlib_ldflags)

Expand All @@ -158,6 +161,7 @@ AC_CONFIG_FILES([Makefile
src/common/c++wrappers/Makefile
src/common/c++wrappers/test/Makefile
src/cmd/Makefile
src/shell/Makefile
resource/Makefile
resource/planner/Makefile
resource/planner/test/Makefile
Expand Down
2 changes: 2 additions & 0 deletions resource/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ libresource_la_SOURCES = \
schema/infra_data.cpp \
schema/sched_data.cpp \
schema/color.cpp \
schema/ephemeral.cpp \
traversers/dfu.cpp \
traversers/dfu_impl.cpp \
traversers/dfu_impl_update.cpp \
Expand Down Expand Up @@ -55,6 +56,7 @@ libresource_la_SOURCES = \
schema/sched_data.hpp \
schema/resource_data.hpp \
schema/color.hpp \
schema/ephemeral.hpp \
traversers/dfu.hpp \
traversers/dfu_impl.hpp \
policies/base/dfu_match_cb.hpp \
Expand Down
25 changes: 16 additions & 9 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ static int process_args (std::shared_ptr<resource_ctx_t> &ctx,
dflt = args.match_format;
args.match_format = strstr (argv[i], "=") + 1;
if (!known_match_format (args.match_format)) {
args.match_format = dflt;
flux_log (ctx->h, LOG_ERR,
"%s: unknown match format (%s)! use default (%s).",
__FUNCTION__,
Expand Down Expand Up @@ -1843,13 +1842,17 @@ static void notify_request_cb (flux_t *h, flux_msg_handler_t *w,
}

static int run_find (std::shared_ptr<resource_ctx_t>& ctx,
const std::string &criteria, json_t **R)
const std::string &criteria,
const std::string &format_str,
json_t **R)
{
int rc = -1;
json_t *o = nullptr;
std::shared_ptr<match_writers_t> w = nullptr;

if ( !(w = match_writers_factory_t::create (match_format_t::RV1_NOSCHED)))
match_format_t format = match_writers_factory_t::
get_writers_type (format_str);
if ( !(w = match_writers_factory_t::create (format)))
goto error;
if ( (rc = ctx->traverser->find (w, criteria)) < 0) {
if (ctx->traverser->err_message () != "") {
Expand Down Expand Up @@ -1877,12 +1880,16 @@ static void find_request_cb (flux_t *h, flux_msg_handler_t *w,
json_t *R = nullptr;
int saved_errno;
const char *criteria = nullptr;
const char *format_str = "rv1_nosched";
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if (flux_request_unpack (msg, nullptr, "{s:s}",
"criteria", &criteria) < 0)
if (flux_request_unpack (msg, nullptr,
"{s:s, s?:s}",
"criteria", &criteria,
"format", &format_str) < 0)
goto error;
if (run_find (ctx, criteria, &R) < 0)

if (run_find (ctx, criteria, format_str, &R) < 0)
goto error;
if (flux_respond_pack (h, msg, "{s:o?}",
"R", R) < 0) {
Expand Down Expand Up @@ -1910,11 +1917,11 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
json_t *R_alloc = nullptr;
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if (run_find (ctx, "status=up or status=down", &R_all) < 0)
if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0)
goto error;
if (run_find (ctx, "status=down", &R_down) < 0)
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
goto error;
if (run_find (ctx, "sched-now=allocated", &R_alloc) < 0)
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
goto error;
if (flux_respond_pack (h, msg, "{s:o? s:o? s:o?}",
"all", R_all,
Expand Down
104 changes: 104 additions & 0 deletions resource/schema/ephemeral.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*****************************************************************************\
* LLNL-CODE-658032 All rights reserved.
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the license, or (at your option)
* any later version.
*
* Flux is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

#include "resource/schema/ephemeral.hpp"

namespace Flux {
namespace resource_model {

/*! Store a key-value pair on behalf of the given epoch.
 *
 *  Any contents left over from an older epoch are discarded first, and
 *  the store's epoch is then set to \c epoch.
 *
 *  \param epoch   epoch the new entry belongs to.
 *  \param key     key of the entry.
 *  \param value   value of the entry.
 *  \return        0 on success; -1 on error with errno set:
 *                 EEXIST if \c key is already present,
 *                 ENOMEM if memory allocation failed.
 */
int ephemeral_t::insert (uint64_t epoch,
                         const std::string &key,
                         const std::string &value)
{
    try {
        check_and_clear_if_stale (epoch);
        m_epoch = epoch;
        // An existing key is never overwritten; report it to the caller.
        if (m_store.emplace (key, value).second)
            return 0;
        errno = EEXIST;
    } catch (const std::bad_alloc &) {
        errno = ENOMEM;
    }

    return -1;
}

/*! Look up the value stored under a key for the given epoch.
 *
 *  If the stored data is stale with respect to \c epoch, it is cleared
 *  and nothing is returned.
 *
 *  \param epoch   epoch the caller is querying for.
 *  \param key     key to look up.
 *  \return        the stored value, or boost::none if the store was stale
 *                 or \c key is not present.
 */
boost::optional<std::string> ephemeral_t::get (uint64_t epoch, const std::string &key)
{
    if (check_and_clear_if_stale (epoch))
        return boost::none;

    // std::map::find reports a miss through end () rather than by throwing
    // std::out_of_range, so no exception handler is needed here.
    const auto it = m_store.find (key);
    if (it == m_store.end ())
        return boost::none;

    return it->second;
}



const std::map<std::string, std::string>& ephemeral_t::to_map (uint64_t epoch)
{
check_and_clear_if_stale (epoch);
return this->to_map ();
}

const std::map<std::string, std::string>& ephemeral_t::to_map () const
{
return m_store;
}

bool ephemeral_t::check_and_clear_if_stale (uint64_t epoch)
{
if (m_epoch < epoch) {
// data is stale
this->clear ();
return true;
}
return false;
}


void ephemeral_t::clear ()
{
m_store.clear ();
}


} // resource_model
} // Flux

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
56 changes: 56 additions & 0 deletions resource/schema/ephemeral.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*****************************************************************************\
* LLNL-CODE-658032 All rights reserved.
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the license, or (at your option)
* any later version.
*
* Flux is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

#ifndef EPHEMERAL_H
#define EPHEMERAL_H

#include <cstdint>
#include <map>
#include <boost/optional.hpp>

namespace Flux {
namespace resource_model {

/*! String key-value store whose contents are tied to an epoch.
 *
 *  Data is tagged with the epoch passed to the most recent insert ().
 *  Accessors that take an epoch treat the data as stale, and clear it,
 *  when the stored epoch is older than the one supplied.
 */
class ephemeral_t {
public:
    /*! Insert a key-value pair for \c epoch, dropping stale data first.
     *  \return 0 on success; -1 with errno set to EEXIST if \c key is
     *          already present, or ENOMEM on allocation failure.
     */
    int insert (uint64_t epoch,
                const std::string &key,
                const std::string &value);

    /*! Fetch the value stored under \c key for \c epoch.
     *  \return the value, or boost::none if the data was stale or the
     *          key is absent.
     */
    boost::optional<std::string> get (uint64_t epoch, const std::string &key);

    /*! Read-only view of the store after clearing it if stale for \c epoch. */
    const std::map<std::string, std::string>& to_map (uint64_t epoch);

    /*! Read-only view of the store with no staleness check. */
    const std::map<std::string, std::string>& to_map () const;

    /*! Clear the store if its epoch is older than \c epoch.
     *  \return true if the data was stale and cleared, false otherwise.
     */
    bool check_and_clear_if_stale (uint64_t epoch);

    /*! Remove all key-value pairs. */
    void clear ();

private:
    std::map<std::string, std::string> m_store;
    // Initialized explicitly: the class has no constructor, and
    // check_and_clear_if_stale () reads this before any insert () has run.
    uint64_t m_epoch = 0;
};

} // resource_model
} // Flux

#endif // EPHEMERAL_H

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
1 change: 1 addition & 0 deletions resource/schema/infra_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ void pool_infra_t::scrub ()
colors.clear ();
if (x_checker)
planner_destroy (&x_checker);
ephemeral.clear ();
}


Expand Down
2 changes: 2 additions & 0 deletions resource/schema/infra_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <map>
#include <cstdint>
#include "resource/schema/data_std.hpp"
#include "resource/schema/ephemeral.hpp"
#include "resource/planner/planner_multi.h"

namespace Flux {
Expand Down Expand Up @@ -57,6 +58,7 @@ struct pool_infra_t : public infra_base_t {
planner_t *x_checker = NULL;
std::map<subsystem_t, planner_multi_t *> subplans;
std::map<subsystem_t, uint64_t> colors;
ephemeral_t ephemeral;
};

class relation_infra_t : public infra_base_t {
Expand Down
27 changes: 26 additions & 1 deletion resource/traversers/dfu_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,21 @@ int dfu_impl_t::dom_dfv (const jobmeta_t &meta, vtx_t u,
goto done;
to_parent.set_avail (avail);
to_parent.set_overall_score (dfu.overall_score ());

for (auto &resource: resources) {
if ((resource.type == (*m_graph)[u].type) &&
(!resource.label.empty())) {
rc = (*m_graph)[u].idata.ephemeral.insert (m_best_k_cnt,
"label",
resource.label);
if (rc < 0) {
m_err_msg += "dom_dfv: inserting label into ephemeral failed.\n";
m_err_msg += strerror (errno);
m_err_msg += ".\n";
goto done;
}
}
}
done:
return rc;
}
Expand Down Expand Up @@ -659,12 +674,22 @@ int dfu_impl_t::dom_find_dfv (std::shared_ptr<match_writers_t> &w,
goto done;
} else if (!result && !nchildren) {
goto done;
} else if ( (rc = w->emit_vtx (level (), *m_graph, u,
}

// Need to clear out any stale data from the ephemeral object before
// emitting the vertex, since data could be leftover from previous
// traversals where the vertex was matched but not emitted
(*m_graph)[u].idata.ephemeral.check_and_clear_if_stale (m_best_k_cnt);
if ( (rc = w->emit_vtx (level (), *m_graph, u,
(*m_graph)[u].size, true)) < 0) {
m_err_msg += __FUNCTION__;
m_err_msg += std::string (": error from emit_vtx: ") + strerror (errno);
goto done;
}
// Need to clear out all data from the ephemeral object after
// emitting the vertex to minimize the amount of stale data
(*m_graph)[u].idata.ephemeral.clear ();

rc = nchildren + 1;
done:
m_trav_level--;
Expand Down
11 changes: 11 additions & 0 deletions resource/utilities/resource-query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <readline/readline.h>
#include <readline/history.h>
#include <boost/algorithm/string.hpp>
#include <boost/filesystem.hpp>
#include "resource/utilities/command.hpp"
#include "resource/store/resource_graph_store.hpp"
#include "resource/policies/dfu_match_policy_factory.hpp"
Expand All @@ -45,6 +46,7 @@ extern "C" {
#endif
}

namespace fs = boost::filesystem;
using namespace Flux::resource_model;

#define OPTIONS "L:f:W:S:P:F:g:o:p:t:r:edh"
Expand Down Expand Up @@ -611,6 +613,15 @@ static void process_args (std::shared_ptr<resource_context_t> &ctx,
break;
case 'L': /* --load-file */
ctx->params.load_file = optarg;
if (!fs::exists(ctx->params.load_file)) {
std::cerr << "[ERROR] file does not exist for --load-file: ";
std::cerr << optarg << std::endl;
usage (1);
} else if (fs::is_directory(ctx->params.load_file)) {
std::cerr << "[ERROR] path passed to --load-file is a directory: ";
std::cerr << optarg << std::endl;
usage (1);
}
break;
case 'f': /* --load-format */
ctx->params.load_format = optarg;
Expand Down
10 changes: 8 additions & 2 deletions resource/writers/match_writers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ int jgf_match_writers_t::emit_vtx (const std::string &prefix,
int rc = 0;
json_t *o = NULL;
json_t *b = NULL;
auto &ephemeral = g[u].idata.ephemeral;
auto &eph_map = ephemeral.to_map ();

if (!m_vout || !m_eout) {
rc = -1;
Expand All @@ -288,6 +290,10 @@ int jgf_match_writers_t::emit_vtx (const std::string &prefix,
json_decref (b);
goto out;
}
if ((rc = map2json (b, eph_map, "ephemeral") < 0)) {
json_decref (b);
goto out;
}
if ((o = json_pack ("{s:s s:o}",
"id", std::to_string (g[u].uniq_id).c_str (),
"metadata", b)) == NULL) {
Expand Down Expand Up @@ -443,8 +449,8 @@ int jgf_match_writers_t::map2json (json_t *o,
errno = ENOMEM;
goto out;
}
}
if ((rc = json_object_set_new (o, key, p)) == -1) {
}
if ((rc = json_object_set_new (o, key, p)) == -1) {
errno = ENOMEM;
goto out;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.NOTPARALLEL:

SUBDIRS = common cmd
SUBDIRS = common cmd shell

check-local: all
Loading

0 comments on commit 894e5af

Please sign in to comment.