Skip to content

Commit

Permalink
Add internal retry for SourceCommitAndDir and readHydratedDir (#886)
Browse files Browse the repository at this point in the history
If the internal retry fails eventually, it returns an internal error and
surfaces the error in the R*Sync status field.
  • Loading branch information
nan-yu authored Sep 28, 2023
1 parent 6b17d41 commit 5402b3a
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 162 deletions.
2 changes: 1 addition & 1 deletion e2e/testcases/policy_dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestInvalidPolicyDir(t *testing.T) {
nomostest.SetPolicyDir(nt, configsync.RootSyncName, "some-nonexistent-policydir")

nt.T.Log("Expect an error to be present in status.source.errors")
nt.WaitForRootSyncSourceError(configsync.RootSyncName, status.PathErrorCode, "")
nt.WaitForRootSyncSourceError(configsync.RootSyncName, status.SourceErrorCode, "")

nt.T.Log("Fix the policydir in the repo")
nomostest.SetPolicyDir(nt, configsync.RootSyncName, "acme")
Expand Down
90 changes: 66 additions & 24 deletions pkg/hydrate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"time"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"kpt.dev/configsync/pkg/api/configsync"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
"kpt.dev/configsync/pkg/metadata"
"kpt.dev/configsync/pkg/reconcilermanager"
"kpt.dev/configsync/pkg/status"
"kpt.dev/configsync/pkg/util"
)

const (
Expand Down Expand Up @@ -89,9 +91,16 @@ func (h *Hydrator) Run(ctx context.Context) {
hydrateErr = h.rehydrateOnError(hydrateErr, srcCommit, syncDir)
rehydrateTimer.Reset(h.RehydratePeriod) // Schedule rehydrate attempt
case <-runTimer.C:
srcCommit, syncDir, err = SourceCommitAndDir(h.SourceType, absSourceDir, h.SyncDir, h.ReconcilerName)
// pull the source commit and directory with retries within 5 minutes.
srcCommit, syncDir, err = SourceCommitAndDirWithRetry(util.SourceRetryBackoff, h.SourceType, absSourceDir, h.SyncDir, h.ReconcilerName)
if err != nil {
klog.Errorf("failed to get the commit hash and sync directory from the source directory %s: %v", absSourceDir.OSPath(), err)
hydrateErr = NewInternalError(errors.Wrapf(err,
"failed to get the commit hash and sync directory from the source directory %s",
absSourceDir.OSPath()))
if err := h.complete(srcCommit, hydrateErr); err != nil {
klog.Errorf("failed to complete the rendering execution for commit %q: %v",
srcCommit, err)
}
} else if DoneCommit(h.DonePath.OSPath()) != srcCommit {
// If the commit has been processed before, regardless of success or failure,
// skip the hydration to avoid repeated execution.
Expand Down Expand Up @@ -326,14 +335,34 @@ func deleteErrorFile(file string) error {
return nil
}

// SourceCommitAndDir returns the source hash (a git commit hash or an OCI image digest or a helm chart version), the absolute path of the sync directory, and source errors.
func SourceCommitAndDir(sourceType v1beta1.SourceType, sourceRevDir cmpath.Absolute, syncDir cmpath.Relative, reconcilerName string) (string, cmpath.Absolute, status.Error) {
// Check if the source root directory is mounted
// SourceCommitAndDirWithRetry resolves the source hash (a git commit hash, an
// OCI image digest, or a helm chart version) and the absolute path of the sync
// directory by invoking SourceCommitAndDir under the provided backoff.
// The error finally returned by the retry helper is wrapped as a source error
// so that it can be exposed in the R*Sync status.
func SourceCommitAndDirWithRetry(backoff wait.Backoff, sourceType v1beta1.SourceType, sourceRevDir cmpath.Absolute, syncDir cmpath.Relative, reconcilerName string) (commit string, sourceDir cmpath.Absolute, _ status.Error) {
	// Every attempt writes straight into the named results, so the values
	// returned below are the ones produced by the most recent attempt.
	attempt := func() error {
		var attemptErr error
		commit, sourceDir, attemptErr = SourceCommitAndDir(sourceType, sourceRevDir, syncDir, reconcilerName)
		return attemptErr
	}
	retryErr := util.RetryWithBackoff(backoff, attempt)
	return commit, sourceDir, status.SourceError.Wrap(retryErr).Build()
}

// SourceCommitAndDir returns the source hash (a git commit hash or an OCI image
// digest or a helm chart version), the absolute path of the sync directory,
// and source errors.
func SourceCommitAndDir(sourceType v1beta1.SourceType, sourceRevDir cmpath.Absolute, syncDir cmpath.Relative, reconcilerName string) (string, cmpath.Absolute, error) {
sourceRoot := path.Dir(sourceRevDir.OSPath())
if _, err := os.Stat(sourceRoot); err != nil && os.IsNotExist(err) {
return "", "", status.TransientError(err)
}
// Check if the source configs are synced successfully.
if _, err := os.Stat(sourceRoot); err != nil {
// Checking the source root directory status failed, either because the
// path doesn't exist or because of another OS failure. The root cause is
// probably that the *-sync container is not ready yet, so retry until
// it becomes ready.
return "", "", util.NewRetriableError(fmt.Errorf("failed to check the status of the source root directory %q: %v", sourceRoot, err))
}
// Check if the source configs are pulled successfully.
errFilePath := filepath.Join(sourceRoot, ErrorFile)

var containerName string
Expand All @@ -347,24 +376,34 @@ func SourceCommitAndDir(sourceType v1beta1.SourceType, sourceRevDir cmpath.Absol
}

content, err := os.ReadFile(errFilePath)
if err != nil && !os.IsNotExist(err) {
return "", "", status.TransientError(
fmt.Errorf("unable to load %s: %v. Please "+
"check %s logs for more info: kubectl logs -n %s -l %s -c %s",
errFilePath, err, containerName, configsync.ControllerNamespace,
metadata.ReconcilerLabel, reconcilerName))
} else if err == nil && len(content) == 0 {
return "", "", status.SourceError.Sprintf("%s is "+
"empty. Please check %s logs for more info: kubectl logs -n %s -l %s -c %s",
switch {
case err != nil && !os.IsNotExist(err):
// It fails to get the status of the source error file, probably because
// the *-sync container is not ready yet, so retry until it becomes ready.
return "", "", util.NewRetriableError(fmt.Errorf("failed to check the status of the source error file %q: %v", errFilePath, err))
case err == nil && len(content) == 0:
// The source error file exists, which indicates the *-sync container is
// ready. The file is empty probably because *-sync fails to write to the
// error file. Hence, no need to retry.
return "", "", fmt.Errorf("%s is empty. Please check %s logs for more info: kubectl logs -n %s -l %s -c %s",
errFilePath, containerName, configsync.ControllerNamespace,
metadata.ReconcilerLabel, reconcilerName).Build()
} else if err == nil {
return "", "", status.SourceError.Sprintf("error in the %s container: %s", containerName, string(content)).Build()
metadata.ReconcilerLabel, reconcilerName)
case err == nil && len(content) != 0:
// The source error file exists, which indicates the *-sync container is
// ready, so return the error directly without retry.
return "", "", fmt.Errorf("error in the %s container: %s", containerName, string(content))
default:
// The sourceRoot directory exists, but the source error file doesn't exist.
// It indicates that *-sync is ready, but no errors so far.
}

gitDir, err := sourceRevDir.EvalSymlinks()
if err != nil {
return "", "", status.SourceError.Sprintf("unable to evaluate the source link %s", sourceRevDir).Wrap(err).Build()
// `sourceRevDir` points to the directory with commit SHA that holds the
// checked-out files. It probably can't be evaluated because the *-sync
// container is ready but hasn't finished creating the symlink yet, so
// retry until the symlink is created.
return "", "", util.NewRetriableError(fmt.Errorf("failed to evaluate the source rev directory %q: %v", sourceRevDir, err))
}

commit := filepath.Base(gitDir.OSPath())
Expand All @@ -379,8 +418,11 @@ func SourceCommitAndDir(sourceType v1beta1.SourceType, sourceRevDir cmpath.Absol
relSyncDir := gitDir.Join(syncDir)
sourceDir, err := relSyncDir.EvalSymlinks()
if err != nil {
return commit, "", status.PathWrapError(
errors.Wrap(err, "evaluating symbolic link to policy sourceRoot"), relSyncDir.OSPath())
// `relSyncDir` points to the source config directory. Now the commit SHA
// has been checked out, but it fails to evaluate the symlink with the
// config directory, which indicates a misconfiguration of `spec.git.dir`,
// `spec.oci.dir`, or `spec.helm.dir`, so return the error without retry.
return "", "", fmt.Errorf("failed to evaluate the config directory %q: %w", relSyncDir.OSPath(), err)
}
return commit, sourceDir, nil
}
162 changes: 159 additions & 3 deletions pkg/hydrate/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package hydrate

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"kpt.dev/configsync/pkg/api/configsync/v1beta1"
"kpt.dev/configsync/pkg/importer/filesystem/cmpath"
ft "kpt.dev/configsync/pkg/importer/filesystem/filesystemtest"
Expand All @@ -35,6 +37,160 @@ const (
kustomization = "namespace: test-ns"
)

// TestSourceCommitAndDirWithRetry exercises SourceCommitAndDirWithRetry against
// a source directory that is populated by a background goroutine while the
// function under test is retrying.
//
// Each case configures:
//   - retryCap: the Cap of the wait.Backoff handed to the function under test.
//   - srcRootCreateLatency / symlinkCreateLatency: how long the background
//     goroutine sleeps before creating the source root directory and the
//     commit directory (plus symlink). When a latency exceeds retryCap the
//     corresponding resource is never created at all, so the failure cases do
//     not depend on winning a timing race.
//   - syncDir: the relative sync directory to request; defaults to "configs".
//     Only the default directory is ever created on disk.
//   - errFileExists / errFileContent: whether the error file is written into
//     the source root, and with what content.
//   - expectedSourceCommit / expectedErrMsg: the expected commit on success, or
//     a substring of the expected error message on failure.
func TestSourceCommitAndDirWithRetry(t *testing.T) {
	commit := "abcd123"
	syncDir := "configs"
	testCases := []struct {
		name                 string
		retryCap             time.Duration
		srcRootCreateLatency time.Duration
		symlinkCreateLatency time.Duration
		syncDir              string
		errFileExists        bool
		errFileContent       string
		expectedSourceCommit string
		expectedErrMsg       string
	}{
		{
			name:                 "source root directory isn't created within the retry cap",
			retryCap:             5 * time.Millisecond,
			srcRootCreateLatency: 10 * time.Millisecond,
			expectedErrMsg:       "failed to check the status of the source root directory",
		},
		{
			name:                 "source root directory created within the retry cap",
			retryCap:             10 * time.Millisecond,
			srcRootCreateLatency: time.Millisecond,
			expectedSourceCommit: commit,
		},
		{
			name:                 "symlink isn't created within the retry cap",
			retryCap:             5 * time.Millisecond,
			symlinkCreateLatency: 10 * time.Millisecond,
			expectedErrMsg:       "failed to evaluate the source rev directory",
		},
		{
			name:                 "symlink created within the retry cap",
			retryCap:             10 * time.Millisecond,
			symlinkCreateLatency: 5 * time.Millisecond,
			expectedSourceCommit: commit,
		},
		{
			name:           "error file exists with empty content",
			retryCap:       10 * time.Millisecond,
			errFileExists:  true,
			expectedErrMsg: "is empty. Please check git-sync logs for more info",
		},
		{
			name:           "error file exists with non-empty content",
			retryCap:       10 * time.Millisecond,
			errFileExists:  true,
			errFileContent: "git-sync error",
			expectedErrMsg: "git-sync error",
		},
		{
			name:           "sync directory doesn't exist",
			retryCap:       10 * time.Millisecond,
			syncDir:        "unknown",
			expectedErrMsg: "failed to evaluate the config directory",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			backoff := wait.Backoff{
				Duration: time.Millisecond,
				Factor:   2,
				Steps:    10,
				Cap:      tc.retryCap,
				Jitter:   0.1,
			}

			// A random leaf name keeps the source root unique per subtest and,
			// importantly, non-existent until the goroutine below creates it.
			srcRootDir := filepath.Join(os.TempDir(), "test-srcCommit-syncDir", rand.String(10))
			commitDir := filepath.Join(srcRootDir, commit)
			if len(tc.syncDir) == 0 {
				tc.syncDir = syncDir
			}
			t.Cleanup(func() {
				if err := os.RemoveAll(srcRootDir); err != nil {
					t.Errorf("failed to remove source root directory: %v", err)
				}
			})

			// Simulating the creation of source configs and errors in the background
			doneCh := make(chan struct{})
			go func() {
				defer close(doneCh)
				err := func() error {
					srcRootDirCreated := false
					// Create the source root directory conditionally with latency
					if tc.srcRootCreateLatency > 0 {
						t.Logf("sleeping for %q before creating source root directory %q", tc.srcRootCreateLatency, srcRootDir)
						time.Sleep(tc.srcRootCreateLatency)
					}
					if tc.srcRootCreateLatency <= tc.retryCap {
						if err := os.MkdirAll(srcRootDir, 0700); err != nil {
							return fmt.Errorf("failed to create source root directory %q: %v", srcRootDir, err)
						}
						srcRootDirCreated = true
						t.Logf("source root directory %q created at %v", srcRootDir, time.Now())
					}

					// Create the error file based on the test condition
					if srcRootDirCreated && tc.errFileExists {
						errFilePath := filepath.Join(srcRootDir, ErrorFile)
						if err := os.WriteFile(errFilePath, []byte(tc.errFileContent), 0644); err != nil {
							return fmt.Errorf("failed to write to error file %q: %v", errFilePath, err)
						}
					}

					// Create the symlink conditionally with latency
					if tc.symlinkCreateLatency > 0 {
						t.Logf("sleeping for %q before creating the symlink %q", tc.symlinkCreateLatency, srcRootDir)
						time.Sleep(tc.symlinkCreateLatency)
					}
					if srcRootDirCreated && tc.symlinkCreateLatency <= tc.retryCap {
						if err := os.Mkdir(commitDir, os.ModePerm); err != nil {
							return fmt.Errorf("failed to create the commit directory %q: %v", commitDir, err)
						}
						// Create the sync directory if syncDir is the same as tc.syncDir
						if tc.syncDir == syncDir {
							syncDirPath := filepath.Join(commitDir, syncDir)
							if err := os.Mkdir(syncDirPath, os.ModePerm); err != nil {
								return fmt.Errorf("failed to create the sync directory %q: %v", syncDirPath, err)
							}
							t.Logf("sync directory %q created at %v", syncDirPath, time.Now())
						}
						symDir := filepath.Join(srcRootDir, "rev")
						if err := os.Symlink(commitDir, symDir); err != nil {
							return fmt.Errorf("failed to create the symlink %q: %v", symDir, err)
						}
						t.Logf("symlink %q created and linked to %q at %v", symDir, commitDir, time.Now())
					}
					return nil
				}()
				if err != nil {
					// A broken fixture invalidates the assertions below, so fail
					// the test instead of only logging. t.Errorf (unlike
					// t.Fatal) may be called from a non-test goroutine, and the
					// <-doneCh wait below guarantees it runs before the subtest
					// returns.
					t.Errorf("failed to set up the source directory fixture: %v", err)
				}
			}()

			srcCommit, srcSyncDir, err := SourceCommitAndDirWithRetry(backoff, v1beta1.GitSource, cmpath.Absolute(commitDir), cmpath.RelativeOS(tc.syncDir), "root-reconciler")
			if tc.expectedErrMsg == "" {
				assert.Nil(t, err, "got unexpected error %v", err)
				assert.Equal(t, tc.expectedSourceCommit, srcCommit)
				assert.Equal(t, filepath.Join(commitDir, syncDir), srcSyncDir.OSPath())
			} else {
				assert.NotNil(t, err)
				assert.Contains(t, err.Error(), tc.expectedErrMsg)
			}

			// Block and wait for the goroutine to complete.
			<-doneCh
		})
	}
}

func TestRunHydrate(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -56,7 +212,7 @@ func TestRunHydrate(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create a temporary directory with a commit hash
tempDir, err := ioutil.TempDir(os.TempDir(), "run-hydrate-test")
tempDir, err := os.MkdirTemp(os.TempDir(), "run-hydrate-test")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -130,7 +286,7 @@ func TestComputeCommit(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// create a temporary directory with a commit hash
tempDir, err := ioutil.TempDir(os.TempDir(), "compute-commit-test")
tempDir, err := os.MkdirTemp(os.TempDir(), "compute-commit-test")
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"kpt.dev/configsync/pkg/metadata"
"kpt.dev/configsync/pkg/metrics"
"kpt.dev/configsync/pkg/status"
"kpt.dev/configsync/pkg/util"
webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -185,7 +186,8 @@ func Run(ctx context.Context, p Parser) {
func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) {
var syncDir cmpath.Absolute
gs := sourceStatus{}
gs.commit, syncDir, gs.errs = hydrate.SourceCommitAndDir(p.options().SourceType, p.options().SourceDir, p.options().SyncDir, p.options().reconcilerName)
// pull the source commit and directory with retries within 5 minutes.
gs.commit, syncDir, gs.errs = hydrate.SourceCommitAndDirWithRetry(util.SourceRetryBackoff, p.options().SourceType, p.options().SourceDir, p.options().SyncDir, p.options().reconcilerName)

// If failed to fetch the source commit and directory, set `.status.source` to fail early.
// Otherwise, set `.status.rendering` before `.status.source` because the parser needs to
Expand Down Expand Up @@ -356,7 +358,8 @@ func parseHydrationState(p Parser, srcState sourceState, hydrationStatus renderi

var hydrationErr hydrate.HydrationError
if _, err := os.Stat(absHydratedRoot.OSPath()); err == nil {
srcState, hydrationErr = options.readHydratedDir(absHydratedRoot, options.reconcilerName, srcState)
// pull the hydrated commit and directory with retries within 1 minute.
srcState, hydrationErr = options.readHydratedDirWithRetry(util.HydratedRetryBackoff, absHydratedRoot, options.reconcilerName, srcState)
if hydrationErr != nil {
hydrationStatus.message = RenderingFailed
hydrationStatus.errs = status.HydrationError(hydrationErr.Code(), hydrationErr)
Expand Down
Loading

0 comments on commit 5402b3a

Please sign in to comment.