diff --git a/pgbouncer/client.go b/pgbouncer/client.go index 85476775a..539de8919 100644 --- a/pgbouncer/client.go +++ b/pgbouncer/client.go @@ -17,7 +17,11 @@ limitations under the License. package pgbouncer import ( + "context" "database/sql" + "sync" + + api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2" "xorm.io/xorm" ) @@ -29,3 +33,14 @@ type Client struct { type XormClient struct { *xorm.Engine } + +type XormClientList struct { + List []*XormClient + Mutex sync.Mutex + WG sync.WaitGroup + + context context.Context + pb *api.PgBouncer + auth *Auth + dbName string +} diff --git a/pgbouncer/kubedb_client_builder.go b/pgbouncer/kubedb_client_builder.go index 1f6c6935e..337aaf354 100644 --- a/pgbouncer/kubedb_client_builder.go +++ b/pgbouncer/kubedb_client_builder.go @@ -18,162 +18,210 @@ package pgbouncer import ( "context" - "database/sql" - "errors" "fmt" api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2" _ "github.com/lib/pq" core "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - "kmodules.xyz/client-go/tools/certholder" + appbinding "kmodules.xyz/custom-resources/apis/appcatalog/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/client" "xorm.io/xorm" ) const ( - DefaultPgBouncerDB = "pgbouncer" + DefaultBackendDBType = "postgres" + TLSModeDisable = "disable" ) +type Auth struct { + UserName string + Password string +} + type KubeDBClientBuilder struct { - kc client.Client - db *api.PgBouncer - url string - podName string - pgBouncerDB string + kc client.Client + pgbouncer *api.PgBouncer + url string + podName string + backendDBName string + ctx context.Context + databaseRef *api.Database + auth *Auth } -func NewKubeDBClientBuilder(kc client.Client, db *api.PgBouncer) *KubeDBClientBuilder { +func NewKubeDBClientBuilder(kc client.Client, pb *api.PgBouncer) *KubeDBClientBuilder { return &KubeDBClientBuilder{ - kc: kc, - db: db, + kc: kc, + pgbouncer: pb, } } +func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder { + o.url = url + return o +} + +func (o *KubeDBClientBuilder) WithAuth(auth *Auth) *KubeDBClientBuilder { + if auth != nil && auth.UserName != "" && auth.Password != "" { + o.auth = auth + } + return o +} + func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder { o.podName = podName return o } -func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder { - o.url = url +func (o *KubeDBClientBuilder) WithDatabaseRef(db *api.Database) *KubeDBClientBuilder { + o.databaseRef = db return o } -func (o *KubeDBClientBuilder) WithPgBouncerDB(pgDB string) *KubeDBClientBuilder { - o.pgBouncerDB = pgDB +func (o *KubeDBClientBuilder) WithPostgresDBName(dbName string) *KubeDBClientBuilder { + if dbName == "" { + o.backendDBName = o.databaseRef.DatabaseName + } else { + o.backendDBName = dbName + } return o } -func (o *KubeDBClientBuilder) GetPgbouncerXormClient(ctx context.Context) (*XormClient, error) { - connector, err := o.getConnectionString(ctx) +func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder { + o.ctx = ctx + return o +} + +func (o *KubeDBClientBuilder) GetPgBouncerXormClient() (*XormClient, error) { + if o.ctx == nil { + o.ctx = context.Background() + } + + connector, err := o.getConnectionString() if err != nil { return nil, err } - engine, err := xorm.NewEngine("postgres", connector) + engine, err := xorm.NewEngine(DefaultBackendDBType, connector) if err != nil { - return nil, fmt.Errorf("failed to generate pgbouncer client using connection string: %v", err) + return nil, err } - _, err = engine.Query("SHOW HELP") - if err != nil { - return nil, fmt.Errorf("failed to run query: %v", err) + if engine == nil { + return nil, fmt.Errorf("Xorm Engine can't be build for pgbouncer") } - return &XormClient{engine}, nil + + engine.SetDefaultContext(o.ctx) + return &XormClient{ + engine, + }, nil } func (o *KubeDBClientBuilder) getURL() string { - return fmt.Sprintf("%s.%s.%s.svc", o.podName, o.db.GoverningServiceName(), o.db.Namespace) + return fmt.Sprintf("%s.%s.%s.svc", o.podName, o.pgbouncer.GoverningServiceName(), o.pgbouncer.Namespace) } -func (o *KubeDBClientBuilder) getPgBouncerAuthCredentials(ctx context.Context) (string, string, error) { - if o.db.Spec.AuthSecret == nil { - return "", "", errors.New("no database secret") +func (o *KubeDBClientBuilder) getBackendAuth() (string, string, error) { + if o.auth != nil { + return o.auth.UserName, o.auth.Password, nil } - var secret core.Secret - err := o.kc.Get(ctx, client.ObjectKey{Namespace: o.db.Namespace, Name: o.db.Spec.AuthSecret.Name}, &secret) + + db := o.databaseRef + + if db == nil { + return "", "", fmt.Errorf("there is no DatabaseReference found for pgBouncer %s/%s", o.pgbouncer.Namespace, o.pgbouncer.Name) + } + appBinding := &appbinding.AppBinding{} + err := o.kc.Get(o.ctx, types.NamespacedName{ + Name: db.DatabaseRef.Name, + Namespace: db.DatabaseRef.Namespace, + }, appBinding) if err != nil { return "", "", err } - return string(secret.Data[core.BasicAuthUsernameKey]), string(secret.Data[core.BasicAuthPasswordKey]), nil -} + if appBinding.Spec.Secret == nil { + return "", "", fmt.Errorf("backend postgres auth secret unspecified for pgBouncer %s/%s", o.pgbouncer.Namespace, o.pgbouncer.Name) + } -func (o *KubeDBClientBuilder) GetPgBouncerClient(ctx context.Context) (*Client, error) { - connector, err := o.getConnectionString(ctx) + var secret core.Secret + err = o.kc.Get(o.ctx, client.ObjectKey{Namespace: appBinding.Namespace, Name: appBinding.Spec.Secret.Name}, &secret) if err != nil { - return nil, err + return "", "", err } - // connect to database - db, err := sql.Open("postgres", connector) - if err != nil { - return nil, err + + user, present := secret.Data[core.BasicAuthUsernameKey] + if !present { + return "", "", fmt.Errorf("error getting backend username") } - // ping to database to check the connection - if _, err := db.QueryContext(ctx, "SHOW HELP;"); err != nil { - closeErr := db.Close() - if closeErr != nil { - klog.Errorf("Failed to close client. error: %v", closeErr) - } - return nil, err + pass, present := secret.Data[core.BasicAuthPasswordKey] + if !present { + return "", "", fmt.Errorf("error getting backend password") } - return &Client{db}, nil + return string(user), string(pass), nil } -func (o *KubeDBClientBuilder) getConnectionString(ctx context.Context) (string, error) { +func (o *KubeDBClientBuilder) getConnectionString() (string, error) { + user, pass, err := o.getBackendAuth() + if err != nil { + return "", err + } + if o.podName != "" { o.url = o.getURL() } - dnsName := o.url - port := api.PgBouncerDatabasePort - if o.pgBouncerDB == "" { - o.pgBouncerDB = DefaultPgBouncerDB + var listeningPort int = api.PgBouncerDatabasePort + if o.pgbouncer.Spec.ConnectionPool.Port != nil { + listeningPort = int(*o.pgbouncer.Spec.ConnectionPool.Port) } + // TODO ssl mode is disable now need to work on this after adding tls support + connector := fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=10 dbname=%s sslmode=%s", user, pass, o.url, listeningPort, o.backendDBName, TLSModeDisable) + return connector, nil +} - user, pass, err := o.getPgBouncerAuthCredentials(ctx) - if err != nil { - return "", fmt.Errorf("DB basic auth is not found for PgBouncer %v/%v", o.db.Namespace, o.db.Name) - } - cnnstr := "" - sslMode := o.db.Spec.SSLMode - if sslMode == "" { - sslMode = api.PgBouncerSSLModeDisable +func GetXormClientList(kc client.Client, pb *api.PgBouncer, ctx context.Context, auth *Auth, dbName string) (*XormClientList, error) { + clientlist := &XormClientList{ + List: []*XormClient{}, } + clientlist.context = ctx + clientlist.pb = pb + clientlist.auth = auth + clientlist.dbName = dbName - if o.db.Spec.TLS != nil { - paths, err := o.getTLSConfig(ctx) + for i := 0; int32(i) < *pb.Spec.Replicas; i++ { + podName := fmt.Sprintf("%s-%d", pb.OffshootName(), i) + pod := core.Pod{} + err := kc.Get(ctx, types.NamespacedName{Name: podName, Namespace: pb.Namespace}, &pod) if err != nil { - return "", err + return clientlist, err } - if o.db.Spec.ConnectionPool.AuthType == api.PgBouncerClientAuthModeCert || o.db.Spec.SSLMode == api.PgBouncerSSLModeVerifyCA || o.db.Spec.SSLMode == api.PgBouncerSSLModeVerifyFull { - cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s sslrootcert=%s sslcert=%s sslkey=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode, paths.CACert, paths.Cert, paths.Key) - } else { - cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s sslrootcert=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode, paths.CACert) - } - } else { - cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%d connect_timeout=15 dbname=%s sslmode=%s", user, pass, dnsName, port, o.pgBouncerDB, sslMode) + clientlist.Mutex.Lock() + clientlist.WG.Add(1) + clientlist.Mutex.Unlock() + go clientlist.addXormClient(kc, podName) } - return cnnstr, nil -} -func (o *KubeDBClientBuilder) getTLSConfig(ctx context.Context) (*certholder.Paths, error) { - secretName := o.db.GetCertSecretName(api.PgBouncerClientCert) + clientlist.WG.Wait() - var certSecret core.Secret - err := o.kc.Get(ctx, client.ObjectKey{Namespace: o.db.Namespace, Name: secretName}, &certSecret) - if err != nil { - klog.Error(err, "failed to get certificate secret.", secretName) - return nil, err + if len(clientlist.List) != int(*pb.Spec.Replicas) { + return clientlist, fmt.Errorf("Failed to generate Xorm Client List") } - certs, _ := certholder.DefaultHolder.ForResource(api.SchemeGroupVersion.WithResource(api.ResourcePluralPgBouncer), o.db.ObjectMeta) - paths, err := certs.Save(&certSecret) + return clientlist, nil +} + +func (l *XormClientList) addXormClient(kc client.Client, podName string) { + xormClient, err := NewKubeDBClientBuilder(kc, l.pb).WithContext(l.context).WithDatabaseRef(&l.pb.Spec.Database).WithPod(podName).WithAuth(l.auth).WithPostgresDBName(l.dbName).GetPgBouncerXormClient() + l.Mutex.Lock() + defer l.Mutex.Unlock() if err != nil { - klog.Error(err, "failed to save certificate") - return nil, err + klog.V(5).ErrorS(err, fmt.Sprintf("failed to create xorm client for pgbouncer %s/%s ", l.pb.Namespace, l.pb.Name)) + } else { + l.List = append(l.List, xormClient) } - return paths, nil + l.WG.Done() }