diff --git a/internal/actions/dump/dump.go b/internal/actions/dump/dump.go index 3820a7a3..1afdcd0a 100644 --- a/internal/actions/dump/dump.go +++ b/internal/actions/dump/dump.go @@ -92,6 +92,7 @@ func (action Dump) Run(ctx context.Context) (err error) { plogger, false, nil, + 0, ); err != nil { return err } diff --git a/internal/actions/exec/exec.go b/internal/actions/exec/exec.go index 74f521cc..778b8b0c 100644 --- a/internal/actions/exec/exec.go +++ b/internal/actions/exec/exec.go @@ -3,6 +3,7 @@ package exec import ( "context" "os" + "time" "github.com/clevyr/kubedb/internal/command" "github.com/clevyr/kubedb/internal/config" @@ -43,6 +44,7 @@ func (action Exec) Run(ctx context.Context) error { os.Stderr, t.IsTerminalIn(), sizeQueue, + 5*time.Second, ) }) } diff --git a/internal/actions/restore/restore.go b/internal/actions/restore/restore.go index 4b1a900a..9b27541f 100644 --- a/internal/actions/restore/restore.go +++ b/internal/actions/restore/restore.go @@ -202,6 +202,7 @@ func (action Restore) runInDatabasePod(ctx context.Context, r *io.PipeReader, st stderr, false, nil, + 0, ); err != nil { return err } diff --git a/internal/config/flags/database.go b/internal/config/flags/database.go index d2a5468e..782d392d 100644 --- a/internal/config/flags/database.go +++ b/internal/config/flags/database.go @@ -3,6 +3,7 @@ package flags import ( "os" "strings" + "time" "github.com/clevyr/kubedb/internal/config" "github.com/clevyr/kubedb/internal/database/dialect" @@ -184,7 +185,7 @@ func listDatabases(cmd *cobra.Command, args []string, toComplete string) ([]stri func queryInDatabase(cmd *cobra.Command, args []string, conf config.Exec) ([]string, cobra.ShellCompDirective) { var buf strings.Builder sqlCmd := conf.Dialect.ExecCommand(conf) - err := conf.Client.Exec(cmd.Context(), conf.Pod, sqlCmd.String(), nil, &buf, os.Stderr, false, nil) + err := conf.Client.Exec(cmd.Context(), conf.Pod, sqlCmd.String(), nil, &buf, os.Stderr, false, nil, 5*time.Second) if err != nil { return nil, cobra.ShellCompDirectiveError } diff --git a/internal/database/dialect/postgres.go b/internal/database/dialect/postgres.go index 11fea89a..657d8759 100644 --- a/internal/database/dialect/postgres.go +++ b/internal/database/dialect/postgres.go @@ -6,6 +6,7 @@ import ( "io" "os" "strings" + "time" "github.com/clevyr/kubedb/internal/command" "github.com/clevyr/kubedb/internal/config" @@ -80,7 +81,7 @@ func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, po ) var buf strings.Builder - err := client.Exec(ctx, pods[0], cmd.String(), strings.NewReader(""), &buf, os.Stderr, false, nil) + err := client.Exec(ctx, pods[0], cmd.String(), strings.NewReader(""), &buf, os.Stderr, false, nil, 5*time.Second) if err != nil { return pods, err } diff --git a/internal/kubernetes/pod.go b/internal/kubernetes/pod.go index 6436e9ae..45053806 100644 --- a/internal/kubernetes/pod.go +++ b/internal/kubernetes/pod.go @@ -5,12 +5,15 @@ import ( "errors" "fmt" "io" + "net/http" + "time" log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" ) @@ -31,7 +34,7 @@ func (client KubeClient) GetNamespacedPods(ctx context.Context) (*v1.PodList, er return pods, nil } -func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error { +func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue, pingPeriod time.Duration) error { req := client.ClientSet.CoreV1().RESTClient().Post(). Resource("pods"). Namespace(client.Namespace). @@ -44,7 +47,28 @@ func (client KubeClient) Exec(ctx context.Context, pod v1.Pod, cmd string, stdin Stderr: stderr != nil, TTY: tty, }, scheme.ParameterCodec) - exec, err := remotecommand.NewSPDYExecutor(client.ClientConfig, "POST", req.URL()) + + tlsConfig, err := rest.TLSConfigFor(client.ClientConfig) + if err != nil { + return err + } + proxy := http.ProxyFromEnvironment + if client.ClientConfig.Proxy != nil { + proxy = client.ClientConfig.Proxy + } + upgradeRoundTripper := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{ + TLS: tlsConfig, + Proxier: proxy, + // Needs to be 0 for dump/restore to prevent unexpected EOF. + // See https://github.com/kubernetes/kubernetes/issues/60140#issuecomment-1411477275 + PingPeriod: pingPeriod, + }) + wrapper, err := rest.HTTPWrappersForConfig(client.ClientConfig, upgradeRoundTripper) + if err != nil { + return err + } + + exec, err := remotecommand.NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, "POST", req.URL()) if err != nil { return err }