From 17e5747f912e6da1a24117dd5c48e921d4488832 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 12 Dec 2024 15:14:53 +0545 Subject: [PATCH] fix: timeout behavior in WaitForContainerStart --- context/context.go | 7 +++- kubernetes/dynamic.go | 86 +++++++++++++++++++++++++++---------------- 2 files changed, 60 insertions(+), 33 deletions(-) diff --git a/context/context.go b/context/context.go index 5611af70..4e6f49f6 100644 --- a/context/context.go +++ b/context/context.go @@ -336,10 +336,15 @@ func (k *Context) KubernetesRestConfig() *rest.Config { return nil } -func (k *Context) KubernetesDynamicClient() *dutyKubernetes.DynamicClient { +func (k *Context) KubernetesClient() *dutyKubernetes.Client { return dutyKubernetes.NewKubeClient(k.Kubernetes(), k.KubernetesRestConfig()) } +// Deprecated: Use KubernetesClient +func (k *Context) KubernetesDynamicClient() *dutyKubernetes.Client { + return k.KubernetesClient() +} + func (k *Context) WithKubeconfig(input types.EnvVar) (*Context, error) { if k.GetNamespace() == "" { return nil, k.Oops().Errorf("namespace is required") diff --git a/kubernetes/dynamic.go b/kubernetes/dynamic.go index 2aeff8e6..a3c9c217 100644 --- a/kubernetes/dynamic.go +++ b/kubernetes/dynamic.go @@ -31,19 +31,18 @@ import ( "k8s.io/client-go/tools/remotecommand" ) -// DynamicClient is an updated & stripped of kommons client -type DynamicClient struct { +type Client struct { client kubernetes.Interface restMapper *restmapper.DeferredDiscoveryRESTMapper dynamicClient *dynamic.DynamicClient config *rest.Config } -func NewKubeClient(client kubernetes.Interface, config *rest.Config) *DynamicClient { - return &DynamicClient{config: config, client: client} +func NewKubeClient(client kubernetes.Interface, config *rest.Config) *Client { + return &Client{config: config, client: client} } -func (c *DynamicClient) FetchResources( +func (c *Client) FetchResources( ctx context.Context, resources ...unstructured.Unstructured, ) ([]unstructured.Unstructured, error) { @@ -83,7 +82,7 @@ func (c *DynamicClient) FetchResources( return output, nil } -func (c *DynamicClient) GetClientByGroupVersionKind( +func (c *Client) GetClientByGroupVersionKind( group, version, kind string, ) (dynamic.NamespaceableResourceInterface, error) { dynamicClient, err := c.GetDynamicClient() @@ -115,7 +114,7 @@ func (c *DynamicClient) GetClientByGroupVersionKind( // example: helmchrats.helm.cattle.io & helmcharts.source.toolkit.fluxcd.io both have HelmChart as the kind. // // Use GetClientByGroupVersionKind instead. -func (c *DynamicClient) GetClientByKind(kind string) (dynamic.NamespaceableResourceInterface, error) { +func (c *Client) GetClientByKind(kind string) (dynamic.NamespaceableResourceInterface, error) { dynamicClient, err := c.GetDynamicClient() if err != nil { return nil, err @@ -135,7 +134,7 @@ func (c *DynamicClient) GetClientByKind(kind string) (dynamic.NamespaceableResou return dynamicClient.Resource(mapping.Resource), nil } -func (c *DynamicClient) DeleteByGVK(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (bool, error) { +func (c *Client) DeleteByGVK(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (bool, error) { client, err := c.GetClientByGroupVersionKind(gvk.Group, gvk.Version, gvk.Kind) if err != nil { return false, err @@ -151,7 +150,7 @@ func (c *DynamicClient) DeleteByGVK(ctx context.Context, namespace, name string, } // GetDynamicClient creates a new k8s client -func (c *DynamicClient) GetDynamicClient() (dynamic.Interface, error) { +func (c *Client) GetDynamicClient() (dynamic.Interface, error) { if c.dynamicClient != nil { return c.dynamicClient, nil } @@ -161,7 +160,7 @@ func (c *DynamicClient) GetDynamicClient() (dynamic.Interface, error) { return c.dynamicClient, err } -func (c *DynamicClient) GetRestMapper() (meta.RESTMapper, error) { +func (c *Client) GetRestMapper() (meta.RESTMapper, error) { if c.restMapper != nil { return c.restMapper, nil } @@ -185,7 +184,7 @@ func (c *DynamicClient) GetRestMapper() (meta.RESTMapper, error) { return c.restMapper, err } -func (c *DynamicClient) ExecutePodf( +func (c *Client) ExecutePodf( ctx context.Context, namespace, pod, container string, command ...string, @@ -228,7 +227,7 @@ func (c *DynamicClient) ExecutePodf( return _stdout, _stderr, nil } -func (c *DynamicClient) GetPodLogs(ctx context.Context, namespace, podName, container string) (io.ReadCloser, error) { +func (c *Client) GetPodLogs(ctx context.Context, namespace, podName, container string) (io.ReadCloser, error) { podLogOptions := v1.PodLogOptions{} if container != "" { podLogOptions.Container = container @@ -245,7 +244,12 @@ func (c *DynamicClient) GetPodLogs(ctx context.Context, namespace, podName, cont // WaitForPod waits for a pod to be in the specified phase, or returns an // error if the timeout is exceeded -func (c *DynamicClient) WaitForPod(ctx context.Context, namespace, name string, timeout time.Duration, phases ...v1.PodPhase) error { +func (c *Client) WaitForPod( + ctx context.Context, + namespace, name string, + timeout time.Duration, + phases ...v1.PodPhase, +) error { start := time.Now() pods := c.client.CoreV1().Pods(namespace) @@ -271,7 +275,12 @@ func (c *DynamicClient) WaitForPod(ctx context.Context, namespace, name string, } } -func (c *DynamicClient) StreamLogsV2(ctx context.Context, namespace, name string, timeout time.Duration, containerNames ...string) error { +func (c *Client) StreamLogsV2( + ctx context.Context, + namespace, name string, + timeout time.Duration, + containerNames ...string, +) error { podsClient := c.client.CoreV1().Pods(namespace) pod, err := podsClient.Get(ctx, name, metav1.GetOptions{}) if err != nil { @@ -346,33 +355,46 @@ func (c *DynamicClient) StreamLogsV2(ctx context.Context, namespace, name string } // WaitForContainerStart waits for the specified containers to be started (or any container if no names are specified) - returns an error if the timeout is exceeded -func (c *DynamicClient) WaitForContainerStart(ctx context.Context, namespace, name string, timeout time.Duration, containerNames ...string) error { - start := time.Now() +func (c *Client) WaitForContainerStart( + ctx context.Context, + namespace, name string, + timeout time.Duration, + containerNames ...string, +) error { + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() podsClient := c.client.CoreV1().Pods(namespace) for { - pod, err := podsClient.Get(ctx, name, metav1.GetOptions{}) - if err != nil { - if apiErrors.IsNotFound(err) { - time.Sleep(time.Second) - continue - } + select { + case <-timeoutTimer.C: + return fmt.Errorf("timeout exceeded waiting for %s", name) - return err - } + case <-ctx.Done(): + return ctx.Err() - if start.Add(timeout).Before(time.Now()) { - return fmt.Errorf("timeout exceeded waiting for %s is %s, error: %v", name, pod.Status.Phase, err) - } + default: + pod, err := podsClient.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apiErrors.IsNotFound(err) { + time.Sleep(time.Second) + continue + } - for _, container := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) { - if len(containerNames) > 0 && !lo.Contains(containerNames, container.Name) { - continue + return err } - if container.State.Running != nil || container.State.Terminated != nil { - return nil + for _, container := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) { + if len(containerNames) > 0 && !lo.Contains(containerNames, container.Name) { + continue + } + + if container.State.Running != nil || container.State.Terminated != nil { + return nil + } } + + time.Sleep(time.Second) } } }