From 9c27df35e7dbc6899733d0aee1e897738cf2500d Mon Sep 17 00:00:00 2001
From: Khurram Baig
Date: Fri, 4 Oct 2024 14:21:22 +0530
Subject: [PATCH] Converted GCS and S3 to use VFS Library

GCS and S3 now use the vfs library.

GCS also archives the cache instead of doing per-file operations. This
reduces Class B operations for fetch and Class A operations for upload.
---
 dev/pipeline/pipeline.yaml        |  12 +++
 dev/pr/pipelinerun-gcs.yaml       |  25 +++++++
 dev/step-action/cache-fetch.yaml  |  15 ++++-
 dev/step-action/cache-upload.yaml |  16 ++++-
 internal/fetch/fetch.go           |  17 +++--
 internal/provider/gcs/common.go   |  41 ------------
 internal/provider/gcs/fetcher.go  |  58 ----------------
 internal/provider/gcs/upload.go   |  68 -------------------
 internal/provider/gcs/vfs_file.go |  18 +++++
 internal/provider/s3/s3.go        | 107 ------------------------------
 internal/provider/s3/vfs_file.go  |  42 ++++++++++++
 internal/provider/vfs/fetch.go    |  42 ++++++++++++
 internal/provider/vfs/upload.go   |  39 +++++++++++
 internal/upload/upload.go         |  31 ++++-----
 14 files changed, 230 insertions(+), 301 deletions(-)
 create mode 100644 dev/pr/pipelinerun-gcs.yaml
 delete mode 100644 internal/provider/gcs/common.go
 delete mode 100644 internal/provider/gcs/fetcher.go
 delete mode 100644 internal/provider/gcs/upload.go
 create mode 100644 internal/provider/gcs/vfs_file.go
 delete mode 100644 internal/provider/s3/s3.go
 create mode 100644 internal/provider/s3/vfs_file.go
 create mode 100644 internal/provider/vfs/fetch.go
 create mode 100644 internal/provider/vfs/upload.go

diff --git a/dev/pipeline/pipeline.yaml b/dev/pipeline/pipeline.yaml
index c473eb449..7e2f4fee5 100644
--- a/dev/pipeline/pipeline.yaml
+++ b/dev/pipeline/pipeline.yaml
@@ -74,8 +74,14 @@ spec:
               value: $(workspaces.source.path)/cache
             - name: workingdir
               value: $(workspaces.source.path)/repo
+            - name: awsCredentialFile
+              value: $(workspaces.cred.path)/credentials
+            - name: awsConfigFile
+              value: $(workspaces.cred.path)/config
            - name: cred-store
              value: $(workspaces.cred.path)
+            - name: googleCredentialsPath
+              value: $(workspaces.cred.path)/creds.json
 
         - name: run-go-build
           workingDir: $(workspaces.source.path)/repo
@@ -104,5 +110,11 @@ spec:
               value: $(workspaces.source.path)/repo
             - name: cred-store
               value: $(workspaces.cred.path)
+            - name: awsCredentialFile
+              value: $(workspaces.cred.path)/credentials
+            - name: awsConfigFile
+              value: $(workspaces.cred.path)/config
             - name: force-cache-upload
               value: $(params.force-cache-upload)
+            - name: googleCredentialsPath
+              value: $(workspaces.cred.path)/creds.json
diff --git a/dev/pr/pipelinerun-gcs.yaml b/dev/pr/pipelinerun-gcs.yaml
new file mode 100644
index 000000000..d9f78445c
--- /dev/null
+++ b/dev/pr/pipelinerun-gcs.yaml
@@ -0,0 +1,25 @@
+---
+apiVersion: tekton.dev/v1
+kind: PipelineRun
+metadata:
+  generateName: pipelinerun-gcs-
+spec:
+  pipelineRef:
+    name: pipeline
+  params:
+    - name: repo_url
+      value: https://github.com/chmouel/go-helloworld
+    - name: revision
+      value: main
+    - name: registry
+      value: gs://tekton-cache-assdfsdfdsvbdor0934o5
+    - name: buildCommand
+      value: go build -v ./
+    - name: image
+      value: golang:1.21
+  workspaces:
+    - name: cred
+      secret:
+        secretName: aws-cred
+    - name: source
+      emptyDir: {}
diff --git a/dev/step-action/cache-fetch.yaml b/dev/step-action/cache-fetch.yaml
index d89db3c74..49cef245f 100644
--- a/dev/step-action/cache-fetch.yaml
+++ b/dev/step-action/cache-fetch.yaml
@@ -38,6 +38,16 @@ spec:
         The path where to find the google credentials. If left empty, it is ignored.
       type: string
       default: ""
