From 1ab2bbb571e083aa622d8fb62288752298824cba Mon Sep 17 00:00:00 2001 From: Lin Yang Date: Thu, 5 Dec 2024 10:12:01 +0800 Subject: [PATCH] fix: commit after deriving codebase and updating configs (#501) * fix: commit after deriving codebase and updating configs Signed-off-by: Lin Yang * chore: adjust log level [skip ci] Signed-off-by: Lin Yang --------- Signed-off-by: Lin Yang --- .../gateway/v1/gateway_controller.go | 15 ++-- pkg/gateway/processor/v2/config.go | 51 ++++++++++++- pkg/gateway/processor/v2/triggers.go | 2 +- pkg/manager/repo/repo.go | 72 ++++++++++--------- pkg/repo/client.go | 29 +++++--- 5 files changed, 115 insertions(+), 54 deletions(-) diff --git a/pkg/controllers/gateway/v1/gateway_controller.go b/pkg/controllers/gateway/v1/gateway_controller.go index e8d29bc60..0ab1ce07a 100644 --- a/pkg/controllers/gateway/v1/gateway_controller.go +++ b/pkg/controllers/gateway/v1/gateway_controller.go @@ -29,7 +29,6 @@ import ( _ "embed" "fmt" "sync" - "time" extv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/extension/v1alpha1" @@ -540,13 +539,13 @@ func (r *gatewayReconciler) applyGateway(gateway *gwv1.Gateway, update *gw.Gatew } func (r *gatewayReconciler) deriveCodebases(gw *gwv1.Gateway, _ configurator.Configurator) (ctrl.Result, error) { - gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name) - parentPath := utils.GetDefaultGatewaysPath() - if err := r.fctx.RepoClient.DeriveCodebase(gwPath, parentPath); err != nil { - defer r.recorder.Eventf(gw, corev1.EventTypeWarning, "Codebase", "Failed to derive codebase of gateway: %s", err) - - return ctrl.Result{RequeueAfter: 1 * time.Second}, err - } + //gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name) + //parentPath := utils.GetDefaultGatewaysPath() + //if err := r.fctx.RepoClient.DeriveCodebaseOnly(gwPath, parentPath); err != nil { + // defer r.recorder.Eventf(gw, corev1.EventTypeWarning, "Codebase", "Failed to derive codebase of gateway: %s", err) + // + // return ctrl.Result{RequeueAfter: 1 * time.Second}, err + //} return ctrl.Result{}, nil } diff --git a/pkg/gateway/processor/v2/config.go b/pkg/gateway/processor/v2/config.go index 4c1f809ab..8744469a6 100644 --- a/pkg/gateway/processor/v2/config.go +++ b/pkg/gateway/processor/v2/config.go @@ -4,6 +4,8 @@ import ( "fmt" "strings" + "github.com/flomesh-io/fsm/pkg/constants" + "k8s.io/apimachinery/pkg/util/sets" "github.com/ghodss/yaml" @@ -16,6 +18,7 @@ import ( "github.com/tidwall/gjson" "github.com/flomesh-io/fsm/pkg/gateway/fgw" + mrepo "github.com/flomesh-io/fsm/pkg/manager/repo" "github.com/flomesh-io/fsm/pkg/repo" "github.com/flomesh-io/fsm/pkg/utils" ) @@ -25,6 +28,10 @@ func (c *GatewayProcessor) BuildConfigs() { c.mutex.Lock() defer c.mutex.Unlock() + if !c.preCheck() { + return + } + for _, gw := range gwutils.GetActiveGateways(c.client) { cfg := NewGatewayConfigGenerator(gw, c, c.client, c.cfg).Generate() @@ -32,6 +39,30 @@ func (c *GatewayProcessor) BuildConfigs() { } } +func (c *GatewayProcessor) preCheck() bool { + if !c.repoClient.IsRepoUp() { + log.Trace().Msg("Repo is not up, ignore ...") + return false + } + + if !c.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) { + if err := c.repoClient.Batch([]repo.Batch{mrepo.GatewaysBatch()}); err != nil { + log.Error().Msgf("Failed to write gateway scripts to repo: %s", err) + return false + } + } + + defaultGatewaysPath := utils.GetDefaultGatewaysPath() + if !c.repoClient.CodebaseExists(defaultGatewaysPath) { + if err := c.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil { + log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err) + return false + } + } + + return true +} + //func (c *GatewayProcessor) syncConfig(gateway *gwv1.Gateway, config fgw.Config) { // gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name) // if exists := c.repoClient.CodebaseExists(gatewayPath); !exists { @@ -67,11 +98,12 @@ func (c *GatewayProcessor) BuildConfigs() { //} func (c *GatewayProcessor) syncConfigDir(gateway *gwv1.Gateway, config fgw.Config) { - gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name) - if exists := c.repoClient.CodebaseExists(gatewayPath); !exists { + if !c.checkGatewayCodebase(gateway) { return } + gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name) + jsonVersion, err := c.getVersion(gatewayPath, "config/version.json") if err != nil { return @@ -145,6 +177,21 @@ func (c *GatewayProcessor) syncConfigDir(gateway *gwv1.Gateway, config fgw.Confi } } +func (c *GatewayProcessor) checkGatewayCodebase(gateway *gwv1.Gateway) bool { + gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name) + parentPath := utils.GetDefaultGatewaysPath() + + if !c.repoClient.CodebaseExists(gatewayPath) { + // Derive codebase only, don't commit it, the codebase will be committed when all configs are ready + if err := c.repoClient.DeriveCodebaseOnly(gatewayPath, parentPath); err != nil { + log.Error().Msgf("Failed to derive codebase %q: %s", gatewayPath, err) + return false + } + } + + return true +} + func (c *GatewayProcessor) getDelItems(gatewayPath string, batch repo.Batch) ([]string, error) { files, err := c.repoClient.ListFiles(gatewayPath) if err != nil { diff --git a/pkg/gateway/processor/v2/triggers.go b/pkg/gateway/processor/v2/triggers.go index 38aaa2a63..484923f54 100644 --- a/pkg/gateway/processor/v2/triggers.go +++ b/pkg/gateway/processor/v2/triggers.go @@ -349,7 +349,7 @@ func (c *GatewayProcessor) IsFilterConfigReferred(kind string, config client.Obj func (c *GatewayProcessor) IsHeadlessServiceWithoutSelector(key client.ObjectKey) bool { service, err := c.getServiceFromCache(key) if err != nil { - log.Error().Msgf("failed to get service from processor: %v", err) + log.Warn().Msgf("failed to get service from processor: %v", err) return false } diff --git a/pkg/manager/repo/repo.go b/pkg/manager/repo/repo.go index bb8ace15f..38e3641c6 100644 --- a/pkg/manager/repo/repo.go +++ b/pkg/manager/repo/repo.go @@ -38,10 +38,8 @@ import ( corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" - gwv1 "sigs.k8s.io/gateway-api/apis/v1" nsigv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/namespacedingress/v1alpha1" - gwutils "github.com/flomesh-io/fsm/pkg/gateway/utils" mutils "github.com/flomesh-io/fsm/pkg/manager/utils" "k8s.io/apimachinery/pkg/util/wait" @@ -144,6 +142,10 @@ func gatewaysBatch() repo.Batch { return createBatch(constants.DefaultGatewayBasePath, fmt.Sprintf("%s/gateways", scriptsRoot)) } +func GatewaysBatch() repo.Batch { + return gatewaysBatch() +} + func createBatch(repoPath, scriptsDir string) repo.Batch { batch := repo.Batch{ Basepath: repoPath, @@ -233,9 +235,9 @@ func (r *rebuilder) rebuildRepoJob() error { if !r.repoClient.CodebaseExists(constants.DefaultServiceBasePath) { batches = append(batches, servicesBatch()) } - if !r.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) { - batches = append(batches, gatewaysBatch()) - } + //if !r.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) { + // batches = append(batches, gatewaysBatch()) + //} if len(batches) > 0 { if err := r.repoClient.Batch(batches); err != nil { @@ -279,36 +281,36 @@ func (r *rebuilder) rebuildRepoJob() error { } } - if r.mc.IsGatewayAPIEnabled() { - defaultGatewaysPath := utils.GetDefaultGatewaysPath() - if err := r.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil { - log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err) - return err - } - - gatewayList := &gwv1.GatewayList{} - if err := r.client.List( - context.TODO(), - gatewayList, - client.InNamespace(corev1.NamespaceAll), - ); err != nil { - log.Error().Msgf("Failed to list all gateways: %s", err) - return err - } - - log.Debug().Msgf("Found %d gateways", len(gatewayList.Items)) - - for _, gw := range gatewayList.Items { - gw := gw // fix lint GO-LOOP-REF - if gwutils.IsActiveGateway(&gw) { - gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name) - parentPath := utils.GetDefaultGatewaysPath() - if err := r.repoClient.DeriveCodebase(gwPath, parentPath); err != nil { - return err - } - } - } - } + //if r.mc.IsGatewayAPIEnabled() { + // defaultGatewaysPath := utils.GetDefaultGatewaysPath() + // if err := r.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil { + // log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err) + // return err + // } + + //gatewayList := &gwv1.GatewayList{} + //if err := r.client.List( + // context.TODO(), + // gatewayList, + // client.InNamespace(corev1.NamespaceAll), + //); err != nil { + // log.Error().Msgf("Failed to list all gateways: %s", err) + // return err + //} + // + //log.Debug().Msgf("Found %d gateways", len(gatewayList.Items)) + // + //for _, gw := range gatewayList.Items { + // gw := gw // fix lint GO-LOOP-REF + // if gwutils.IsActiveGateway(&gw) { + // gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name) + // parentPath := utils.GetDefaultGatewaysPath() + // if err := r.repoClient.DeriveCodebaseOnly(gwPath, parentPath); err != nil { + // return err + // } + // } + //} + //} log.Trace().Msg("<<<<<< rebuilding repo - end >>>>>> ") return nil diff --git a/pkg/repo/client.go b/pkg/repo/client.go index ef7316c64..01586c84a 100644 --- a/pkg/repo/client.go +++ b/pkg/repo/client.go @@ -343,11 +343,22 @@ func (p *PipyRepoClient) Batch(batches []Batch) error { return nil } +// DeriveCodebase derives a codebase from a base codebase and commit it func (p *PipyRepoClient) DeriveCodebase(path, base string) error { - log.Debug().Msgf("Checking if exists, codebase %q", path) - exists, _ := p.codebaseExists(path) + return p.deriveCodebaseAndCommit(path, base, true) +} - if exists { +// DeriveCodebaseOnly derives a codebase from a base codebase without committing it +func (p *PipyRepoClient) DeriveCodebaseOnly(path, base string) error { + return p.deriveCodebaseAndCommit(path, base, false) +} + +func (p *PipyRepoClient) deriveCodebaseAndCommit(path, base string, commit bool) error { + if baseExists, _ := p.codebaseExists(base); !baseExists { + return fmt.Errorf("base codebase %q doesn't exist", base) + } + + if exists, _ := p.codebaseExists(path); exists { log.Debug().Msgf("Codebase %q already exists, ignore deriving ...", path) } else { log.Debug().Msgf("Codebase %q doesn't exist, deriving ...", path) @@ -358,12 +369,14 @@ func (p *PipyRepoClient) DeriveCodebase(path, base string) error { } log.Debug().Msgf("Successfully derived codebase %q", path) - log.Debug().Msgf("Committing the changes of codebase %q", path) - if err = p.commit(path, result.Version); err != nil { - log.Error().Msgf("Committing codebase %q error: %v", path, err) - return err + if commit { + log.Debug().Msgf("Committing the changes of codebase %q", path) + if err = p.commit(path, result.Version); err != nil { + log.Error().Msgf("Committing codebase %q error: %v", path, err) + return err + } + log.Debug().Msgf("Successfully committed codebase %q", path) } - log.Debug().Msgf("Successfully committed codebase %q", path) } return nil