Skip to content

Commit

Permalink
fix: ensure restore configuration points to manager wal-restore (#68)
Browse files Browse the repository at this point in the history
Signed-off-by: Armando Ruocco <[email protected]>
Signed-off-by: Leonardo Cecchi <[email protected]>
Signed-off-by: Francesco Canovai <[email protected]>
Co-authored-by: Leonardo Cecchi <[email protected]>
Co-authored-by: Francesco Canovai <[email protected]>
  • Loading branch information
3 people authored Nov 28, 2024
1 parent 74d4f5d commit afd4603
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 72 deletions.
2 changes: 2 additions & 0 deletions internal/cmd/instance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func NewCmd() *cobra.Command {
"cluster-name",
"pod-name",
"spool-directory",
"server-name",
}

for _, k := range requiredSettings {
Expand All @@ -40,6 +41,7 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("server-name", "SERVER_NAME")

return cmd
}
4 changes: 4 additions & 0 deletions internal/cmd/restore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func NewCmd() *cobra.Command {
"cluster-name",
"pod-name",
"spool-directory",
"barman-object-name",
"server-name",
}

for _, k := range requiredSettings {
Expand All @@ -40,6 +42,8 @@ func NewCmd() *cobra.Command {
_ = viper.BindEnv("pod-name", "POD_NAME")
_ = viper.BindEnv("pgdata", "PGDATA")
_ = viper.BindEnv("spool-directory", "SPOOL_DIRECTORY")
_ = viper.BindEnv("barman-object-name", "BARMAN_OBJECT_NAME")
_ = viper.BindEnv("server-name", "SERVER_NAME")

return cmd
}
32 changes: 6 additions & 26 deletions internal/cnpgi/instance/wal.go → internal/cnpgi/common/wal.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package instance
package common

import (
"context"
Expand All @@ -19,12 +19,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata"
)

// WALServiceImplementation is the implementation of the WAL Service
type WALServiceImplementation struct {
ServerName string
BarmanObjectKey client.ObjectKey
ClusterObjectKey client.ObjectKey
Client client.Client
Expand Down Expand Up @@ -73,16 +73,6 @@ func (w WALServiceImplementation) Archive(
return nil, err
}

// TODO: refactor this code elsewhere
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}

var objectStore barmancloudv1.ObjectStore
if err := w.Client.Get(ctx, w.BarmanObjectKey, &objectStore); err != nil {
return nil, err
Expand Down Expand Up @@ -112,7 +102,7 @@ func (w WALServiceImplementation) Archive(
return nil, err
}

options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, serverName)
options, err := arch.BarmanCloudWalArchiveOptions(ctx, &objectStore.Spec.Configuration, w.ServerName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,7 +142,7 @@ func (w WALServiceImplementation) Restore(

barmanConfiguration := &objectStore.Spec.Configuration

env := common.GetRestoreCABundleEnv(barmanConfiguration)
env := GetRestoreCABundleEnv(barmanConfiguration)
credentialsEnv, err := barmanCredentials.EnvSetBackupCloudCredentials(
ctx,
w.Client,
Expand All @@ -163,19 +153,9 @@ func (w WALServiceImplementation) Restore(
if err != nil {
return nil, fmt.Errorf("while getting recover credentials: %w", err)
}
env = common.MergeEnv(env, credentialsEnv)

// TODO: refactor this code elsewhere
serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}
env = MergeEnv(env, credentialsEnv)

options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, serverName)
options, err := barmanCommand.CloudWalRestoreOptions(ctx, barmanConfiguration, w.ServerName)
if err != nil {
return nil, fmt.Errorf("while getting barman-cloud-wal-restore options: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package instance
package common

import (
"errors"
Expand Down
14 changes: 3 additions & 11 deletions internal/cnpgi/instance/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type BackupServiceImplementation struct {
ClusterObjectKey client.ObjectKey
Client client.Client
InstanceName string
ServerName string
backup.UnimplementedBackupServer
}

Expand Down Expand Up @@ -111,21 +112,12 @@ func (b BackupServiceImplementation) Backup(
return nil, err
}

serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}

backupName := fmt.Sprintf("backup-%v", pgTime.ToCompactISO8601(time.Now()))

if err = backupCmd.Take(
ctx,
backupName,
serverName,
b.ServerName,
env,
barmanCloudExecutor{},
postgres.BackupTemporaryDirectory,
Expand All @@ -137,7 +129,7 @@ func (b BackupServiceImplementation) Backup(
executedBackupInfo, err := backupCmd.GetExecutedBackupInfo(
ctx,
backupName,
serverName,
b.ServerName,
barmanCloudExecutor{},
env)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/cnpgi/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func Start(ctx context.Context) error {
Name: clusterName,
},
BarmanObjectKey: barmanObjectKey,
ServerName: viper.GetString("server-name"),
InstanceName: podName,
// TODO: improve
PGDataPath: viper.GetString("pgdata"),
Expand Down
7 changes: 6 additions & 1 deletion internal/cnpgi/instance/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (
"github.com/cloudnative-pg/cnpg-i/pkg/wal"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/common"
)

// CNPGI is the implementation of the PostgreSQL sidecar
type CNPGI struct {
Client client.Client
BarmanObjectKey client.ObjectKey
ServerName string
ClusterObjectKey client.ObjectKey
PGDataPath string
PGWALPath string
Expand All @@ -26,9 +29,10 @@ type CNPGI struct {
// Start starts the GRPC service
func (c *CNPGI) Start(ctx context.Context) error {
enrich := func(server *grpc.Server) error {
wal.RegisterWALServer(server, WALServiceImplementation{
wal.RegisterWALServer(server, common.WALServiceImplementation{
BarmanObjectKey: c.BarmanObjectKey,
ClusterObjectKey: c.ClusterObjectKey,
ServerName: c.ServerName,
InstanceName: c.InstanceName,
Client: c.Client,
SpoolDirectory: c.SpoolDirectory,
Expand All @@ -38,6 +42,7 @@ func (c *CNPGI) Start(ctx context.Context) error {
backup.RegisterBackupServer(server, BackupServiceImplementation{
Client: c.Client,
BarmanObjectKey: c.BarmanObjectKey,
ServerName: c.ServerName,
ClusterObjectKey: c.ClusterObjectKey,
InstanceName: c.InstanceName,
})
Expand Down
11 changes: 11 additions & 0 deletions internal/cnpgi/operator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (e *ConfigurationError) IsEmpty() bool {
// PluginConfiguration is the configuration of the plugin
type PluginConfiguration struct {
BarmanObjectName string
ServerName string
RecoveryBarmanObjectName string
RecoveryBarmanServerName string
}
Expand All @@ -56,6 +57,15 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
metadata.PluginName,
)

serverName := cluster.Name
for _, plugin := range cluster.Spec.Plugins {
if plugin.IsEnabled() && plugin.Name == metadata.PluginName {
if pluginServerName, ok := plugin.Parameters["serverName"]; ok {
serverName = pluginServerName
}
}
}

recoveryServerName := ""
recoveryBarmanObjectName := ""

Expand All @@ -70,6 +80,7 @@ func NewFromCluster(cluster *cnpgv1.Cluster) *PluginConfiguration {
result := &PluginConfiguration{
// used for the backup/archive
BarmanObjectName: helper.Parameters["barmanObjectName"],
ServerName: serverName,
// used for restore/wal_restore
RecoveryBarmanServerName: recoveryServerName,
RecoveryBarmanObjectName: recoveryBarmanObjectName,
Expand Down
23 changes: 19 additions & 4 deletions internal/cnpgi/operator/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ func reconcileJob(
return nil, nil
}

// Since we're recovering from an existing object store,
// we set our primary object store name to the recovery one.
// This won't be needed anymore when wal-restore will be able
// to check two object stores
pluginConfiguration.BarmanObjectName = pluginConfiguration.RecoveryBarmanObjectName
pluginConfiguration.ServerName = pluginConfiguration.RecoveryBarmanServerName

var job batchv1.Job
if err := decoder.DecodeObject(
request.GetObjectDefinition(),
Expand Down Expand Up @@ -175,10 +182,14 @@ func reconcilePod(

mutatedPod := pod.DeepCopy()

if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{
Args: []string{"instance"},
}); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
if len(pluginConfiguration.BarmanObjectName) != 0 {
if err := reconcilePodSpec(pluginConfiguration, cluster, &mutatedPod.Spec, "postgres", corev1.Container{
Args: []string{"instance"},
}); err != nil {
return nil, fmt.Errorf("while reconciling pod spec for pod: %w", err)
}
} else {
contextLogger.Debug("No need to mutate instance with no backup & archiving configuration")
}

patch, err := object.CreatePatch(mutatedPod, pod)
Expand Down Expand Up @@ -212,6 +223,10 @@ func reconcilePodSpec(
Name: "BARMAN_OBJECT_NAME",
Value: cfg.BarmanObjectName,
},
{
Name: "SERVER_NAME",
Value: cfg.ServerName,
},
{
// TODO: should we really use this one?
// should we mount an emptyDir volume just for that?
Expand Down
7 changes: 7 additions & 0 deletions internal/cnpgi/restore/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func (i IdentityImplementation) GetPluginCapabilities(
},
},
},
{
Type: &identity.PluginCapability_Service_{
Service: &identity.PluginCapability_Service{
Type: identity.PluginCapability_Service_TYPE_WAL_SERVICE,
},
},
},
},
}, nil
}
Expand Down
20 changes: 18 additions & 2 deletions internal/cnpgi/restore/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func Start(ctx context.Context) error {
setupLog.Info("Starting barman cloud instance plugin")
namespace := viper.GetString("namespace")
clusterName := viper.GetString("cluster-name")
boName := viper.GetString("barman-object-name")

objs := map[client.Object]cache.ByObject{
&cnpgv1.Cluster{}: {
Expand All @@ -43,6 +44,15 @@ func Start(ctx context.Context) error {
},
}

if boName != "" {
objs[&barmancloudv1.ObjectStore{}] = cache.ByObject{
Field: fields.OneTermEqualSelector("metadata.name", boName),
Namespaces: map[string]cache.Config{
namespace: {},
},
}
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Cache: cache.Options{
Expand All @@ -65,12 +75,18 @@ func Start(ctx context.Context) error {
if err := mgr.Add(&CNPGI{
PluginPath: viper.GetString("plugin-path"),
SpoolDirectory: viper.GetString("spool-directory"),
BarmanObjectKey: client.ObjectKey{
Namespace: namespace,
Name: boName,
},
ClusterObjectKey: client.ObjectKey{
Namespace: namespace,
Name: clusterName,
},
Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"),
Client: mgr.GetClient(),
PGDataPath: viper.GetString("pgdata"),
InstanceName: viper.GetString("pod-name"),
ServerName: viper.GetString("server-name"),
}); err != nil {
setupLog.Error(err, "unable to create CNPGI runnable")
return err
Expand Down
Loading

0 comments on commit afd4603

Please sign in to comment.