diff --git a/pkg/data/postgres.go b/pkg/data/postgres.go index 61c3af862..7bf913e2e 100644 --- a/pkg/data/postgres.go +++ b/pkg/data/postgres.go @@ -19,18 +19,18 @@ package data import ( "bytes" "context" + "errors" "fmt" "log" - "os" + "os/exec" "strconv" "strings" api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2" cs "kubedb.dev/apimachinery/client/clientset/versioned" - "kubedb.dev/cli/pkg/lib" "github.com/spf13/cobra" - shell "gomodules.xyz/go-sh" + core "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -38,15 +38,75 @@ import ( "k8s.io/client-go/rest" "k8s.io/klog/v2" cmdutil "k8s.io/kubectl/pkg/cmd/util" - "kmodules.xyz/client-go/tools/portforward" ) const ( - pgCaFile = "/tmp/root.crt" - pgCertFile = "/tmp/postgresql.crt" - pgKeyFile = "/tmp/postgresql.key" + pgCaFile = "/tls/certs/client/ca.crt" + pgCertFile = "/tls/certs/client/client.crt" + pgKeyFile = "/tls/certs/client/client.key" + rowLimit = 100000 ) +type postgresOpts struct { + db *api.Postgres + dbImage string + config *rest.Config + client *kubernetes.Clientset + dbClient *cs.Clientset + + username string + pass string + + errWriter *bytes.Buffer +} + +func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) { + config, err := f.ToRESTConfig() + if err != nil { + return nil, err + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + dbClient, err := cs.NewForConfig(config) + if err != nil { + return nil, err + } + + db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + if db.Status.Phase != api.DatabasePhaseReady { + return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName) + } + + dbVersion, err := dbClient.CatalogV1alpha1().PostgresVersions().Get(context.TODO(), db.Spec.Version, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return &postgresOpts{ + db: db, + dbImage: dbVersion.Spec.DB.Image, + config: config, + client: client, + dbClient: dbClient, + username: string(secret.Data[corev1.BasicAuthUsernameKey]), + pass: string(secret.Data[corev1.BasicAuthPasswordKey]), + errWriter: &bytes.Buffer{}, + }, nil +} + func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { var ( dbName string @@ -79,14 +139,18 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { log.Fatalln(err) } - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't create tunnel, error: ", err) + if rows <= 0 { + log.Fatal("rows need to be greater than 0") } - defer tunnel.Close() - err = opts.insertDataExecCmd(tunnel, rows) - if err != nil { - log.Fatal(err) + + if rows <= rowLimit { + command := fmt.Sprintf(`create table if not exists appscode_kubedb_postgres_test_table (values int not null);insert into appscode_kubedb_postgres_test_table (values) values(generate_series(1,%v))`, rows) + err = opts.insertDataExecCmd(command, rows) + if err != nil { + log.Fatal(err) + } + } else { + log.Printf("Atmost %v rows can be inserted per operation", rowLimit) } }, } @@ -96,24 +160,16 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgInsertCmd } -func (opts *postgresOpts) insertDataExecCmd(tunnel *portforward.Tunnel, rows int) error { +func (opts *postgresOpts) insertDataExecCmd(command string, rows int) error { if rows <= 0 { return fmt.Errorf("rows need to be greater than 0") } - command := ` - DO $$ BEGIN - IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN - CREATE TABLE appscode_kubedb_postgres_test_table (value int not null); - END IF; - END $$; - ` + "\n" + - fmt.Sprintf("INSERT INTO appscode_kubedb_postgres_test_table (value) values (generate_series(1,%v));", rows) - - out, err := opts.executeCommand(tunnel.Local, command) + output, err := opts.execCommand(command) if err != nil { return err } + out := string(output) if strings.Contains(strings.TrimSpace(out), strconv.Itoa(rows)) { fmt.Printf("\nSuccess! %d keys inserted in postgres database %s/%s.\n", rows, opts.db.Namespace, opts.db.Name) } else { @@ -153,14 +209,8 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command { if err != nil { log.Fatalln(err) } - - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't create tunnel, error: ", err) - } - defer tunnel.Close() - - err = opts.verifyDataExecCmd(tunnel, rows) + command := `SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table` + err = opts.verifyDataExecCmd(command, rows) if err != nil { log.Fatal(err) } @@ -171,17 +221,16 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgVerifyCmd } -func (opts *postgresOpts) verifyDataExecCmd(tunnel *portforward.Tunnel, rows int) error { +func (opts *postgresOpts) verifyDataExecCmd(command string, rows int) error { if rows <= 0 { return fmt.Errorf("rows need to be greater than 0") } - command := "SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table;" - out, err := opts.executeCommand(tunnel.Local, command) + o, err := opts.execCommand(command) if err != nil { return err } - + out := string(o) output := strings.Split(out, "\n") found := strings.TrimSpace(output[2]) @@ -225,14 +274,8 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command { if err != nil { log.Fatalln(err) } - - tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort) - if err != nil { - log.Fatal("couldn't creat tunnel, error: ", err) - } - defer tunnel.Close() - - err = opts.dropDataExecCmd(tunnel) + command := `DROP TABLE if exists appscode_kubedb_postgres_test_table` + err = opts.dropDataExecCmd(command) if err != nil { log.Fatal(err) } @@ -242,169 +285,76 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command { return pgDropCmd } -func (opts *postgresOpts) dropDataExecCmd(tunnel *portforward.Tunnel) error { - command := ` - DO $$ BEGIN - IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN - DROP TABLE appscode_kubedb_postgres_test_table; - END IF; - END $$; - ` - - _, err := opts.executeCommand(tunnel.Local, command) +func (opts *postgresOpts) dropDataExecCmd(command string) error { + _, err := opts.execCommand(command) if err != nil { return err } fmt.Printf("\nSuccess: All the CLI inserted rows DELETED from postgres database %s/%s \n", opts.db.Namespace, opts.db.Name) - return nil } -type postgresOpts struct { - db *api.Postgres - dbImage string - config *rest.Config - client *kubernetes.Clientset - dbClient *cs.Clientset - - username string - pass string - - errWriter *bytes.Buffer -} - -func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) { - config, err := f.ToRESTConfig() - if err != nil { - return nil, err - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - dbClient, err := cs.NewForConfig(config) - if err != nil { - return nil, err - } - - db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - - if db.Status.Phase != api.DatabasePhaseReady { - return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName) - } - - dbVersion, err := dbClient.CatalogV1alpha1().PostgresVersions().Get(context.TODO(), db.Spec.Version, metav1.GetOptions{}) +func (opts *postgresOpts) execCommand(command string) ([]byte, error) { + cmd, err := opts.getShellCommand(command) if err != nil { return nil, err } - - secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{}) + output, err := opts.runCMD(cmd) if err != nil { return nil, err } - - return &postgresOpts{ - db: db, - dbImage: dbVersion.Spec.DB.Image, - config: config, - client: client, - dbClient: dbClient, - username: string(secret.Data[corev1.BasicAuthUsernameKey]), - pass: string(secret.Data[corev1.BasicAuthPasswordKey]), - errWriter: &bytes.Buffer{}, - }, nil + return output, nil } -func (opts *postgresOpts) getDockerShellCommand(localPort int, dockerFlags, postgresExtraFlags []interface{}) (*shell.Session, error) { - sh := shell.NewSession() - sh.ShowCMD = false - sh.Stderr = opts.errWriter - +func (opts *postgresOpts) getShellCommand(command string) (string, error) { db := opts.db - dockerCommand := []interface{}{ - "run", "--network=host", - "-e", fmt.Sprintf("PGPASSWORD=%s", opts.pass), - } - dockerCommand = append(dockerCommand, dockerFlags...) + svcName := fmt.Sprintf("svc/%s", db.Name) - postgresCommand := []interface{}{ - "psql", - "--host=127.0.0.1", fmt.Sprintf("--port=%d", localPort), - fmt.Sprintf("--username=%s", opts.username), + cmd := "" + _, password, err := opts.GetPostgresAuthCredentials(db) + if err != nil { + return "", err } if db.Spec.TLS != nil { - secretName := db.CertificateName(api.PostgresClientCert) - certSecret, err := opts.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{}) - if err != nil { - return nil, err - } - - caCrt, ok := certSecret.Data[corev1.ServiceAccountRootCAKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.ServiceAccountRootCAKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgCaFile, caCrt, 0o644) - if err != nil { - return nil, err - } - - crt, ok := certSecret.Data[corev1.TLSCertKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSCertKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgCertFile, crt, 0o644) - if err != nil { - return nil, err + if db.Spec.ClientAuthMode == api.ClientAuthModeCert { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGSSLCERT='%s' PGSSLKEY='%s' PGPASSWORD='%s' psql -d postgres -U postgres -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, pgCertFile, pgKeyFile, password, command) + } else { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGPASSWORD='%s' psql -d postgres -U postgres -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, password, command) } - - key, ok := certSecret.Data[corev1.TLSPrivateKeyKey] - if !ok { - return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSPrivateKeyKey, certSecret.Namespace, certSecret.Name) - } - err = os.WriteFile(pgKeyFile, key, 0o600) - if err != nil { - return nil, err - } - - dockerCommand = append(dockerCommand, - "-v", fmt.Sprintf("%s:%s", "/tmp/", "/root/.postgresql/"), - ) + } else { + cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE=%s PGPASSWORD='%s' psql -d postgres -U postgres -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, password, command) } - dockerCommand = append(dockerCommand, opts.dbImage) - finalCommand := append(dockerCommand, postgresCommand...) - if postgresExtraFlags != nil { - finalCommand = append(finalCommand, postgresExtraFlags...) - } - return sh.Command("docker", finalCommand...).SetStdin(os.Stdin), nil + return cmd, err } -func (opts *postgresOpts) executeCommand(localPort int, command string) (string, error) { - postgresExtraFlags := []interface{}{ - fmt.Sprintf("--command=%s", command), +func (opts *postgresOpts) runCMD(cmd string) ([]byte, error) { + sh := exec.Command("/bin/sh", "-c", cmd) + stdout := bytes.NewBuffer(nil) + stderr := bytes.NewBuffer(nil) + sh.Stdout = stdout + sh.Stderr = stderr + err := sh.Run() + out := stdout.Bytes() + errOut := stderr.Bytes() + errOutput := string(errOut) + if errOutput != "" && !strings.Contains(errOutput, "NOTICE") { + return nil, fmt.Errorf("failed to execute command, stderr: %s", errOutput) } - shSession, err := opts.getDockerShellCommand(localPort, nil, postgresExtraFlags) if err != nil { - return "", err - } - out, err := shSession.Output() - if err != nil { - return "", fmt.Errorf("failed to execute command, error: %s, output: %s\n", err, out) + return nil, err } - output := "" - if string(out) != "" { - output = string(out) + return out, nil +} + +func (fi *postgresOpts) GetPostgresAuthCredentials(db *api.Postgres) (string, string, error) { + if db.Spec.AuthSecret == nil { + return "", "", errors.New("no database secret") } - errOutput := opts.errWriter.String() - if errOutput != "" { - return "", fmt.Errorf("failed to execute command, stderr: %s%s", errOutput, output) + secret, err := fi.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{}) + if err != nil { + return "", "", err } - - return string(out), nil + return string(secret.Data[core.BasicAuthUsernameKey]), string(secret.Data[core.BasicAuthPasswordKey]), nil } diff --git a/vendor/gomodules.xyz/go-sh/pipe.go b/vendor/gomodules.xyz/go-sh/pipe.go index e3a22bb78..3af3a05f4 100644 --- a/vendor/gomodules.xyz/go-sh/pipe.go +++ b/vendor/gomodules.xyz/go-sh/pipe.go @@ -2,6 +2,7 @@ package sh import ( "bytes" + "fmt" "encoding/json" "encoding/xml" "errors" @@ -36,15 +37,19 @@ func (s *Session) UnmarshalXML(data interface{}) (err error) { // start command func (s *Session) Start() (err error) { + + fmt.Println("Start") s.started = true var rd *io.PipeReader var wr *io.PipeWriter var length = len(s.cmds) if s.ShowCMD { + fmt.Println("show cmd") var cmds = make([]string, 0, 4) for _, cmd := range s.cmds { cmds = append(cmds, strings.Join(cmd.Args, " ")) } + fmt.Printf("CMDS = %+v \n", cmds) s.writePrompt(strings.Join(cmds, " | ")) } for index, cmd := range s.cmds {