Skip to content

Commit

Permalink
Add CLI data features using shell command on Postgres (#721)
Browse files Browse the repository at this point in the history
Signed-off-by: souravbiswassanto <[email protected]>
  • Loading branch information
souravbiswassanto authored Sep 25, 2023
1 parent bbe4b2e commit 3b6a62e
Showing 1 changed file with 105 additions and 187 deletions.
292 changes: 105 additions & 187 deletions pkg/data/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,80 @@ import (
"context"
"fmt"
"log"
"os"
"os/exec"
"strconv"
"strings"

api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned"
"kubedb.dev/cli/pkg/lib"

"github.com/spf13/cobra"
shell "gomodules.xyz/go-sh"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"kmodules.xyz/client-go/tools/portforward"
)

const (
pgCaFile = "/tmp/root.crt"
pgCertFile = "/tmp/postgresql.crt"
pgKeyFile = "/tmp/postgresql.key"
pgCaFile = "/tls/certs/client/ca.crt"
pgCertFile = "/tls/certs/client/client.crt"
pgKeyFile = "/tls/certs/client/client.key"
rowLimit = 100000
)

type postgresOpts struct {
db *api.Postgres
config *rest.Config
client *kubernetes.Clientset
dbClient *cs.Clientset

username string
pass string
}

func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) {
config, err := f.ToRESTConfig()
if err != nil {
return nil, err
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

dbClient, err := cs.NewForConfig(config)
if err != nil {
return nil, err
}

db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{})
if err != nil {
return nil, err
}

if db.Status.Phase != api.DatabasePhaseReady {
return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName)
}

secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

return &postgresOpts{
db: db,
config: config,
client: client,
dbClient: dbClient,
username: string(secret.Data[corev1.BasicAuthUsernameKey]),
pass: string(secret.Data[corev1.BasicAuthPasswordKey]),
}, nil
}

func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
var (
dbName string
Expand Down Expand Up @@ -79,14 +127,17 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatalln(err)
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort)
if err != nil {
log.Fatal("couldn't create tunnel, error: ", err)
if rows <= 0 {
log.Fatal("rows need to be greater than 0")
}
defer tunnel.Close()
err = opts.insertDataExecCmd(tunnel, rows)
if err != nil {
log.Fatal(err)

if rows <= rowLimit {
err = opts.insertDataExecCmd(rows)
if err != nil {
log.Fatal(err)
}
} else {
log.Printf("At most %v rows can be inserted per operation", rowLimit)
}
},
}
Expand All @@ -96,21 +147,12 @@ func InsertPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
return pgInsertCmd
}

func (opts *postgresOpts) insertDataExecCmd(tunnel *portforward.Tunnel, rows int) error {
func (opts *postgresOpts) insertDataExecCmd(rows int) error {
if rows <= 0 {
return fmt.Errorf("rows need to be greater than 0")
}

command := `
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN
CREATE TABLE appscode_kubedb_postgres_test_table (value int not null);
END IF;
END $$;
` + "\n" +
fmt.Sprintf("INSERT INTO appscode_kubedb_postgres_test_table (value) values (generate_series(1,%v));", rows)

out, err := opts.executeCommand(tunnel.Local, command)
command := fmt.Sprintf(`create table if not exists appscode_kubedb_postgres_test_table (values int not null);insert into appscode_kubedb_postgres_test_table (values) values(generate_series(1,%v))`, rows)
out, err := opts.execCommand(command)
if err != nil {
return err
}
Expand Down Expand Up @@ -154,13 +196,7 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatalln(err)
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort)
if err != nil {
log.Fatal("couldn't create tunnel, error: ", err)
}
defer tunnel.Close()

err = opts.verifyDataExecCmd(tunnel, rows)
err = opts.verifyDataExecCmd(rows)
if err != nil {
log.Fatal(err)
}
Expand All @@ -171,17 +207,15 @@ func VerifyPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
return pgVerifyCmd
}

