diff --git a/docs/proposals/images/fl-crdupdate-solution.png b/docs/proposals/images/fl-crdupdate-solution.png new file mode 100644 index 000000000..3057d906d Binary files /dev/null and b/docs/proposals/images/fl-crdupdate-solution.png differ diff --git a/docs/proposals/images/fl-recreate-solution.png b/docs/proposals/images/fl-recreate-solution.png new file mode 100644 index 000000000..e3c1b2aed Binary files /dev/null and b/docs/proposals/images/fl-recreate-solution.png differ diff --git a/docs/proposals/images/informer-workflow.png b/docs/proposals/images/informer-workflow.png deleted file mode 100644 index 4b3a0e5f3..000000000 Binary files a/docs/proposals/images/informer-workflow.png and /dev/null differ diff --git a/docs/proposals/images/ji-crdupdate-solution.png b/docs/proposals/images/ji-crdupdate-solution.png new file mode 100644 index 000000000..e0daa4c72 Binary files /dev/null and b/docs/proposals/images/ji-crdupdate-solution.png differ diff --git a/docs/proposals/images/ji-deployment-process.png b/docs/proposals/images/ji-deployment-process.png new file mode 100644 index 000000000..0e965848e Binary files /dev/null and b/docs/proposals/images/ji-deployment-process.png differ diff --git a/docs/proposals/images/ji-restartworker.png b/docs/proposals/images/ji-restartworker.png deleted file mode 100644 index 55dcafb65..000000000 Binary files a/docs/proposals/images/ji-restartworker.png and /dev/null differ diff --git a/docs/proposals/images/jointinference-pod-reconstruction.png b/docs/proposals/images/jointinference-pod-reconstruction.png deleted file mode 100644 index 9217569b0..000000000 Binary files a/docs/proposals/images/jointinference-pod-reconstruction.png and /dev/null differ diff --git a/docs/proposals/images/k8s-controller.png b/docs/proposals/images/k8s-controller.png new file mode 100644 index 000000000..321bffbd3 Binary files /dev/null and b/docs/proposals/images/k8s-controller.png differ diff --git a/docs/proposals/images/k8s-informer.png b/docs/proposals/images/k8s-informer.png new file mode 100644 index 000000000..e2f7cf63d Binary files /dev/null and b/docs/proposals/images/k8s-informer.png differ diff --git a/docs/proposals/images/replicaset-managereplicas.png b/docs/proposals/images/replicaset-managereplicas.png deleted file mode 100644 index fcd9d999a..000000000 Binary files a/docs/proposals/images/replicaset-managereplicas.png and /dev/null differ diff --git a/docs/proposals/sedna-controller-enhancement.md b/docs/proposals/sedna-controller-enhancement.md index 9e89c9289..7cc92f135 100644 --- a/docs/proposals/sedna-controller-enhancement.md +++ b/docs/proposals/sedna-controller-enhancement.md @@ -1,19 +1,34 @@ - [Goals](#goals) - [Proposal](#proposal) - * [Scope](#scope) - * [Content](#content) - * [Target Users](#target-users) + - [Scope](#scope) + - [Content](#content) + - [Target Users](#target-users) - [Design Details](#design-details) - * [Garbage Collection](#garbage-collection) - + [Cascade Deletion in Kubernetes](#cascade-deletion-in-kubernetes) - + [Sedna Controller Garbage Collection](#sedna-controller-garbage-collection) + - [Garbage Collection](#garbage-collection) + - [Cascade Deletion in Kubernetes](#cascade-deletion-in-kubernetes) + - [Sedna Controller Garbage Collection](#sedna-controller-garbage-collection) - [Setting Owner References in Joint Inference Controller](#setting-owner-references-in-joint-inference-controller) - * [Pod Reconstruction](#pod-reconstruction) - + [Method for Controlling Pod Count in 
ReplicaSet](#method-for-controlling-pod-count-in-replicaset)
- + [k8s Informer Mechanism](#k8s-informer-mechanism)
+ - [Pod Reconstruction](#pod-reconstruction)
+ - [K8S Informer Mechanism](#k8s-informer-mechanism)
 - [Informer Workflow](#informer-workflow)
- + [Joint Inference and Federated Learning Controller Pod Deletion and Reconstruction Design](#joint-inference-and-federated-learning-controller-pod-deletion-and-reconstruction-design)
-- [Roadmap](#roadmap)
+ - [Informer Workflow for Joint Inference and Federated Learning Controllers](#informer-workflow-for-joint-inference-and-federated-learning-controllers)
+ - [Design of the Pod Reconstruction Solution for Federated Learning](#design-of-the-pod-reconstruction-solution-for-federated-learning)
+ - [Code Logic](#code-logic)
+ - [Design of the Pod Reconstruction Solution for Joint Inference](#design-of-the-pod-reconstruction-solution-for-joint-inference)
+ - [Federated learning modifies the CRD to update the pod](#federated-learning-modifies-the-crd-to-update-the-pod)
+ - [Code Logic](#code-logic-1)
+ - [Joint Inference modifies the CRD to update the pod](#joint-inference-modifies-the-crd-to-update-the-pod)
+ - [Code Logic](#code-logic-2)
+- [Testing](#testing)
+ - [Federated Learning Unit Tests](#federated-learning-unit-tests)
+ - [`Test_deletePod()`](#test_deletepod)
+ - [Delete the existing pod and recreate it](#delete-the-existing-pod-and-recreate-it)
+ - [Delete non-existent pods](#delete-non-existent-pods)
+ - [`Test_updateJob()`](#test_updatejob)
+ - [Joint Inference Unit Tests](#joint-inference-unit-tests)
+ - [`Test_UpdateService()`](#test_updateservice)

Sedna leverages KubeEdge's edge-cloud collaboration capabilities to enable cross-edge-cloud collaborative training and inference. Joint inference and federated learning are essential applications of Sedna's edge-cloud collaborative inference. However, the current Sedna project has several functional deficiencies in its joint inference and federated learning controllers, primarily reflected in the following aspects:

@@ -23,18 +38,29 @@ Sedna leverages KubeEdge's edge-cloud collaboration capabilities to enable cross

This proposal aims to optimize the joint inference and federated learning controllers to improve task reconstruction and cascade deletion, thereby enhancing the controllers' functionality and application efficiency.

# Goals

The goals of the optimization of Sedna's joint inference and federated learning controllers are to:

1. Implement cascade deletion for joint inference and federated learning controllers to ensure that task instances and service configurations are synchronously deleted when the tasks are removed, preventing conflicts in subsequent task creation.
2. Enhance the self-healing capabilities of joint inference and federated learning tasks to ensure that task instances are automatically rebuilt and resume normal operation after failure or manual deletion.
3. Modify the Joint Inference and Federated Learning CRD to enable automatic updates of pod configurations.

# Proposal

## Scope

-Sedna joint inference and federated learning controller optimization only modifies the joint inference and federated learning controller codes. The controllers are decoupled from each other, and the function interface and library functions are not modified, so it will not affect the functions of other controllers. For existing application cases, since the CRD is not modified, the deployment of existing applications will not be affected.
-Specific modifications are limited to the files `/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go` and `pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go`.
+The optimization of the Sedna Joint Inference and Federated Learning controllers only modifies the Joint Inference and Federated Learning controller code. The controllers are decoupled from each other; where shared functions were changed, the affected controllers were updated accordingly. Since no changes were made to the CRD, the deployment of existing applications will not be impacted.

The main changes are in the files `/pkg/globalmanager/controllers/jointinference/jointinferenceservice.go` and `/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob.go`.

Additionally, in the file `/pkg/globalmanager/runtime/worker.go`, to meet the need for creating a `deployment` in joint inference, we modified the `CreateDeploymentWithTemplate` function and the `injectDeploymentParam` function. The necessary updates were also made in the files `/pkg/globalmanager/controllers/objectsearch/objectsearchservice.go` and `/pkg/globalmanager/controllers/featureextraction/featureextractionservice.go`, where these functions are used. We also added a new function, `UpdateDeploymentWithTemplate`, to handle deployment updates.

The test files are `/pkg/globalmanager/controllers/jointinference/jointinferenceservice_test.go` and `/pkg/globalmanager/controllers/federatedlearning/federatedlearningjob_test.go`.

The test files introduced the `fake` package, which brought in some additional dependency code, increasing the pull request size.

## Content

This project focuses on optimizing the logic of joint inference and federated learning task controllers, with no impact on other modules.

1. **Cascade Deletion of Joint Inference and Federated Learning Tasks**

@@ -53,6 +79,7 @@ This project focuses on optimizing the logic of joint inference and federated le
 4. **Documentation Updates**
    - Update application instance demonstration documentation to describe the reasons for modifications and the usage methods of the optimized cases.

## Target Users

Users of Sedna's joint inference and federated learning applications.

@@ -66,16 +93,25 @@ Users of Sedna's joint inference and federated learning applications.

In Kubernetes, the Owner Reference provides information about the relationships between objects, allowing the control plane and other API clients to clean up associated resources when an object is deleted. The garbage collector can implement cascade deletion functionality based on this relationship. Each resource's metadata includes an `ownerReferences` field, an array indicating its owners. When an owner resource is deleted, it is removed from this array, and the garbage collector will reclaim the resource once all owners are deleted.
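To make this concrete, here is a minimal, hypothetical sketch (not code from this PR) of how a controller can attach an owner reference to a worker pod so that the garbage collector cascade-deletes the pod when its owner is removed:

```go
import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// newOwnedPod returns a pod whose ownerReferences point at its owning CR.
// When the owner is deleted, the garbage collector reclaims this pod.
func newOwnedPod(owner metav1.Object, gvk schema.GroupVersionKind) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      owner.GetName() + "-worker", // illustrative name only
			Namespace: owner.GetNamespace(),
		},
	}
	// NewControllerRef marks the reference as a controller reference,
	// so exactly one controller is recorded as managing the pod.
	pod.OwnerReferences = append(pod.OwnerReferences, *metav1.NewControllerRef(owner, gvk))
	return pod
}
```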
### Sedna Controller Garbage Collection

The Owner Reference relationship for JointInferenceService is depicted in the following diagram:

![](./images/ji-ownerreference.png)

The Owner Reference relationship for FederatedLearningJob is depicted in the following diagram:

![](./images/fl-ownerreference.png)

#### Setting Owner References in Joint Inference Controller

Taking the Joint Inference Controller as an example, the Owner Reference setting process involves several steps:

![](./images/ji-setownerreference.png)

In `pkg/globalmanager/controllers/jointinference/jointinferenceservice.go`, the controller name and CR kind name are defined:

```go
// Name is this controller name
Name = "JointInference"

// KindName is the kind name of CR this controller controls
KindName = "JointInferenceService"
```

The GroupVersionKind for the controller type is defined as follows:

```go
// Kind contains the schema.GroupVersionKind for this controller type.
var Kind = sednav1.SchemeGroupVersion.WithKind(Name)
```

The `run` function starts the controller, defines the number of workers, and sets up worker threads:

```go
for i := 0; i < workers; i++ {
	go wait.Until(c.worker, time.Second, stopCh)
}
```

In the `worker` function, tasks are dequeued, processed, and marked as done:

```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the sync is never invoked concurrently with the same key.
func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}
```

The `sync` function parses the key to get the namespace and name:

```go
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
	return false, err
}
if len(ns) == 0 || len(name) == 0 {
	return false, fmt.Errorf("invalid jointinference service key %q: either namespace or name is missing", key)
}
```

It then retrieves the JointInferenceService object from the lister:

```go
sharedService, err := c.serviceLister.JointInferenceServices(ns).Get(name)
if err != nil {
	...
}
service := *sharedService
```

The GroupVersionKind is set:

```go
service.SetGroupVersionKind(Kind)
```

A selector is generated, and associated pods are listed:

```go
selector, _ := runtime.GenerateSelector(&service)
pods, err := c.podStore.Pods(service.Namespace).List(selector)
if err != nil {
	...
}
klog.V(4).Infof("list jointinference service %v/%v, %v pods: %v", service.Namespace, service.Name, len(pods), pods)
```

When no worker failures occur, and if no associated pods exist, the `createWorkers` function is called to create pods:

```go
} else {
	if len(pods) == 0 {
		active, manageServiceErr = c.createWorkers(&service)
	}
}
```

In the `createWorkers` function, `createCloudWorker` and `createEdgeWorker` are called to create cloud and edge worker pods. The `runtime.CreatePodWithTemplate` function creates pods, with the OwnerReference set as follows:

```go
if controllerRef != nil {
	pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
}
```
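Putting the pieces above together, the following condensed sketch (simplified from the walkthrough above, with error handling abbreviated; not the verbatim controller code) shows the shape of the sync path that ends in `createWorkers`:

```go
func (c *Controller) sync(key string) (bool, error) {
	// Parse the namespace/name key that was placed on the work queue.
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}

	// Fetch the service from the informer's local cache.
	sharedService, err := c.serviceLister.JointInferenceServices(ns).Get(name)
	if err != nil {
		return false, err
	}
	service := *sharedService
	service.SetGroupVersionKind(Kind)

	// List the pods that belong to this service.
	selector, _ := runtime.GenerateSelector(&service)
	pods, err := c.podStore.Pods(service.Namespace).List(selector)
	if err != nil {
		return false, err
	}

	// No associated pods yet: create the cloud and edge workers, each of
	// which gets an OwnerReference back to the service.
	if len(pods) == 0 {
		if _, err := c.createWorkers(&service); err != nil {
			return false, err
		}
	}
	return true, nil
}
```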
## Pod Reconstruction

-Pod automatic reconstruction upon failure is determined by the restart policy (RestartPolicy). In JointInferenceService, RestartPolicy is not explicitly set, defaulting to `always`. During joint inference tasks, if issues such as improper EdgeMesh configuration arise, causing the edge pod to fail to access the cloud for inference, the edge pod will continuously restart. In FederatedLearningJob, RestartPolicy is set to `OnFailure`.
-### Method for Controlling Pod Count in ReplicaSet
-![](./images/replicaset-managereplicas.png)
-The ReplicaSet controller ensures the actual state matches the desired state by creating or deleting pods to achieve the desired replica count. The `syncReplicaSet` function is responsible for this, including the following steps:
-Parse namespace and name:
-```go
-namespace, name, err := cache.SplitMetaNamespaceKey(key)
-```
-Get the ReplicaSet object:
-```go
-rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
-```
-Filter and claim the pods managed by the ReplicaSet:
-```go
-filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
-```
-Ensure the ReplicaSet's actual state matches the desired state by calling `manageReplicas` to create or delete pods:
-```go
-if rsNeedsSync && rs.DeletionTimestamp == nil {
-    manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
-}
-```
-Calculate the difference between the current pod count and the desired replica count:
-```go
-diff := len(filteredPods) - int(*(rs.Spec.Replicas))
-```
-Create or delete pods based on the difference:

The automatic reconstruction of a failed Pod is determined by the RestartPolicy. In the JointInferenceService, the RestartPolicy is not explicitly set, so it defaults to `Always`. During the joint inference task, if an issue arises, such as EdgeMesh being improperly configured, preventing the edge from accessing the cloud's port 5000 for cloud-based inference, the edge pod will continuously restart. In the FederatedLearningJob, the RestartPolicy is set to `OnFailure`.

### K8S Informer Mechanism

Kubernetes uses an Informer instead of a Controller to access the API Server. All Controller operations interact with the Informer, which does not access the API Server each time. Informer uses a ListAndWatch mechanism. When the Informer starts for the first time, it calls the LIST API to retrieve the latest versions of all resource objects and then uses the WATCH API to monitor changes to these objects. This event information is maintained in a read-only cache queue to improve query efficiency and reduce the load on the API Server.

![](./images/k8s-informer.png)

According to the diagram, let's explain the roles of some components in the Informer:

- **Controller**: The core of the Informer; it can create reflectors and control the processLoop. The processLoop pops data from the DeltaFIFO queue, first calls the Indexer for caching and indexing, and then distributes it to the processor for handling.

- **Reflector**: The Informer does not directly access the k8s API server but uses a Reflector to do so. The Reflector monitors specific Kubernetes resources through ListAndWatch. When a resource changes, such as an Added event, the resource object is stored in the local DeltaFIFO cache.
- **DeltaFIFO**: A first-in-first-out cache queue used to store various events returned by the Watch API, such as Added, Updated, and Deleted.

- **LocalStore**: The cache of the Informer, which stores objects from the API server (some of which may still be in DeltaFIFO). Users query the cache directly, reducing the API server's load. LocalStore is accessed by the Lister's List/Get methods.

- **WorkQueue**: After DeltaFIFO receives an event, it stores the event in its data structure, directly operates the data stored in the Store, updates the Store, and then pops the event into the WorkQueue. The Controller triggers the corresponding callback function based on the event type received in the WorkQueue.

![](./images/k8s-controller.png)

#### Informer Workflow

- Informer first lists/watches the API server. The Reflector handles the connection, listing all instances of the resource and monitoring changes through the watch method. If the connection is interrupted, it resumes from the latest resourceVersion. When instances are created, deleted, or updated, the Reflector receives event notifications, which are placed into DeltaFIFO.

- Informer reads these deltas from DeltaFIFO, determines the event type, and updates the local cache accordingly.

- For Added events, the Informer saves the object to the local cache via the Indexer and creates an index. For Deleted events, it removes the object from the cache.

- DeltaFIFO then pops the event into the Controller, which calls the ResourceEventHandler callback for handling.

- ResourceEventHandler filters and places the relevant objects into the WorkQueue.

- The Controller retrieves objects from the WorkQueue, launches a worker, and executes business logic. This typically involves comparing the cluster's current state with the desired state and instructing the API server to make adjustments, like creating new pods for a deployment.

- Workers can use the Lister to get resources from the cache, avoiding frequent access to the API server.

There are three types of `ResourceEventHandler` functions in Informer:

```go
-if diff < 0 {
-	...
-	err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
-	...
-} else if diff > 0 {
-	...
-	rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs)
-	...
-}
+// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
+// as few of the notification functions as you want while still implementing
+// ResourceEventHandler.
+type ResourceEventHandlerFuncs struct {
+	AddFunc    func(obj interface{})
+	UpdateFunc func(oldObj, newObj interface{})
+	DeleteFunc func(obj interface{})
+}
```
-### k8s Informer Mechanism
-In Kubernetes, Informers are used instead of Controllers to access the API Server. All operations of the Controller interact with the Informer, and the Informer does not access the API Server every time. The Informer uses the ListAndWatch mechanism. When the Informer starts for the first time, it calls the LIST API to get all the latest versions of the resource objects, and then uses the WATCH API to monitor changes to these objects. It maintains event information in a read-only cache queue, improving query efficiency and reducing the load on the API Server.
-![](./images/informer-workflow.png)
-Explain the components in the Informer Mechanism based on the flowchart:
-- **Controller**: The implementation carrier of the Informer, which can create a Reflector and control the processLoop.
The processLoop pops data from the DeltaFIFO queue, first calling the Indexer to cache and index the data, and then distributing it to the processor for handling. -- **Reflector**: The Informer does not directly access the Kubernetes API server but does so through an object called a Reflector. The Reflector monitors specified Kubernetes resources through ListAndWatch. When a resource changes, such as an Added event, it stores the resource object in the local cache DeltaFIFO. -- **DeltaFIFO**: A first-in, first-out cache queue used to store various events returned by the Watch API, such as Added, Updated, and Deleted. -- **LocalStore**: The cache of the Informer, which stores objects from the API server (some of which may still be in the DeltaFIFO). When users query objects, they directly retrieve them from the cache, reducing the load on the API server. LocalStore is only accessed by the Lister's List/Get methods. -- **WorkQueue**: After DeltaFIFO receives an event, it first stores the event in its data structure and then directly operates on the data stored in the Store. After updating the Store, DeltaFIFO pops the event to the WorkQueue. The Controller receives the event from the WorkQueue and triggers the corresponding callback function based on the event type. -#### Informer Workflow -- Informer will first list/watch the apiserver. The Reflector package used by Informer is responsible for establishing a connection with the apiserver. Reflector uses the ListAndWatch method to list all instances of the resource from the apiserver. The list will get the latest resourceVersion of the object, and then use the watch method to monitor all changes after the resourceVersion. If an exception occurs in the middle, reflector will try to monitor all changes from the disconnected resourceVersion again. Once the instance of the object is created, deleted, or updated, Reflector will receive an "event notification". At this time, the combination of the event and its corresponding API object is called a delta, which will be placed in the DeltaFIFO. -- Informer will continuously read deltas from this DeltaFIFO. Every time an object is taken out, Informer will determine the time type of the delta and then create or update the local cache, that is, the store. -- If the event type is Added, the Informer will save the API object in the increment to the local cache through the Indexer library and create an index for it. If it is a delete operation, the object will be deleted from the local cache. -- DeltaFIFO pops this event to the controller, and the controller calls the previously registered ResourceEventHandler callback function to process it. -- In the ResourceEventHandler callback function, it actually just does a simple filtering, and then puts the object of interest into the workqueue. -- The Controller takes the object from the workqueue and starts a worker to execute its own business logic. The business logic usually calculates the difference between the current state of the cluster and the state that the user wants to achieve, and then lets the apiserver evolve the state to the state that the user wants to achieve, such as creating new pods for deployment, or scaling up/down deployment. -- In the worker, you can use the lister to obtain resources without frequently accessing the apiserver, because changes to resources in the apiserver will be reflected in the local cache. 
-### Joint Inference and Federated Learning Controller Pod Deletion and Reconstruction Design -![](./images/jointinference-pod-reconstruction.png) -Taking `jointinferenceservice.go` as an example, the `New()` function creates a new `JointInferenceService` controller to keep the related pods in sync with the corresponding `JointInferenceService` objects. Within the `New()` function, the `informers` are initialized as follows: +The logic for these three functions is user-defined. After registering the `ResourceEventHandler` during controller initialization, the corresponding `ResourceEventHandler` will be triggered whenever an instance of the object is created, deleted, or updated. + +### Informer Workflow for Joint Inference and Federated Learning Controllers + + + +Taking `jointinferenceservice.go` as an example, the `New()` function creates a new `JointInferenceService` controller to keep the relevant pods in sync with the corresponding `JointInferenceService` objects. In the `New()` function, the `informer` is initialized. + ```go podInformer := cc.KubeInformerFactory.Core().V1().Pods() serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().JointInferenceServices() ``` + The service informer uses a custom handler: + ```go -serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - jc.enqueueController(obj, true) - jc.syncToEdge(watch.Added, obj) - }, - UpdateFunc: func(old, cur interface{}) { - jc.enqueueController(cur, true) - jc.syncToEdge(watch.Added, cur) - }, - DeleteFunc: func(obj interface{}) { - jc.enqueueController(obj, true) - jc.syncToEdge(watch.Deleted, obj) - }, -}) +    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ +        AddFunc: func(obj interface{}) { +            jc.enqueueController(obj, true) +            jc.syncToEdge(watch.Added, obj) +        }, +        UpdateFunc: func(old, cur interface{}) { +            jc.enqueueController(cur, true) +            jc.syncToEdge(watch.Added, cur) +        }, + +        DeleteFunc: func(obj interface{}) { +            jc.enqueueController(obj, true) +            jc.syncToEdge(watch.Deleted, obj) +        }, +    }) ``` + The pod informer uses a custom handler: + ```go -podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: jc.addPod, - UpdateFunc: jc.updatePod, - DeleteFunc: jc.deletePod, -}) +    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ +        AddFunc:    jc.addPod, +        UpdateFunc: jc.updatePod, +        DeleteFunc: jc.deletePod, +    }) ``` -The `EventHandler` (`addPod`, `updatePod`, `deletePod`) here merely adds the related objects to the queue without performing any additional processing. -`podInformer.Lister()` creates a `Lister()` to retrieve Pod resources, and `podInformer.Informer().HasSynced` checks if the informer's cache has synced. + +The `EventHandler` functions (`addPod`, `updatePod`, `deletePod`) here simply add the relevant objects to the queue without performing any other processing. + +`podInformer.Lister()` creates a `Lister()` to retrieve Pod resources, and `podInformer.Informer().HasSynced` checks whether the Informer's cache has been synchronized. + ```go -jc.serviceLister = serviceInformer.Lister() -jc.serviceStoreSynced = serviceInformer.Informer().HasSynced -//... 
-jc.podStore = podInformer.Lister() -jc.podStoreSynced = podInformer.Informer().HasSynced +    jc.serviceLister = serviceInformer.Lister() +    jc.serviceStoreSynced = serviceInformer.Informer().HasSynced + //... +    jc.podStore = podInformer.Lister() +    jc.podStoreSynced = podInformer.Informer().HasSynced ``` -The process of synchronizing resources from the API server is handled in the `Run()` function of the `jointinferenceservice` controller. The `Run()` function starts the main goroutine responsible for watch and sync services. + +The process of synchronizing resources from the API server is carried out by the jointinferenceservice controller in the Run() function. The Run() function starts the main goroutine responsible for the watch and sync services. + ```go -if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.serviceStoreSynced) { - klog.Errorf("failed to wait for %s caches to sync", Name) - return -} +    if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.serviceStoreSynced) { +        klog.Errorf("failed to wait for %s caches to sync", Name) +        return + +    } ``` -After starting the informer and waiting for the local cache to sync, workers are started. Upon receiving a change event, the changed object is retrieved from the event, an object key in the form of `namespace/name` is generated, and the key is placed into the worker queue. The `worker()` function calls `c.processNextWorkItem()` function. + +After starting the Informer, it waits for the local cache to sync before launching the workers. When a change event is received, the changed object is retrieved from the event, an object key (in the form of namespace/name) is generated, and the key is placed into the worker queue. The `worker()` function then calls the `c.processNextWorkItem()` function. + ```go func (c *Controller) worker() { - for c.processNextWorkItem() { - } +    for c.processNextWorkItem() { +    } +} +``` + +The `processNextWorkItem` function retrieves the key from the worker queue and calls `sync()`. In the `sync()` function, it fetches the actual object from the local cache using the lister and performs the relevant synchronization operations. + +### Design of the Pod Reconstruction Solution for Federated Learning + + + +Listen for delete events. When the Informer detects a delete event with `OwnerReference` set to `FederatedLearning`, it triggers the `DeletePod` function, which reconstructs the pod. The newly created pod will be almost identical to the original pod, retaining its configuration and specifications, but its resource version and UID will be reset to allow for regeneration. + +#### Code Logic + +- Pre-Reconstruction Check + - Check if the pod is owned by a `FederatedLearningJob`. + - Check if the pod has already been recreated: use `c.recreatedPods.Load(pod.Name)` to verify if this pod has already been recreated. If it has been recreated, do not attempt to recreate it again. +- Recreate Pods + - Use `pod.DeepCopy()` to create a deep copy of the pod. + - Reset certain unique identifiers (such as `ResourceVersion` and `UID`) and status fields. + - Call the Kubernetes API to create a new pod using `c.kubeClient.CoreV1().Pods(pod.Namespace).Create`. + - If the creation is successful, log the event and mark the pod as recreated. 
```go
// Create a deep copy of the old pod
newPod := pod.DeepCopy()
// Reset the resource version and UID as they are unique to each object
newPod.ResourceVersion = ""
newPod.UID = ""
// Clear the status
newPod.Status = v1.PodStatus{}
// Remove the deletion timestamp
newPod.DeletionTimestamp = nil
// Remove the deletion grace period seconds
newPod.DeletionGracePeriodSeconds = nil
_, err := c.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), newPod, metav1.CreateOptions{})
if err != nil {
	return
}
```

- Mark as Recreated and Set a Timer to Clean Up the Record:
  - Use `c.recreatedPods.Store(pod.Name, true)` to mark that this pod has already been recreated.
  - Set a timer to clear this mark after 5 seconds (`c.recreatedPods.Delete(pod.Name)`), allowing the reconstruction logic to be triggered again if the pod is deleted once more.

```go
// mark the pod as recreated
c.recreatedPods.Store(newPod.Name, true)
// set a timer to delete the record from the map after a while
go func() {
	time.Sleep(5 * time.Second)
	c.recreatedPods.Delete(pod.Name)
}()
```

We add a `sync.Map` type field called `recreatedPods` to the Controller's data structure to prevent a pod from being recreated multiple times during a single delete event. When a pod is manually deleted and successfully recreated, its name is added to `recreatedPods`. If the `deletePod` function is called again during the same delete event, the pod name will already exist in `recreatedPods`, preventing the manually deleted pod from being deleted and recreated again. A timer is also set to clear the mark after 5 seconds, allowing the reconstruction logic to be triggered again if the pod is manually deleted later.

```go
type Controller struct {
	//...
	// recreatedPods records the pods that have already been recreated
	// during the current delete event.
	recreatedPods sync.Map
	// preventRecreation suppresses recreation while an update triggered
	// by a CRD change is in progress (see updateJob below).
	preventRecreation bool
	//...
}
```

### Design of the Pod Reconstruction Solution for Joint Inference

![](./images/ji-deployment-process.png)

Inference tasks are stateless workloads, so Kubernetes' native component `Deployment` can be leveraged for pod self-healing. A complete process for monitoring and handling resource changes needs to be built. The `Deployment Informer` can replace the `Pod Informer` to manage `worker` control.

- Obtain the `Deployment Informer` resource from the informer factory.
- Register event handlers (`addDeployment`, `updateDeployment`, `deleteDeployment`) in the informer, as sketched after this list.
- Start the informer and sync data with the Kubernetes API.
- Before processing events, wait for the local cache to synchronize with the API Server.
- When a `Deployment` resource in the cluster changes, the `Informer` will trigger the corresponding event handler.
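A minimal sketch of what this informer wiring could look like (hypothetical; the handler names follow the list above, and the factory field name is assumed to match the pod informer shown earlier):

```go
// Obtain the Deployment informer from the shared informer factory and
// register the event handlers; the informer keeps the local cache in sync
// and invokes the handlers on every change.
deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments()
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    jc.addDeployment,
	UpdateFunc: jc.updateDeployment,
	DeleteFunc: jc.deleteDeployment,
})

// Wait for the local cache to synchronize before processing events.
if !cache.WaitForNamedCacheSync(Name, stopCh, deploymentInformer.Informer().HasSynced) {
	klog.Errorf("failed to wait for %s caches to sync", Name)
	return
}
```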
In addition, to ensure that the `deployment` resource is not accidentally deleted, we adopt the same method used in Federated Learning for recreating `pods` to recreate the `deployment`.

### Federated learning modifies the CRD to update the pod

Flowchart for updating the pod by modifying the CRD in the Federated Learning controller:

![](./images/fl-crdupdate-solution.png)

Listen for update events: when the Informer detects an update event related to the `FederatedLearningJob` and the CRD has changed, the original pod is deleted and a new pod is created based on the updated parameters in the CRD.

#### Code Logic

The `updateJob` function is triggered; it first checks whether an update is necessary.

- If `old` and `cur` cannot be converted into `*sednav1.FederatedLearningJob` objects, it returns immediately.
- If the `ResourceVersion` of `oldJob` and `curJob` are the same, it indicates there are no changes, so the function returns without processing the update.
- Set the `preventRecreation` flag to `true` to prevent pod recreation during the update process.

Next, the parameters of `oldJob` and `curJob` are compared.

- Compare the `old.Generation` and `cur.Generation` fields. Every Kubernetes object carries a `Generation` field in its metadata, which is set to 1 at creation and automatically incremented each time the object's `Spec` is modified; updates to the `Status` do not change it. It can therefore be used to determine whether the `Spec` has changed: if the two values are not equal, the parameters of the `FederatedLearningJob` have changed.
- Then, it iterates through the list of pods and deletes each one.
- Use the updated `curJob.Spec` to recreate the `AggWorker` and `TrainWorker`.
- Finally, reset the `preventRecreation` flag to `false` to allow future pod self-healing without interference.

### Joint Inference modifies the CRD to update the pod

Flowchart for updating the pod by modifying the CRD in the Joint Inference controller:

![](./images/ji-crdupdate-solution.png)

When the Informer detects an update to the `JointInferenceService` and the CRD has changed, the `deployment` is updated according to the new parameters in the CRD.

#### Code Logic

The logic for updating pods by modifying the CRD in Joint Inference is largely the same as in Federated Learning.

The relevant fields are compared; if they are not equal, the parameters of the `JointInferenceService` have changed, and the `cloudWorker` and `edgeWorker` are updated using the new `curService.Spec`.

# Testing

## Federated Learning Unit Tests

Unit tests are used instead of end-to-end tests, focusing on the two modified functions: `deletePod()` and `updateJob()`.

### `Test_deletePod()`

#### Delete the existing pod and recreate it

- A simulated Kubernetes client is created using `fake.NewSimpleClientset()`.
- A test pod is created through the fake client.
- A controller is created, and the fake client is registered with it.
- The test pod is passed to the `controller.deletePod()` function, and `fakeClient.CoreV1().Pods("default").Get(context.TODO(), "test-pod", metav1.GetOptions{})` is used to check whether the pod has been recreated; if it has not been recreated, the test fails (see the sketch after this section).

#### Delete non-existent pods

- Create a simulated client.
- Create a controller.
- Call `controller.deletePod()` to delete a non-existent pod.
- Confirm whether an error occurs, verifying that the deletion of a non-existent pod is handled gracefully.
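Sketched below is the shape of such a test with the fake clientset (illustrative only; the helper `newTestController` is hypothetical and stands in for however the controller is wired to the fake client):

```go
func Test_deletePod_recreates(t *testing.T) {
	fakeClient := fake.NewSimpleClientset()

	// Seed the fake cluster with the pod that will be "deleted".
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "default"}}
	if _, err := fakeClient.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
		t.Fatalf("failed to create test pod: %v", err)
	}

	// newTestController is a hypothetical helper that injects the fake
	// client into the controller under test.
	controller := newTestController(fakeClient)
	controller.deletePod(pod)

	// The controller should have recreated the pod under the same name.
	if _, err := fakeClient.CoreV1().Pods("default").Get(context.TODO(), "test-pod", metav1.GetOptions{}); err != nil {
		t.Fatalf("expected pod to be recreated: %v", err)
	}
}
```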
+ +### `Test_updateJob()` + +- Mock the pod list process. +- Create a simulated client. +- Create datasets, models, and relevant job and pod resources. +- Initialize the controller, passing in the simulated client, test jobs, mocked pod list, and other necessary dependencies. +- Define a new job and update some of its parameters (changing the `batch_size` in `TrainingWorker` from `32` to `16`). +- Call the `updateJob` function to update the old job to the new job, simulating the job update process in a real environment. +- Validate the update results; if the parameters match the expectations after the update, the test passes. + + + +## Joint Inference Unit Tests + +### `Test_UpdateService()` + +- Create simulated clients for Sedna and Kubernetes. +- Create an old service that includes configuration information for the cloud worker and edge worker, as well as two model resources. +- Create deployment and pod resources based on the old service. +- Initialize the controller, setting it up with the simulated client, pod list, and deployment list. +- The controller's `sendToEdgeFunc` is set to an empty function (not performing actual edge communication). +- A copy of the old joint inference service is made, modifying the hard example mining parameter in the edge worker from `value1` to `value2`, while also incrementing the service's `Generation` value. +- Call the controller's `updateService` function to trigger an update to the joint inference service. +- The test verifies whether the updated joint inference service can be successfully retrieved through the simulated client. +- Check whether the updated HEM parameter has been correctly changed from `value1` to `value2`, ensuring that the service update logic executes correctly.
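For reference, the `Generation`-based guard that both `updateJob` and `updateService` rely on can be sketched as follows (a hypothetical simplification of the logic described above, not the actual controller code):

```go
func (c *Controller) updateService(old, cur interface{}) {
	oldService, ok := old.(*sednav1.JointInferenceService)
	if !ok {
		return
	}
	curService, ok := cur.(*sednav1.JointInferenceService)
	if !ok {
		return
	}
	// Identical ResourceVersions mean this is a resync, not a real change.
	if oldService.ResourceVersion == curService.ResourceVersion {
		return
	}
	// Generation only increments when the Spec changes, so this detects
	// genuine CRD parameter updates (e.g. the HEM parameter in the test).
	if oldService.Generation != curService.Generation {
		// Update the cloud and edge workers from curService.Spec here.
	}
}
```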