Skip to content

Commit

Permalink
feat(storage): add teestorage and pipestorage
Browse files Browse the repository at this point in the history
add utility functions similar to those found in stdlib io package
  • Loading branch information
hannahhoward committed Sep 25, 2023
1 parent 70e45bd commit 154344e
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 0 deletions.
7 changes: 7 additions & 0 deletions storage/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ type PeekableStorage interface {
Peek(ctx context.Context, key string) ([]byte, io.Closer, error)
}

// FinalizableStorage is a future-detection interface in which a storage implementation can be finalized,
// indicating any addition reads, writes, or function calls will fale. FinalizableStorage is analogous
// IPLD concept to io.Closer in the stdlib.
type FinalizableStorage interface {
Finalize() error
}

// the following are all hypothetical additional future interfaces (in varying degress of speculativeness):

// FUTURE: an EnumerableStorage API, that lets you list all keys present?
Expand Down
236 changes: 236 additions & 0 deletions storage/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package storage

import (
"context"
"errors"
"fmt"
"io"
"sync"
)

type storageUnit struct {
key string
data []byte
}

// onceError is an object that will only store an error once.
type onceError struct {
sync.Mutex // guards following
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err

Check warning on line 28 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L22-L28

Added lines #L22 - L28 were not covered by tests
}
func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err

Check warning on line 33 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}

// ErrMismatchedKey is the error used when read and write keys don't match
var ErrMismatchedKey = errors.New("put/get keys do not match")

// A pipeStorage is the shared pipeStorage structure underlying ReadablePipeStorage and WritablePipeStorage.
type pipeStorage struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan storageUnit
rdCh chan struct{}

once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
keysLk sync.RWMutex
keys map[string]struct{}
}

func (p *pipeStorage) has(ctx context.Context, key string) (bool, error) {
select {
case <-p.done:
return false, p.readableFinalizeError()
default:

Check warning on line 57 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L53-L57

Added lines #L53 - L57 were not covered by tests
}
p.keysLk.RLock()
defer p.keysLk.RUnlock()
_, ok := p.keys[key]
return ok, nil

Check warning on line 62 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}

func (p *pipeStorage) get(ctx context.Context, key string) ([]byte, error) {
select {
case <-p.done:
return nil, p.readableFinalizeError()
default:

Check warning on line 69 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L65-L69

Added lines #L65 - L69 were not covered by tests
}

select {
case su := <-p.wrCh:
p.rdCh <- struct{}{}
if su.key != key {
return nil, fmt.Errorf("%w: put %s, got %s", ErrMismatchedKey, su.key, key)
}
return su.data, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-p.done:
return nil, p.readableFinalizeError()

Check warning on line 82 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L72-L82

Added lines #L72 - L82 were not covered by tests
}
}

func (p *pipeStorage) finalizeReadable(err error) error {
if err == nil {
err = io.ErrClosedPipe
}
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil

Check warning on line 92 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L86-L92

Added lines #L86 - L92 were not covered by tests
}

func (p *pipeStorage) put(ctx context.Context, key string, b []byte) error {
select {
case <-p.done:
return p.writableFinalizeError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()

Check warning on line 101 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L95-L101

Added lines #L95 - L101 were not covered by tests
}

select {
case p.wrCh <- storageUnit{key, b}:
p.keysLk.Lock()
p.keys[key] = struct{}{}
p.keysLk.Unlock()
<-p.rdCh
return nil
case <-ctx.Done():
return ctx.Err()
case <-p.done:
return p.writableFinalizeError()

Check warning on line 114 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L104-L114

Added lines #L104 - L114 were not covered by tests
}
}

func (p *pipeStorage) finalizeWritable(err error) error {
if err == nil {
err = io.EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil

Check warning on line 124 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L118-L124

Added lines #L118 - L124 were not covered by tests
}

// readableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) readableFinalizeError() error {
rerr := p.rerr.Load()
if werr := p.werr.Load(); rerr == nil && werr != nil {
return werr
}
return io.ErrClosedPipe

Check warning on line 133 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L128-L133

Added lines #L128 - L133 were not covered by tests
}

// writableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) writableFinalizeError() error {
werr := p.werr.Load()
if rerr := p.rerr.Load(); werr == nil && rerr != nil {
return rerr
}
return io.ErrClosedPipe

Check warning on line 142 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L137-L142

Added lines #L137 - L142 were not covered by tests
}

// A ReadablePipeStorage is the read half of a pipe.
type ReadablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (r *ReadablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return r.p.has(ctx, key)

Check warning on line 152 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}

// Get implements the ReadableStorage interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *ReadablePipeStorage) Get(ctx context.Context, key string) ([]byte, error) {
return r.p.get(ctx, key)

Check warning on line 161 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}

// Finalize closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
func (r *ReadablePipeStorage) Finalize() error {
return r.FinalizeWithError(nil)

Check warning on line 167 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L166-L167

Added lines #L166 - L167 were not covered by tests
}

// FinalizeWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (r *ReadablePipeStorage) FinalizeWithError(err error) error {
return r.p.finalizeReadable(err)

Check warning on line 176 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L175-L176

Added lines #L175 - L176 were not covered by tests
}

// A WritablePipeStorage is the write half of a pipe.
type WritablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (w *WritablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return w.p.has(ctx, key)

Check warning on line 186 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L185-L186

Added lines #L185 - L186 were not covered by tests
}

// Put implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *WritablePipeStorage) Put(ctx context.Context, key string, data []byte) error {
return w.p.put(ctx, key, data)

Check warning on line 195 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L194-L195

Added lines #L194 - L195 were not covered by tests
}

// Finalize closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *WritablePipeStorage) Finalize() error {
return w.FinalizeWithError(nil)

Check warning on line 201 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

// FinalizeWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (w *WritablePipeStorage) FinalizeWithError(err error) error {
return w.p.finalizeWritable(err)

Check warning on line 211 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L210-L211

Added lines #L210 - L211 were not covered by tests
}

// PipeStorage creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func PipeStorage() (*ReadablePipeStorage, *WritablePipeStorage) {
p := &pipeStorage{
wrCh: make(chan storageUnit),
rdCh: make(chan struct{}),
done: make(chan struct{}),
}
return &ReadablePipeStorage{p}, &WritablePipeStorage{p}

Check warning on line 235 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L229-L235

Added lines #L229 - L235 were not covered by tests
}
63 changes: 63 additions & 0 deletions storage/teestorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storage

import (
"context"
"io"
)

type teeStorage struct {
ReadableStorage
out WritableStorage
}

type teeReadCloser struct {
io.Reader
readCloser io.Closer
key string
writeClose func(string) error
}

func (trc teeReadCloser) Close() error {
err := trc.readCloser.Close()
if err != nil {
return err
}
err = trc.writeClose(trc.key)
if err != nil {
return err
}
return nil

Check warning on line 29 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L20-L29

Added lines #L20 - L29 were not covered by tests
}

func (ts teeStorage) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rdr, err := GetStream(ctx, ts.ReadableStorage, key)
if err != nil {
return nil, err
}
writer, committer, err := PutStream(ctx, ts.out)
if err != nil {
return nil, err
}
return teeReadCloser{
Reader: io.TeeReader(rdr, writer),
readCloser: rdr,
writeClose: committer,
key: key,
}, nil

Check warning on line 46 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L32-L46

Added lines #L32 - L46 were not covered by tests
}

func (ts teeStorage) Get(ctx context.Context, key string) ([]byte, error) {
data, err := ts.ReadableStorage.Get(ctx, key)
if err != nil {
return nil, err
}
err = ts.out.Put(ctx, key, data)
return data, err

Check warning on line 55 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L49-L55

Added lines #L49 - L55 were not covered by tests
}

func TeeStorage(in ReadableStorage, out WritableStorage) ReadableStorage {
return teeStorage{
ReadableStorage: in,
out: out,
}

Check warning on line 62 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L58-L62

Added lines #L58 - L62 were not covered by tests
}

0 comments on commit 154344e

Please sign in to comment.