func (opts *postgresOpts) verifyDataExecCmd(tunnel *portforward.Tunnel, rows int) error {
func (opts *postgresOpts) verifyDataExecCmd(rows int) error {
if rows <= 0 {
return fmt.Errorf("rows need to be greater than 0")
}

command := "SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table;"
out, err := opts.executeCommand(tunnel.Local, command)
command := `SELECT COUNT(*) FROM appscode_kubedb_postgres_test_table`
out, err := opts.execCommand(command)
if err != nil {
return err
}

output := strings.Split(out, "\n")

found := strings.TrimSpace(output[2])
Expand Down Expand Up @@ -226,13 +260,7 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
log.Fatalln(err)
}

tunnel, err := lib.TunnelToDBService(opts.config, dbName, namespace, api.PostgresDatabasePort)
if err != nil {
log.Fatal("couldn't creat tunnel, error: ", err)
}
defer tunnel.Close()

err = opts.dropDataExecCmd(tunnel)
err = opts.dropDataExecCmd()
if err != nil {
log.Fatal(err)
}
Expand All @@ -242,169 +270,59 @@ func DropPostgresDataCMD(f cmdutil.Factory) *cobra.Command {
return pgDropCmd
}

func (opts *postgresOpts) dropDataExecCmd(tunnel *portforward.Tunnel) error {
command := `
DO $$ BEGIN
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = 'appscode_kubedb_postgres_test_table') THEN
DROP TABLE appscode_kubedb_postgres_test_table;
END IF;
END $$;
`

_, err := opts.executeCommand(tunnel.Local, command)
func (opts *postgresOpts) dropDataExecCmd() error {
command := `DROP TABLE if exists appscode_kubedb_postgres_test_table`
_, err := opts.execCommand(command)
if err != nil {
return err
}
fmt.Printf("\nSuccess: All the CLI inserted rows DELETED from postgres database %s/%s \n", opts.db.Namespace, opts.db.Name)

return nil
}

type postgresOpts struct {
db *api.Postgres
dbImage string
config *rest.Config
client *kubernetes.Clientset
dbClient *cs.Clientset

username string
pass string

errWriter *bytes.Buffer
}

func newPostgresOpts(f cmdutil.Factory, dbName, namespace string) (*postgresOpts, error) {
config, err := f.ToRESTConfig()
if err != nil {
return nil, err
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

dbClient, err := cs.NewForConfig(config)
if err != nil {
return nil, err
}

db, err := dbClient.KubedbV1alpha2().Postgreses(namespace).Get(context.TODO(), dbName, metav1.GetOptions{})
if err != nil {
return nil, err
}

if db.Status.Phase != api.DatabasePhaseReady {
return nil, fmt.Errorf("postgres %s/%s is not ready", namespace, dbName)
}

dbVersion, err := dbClient.CatalogV1alpha1().PostgresVersions().Get(context.TODO(), db.Spec.Version, metav1.GetOptions{})
if err != nil {
return nil, err
}
func (opts *postgresOpts) execCommand(command string) (string, error) {
cmd := opts.getShellCommand(command)

secret, err := client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), db.Spec.AuthSecret.Name, metav1.GetOptions{})
output, err := opts.runCMD(cmd)
if err != nil {
return nil, err
return "", err
}

return &postgresOpts{
db: db,
dbImage: dbVersion.Spec.DB.Image,
config: config,
client: client,
dbClient: dbClient,
username: string(secret.Data[corev1.BasicAuthUsernameKey]),
pass: string(secret.Data[corev1.BasicAuthPasswordKey]),
errWriter: &bytes.Buffer{},
}, nil
return string(output), nil
}

func (opts *postgresOpts) getDockerShellCommand(localPort int, dockerFlags, postgresExtraFlags []interface{}) (*shell.Session, error) {
sh := shell.NewSession()
sh.ShowCMD = false
sh.Stderr = opts.errWriter

func (opts *postgresOpts) getShellCommand(command string) string {
db := opts.db
dockerCommand := []interface{}{
"run", "--network=host",
"-e", fmt.Sprintf("PGPASSWORD=%s", opts.pass),
}
dockerCommand = append(dockerCommand, dockerFlags...)

postgresCommand := []interface{}{
"psql",
"--host=127.0.0.1", fmt.Sprintf("--port=%d", localPort),
fmt.Sprintf("--username=%s", opts.username),
}
svcName := fmt.Sprintf("svc/%s", db.Name)

cmd := ""
if db.Spec.TLS != nil {
secretName := db.CertificateName(api.PostgresClientCert)
certSecret, err := opts.client.CoreV1().Secrets(db.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
if err != nil {
return nil, err
}

caCrt, ok := certSecret.Data[corev1.ServiceAccountRootCAKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.ServiceAccountRootCAKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(pgCaFile, caCrt, 0o644)
if err != nil {
return nil, err
if db.Spec.ClientAuthMode == api.ClientAuthModeCert {
cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGSSLCERT='%s' PGSSLKEY='%s' PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, pgCertFile, pgKeyFile, opts.pass, opts.username, command)
} else {
cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE='%s' PGSSLROOTCERT='%s' PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, pgCaFile, opts.pass, opts.username, command)
}

crt, ok := certSecret.Data[corev1.TLSCertKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSCertKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(pgCertFile, crt, 0o644)
if err != nil {
return nil, err
}

key, ok := certSecret.Data[corev1.TLSPrivateKeyKey]
if !ok {
return nil, fmt.Errorf("missing %s in secret %s/%s", corev1.TLSPrivateKeyKey, certSecret.Namespace, certSecret.Name)
}
err = os.WriteFile(pgKeyFile, key, 0o600)
if err != nil {
return nil, err
}

dockerCommand = append(dockerCommand,
"-v", fmt.Sprintf("%s:%s", "/tmp/", "/root/.postgresql/"),
)
} else {
cmd = fmt.Sprintf("kubectl exec -n %s %s -c postgres -- env PGSSLMODE=%s PGPASSWORD='%s' psql -d postgres -U %s -c '%s'", db.Namespace, svcName, db.Spec.SSLMode, opts.pass, opts.username, command)
}

dockerCommand = append(dockerCommand, opts.dbImage)
finalCommand := append(dockerCommand, postgresCommand...)
if postgresExtraFlags != nil {
finalCommand = append(finalCommand, postgresExtraFlags...)
}
return sh.Command("docker", finalCommand...).SetStdin(os.Stdin), nil
return cmd
}

func (opts *postgresOpts) executeCommand(localPort int, command string) (string, error) {
postgresExtraFlags := []interface{}{
fmt.Sprintf("--command=%s", command),
}
shSession, err := opts.getDockerShellCommand(localPort, nil, postgresExtraFlags)
if err != nil {
return "", err
func (opts *postgresOpts) runCMD(cmd string) ([]byte, error) {
sh := exec.Command("/bin/sh", "-c", cmd)
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
sh.Stdout = stdout
sh.Stderr = stderr
err := sh.Run()
out := stdout.Bytes()
errOut := stderr.Bytes()
errOutput := string(errOut)
if errOutput != "" && !strings.Contains(errOutput, "NOTICE") {
return nil, fmt.Errorf("failed to execute command, stderr: %s", errOutput)
}
out, err := shSession.Output()
if err != nil {
return "", fmt.Errorf("failed to execute command, error: %s, output: %s\n", err, out)
}
output := ""
if string(out) != "" {
output = string(out)
}
errOutput := opts.errWriter.String()
if errOutput != "" {
return "", fmt.Errorf("failed to execute command, stderr: %s%s", errOutput, output)
return nil, err
}

return string(out), nil
return out, nil
}

0 comments on commit 3b6a62e

Please sign in to comment.