Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implementing promotion algorithm defined in level-triggered architect…
Browse files Browse the repository at this point in the history
…ure rfc
Luiz Filho committed Oct 13, 2023
1 parent d19807f commit 8f9253e
Showing 10 changed files with 330 additions and 63 deletions.
70 changes: 69 additions & 1 deletion controllers/leveltriggered/controller.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (

"github.com/weaveworks/pipeline-controller/api/v1alpha1"
"github.com/weaveworks/pipeline-controller/pkg/conditions"
"github.com/weaveworks/pipeline-controller/server/strategy"
)

// PipelineReconciler reconciles a Pipeline object
@@ -30,16 +31,18 @@ type PipelineReconciler struct {
targetScheme *runtime.Scheme
ControllerName string
recorder record.EventRecorder
stratReg strategy.StrategyRegistry
}

func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string) *PipelineReconciler {
func NewPipelineReconciler(c client.Client, s *runtime.Scheme, controllerName string, stratReg strategy.StrategyRegistry) *PipelineReconciler {
targetScheme := runtime.NewScheme()

return &PipelineReconciler{
Client: c,
Scheme: s,
targetScheme: targetScheme,
ControllerName: controllerName,
stratReg: stratReg,
}
}

@@ -182,9 +185,74 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
pipeline.GetNamespace(), pipeline.GetName(),
)

firstEnv := pipeline.Spec.Environments[0]

ok, latestRevision := checkAllTargetsHaveSameRevision(pipeline.Status.Environments[firstEnv.Name])
if !ok {
// not all targets have the same revision, so we can't proceed
return ctrl.Result{}, nil
}

for i := 1; i < len(pipeline.Spec.Environments); i++ {
env := pipeline.Spec.Environments[i]

ok, revision := checkAllTargetsHaveSameRevision(pipeline.Status.Environments[env.Name])
if !ok {
// not all targets have the same revision, so we can't proceed
return ctrl.Result{}, nil
}

if revision != latestRevision {
err := r.promoteLatestRevision(ctx, pipeline, env, latestRevision)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error promoting new version: %w", err)
}

return ctrl.Result{}, nil
}
}

return ctrl.Result{}, nil
}

func (r *PipelineReconciler) promoteLatestRevision(ctx context.Context, pipeline v1alpha1.Pipeline, env v1alpha1.Environment, revision string) error {
// none of the current strategies are idepontent, using it now to keep the ball rolling, but we need to implement
// strategies that are.

strat, err := r.stratReg.Get(*pipeline.Spec.Promotion)
if err != nil {
return fmt.Errorf("error getting strategy from registry: %w", err)
}

prom := strategy.Promotion{
PipelineName: pipeline.Name,
PipelineNamespace: pipeline.Namespace,
Environment: env,
Version: revision,
}

_, err = strat.Promote(ctx, *pipeline.Spec.Promotion, prom)

return err
}

// checkAllTargetsHaveSameRevision loops thought the targets of an environment and returns the revision only if all targets contains the same value,
// otherwise it returns an empty string.
func checkAllTargetsHaveSameRevision(env *v1alpha1.EnvironmentStatus) (bool, string) {
if len(env.Targets) == 0 {
return false, ""
}

revision := env.Targets[0].Revision
for _, target := range env.Targets {
if target.Revision != revision {
return false, ""
}
}

return true, revision
}

// getClusterClient retrieves or creates a client for the cluster in question. A `nil` value for the argument indicates the local cluster.
func (r *PipelineReconciler) getClusterClient(cluster *clusterctrlv1alpha1.GitopsCluster) (client.Client, error) {
if cluster == nil {
117 changes: 117 additions & 0 deletions controllers/leveltriggered/controller_test.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"github.com/fluxcd/pkg/runtime/conditions"
. "github.com/onsi/gomega"
clusterctrlv1alpha1 "github.com/weaveworks/cluster-controller/api/v1alpha1"
"go.uber.org/mock/gomock"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -19,6 +20,7 @@ import (

"github.com/weaveworks/pipeline-controller/api/v1alpha1"
"github.com/weaveworks/pipeline-controller/internal/testingutils"
"github.com/weaveworks/pipeline-controller/server/strategy"
)

const (
@@ -121,6 +123,121 @@ func TestReconcile(t *testing.T) {
}, "5s", "0.2s").Should(BeTrue())
g.Expect(targetStatus.Revision).To(Equal(appRevision))
})

