Merge pull request #1266 from milroy/final-free
Support and handle optional "final" flag in .free RPC
mergify[bot] authored Aug 13, 2024
2 parents 5aa005b + 507d368 commit ed3572e
Showing 9 changed files with 249 additions and 11 deletions.
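
For context, a minimal client-side sketch of the updated payload, modeled on the free.py helper in the test added below (the command-line jobid and the way R is fetched are illustrative; in a real instance this RPC normally comes from flux-core's job manager):

import json
import sys
import subprocess as sp
import flux
from flux.job import JobID

# Illustrative: take a jobid on the command line, as the test helper does.
jobid = JobID(sys.argv[1])
# Fetch the job's resource set R, then mark this release as the final one
# with the new optional "final" key.
r_obj = json.loads(sp.check_output(['flux', 'job', 'info', sys.argv[1], 'R']).decode())
flux.Flux().rpc('sched.free', {'id': jobid, 'R': r_obj, 'final': True})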
6 changes: 4 additions & 2 deletions qmanager/modules/qmanager_callbacks.cpp
@@ -335,13 +335,15 @@ void qmanager_cb_t::jobmanager_free_cb (flux_t *h, const flux_msg_t *msg, const
{
flux_jobid_t id;
json_t *Res;
int final = 0;
const char *Rstr = NULL;
qmanager_cb_ctx_t *ctx = nullptr;
ctx = static_cast<qmanager_cb_ctx_t *> (arg);
std::shared_ptr<queue_policy_base_t> queue;
std::string queue_name;

if (flux_request_unpack (msg, NULL, "{s:I s:O}", "id", &id, "R", &Res) < 0) {
if (flux_request_unpack (msg, NULL, "{s:I s:O s?b}", "id", &id, "R", &Res, "final", &final)
< 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
return;
}
@@ -357,7 +359,7 @@ void qmanager_cb_t::jobmanager_free_cb (flux_t *h, const flux_msg_t *msg, const
static_cast<intmax_t> (id));
goto done;
}
if ((queue->remove (static_cast<void *> (h), id, Rstr)) < 0) {
if ((queue->remove (static_cast<void *> (h), id, final, Rstr)) < 0) {
flux_log_error (h,
"%s: remove (queue=%s id=%jd)",
__FUNCTION__,
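Because the handler above unpacks "final" with the optional s?b token and pre-initializes final to 0, a .free request that omits the key is still accepted and is treated as a non-final partial release. A sketch of that older-style payload, under the same illustrative setup as the example above:

import json
import sys
import subprocess as sp
import flux
from flux.job import JobID

jobid = JobID(sys.argv[1])
r_obj = json.loads(sp.check_output(['flux', 'job', 'info', sys.argv[1], 'R']).decode())
# No "final" key in the payload: the callback keeps its default final = 0,
# so qmanager handles this as an ordinary (non-final) partial release.
flux.Flux().rpc('sched.free', {'id': jobid, 'R': r_obj})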
66 changes: 58 additions & 8 deletions qmanager/policies/base/queue_policy_base.hpp
@@ -621,14 +624,17 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param id jobid of flux_jobid_t type.
* \param final bool indicating if this is the final partial release RPC
* for this jobid.
* \param R Resource set for partial cancel
* \return 0 on success; -1 on error.
* ENOENT: unknown id.
*/
int remove (void *h, flux_jobid_t id, const char *R)
int remove (void *h, flux_jobid_t id, bool final, const char *R)
{
int rc = -1;
bool full_removal = false;
flux_t *flux_h = static_cast<flux_t *> (h);

auto job_it = m_jobs.find (id);
if (job_it == m_jobs.end ()) {
@@ -643,28 +646,55 @@
case job_state_kind_t::ALLOC_RUNNING:
// deliberately fall through
case job_state_kind_t::RUNNING:
if ((rc = cancel (h, job_it->second->id, R, true, full_removal) != 0))
break;
if (full_removal) {
if (cancel (h, job_it->second->id, R, true, full_removal) != 0) {
flux_log_error (flux_h,
"%s: .free RPC partial cancel failed for jobid "
"%jd",
__FUNCTION__,
static_cast<intmax_t> (id));
errno = EINVAL;
goto out;
}
// We still want to run the sched loop even if there's an inconsistent state
set_schedulability (true);
if (full_removal || final) {
m_alloced.erase (job_it->second->t_stamps.running_ts);
m_running.erase (job_it->second->t_stamps.running_ts);
job_it->second->t_stamps.complete_ts = m_cq_cnt++;
job_it->second->state = job_state_kind_t::COMPLETE;
m_jobs.erase (job_it);
if (final && !full_removal) {
// This error condition indicates a discrepancy between core and sched.
flux_log_error (flux_h,
"%s: Final .free RPC failed to remove all resources for "
"jobid "
"%jd",
__FUNCTION__,
static_cast<intmax_t> (id));
// Run a full cancel to clean up all remaining allocated resources
if (cancel (h, job_it->second->id, true) != 0) {
flux_log_error (flux_h,
"%s: .free RPC full cancel failed for jobid "
"%jd",
__FUNCTION__,
static_cast<intmax_t> (id));
}
errno = EPROTO;
goto out;
}
}
set_schedulability (true);
break;
default:
break;
}

rc = 0;
out:
cancel_sched_loop ();
// blocked jobs must be reconsidered after a job completes
// this covers cases where jobs that couldn't run because of an
// existing job's reservation can when it completes early
reconsider_blocked_jobs ();

rc = 0;
out:
return rc;
}

@@ -691,6 +721,26 @@
return 0;
}

/*! Remove a job whose jobid is id from any internal queues
* (e.g., pending queue, running queue, and alloced queue).
* If it succeeds, it changes the pending queue or resource
* state. This queue becomes "schedulable" if the pending job
* queue is not empty: i.e., is_schedulable() returns true.
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param id jobid of flux_jobid_t type.
* \param noent_ok don't return an error on nonexistent jobid
* \return 0 on success; -1 on error.
* ENOENT: unknown id.
*/
virtual int cancel (void *h, flux_jobid_t id, bool noent_ok)
{
return 0;
}

/*! Return true if this queue has become schedulable since
* its state had been reset with set_schedulability (false).
* "Being schedulable" means one or more job or resource events
1 change: 1 addition & 0 deletions qmanager/policies/queue_policy_bf_base.hpp
@@ -36,6 +36,7 @@ class queue_policy_bf_base_t : public queue_policy_base_t {
const char *R,
bool noent_ok,
bool &full_removal) override;
int cancel (void *h, flux_jobid_t id, bool noent_ok) override;

protected:
unsigned int m_reservation_depth;
6 changes: 6 additions & 0 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
@@ -113,6 +113,12 @@ int queue_policy_bf_base_t<reapi_type>::cancel (void *h,
return reapi_type::cancel (h, id, R, noent_ok, full_removal);
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::cancel (void *h, flux_jobid_t id, bool noent_ok)
{
return reapi_type::cancel (h, id, noent_ok);
}

////////////////////////////////////////////////////////////////////////////////
// Public API of Queue Policy Backfill Base
////////////////////////////////////////////////////////////////////////////////
1 change: 1 addition & 0 deletions qmanager/policies/queue_policy_fcfs.hpp
@@ -39,6 +39,7 @@ class queue_policy_fcfs_t : public queue_policy_base_t {
const char *R,
bool noent_ok,
bool &full_removal) override;
int cancel (void *h, flux_jobid_t id, bool noent_ok) override;

private:
int pack_jobs (json_t *jobs);
6 changes: 6 additions & 0 deletions qmanager/policies/queue_policy_fcfs_impl.hpp
@@ -150,6 +150,12 @@ int queue_policy_fcfs_t<reapi_type>::cancel (void *h,
return reapi_type::cancel (h, id, R, noent_ok, full_removal);
}

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::cancel (void *h, flux_jobid_t id, bool noent_ok)
{
return reapi_type::cancel (h, id, noent_ok);
}

////////////////////////////////////////////////////////////////////////////////
// Public API of Queue Policy FCFS
////////////////////////////////////////////////////////////////////////////////
6 changes: 6 additions & 0 deletions resource/modules/resource_match.cpp
@@ -1931,6 +1931,12 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
info->state = job_lifecycle_t::ERROR;
}
flux_log (ctx->h,
LOG_ERR,
"%s: dfu_traverser_t::remove (id=%jd): %s",
__FUNCTION__,
static_cast<intmax_t> (jobid),
ctx->traverser->err_message ().c_str ());
goto out;
}
if (full_removal && is_existent_jobid (ctx, jobid))
4 changes: 3 additions & 1 deletion resource/traversers/dfu_impl_update.cpp
@@ -368,11 +368,13 @@ int dfu_impl_t::rem_exclusive_filter (vtx_t u, int64_t jobid, const modify_data_

auto span_it = (*m_graph)[u].idata.x_spans.find (jobid);
if (span_it == (*m_graph)[u].idata.x_spans.end ()) {
if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) {
if (mod_data.mod_type == job_modify_t::VTX_CANCEL) {
m_err_msg += __FUNCTION__;
m_err_msg += ": jobid isn't found in x_spans table.\n ";
goto done;
} else {
// Valid for CANCEL if clearing inconsistency between
// core and sched.
rc = 0;
goto done;
}
164 changes: 164 additions & 0 deletions t/issues/t6179-flux-core-housekeeping.sh
@@ -0,0 +1,164 @@
#!/bin/bash
#
# Ensure fluxion conforms to the updated RFC 27 and behaves as desired if resource state
# discrepancies exist between flux-core and flux-sched.
#

log() { printf "flux-core issue #6179: $@\n" >&2; }

cat <<'EOF' >free.py
import flux
import json
import sys
import subprocess as sp
from flux.job import JobID
jobid = JobID(sys.argv[1])
r_obj=json.loads(sp.check_output(['flux', 'job', 'info', sys.argv[1], 'R']).decode())
obj = {'id': jobid, 'R': r_obj, 'final': True}
flux.Flux().rpc('sched.free', obj)
sys.exit(0)
EOF

cat <<'EOF' >incomplete-free.py
import flux
import json
import sys
import subprocess as sp
from flux.job import JobID
jobid = JobID(sys.argv[1])
R_str = '{"version": 1, "execution": {"R_lite": [{"rank": "1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node1"]}}'
r_obj = json.loads(R_str)
obj = {'id': jobid, 'R': r_obj, 'final': True}
flux.Flux().rpc('sched.free', obj)
sys.exit(0)
EOF

cat <<EOF >flux.config
[sched-fluxion-resource]
match-policy = "lonodex"
match-format = "rv1_nosched"
[resource]
noverify = true
norestrict = true
[[resource.config]]
hosts = "node[0-1]"
cores = "0-15"
gpus = "0-3"
EOF

log "Unloading modules..."
flux module remove sched-simple
flux module remove resource

flux config load flux.config

flux module load resource monitor-force-up
flux module load sched-fluxion-resource
flux module load sched-fluxion-qmanager queue-policy="easy"
flux queue start --all --quiet
flux resource list
flux resource status
flux module list

log "Running test job 1"
jobid1=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf)
log "Sending final RPC for job 1"
flux python ./free.py ${jobid1}
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# retry since ./free.py isn't blocking
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# .free didn't release all resources
exit 1
fi
fi

# Need to execute cancel to remove from job manager
flux cancel ${jobid1}
flux job wait-event -t 5 ${jobid1} release

log "Running test job 2"
jobid2=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf)
log "Sending final RPC for job 2"
flux python ./incomplete-free.py ${jobid2}
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# retry since ./incomplete-free.py isn't blocking
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# .free didn't release all resources
exit 1
fi
fi

# Need to execute cancel to remove from job manager
flux cancel ${jobid2}
flux job wait-event -t 5 ${jobid2} release
flux jobs -a

log "reloading sched-simple..."
flux module remove sched-fluxion-qmanager
flux module remove sched-fluxion-resource
flux module load sched-simple

log "Unloading modules for FCFS test..."
flux module remove sched-simple
flux module remove resource

flux config load flux.config

flux module load resource monitor-force-up
flux module load sched-fluxion-resource
flux module load sched-fluxion-qmanager queue-policy="fcfs"
flux queue start --all --quiet
flux resource list
flux resource status
flux module list

log "Running test job 3"
jobid3=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf)
log "Sending final RPC for job 3"
flux python ./free.py ${jobid3}
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# retry since ./free.py isn't blocking
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# .free didn't release all resources
exit 1
fi
fi

# Need to execute cancel to remove from job manager
flux cancel ${jobid3}
flux job wait-event -t 5 ${jobid3} release

log "Running test job 4"
jobid4=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf)
log "Sending final RPC for job 4"
flux python ./incomplete-free.py ${jobid4}
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# retry since ./incomplete-free.py isn't blocking
state=$( flux ion-resource find "sched-now=allocated" )
if [[ ${state} != *"null"* ]]; then
# .free didn't release all resources
exit 1
fi
fi

# Need to execute cancel to remove from job manager
flux cancel ${jobid4}
flux job wait-event -t 5 ${jobid4} release
flux jobs -a

log "reloading sched-simple..."
flux module remove sched-fluxion-qmanager
flux module remove sched-fluxion-resource
flux module load sched-simple
