Fix backup post hook issue #8509

Merged: 1 commit, Dec 17, 2024

1 change: 1 addition & 0 deletions changelogs/unreleased/8509-ywk253100
@@ -0,0 +1 @@
Fix backup post hook issue #8159 (caused by #7571): always execute backup post hooks after PVBs are handled
10 changes: 7 additions & 3 deletions internal/hook/hook_tracker.go
@@ -69,14 +69,16 @@ type HookTracker struct {
// HookExecutedCnt indicates the number of executed hooks.
hookExecutedCnt int
// hookErrs records hook execution errors if any.
hookErrs []HookErrInfo
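// AsyncItemBlocks tracks the goroutines that wait for PVB completion and then run post hooks asynchronously; Stat waits for all of them to finish.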
AsyncItemBlocks *sync.WaitGroup
Contributor @Lyndon-Li Lyndon-Li · Dec 13, 2024

BackupWithResolvers calls this Stat at its end, but BackupWithResolvers doesn't know the details of each itemBlock; it just calls backupItemBlock for all itemBlocks.
On the other hand, the PVBs in each itemBlock belong to that itemBlock, so this semantic looks more rational --- for each backupItemBlock call: start all PVBs of the itemBlock -> wait for all PVBs to complete -> run the post hooks.

However, at present, when backupItemBlock exits, the PVBs have not actually completed, so BackupWithResolvers needs to call the hook tracker's Stat to wait for them indirectly.

Therefore, I suggest we move the waiting for PVB completion into backupItemBlock and call the post hooks in the same place.
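
For illustration only, here is a minimal, self-contained sketch of the flow suggested above (the names and the simulated PVB are hypothetical, not Velero's API): each backupItemBlock call starts the block's PVBs, waits for all of them, and only then runs the post hooks, before returning.

package main

import (
    "fmt"
    "sync"
    "time"
)

// startPVB simulates an asynchronous PodVolumeBackup; the name is sent on done when it finishes.
func startPVB(name string, done chan<- string) {
    go func() {
        time.Sleep(50 * time.Millisecond) // stand-in for the node-agent backing up the volume
        done <- name
    }()
}

// backupItemBlock sketches the suggested flow: start every PVB of the block,
// wait for all of them, then run the post hooks -- all before returning.
func backupItemBlock(block string, pvbs []string) {
    done := make(chan string, len(pvbs))
    for _, p := range pvbs {
        startPVB(p, done)
    }
    for range pvbs { // wait for every PVB of this block to complete
        fmt.Printf("%s: PVB %s completed\n", block, <-done)
    }
    fmt.Printf("%s: running post hooks\n", block)
}

func main() {
    var wg sync.WaitGroup
    for _, b := range []string{"block-1", "block-2"} { // concurrent item blocks, as planned for 1.16
        wg.Add(1)
        go func(b string) {
            defer wg.Done()
            backupItemBlock(b, []string{b + "-pvb-a", b + "-pvb-b"})
        }(b)
    }
    wg.Wait()
}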

Contributor

We can do this in 1.16 only. In 1.15 we don't support concurrent backupItemBlock, so if we put the wait inside backupItemBlock, the PVBs of different itemBlocks would have to be processed in sequence, which is not helpful in practice.

Contributor

@sseago Just let us know your thoughts

Collaborator

@Lyndon-Li "Therefore, I suggest we make the waiting for PVB completion into backupItemBlock and call post hooks in the same place."

So right now, we invoke the "wait for PVB completion and then call hooks" with go kb.handleItemBlockPostHooks -- is your suggestion to replace the goroutine call with a direct func call? This would effectively mean that the specific BackupItemBlock call would not return until 1) PVB processing completes and 2) post-backup hooks are done.

With the current codebase (i.e. concurrent backupItemBlock is not yet implemented), that might, in the short term, have a negative performance impact -- but it may make more sense to handle it that way once we have concurrent backupItemBlock processing. However, there may still be edge cases where this hurts performance -- in particular, in large clusters with many node agents, each of which allows multiple PVBs to be processed in parallel -- the number of concurrent PVBs we can process may exceed the number of parallel ItemBlocks configured, in which case parallel ItemBlock processing becomes the bottleneck and prevents full use of the node-agent pods to handle many volumes at once.
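
For contrast, here is an equally simplified, assumption-level sketch of what this PR does (not the real types): the loop over item blocks stays sequential, each block's post hooks run in a goroutine that first waits for that block's PVBs, and a shared WaitGroup lets the hook tracker's Stat block until all of those goroutines have finished.

package main

import (
    "fmt"
    "sync"
    "time"
)

// asyncBlocks plays the role of HookTracker.AsyncItemBlocks in this PR:
// Stat() must not report its counts until every async post-hook goroutine is done.
var asyncBlocks sync.WaitGroup

func backupItemBlock(block string) {
    fmt.Printf("%s: items backed up, PVBs started\n", block)
    asyncBlocks.Add(1)
    go func() { // post hooks wait for the block's PVBs without blocking the main loop
        defer asyncBlocks.Done()
        time.Sleep(50 * time.Millisecond) // stand-in for waitUntilPVBsProcessed
        fmt.Printf("%s: PVBs processed, post hooks executed\n", block)
    }()
}

func main() {
    for _, b := range []string{"block-1", "block-2"} { // sequential item-block loop, as in 1.15
        backupItemBlock(b)
    }
    asyncBlocks.Wait() // what Stat() now does before returning the hook counts
    fmt.Println("all post hooks accounted for")
}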

Contributor

the number of concurrent PVBs we can process may exceed the number of parallel ItemBlocks configured

If so, can we recommend that users adjust the ItemBlock concurrency? We don't need to size the worker pool to the number of CPU cores, because in this case the work is actually done by the node-agent; the itemBlock threads just need to wait for completion.

Contributor

With the current codebase (i.e. concurrent backupItemBlock is not yet implemented), that might, in the short term, have a negative performance impact

Yes, agreed. This is why we suggest doing this only in 1.16, where concurrent itemBlock processing is fully supported.

Contributor Author

@Lyndon-Li @sseago
I'm going to merge this PR and cherry-pick the changes to the 1.15 branch to unblock the 1.15.1 release.
Let's continue the discussion in issue #8516

}

// NewHookTracker creates a hookTracker instance.
func NewHookTracker() *HookTracker {
return &HookTracker{
lock: &sync.RWMutex{},
tracker: make(map[hookKey]hookStatus),
AsyncItemBlocks: &sync.WaitGroup{},
}
}

@@ -141,6 +143,8 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName

// Stat returns the number of attempted hooks and failed hooks
func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) {
ht.AsyncItemBlocks.Wait()

ht.lock.RLock()
defer ht.lock.RUnlock()

97 changes: 88 additions & 9 deletions pkg/backup/backup.go
@@ -35,9 +35,12 @@
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/vmware-tanzu/velero/internal/hook"
@@ -488,7 +491,7 @@
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(*itemBlock)
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
}
@@ -649,7 +652,7 @@
}
}

func (kb *kubernetesBackupper) backupItemBlock(itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
@@ -672,7 +675,7 @@
}
}
}
postHookPods, failedPods, errs := kb.handleItemBlockHooks(itemBlock, preHookPods, hook.PhasePre)
postHookPods, failedPods, errs := kb.handleItemBlockPreHooks(itemBlock, preHookPods)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running pre hooks for pod")
// if pre hook fails, flag pod as backed-up and move on
@@ -692,10 +695,9 @@
}
}

itemBlock.Log.Debug("Executing post hooks")
_, failedPods, errs = kb.handleItemBlockHooks(itemBlock, postHookPods, hook.PhasePost)
for i, pod := range failedPods {
itemBlock.Log.WithError(errs[i]).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
}

return grList
@@ -714,12 +716,12 @@
return metadata, key, nil
}

func (kb *kubernetesBackupper) handleItemBlockHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem, phase hook.HookPhase) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
for _, pod := range hookPods {
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, phase, itemBlock.itemBackupper.hookTracker)
err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks, hook.PhasePre, itemBlock.itemBackupper.hookTracker)
if err == nil {
successPods = append(successPods, pod)
} else {
@@ -730,6 +732,83 @@
return successPods, failedPods, errs
}

// The post hooks cannot execute until all PVBs of the ItemBlock have been processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}

for _, pod := range hookPods {
if err := itemBlock.itemBackupper.itemHookHandler.HandleHooks(itemBlock.Log, pod.Gr, pod.Item, itemBlock.itemBackupper.backupRequest.ResourceHooks,
hook.PhasePost, itemBlock.itemBackupper.hookTracker); err != nil {
log.WithError(err).WithField("name", pod.Item.GetName()).Error("Error running post hooks for pod")
}

}
}

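// waitUntilPVBsProcessed finds the PVBs created for the given pods and polls until every one of them reaches the Completed or Failed phase.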
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
requirement, err := labels.NewRequirement(velerov1api.BackupUIDLabel, selection.Equals, []string{string(itemBlock.itemBackupper.backupRequest.UID)})
if err != nil {
return errors.Wrapf(err, "failed to create label requirement")
}

options := &kbclient.ListOptions{
LabelSelector: labels.NewSelector().Add(*requirement),
}
pvbList := &velerov1api.PodVolumeBackupList{}
if err := kb.kbClient.List(context.Background(), pvbList, options); err != nil {
return errors.Wrap(err, "failed to list PVBs")
}

podMap := map[string]struct{}{}
for _, pod := range pods {
podMap[string(pod.Item.GetUID())] = struct{}{}
}

pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for i, pvb := range pvbList.Items {
if _, exist := podMap[string(pvb.Spec.Pod.UID)]; !exist {
continue

}

processed := false
if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
processed = true
}
pvbMap[&pvbList.Items[i]] = processed

}

checkFunc := func(context.Context) (done bool, err error) {
allProcessed := true
for pvb, processed := range pvbMap {
if processed {
continue

}
updatedPVB := &velerov1api.PodVolumeBackup{}
if err := kb.kbClient.Get(ctx, kbclient.ObjectKeyFromObject(pvb), updatedPVB); err != nil {
allProcessed = false
log.Infof("failed to get PVB: %v", err)
continue

}
if updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted ||
updatedPVB.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
pvbMap[pvb] = true
continue

}
allProcessed = false

}

return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, false, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
backedUpItem, _, err := itemBackupper.backupItem(log, unstructured, gr, preferredGVR, false, false, itemBlock)
if aggregate, ok := err.(kubeerrs.Aggregate); ok {
100 changes: 56 additions & 44 deletions pkg/backup/backup_test.go
@@ -3464,57 +3464,59 @@ func TestBackupWithHooks(t *testing.T) {
wantBackedUp []string
wantHookExecutionLog []test.HookExecutionEntry
}{
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
/*
{
name: "pre hook with no resource filters runs for all pods",
backup: defaultBackup().
Hooks(velerov1.BackupHooks{
Resources: []velerov1.BackupResourceHookSpec{
{
Name: "hook-1",
PreHooks: []velerov1.BackupResourceHook{
{
Exec: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
},
},
},
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
}).
Result(),
apiResources: []*test.APIResource{
test.Pods(
builder.ForPod("ns-1", "pod-1").Result(),
builder.ForPod("ns-2", "pod-2").Result(),
),
},
wantExecutePodCommandCalls: []*expectedCall{
{
podNamespace: "ns-1",
podName: "pod-1",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
},
err: nil,
},
{
podNamespace: "ns-2",
podName: "pod-2",
hookName: "hook-1",
hook: &velerov1.ExecHook{
Command: []string{"ls", "/tmp"},
},
err: nil,
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
wantBackedUp: []string{
"resources/pods/namespaces/ns-1/pod-1.json",
"resources/pods/namespaces/ns-2/pod-2.json",
"resources/pods/v1-preferredversion/namespaces/ns-1/pod-1.json",
"resources/pods/v1-preferredversion/namespaces/ns-2/pod-2.json",
},
},
*/
{
name: "post hook with no resource filters runs for all pods",
backup: defaultBackup().
@@ -3926,7 +3928,17 @@
require.NoError(t, h.backupper.Backup(h.log, req, backupFile, nil, tc.actions, nil))

if tc.wantHookExecutionLog != nil {
assert.Equal(t, tc.wantHookExecutionLog, podCommandExecutor.HookExecutionLog)
// post hooks execute asynchronously, so check that each expected entry exists rather than checking the exact order
assert.Equal(t, len(tc.wantHookExecutionLog), len(podCommandExecutor.HookExecutionLog))
m := map[string]struct{}{}
for _, entry := range podCommandExecutor.HookExecutionLog {
m[entry.String()] = struct{}{}
}

for _, entry := range tc.wantHookExecutionLog {
_, exist := m[entry.String()]
assert.True(t, exist)
}
}
assertTarballContents(t, backupFile, append(tc.wantBackedUp, "metadata/version")...)
})
@@ -4232,7 +4244,7 @@ func newHarness(t *testing.T) *harness {
// unsupported
podCommandExecutor: nil,
podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
podVolumeTimeout: 0,
podVolumeTimeout: 60 * time.Second,
},
log: log,
}
7 changes: 7 additions & 0 deletions pkg/test/mock_pod_command_executor.go
@@ -16,6 +16,9 @@ limitations under the License.
package test

import (
"fmt"
"strings"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"

@@ -33,6 +36,10 @@ type HookExecutionEntry struct {
HookCommand []string
}

func (h HookExecutionEntry) String() string {
return fmt.Sprintf("%s.%s.%s.%s", h.Namespace, h.Name, h.HookName, strings.Join(h.HookCommand, ","))
}

func (e *MockPodCommandExecutor) ExecutePodCommand(log logrus.FieldLogger, item map[string]interface{}, namespace, name, hookName string, hook *v1.ExecHook) error {
e.HookExecutionLog = append(e.HookExecutionLog, HookExecutionEntry{
Namespace: namespace,