t.Run("promotes revision to all environments", func(t *testing.T) {
mockStrategy := setStrategyRegistry(t, pipelineReconciler)

name := "pipeline-" + rand.String(5)

managementNs := testingutils.NewNamespace(ctx, g, k8sClient)
devNs := testingutils.NewNamespace(ctx, g, k8sClient)
stagingNs := testingutils.NewNamespace(ctx, g, k8sClient)
prodNs := testingutils.NewNamespace(ctx, g, k8sClient)

pipeline := &v1alpha1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: managementNs.Name,
},
Spec: v1alpha1.PipelineSpec{
AppRef: v1alpha1.LocalAppReference{
APIVersion: "helm.toolkit.fluxcd.io/v2beta1",
Kind: "HelmRelease",
Name: name,
},
Environments: []v1alpha1.Environment{
{
Name: "dev",
Targets: []v1alpha1.Target{
{Namespace: devNs.Name},
},
},
{
Name: "staging",
Targets: []v1alpha1.Target{
{Namespace: stagingNs.Name},
},
},
{
Name: "prod",
Targets: []v1alpha1.Target{
{Namespace: prodNs.Name},
},
},
},
Promotion: &v1alpha1.Promotion{
Strategy: v1alpha1.Strategy{
Notification: &v1alpha1.NotificationPromotion{},
},
},
},
}

devApp := newApp(ctx, g, name, devNs.Name)
setAppRevision(ctx, g, devApp, "v1.0.0")

stagingApp := newApp(ctx, g, name, stagingNs.Name)
setAppRevision(ctx, g, stagingApp, "v1.0.0")

prodApp := newApp(ctx, g, name, prodNs.Name)
setAppRevision(ctx, g, prodApp, "v1.0.0")

g.Expect(k8sClient.Create(ctx, pipeline)).To(Succeed())
checkReadyCondition(ctx, g, client.ObjectKeyFromObject(pipeline), metav1.ConditionTrue, v1alpha1.ReconciliationSucceededReason)

mockStrategy.EXPECT().Handles(gomock.Any()).Return(true).AnyTimes()

versionToPromote := "v1.0.1"

mockStrategy.EXPECT().
Promote(gomock.Any(), *pipeline.Spec.Promotion, gomock.Any()).
AnyTimes().
Do(func(ctx context.Context, p v1alpha1.Promotion, prom strategy.Promotion) {
switch prom.Environment.Name {
case "staging":
setAppRevision(ctx, g, stagingApp, prom.Version)
case "prod":
setAppRevision(ctx, g, prodApp, prom.Version)
}
})

// Bumping dev revision to trigger the promotion
setAppRevision(ctx, g, devApp, versionToPromote)

// checks if the revision of all target status is v1.0.1
g.Eventually(func() bool {
p := getPipeline(ctx, g, client.ObjectKeyFromObject(pipeline))

for _, env := range p.Status.Environments {
for _, target := range env.Targets {
if target.Revision != versionToPromote {
return false
}
}
}

return true
}, "5s", "0.2s").Should(BeTrue())
})
}

func setAppRevision(ctx context.Context, g Gomega, hr *helmv2.HelmRelease, revision string) {
hr.Status.LastAppliedRevision = revision
apimeta.SetStatusCondition(&hr.Status.Conditions, metav1.Condition{Type: "Ready", Status: metav1.ConditionTrue, Reason: "test"})
g.Expect(k8sClient.Status().Update(ctx, hr)).To(Succeed())
}

