diff --git a/internal/cmd/images/manifest_cmd.go b/internal/cmd/images/manifest_cmd.go index 3ac3d5d..3892771 100644 --- a/internal/cmd/images/manifest_cmd.go +++ b/internal/cmd/images/manifest_cmd.go @@ -7,12 +7,11 @@ import ( "strings" "github.com/airbytehq/abctl/internal/cmd/local/helm" - "github.com/airbytehq/abctl/internal/cmd/local/k8s" + "github.com/airbytehq/abctl/internal/common" "github.com/airbytehq/abctl/internal/trace" helmlib "github.com/mittwald/go-helm-client" "helm.sh/helm/v3/pkg/repo" - "github.com/airbytehq/abctl/internal/common" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -27,16 +26,11 @@ type ManifestCmd struct { Values string `type:"existingfile" help:"An Airbyte helm chart values file to configure helm."` } -func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error { +func (c *ManifestCmd) Run(ctx context.Context) error { ctx, span := trace.NewSpan(ctx, "images manifest") defer span.End() - client, err := helm.New(provider.Kubeconfig, provider.Context, common.AirbyteNamespace) - if err != nil { - return err - } - - images, err := c.findAirbyteImages(ctx, client) + images, err := c.findAirbyteImages(ctx) if err != nil { return err } @@ -48,7 +42,7 @@ func (c *ManifestCmd) Run(ctx context.Context, provider k8s.Provider) error { return nil } -func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) ([]string, error) { +func (c *ManifestCmd) findAirbyteImages(ctx context.Context) ([]string, error) { valuesYaml, err := helm.BuildAirbyteValues(ctx, helm.ValuesOpts{ ValuesFile: c.Values, }) @@ -57,11 +51,20 @@ func (c *ManifestCmd) findAirbyteImages(ctx context.Context, client helm.Client) } airbyteChartLoc := helm.LocateLatestAirbyteChart(c.ChartVersion, c.Chart) - return findImagesFromChart(client, valuesYaml, airbyteChartLoc, c.ChartVersion) + return FindImagesFromChart(valuesYaml, airbyteChartLoc, c.ChartVersion) } -func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion string) ([]string, error) { - err := client.AddOrUpdateChartRepo(repo.Entry{ +func FindImagesFromChart(valuesYaml, chartName, chartVersion string) ([]string, error) { + + // Sharing a helm client with the install code causes some weird issues, + // and templating the chart doesn't need details about the k8s provider, + // so we create a throwaway helm client here. + client, err := helmlib.New(helm.ClientOptions(common.AirbyteNamespace)) + if err != nil { + return nil, err + } + + err = client.AddOrUpdateChartRepo(repo.Entry{ Name: common.AirbyteRepoName, URL: common.AirbyteRepoURL, }) @@ -88,7 +91,7 @@ func findImagesFromChart(client helm.Client, valuesYaml, chartName, chartVersion // It returns a unique, sorted list of images found.
func findAllImages(chartYaml string) []string { objs := decodeK8sResources(chartYaml) - imageSet := set[string]{} + imageSet := common.Set[string]{} for _, obj := range objs { @@ -98,7 +101,7 @@ func findAllImages(chartYaml string) []string { if strings.HasSuffix(z.Name, "airbyte-env") { for k, v := range z.Data { if strings.HasSuffix(k, "_IMAGE") { - imageSet.add(v) + imageSet.Add(v) } } } @@ -116,15 +119,15 @@ func findAllImages(chartYaml string) []string { } for _, c := range podSpec.InitContainers { - imageSet.add(c.Image) + imageSet.Add(c.Image) } for _, c := range podSpec.Containers { - imageSet.add(c.Image) + imageSet.Add(c.Image) } } var out []string - for _, k := range imageSet.items() { + for _, k := range imageSet.Items() { if k != "" { out = append(out, k) } @@ -149,22 +152,3 @@ func decodeK8sResources(renderedYaml string) []runtime.Object { } return out } - -type set[T comparable] struct { - vals map[T]struct{} -} - -func (s *set[T]) add(v T) { - if s.vals == nil { - s.vals = map[T]struct{}{} - } - s.vals[v] = struct{}{} -} - -func (s *set[T]) items() []T { - out := make([]T, len(s.vals)) - for k := range s.vals { - out = append(out, k) - } - return out -} diff --git a/internal/cmd/images/manifest_cmd_test.go b/internal/cmd/images/manifest_cmd_test.go index b7b0baf..3196000 100644 --- a/internal/cmd/images/manifest_cmd_test.go +++ b/internal/cmd/images/manifest_cmd_test.go @@ -20,11 +20,10 @@ func getHelmTestClient(t *testing.T) helm.Client { } func TestManifestCmd(t *testing.T) { - client := getHelmTestClient(t) cmd := ManifestCmd{ ChartVersion: "1.1.0", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } @@ -48,12 +47,11 @@ func TestManifestCmd(t *testing.T) { } func TestManifestCmd_Enterprise(t *testing.T) { - client := getHelmTestClient(t) cmd := ManifestCmd{ ChartVersion: "1.1.0", Values: "testdata/enterprise.values.yaml", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } @@ -81,13 +79,12 @@ func TestManifestCmd_Enterprise(t *testing.T) { } func TestManifestCmd_Nightly(t *testing.T) { - client := getHelmTestClient(t) cmd := ManifestCmd{ // This version includes chart fixes that expose images more consistently and completely. 
ChartVersion: "1.1.0-nightly-1728428783-9025e1a46e", Values: "testdata/enterprise.values.yaml", } - actual, err := cmd.findAirbyteImages(context.Background(), client) + actual, err := cmd.findAirbyteImages(context.Background()) if err != nil { t.Fatal(err) } diff --git a/internal/cmd/local/docker/docker.go b/internal/cmd/local/docker/docker.go index 9fb5500..b602e36 100644 --- a/internal/cmd/local/docker/docker.go +++ b/internal/cmd/local/docker/docker.go @@ -43,6 +43,7 @@ type Client interface { ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) ImagePull(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) + ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) ServerVersion(ctx context.Context) (types.Version, error) VolumeInspect(ctx context.Context, volumeID string) (volume.Volume, error) diff --git a/internal/cmd/local/docker/dockertest/dockertest.go b/internal/cmd/local/docker/dockertest/dockertest.go index 42412ae..46a1a39 100644 --- a/internal/cmd/local/docker/dockertest/dockertest.go +++ b/internal/cmd/local/docker/dockertest/dockertest.go @@ -26,6 +26,7 @@ type MockClient struct { FnContainerExecStart func(ctx context.Context, execID string, config container.ExecStartOptions) error FnImageList func(ctx context.Context, options image.ListOptions) ([]image.Summary, error) FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) + FnImageSave func(ctx context.Context, imageIDs []string) (io.ReadCloser, error) FnServerVersion func(ctx context.Context) (types.Version, error) FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error) FnInfo func(ctx context.Context) (system.Info, error) @@ -82,6 +83,10 @@ func (m MockClient) ImagePull(ctx context.Context, refStr string, options image. 
return m.FnImagePull(ctx, refStr, options) } +func (m MockClient) ImageSave(ctx context.Context, imageIDs []string) (io.ReadCloser, error) { + return m.FnImageSave(ctx, imageIDs) +} + func (m MockClient) ServerVersion(ctx context.Context) (types.Version, error) { return m.FnServerVersion(ctx) } diff --git a/internal/cmd/local/helm/helm.go b/internal/cmd/local/helm/helm.go index fab4ff3..9aaf782 100644 --- a/internal/cmd/local/helm/helm.go +++ b/internal/cmd/local/helm/helm.go @@ -26,6 +26,18 @@ type Client interface { TemplateChart(spec *helmclient.ChartSpec, options *helmclient.HelmTemplateOptions) ([]byte, error) } +func ClientOptions(namespace string) *helmclient.Options { + logger := helmLogger{} + return &helmclient.Options{ + Namespace: namespace, + Output: logger, + DebugLog: logger.Debug, + Debug: true, + RepositoryCache: paths.HelmRepoCache, + RepositoryConfig: paths.HelmRepoConfig, + } +} + // New returns the default helm client func New(kubecfg, kubectx, namespace string) (Client, error) { k8sCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( @@ -38,16 +50,8 @@ func New(kubecfg, kubectx, namespace string) (Client, error) { return nil, fmt.Errorf("%w: unable to create rest config: %w", localerr.ErrKubernetes, err) } - logger := helmLogger{} helm, err := helmclient.NewClientFromRestConf(&helmclient.RestConfClientOptions{ - Options: &helmclient.Options{ - Namespace: namespace, - Output: logger, - DebugLog: logger.Debug, - Debug: true, - RepositoryCache: paths.HelmRepoCache, - RepositoryConfig: paths.HelmRepoConfig, - }, + Options: ClientOptions(namespace), RestConfig: restCfg, }) if err != nil { diff --git a/internal/cmd/local/k8s/cluster.go b/internal/cmd/local/k8s/cluster.go index 072510e..d10af5a 100644 --- a/internal/cmd/local/k8s/cluster.go +++ b/internal/cmd/local/k8s/cluster.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/k8s/kind" "github.com/airbytehq/abctl/internal/cmd/local/paths" "github.com/airbytehq/abctl/internal/trace" @@ -30,6 +31,8 @@ type Cluster interface { Delete(ctx context.Context) error // Exists returns true if the cluster exists, false otherwise. Exists(ctx context.Context) bool + // LoadImages loads the given images into the cluster, on a best-effort basis. + LoadImages(ctx context.Context, dockerClient docker.Client, images []string) } // interface sanity check @@ -110,6 +113,23 @@ func (k *kindCluster) Exists(ctx context.Context) bool { return false } +// LoadImages pulls images from Docker Hub and loads them into the kind cluster. +// This is a best-effort optimization, which is why it doesn't return an error. +// It's possible that only some images will be loaded. +func (k *kindCluster) LoadImages(ctx context.Context, dockerClient docker.Client, images []string) { + // Get the list of kind nodes.
+ nodes, err := k.p.ListNodes(k.clusterName) + if err != nil { + pterm.Debug.Printfln("failed to load images: %s", err) + return + } + + err = loadImages(ctx, dockerClient, nodes, images) + if err != nil { + pterm.Debug.Printfln("failed to load images: %s", err) + } +} + func formatKindErr(err error) error { var kindErr *kindExec.RunError if errors.As(err, &kindErr) { diff --git a/internal/cmd/local/k8s/load_images.go b/internal/cmd/local/k8s/load_images.go new file mode 100644 index 0000000..c898cd5 --- /dev/null +++ b/internal/cmd/local/k8s/load_images.go @@ -0,0 +1,196 @@ +package k8s + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/airbytehq/abctl/internal/cmd/local/docker" + "github.com/airbytehq/abctl/internal/common" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/image" + "github.com/pterm/pterm" + nodeslib "sigs.k8s.io/kind/pkg/cluster/nodes" + "sigs.k8s.io/kind/pkg/cluster/nodeutils" + "sigs.k8s.io/kind/pkg/exec" + "sigs.k8s.io/kind/pkg/fs" +) + +// loadImages pulls and loads images into the kind cluster. +// It will pull all images in parallel, skip any images that already exist on the nodes, +// save the rest to an image archive (tar file), and load the archive onto the nodes. +func loadImages(ctx context.Context, dockerClient docker.Client, nodes []nodeslib.Node, images []string) error { + + // Pull all the images via "docker pull", in parallel. + var wg sync.WaitGroup + wg.Add(len(images)) + for _, img := range images { + pterm.Info.Printfln("Pulling image %s", img) + + go func(img string) { + defer wg.Done() + r, err := dockerClient.ImagePull(ctx, img, image.PullOptions{}) + if err != nil { + // Image pull errors are intentionally dropped so one failed pull doesn't interrupt the others. + pterm.Debug.Printfln("error pulling image %s: %s", img, err) + return + } + defer r.Close() + io.Copy(io.Discard, r) + }(img) + } + wg.Wait() + + // The context could be canceled by now. If so, return early. + if ctx.Err() != nil { + return ctx.Err() + } + + // Determine which images need to be loaded onto the nodes. + needed := determineImagesForLoading(ctx, dockerClient, images, nodes) + if len(needed) == 0 { + return nil + } + + // Save the needed images to an archive, images.tar + imagesTarPath, err := saveImageArchive(ctx, dockerClient, needed) + if err != nil { + return fmt.Errorf("failed to save image archive: %w", err) + } + defer os.RemoveAll(filepath.Dir(imagesTarPath)) + + // Load the image archive into the kind nodes. + f, err := os.Open(imagesTarPath) + if err != nil { + return fmt.Errorf("failed to open image archive: %w", err) + } + defer f.Close() + + for _, n := range nodes { + pterm.Debug.Printfln("loading image archive into kind node %s", n) + // Rewind the archive so each node reads it from the beginning. + if _, err := f.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("failed to rewind image archive: %w", err) + } + if err := nodeutils.LoadImageArchive(n, f); err != nil { + pterm.Debug.Printfln("%s", err) + } + } + return nil +} + +// getExistingImageDigests returns the set of images that already exist on the nodes. +func getExistingImageDigests(ctx context.Context, nodes []nodeslib.Node) common.Set[string] { + existingByNode := map[string]int{} + + for _, n := range nodes { + + out, err := exec.CombinedOutputLines(n.CommandContext(ctx, "ctr", "--namespace=k8s.io", "images", "list")) + if err != nil { + // ignore the error because discovering these images is just an optimization. + pterm.Debug.Printfln("error discovering existing images: %s %s", err, out) + continue + } + if len(out) < 1 { + continue + } + + // the first line is a header.
Verify the columns we expect, just in case the format ever changes. + header := strings.Fields(out[0]) + if len(header) < 1 || header[0] != "REF" { + pterm.Debug.Printfln("unexpected format from ctr image list. skipping node %s.", n) + continue + } + + // skip the first line, which is a header. + for _, l := range out[1:] { + fields := strings.Fields(l) + if len(fields) < 1 { + continue + } + ref := fields[0] + pterm.Debug.Printfln("found existing image with ref %s", ref) + existingByNode[ref]++ + } + } + + existing := common.Set[string]{} + for ref, count := range existingByNode { + if count == len(nodes) { + existing.Add(ref) + } + } + return existing +} + +// determineImagesForLoading gets the IDs of the desired images (using "docker images"), +// subtracts the images that already exist on the nodes, and returns the resulting list. +func determineImagesForLoading(ctx context.Context, dockerClient docker.Client, images []string, nodes []nodeslib.Node) []string { + + // Get the digests of the images that already exist on the nodes. + existing := getExistingImageDigests(ctx, nodes) + if existing.Len() == 0 { + return images + } + + // Get the digests of the requested images, so we can compare them to the existing images. + imgFilter := filters.NewArgs() + for _, img := range images { + imgFilter.Add("reference", img) + } + + imgList, err := dockerClient.ImageList(ctx, image.ListOptions{Filters: imgFilter}) + if err != nil { + // ignore errors from the image digest list – it's an optimization. + pterm.Debug.Printfln("error getting image digests: %s", err) + return images + } + + // Subtract the images that already exist on the nodes. + var needed []string + for _, img := range imgList { + if !existing.Contains(img.ID) { + pterm.Debug.Printfln("image does not exist: %s %v", img.ID, img.RepoTags) + for _, tag := range img.RepoTags { + needed = append(needed, tag) + } + } else { + pterm.Debug.Printfln("image already exists: %s", img.ID) + } + } + return needed +} + +func saveImageArchive(ctx context.Context, dockerClient docker.Client, images []string) (string, error) { + + // Set up the tar path where the images will be saved. + dir, err := fs.TempDir("", "images-tar-") + if err != nil { + return "", err + } + + imagesTarPath := filepath.Join(dir, "images.tar") + pterm.Debug.Printfln("saving image archive to %s", imagesTarPath) + + wf, err := os.Create(imagesTarPath) + if err != nil { + return "", err + } + defer wf.Close() + + r, err := dockerClient.ImageSave(ctx, images) + if err != nil { + return "", err + } + defer r.Close() + + if _, err := io.Copy(wf, r); err != nil { + return "", err + } + + return imagesTarPath, nil +} diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index fa06afa..aae939b 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/airbytehq/abctl/internal/cmd/images" "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/helm" "github.com/airbytehq/abctl/internal/cmd/local/k8s" @@ -151,6 +152,18 @@ func (c *Command) persistentVolumeClaim(ctx context.Context, namespace, name, vo return nil } +// PrepImages determines the docker images needed by the chart, pulls them, and loads them into the cluster. +// This is best effort, so errors are dropped here.
+func (c *Command) PrepImages(ctx context.Context, cluster k8s.Cluster, opts *InstallOpts) { + manifest, err := images.FindImagesFromChart(opts.HelmValuesYaml, opts.AirbyteChartLoc, opts.HelmChartVersion) + if err != nil { + pterm.Debug.Printfln("error building image manifest: %s", err) + return + } + + cluster.LoadImages(ctx, c.docker.Client, manifest) +} + // Install handles the installation of Airbyte func (c *Command) Install(ctx context.Context, opts *InstallOpts) error { ctx, span := trace.NewSpan(ctx, "command.Install") diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 51efb6b..d33f113 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -164,6 +164,9 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return fmt.Errorf("unable to initialize local command: %w", err) } + spinner.UpdateText("Pulling images") + lc.PrepImages(ctx, cluster, opts) + if err := lc.Install(ctx, opts); err != nil { spinner.Fail("Unable to install Airbyte locally") return err diff --git a/internal/common/set.go b/internal/common/set.go new file mode 100644 index 0000000..5443f55 --- /dev/null +++ b/internal/common/set.go @@ -0,0 +1,32 @@ +package common + +type Set[T comparable] struct { + vals map[T]struct{} +} + +func (s *Set[T]) Add(v T) { + if s.vals == nil { + s.vals = map[T]struct{}{} + } + s.vals[v] = struct{}{} +} + +func (s *Set[T]) Contains(v T) bool { + if s.vals == nil { + return false + } + _, ok := s.vals[v] + return ok +} + +func (s *Set[T]) Len() int { + return len(s.vals) +} + +func (s *Set[T]) Items() []T { + out := make([]T, 0, len(s.vals)) + for k := range s.vals { + out = append(out, k) + } + return out +}
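For reference, a minimal sketch (not part of the diff) of how the new FnImageSave hook on dockertest.MockClient might be stubbed in a test. It only uses identifiers introduced above; the test name and image reference are illustrative assumptions.

package dockertest_test

import (
	"bytes"
	"context"
	"io"
	"testing"

	"github.com/airbytehq/abctl/internal/cmd/local/docker/dockertest"
)

func TestMockClient_ImageSave(t *testing.T) {
	// Stub FnImageSave so no Docker daemon is needed; return a canned, empty
	// archive in place of real "docker save" output.
	client := dockertest.MockClient{
		FnImageSave: func(ctx context.Context, imageIDs []string) (io.ReadCloser, error) {
			return io.NopCloser(bytes.NewReader(nil)), nil
		},
	}

	r, err := client.ImageSave(context.Background(), []string{"airbyte/server:1.1.0"})
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()

	// Drain the reader, as the image-loading code does with the real client.
	if _, err := io.Copy(io.Discard, r); err != nil {
		t.Fatal(err)
	}
}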