Skip to content

Commit

Permalink
Merge pull request #6500 from grondo/issue#5957
Browse files Browse the repository at this point in the history
jobtap: add `flux_jobtap_jobspec_update_id_pack()`
  • Loading branch information
mergify[bot] authored Dec 11, 2024
2 parents 0af65c5 + c5d9fda commit 7d4d63b
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 1 deletion.
55 changes: 55 additions & 0 deletions src/modules/job-manager/jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,61 @@ int flux_jobtap_event_post_pack (flux_plugin_t *p,
return rc;
}

int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
flux_jobid_t id,
const char *fmt,
...)
{
int rc = -1;
va_list ap;
struct jobtap *jobtap;
struct job *job;
json_error_t error;
json_t *update = NULL;

if (!p
|| !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))
|| !(job = jobtap_lookup_active_jobid (p, id))
|| job->state == FLUX_JOB_STATE_RUN
|| job->state == FLUX_JOB_STATE_CLEANUP
|| job->eventlog_readonly) {
errno = EINVAL;
return -1;
}

/* This interface is only appropriate from outside a jobtap callback,
* i.e. called asynchronously to update a job. If 'job' is equivalent
* to the current job at the top of the jobtap stack, return an error.
*/
if (job == current_job (jobtap)) {
errno = EINVAL;
return -1;
}

va_start (ap, fmt);
update = json_vpack_ex (&error, 0, fmt, ap);
va_end (ap);
if (!update) {
errno = EINVAL;
return -1;
}
if (!validate_jobspec_updates (update)) {
errno = EINVAL;
goto out;
}
/* XXX: should job.validate be called on these updates before posting?
*/
rc = event_job_post_pack (jobtap->ctx->event,
job,
"jobspec-update",
0,
"O",
update);
out:
ERRNO_SAFE_WRAP (json_decref, update);
return rc;
}

int flux_jobtap_jobspec_update_pack (flux_plugin_t *p, const char *fmt, ...)
{
int rc = -1;
Expand Down
13 changes: 13 additions & 0 deletions src/modules/job-manager/jobtap.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,19 @@ int flux_jobtap_event_post_pack (flux_plugin_t *p,
*/
int flux_jobtap_jobspec_update_pack (flux_plugin_t *p, const char *fmt, ...);

/* Similar to flux_jobtap_jobspec_update_pack(), but asynchronously update
* a specific jobid. This version assumes the job is quiescent, so the event
* is applied immediately.
*
* Returns -1 with errno set to EINVAL for invalid arguments, if the job
* does not exist, if the target job is in RUN, CLEANUP or INACTIVE states,
* or if the function is called from a jobtap callback for the target job.
*/
int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
flux_jobid_t id,
const char *fmt,
...);

/* Return a flux_plugin_arg_t object for a job.
*
* The result can then be unpacked with flux_plugin_arg_unpack(3) to get
Expand Down
35 changes: 34 additions & 1 deletion t/job-manager/plugins/jobspec-update.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ static int get_and_update_jobspec_name (flux_error_t *errp,
const char **cur_namep,
char *name)
{
flux_jobid_t id;
const char *current_name = NULL;
char *copy = NULL;
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:{s:{s?{s?{s?s}}}}}",
"{s:I s:{s:{s?{s?{s?s}}}}}",
"id", &id,
"jobspec",
"attributes",
"system",
Expand All @@ -41,6 +43,20 @@ static int get_and_update_jobspec_name (flux_error_t *errp,
flux_plugin_arg_strerror (args));
return -1;
}

/* flux_jobtap_jobspec_update_id_pack() should fail here, since this
* function is always called in the context of a jobtap callback:
*/
if (flux_jobtap_jobspec_update_id_pack (p,
id,
"{s:s}",
"attributes.system.foo",
"bar") == 0) {
errprintf (errp,
"flux_jobtap_jobspec_update_id_pack() unexpected success");
return -1;
}

if (current_name && !(copy = strdup (current_name))) {
errprintf (errp, "failed to copy job name");
return -1;
Expand Down Expand Up @@ -199,6 +215,20 @@ static int run_cb (flux_plugin_t *p,
return 0;
}

static void update_msg_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
flux_plugin_t *p = arg;
flux_jobid_t id;
json_t *update;

if (flux_msg_unpack (msg, "{s:I s:o}", "id", &id, "update", &update) < 0
|| flux_jobtap_jobspec_update_id_pack (p, id, "O", update) < 0)
flux_jobtap_raise_exception (p, id, "test", 0, "update failed");
flux_respond (h, msg, NULL);
}

static const struct flux_plugin_handler tab[] = {
{ "job.new", new_cb, NULL },
Expand All @@ -214,6 +244,9 @@ int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "jobspec-update", tab) < 0)
return -1;
if (flux_jobtap_service_register (p, "update", update_msg_cb, p) < 0)
flux_log_error (flux_jobtap_get_flux (p),
"flux_jobtap_service_register");
return 0;
}

Expand Down
16 changes: 16 additions & 0 deletions t/t2212-job-manager-plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,22 @@ test_expect_success 'job-manager: plugins can update jobspec' '
--match-context=attributes.system.job.name=new \
$jobid jobspec-update
'
test_expect_success 'job-manager: plugin can asynchronously update jobspec' '
flux dmesg -H | grep jobspec-update &&
jobid=$(flux submit -n1 --urgency=hold sleep 0) &&
cat <<-EOF >update-test.py &&
import flux
from flux.job import JobID
id=JobID("$jobid")
flux.Flux().rpc(
"job-manager.jobspec-update.update",
dict(id=id, update={"attributes.system.job.name": "test"}),
).get()
EOF
flux python update-test.py &&
flux job wait-event -Hv -t 30 $jobid jobspec-update &&
flux jobs $jobid
'
test_expect_success 'job-manager: plugin fails to load on config.update error' '
flux jobtap remove all &&
test_must_fail flux jobtap load ${PLUGINPATH}/config.so 2>config.err
Expand Down

0 comments on commit 7d4d63b

Please sign in to comment.