Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobtap: add flux_jobtap_jobspec_update_id_pack() #6500

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/modules/job-manager/jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -2263,6 +2263,61 @@
return rc;
}

int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
flux_jobid_t id,
const char *fmt,
...)
{
int rc = -1;
va_list ap;
struct jobtap *jobtap;
struct job *job;
json_error_t error;
json_t *update = NULL;

if (!p
|| !(jobtap = flux_plugin_aux_get (p, "flux::jobtap"))
|| !(job = jobtap_lookup_active_jobid (p, id))
|| job->state == FLUX_JOB_STATE_RUN
|| job->state == FLUX_JOB_STATE_CLEANUP
|| job->eventlog_readonly) {
errno = EINVAL;
return -1;

Check warning on line 2285 in src/modules/job-manager/jobtap.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/jobtap.c#L2284-L2285

Added lines #L2284 - L2285 were not covered by tests
}

/* This interface is only appropriate from outside a jobtap callback,
* i.e. called asynchronously to update a job. If 'job' is equivalent
* to the current job at the top of the jobtap stack, return an error.
*/
if (job == current_job (jobtap)) {
errno = EINVAL;
return -1;
}

va_start (ap, fmt);
update = json_vpack_ex (&error, 0, fmt, ap);
va_end (ap);
if (!update) {
errno = EINVAL;
return -1;

Check warning on line 2302 in src/modules/job-manager/jobtap.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/jobtap.c#L2301-L2302

Added lines #L2301 - L2302 were not covered by tests
}
if (!validate_jobspec_updates (update)) {
errno = EINVAL;
goto out;

Check warning on line 2306 in src/modules/job-manager/jobtap.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/jobtap.c#L2305-L2306

Added lines #L2305 - L2306 were not covered by tests
}
/* XXX: should job.validate be called on these updates before posting?
*/
rc = event_job_post_pack (jobtap->ctx->event,
job,
"jobspec-update",
0,
"O",
update);
out:
ERRNO_SAFE_WRAP (json_decref, update);
return rc;
}

int flux_jobtap_jobspec_update_pack (flux_plugin_t *p, const char *fmt, ...)
{
int rc = -1;
Expand Down
13 changes: 13 additions & 0 deletions src/modules/job-manager/jobtap.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,19 @@ int flux_jobtap_event_post_pack (flux_plugin_t *p,
*/
int flux_jobtap_jobspec_update_pack (flux_plugin_t *p, const char *fmt, ...);

/* Similar to flux_jobtap_jobspec_update_pack(), but asynchronously update
* a specific jobid. This version assumes the job is quiescent, so the event
* is applied immediately.
*
* Returns -1 with errno set to EINVAL for invalid arguments, if the job
* does not exist, if the target job is in RUN, CLEANUP or INACTIVE states,
* or if the function is called from a jobtap callback for the target job.
*/
int flux_jobtap_jobspec_update_id_pack (flux_plugin_t *p,
flux_jobid_t id,
const char *fmt,
...);

/* Return a flux_plugin_arg_t object for a job.
*
* The result can then be unpacked with flux_plugin_arg_unpack(3) to get
Expand Down
35 changes: 34 additions & 1 deletion t/job-manager/plugins/jobspec-update.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ static int get_and_update_jobspec_name (flux_error_t *errp,
const char **cur_namep,
char *name)
{
flux_jobid_t id;
const char *current_name = NULL;
char *copy = NULL;
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:{s:{s?{s?{s?s}}}}}",
"{s:I s:{s:{s?{s?{s?s}}}}}",
"id", &id,
"jobspec",
"attributes",
"system",
Expand All @@ -41,6 +43,20 @@ static int get_and_update_jobspec_name (flux_error_t *errp,
flux_plugin_arg_strerror (args));
return -1;
}

/* flux_jobtap_jobspec_update_id_pack() should fail here, since this
* function is always called in the context of a jobtap callback:
*/
if (flux_jobtap_jobspec_update_id_pack (p,
id,
"{s:s}",
"attributes.system.foo",
"bar") == 0) {
errprintf (errp,
"flux_jobtap_jobspec_update_id_pack() unexpected success");
return -1;
}

if (current_name && !(copy = strdup (current_name))) {
errprintf (errp, "failed to copy job name");
return -1;
Expand Down Expand Up @@ -199,6 +215,20 @@ static int run_cb (flux_plugin_t *p,
return 0;
}

static void update_msg_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
flux_plugin_t *p = arg;
flux_jobid_t id;
json_t *update;

if (flux_msg_unpack (msg, "{s:I s:o}", "id", &id, "update", &update) < 0
|| flux_jobtap_jobspec_update_id_pack (p, id, "O", update) < 0)
flux_jobtap_raise_exception (p, id, "test", 0, "update failed");
flux_respond (h, msg, NULL);
}

static const struct flux_plugin_handler tab[] = {
{ "job.new", new_cb, NULL },
Expand All @@ -214,6 +244,9 @@ int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "jobspec-update", tab) < 0)
return -1;
if (flux_jobtap_service_register (p, "update", update_msg_cb, p) < 0)
flux_log_error (flux_jobtap_get_flux (p),
"flux_jobtap_service_register");
return 0;
}

Expand Down
16 changes: 16 additions & 0 deletions t/t2212-job-manager-plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,22 @@ test_expect_success 'job-manager: plugins can update jobspec' '
--match-context=attributes.system.job.name=new \
$jobid jobspec-update
'
test_expect_success 'job-manager: plugin can asynchronously update jobspec' '
flux dmesg -H | grep jobspec-update &&
jobid=$(flux submit -n1 --urgency=hold sleep 0) &&
cat <<-EOF >update-test.py &&
import flux
from flux.job import JobID
id=JobID("$jobid")
flux.Flux().rpc(
"job-manager.jobspec-update.update",
dict(id=id, update={"attributes.system.job.name": "test"}),
).get()
EOF
flux python update-test.py &&
flux job wait-event -Hv -t 30 $jobid jobspec-update &&
flux jobs $jobid
'
test_expect_success 'job-manager: plugin fails to load on config.update error' '
flux jobtap remove all &&
test_must_fail flux jobtap load ${PLUGINPATH}/config.so 2>config.err
Expand Down
Loading