Skip to content

Commit

Permalink
fix: timeout behavior in WaitForContainerStart
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Dec 12, 2024
1 parent a005782 commit 17e5747
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 33 deletions.
7 changes: 6 additions & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,15 @@ func (k *Context) KubernetesRestConfig() *rest.Config {
return nil
}

func (k *Context) KubernetesDynamicClient() *dutyKubernetes.DynamicClient {
func (k *Context) KubernetesClient() *dutyKubernetes.Client {
return dutyKubernetes.NewKubeClient(k.Kubernetes(), k.KubernetesRestConfig())
}

// Deprecated: Use KubernetesClient
func (k *Context) KubernetesDynamicClient() *dutyKubernetes.Client {
return k.KubernetesClient()
}

func (k *Context) WithKubeconfig(input types.EnvVar) (*Context, error) {
if k.GetNamespace() == "" {
return nil, k.Oops().Errorf("namespace is required")
Expand Down
86 changes: 54 additions & 32 deletions kubernetes/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,18 @@ import (
"k8s.io/client-go/tools/remotecommand"
)

// DynamicClient is an updated & stripped of kommons client
type DynamicClient struct {
type Client struct {
client kubernetes.Interface
restMapper *restmapper.DeferredDiscoveryRESTMapper
dynamicClient *dynamic.DynamicClient
config *rest.Config
}

func NewKubeClient(client kubernetes.Interface, config *rest.Config) *DynamicClient {
return &DynamicClient{config: config, client: client}
func NewKubeClient(client kubernetes.Interface, config *rest.Config) *Client {
return &Client{config: config, client: client}
}

func (c *DynamicClient) FetchResources(
func (c *Client) FetchResources(
ctx context.Context,
resources ...unstructured.Unstructured,
) ([]unstructured.Unstructured, error) {
Expand Down Expand Up @@ -83,7 +82,7 @@ func (c *DynamicClient) FetchResources(
return output, nil
}

func (c *DynamicClient) GetClientByGroupVersionKind(
func (c *Client) GetClientByGroupVersionKind(
group, version, kind string,
) (dynamic.NamespaceableResourceInterface, error) {
dynamicClient, err := c.GetDynamicClient()
Expand Down Expand Up @@ -115,7 +114,7 @@ func (c *DynamicClient) GetClientByGroupVersionKind(
// example: helmchrats.helm.cattle.io & helmcharts.source.toolkit.fluxcd.io both have HelmChart as the kind.
//
// Use GetClientByGroupVersionKind instead.
func (c *DynamicClient) GetClientByKind(kind string) (dynamic.NamespaceableResourceInterface, error) {
func (c *Client) GetClientByKind(kind string) (dynamic.NamespaceableResourceInterface, error) {
dynamicClient, err := c.GetDynamicClient()
if err != nil {
return nil, err
Expand All @@ -135,7 +134,7 @@ func (c *DynamicClient) GetClientByKind(kind string) (dynamic.NamespaceableResou
return dynamicClient.Resource(mapping.Resource), nil
}

func (c *DynamicClient) DeleteByGVK(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (bool, error) {
func (c *Client) DeleteByGVK(ctx context.Context, namespace, name string, gvk schema.GroupVersionKind) (bool, error) {
client, err := c.GetClientByGroupVersionKind(gvk.Group, gvk.Version, gvk.Kind)
if err != nil {
return false, err
Expand All @@ -151,7 +150,7 @@ func (c *DynamicClient) DeleteByGVK(ctx context.Context, namespace, name string,
}

// GetDynamicClient creates a new k8s client
func (c *DynamicClient) GetDynamicClient() (dynamic.Interface, error) {
func (c *Client) GetDynamicClient() (dynamic.Interface, error) {
if c.dynamicClient != nil {
return c.dynamicClient, nil
}
Expand All @@ -161,7 +160,7 @@ func (c *DynamicClient) GetDynamicClient() (dynamic.Interface, error) {
return c.dynamicClient, err
}

func (c *DynamicClient) GetRestMapper() (meta.RESTMapper, error) {
func (c *Client) GetRestMapper() (meta.RESTMapper, error) {
if c.restMapper != nil {
return c.restMapper, nil
}
Expand All @@ -185,7 +184,7 @@ func (c *DynamicClient) GetRestMapper() (meta.RESTMapper, error) {
return c.restMapper, err
}

func (c *DynamicClient) ExecutePodf(
func (c *Client) ExecutePodf(
ctx context.Context,
namespace, pod, container string,
command ...string,
Expand Down Expand Up @@ -228,7 +227,7 @@ func (c *DynamicClient) ExecutePodf(
return _stdout, _stderr, nil
}

func (c *DynamicClient) GetPodLogs(ctx context.Context, namespace, podName, container string) (io.ReadCloser, error) {
func (c *Client) GetPodLogs(ctx context.Context, namespace, podName, container string) (io.ReadCloser, error) {
podLogOptions := v1.PodLogOptions{}
if container != "" {
podLogOptions.Container = container
Expand All @@ -245,7 +244,12 @@ func (c *DynamicClient) GetPodLogs(ctx context.Context, namespace, podName, cont

// WaitForPod waits for a pod to be in the specified phase, or returns an
// error if the timeout is exceeded
func (c *DynamicClient) WaitForPod(ctx context.Context, namespace, name string, timeout time.Duration, phases ...v1.PodPhase) error {
func (c *Client) WaitForPod(
ctx context.Context,
namespace, name string,
timeout time.Duration,
phases ...v1.PodPhase,
) error {
start := time.Now()

pods := c.client.CoreV1().Pods(namespace)
Expand All @@ -271,7 +275,12 @@ func (c *DynamicClient) WaitForPod(ctx context.Context, namespace, name string,
}
}

func (c *DynamicClient) StreamLogsV2(ctx context.Context, namespace, name string, timeout time.Duration, containerNames ...string) error {
func (c *Client) StreamLogsV2(
ctx context.Context,
namespace, name string,
timeout time.Duration,
containerNames ...string,
) error {
podsClient := c.client.CoreV1().Pods(namespace)
pod, err := podsClient.Get(ctx, name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -346,33 +355,46 @@ func (c *DynamicClient) StreamLogsV2(ctx context.Context, namespace, name string
}

// WaitForContainerStart waits for the specified containers to be started (or any container if no names are specified) - returns an error if the timeout is exceeded
func (c *DynamicClient) WaitForContainerStart(ctx context.Context, namespace, name string, timeout time.Duration, containerNames ...string) error {
start := time.Now()
func (c *Client) WaitForContainerStart(
ctx context.Context,
namespace, name string,
timeout time.Duration,
containerNames ...string,
) error {
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()

podsClient := c.client.CoreV1().Pods(namespace)
for {
pod, err := podsClient.Get(ctx, name, metav1.GetOptions{})
if err != nil {
if apiErrors.IsNotFound(err) {
time.Sleep(time.Second)
continue
}
select {
case <-timeoutTimer.C:
return fmt.Errorf("timeout exceeded waiting for %s", name)

return err
}
case <-ctx.Done():
return ctx.Err()

if start.Add(timeout).Before(time.Now()) {
return fmt.Errorf("timeout exceeded waiting for %s is %s, error: %v", name, pod.Status.Phase, err)
}
default:
pod, err := podsClient.Get(ctx, name, metav1.GetOptions{})
if err != nil {
if apiErrors.IsNotFound(err) {
time.Sleep(time.Second)
continue
}

for _, container := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
if len(containerNames) > 0 && !lo.Contains(containerNames, container.Name) {
continue
return err
}

if container.State.Running != nil || container.State.Terminated != nil {
return nil
for _, container := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
if len(containerNames) > 0 && !lo.Contains(containerNames, container.Name) {
continue
}

if container.State.Running != nil || container.State.Terminated != nil {
return nil
}
}

time.Sleep(time.Second)
}
}
}
Expand Down

0 comments on commit 17e5747

Please sign in to comment.