diff --git a/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go b/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go index 1a9caad27..79cc062e0 100644 --- a/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go +++ b/pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -40,6 +41,7 @@ type JointInferenceService struct { type JointInferenceServiceSpec struct { EdgeWorker EdgeWorker `json:"edgeWorker"` CloudWorker CloudWorker `json:"cloudWorker"` + appsv1.DeploymentSpec `json:",inline"` } // EdgeWorker describes the data a edge worker should have diff --git a/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go b/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go index 39a1f310d..7bf8a1687 100644 --- a/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go +++ b/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go @@ -23,16 +23,17 @@ import ( "strconv" "time" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilrand "k8s.io/apimachinery/pkg/util/rand" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -80,6 +81,11 @@ type Controller struct { // A store of service serviceLister sednav1listers.JointInferenceServiceLister + //deploymentStoreSynced returns true if the deployment store has been synced at least once. + deploymentStoreSynced cache.InformerSynced + // A store of deployment + deploymentsLister appslisters.DeploymentLister + // JointInferenceServices that need to be updated queue workqueue.RateLimitingInterface @@ -274,6 +280,7 @@ func (c *Controller) sync(key string) (bool, error) { selector, _ := runtime.GenerateSelector(&service) pods, err := c.podStore.Pods(service.Namespace).List(selector) + deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selector) if err != nil { return false, err @@ -283,18 +290,26 @@ func (c *Controller) sync(key string) (bool, error) { latestConditionLen := len(service.Status.Conditions) - active := runtime.CalcActivePodCount(pods) - var failed int32 = 0 + activePods := runtime.CalcActivePodCount(pods) + activeDeployments := runtime.CalcActiveDeploymentCount(deployments) + activeCloudPod := false + activeCloudDeployment := false + activeEdgePod := false + activeEdgeDeployment := false + + var failedPods, failedDeployments int32 = 0, 0 // neededCounts means that two pods should be created successfully in a jointinference service currently - // two pods consist of edge pod and cloud pod - var neededCounts int32 = 2 + // two deployments consist of edge pod and cloud pod + var neededPodCounts int32 = 2 + var neededDeploymentCounts int32 = 2 if service.Status.StartTime == nil { now := metav1.Now() service.Status.StartTime = &now } else { - failed = neededCounts - active + failedPods = neededPodCounts - activePods + failedDeployments = neededDeploymentCounts - activeDeployments } var manageServiceErr error @@ -313,22 +328,28 @@ func (c *Controller) sync(key string) (bool, error) { var reason string var message string - if failed > 0 { + if failedPods > 0 || failedDeployments > 0 { serviceFailed = true - // TODO: get the failed worker, and knows that which worker fails, edge inference worker or cloud inference worker - reason = "workerFailed" - message = "the worker of service failed" + if (!activeCloudPod) || (!activeCloudDeployment) { + reason = "cloudWorkerFailed\n" + message = "the cloud worker of service failed\n" + } + if (!activeEdgePod) || (!activeEdgeDeployment) { + reason += "edgeWorkerFailed\n" + message += "the edge worker of service failed\n" + } newCondtionType = sednav1.JointInferenceServiceCondFailed c.recorder.Event(&service, v1.EventTypeWarning, reason, message) } else { if len(pods) == 0 { - active, manageServiceErr = c.createWorkers(&service) + activePods, activeDeployments, manageServiceErr = c.createWorkers(&service, &activeCloudPod, &activeCloudDeployment, &activeEdgePod, &activeEdgeDeployment) } if manageServiceErr != nil { serviceFailed = true message = error.Error(manageServiceErr) newCondtionType = sednav1.JointInferenceServiceCondFailed - failed = neededCounts - active + failedPods = neededPodCounts - activePods + failedDeployments = neededDeploymentCounts - activeDeployments } else { // TODO: handle the case that the pod phase is PodSucceeded newCondtionType = sednav1.JointInferenceServiceCondRunning @@ -341,10 +362,13 @@ func (c *Controller) sync(key string) (bool, error) { } forget := false + // calculate the number of active pods and deployments + active := activePods + activeDeployments + failed := failedPods + failedDeployments // no need to update the jointinferenceservice if the status hasn't changed since last time if service.Status.Active != active || service.Status.Failed != failed || len(service.Status.Conditions) != latestConditionLen { - service.Status.Active = active - service.Status.Failed = failed + service.Status.Active = activePods + service.Status.Failed = failedPods if err := c.updateStatus(&service); err != nil { return forget, err @@ -395,35 +419,35 @@ func isServiceFinished(j *sednav1.JointInferenceService) bool { return false } -func (c *Controller) createWorkers(service *sednav1.JointInferenceService) (active int32, err error) { - active = 0 - +func (c *Controller) createWorkers(service *sednav1.JointInferenceService, activeCloudPod *bool, activeCloudDeployment *bool, activeEdgePod *bool, activeEdgeDeployment *bool) (activePods, activeDeployments int32, err error) { var bigModelPort int32 = BigModelPort // create cloud worker - err = c.createCloudWorker(service, bigModelPort) + err = c.createCloudWorker(service, bigModelPort, activeCloudPod, activeCloudDeployment) if err != nil { - return active, err + return activePods, activeDeployments, fmt.Errorf("failed to create cloudWorker: %w", err) } - active++ + activePods++ + activeDeployments++ // create k8s service for cloudPod bigModelHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, jointInferenceForCloud, bigModelPort) if err != nil { - return active, err + return activePods, activeDeployments, fmt.Errorf("failed to create edgemesh service: %w", err) } // create edge worker - err = c.createEdgeWorker(service, bigModelHost, bigModelPort) + err = c.createEdgeWorker(service, bigModelHost, bigModelPort, activeEdgePod, activeEdgeDeployment) if err != nil { - return active, err + return activePods, activeDeployments, fmt.Errorf("failed to create edgeWorker: %w", err) } - active++ + activePods++ + activeDeployments++ - return active, err + return activePods, activeDeployments, err } -func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32) error { - // deliver pod for cloudworker +func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32, activeCloudPod *bool, activeCloudDeployment *bool) error { + // deliver deployment for cloudworker cloudModelName := service.Spec.CloudWorker.Model.Name cloudModel, err := c.client.Models(service.Namespace).Get(context.Background(), cloudModelName, metav1.GetOptions{}) if err != nil { @@ -449,25 +473,36 @@ func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, b }) workerParam.Env = map[string]string{ - "NAMESPACE": service.Namespace, - "SERVICE_NAME": service.Name, - "WORKER_NAME": "cloudworker-" + utilrand.String(5), - + "NAMESPACE": service.Namespace, + "SERVICE_NAME": service.Name, + "WORKER_NAME": "cloudworker", "BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)), } workerParam.WorkerType = jointInferenceForCloud - // create cloud pod - _, err = runtime.CreatePodWithTemplate(c.kubeClient, + cloudWorkerDeployment := &appsv1.DeploymentSpec{ + Template: service.Spec.CloudWorker.Template, + } + // Create cloudWorker deployment AND related pods (as part of the deployment creation) + _, err = runtime.CreateDeploymentWithTemplate(c.kubeClient, service, - &service.Spec.CloudWorker.Template, - &workerParam) - return err + cloudWorkerDeployment, + &workerParam, + bigModelPort, + ) + if err != nil { + return fmt.Errorf("failed to create cloudWorker deployment: %w", err) + } + + *activeCloudDeployment = true + *activeCloudPod = true + + return nil } -func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32) error { - // deliver pod for edgeworker +func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32, activeEdgePod *bool, activeEdgeDeployment *bool) error { + // deliver edge deployment for edgeworker ctx := context.Background() edgeModelName := service.Spec.EdgeWorker.Model.Name edgeModel, err := c.client.Models(service.Namespace).Get(ctx, edgeModelName, metav1.GetOptions{}) @@ -501,7 +536,7 @@ func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bi workerParam.Env = map[string]string{ "NAMESPACE": service.Namespace, "SERVICE_NAME": service.Name, - "WORKER_NAME": "edgeworker-" + utilrand.String(5), + "WORKER_NAME": "edgeworker", "BIG_MODEL_IP": bigModelHost, "BIG_MODEL_PORT": strconv.Itoa(int(bigModelPort)), @@ -515,12 +550,24 @@ func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bi workerParam.WorkerType = jointInferenceForEdge workerParam.HostNetwork = true + edgeWorkerDeployment := &appsv1.DeploymentSpec{ + Template: service.Spec.EdgeWorker.Template, + } + // create edge pod - _, err = runtime.CreatePodWithTemplate(c.kubeClient, + _, err = runtime.CreateDeploymentWithTemplate(c.kubeClient, service, - &service.Spec.EdgeWorker.Template, - &workerParam) - return err + edgeWorkerDeployment, + &workerParam, + bigModelPort, + ) + if err != nil { + return fmt.Errorf("failed to create edgeWorker deployment: %w", err) + } + + *activeEdgeDeployment = true + *activeEdgePod = true + return nil } // New creates a new JointInferenceService controller that keeps the relevant pods @@ -532,6 +579,8 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().JointInferenceServices() + deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments() + eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")}) @@ -573,5 +622,73 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) { jc.podStore = podInformer.Lister() jc.podStoreSynced = podInformer.Informer().HasSynced + deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: jc.addDeployment, + UpdateFunc: jc.updateDeployment, + DeleteFunc: jc.deleteDeployment, + }) + + jc.deploymentsLister = deploymentInformer.Lister() + jc.deploymentStoreSynced = deploymentInformer.Informer().HasSynced + return jc, nil } + +func (c *Controller) addDeployment(obj interface{}) { + deployment := obj.(*appsv1.Deployment) + c.enqueueByDeployment(deployment) +} + +// deleteDeployment enqueues the JointInferenceService obj When a deleteDeployment is deleted +func (c *Controller) deleteDeployment(obj interface{}) { + deployment, ok := obj.(*appsv1.Deployment) + + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Warningf("couldn't get object from tombstone %+v", obj) + return + } + deployment, ok = tombstone.Obj.(*appsv1.Deployment) + if !ok { + klog.Warningf("tombstone contained object that is not a Deployment %+v", obj) + return + } + } + c.enqueueByDeployment(deployment) +} + +func (c *Controller) updateDeployment(old, cur interface{}) { + oldDeployment := old.(*appsv1.Deployment) + curDeployment := cur.(*appsv1.Deployment) + // no deployment update, no queue + if curDeployment.ResourceVersion == oldDeployment.ResourceVersion { + return + } + c.addDeployment(curDeployment) +} + +func (c *Controller) enqueueByDeployment(deployment *appsv1.Deployment) { + controllerRef := metav1.GetControllerOf(deployment) + + klog.Infof("Deployment enqueued %v", deployment.Kind) + + if controllerRef == nil { + return + } + + if controllerRef.Kind != Kind.Kind { + return + } + + service, err := c.serviceLister.JointInferenceServices(deployment.Namespace).Get(deployment.Name) + if err != nil { + return + } + + if service.UID != controllerRef.UID { + return + } + + c.enqueueController(service, true) +}