Skip to content

Commit

Permalink
feat: implement proxyMode for serve s3 command
Browse files Browse the repository at this point in the history
  • Loading branch information
saw-jan committed Feb 21, 2024
1 parent 9c6bea3 commit ad95a71
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 61 deletions.
147 changes: 96 additions & 51 deletions cmd/serve/s3/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"io"
"net/http"
"os"
"path"
"strings"
Expand All @@ -25,22 +26,26 @@ var (
// backend for gofakes3
type s3Backend struct {
opt *Options
vfs *vfs.VFS
s *Server
meta *sync.Map
}

// newBackend creates a new SimpleBucketBackend.
func newBackend(vfs *vfs.VFS, opt *Options) gofakes3.Backend {
func newBackend(s *Server, opt *Options) gofakes3.Backend {
return &s3Backend{
vfs: vfs,
opt: opt,
s: s,
meta: new(sync.Map),
}
}

// ListBuckets always returns the default bucket.
func (b *s3Backend) ListBuckets() ([]gofakes3.BucketInfo, error) {
dirEntries, err := getDirEntries("/", b.vfs)
func (b *s3Backend) ListBuckets(r *http.Request) ([]gofakes3.BucketInfo, error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return nil, err
}
dirEntries, err := getDirEntries("/", _vfs)
if err != nil {
return nil, err
}
Expand All @@ -59,8 +64,12 @@ func (b *s3Backend) ListBuckets() ([]gofakes3.BucketInfo, error) {
}

// ListBucket lists the objects in the given bucket.
func (b *s3Backend) ListBucket(bucket string, prefix *gofakes3.Prefix, page gofakes3.ListBucketPage) (*gofakes3.ObjectList, error) {
_, err := b.vfs.Stat(bucket)
func (b *s3Backend) ListBucket(r *http.Request, bucket string, prefix *gofakes3.Prefix, page gofakes3.ListBucketPage) (*gofakes3.ObjectList, error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return nil, err
}
_, err = _vfs.Stat(bucket)
if err != nil {
return nil, gofakes3.BucketNotFound(bucket)
}
Expand All @@ -79,7 +88,7 @@ func (b *s3Backend) ListBucket(bucket string, prefix *gofakes3.Prefix, page gofa
response := gofakes3.NewObjectList()
path, remaining := prefixParser(prefix)

err = b.entryListR(bucket, path, remaining, prefix.HasDelimiter, response)
err = b.entryListR(_vfs, bucket, path, remaining, prefix.HasDelimiter, response)
if err == gofakes3.ErrNoSuchKey {
// AWS just returns an empty list
response = gofakes3.NewObjectList()
Expand All @@ -93,14 +102,18 @@ func (b *s3Backend) ListBucket(bucket string, prefix *gofakes3.Prefix, page gofa
// HeadObject returns the fileinfo for the given object name.
//
// Note that the metadata is not supported yet.
func (b *s3Backend) HeadObject(bucketName, objectName string) (*gofakes3.Object, error) {
_, err := b.vfs.Stat(bucketName)
func (b *s3Backend) HeadObject(r *http.Request, bucketName, objectName string) (*gofakes3.Object, error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return nil, err
}
_, err = _vfs.Stat(bucketName)
if err != nil {
return nil, gofakes3.BucketNotFound(bucketName)
}

fp := path.Join(bucketName, objectName)
node, err := b.vfs.Stat(fp)
node, err := _vfs.Stat(fp)
if err != nil {
return nil, gofakes3.KeyNotFound(objectName)
}
Expand Down Expand Up @@ -140,14 +153,18 @@ func (b *s3Backend) HeadObject(bucketName, objectName string) (*gofakes3.Object,
}

// GetObject fetchs the object from the filesystem.
func (b *s3Backend) GetObject(bucketName, objectName string, rangeRequest *gofakes3.ObjectRangeRequest) (obj *gofakes3.Object, err error) {
_, err = b.vfs.Stat(bucketName)
func (b *s3Backend) GetObject(r *http.Request, bucketName, objectName string, rangeRequest *gofakes3.ObjectRangeRequest) (obj *gofakes3.Object, err error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return nil, err
}
_, err = _vfs.Stat(bucketName)
if err != nil {
return nil, gofakes3.BucketNotFound(bucketName)
}

fp := path.Join(bucketName, objectName)
node, err := b.vfs.Stat(fp)
node, err := _vfs.Stat(fp)
if err != nil {
return nil, gofakes3.KeyNotFound(objectName)
}
Expand Down Expand Up @@ -214,20 +231,24 @@ func (b *s3Backend) GetObject(bucketName, objectName string, rangeRequest *gofak
}

// TouchObject creates or updates meta on specified object.
func (b *s3Backend) TouchObject(fp string, meta map[string]string) (result gofakes3.PutObjectResult, err error) {
_, err = b.vfs.Stat(fp)
func (b *s3Backend) TouchObject(r *http.Request, fp string, meta map[string]string) (result gofakes3.PutObjectResult, err error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return result, err
}
_, err = _vfs.Stat(fp)
if err == vfs.ENOENT {
f, err := b.vfs.Create(fp)
f, err := _vfs.Create(fp)
if err != nil {
return result, err
}
_ = f.Close()
return b.TouchObject(fp, meta)
return b.TouchObject(r, fp, meta)
} else if err != nil {
return result, err
}

_, err = b.vfs.Stat(fp)
_, err = _vfs.Stat(fp)
if err != nil {
return result, err
}
Expand All @@ -237,15 +258,15 @@ func (b *s3Backend) TouchObject(fp string, meta map[string]string) (result gofak
if val, ok := meta["X-Amz-Meta-Mtime"]; ok {
ti, err := swift.FloatStringToTime(val)
if err == nil {
return result, b.vfs.Chtimes(fp, ti, ti)
return result, _vfs.Chtimes(fp, ti, ti)
}
// ignore error since the file is successfully created
}

if val, ok := meta["mtime"]; ok {
ti, err := swift.FloatStringToTime(val)
if err == nil {
return result, b.vfs.Chtimes(fp, ti, ti)
return result, _vfs.Chtimes(fp, ti, ti)
}
// ignore error since the file is successfully created
}
Expand All @@ -254,12 +275,16 @@ func (b *s3Backend) TouchObject(fp string, meta map[string]string) (result gofak
}

// PutObject creates or overwrites the object with the given name.
func (b *s3Backend) PutObject(
func (b *s3Backend) PutObject(r *http.Request,
bucketName, objectName string,
meta map[string]string,
input io.Reader, size int64,
) (result gofakes3.PutObjectResult, err error) {
_, err = b.vfs.Stat(bucketName)
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return result, err
}
_, err = _vfs.Stat(bucketName)
if err != nil {
return result, gofakes3.BucketNotFound(bucketName)
}
Expand All @@ -273,30 +298,30 @@ func (b *s3Backend) PutObject(
// }

if objectDir != "." {
if err := mkdirRecursive(objectDir, b.vfs); err != nil {
if err := mkdirRecursive(objectDir, _vfs); err != nil {
return result, err
}
}

f, err := b.vfs.Create(fp)
f, err := _vfs.Create(fp)
if err != nil {
return result, err
}

if _, err := io.Copy(f, input); err != nil {
// remove file when i/o error occurred (FsPutErr)
_ = f.Close()
_ = b.vfs.Remove(fp)
_ = _vfs.Remove(fp)
return result, err
}

if err := f.Close(); err != nil {
// remove file when close error occurred (FsPutErr)
_ = b.vfs.Remove(fp)
_ = _vfs.Remove(fp)
return result, err
}

_, err = b.vfs.Stat(fp)
_, err = _vfs.Stat(fp)
if err != nil {
return result, err
}
Expand All @@ -306,15 +331,15 @@ func (b *s3Backend) PutObject(
if val, ok := meta["X-Amz-Meta-Mtime"]; ok {
ti, err := swift.FloatStringToTime(val)
if err == nil {
return result, b.vfs.Chtimes(fp, ti, ti)
return result, _vfs.Chtimes(fp, ti, ti)
}
// ignore error since the file is successfully created
}

if val, ok := meta["mtime"]; ok {
ti, err := swift.FloatStringToTime(val)
if err == nil {
return result, b.vfs.Chtimes(fp, ti, ti)
return result, _vfs.Chtimes(fp, ti, ti)
}
// ignore error since the file is successfully created
}
Expand All @@ -323,9 +348,9 @@ func (b *s3Backend) PutObject(
}

// DeleteMulti deletes multiple objects in a single request.
func (b *s3Backend) DeleteMulti(bucketName string, objects ...string) (result gofakes3.MultiDeleteResult, rerr error) {
func (b *s3Backend) DeleteMulti(r *http.Request, bucketName string, objects ...string) (result gofakes3.MultiDeleteResult, rerr error) {
for _, object := range objects {
if err := b.deleteObject(bucketName, object); err != nil {
if err := b.deleteObject(r, bucketName, object); err != nil {
fs.Errorf("serve s3", "delete object failed: %v", err)
result.Error = append(result.Error, gofakes3.ErrorResult{
Code: gofakes3.ErrInternal,
Expand All @@ -343,32 +368,40 @@ func (b *s3Backend) DeleteMulti(bucketName string, objects ...string) (result go
}

// DeleteObject deletes the object with the given name.
func (b *s3Backend) DeleteObject(bucketName, objectName string) (result gofakes3.ObjectDeleteResult, rerr error) {
return result, b.deleteObject(bucketName, objectName)
func (b *s3Backend) DeleteObject(r *http.Request, bucketName, objectName string) (result gofakes3.ObjectDeleteResult, rerr error) {
return result, b.deleteObject(r, bucketName, objectName)
}

// deleteObject deletes the object from the filesystem.
func (b *s3Backend) deleteObject(bucketName, objectName string) error {
_, err := b.vfs.Stat(bucketName)
func (b *s3Backend) deleteObject(r *http.Request, bucketName, objectName string) error {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return err
}
_, err = _vfs.Stat(bucketName)
if err != nil {
return gofakes3.BucketNotFound(bucketName)
}

fp := path.Join(bucketName, objectName)
// S3 does not report an error when attemping to delete a key that does not exist, so
// we need to skip IsNotExist errors.
if err := b.vfs.Remove(fp); err != nil && !os.IsNotExist(err) {
if err := _vfs.Remove(fp); err != nil && !os.IsNotExist(err) {
return err
}

// FIXME: unsafe operation
rmdirRecursive(fp, b.vfs)
rmdirRecursive(fp, _vfs)
return nil
}

// CreateBucket creates a new bucket.
func (b *s3Backend) CreateBucket(name string) error {
_, err := b.vfs.Stat(name)
func (b *s3Backend) CreateBucket(r *http.Request, name string) error {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return err
}
_, err = _vfs.Stat(name)
if err != nil && err != vfs.ENOENT {
return gofakes3.ErrInternal
}
Expand All @@ -377,29 +410,37 @@ func (b *s3Backend) CreateBucket(name string) error {
return gofakes3.ErrBucketAlreadyExists
}

if err := b.vfs.Mkdir(name, 0755); err != nil {
if err := _vfs.Mkdir(name, 0755); err != nil {
return gofakes3.ErrInternal
}
return nil
}

// DeleteBucket deletes the bucket with the given name.
func (b *s3Backend) DeleteBucket(name string) error {
_, err := b.vfs.Stat(name)
func (b *s3Backend) DeleteBucket(r *http.Request, name string) error {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return err
}
_, err = _vfs.Stat(name)
if err != nil {
return gofakes3.BucketNotFound(name)
}

if err := b.vfs.Remove(name); err != nil {
if err := _vfs.Remove(name); err != nil {
return gofakes3.ErrBucketNotEmpty
}

return nil
}

// BucketExists checks if the bucket exists.
func (b *s3Backend) BucketExists(name string) (exists bool, err error) {
_, err = b.vfs.Stat(name)
func (b *s3Backend) BucketExists(r *http.Request, name string) (exists bool, err error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return false, err
}
_, err = _vfs.Stat(name)
if err != nil {
return false, nil
}
Expand All @@ -408,7 +449,11 @@ func (b *s3Backend) BucketExists(name string) (exists bool, err error) {
}

// CopyObject copy specified object from srcKey to dstKey.
func (b *s3Backend) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta map[string]string) (result gofakes3.CopyObjectResult, err error) {
func (b *s3Backend) CopyObject(r *http.Request, srcBucket, srcKey, dstBucket, dstKey string, meta map[string]string) (result gofakes3.CopyObjectResult, err error) {
_vfs, err := b.s.getVFS(r.Context())
if err != nil {
return result, err
}
fp := path.Join(srcBucket, srcKey)
if srcBucket == dstBucket && srcKey == dstKey {
b.meta.Store(fp, meta)
Expand All @@ -425,15 +470,15 @@ func (b *s3Backend) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta
return result, nil
}

return result, b.vfs.Chtimes(fp, ti, ti)
return result, _vfs.Chtimes(fp, ti, ti)
}

cStat, err := b.vfs.Stat(fp)
cStat, err := _vfs.Stat(fp)
if err != nil {
return
}

c, err := b.GetObject(srcBucket, srcKey, nil)
c, err := b.GetObject(r, srcBucket, srcKey, nil)
if err != nil {
return
}
Expand All @@ -450,7 +495,7 @@ func (b *s3Backend) CopyObject(srcBucket, srcKey, dstBucket, dstKey string, meta
meta["mtime"] = swift.TimeToFloatString(cStat.ModTime())
}

_, err = b.PutObject(dstBucket, dstKey, meta, c.Contents, c.Size)
_, err = b.PutObject(r, dstBucket, dstKey, meta, c.Contents, c.Size)
if err != nil {
return
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/serve/s3/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"strings"

"github.com/Mikubill/gofakes3"
"github.com/rclone/rclone/vfs"
)

func (b *s3Backend) entryListR(bucket, fdPath, name string, addPrefix bool, response *gofakes3.ObjectList) error {
func (b *s3Backend) entryListR(_vfs *vfs.VFS, bucket, fdPath, name string, addPrefix bool, response *gofakes3.ObjectList) error {
fp := path.Join(bucket, fdPath)

dirEntries, err := getDirEntries(fp, b.vfs)
dirEntries, err := getDirEntries(fp, _vfs)
if err != nil {
return err
}
Expand All @@ -30,7 +31,7 @@ func (b *s3Backend) entryListR(bucket, fdPath, name string, addPrefix bool, resp
response.AddPrefix(gofakes3.URLEncode(objectPath))
continue
}
err := b.entryListR(bucket, path.Join(fdPath, object), "", false, response)
err := b.entryListR(_vfs, bucket, path.Join(fdPath, object), "", false, response)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit ad95a71

Please sign in to comment.