Skip to content

Commit

Permalink
Merge pull request #203 from weaveworks/195-rewrite-promotion-algo
Browse files Browse the repository at this point in the history
Implementing promotion algorithm defined in level-triggered architecture rfc
  • Loading branch information
luizbafilho authored Oct 26, 2023
2 parents d19807f + 26eb06a commit 78b95a7
Show file tree
Hide file tree
Showing 13 changed files with 536 additions and 84 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
TargetClusterNotReadyReason string = "TargetClusterNotReady"
// ReconciliationSucceededReason signals that a Pipeline has been successfully reconciled.
ReconciliationSucceededReason string = "ReconciliationSucceeded"
// EnvironmentNotReadyReason signals the environment is not ready.
EnvironmentNotReadyReason string = "EnvironmentNotReady"
)

// Reasons used by the level-triggered controller.
Expand Down
148 changes: 147 additions & 1 deletion controllers/leveltriggered/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/weaveworks/pipeline-controller/api/v1alpha1"
"github.com/weaveworks/pipeline-controller/pkg/conditions"
"github.com/weaveworks/pipeline-controller/server/strategy"
)

// PipelineReconciler reconciles a Pipeline object
Expand All @@ -30,16 +31,18 @@ type PipelineReconciler struct {
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
stratReg strategy.StrategyRegistry
}

func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string) *PipelineReconciler {
func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, stratReg strategy.StrategyRegistry) *PipelineReconciler {
targetScheme := runtime.NewScheme()

return &PipelineReconciler{
Client: c,
Scheme: s,
targetScheme: targetScheme,
ControllerName: controllerName,
stratReg: stratReg,
}
}

Expand Down Expand Up @@ -182,9 +185,152 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
pipeline.GetNamespace(), pipeline.GetName(),
)

firstEnv := pipeline.Spec.Environments[0]

latestRevision := checkAllTargetsHaveSameRevision(pipeline.Status.Environments[firstEnv.Name])
if latestRevision == "" {
// not all targets have the same revision, or have no revision set, so we can't proceed
if err := r.setPendingCondition(ctx, pipeline, v1alpha1.EnvironmentNotReadyReason, "Waiting for all targets to have the same revision"); err != nil {
return ctrl.Result{Requeue: true}, fmt.Errorf("error setting pending condition: %w", err)
}

return ctrl.Result{}, nil
}

if !checkAllTargetsAreReady(pipeline.Status.Environments[firstEnv.Name]) {
// not all targets are ready, so we can't proceed
if err := r.setPendingCondition(ctx, pipeline, v1alpha1.EnvironmentNotReadyReason, "Waiting for all targets to be ready"); err != nil {
return ctrl.Result{}, fmt.Errorf("error setting pending condition: %w", err)
}

return ctrl.Result{}, nil
}

if err := r.removePendingCondition(ctx, pipeline); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing pending condition: %w", err)
}

for _, env := range pipeline.Spec.Environments[1:] {
// if all targets run the latest revision and are ready, we can skip this environment
if checkAllTargetsRunRevision(pipeline.Status.Environments[env.Name], latestRevision) && checkAllTargetsAreReady(pipeline.Status.Environments[env.Name]) {
continue
}

if checkAnyTargetHasRevision(pipeline.Status.Environments[env.Name], latestRevision) {
return ctrl.Result{}, nil
}

err := r.promoteLatestRevision(ctx, pipeline, env, latestRevision)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error promoting new version: %w", err)
}

break
}

return ctrl.Result{}, nil
}

func (r *PipelineReconciler) setPendingCondition(ctx context.Context, pipeline v1alpha1.Pipeline, reason, message string) error {
condition := metav1.Condition{
Type: conditions.PromotionPendingCondition,
Status: metav1.ConditionTrue,
Reason: reason,
Message: message,
}

apimeta.SetStatusCondition(&pipeline.Status.Conditions, condition)

if err := r.patchStatus(ctx, client.ObjectKeyFromObject(&pipeline), pipeline.Status); err != nil {
return err
}

return nil
}

func (r *PipelineReconciler) removePendingCondition(ctx context.Context, pipeline v1alpha1.Pipeline) error {
apimeta.RemoveStatusCondition(&pipeline.Status.Conditions, conditions.PromotionPendingCondition)

if err := r.patchStatus(ctx, client.ObjectKeyFromObject(&pipeline), pipeline.Status); err != nil {
return err
}

return nil
}

func (r *PipelineReconciler) promoteLatestRevision(ctx context.Context, pipeline v1alpha1.Pipeline, env v1alpha1.Environment, revision string) error {
// none of the current strategies are idepontent, using it now to keep the ball rolling, but we need to implement
// strategies that are.

promotion := pipeline.Spec.GetPromotion(env.Name)
if promotion == nil {
return nil
}

strat, err := r.stratReg.Get(*promotion)
if err != nil {
return fmt.Errorf("error getting strategy from registry: %w", err)
}

prom := strategy.Promotion{
PipelineName: pipeline.Name,
PipelineNamespace: pipeline.Namespace,
Environment: env,
Version: revision,
}

_, err = strat.Promote(ctx, *pipeline.Spec.Promotion, prom)

return err
}

func checkAnyTargetHasRevision(env *v1alpha1.EnvironmentStatus, revision string) bool {
for _, target := range env.Targets {
if target.Revision == revision {
return true
}
}

return false
}

func checkAllTargetsRunRevision(env *v1alpha1.EnvironmentStatus, revision string) bool {
for _, target := range env.Targets {
if target.Revision != revision {
return false
}
}

return true
}

// checkAllTargetsHaveSameRevision returns a revision if all targets in the environment have the same,
// non-empty revision, and an empty string otherwise.
func checkAllTargetsHaveSameRevision(env *v1alpha1.EnvironmentStatus) string {
if len(env.Targets) == 0 {
return ""
}

revision := env.Targets[0].Revision
for _, target := range env.Targets {
if target.Revision != revision {
return ""
}
}

return revision
}

func checkAllTargetsAreReady(env *v1alpha1.EnvironmentStatus) bool {
for _, target := range env.Targets {
if !target.Ready {
return false
}
}

return true
}

// getClusterClient retrieves or creates a client for the cluster in question. A `nil` value for the argument indicates the local cluster.
func (r *PipelineReconciler) getClusterClient(cluster *clusterctrlv1alpha1.GitopsCluster) (client.Client, error) {
if cluster == nil {
Expand Down
Loading

0 comments on commit 78b95a7

Please sign in to comment.