Skip to content

Commit

Permalink
fix: commit after deriving codebase and updating configs (#501)
Browse files Browse the repository at this point in the history
* fix: commit after deriving codebase and updating configs

Signed-off-by: Lin Yang <[email protected]>

* chore: adjust log level [skip ci]

Signed-off-by: Lin Yang <[email protected]>

---------

Signed-off-by: Lin Yang <[email protected]>
  • Loading branch information
reaver-flomesh authored Dec 5, 2024
1 parent 2e0036b commit 1ab2bbb
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 54 deletions.
15 changes: 7 additions & 8 deletions pkg/controllers/gateway/v1/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
_ "embed"
"fmt"
"sync"
"time"

extv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/extension/v1alpha1"

Expand Down Expand Up @@ -540,13 +539,13 @@ func (r *gatewayReconciler) applyGateway(gateway *gwv1.Gateway, update *gw.Gatew
}

func (r *gatewayReconciler) deriveCodebases(gw *gwv1.Gateway, _ configurator.Configurator) (ctrl.Result, error) {
gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name)
parentPath := utils.GetDefaultGatewaysPath()
if err := r.fctx.RepoClient.DeriveCodebase(gwPath, parentPath); err != nil {
defer r.recorder.Eventf(gw, corev1.EventTypeWarning, "Codebase", "Failed to derive codebase of gateway: %s", err)

return ctrl.Result{RequeueAfter: 1 * time.Second}, err
}
//gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name)
//parentPath := utils.GetDefaultGatewaysPath()
//if err := r.fctx.RepoClient.DeriveCodebaseOnly(gwPath, parentPath); err != nil {
// defer r.recorder.Eventf(gw, corev1.EventTypeWarning, "Codebase", "Failed to derive codebase of gateway: %s", err)
//
// return ctrl.Result{RequeueAfter: 1 * time.Second}, err
//}

return ctrl.Result{}, nil
}
Expand Down
51 changes: 49 additions & 2 deletions pkg/gateway/processor/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"strings"

"github.com/flomesh-io/fsm/pkg/constants"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/ghodss/yaml"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/tidwall/gjson"

"github.com/flomesh-io/fsm/pkg/gateway/fgw"
mrepo "github.com/flomesh-io/fsm/pkg/manager/repo"
"github.com/flomesh-io/fsm/pkg/repo"
"github.com/flomesh-io/fsm/pkg/utils"
)
Expand All @@ -25,13 +28,41 @@ func (c *GatewayProcessor) BuildConfigs() {
c.mutex.Lock()
defer c.mutex.Unlock()

if !c.preCheck() {
return
}

for _, gw := range gwutils.GetActiveGateways(c.client) {
cfg := NewGatewayConfigGenerator(gw, c, c.client, c.cfg).Generate()

go c.syncConfigDir(gw, cfg)
}
}

func (c *GatewayProcessor) preCheck() bool {
if !c.repoClient.IsRepoUp() {
log.Trace().Msg("Repo is not up, ignore ...")
return false
}

if !c.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) {
if err := c.repoClient.Batch([]repo.Batch{mrepo.GatewaysBatch()}); err != nil {
log.Error().Msgf("Failed to write gateway scripts to repo: %s", err)
return false
}
}

defaultGatewaysPath := utils.GetDefaultGatewaysPath()
if !c.repoClient.CodebaseExists(defaultGatewaysPath) {
if err := c.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil {
log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err)
return false
}
}

return true
}

