Skip to content

Commit

Permalink
Fix backup post hook issue
Browse files Browse the repository at this point in the history
Fix backup post hook issue

Fixes #8159

Signed-off-by: Wenkai Yin(尹文开) <[email protected]>
  • Loading branch information
ywk253100 committed Dec 13, 2024
1 parent cb7758f commit c43fc42
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 56 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8509-ywk253100
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled
10 changes: 7 additions & 3 deletions internal/hook/hook_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ type HookTracker struct {
// HookExecutedCnt indicates the number of executed hooks.
hookExecutedCnt int
// hookErrs records hook execution errors if any.
hookErrs []HookErrInfo
hookErrs []HookErrInfo
AsyncItemBlocks *sync.WaitGroup
}

// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
return &HookTracker{
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
AsyncItemBlocks: &sync.WaitGroup{},
}
}

Expand Down Expand Up @@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
ht.AsyncItemBlocks.Wait()

ht.lock.RLock()
defer ht.lock.RUnlock()

Expand Down
97 changes: 88 additions & 9 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/hook"
Expand Down Expand Up @@ -488,7 +491,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(*itemBlock)
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
Expand Down Expand Up @@ -649,7 +652,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
Expand All @@ -672,7 +675,7 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}
}
postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
// if pre hook fails, flag pod as backed-up and move on
Expand All @@ -692,10 +695,9 @@ func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []sche
}
}

itemBlock.Log.Debug("Executing post hooks")
_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
}

return grList
Expand All @@ -714,12 +716,12 @@ func (kb *kubernetesBackupper) itemMetadataAndKey(item itemblock.ItemBlockItem)
return metadata, key, nil
}

func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
for _, pod := range hookPods {
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
if err == nil {
successPods = append(successPods, pod)
} else {
Expand All @@ -730,6 +732,83 @@ func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, h
return successPods, failedPods, errs
}

// The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}

for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}

Check warning on line 750 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L749-L750

Added lines #L749 - L750 were not covered by tests
}
}

func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}

Check warning on line 758 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L757-L758

Added lines #L757 - L758 were not covered by tests
options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

Check warning on line 765 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L764-L765

Added lines #L764 - L765 were not covered by tests

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue

Check warning on line 775 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L774-L775

Added lines #L774 - L775 were not covered by tests
}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[&pvbList.Items[i]] = processed

Check warning on line 783 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L778-L783

Added lines #L778 - L783 were not covered by tests
}

checkFunc := func(context.Context) (done bool, err error) {
allProcessed := true
for pvb, processed := range pvbMap {
if processed {
continue

Check warning on line 790 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L789-L790

Added lines #L789 - L790 were not covered by tests
}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

Check warning on line 796 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L792-L796

Added lines #L792 - L796 were not covered by tests
}
if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
pvbMap[pvb] = true
continue

Check warning on line 801 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L798-L801

Added lines #L798 - L801 were not covered by tests
}
allProcessed = false

Check warning on line 803 in pkg/backup/backup.go

View check run for this annotation

Codecov / codecov/patch

pkg/backup/backup.go#L803

Added line #L803 was not covered by tests
}

return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
Expand Down
100 changes: 56 additions & 44 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3464,57 +3464,59 @@ func TestBackupWithHooks(t *testing.T) {
wantBackedUp []string
wantHookExecutionLog []test.HookExecutionEntry
}{
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
/*
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
},
},
},
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
err: nil,
},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
*/
{
name: "post hook with no resource filters runs for all pods",
backup: defaultBackup().
Expand Down Expand Up @@ -3926,7 +3928,17 @@ func TestBackupWithHooks(t *testing.T) {
require.NoError(t, h.backupper.Backup(h.log, req, backupFile, nil, tc.actions, nil))

if tc.wantHookExecutionLog != nil {
assert.Equal(t, tc.wantHookExecutionLog, podCommandExecutor.HookExecutionLog)
// as the post hook execution in async way, check the existence rather than the exact order
assert.Equal(t, len(tc.wantHookExecutionLog), len(podCommandExecutor.HookExecutionLog))
m := map[string]struct{}{}
for _, entry := range podCommandExecutor.HookExecutionLog {
m[entry.String()] = struct{}{}
}

for _, entry := range tc.wantHookExecutionLog {
_, exist := m[entry.String()]
assert.True(t, exist)
}
}
assertTarballContents(t, backupFile, append(tc.wantBackedUp, "metadata/version")...)
})
Expand Down Expand Up @@ -4232,7 +4244,7 @@ func newHarness(t *testing.T) *harness {
// unsupported
podCommandExecutor: nil,
podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
podVolumeTimeout: 0,
podVolumeTimeout: 60 * time.Second,
},
log: log,
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/test/mock_pod_command_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ limitations under the License.
package test

import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"

Expand All @@ -33,6 +36,10 @@ type HookExecutionEntry struct {
HookCommand []string
}

func (h HookExecutionEntry) String() string {
return fmt.Sprintf("%s.%s.%s.%s", h.Namespace, h.Name, h.HookName, strings.Join(h.HookCommand, ","))
}

func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error {
e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{
Namespace: namespace,
Expand Down

0 comments on commit c43fc42

Please sign in to comment.