Skip to content

Commit

Permalink
Enable the joint inference service to have self-healing ability
Browse files Browse the repository at this point in the history
Create Deployment resources so that worker pods are rebuilt after manual deletion.
Resolve the TODO: determine which worker failed, the edge inference worker or the cloud inference worker.

Signed-off-by: shemol <[email protected]>
  • Loading branch information
SherlockShemol committed Aug 13, 2024
1 parent f75ebbe commit 8949300
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 43 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/sedna/v1alpha1/jointinferenceservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand All @@ -40,6 +41,7 @@ type JointInferenceService struct {
type JointInferenceServiceSpec struct {
// EdgeWorker is the edge worker definition, serialized under the "edgeWorker" key.
EdgeWorker EdgeWorker `json:"edgeWorker"`
// CloudWorker is the cloud worker definition, serialized under the "cloudWorker" key.
CloudWorker CloudWorker `json:"cloudWorker"`
// DeploymentSpec is embedded and inlined, so its fields are serialized at the
// top level of this spec rather than under a key of their own.
// NOTE(review): the controller code visible in this commit builds each worker's
// DeploymentSpec from the worker's Template and does not appear to read these
// inlined fields; confirm they are actually needed on the API type.
appsv1.DeploymentSpec `json:",inline"`
}

// EdgeWorker describes the data a edge worker should have
Expand Down
203 changes: 160 additions & 43 deletions pkg/globalmanager/controllers/jointinference/jointinferenceservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilrand "k8s.io/apimachinery/pkg/util/rand"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -80,6 +81,11 @@ type Controller struct {
// A store of service
serviceLister sednav1listers.JointInferenceServiceLister

//deploymentStoreSynced returns true if the deployment store has been synced at least once.
deploymentStoreSynced cache.InformerSynced
// A store of deployment
deploymentsLister appslisters.DeploymentLister

// JointInferenceServices that need to be updated
queue workqueue.RateLimitingInterface

Expand Down Expand Up @@ -274,6 +280,7 @@ func (c *Controller) sync(key string) (bool, error) {

selector, _ := runtime.GenerateSelector(&service)
pods, err := c.podStore.Pods(service.Namespace).List(selector)
deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selector)

if err != nil {
return false, err
Expand All @@ -283,18 +290,26 @@ func (c *Controller) sync(key string) (bool, error) {

latestConditionLen := len(service.Status.Conditions)

active := runtime.CalcActivePodCount(pods)
var failed int32 = 0
activePods := runtime.CalcActivePodCount(pods)
activeDeployments := runtime.CalcActiveDeploymentCount(deployments)
activeCloudPod := false
activeCloudDeployment := false
activeEdgePod := false
activeEdgeDeployment := false

var failedPods, failedDeployments int32 = 0, 0

// neededCounts means that two pods should be created successfully in a jointinference service currently
// two pods consist of edge pod and cloud pod
var neededCounts int32 = 2
// two deployments consist of edge deployment and cloud deployment
var neededPodCounts int32 = 2
var neededDeploymentCounts int32 = 2

if service.Status.StartTime == nil {
now := metav1.Now()
service.Status.StartTime = &now
} else {
failed = neededCounts - active
failedPods = neededPodCounts - activePods
failedDeployments = neededDeploymentCounts - activeDeployments
}

var manageServiceErr error
Expand All @@ -313,22 +328,28 @@ func (c *Controller) sync(key string) (bool, error) {
var reason string
var message string

if failed > 0 {
if failedPods > 0 || failedDeployments > 0 {
serviceFailed = true
// TODO: get the failed worker, and knows that which worker fails, edge inference worker or cloud inference worker
reason = "workerFailed"
message = "the worker of service failed"
if (!activeCloudPod) || (!activeCloudDeployment) {
reason = "cloudWorkerFailed\n"
message = "the cloud worker of service failed\n"
}
if (!activeEdgePod) || (!activeEdgeDeployment) {
reason += "edgeWorkerFailed\n"
message += "the edge worker of service failed\n"
}
newCondtionType = sednav1.JointInferenceServiceCondFailed
c.recorder.Event(&service, v1.EventTypeWarning, reason, message)
} else {
if len(pods) == 0 {
active, manageServiceErr = c.createWorkers(&service)
activePods, activeDeployments, manageServiceErr = c.createWorkers(&service, &activeCloudPod, &activeCloudDeployment, &activeEdgePod, &activeEdgeDeployment)
}
if manageServiceErr != nil {
serviceFailed = true
message = error.Error(manageServiceErr)
newCondtionType = sednav1.JointInferenceServiceCondFailed
failed = neededCounts - active
failedPods = neededPodCounts - activePods
failedDeployments = neededDeploymentCounts - activeDeployments
} else {
// TODO: handle the case that the pod phase is PodSucceeded
newCondtionType = sednav1.JointInferenceServiceCondRunning
Expand All @@ -341,10 +362,13 @@ func (c *Controller) sync(key string) (bool, error) {
}
forget := false

// calculate the number of active pods and deployments
active := activePods + activeDeployments
failed := failedPods + failedDeployments
// no need to update the jointinferenceservice if the status hasn't changed since last time
if service.Status.Active != active || service.Status.Failed != failed || len(service.Status.Conditions) != latestConditionLen {
service.Status.Active = active
service.Status.Failed = failed
service.Status.Active = activePods
service.Status.Failed = failedPods

if err := c.updateStatus(&service); err != nil {
return forget, err
Expand Down Expand Up @@ -395,35 +419,35 @@ func isServiceFinished(j *sednav1.JointInferenceService) bool {
return false
}

func (c *Controller) createWorkers(service *sednav1.JointInferenceService) (active int32, err error) {
active = 0

func (c *Controller) createWorkers(service *sednav1.JointInferenceService, activeCloudPod *bool, activeCloudDeployment *bool, activeEdgePod *bool, activeEdgeDeployment *bool) (activePods, activeDeployments int32, err error) {
var bigModelPort int32 = BigModelPort
// create cloud worker
err = c.createCloudWorker(service, bigModelPort)
err = c.createCloudWorker(service, bigModelPort, activeCloudPod, activeCloudDeployment)
if err != nil {
return active, err
return activePods, activeDeployments, fmt.Errorf("failed to create cloudWorker: %w", err)
}
active++
activePods++
activeDeployments++

// create k8s service for cloudPod
bigModelHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, jointInferenceForCloud, bigModelPort)
if err != nil {
return active, err
return activePods, activeDeployments, fmt.Errorf("failed to create edgemesh service: %w", err)
}

// create edge worker
err = c.createEdgeWorker(service, bigModelHost, bigModelPort)
err = c.createEdgeWorker(service, bigModelHost, bigModelPort, activeEdgePod, activeEdgeDeployment)
if err != nil {
return active, err
return activePods, activeDeployments, fmt.Errorf("failed to create edgeWorker: %w", err)
}
active++
activePods++
activeDeployments++

return active, err
return activePods, activeDeployments, err
}

func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32) error {
// deliver pod for cloudworker
func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32, activeCloudPod *bool, activeCloudDeployment *bool) error {
// deliver deployment for cloudworker
cloudModelName := service.Spec.CloudWorker.Model.Name
cloudModel, err := c.client.Models(service.Namespace).Get(context.Background(), cloudModelName, metav1.GetOptions{})
if err != nil {
Expand All @@ -449,25 +473,36 @@ func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, b
})

workerParam.Env = map[string]string{
"NAMESPACE": service.Namespace,
"SERVICE_NAME": service.Name,
"WORKER_NAME": "cloudworker-" + utilrand.String(5),

"NAMESPACE": service.Namespace,
"SERVICE_NAME": service.Name,
"WORKER_NAME": "cloudworker",
"BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)),
}

workerParam.WorkerType = jointInferenceForCloud

// create cloud pod
_, err = runtime.CreatePodWithTemplate(c.kubeClient,
cloudWorkerDeployment := &appsv1.DeploymentSpec{
Template: service.Spec.CloudWorker.Template,
}
// Create cloudWorker deployment AND related pods (as part of the deployment creation)
_, err = runtime.CreateDeploymentWithTemplate(c.kubeClient,
service,
&service.Spec.CloudWorker.Template,
&workerParam)
return err
cloudWorkerDeployment,
&workerParam,
bigModelPort,
)
if err != nil {
return fmt.Errorf("failed to create cloudWorker deployment: %w", err)
}

*activeCloudDeployment = true
*activeCloudPod = true

return nil
}

func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32) error {
// deliver pod for edgeworker
func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32, activeEdgePod *bool, activeEdgeDeployment *bool) error {
// deliver edge deployment for edgeworker
ctx := context.Background()
edgeModelName := service.Spec.EdgeWorker.Model.Name
edgeModel, err := c.client.Models(service.Namespace).Get(ctx, edgeModelName, metav1.GetOptions{})
Expand Down Expand Up @@ -501,7 +536,7 @@ func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bi
workerParam.Env = map[string]string{
"NAMESPACE": service.Namespace,
"SERVICE_NAME": service.Name,
"WORKER_NAME": "edgeworker-" + utilrand.String(5),
"WORKER_NAME": "edgeworker",

"BIG_MODEL_IP": bigModelHost,
"BIG_MODEL_PORT": strconv.Itoa(int(bigModelPort)),
Expand All @@ -515,12 +550,24 @@ func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bi
workerParam.WorkerType = jointInferenceForEdge
workerParam.HostNetwork = true

edgeWorkerDeployment := &appsv1.DeploymentSpec{
Template: service.Spec.EdgeWorker.Template,
}

// Create edgeWorker deployment AND related pods (as part of the deployment creation)
_, err = runtime.CreatePodWithTemplate(c.kubeClient,
_, err = runtime.CreateDeploymentWithTemplate(c.kubeClient,
service,
&service.Spec.EdgeWorker.Template,
&workerParam)
return err
edgeWorkerDeployment,
&workerParam,
bigModelPort,
)
if err != nil {
return fmt.Errorf("failed to create edgeWorker deployment: %w", err)
}

*activeEdgeDeployment = true
*activeEdgePod = true
return nil
}

// New creates a new JointInferenceService controller that keeps the relevant pods
Expand All @@ -532,6 +579,8 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {

serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().JointInferenceServices()

deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments()

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})

Expand Down Expand Up @@ -573,5 +622,73 @@ func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
jc.podStore = podInformer.Lister()
jc.podStoreSynced = podInformer.Informer().HasSynced

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jc.addDeployment,
UpdateFunc: jc.updateDeployment,
DeleteFunc: jc.deleteDeployment,
})

jc.deploymentsLister = deploymentInformer.Lister()
jc.deploymentStoreSynced = deploymentInformer.Informer().HasSynced

return jc, nil
}

// addDeployment is the informer "add" handler for deployments. It expects obj
// to be a *appsv1.Deployment (the assertion panics otherwise) and forwards it
// to enqueueByDeployment.
func (c *Controller) addDeployment(obj interface{}) {
	c.enqueueByDeployment(obj.(*appsv1.Deployment))
}

// deleteDeployment is the informer "delete" handler for deployments. It
// forwards the deleted deployment to enqueueByDeployment.
//
// obj is either the deleted *appsv1.Deployment itself or a
// cache.DeletedFinalStateUnknown tombstone wrapping it. Anything else is
// logged as a warning and dropped.
func (c *Controller) deleteDeployment(obj interface{}) {
	switch typed := obj.(type) {
	case *appsv1.Deployment:
		c.enqueueByDeployment(typed)
	case cache.DeletedFinalStateUnknown:
		// The tombstone carries the last known state of the deleted object.
		wrapped, isDeployment := typed.Obj.(*appsv1.Deployment)
		if !isDeployment {
			klog.Warningf("tombstone contained object that is not a Deployment %+v", obj)
			return
		}
		c.enqueueByDeployment(wrapped)
	default:
		klog.Warningf("couldn't get object from tombstone %+v", obj)
	}
}

// updateDeployment is the informer "update" handler for deployments. Both
// arguments are expected to be *appsv1.Deployment (the assertions panic
// otherwise). The current deployment is passed to addDeployment only when its
// ResourceVersion differs from the old one; an unchanged ResourceVersion is
// treated as "nothing changed" and ignored.
func (c *Controller) updateDeployment(old, cur interface{}) {
	previous := old.(*appsv1.Deployment)
	latest := cur.(*appsv1.Deployment)
	if latest.ResourceVersion != previous.ResourceVersion {
		c.addDeployment(latest)
	}
}

// enqueueByDeployment enqueues the JointInferenceService that controls the
// given deployment so that it gets re-synced.
//
// The deployment is silently ignored when:
//   - it has no controller owner reference,
//   - the owner reference's Kind is not this controller's Kind,
//   - the owning service cannot be fetched from the service lister, or
//   - the fetched service's UID differs from the owner reference's UID (for
//     example, a different service object that happens to carry the same name).
func (c *Controller) enqueueByDeployment(deployment *appsv1.Deployment) {
	controllerRef := metav1.GetControllerOf(deployment)
	if controllerRef == nil {
		return
	}

	if controllerRef.Kind != Kind.Kind {
		return
	}

	// Resolve the owner through the name recorded in the owner reference. The
	// deployment's own name is not guaranteed to equal the service name, so
	// looking up by deployment.Name would miss the owning service.
	service, err := c.serviceLister.JointInferenceServices(deployment.Namespace).Get(controllerRef.Name)
	if err != nil {
		return
	}

	if service.UID != controllerRef.UID {
		return
	}

	// Log only after ownership has been established, and identify the objects
	// by namespace/name: deployment.Kind carries no identifying information,
	// and logging before the filters would fire for every deployment event.
	klog.Infof("Deployment %s/%s enqueued for JointInferenceService %s/%s",
		deployment.Namespace, deployment.Name, service.Namespace, service.Name)

	c.enqueueController(service, true)
}

0 comments on commit 8949300

Please sign in to comment.