//func (c *GatewayProcessor) syncConfig(gateway *gwv1.Gateway, config fgw.Config) {
// gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name)
// if exists := c.repoClient.CodebaseExists(gatewayPath); !exists {
Expand Down Expand Up @@ -67,11 +98,12 @@ func (c *GatewayProcessor) BuildConfigs() {
//}

func (c *GatewayProcessor) syncConfigDir(gateway *gwv1.Gateway, config fgw.Config) {
gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name)
if exists := c.repoClient.CodebaseExists(gatewayPath); !exists {
if !c.checkGatewayCodebase(gateway) {
return
}

gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name)

jsonVersion, err := c.getVersion(gatewayPath, "config/version.json")
if err != nil {
return
Expand Down Expand Up @@ -145,6 +177,21 @@ func (c *GatewayProcessor) syncConfigDir(gateway *gwv1.Gateway, config fgw.Confi
}
}

func (c *GatewayProcessor) checkGatewayCodebase(gateway *gwv1.Gateway) bool {
gatewayPath := utils.GatewayCodebasePath(gateway.Namespace, gateway.Name)
parentPath := utils.GetDefaultGatewaysPath()

if !c.repoClient.CodebaseExists(gatewayPath) {
// Derive codebase only, don't commit it, the codebase will be committed when all configs are ready
if err := c.repoClient.DeriveCodebaseOnly(gatewayPath, parentPath); err != nil {
log.Error().Msgf("Failed to derive codebase %q: %s", gatewayPath, err)
return false
}
}

return true
}

func (c *GatewayProcessor) getDelItems(gatewayPath string, batch repo.Batch) ([]string, error) {
files, err := c.repoClient.ListFiles(gatewayPath)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/processor/v2/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (c *GatewayProcessor) IsFilterConfigReferred(kind string, config client.Obj
func (c *GatewayProcessor) IsHeadlessServiceWithoutSelector(key client.ObjectKey) bool {
service, err := c.getServiceFromCache(key)
if err != nil {
log.Error().Msgf("failed to get service from processor: %v", err)
log.Warn().Msgf("failed to get service from processor: %v", err)
return false
}

Expand Down
72 changes: 37 additions & 35 deletions pkg/manager/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ import (

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

nsigv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/namespacedingress/v1alpha1"
gwutils "github.com/flomesh-io/fsm/pkg/gateway/utils"
mutils "github.com/flomesh-io/fsm/pkg/manager/utils"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -144,6 +142,10 @@ func gatewaysBatch() repo.Batch {
return createBatch(constants.DefaultGatewayBasePath, fmt.Sprintf("%s/gateways", scriptsRoot))
}

func GatewaysBatch() repo.Batch {
return gatewaysBatch()
}

func createBatch(repoPath, scriptsDir string) repo.Batch {
batch := repo.Batch{
Basepath: repoPath,
Expand Down Expand Up @@ -233,9 +235,9 @@ func (r *rebuilder) rebuildRepoJob() error {
if !r.repoClient.CodebaseExists(constants.DefaultServiceBasePath) {
batches = append(batches, servicesBatch())
}
if !r.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) {
batches = append(batches, gatewaysBatch())
}
//if !r.repoClient.CodebaseExists(constants.DefaultGatewayBasePath) {
// batches = append(batches, gatewaysBatch())
//}

if len(batches) > 0 {
if err := r.repoClient.Batch(batches); err != nil {
Expand Down Expand Up @@ -279,36 +281,36 @@ func (r *rebuilder) rebuildRepoJob() error {
}
}

if r.mc.IsGatewayAPIEnabled() {
defaultGatewaysPath := utils.GetDefaultGatewaysPath()
if err := r.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil {
log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err)
return err
}

gatewayList := &gwv1.GatewayList{}
if err := r.client.List(
context.TODO(),
gatewayList,
client.InNamespace(corev1.NamespaceAll),
); err != nil {
log.Error().Msgf("Failed to list all gateways: %s", err)
return err
}

log.Debug().Msgf("Found %d gateways", len(gatewayList.Items))

for _, gw := range gatewayList.Items {
gw := gw // fix lint GO-LOOP-REF
if gwutils.IsActiveGateway(&gw) {
gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name)
parentPath := utils.GetDefaultGatewaysPath()
if err := r.repoClient.DeriveCodebase(gwPath, parentPath); err != nil {
return err
}
}
}
}
//if r.mc.IsGatewayAPIEnabled() {
// defaultGatewaysPath := utils.GetDefaultGatewaysPath()
// if err := r.repoClient.DeriveCodebase(defaultGatewaysPath, constants.DefaultGatewayBasePath); err != nil {
// log.Error().Msgf("%q failed to derive codebase %q: %s", defaultGatewaysPath, constants.DefaultGatewayBasePath, err)
// return err
// }

//gatewayList := &gwv1.GatewayList{}
//if err := r.client.List(
// context.TODO(),
// gatewayList,
// client.InNamespace(corev1.NamespaceAll),
//); err != nil {
// log.Error().Msgf("Failed to list all gateways: %s", err)
// return err
//}
//
//log.Debug().Msgf("Found %d gateways", len(gatewayList.Items))
//
//for _, gw := range gatewayList.Items {
// gw := gw // fix lint GO-LOOP-REF
// if gwutils.IsActiveGateway(&gw) {
// gwPath := utils.GatewayCodebasePath(gw.Namespace, gw.Name)
// parentPath := utils.GetDefaultGatewaysPath()
// if err := r.repoClient.DeriveCodebaseOnly(gwPath, parentPath); err != nil {
// return err
// }
// }
//}
//}

log.Trace().Msg("<<<<<< rebuilding repo - end >>>>>> ")
return nil
Expand Down
29 changes: 21 additions & 8 deletions pkg/repo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,22 @@ func (p *PipyRepoClient) Batch(batches []Batch) error {
return nil
}

// DeriveCodebase derives a codebase from a base codebase and commit it
func (p *PipyRepoClient) DeriveCodebase(path, base string) error {
log.Debug().Msgf("Checking if exists, codebase %q", path)
exists, _ := p.codebaseExists(path)
return p.deriveCodebaseAndCommit(path, base, true)
}

if exists {
// DeriveCodebaseOnly derives a codebase from a base codebase without committing it
func (p *PipyRepoClient) DeriveCodebaseOnly(path, base string) error {
return p.deriveCodebaseAndCommit(path, base, false)
}

func (p *PipyRepoClient) deriveCodebaseAndCommit(path, base string, commit bool) error {
if baseExists, _ := p.codebaseExists(base); !baseExists {
return fmt.Errorf("base codebase %q doesn't exist", base)
}

if exists, _ := p.codebaseExists(path); exists {
log.Debug().Msgf("Codebase %q already exists, ignore deriving ...", path)
} else {
log.Debug().Msgf("Codebase %q doesn't exist, deriving ...", path)
Expand All @@ -358,12 +369,14 @@ func (p *PipyRepoClient) DeriveCodebase(path, base string) error {
}
log.Debug().Msgf("Successfully derived codebase %q", path)

log.Debug().Msgf("Committing the changes of codebase %q", path)
if err = p.commit(path, result.Version); err != nil {
log.Error().Msgf("Committing codebase %q error: %v", path, err)
return err
if commit {
log.Debug().Msgf("Committing the changes of codebase %q", path)
if err = p.commit(path, result.Version); err != nil {
log.Error().Msgf("Committing codebase %q error: %v", path, err)
return err
}
log.Debug().Msgf("Successfully committed codebase %q", path)
}
log.Debug().Msgf("Successfully committed codebase %q", path)
}

return nil
Expand Down

0 comments on commit 1ab2bbb

Please sign in to comment.