+    - name: awsConfigFile
+      description: |
+        The path to the aws config file. If left empty, it is ignored.
+      type: string
+      default: ""
+    - name: awsCredentialFile
+      description: |
+        The path to find the aws credentials file. If left empty, it is ignored.
+      type: string
+      default: ""
     - name: cred-store
       description: |
         The path where to find the creds to download cache files . If left empty, it is ignored.
@@ -60,7 +70,10 @@
       value: $(params.googleCredentialsPath)
     - name: CRED_STORE
       value: $(params.cred-store)
-
+    - name: AWS_CONFIG_FILE
+      value: $(params.awsConfigFile)
+    - name: AWS_SHARED_CREDENTIALS_FILE
+      value: $(params.awsCredentialFile)
   # FIXME: use a released version once something is released :)
   image: ko://github.com/openshift-pipelines/tekton-caches/cmd/cache
   args: ["$(params.patterns[*])"]
diff --git a/dev/step-action/cache-upload.yaml b/dev/step-action/cache-upload.yaml
index e4330bdc8..b052e6eca 100644
--- a/dev/step-action/cache-upload.yaml
+++ b/dev/step-action/cache-upload.yaml
@@ -48,6 +48,16 @@ spec:
         The path where to find the google credentials. If left empty, it is ignored.
       type: string
       default: ""
+    - name: awsConfigFile
+      description: |
+        The path to the aws config file. If left empty, it is ignored.
+      type: string
+      default: ""
+    - name: awsCredentialFile
+      description: |
+        The path to find the aws credentials file. If left empty, it is ignored.
+      type: string
+      default: ""
     - name: cred-store
       description: |
         The path where to find the creds to upload cache files . If left empty, it is ignored.
@@ -68,8 +78,10 @@
       value: $(params.force-cache-upload)
     - name: GOOGLE_APPLICATION_CREDENTIALS
       value: $(params.googleCredentialsPath)
-    - name: CRED_STORE
-      value: $(params.cred-store)
+    - name: AWS_CONFIG_FILE
+      value: $(params.awsConfigFile)
+    - name: AWS_SHARED_CREDENTIALS_FILE
+      value: $(params.awsCredentialFile)
   # FIXME: use a released version once something is released :)
   image: ko://github.com/openshift-pipelines/tekton-caches/cmd/cache
   args: ["$(params.patterns[*])"]
diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go
index 87631938c..93bc4875f 100644
--- a/internal/fetch/fetch.go
+++ b/internal/fetch/fetch.go
@@ -7,9 +7,8 @@ import (
     "os"
     "strings"
 
-    "github.com/openshift-pipelines/tekton-caches/internal/tar"
-
     "github.com/openshift-pipelines/tekton-caches/internal/provider/s3"
+    "github.com/openshift-pipelines/tekton-caches/internal/provider/vfs"
 
     "github.com/openshift-pipelines/tekton-caches/internal/provider/gcs"
     "github.com/openshift-pipelines/tekton-caches/internal/provider/oci"
@@ -26,20 +25,24 @@ func Fetch(ctx context.Context, hash, target, folder string, insecure bool) erro
     if err != nil {
         return err
     }
+    target = strings.ReplaceAll(target, "{{hash}}", hash)
     source := strings.TrimPrefix(target, u.Scheme+"://")
-    source = strings.ReplaceAll(source, "{{hash}}", hash)
-    file, _ := os.CreateTemp("", "cache.tar")
 
     switch u.Scheme {
     case "oci":
         return oci.Fetch(ctx, hash, source, folder, insecure)
     case "s3":
-        if err := s3.Fetch(ctx, source, file.Name()); err != nil {
+        remoteFile, err := s3.File(ctx, target)
+        if err != nil {
             return err
         }
-        return tar.Untar(ctx, file, folder)
+        return vfs.Fetch(ctx, folder, remoteFile)
     case "gs":
-        return gcs.Fetch(ctx, hash, source, folder)
+        remoteFile, err := gcs.File(ctx, target)
+        if err != nil {
+            return err
+        }
+        return vfs.Fetch(ctx, folder, remoteFile)
     default:
         return fmt.Errorf("unknown schema: %s", target)
     }
diff --git a/internal/provider/gcs/common.go b/internal/provider/gcs/common.go
deleted file mode 100644
index 979f3b2d6..000000000
--- a/internal/provider/gcs/common.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package gcs
-
-import (
-    "context"
-    "io"
-    "os"
-    "path/filepath"
-
-    "cloud.google.com/go/storage"
-)
-
-// realGCS is a wrapper over the GCS client functions.
-type realGCS struct {
-    client         *storage.Client
-    manifestObject string
-}
-
-func (gp realGCS) NewWriter(ctx context.Context, bucket, object string) io.WriteCloser {
-    if object != gp.manifestObject {
-        object = filepath.Join(".cache", object)
-    }
-    return gp.client.Bucket(bucket).Object(object).
-        If(storage.Conditions{DoesNotExist: true}). // Skip upload if already exists.
-        NewWriter(ctx)
-}
-
-func (gp realGCS) NewReader(ctx context.Context, bucket, object string) (io.ReadCloser, error) {
-    return gp.client.Bucket(bucket).Object(object).NewReader(ctx)
-}
-
-// realOS merely wraps the os package implementations.
-type realOS struct{}
-
-func (realOS) EvalSymlinks(path string) (string, error) { return filepath.EvalSymlinks(path) }
-func (realOS) Stat(path string) (os.FileInfo, error) { return os.Stat(path) }
-func (realOS) Rename(oldpath, newpath string) error { return os.Rename(oldpath, newpath) }
-func (realOS) Chmod(name string, mode os.FileMode) error { return os.Chmod(name, mode) }
-func (realOS) Create(name string) (*os.File, error) { return os.Create(name) }
-func (realOS) MkdirAll(path string, perm os.FileMode) error { return os.MkdirAll(path, perm) }
-func (realOS) Open(name string) (*os.File, error) { return os.Open(name) }
-func (realOS) RemoveAll(path string) error { return os.RemoveAll(path) }
diff --git a/internal/provider/gcs/fetcher.go b/internal/provider/gcs/fetcher.go
deleted file mode 100644
index 2453dd667..000000000
--- a/internal/provider/gcs/fetcher.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package gcs
-
-import (
-    "context"
-    "fmt"
-    "os"
-    "path/filepath"
-    "strings"
-    "time"
-
-    "cloud.google.com/go/storage"
-    "github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/common"
-    "github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/fetcher"
-    "google.golang.org/api/option"
-)
-
-const (
-    sourceType    = "Manifest"
-    stagingFolder = ".download/"
-    backoff       = 100 * time.Millisecond
-    retries       = 0
-)
-
-func Fetch(ctx context.Context, hash, target, folder string) error {
-    location := "gs://" + target + hash + ".json"
-    bucket, object, generation, err := common.ParseBucketObject(location)
-    if err != nil {
-        return fmt.Errorf("parsing location from %q failed: %w", location, err)
-    }
-    client, err := storage.NewClient(ctx, option.WithUserAgent(userAgent))
-    if err != nil {
-        return fmt.Errorf("failed to create a new gcs client: %w", err)
-    }
-    gcs := &fetcher.Fetcher{
-        GCS:         realGCS{client, object},
-        OS:          realOS{},
-        DestDir:     folder,
-        StagingDir:  filepath.Join(folder, stagingFolder),
-        CreatedDirs: map[string]bool{},
-        Bucket:      bucket,
-        Object:      object,
-        Generation:  generation,
-        TimeoutGCS:  true,
-        WorkerCount: workerCount,
-        Retries:     retries,
-        Backoff:     backoff,
-        SourceType:  sourceType,
-        KeepSource:  false,
-        Verbose:     false,
-        Stdout:      os.Stdout,
-        Stderr:      os.Stderr,
-    }
-    err = gcs.Fetch(ctx)
-    if err != nil && !strings.Contains(err.Error(), "storage: object doesn't exist") {
-        return fmt.Errorf("failed to fetch: %w", err)
-    }
-    return nil
-}
diff --git a/internal/provider/gcs/upload.go b/internal/provider/gcs/upload.go
deleted file mode 100644
index 551bb1f50..000000000
--- a/internal/provider/gcs/upload.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package gcs
-
-import (
-    "context"
-    "errors"
-    "fmt"
-    "os"
-    "path/filepath"
-
-    "cloud.google.com/go/storage"
-    "github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/common"
-    "github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher/pkg/uploader"
-    "google.golang.org/api/option"
-)
-
-const (
-    userAgent = "tekton-caches"
-    // The number of files to upload in parallel.
-    workerCount = 200
-)
-
-// gcs-uploader -dir examples/ -location gs://tekton-caches-tests/test
-
-func Upload(ctx context.Context, hash, target, folder string) error {
-    location := "gs://" + target + hash + ".json"
-    client, err := storage.NewClient(ctx, option.WithUserAgent(userAgent))
-    if err != nil {
-        return fmt.Errorf("failed to create a new gcs client: %w", err)
-    }
-    bucket, object, generation, err := common.ParseBucketObject(location)
-    if err != nil {
-        return fmt.Errorf("parsing location from %q failed: %w", location, err)
-    }
-    if generation != 0 {
-        return errors.New("cannot specify manifest file generation")
-    }
-    _, err = client.Bucket(bucket).Object(object).NewReader(ctx)
-    if err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
-        return fmt.Errorf("failed to fetch the object: %w", err)
-    }
-    // if !errors.Is(err, storage.ErrObjectNotExist) {
-    //     // Delete if the object already exists…
-    //     // It's a workaround to not have the precondition failure…
-    //     if err := client.Bucket(bucket).Object(object).Delete(ctx); err != nil {
-    //         return err
-    //     }
-    // }
-
-    u := uploader.New(ctx, realGCS{client, object}, realOS{}, bucket, object, workerCount)
-
-    if err := filepath.Walk(folder, func(path string, info os.FileInfo, err error) error {
-        if err != nil {
-            return err
-        }
-        if info.IsDir() {
-            return nil
-        }
-
-        return u.Do(ctx, path, info)
-    }); err != nil {
-        return fmt.Errorf("failed to walk the path: %w", err)
-    }
-
-    if err := u.Done(ctx); err != nil {
-        return fmt.Errorf("failed to upload: %w", err)
-    }
-    return nil
-}
diff --git a/internal/provider/gcs/vfs_file.go b/internal/provider/gcs/vfs_file.go
new file mode 100644
index 000000000..3c7e36abf
--- /dev/null
+++ b/internal/provider/gcs/vfs_file.go
@@ -0,0 +1,18 @@
+package gcs
+
+import (
+    "context"
+
+    "github.com/c2fo/vfs/v6"
+    "github.com/c2fo/vfs/v6/backend"
+    "github.com/c2fo/vfs/v6/backend/gs"
+    "github.com/c2fo/vfs/v6/vfssimple"
+)
+
+func File(ctx context.Context, file string) (vfs.File, error) {
+    bucketAuth := gs.NewFileSystem()
+    bucketAuth = bucketAuth.WithContext(ctx)
+    backend.Register("gs://", bucketAuth)
+
+    return vfssimple.NewFile(file)
+}
diff --git a/internal/provider/s3/s3.go b/internal/provider/s3/s3.go
deleted file mode 100644
index 2243cf1f1..000000000
--- a/internal/provider/s3/s3.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package s3
-
-import (
-    "context"
-    "fmt"
-    "io"
-    "log"
-    "os"
-    "path/filepath"
-    "strings"
-
-    "github.com/aws/aws-sdk-go-v2/aws"
-    "github.com/aws/aws-sdk-go-v2/config"
-    "github.com/aws/aws-sdk-go-v2/service/s3"
-)
-
-const (
-    AwsEnvName    = "AWS_SHARED_CREDENTIALS_FILE"
-    AwsConfigFile = "AWS_CONFIG_FILE"
-)
-
-func Upload(ctx context.Context, target, filePath string) error {
-    log.Printf("S3: Uploading to %s", target)
-    return upload(ctx, target, filePath)
-}
-
-func Fetch(ctx context.Context, source, filePath string) error {
-    log.Printf("S3: Downloading %s", source)
-    return fetch(ctx, source, filePath)
-}
-
-func getS3Client(ctx context.Context) *s3.Client {
-    credStore := os.Getenv("CRED_STORE")
-    if credStore != "" {
-        os.Setenv(AwsEnvName, credStore+"/credentials")
-        os.Setenv(AwsConfigFile, credStore+"/config")
-
-        log.Printf("Setting %s to %s", AwsEnvName, os.Getenv(AwsEnvName))
-        log.Printf("Setting %s to %s", AwsConfigFile, os.Getenv(AwsConfigFile))
-    }
-    cfg, err := config.LoadDefaultConfig(ctx)
-    if err != nil {
-        log.Fatal(err)
-    }
-    s3Client := s3.NewFromConfig(cfg)
-    return s3Client
-}
-
-func fetch(ctx context.Context, source, filePath string) error {
-    s3Client := getS3Client(ctx)
-    index := strings.Index(source, "/")
-    if index == -1 {
-        return fmt.Errorf("invalid S3 URL: %s", source)
-    }
-    bucket := source[:index]
-    key := source[index+1:]
-    log.Printf("Downloading from S3. FIle: %s, Bucket: %s Key : %s", filePath, bucket, key)
-
-    if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
-        log.Fatal("failed to create folder: " + filePath)
-        return err
-    }
-
-    result, err := s3Client.GetObject(context.TODO(), &s3.GetObjectInput{
-        Bucket: aws.String(bucket),
-        Key:    aws.String(key),
-    })
-    if err != nil {
-        log.Printf("Couldn't get object %v:%v. Error: %v", bucket, key, err)
-        return err
-    }
-
-    defer result.Body.Close()
-
-    f, err := os.Create(filePath)
-    if err != nil {
-        return err
-    }
-
-    defer f.Close()
-    body, err := io.ReadAll(result.Body)
-    if err != nil {
-        log.Printf("Couldn't read object body from %v. Error: %v\n", key, err)
-    }
-    _, err = f.Write(body)
-    return err
-}
-
-func upload(ctx context.Context, target, filePath string) error {
-    // Upload the file
-    index := strings.Index(target, "/")
-    if index == -1 {
-        return fmt.Errorf("invalid S3 URL: %s", target)
-    }
-    bucket := target[:index]
-    key := target[index+1:]
-    log.Printf("Uploading to S3. Bucket: %s Key : %s", bucket, key)
-
-    s3Client := getS3Client(ctx)
-    file, _ := os.Open(filePath)
-    _, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
-        Bucket: aws.String(bucket),
-        Key:    aws.String(key),
-        Body:   file,
-    })
-    return err
-}
diff --git a/internal/provider/s3/vfs_file.go b/internal/provider/s3/vfs_file.go
new file mode 100644
index 000000000..2db8ca7e0
--- /dev/null
+++ b/internal/provider/s3/vfs_file.go
@@ -0,0 +1,42 @@
+package s3
+
+import (
+    "context"
+    "log"
+    "os"
+
+    "github.com/aws/aws-sdk-go-v2/config"
+    "github.com/aws/aws-sdk-go-v2/service/s3"
+
+    "github.com/c2fo/vfs/v6"
+
+    "github.com/c2fo/vfs/v6/vfssimple"
+
+    "github.com/c2fo/vfs/v6/backend"
+    vfss3 "github.com/c2fo/vfs/v6/backend/s3"
+)
+
+func getS3Client(ctx context.Context) *s3.Client {
+    cfg, err := config.LoadDefaultConfig(ctx)
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    s3Client := s3.NewFromConfig(cfg, func(o *s3.Options) {
+        o.UsePathStyle = true
+    })
+
+    return s3Client
+}
+
+func File(ctx context.Context, file string) (vfs.File, error) {
+    bucketAuth := vfss3.NewFileSystem().WithClient(getS3Client(ctx)).WithOptions(vfss3.Options{
+        ForcePathStyle: true,
+        Endpoint:       os.Getenv("AWS_ENDPOINT_URL"),
+    })
+
+    backend.Register("s3://", bucketAuth)
+
+    log.Printf("setting location: %s", file)
+    return vfssimple.NewFile(file)
+}
diff --git a/internal/provider/vfs/fetch.go b/internal/provider/vfs/fetch.go
new file mode 100644
index 000000000..4549bd0f4
--- /dev/null
+++ b/internal/provider/vfs/fetch.go
@@ -0,0 +1,42 @@
+package vfs
+
+import (
+    "context"
+    "log"
+    "os"
+
+    "github.com/c2fo/vfs/v6"
+    "github.com/c2fo/vfs/v6/vfssimple"
+    "github.com/openshift-pipelines/tekton-caches/internal/tar"
+)
+
+func Fetch(ctx context.Context, folder string, remoteFile vfs.File) error {
+    file, err := os.CreateTemp("", "cache.tar.gz")
+    if err != nil {
+        return err
+    }
+    defer os.Remove(file.Name())
+
+    localFile, err := vfssimple.NewFile("file://" + file.Name())
+    if err != nil {
+        log.Printf("error creating file: %s", err)
+        return err
+    }
+
+    err = remoteFile.CopyToFile(localFile)
+    if err != nil {
+        log.Printf("error copying to location: %s", err)
+        return err
+    }
+
+    log.Printf("cache downloaded from %s to %s\n", remoteFile, localFile)
+
+    if err := tar.Untar(ctx, file, folder); err != nil {
+        log.Printf("error extracting tar file: %s", err)
+        return err
+    }
+
+    log.Printf("cache untarred %s", folder)
+
+    return nil
+}
diff --git a/internal/provider/vfs/upload.go b/internal/provider/vfs/upload.go
new file mode 100644
index 000000000..30e194b8c
--- /dev/null
+++ b/internal/provider/vfs/upload.go
@@ -0,0 +1,39 @@
+package vfs
+
+import (
+    "log"
+    "os"
+
+    "github.com/c2fo/vfs/v6"
+    "github.com/openshift-pipelines/tekton-caches/internal/tar"
+
+    "github.com/c2fo/vfs/v6/vfssimple"
+)
+
+func Upload(folder string, remoteFile vfs.File) error {
+    file, err := os.CreateTemp("", "cache.tar.gz")
+    if err != nil {
+        return err
+    }
+    defer os.Remove(file.Name())
+
+    if err := tar.Tarit(folder, file.Name()); err != nil {
+        log.Printf("error creating tar file: %s", err)
+        return err
+    }
+
+    localFile, err := vfssimple.NewFile("file://" + file.Name())
+    if err != nil {
+        log.Printf("error creating file: %s", err)
+        return err
+    }
+
+    err = localFile.CopyToFile(remoteFile)
+    if err != nil {
+        log.Printf("error copying to location: %s", err)
+        return err
+    }
+
+    log.Printf("cache uploaded from %s to %s\n", localFile, remoteFile)
+    return nil
+}
diff --git a/internal/upload/upload.go b/internal/upload/upload.go
index 67ff7e9ff..d13e67d1a 100644
--- a/internal/upload/upload.go
+++ b/internal/upload/upload.go
@@ -3,41 +3,38 @@ package upload
 import (
     "context"
     "fmt"
-    "log"
     "net/url"
-    "os"
     "strings"
 
-    "github.com/openshift-pipelines/tekton-caches/internal/tar"
-
-    "github.com/openshift-pipelines/tekton-caches/internal/provider/s3"
-
     "github.com/openshift-pipelines/tekton-caches/internal/provider/gcs"
     "github.com/openshift-pipelines/tekton-caches/internal/provider/oci"
+    "github.com/openshift-pipelines/tekton-caches/internal/provider/s3"
+    "github.com/openshift-pipelines/tekton-caches/internal/provider/vfs"
 )
 
 func Upload(ctx context.Context, hash, target, folder string, insecure bool) error {
+    target = strings.ReplaceAll(target, "{{hash}}", hash)
     u, err := url.Parse(target)
     if err != nil {
         return err
     }
     newTarget := strings.TrimPrefix(target, u.Scheme+"://")
-    newTarget = strings.ReplaceAll(newTarget, "{{hash}}", hash)
-    tarFile, err := os.CreateTemp("", "cache.tar")
-    if err != nil {
-        log.Fatal(err)
-    }
-    if err := tar.Tarit(folder, tarFile.Name()); err != nil {
-        return err
-    }
-    defer os.Remove(tarFile.Name())
+
     switch u.Scheme {
     case "oci":
         return oci.Upload(ctx, hash, newTarget, folder, insecure)
     case "s3":
-        return s3.Upload(ctx, newTarget, tarFile.Name())
+        remoteFile, err := s3.File(ctx, target)
+        if err != nil {
+            return err
+        }
+        return vfs.Upload(folder, remoteFile)
     case "gs":
-        return gcs.Upload(ctx, hash, newTarget, folder)
+        remoteFile, err := gcs.File(ctx, target)
+        if err != nil {
+            return err
+        }
+        return vfs.Upload(folder, remoteFile)
     default:
         return fmt.Errorf("unknown schema: %s", target)
     }