From 5b5ca3412afe5f5f34e81ef4b3eaed9ca4206b13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wenkai=20Yin=28=E5=B0=B9=E6=96=87=E5=BC=80=29?= Date: Thu, 12 Dec 2024 18:02:26 +0800 Subject: [PATCH] Fix backup post hook issue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix backup post hook issue Fixes #8159 Signed-off-by: Wenkai Yin(尹文开) --- changelogs/unreleased/8509-ywk253100 | 1 + internal/hook/hook_tracker.go | 10 ++- pkg/backup/backup.go | 113 ++++++++++++++++++++++++--- 3 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 changelogs/unreleased/8509-ywk253100 diff --git a/changelogs/unreleased/8509-ywk253100 b/changelogs/unreleased/8509-ywk253100 new file mode 100644 index 0000000000..3476feea69 --- /dev/null +++ b/changelogs/unreleased/8509-ywk253100 @@ -0,0 +1 @@ +Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled \ No newline at end of file diff --git a/internal/hook/hook_tracker.go b/internal/hook/hook_tracker.go index afcb334ea4..39cd6fb166 100644 --- a/internal/hook/hook_tracker.go +++ b/internal/hook/hook_tracker.go @@ -69,14 +69,16 @@ type HookTracker struct { // HookExecutedCnt indicates the number of executed hooks. hookExecutedCnt int // hookErrs records hook execution errors if any. - hookErrs []HookErrInfo + hookErrs []HookErrInfo + AsyncItemBlocks *sync.WaitGroup } // NewHookTracker creates a hookTracker instance. func NewHookTracker() *HookTracker { return &HookTracker{ - lock: &sync.RWMutex{}, - tracker: make(map[hookKey]hookStatus), + lock: &sync.RWMutex{}, + tracker: make(map[hookKey]hookStatus), + AsyncItemBlocks: &sync.WaitGroup{}, } } @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName // Stat returns the number of attempted hooks and failed hooks func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) { + ht.AsyncItemBlocks.Wait() + ht.lock.RLock() defer ht.lock.RUnlock() diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 465f0753f4..1f41b1dbfc 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -35,9 +35,12 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/selection" kubeerrs "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/hook" @@ -488,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock { log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items)) - backedUpGRs := kb.backupItemBlock(*itemBlock) + backedUpGRs := kb.backupItemBlock(ctx, *itemBlock) for _, backedUpGR := range backedUpGRs { backedUpGroupResources[backedUpGR] = true } @@ -649,7 +652,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions( } } -func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource { +func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource { // find pods in ItemBlock // filter pods based on whether they still need to be backed up // this list will be used to run pre/post hooks @@ -672,7 +675,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche } } } - postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre) + postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods) for i, pod := range failedPods { itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod") // if pre hook fails, flag pod as backed-up and move on @@ -692,10 +695,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche } } - itemBlock.Log.Debug("Executing post hooks") - _, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost) - for i, pod := range failedPods { - itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod") + if len(postHookPods) > 0 { + itemBlock.Log.Debug("Executing post hooks") + kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods) } return grList @@ -714,12 +716,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem) return metadata, key, nil } -func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { +func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { var successPods []itemblock.ItemBlockItem var failedPods []itemblock.ItemBlockItem var errs []error for _, pod := range hookPods { - err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker) + err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker) if err == nil { successPods = append(successPods, pod) } else { @@ -730,6 +732,99 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h return successPods, failedPods, errs } +func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) { + log := itemBlock.Log + + // Handle post hooks for file system backup + // The hooks cannot execute until the PVBs to be processed + isFileSystemBackup := itemBlock.itemBackupper.backupRequest.Spec.DefaultVolumesToFsBackup + if isFileSystemBackup != nil && *isFileSystemBackup { + itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1) + go func() { + defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done() + + if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil { + log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock") + return + } + + for _, pod := range hookPods { + if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, + hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil { + log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod") + } + } + }() + return + } + + // not file system backup + for _, pod := range hookPods { + if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, + hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil { + log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod") + } + } +} + +func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error { + requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)}) + if err != nil { + return errors.Wrapf(err, "failed to create label requirement") + } + options := &kbclient.ListOptions{ + LabelSelector: labels.NewSelector().Add(*requirement), + } + pvbList := &velerov1api.PodVolumeBackupList{} + if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil { + return errors.Wrap(err, "failed to list PVBs") + } + + podMap := map[string]struct{}{} + for _, pod := range pods { + podMap[string(pod.Item.GetUID())] = struct{}{} + } + + pvbMap := map[*velerov1api.PodVolumeBackup]bool{} + for i, pvb := range pvbList.Items { + if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist { + continue + } + + processed := false + if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || + pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + processed = true + } + pvbMap[&pvbList.Items[i]] = processed + } + + checkFunc := func(context.Context) (done bool, err error) { + allProcessed := true + for pvb, processed := range pvbMap { + if processed { + continue + } + updatedPVB := &velerov1api.PodVolumeBackup{} + if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil { + allProcessed = false + log.Infof("failed to get PVB: %v", err) + continue + } + if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || + updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed { + pvbMap[pvb] = true + continue + } + allProcessed = false + } + + return allProcessed, nil + } + + return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc) +} + func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool { backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock) if aggregate, ok := err.(kubeerrs.Aggregate); ok {