-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(backend): Add Semaphore and Mutex fields to Workflow CR #11370
base: master
Are you sure you want to change the base?
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
5c420db
to
449cdda
Compare
add |
We should edit the kubeflow manifest to deploy a skeleton of this configmap. You can do that in here or in a follow-up PR. |
I would probably delete this line (and maybe edit the PR title), because that reads like things you enhanced on Workflow itself. We're just setting fields on it... |
What does Argo Workflows do when both are set? A better verification would be to do two separate test pipelines -- one where you use mutex, and one where you use semaphore. And then in addition to verifying the Workflow yaml, also verify that multiple runs are being locked like they should be. |
@@ -77,9 +77,20 @@ func (t *V2Spec) ScheduledWorkflow(modelJob *model.Job) (*scheduledworkflow.Sche | |||
} | |||
} | |||
|
|||
var pipeline_options argocompiler.Options | |||
for _, platform := range t.platformSpec.Platforms { | |||
if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We should specifically check for the "kubernetes" platform here. There could be others, and we would ignore all others
- Check that if you put some other non-kubernetes platform in there that this line doesn't cause a panic. I don't think it will, but just check :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. It now iterates over the platformSpec.Platforms
map and adds a check to ensure that only the "kubernetes" platform is processed. It also includes the nil checks for platform and platform.PipelineConfig
to prevent nil pointer dereference errors.
if platform.PipelineConfig.SemaphoreKey != "" || platform.PipelineConfig.MutexName != "" { | ||
pipeline_options = argocompiler.Options{ | ||
SemaphoreKey: platform.PipelineConfig.SemaphoreKey, | ||
MutexName: platform.PipelineConfig.MutexName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add these individually, and only if they are specified in the IR. Users will typically only add one or the other. (So don't add mutex if the user only wants a semaphore, and vice versa.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, added these individually.
@@ -300,9 +311,20 @@ func (t *V2Spec) RunWorkflow(modelRun *model.Run, options RunWorkflowOptions) (u | |||
} | |||
} | |||
|
|||
var pipeline_options *argocompiler.Options |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks copied from above. Is there any way to reuse it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is copied, but the context is different.
First one, argocompiler.Options
is a non pointer value that represents a copy of the actual data. Second one, *argocompiler.Options
is a pointer that stores the memory address of the value.
I thought over how this code could be put into a function and called in twice, without having to reuse it. Here's a pseudo code I was thinking might work:
func setPipelineOptions(platformSpec map[string]*PlatformSpec, pipelineOptions interface{}) {
for key, platform := range platformSpec {
if key == "kubernetes" && platform != nil && platform.PipelineConfig != nil {
if platform.PipelineConfig.SemaphoreKey != "" {
SemaphoreKey = platform.PipelineConfig.SemaphoreKey
}
if platform.PipelineConfig.MutexName != "" {
MutexName = platform.PipelineConfig.MutexName
}
break
}
}
}
and then just calling it each time with the pointer and the non-pointer. But it gets slightly complicated with passing a pointer into a function (needs to be de-referenced).
However, there's a pattern of repetition of the same code between pointer and non-pointer references in this file. For instance here, and here.
Which is why I would suggest keeping this repetition as is, and we can later refactor this file to remove repeated code by putting them into functions. This could be a separate task out of the scope of this PR/task.
Signed-off-by: ddalvi <[email protected]>
449cdda
to
497bc6b
Compare
/lgtm |
Resolves #6553
Description of your changes:
This PR introduces support for Pipeline-level
Semaphores
andMutexes
in the KFP backend.Changes Introduced:
Added the ability to specify a semaphore for pipelines, which controls the number of concurrent instances of a pipeline that can run. The semaphore is configured via a fixed ConfigMap named
semaphore-config
. The semaphore key is provided through the pipeline configuration.Added mutex support for pipelines, ensuring that only one instance of the pipeline can run at a time if the specified mutex is defined. Mutex names are defined per pipeline, and each pipeline instance respects the specified mutex.
The Workflow CR now includes a
Synchronization
field, wheresemaphore
andmutex
are appropriately set.If a pipeline has a semaphore, the backend maps the semaphore to the
semaphore-config
ConfigMap using the key provided by the user. Mutexes are represented by their name, ensuring mutual exclusion.This PR should be merged only after #11340 gets merged.
Testing instructions
Build the API Server image and push to an image registry
Upload main.yaml file from here
Check in KFP UI Pipeline Spec tab if the following snippet is present:
Checklist: