diff --git a/qmanager/modules/qmanager_callbacks.cpp b/qmanager/modules/qmanager_callbacks.cpp
index f953e907b..c9a6c1b95 100644
--- a/qmanager/modules/qmanager_callbacks.cpp
+++ b/qmanager/modules/qmanager_callbacks.cpp
@@ -335,13 +335,15 @@ void qmanager_cb_t::jobmanager_free_cb (flux_t *h, const flux_msg_t *msg, const
 {
     flux_jobid_t id;
     json_t *Res;
+    int final = 0;
     const char *Rstr = NULL;
     qmanager_cb_ctx_t *ctx = nullptr;
     ctx = static_cast<qmanager_cb_ctx_t *> (arg);
     std::shared_ptr<queue_policy_base_t> queue;
     std::string queue_name;
 
-    if (flux_request_unpack (msg, NULL, "{s:I s:O}", "id", &id, "R", &Res) < 0) {
+    if (flux_request_unpack (msg, NULL, "{s:I s:O s?b}", "id", &id, "R", &Res, "final", &final)
+        < 0) {
         flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
         return;
     }
@@ -357,7 +359,7 @@
                         static_cast<intmax_t> (id));
         goto done;
     }
-    if ((queue->remove (static_cast<void *> (h), id, Rstr)) < 0) {
+    if ((queue->remove (static_cast<void *> (h), id, final, Rstr)) < 0) {
         flux_log_error (h,
                         "%s: remove (queue=%s id=%jd)",
                         __FUNCTION__,
diff --git a/qmanager/policies/base/queue_policy_base.hpp b/qmanager/policies/base/queue_policy_base.hpp
index 4e1362e32..ba1e3b791 100644
--- a/qmanager/policies/base/queue_policy_base.hpp
+++ b/qmanager/policies/base/queue_policy_base.hpp
@@ -621,14 +621,17 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
      *                  service module such as qmanager, it is expected
      *                  to be a pointer to a flux_t object.
      * \param id        jobid of flux_jobid_t type.
+     * \param final     bool indicating if this is the final partial release RPC
+     *                  for this jobid.
      * \param R         Resource set for partial cancel
      * \return          0 on success; -1 on error.
      *                      ENOENT: unknown id.
      */
-    int remove (void *h, flux_jobid_t id, const char *R)
+    int remove (void *h, flux_jobid_t id, bool final, const char *R)
     {
         int rc = -1;
         bool full_removal = false;
+        flux_t *flux_h = static_cast<flux_t *> (h);
 
         auto job_it = m_jobs.find (id);
         if (job_it == m_jobs.end ()) {
@@ -643,28 +646,55 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
             case job_state_kind_t::ALLOC_RUNNING:
                 // deliberately fall through
             case job_state_kind_t::RUNNING:
-                if ((rc = cancel (h, job_it->second->id, R, true, full_removal) != 0))
-                    break;
-                if (full_removal) {
+                if (cancel (h, job_it->second->id, R, true, full_removal) != 0) {
+                    flux_log_error (flux_h,
+                                    "%s: .free RPC partial cancel failed for jobid "
+                                    "%jd",
+                                    __FUNCTION__,
+                                    static_cast<intmax_t> (id));
+                    errno = EINVAL;
+                    goto out;
+                }
+                // We still want to run the sched loop even if there's an inconsistent state
+                set_schedulability (true);
+                if (full_removal || final) {
                     m_alloced.erase (job_it->second->t_stamps.running_ts);
                     m_running.erase (job_it->second->t_stamps.running_ts);
                     job_it->second->t_stamps.complete_ts = m_cq_cnt++;
                     job_it->second->state = job_state_kind_t::COMPLETE;
                     m_jobs.erase (job_it);
+                    if (final && !full_removal) {
+                        // This error condition indicates a discrepancy between core and sched.
+                        flux_log_error (flux_h,
+                                        "%s: Final .free RPC failed to remove all resources for "
+                                        "jobid "
+                                        "%jd",
+                                        __FUNCTION__,
+                                        static_cast<intmax_t> (id));
+                        // Run a full cancel to clean up all remaining allocated resources
+                        if (cancel (h, id, true) != 0) {
+                            flux_log_error (flux_h,
+                                            "%s: .free RPC full cancel failed for jobid "
+                                            "%jd",
+                                            __FUNCTION__,
+                                            static_cast<intmax_t> (id));
+                        }
+                        errno = EPROTO;
+                        goto out;
+                    }
                 }
-                set_schedulability (true);
                 break;
             default:
                 break;
         }
+
+        rc = 0;
+
+    out:
         cancel_sched_loop ();
         // blocked jobs must be reconsidered after a job completes
         // this covers cases where jobs that couldn't run because of an
         // existing job's reservation can when it completes early
         reconsider_blocked_jobs ();
-
-        rc = 0;
-
-    out:
         return rc;
     }
@@ -691,6 +721,26 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t {
         return 0;
     }
 
+    /*! Remove a job whose jobid is id from any internal queues
+     * (e.g., pending queue, running queue, and alloced queue.)
+     * If succeeds, it changes the pending queue or resource
+     * state. This queue becomes "schedulable" if pending job
+     * queue is not empty: i.e., is_schedulable() returns true;
+     *
+     * \param h        Opaque handle. How it is used is an implementation
+     *                 detail. However, when it is used within a Flux's
+     *                 service module such as qmanager, it is expected
+     *                 to be a pointer to a flux_t object.
+     * \param id       jobid of flux_jobid_t type.
+     * \param noent_ok don't return an error on nonexistent jobid
+     * \return         0 on success; -1 on error.
+     *                     ENOENT: unknown id.
+     */
+    virtual int cancel (void *h, flux_jobid_t id, bool noent_ok)
+    {
+        return 0;
+    }
+
     /*! Return true if this queue has become schedulable since
      * its state had been reset with set_schedulability (false).
* "Being schedulable" means one or more job or resource events diff --git a/qmanager/policies/queue_policy_bf_base.hpp b/qmanager/policies/queue_policy_bf_base.hpp index cf4a68b31..740bceda6 100644 --- a/qmanager/policies/queue_policy_bf_base.hpp +++ b/qmanager/policies/queue_policy_bf_base.hpp @@ -36,6 +36,7 @@ class queue_policy_bf_base_t : public queue_policy_base_t { const char *R, bool noent_ok, bool &full_removal) override; + int cancel (void *h, flux_jobid_t id, bool noent_ok) override; protected: unsigned int m_reservation_depth; diff --git a/qmanager/policies/queue_policy_bf_base_impl.hpp b/qmanager/policies/queue_policy_bf_base_impl.hpp index 21cdf812c..bbeedbd18 100644 --- a/qmanager/policies/queue_policy_bf_base_impl.hpp +++ b/qmanager/policies/queue_policy_bf_base_impl.hpp @@ -113,6 +113,12 @@ int queue_policy_bf_base_t::cancel (void *h, return reapi_type::cancel (h, id, R, noent_ok, full_removal); } +template +int queue_policy_bf_base_t::cancel (void *h, flux_jobid_t id, bool noent_ok) +{ + return reapi_type::cancel (h, id, noent_ok); +} + //////////////////////////////////////////////////////////////////////////////// // Public API of Queue Policy Backfill Base //////////////////////////////////////////////////////////////////////////////// diff --git a/qmanager/policies/queue_policy_fcfs.hpp b/qmanager/policies/queue_policy_fcfs.hpp index 7ba624031..7298d7b72 100644 --- a/qmanager/policies/queue_policy_fcfs.hpp +++ b/qmanager/policies/queue_policy_fcfs.hpp @@ -39,6 +39,7 @@ class queue_policy_fcfs_t : public queue_policy_base_t { const char *R, bool noent_ok, bool &full_removal) override; + int cancel (void *h, flux_jobid_t id, bool noent_ok) override; private: int pack_jobs (json_t *jobs); diff --git a/qmanager/policies/queue_policy_fcfs_impl.hpp b/qmanager/policies/queue_policy_fcfs_impl.hpp index ddb0e8f53..8d866793b 100644 --- a/qmanager/policies/queue_policy_fcfs_impl.hpp +++ b/qmanager/policies/queue_policy_fcfs_impl.hpp @@ -150,6 +150,12 @@ 
int queue_policy_fcfs_t<reapi_type>::cancel (void *h,
     return reapi_type::cancel (h, id, R, noent_ok, full_removal);
 }
 
+template<class reapi_type>
+int queue_policy_fcfs_t<reapi_type>::cancel (void *h, flux_jobid_t id, bool noent_ok)
+{
+    return reapi_type::cancel (h, id, noent_ok);
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // Public API of Queue Policy FCFS
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp
index 4741d0521..9333ce55b 100644
--- a/resource/modules/resource_match.cpp
+++ b/resource/modules/resource_match.cpp
@@ -1931,6 +1931,12 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
         std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
         info->state = job_lifecycle_t::ERROR;
     }
+    flux_log (ctx->h,
+              LOG_ERR,
+              "%s: dfu_traverser_t::remove (id=%jd): %s",
+              __FUNCTION__,
+              static_cast<intmax_t> (jobid),
+              ctx->traverser->err_message ().c_str ());
     goto out;
 }
 if (full_removal && is_existent_jobid (ctx, jobid))
diff --git a/resource/traversers/dfu_impl_update.cpp b/resource/traversers/dfu_impl_update.cpp
index 79efbff35..f97e95ac6 100644
--- a/resource/traversers/dfu_impl_update.cpp
+++ b/resource/traversers/dfu_impl_update.cpp
@@ -368,11 +368,13 @@ int dfu_impl_t::rem_exclusive_filter (vtx_t u, int64_t jobid, const modify_data_
 
     auto span_it = (*m_graph)[u].idata.x_spans.find (jobid);
     if (span_it == (*m_graph)[u].idata.x_spans.end ()) {
-        if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) {
+        if (mod_data.mod_type == job_modify_t::VTX_CANCEL) {
             m_err_msg += __FUNCTION__;
             m_err_msg += ": jobid isn't found in x_spans table.\n ";
             goto done;
         } else {
+            // Valid for CANCEL if clearing inconsistency between
+            // core and sched.
             rc = 0;
             goto done;
         }
diff --git a/t/issues/t6179-flux-core-housekeeping.sh b/t/issues/t6179-flux-core-housekeeping.sh
new file mode 100755
index 000000000..138491da3
--- /dev/null
+++ b/t/issues/t6179-flux-core-housekeeping.sh
@@ -0,0 +1,164 @@
+#!/bin/bash
+#
+# Ensure fluxion conforms to updated RFC 27 and behaves as desired if resource state
+# discrepancies exist between flux-core and -sched.
+#
+
+log() { printf "flux-coreissue#6179: $@\n" >&2; }
+
+cat <<'EOF' >free.py
+import flux
+import json
+import sys
+import subprocess as sp
+from flux.job import JobID
+
+jobid = JobID(sys.argv[1])
+r_obj=json.loads(sp.check_output(['flux', 'job', 'info', sys.argv[1], 'R']).decode())
+obj = {'id': jobid, 'R': r_obj, 'final': True}
+flux.Flux().rpc('sched.free', obj)
+sys.exit(0)
+EOF
+
+cat <<'EOF' >incomplete-free.py
+import flux
+import json
+import sys
+import subprocess as sp
+from flux.job import JobID
+
+jobid = JobID(sys.argv[1])
+R_str = '{"version": 1, "execution": {"R_lite": [{"rank": "1", "children": {"core": "0-15", "gpu": "0-3"}}], "nodelist": ["node1"]}}'
+r_obj = json.loads(R_str)
+obj = {'id': jobid, 'R': r_obj, 'final': True}
+flux.Flux().rpc('sched.free', obj)
+sys.exit(0)
+EOF
+
+cat <<EOF >flux.config
+[sched-fluxion-resource]
+match-policy = "lonodex"
+match-format = "rv1_nosched"
+
+[resource]
+noverify = true
+norestrict = true
+
+[[resource.config]]
+hosts = "node[0-1]"
+cores = "0-15"
+gpus = "0-3"
+EOF
+
+log "Unloading modules..."
+flux module remove sched-simple +flux module remove resource + +flux config load flux.config + +flux module load resource monitor-force-up +flux module load sched-fluxion-resource +flux module load sched-fluxion-qmanager queue-policy="easy" +flux queue start --all --quiet +flux resource list +flux resource status +flux module list + +log "Running test job 1" +jobid1=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf) +log "Sending final RPC for job 1" +flux python ./free.py ${jobid1} +state=$( flux ion-resource find "sched-now=allocated" ) +if [[ ${state} != *"null"* ]]; then + # retry since ./free.py isn't blocking + state=$( flux ion-resource find "sched-now=allocated" ) + if [[ ${state} != *"null"* ]]; then + # .free didn't release all resources + exit 1 + fi +fi + +# Need to execute cancel to remove from job manager +flux cancel ${jobid1} +flux job wait-event -t 5 ${jobid1} release + +log "Running test job 2" +jobid2=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf) +log "Sending final RPC for job 2" +flux python ./incomplete-free.py ${jobid2} +state=$( flux ion-resource find "sched-now=allocated" ) +if [[ ${state} != *"null"* ]]; then + # retry since ./free.py isn't blocking + state=$( flux ion-resource find "sched-now=allocated" ) + if [[ ${state} != *"null"* ]]; then + # .free didn't release all resources + exit 1 + fi +fi + +# Need to execute cancel to remove from job manager +flux cancel ${jobid2} +flux job wait-event -t 5 ${jobid2} release +flux jobs -a + +log "reloading sched-simple..." +flux module remove sched-fluxion-qmanager +flux module remove sched-fluxion-resource +flux module load sched-simple + +log "Unloading modules for FCFS test..." 
+flux module remove sched-simple +flux module remove resource + +flux config load flux.config + +flux module load resource monitor-force-up +flux module load sched-fluxion-resource +flux module load sched-fluxion-qmanager queue-policy="fcfs" +flux queue start --all --quiet +flux resource list +flux resource status +flux module list + +log "Running test job 3" +jobid3=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf) +log "Sending final RPC for job 3" +flux python ./free.py ${jobid3} +state=$( flux ion-resource find "sched-now=allocated" ) +if [[ ${state} != *"null"* ]]; then + # retry since ./free.py isn't blocking + state=$( flux ion-resource find "sched-now=allocated" ) + if [[ ${state} != *"null"* ]]; then + # .free didn't release all resources + exit 1 + fi +fi + +# Need to execute cancel to remove from job manager +flux cancel ${jobid3} +flux job wait-event -t 5 ${jobid3} release + +log "Running test job 4" +jobid4=$(flux submit --wait-event=alloc -N2 -t 1h --setattr=exec.test.run_duration=1m sleep inf) +log "Sending final RPC for job 4" +flux python ./incomplete-free.py ${jobid4} +state=$( flux ion-resource find "sched-now=allocated" ) +if [[ ${state} != *"null"* ]]; then + # retry since ./free.py isn't blocking + state=$( flux ion-resource find "sched-now=allocated" ) + if [[ ${state} != *"null"* ]]; then + # .free didn't release all resources + exit 1 + fi +fi + +# Need to execute cancel to remove from job manager +flux cancel ${jobid4} +flux job wait-event -t 5 ${jobid4} release +flux jobs -a + +log "reloading sched-simple..." +flux module remove sched-fluxion-qmanager +flux module remove sched-fluxion-resource +flux module load sched-simple +