Various minor fixes #825

Merged: 7 commits, Dec 4, 2023
7 changes: 6 additions & 1 deletion api/v2beta2/helmrelease_types.go
@@ -37,6 +37,11 @@ const (
HelmReleaseFinalizer = "finalizers.fluxcd.io"
)

const (
// defaultMaxHistory is the default number of Helm release versions to keep.
defaultMaxHistory = 5
)

// Kustomize Helm PostRenderer specification.
type Kustomize struct {
// Strategic merge and JSON patches, defined as inline YAML objects,
@@ -1200,7 +1205,7 @@ func (in HelmRelease) GetTimeout() metav1.Duration {
// GetMaxHistory returns the configured MaxHistory, or the default of 5.
func (in HelmRelease) GetMaxHistory() int {
if in.Spec.MaxHistory == nil {
- return 5
+ return defaultMaxHistory
}
return *in.Spec.MaxHistory
}
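The change above replaces the magic number with the named defaultMaxHistory constant; the accessor itself is the common Kubernetes optional-field pattern, where a nil pointer means "use the default". A minimal, self-contained sketch of that pattern (the spec type and field below are hypothetical illustrations, not the actual HelmRelease API):

package main

import "fmt"

const defaultMaxHistory = 5

type spec struct {
	MaxHistory *int // nil means "not set, use the default"
}

func (s spec) getMaxHistory() int {
	if s.MaxHistory == nil {
		return defaultMaxHistory
	}
	return *s.MaxHistory
}

func main() {
	ten := 10
	fmt.Println(spec{}.getMaxHistory())                 // 5
	fmt.Println(spec{MaxHistory: &ten}.getMaxHistory()) // 10
}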
11 changes: 10 additions & 1 deletion api/v2beta2/snapshot_types.go
@@ -81,11 +81,13 @@ func (in Snapshots) Previous(ignoreTests bool) *Snapshot {
}

// Truncate removes all Snapshots up to the Previous deployed Snapshot.
- // If there is no previous-deployed Snapshot, no Snapshots are removed.
+ // If there is no previous-deployed Snapshot, the most recent 5 Snapshots are
+ // retained.
func (in *Snapshots) Truncate(ignoreTests bool) {
if in.Len() < 2 {
return
}

in.SortByVersion()
for i := range (*in)[1:] {
s := (*in)[i+1]
@@ -96,6 +98,13 @@ func (in *Snapshots) Truncate(ignoreTests bool) {
}
}
}

if in.Len() > defaultMaxHistory {
// If none of the Snapshots are deployed or superseded, and there
// are more than the defaultMaxHistory, truncate to the most recent
// Snapshots.
*in = (*in)[:defaultMaxHistory]
}
}

// Snapshot captures a point-in-time copy of the status information for a Helm release,
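The new fallback in Truncate only fires when the loop finds no deployed or superseded snapshot to truncate up to. A short usage sketch of the resulting behavior, assuming the github.com/fluxcd/helm-controller/api/v2beta2 module is available as a dependency:

package main

import (
	"fmt"

	v2 "github.com/fluxcd/helm-controller/api/v2beta2"
)

func main() {
	// Six snapshots, none deployed or superseded: the loop in Truncate
	// finds nothing to truncate up to.
	history := v2.Snapshots{
		{Version: 6, Status: "failed"},
		{Version: 5, Status: "failed"},
		{Version: 4, Status: "failed"},
		{Version: 3, Status: "failed"},
		{Version: 2, Status: "failed"},
		{Version: 1, Status: "failed"},
	}

	// The new guard caps the history at defaultMaxHistory (5) entries.
	history.Truncate(true)
	fmt.Println(len(history)) // 5: version 1 has been dropped
}

Without this guard, a release that keeps failing (and therefore never produces a deployed snapshot) would accumulate an unbounded history.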
18 changes: 18 additions & 0 deletions api/v2beta2/snapshot_types_test.go
@@ -258,6 +258,24 @@ func TestSnapshots_Truncate(t *testing.T) {
}},
},
},
{
name: "retains most recent snapshots when all have failed",
in: Snapshots{
{Version: 6, Status: "deployed"},
{Version: 5, Status: "failed"},
{Version: 4, Status: "failed"},
{Version: 3, Status: "failed"},
{Version: 2, Status: "failed"},
{Version: 1, Status: "failed"},
},
want: Snapshots{
{Version: 6, Status: "deployed"},
{Version: 5, Status: "failed"},
{Version: 4, Status: "failed"},
{Version: 3, Status: "failed"},
{Version: 2, Status: "failed"},
},
},
{
name: "without previous snapshot",
in: Snapshots{
57 changes: 48 additions & 9 deletions internal/controller/helmrelease_controller.go
@@ -25,9 +25,11 @@ import (

"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
apierrutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
kuberecorder "k8s.io/client-go/tools/record"
@@ -59,6 +61,7 @@ import (
"github.com/fluxcd/helm-controller/internal/action"
"github.com/fluxcd/helm-controller/internal/chartutil"
"github.com/fluxcd/helm-controller/internal/digest"
interrors "github.com/fluxcd/helm-controller/internal/errors"
"github.com/fluxcd/helm-controller/internal/features"
"github.com/fluxcd/helm-controller/internal/kube"
"github.com/fluxcd/helm-controller/internal/loader"
@@ -97,6 +100,11 @@ type HelmReleaseReconcilerOptions struct {
RateLimiter ratelimiter.RateLimiter
}

var (
errWaitForDependency = errors.New("must wait for dependency")
errWaitForChart = errors.New("must wait for chart")
)

func (r *HelmReleaseReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opts HelmReleaseReconcilerOptions) error {
// Index the HelmRelease by the HelmChart references they point at
if err := mgr.GetFieldIndexer().IndexField(ctx, &v2.HelmRelease{}, v2.SourceIndexKey,
@@ -167,16 +175,29 @@ func (r *HelmReleaseReconciler) Reconcile(ctx context.Context, req ctrl.Request)
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
}

// We do not want to return these errors, but rather wait for the
// designated RequeueAfter to expire and try again.
// However, not returning an error will cause the patch helper to
// patch the observed generation, which we do not want. So we ignore
// these errors here after patching.
retErr = interrors.Ignore(retErr, errWaitForDependency, errWaitForChart)

if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
if !obj.DeletionTimestamp.IsZero() {
err = apierrutil.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
retErr = apierrutil.Reduce(apierrutil.NewAggregate([]error{retErr, err}))
}

// Always record suspend, readiness and duration metrics.
r.Metrics.RecordSuspend(ctx, obj, obj.Spec.Suspend)
r.Metrics.RecordReadiness(ctx, obj)
// Wait for the object to have synced in-cache after patching.
// This is required to ensure that the next reconciliation will
// operate on the patched object when an immediate reconcile is
// requested.
if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, r.waitForHistoryCacheSync(obj)); err != nil {
log.Error(err, "failed to wait for object to sync in-cache after patching")
}

// Record the duration of the reconciliation.
r.Metrics.RecordDuration(ctx, obj, start)
}()

@@ -213,7 +234,7 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context, patchHelpe

// Mark the resource as under reconciliation.
conditions.MarkReconciling(obj, meta.ProgressingReason, "Fulfilling prerequisites")
- if err := patchHelper.Patch(ctx, obj); err != nil {
+ if err := patchHelper.Patch(ctx, obj, patch.WithOwnedConditions{Conditions: intreconcile.OwnedConditions}, patch.WithFieldOwner(r.FieldManager)); err != nil {
return ctrl.Result{}, err
}

@@ -230,7 +251,7 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context, patchHelpe

// Exponential backoff would cause execution to be prolonged too much,
// instead we requeue on a fixed interval.
- return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
+ return ctrl.Result{RequeueAfter: r.requeueDependency}, errWaitForDependency
}

log.Info("all dependencies are ready")
@@ -262,7 +283,7 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context, patchHelpe
conditions.MarkFalse(obj, meta.ReadyCondition, "HelmChartNotReady", msg)
// Do not requeue immediately, when the artifact is created
// the watcher should trigger a reconciliation.
- return jitter.JitteredRequeueInterval(ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}), nil
+ return jitter.JitteredRequeueInterval(ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}), errWaitForChart
}

// Compose values based from the spec and references.
@@ -276,10 +297,10 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context, patchHelpe
loadedChart, err := loader.SecureLoadChartFromURL(r.httpClient, hc.GetArtifact().URL, hc.GetArtifact().Digest)
if err != nil {
if errors.Is(err, loader.ErrFileNotFound) {
- msg := fmt.Sprintf("Chart not ready: artifact not found. Retrying in %s", r.requeueDependency)
+ msg := fmt.Sprintf("Chart not ready: artifact not found. Retrying in %s", r.requeueDependency.String())
conditions.MarkFalse(obj, meta.ReadyCondition, v2.ArtifactFailedReason, msg)
log.Info(msg)
- return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
+ return ctrl.Result{RequeueAfter: r.requeueDependency}, errWaitForDependency
}

conditions.MarkFalse(obj, meta.ReadyCondition, v2.ArtifactFailedReason, fmt.Sprintf("Could not load chart: %s", err.Error()))
@@ -352,7 +373,7 @@ func (r *HelmReleaseReconciler) reconcileRelease(ctx context.Context, patchHelpe
if errors.Is(err, intreconcile.ErrMustRequeue) {
return ctrl.Result{Requeue: true}, nil
}
- if errors.Is(err, intreconcile.ErrExceededMaxRetries) {
+ if interrors.IsOneOf(err, intreconcile.ErrExceededMaxRetries, intreconcile.ErrMissingRollbackTarget) {
err = reconcile.TerminalError(err)
}
return ctrl.Result{}, err
@@ -633,6 +654,24 @@ func (r *HelmReleaseReconciler) getHelmChart(ctx context.Context, obj *v2.HelmRe
return &hc, nil
}

// waitForHistoryCacheSync returns a function that can be used to wait for the
// cache backing the Kubernetes client to be in sync with the current state of
// the v2beta2.HelmRelease.
// This is a trade-off between not caching at all, and introducing a slight
// delay to ensure we always have the latest history state.
func (r *HelmReleaseReconciler) waitForHistoryCacheSync(obj *v2.HelmRelease) wait.ConditionWithContextFunc {
newObj := &v2.HelmRelease{}
return func(ctx context.Context) (bool, error) {
if err := r.Get(ctx, client.ObjectKeyFromObject(obj), newObj); err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
}
return apiequality.Semantic.DeepEqual(obj.Status.History, newObj.Status.History), nil
}
}

func (r *HelmReleaseReconciler) requestsForHelmChartChange(ctx context.Context, o client.Object) []reconcile.Request {
hc, ok := o.(*sourcev1.HelmChart)
if !ok {
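The errWaitForDependency and errWaitForChart sentinels let reconcileRelease signal "not ready yet" without the error ever reaching controller-runtime: as the comment in the Reconcile defer explains, returning no error at all would cause the patch helper to record the observed generation, so the sentinel is carried until the patch options have been computed and only then discarded, letting the fixed RequeueAfter take effect. A minimal sketch of such an ignore helper follows; it is an illustration of the pattern, not the actual internal/errors package:

package main

import (
	"errors"
	"fmt"
)

var errWaitForDependency = errors.New("must wait for dependency")

// ignore returns nil if err matches any of the given sentinels
// (including wrapped matches via errors.Is), and err unchanged otherwise.
func ignore(err error, sentinels ...error) error {
	for _, s := range sentinels {
		if errors.Is(err, s) {
			return nil
		}
	}
	return err
}

func main() {
	retErr := fmt.Errorf("reconcile failed: %w", errWaitForDependency)
	fmt.Println(ignore(retErr, errWaitForDependency)) // <nil>
	fmt.Println(ignore(errors.New("boom")))           // boom
}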
120 changes: 115 additions & 5 deletions internal/controller/helmrelease_controller_test.go
@@ -108,7 +108,7 @@ func TestHelmReleaseReconciler_reconcileRelease(t *testing.T) {
}

res, err := r.reconcileRelease(context.TODO(), patch.NewSerialPatcher(obj, r.Client), obj)
- g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(err).To(Equal(errWaitForDependency))
g.Expect(res.RequeueAfter).To(Equal(r.requeueDependency))

g.Expect(obj.Status.Conditions).To(conditions.MatchConditions([]metav1.Condition{
@@ -222,7 +222,7 @@ func TestHelmReleaseReconciler_reconcileRelease(t *testing.T) {
}

res, err := r.reconcileRelease(context.TODO(), patch.NewSerialPatcher(obj, r.Client), obj)
- g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(err).To(Equal(errWaitForChart))
g.Expect(res.RequeueAfter).To(Equal(obj.Spec.Interval.Duration))

g.Expect(obj.Status.Conditions).To(conditions.MatchConditions([]metav1.Condition{
@@ -274,7 +274,7 @@ func TestHelmReleaseReconciler_reconcileRelease(t *testing.T) {
}

res, err := r.reconcileRelease(context.TODO(), patch.NewSerialPatcher(obj, r.Client), obj)
- g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(err).To(Equal(errWaitForChart))
g.Expect(res.RequeueAfter).To(Equal(obj.Spec.Interval.Duration))

g.Expect(obj.Status.Conditions).To(conditions.MatchConditions([]metav1.Condition{
@@ -326,7 +326,7 @@ func TestHelmReleaseReconciler_reconcileRelease(t *testing.T) {
}

res, err := r.reconcileRelease(context.TODO(), patch.NewSerialPatcher(obj, r.Client), obj)
- g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(err).To(Equal(errWaitForChart))
g.Expect(res.RequeueAfter).To(Equal(obj.Spec.Interval.Duration))

g.Expect(obj.Status.Conditions).To(conditions.MatchConditions([]metav1.Condition{
@@ -438,7 +438,7 @@ func TestHelmReleaseReconciler_reconcileRelease(t *testing.T) {
}

res, err := r.reconcileRelease(context.TODO(), patch.NewSerialPatcher(obj, r.Client), obj)
- g.Expect(err).ToNot(HaveOccurred())
+ g.Expect(err).To(Equal(errWaitForDependency))
g.Expect(res.RequeueAfter).To(Equal(r.requeueDependency))

g.Expect(obj.Status.Conditions).To(conditions.MatchConditions([]metav1.Condition{
@@ -1931,6 +1931,116 @@ func TestHelmReleaseReconciler_getHelmChart(t *testing.T) {
}
}

func Test_waitForHistoryCacheSync(t *testing.T) {
tests := []struct {
name string
rel *v2.HelmRelease
cacheRel *v2.HelmRelease
want bool
}{
{
name: "different history",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
cacheRel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 1,
Status: "deployed",
},
},
},
},
want: false,
},
{
name: "same history",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
cacheRel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
Status: v2.HelmReleaseStatus{
History: v2.Snapshots{
{
Version: 2,
Status: "deployed",
},
{
Version: 1,
Status: "failed",
},
},
},
},
want: true,
},
{
name: "does not exist",
rel: &v2.HelmRelease{
ObjectMeta: metav1.ObjectMeta{
Name: "some-name",
},
},
cacheRel: nil,
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

c := fake.NewClientBuilder()
c.WithScheme(NewTestScheme())
if tt.cacheRel != nil {
c.WithObjects(tt.cacheRel)
}
r := &HelmReleaseReconciler{
Client: c.Build(),
}

got, err := r.waitForHistoryCacheSync(tt.rel)(context.Background())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(got == tt.want).To(BeTrue())
})
}
}

func TestValuesReferenceValidation(t *testing.T) {
tests := []struct {
name string
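The condition function exercised by Test_waitForHistoryCacheSync above is consumed by wait.PollUntilContextTimeout in the Reconcile defer. A minimal, self-contained sketch of that polling pattern using only k8s.io/apimachinery; the three-attempt condition is a stand-in for the real cache comparison:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	// Stand-in for waitForHistoryCacheSync: report done once the cached
	// object would match the patched one (here, after three polls).
	condition := func(ctx context.Context) (bool, error) {
		attempts++
		return attempts >= 3, nil
	}
	// Poll immediately, then every second, giving up after ten seconds,
	// mirroring the parameters used in the Reconcile defer.
	if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 10*time.Second, true, condition); err != nil {
		fmt.Println("cache never synced:", err)
		return
	}
	fmt.Printf("in sync after %d attempts\n", attempts)
}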