func setStrategyRegistry(t *testing.T, r *PipelineReconciler) *strategy.MockStrategy {
mockCtrl := gomock.NewController(t)

var stratReg strategy.StrategyRegistry

mockStrategy := strategy.NewMockStrategy(mockCtrl)
stratReg.Register(mockStrategy)

r.stratReg = stratReg

return mockStrategy
}

func getPipeline(ctx context.Context, g Gomega, key client.ObjectKey) *v1alpha1.Pipeline {
6 changes: 4 additions & 2 deletions controllers/leveltriggered/suite_test.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ var k8sClient client.Client
var testEnv *envtest.Environment
var kubeConfig []byte
var eventRecorder *testEventRecorder
var pipelineReconciler *PipelineReconciler

type testEvent struct {
object runtime.Object
@@ -131,12 +132,13 @@ func TestMain(m *testing.M) {

eventRecorder = &testEventRecorder{events: map[string][]testEvent{}}

err = (&PipelineReconciler{
pipelineReconciler = &PipelineReconciler{
Client: k8sManager.GetClient(),
Scheme: scheme.Scheme,
targetScheme: scheme.Scheme,
recorder: eventRecorder,
}).SetupWithManager(k8sManager)
}
err = pipelineReconciler.SetupWithManager(k8sManager)
if err != nil {
log.Fatalf("setup pipeline controller failed: %s", err)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -137,6 +137,7 @@ require (
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -509,6 +509,8 @@ go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo=
go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
1 change: 1 addition & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -781,6 +781,7 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
41 changes: 21 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
@@ -111,12 +111,33 @@ func main() {
os.Exit(1)
}

pullRequestStrategy, err := pullrequest.New(
mgr.GetClient(),
log.WithValues("strategy", "pullrequest"),
)
if err != nil {
setupLog.Error(err, "unable to create GitHub promotion strategy")
os.Exit(1)
}

var eventRecorder *events.Recorder
if eventRecorder, err = events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName); err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
notificationStrat, _ := notification.NewNotification(mgr.GetClient(), eventRecorder)

var stratReg strategy.StrategyRegistry
stratReg.Register(pullRequestStrategy)
stratReg.Register(notificationStrat)

var startErr error
if useLevelTriggeredController {
startErr = leveltriggered.NewPipelineReconciler(
mgr.GetClient(),
mgr.GetScheme(),
controllerName,
stratReg,
).SetupWithManager(mgr)
} else {
startErr = controllers.NewPipelineReconciler(
@@ -142,26 +163,6 @@ func main() {

ctx := ctrl.SetupSignalHandler()

pullRequestStrategy, err := pullrequest.New(
mgr.GetClient(),
log.WithValues("strategy", "pullrequest"),
)
if err != nil {
setupLog.Error(err, "unable to create GitHub promotion strategy")
os.Exit(1)
}

var eventRecorder *events.Recorder
if eventRecorder, err = events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName); err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
notificationStrat, _ := notification.NewNotification(mgr.GetClient(), eventRecorder)

var stratReg strategy.StrategyRegistry
stratReg.Register(pullRequestStrategy)
stratReg.Register(notificationStrat)

promServer, err := server.NewPromotionServer(
mgr.GetClient(),
server.WithRateLimit(promotionRateLimit, time.Duration(promotionRateLimitIntervalSeconds)*time.Second),
69 changes: 69 additions & 0 deletions server/strategy/mock_strategy.go
84 changes: 44 additions & 40 deletions server/strategy/pullrequest/mock_gitprovider_test.go
2 changes: 2 additions & 0 deletions server/strategy/strategy.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,8 @@ import (
pipelinev1alpha1 "github.com/weaveworks/pipeline-controller/api/v1alpha1"
)

//go:generate mockgen -destination mock_strategy_test.go -package strategy github.com/weaveworks/pipeline-controller/server/strategy Strategy

// Strategy is the interface that all types need to implement that intend to handle at least one of the strategies requested in a Pipeline's
// `.spec.promotion` field.
type Strategy interface {

0 comments on commit 8f9253e

Please sign in to comment.