diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 788b47f0..2b0c16b7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -7,7 +7,7 @@ jobs: strategy: matrix: os: [macOS-latest, ubuntu-latest] - goversion: [1.13, 1.14, 1.15] + goversion: [1.17] steps: - name: Set up Go ${{matrix.goversion}} on ${{matrix.os}} uses: actions/setup-go@v1 @@ -18,20 +18,11 @@ jobs: - name: Check out code into the Go module directory uses: actions/checkout@v1 - - name: gofmt - run: | - [[ -z $(gofmt -l $(find . -name '*.go') ) ]] - - name: Get dependencies env: GO111MODULE: on run: go mod download - - name: Vet - env: - GO111MODULE: on - run: go vet -mod=readonly ./... - - name: Test env: GO111MODULE: on diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml new file mode 100644 index 00000000..94938019 --- /dev/null +++ b/.github/workflows/golangci-lint.yml @@ -0,0 +1,46 @@ +name: golangci-lint +on: + push: + tags: + - v* + branches: + - master + - main + pull_request: +permissions: + contents: read + # Optional: allow read access to pull request. Use with `only-new-issues` option. + pull-requests: read +jobs: + golangci: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: 1.17 + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version + version: v1.29 + + # Optional: working directory, useful for monorepos + # working-directory: somedir + + # Optional: golangci-lint command line arguments. + # args: --issues-exit-code=0 + + # Optional: show only new issues if it's a pull request. The default value is `false`. + only-new-issues: true + + # Optional: if set to true then the all caching functionality will be complete disabled, + # takes precedence over all other caching options. + # skip-cache: true + + # Optional: if set to true then the action don't cache or restore ~/go/pkg. + # skip-pkg-cache: true + + # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. + # skip-build-cache: true diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 00000000..89bdde5e --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,5 @@ +issues: + exclude-rules: + - linters: + - staticcheck + text: "SA1019:" diff --git a/codec.go b/codec.go index 53be4c8d..13639ec3 100644 --- a/codec.go +++ b/codec.go @@ -16,11 +16,13 @@ limitations under the License. package galaxycache +import "time" + // Codec includes both the BinaryMarshaler and BinaryUnmarshaler // interfaces type Codec interface { - MarshalBinary() ([]byte, error) - UnmarshalBinary(data []byte) error + MarshalBinary() ([]byte, time.Time, error) + UnmarshalBinary(data []byte, expire time.Time) error } // Note: to ensure that unmarshaling is a read-only operation, bytes @@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte { } // ByteCodec is a byte slice type that implements Codec -type ByteCodec []byte +type ByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary on a ByteCodec returns the bytes -func (c *ByteCodec) MarshalBinary() ([]byte, error) { - return *c, nil +func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return c.bytes, c.expire, nil } // UnmarshalBinary on a ByteCodec sets the ByteCodec to // a copy of the provided data -func (c *ByteCodec) UnmarshalBinary(data []byte) error { - *c = cloneBytes(data) +func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = cloneBytes(data) + c.expire = expire return nil } // CopyingByteCodec is a byte slice type that implements Codec // and returns a copy of the bytes when marshaled -type CopyingByteCodec []byte +type CopyingByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary on a CopyingByteCodec returns a copy of the bytes -func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) { - return cloneBytes(*c), nil +func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return cloneBytes(c.bytes), c.expire, nil } // UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to // a copy of the provided data -func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error { - *c = cloneBytes(data) +func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = cloneBytes(data) + c.expire = expire return nil } // StringCodec is a string type that implements Codec -type StringCodec string +type StringCodec struct { + str string + expire time.Time +} // MarshalBinary on a StringCodec returns the bytes underlying // the string -func (c *StringCodec) MarshalBinary() ([]byte, error) { - return []byte(*c), nil +func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) { + return []byte(c.str), c.expire, nil } // UnmarshalBinary on a StringCodec sets the StringCodec to // a stringified copy of the provided data -func (c *StringCodec) UnmarshalBinary(data []byte) error { - *c = StringCodec(data) +func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.str = string(data) + c.expire = expire return nil } diff --git a/codec_test.go b/codec_test.go index 76342233..d29ddc40 100644 --- a/codec_test.go +++ b/codec_test.go @@ -19,6 +19,9 @@ package galaxycache import ( "bytes" "testing" + "time" + + "github.com/stretchr/testify/require" ) const testBytes = "some bytes" @@ -51,25 +54,31 @@ func TestCodec(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { inBytes := []byte(testBytes) - tc.codec.UnmarshalBinary(inBytes) + require.NoError(t, tc.codec.UnmarshalBinary(inBytes, time.Time{})) inBytes[0] = 'a' // change the original byte slice to ensure copy was made - marshaledBytes, err := tc.codec.MarshalBinary() + marshaledBytes, expTm, err := tc.codec.MarshalBinary() if err != nil { t.Errorf("Error marshaling from byteCodec: %s", err) } + if !expTm.Equal(time.Time{}) { + t.Errorf("Expected empty expiration time") + } if string(marshaledBytes) != testBytes { t.Errorf("Unmarshal/Marshal resulted in %q; want %q", marshaledBytes, testBytes) } if tc.checkCopy { marshaledBytes[0] = 'a' // change marshaled bytes to ensure full copy on marshal - secondMarshaledBytes, errM := tc.codec.MarshalBinary() + secondMarshaledBytes, expTm, errM := tc.codec.MarshalBinary() if errM != nil { t.Errorf("Error marshaling from byteCodec: %s", errM) } if bytes.Equal(marshaledBytes, secondMarshaledBytes) { t.Errorf("Marshaling did not copy the bytes") } + if !expTm.Equal(time.Time{}) { + t.Errorf("Expected empty expiration time") + } } }) } diff --git a/galaxycache.go b/galaxycache.go index 626db13d..ecd537db 100644 --- a/galaxycache.go +++ b/galaxycache.go @@ -400,6 +400,35 @@ func (g *Galaxy) recordRequest(ctx context.Context, h hitLevel, localAuthoritati } } +// GetMultiple is like Get but fetches multiple keys at once into the respective +// destinations (codecs). +func (g *Galaxy) GetMultiple(ctx context.Context, keys []string, destinations []Codec) error { + if len(keys) != len(destinations) { + return fmt.Errorf("number of keys vs. codecs doesn't match (%d vs. %d)", len(keys), len(destinations)) + } + ctx, tagErr := tag.New(ctx, tag.Upsert(GalaxyKey, g.name)) + if tagErr != nil { + return fmt.Errorf("Error tagging context: %s", tagErr) + } + + ctx, span := trace.StartSpan(ctx, "galaxycache.(*Galaxy).GetMultiple on "+g.name) + startTime := time.Now() + defer func() { + g.recordStats(ctx, nil, MRoundtripLatencyMilliseconds.M(sinceInMilliseconds(startTime))) + span.End() + }() + + g.Stats.Gets.Add(1) + g.recordStats(ctx, nil, MGets.M(1)) + + // TODO: + // Group each key by peer. + // Request all of those keys from that one peer concurrently. + // Try to load what is missing. + + return nil +} + // Get as defined here is the primary "get" called on a galaxy to // find the value for the given key, using the following logic: // - First, try the local cache; if its a cache hit, we're done @@ -436,7 +465,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { value.stats.touch() g.recordRequest(ctx, hlvl, false) g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) - return dest.UnmarshalBinary(value.data) + return dest.UnmarshalBinary(value.data, value.expire) } span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss") @@ -456,7 +485,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { if destPopulated { return nil } - return dest.UnmarshalBinary(value.data) + return dest.UnmarshalBinary(value.data, value.expire) } type valWithLevel struct { @@ -525,7 +554,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi // probably boring (normal task movement), so not // worth logging I imagine. } - data, err := g.getLocally(ctx, key, dest) + data, expTm, err := g.getLocally(ctx, key, dest) if err != nil { g.Stats.BackendLoadErrors.Add(1) g.recordStats(ctx, nil, MBackendLoadErrors.M(1)) @@ -535,7 +564,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi g.Stats.CoalescedBackendLoads.Add(1) g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) destPopulated = true // only one caller of load gets this return value - value = newValWithStat(data, nil) + value = newValWithStat(data, nil, expTm) g.populateCache(ctx, key, value, &g.mainCache) return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil }) @@ -548,22 +577,23 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi return } -func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) { +func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) { err := g.getter.Get(ctx, key, dest) if err != nil { - return nil, err + return nil, time.Time{}, err } return dest.MarshalBinary() } func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) { - data, err := peer.Fetch(ctx, g.name, key) + data, err := peer.Fetch(ctx, g.name, []string{key}) if err != nil { return nil, err } + expire := data[0].TTL vi, ok := g.candidateCache.get(key) if !ok { - vi = g.addNewToCandidateCache(key) + vi = g.addNewToCandidateCache(key, expire) } g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update @@ -573,7 +603,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string KeyQPS: kStats.val(), HCStats: g.hcStatsWithTime.hcs, } - value := newValWithStat(data, kStats) + value := newValWithStat(data[0].Data, kStats, expire) if g.opts.promoter.ShouldPromote(key, value.data, stats) { g.populateCache(ctx, key, value, &g.hotCache) } @@ -627,7 +657,7 @@ func (g *Galaxy) populateCache(ctx context.Context, key string, value *valWithSt } func (g *Galaxy) recordStats(ctx context.Context, mutators []tag.Mutator, measurements ...stats.Measurement) { - stats.RecordWithOptions( + _ = stats.RecordWithOptions( ctx, stats.WithMeasurements(measurements...), stats.WithTags(mutators...), @@ -692,8 +722,9 @@ func (c *cache) stats() CacheStats { } type valWithStat struct { - data []byte - stats *keyStats + data []byte + stats *keyStats + expire time.Time } // sizeOfValWithStats returns the total size of the value in the hot/main @@ -704,13 +735,13 @@ func (v *valWithStat) size() int64 { return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v)) } -func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { +func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) { c.lru.OnEvicted = func(key lru.Key, value interface{}) { val := value.(*valWithStat) c.nbytes -= int64(len(key.(string))) + val.size() c.nevict++ if f != nil { - f(key.(string), val.stats) + f(key.(string), val.stats, val.expire) } } } @@ -718,7 +749,7 @@ func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { func (c *cache) add(key string, value *valWithStat) { c.mu.Lock() defer c.mu.Unlock() - c.lru.Add(key, value) + c.lru.Add(key, value, value.expire) c.nbytes += int64(len(key)) + value.size() } diff --git a/galaxycache_test.go b/galaxycache_test.go index fac06279..0aff2da5 100644 --- a/galaxycache_test.go +++ b/galaxycache_test.go @@ -31,6 +31,7 @@ import ( "time" "unsafe" + "github.com/stretchr/testify/require" "github.com/vimeo/galaxycache/promoter" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -54,7 +55,7 @@ func setupStringGalaxyTest(cacheFills *AtomicInt) (*Galaxy, context.Context, cha } cacheFills.Add(1) str := "ECHO:" + key - return dest.UnmarshalBinary([]byte(str)) + return dest.UnmarshalBinary([]byte(str), time.Now().Add(5*time.Minute)) })) return stringGalaxy, ctx, stringc } @@ -75,7 +76,13 @@ func TestGetDupSuppress(t *testing.T) { resc <- "ERROR:" + err.Error() return } - resc <- string(s) + + ret, _, err := s.MarshalBinary() + if err != nil { + resc <- "ERROR MARSHAL: " + err.Error() + return + } + resc <- string(ret) }() } @@ -149,8 +156,13 @@ func TestCacheEviction(t *testing.T) { for bytesFlooded < cacheSize+1024 { var res StringCodec key := fmt.Sprintf("dummy-key-%d", bytesFlooded) - stringGalaxy.Get(ctx, key, &res) - bytesFlooded += int64(len(key) + len(res)) + require.NoError(t, stringGalaxy.Get(ctx, key, &res)) + + ret, _, err := res.MarshalBinary() + if err != nil { + t.Fatalf("marshaling binary: %v", err.Error()) + } + bytesFlooded += int64(len(key) + len(ret)) } evicts := stringGalaxy.mainCache.nevict - evict0 if evicts <= 0 { @@ -177,14 +189,17 @@ func (fetcher *TestFetcher) Close() error { return nil } -type testFetchers []RemoteFetcher - -func (fetcher *TestFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { +func (fetcher *TestFetcher) Fetch(ctx context.Context, galaxy string, keys []string) ([]ValueWithTTL, error) { if fetcher.fail { - return nil, errors.New("simulated error from peer") + return []ValueWithTTL{}, errors.New("simulated error from peer") } fetcher.hits++ - return []byte("got:" + key), nil + return []ValueWithTTL{ + { + Data: []byte("got:" + keys[0]), + TTL: time.Time{}, + }, + }, nil } func (proto *TestProtocol) NewFetcher(url string) (RemoteFetcher, error) { @@ -266,12 +281,12 @@ func TestPeers(t *testing.T) { universe := NewUniverseWithOpts(testproto, "fetcher0", hashOpts) dummyCtx := context.TODO() - universe.Set("fetcher0", "fetcher1", "fetcher2", "fetcher3") + require.NoError(t, universe.Set("fetcher0", "fetcher1", "fetcher2", "fetcher3")) getter := func(_ context.Context, key string, dest Codec) error { // these are local hits testproto.TestFetchers["fetcher0"].hits++ - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } testGalaxy := universe.NewGalaxy("TestPeers-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(&promoter.ProbabilisticPromoter{ProbDenominator: 10})) @@ -293,8 +308,14 @@ func TestPeers(t *testing.T) { t.Errorf("%s: error on key %q: %v", tc.testName, key, err) continue } - if string(got) != want { - t.Errorf("%s: for key %q, got %q; want %q", tc.testName, key, got, want) + + ret, _, err := got.MarshalBinary() + if err != nil { + t.Errorf("%s: error marshaling on key %q: %v", tc.testName, key, err) + continue + } + if string(ret) != want { + t.Errorf("%s: for key %q, got %q; want %q", tc.testName, key, ret, want) } } for name, fetcher := range testproto.TestFetchers { @@ -335,7 +356,7 @@ func TestNoDedup(t *testing.T) { const testkey = "testkey" const testval = "testval" g := universe.NewGalaxy("testgalaxy", 1024, GetterFunc(func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte(testval)) + return dest.UnmarshalBinary([]byte(testval), time.Now().Add(5*time.Minute)) })) orderedGroup := &orderedFlightGroup{ @@ -359,7 +380,13 @@ func TestNoDedup(t *testing.T) { resc <- "ERROR:" + err.Error() return } - resc <- string(s) + + ret, _, err := s.MarshalBinary() + if err != nil { + resc <- "ERROR MARSHAL:" + err.Error() + return + } + resc <- string(ret) }() } @@ -386,7 +413,7 @@ func TestNoDedup(t *testing.T) { // upon entry, we would increment nbytes twice but the entry would // only be in the cache once. testKStats := keyStats{dQPS: dampedQPS{period: time.Second}} - testvws := newValWithStat([]byte(testval), &testKStats) + testvws := newValWithStat([]byte(testval), &testKStats, time.Now().Add(1*time.Second)) wantBytes := int64(len(testkey)) + testvws.size() if g.mainCache.nbytes != wantBytes { t.Errorf("cache has %d bytes, want %d", g.mainCache.nbytes, wantBytes) @@ -454,14 +481,14 @@ func TestHotcache(t *testing.T) { t.Run(tc.name, func(t *testing.T) { u := NewUniverse(&TestProtocol{}, "test-universe") g := u.NewGalaxy("test-galaxy", 1<<20, GetterFunc(func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("hello")) + return dest.UnmarshalBinary([]byte("hello"), time.Now().Add(5*time.Minute)) })) kStats := &keyStats{ dQPS: dampedQPS{ period: time.Second, }, } - value := newValWithStat([]byte("hello"), kStats) + value := newValWithStat([]byte("hello"), kStats, time.Now().Add(1*time.Second)) g.hotCache.add(keyToAdd, value) now := time.Now() // blast the key in the hotcache with a bunch of hypothetical gets every few seconds @@ -476,7 +503,7 @@ func TestHotcache(t *testing.T) { if math.Abs(val-tc.expectedBurstQPS) > val/100 { // ensure less than %1 error t.Errorf("QPS after bursts: %f, Wanted: %f", val, tc.expectedBurstQPS) } - value2 := newValWithStat([]byte("hello there"), nil) + value2 := newValWithStat([]byte("hello there"), nil, time.Now().Add(1*time.Second)) g.hotCache.add(keyToAdd+"2", value2) // ensure that hcStats are properly updated after adding g.maybeUpdateHotCacheStats() @@ -551,8 +578,9 @@ func TestPromotion(t *testing.T) { if okHot { t.Error("Found candidate in hotcache") } - g.getFromPeer(ctx, tf, key) - val, okHot := g.hotCache.get(key) + _, err := g.getFromPeer(ctx, tf, key) + require.NoError(t, err) + val, _ := g.hotCache.get(key) if string(val.(*valWithStat).data) != "got:"+testKey { t.Error("Did not promote from candidacy") } @@ -568,11 +596,12 @@ func TestPromotion(t *testing.T) { fetcher := &TestFetcher{} testProto := &TestProtocol{} getter := func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } universe := NewUniverse(testProto, "promotion-test") galaxy := universe.NewGalaxy("test-galaxy", tc.cacheSize, GetterFunc(getter), WithPromoter(tc.promoter)) - galaxy.getFromPeer(ctx, fetcher, testKey) + _, err := galaxy.getFromPeer(ctx, fetcher, testKey) + require.NoError(t, err) _, okCandidate := galaxy.candidateCache.get(testKey) value, okHot := galaxy.hotCache.get(testKey) tc.checkCache(ctx, t, testKey, value, okCandidate, okHot, fetcher, galaxy) @@ -591,10 +620,10 @@ func TestRecorder(t *testing.T) { TagKeys: []tag.Key{GalaxyKey}, Aggregation: view.Count(), } - meter.Register(testView) + require.NoError(t, meter.Register(testView)) getter := func(_ context.Context, key string, dest Codec) error { - return dest.UnmarshalBinary([]byte("got:" + key)) + return dest.UnmarshalBinary([]byte("got:"+key), time.Now().Add(5*time.Minute)) } u := NewUniverse(&TestProtocol{}, "test-universe", WithRecorder(meter)) g := u.NewGalaxy("test", 1024, GetterFunc(getter)) diff --git a/galaxycachepb/galaxycache.pb.go b/galaxycachepb/galaxycache.pb.go index 116e3d79..8d42a5f5 100644 --- a/galaxycachepb/galaxycache.pb.go +++ b/galaxycachepb/galaxycache.pb.go @@ -1,212 +1,266 @@ +// +//Copyright 2012 Google Inc. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + // Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.6.1 // source: galaxycache.proto package galaxycachepb import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - math "math" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) type GetRequest struct { - Galaxy string `protobuf:"bytes,1,opt,name=galaxy,proto3" json:"galaxy,omitempty"` - Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *GetRequest) Reset() { *m = GetRequest{} } -func (m *GetRequest) String() string { return proto.CompactTextString(m) } -func (*GetRequest) ProtoMessage() {} -func (*GetRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_23bd509ca7b74957, []int{0} + Galaxy string `protobuf:"bytes,1,opt,name=galaxy,proto3" json:"galaxy,omitempty"` + // Deprecated: please use the keys field. + Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` // not actually required/guaranteed to be UTF-8 + Keys []string `protobuf:"bytes,3,rep,name=keys,proto3" json:"keys,omitempty"` } -func (m *GetRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_GetRequest.Unmarshal(m, b) -} -func (m *GetRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_GetRequest.Marshal(b, m, deterministic) -} -func (m *GetRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetRequest.Merge(m, src) +func (x *GetRequest) Reset() { + *x = GetRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_galaxycache_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *GetRequest) XXX_Size() int { - return xxx_messageInfo_GetRequest.Size(m) + +func (x *GetRequest) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *GetRequest) XXX_DiscardUnknown() { - xxx_messageInfo_GetRequest.DiscardUnknown(m) + +func (*GetRequest) ProtoMessage() {} + +func (x *GetRequest) ProtoReflect() protoreflect.Message { + mi := &file_galaxycache_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_GetRequest proto.InternalMessageInfo +// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. +func (*GetRequest) Descriptor() ([]byte, []int) { + return file_galaxycache_proto_rawDescGZIP(), []int{0} +} -func (m *GetRequest) GetGalaxy() string { - if m != nil { - return m.Galaxy +func (x *GetRequest) GetGalaxy() string { + if x != nil { + return x.Galaxy } return "" } -func (m *GetRequest) GetKey() string { - if m != nil { - return m.Key +func (x *GetRequest) GetKey() string { + if x != nil { + return x.Key } return "" } -type GetResponse struct { - Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` - MinuteQps float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps,proto3" json:"minute_qps,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` +func (x *GetRequest) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil } -func (m *GetResponse) Reset() { *m = GetResponse{} } -func (m *GetResponse) String() string { return proto.CompactTextString(m) } -func (*GetResponse) ProtoMessage() {} -func (*GetResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_23bd509ca7b74957, []int{1} -} +type GetResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields -func (m *GetResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_GetResponse.Unmarshal(m, b) -} -func (m *GetResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_GetResponse.Marshal(b, m, deterministic) + Value []byte `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + MinuteQps float64 `protobuf:"fixed64,2,opt,name=minute_qps,json=minuteQps,proto3" json:"minute_qps,omitempty"` + Expire int64 `protobuf:"varint,3,opt,name=expire,proto3" json:"expire,omitempty"` } -func (m *GetResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_GetResponse.Merge(m, src) + +func (x *GetResponse) Reset() { + *x = GetResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_galaxycache_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } } -func (m *GetResponse) XXX_Size() int { - return xxx_messageInfo_GetResponse.Size(m) + +func (x *GetResponse) String() string { + return protoimpl.X.MessageStringOf(x) } -func (m *GetResponse) XXX_DiscardUnknown() { - xxx_messageInfo_GetResponse.DiscardUnknown(m) + +func (*GetResponse) ProtoMessage() {} + +func (x *GetResponse) ProtoReflect() protoreflect.Message { + mi := &file_galaxycache_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) } -var xxx_messageInfo_GetResponse proto.InternalMessageInfo +// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. +func (*GetResponse) Descriptor() ([]byte, []int) { + return file_galaxycache_proto_rawDescGZIP(), []int{1} +} -func (m *GetResponse) GetValue() []byte { - if m != nil { - return m.Value +func (x *GetResponse) GetValue() []byte { + if x != nil { + return x.Value } return nil } -func (m *GetResponse) GetMinuteQps() float64 { - if m != nil { - return m.MinuteQps +func (x *GetResponse) GetMinuteQps() float64 { + if x != nil { + return x.MinuteQps } return 0 } -func init() { - proto.RegisterType((*GetRequest)(nil), "galaxycachepb.GetRequest") - proto.RegisterType((*GetResponse)(nil), "galaxycachepb.GetResponse") +func (x *GetResponse) GetExpire() int64 { + if x != nil { + return x.Expire + } + return 0 } -func init() { proto.RegisterFile("galaxycache.proto", fileDescriptor_23bd509ca7b74957) } +var File_galaxycache_proto protoreflect.FileDescriptor -var fileDescriptor_23bd509ca7b74957 = []byte{ - // 186 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4c, 0x4f, 0xcc, 0x49, - 0xac, 0xa8, 0x4c, 0x4e, 0x4c, 0xce, 0x48, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x45, - 0x12, 0x2a, 0x48, 0x52, 0x32, 0xe3, 0xe2, 0x72, 0x4f, 0x2d, 0x09, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, - 0x2e, 0x11, 0x12, 0xe3, 0x62, 0x83, 0x48, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x41, 0x79, - 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x12, 0x4c, 0x60, 0x41, 0x10, 0x53, 0xc9, 0x89, 0x8b, - 0x1b, 0xac, 0xaf, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, - 0x34, 0x15, 0xac, 0x8f, 0x27, 0x08, 0xc2, 0x11, 0x92, 0xe5, 0xe2, 0xca, 0xcd, 0xcc, 0x2b, 0x2d, - 0x49, 0x8d, 0x2f, 0x2c, 0x28, 0x06, 0xeb, 0x66, 0x0c, 0xe2, 0x84, 0x88, 0x04, 0x16, 0x14, 0x1b, - 0x85, 0x72, 0x71, 0xbb, 0x83, 0xcd, 0x77, 0x06, 0x39, 0x46, 0xc8, 0x0d, 0x6c, 0xa4, 0x5b, 0x51, - 0x7e, 0x6e, 0x40, 0x6a, 0x6a, 0x91, 0x90, 0xa4, 0x1e, 0x8a, 0x4b, 0xf5, 0x10, 0xce, 0x94, 0x92, - 0xc2, 0x26, 0x05, 0x71, 0x89, 0x12, 0x43, 0x12, 0x1b, 0xd8, 0xa3, 0xc6, 0x80, 0x00, 0x00, 0x00, - 0xff, 0xff, 0x84, 0x48, 0x7a, 0xc8, 0xfd, 0x00, 0x00, 0x00, +var file_galaxycache_proto_rawDesc = []byte{ + 0x0a, 0x11, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, + 0x70, 0x62, 0x22, 0x4a, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x22, 0x5a, + 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x5f, 0x71, 0x70, + 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x09, 0x6d, 0x69, 0x6e, 0x75, 0x74, 0x65, 0x51, + 0x70, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x06, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x32, 0x55, 0x0a, 0x0b, 0x47, 0x61, + 0x6c, 0x61, 0x78, 0x79, 0x43, 0x61, 0x63, 0x68, 0x65, 0x12, 0x46, 0x0a, 0x0b, 0x47, 0x65, 0x74, + 0x46, 0x72, 0x6f, 0x6d, 0x50, 0x65, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x67, 0x61, 0x6c, 0x61, 0x78, + 0x79, 0x63, 0x61, 0x63, 0x68, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, 0x68, + 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x42, 0x11, 0x5a, 0x0f, 0x2e, 0x2f, 0x67, 0x61, 0x6c, 0x61, 0x78, 0x79, 0x63, 0x61, 0x63, + 0x68, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// GalaxyCacheClient is the client API for GalaxyCache service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type GalaxyCacheClient interface { - GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) -} - -type galaxyCacheClient struct { - cc *grpc.ClientConn -} - -func NewGalaxyCacheClient(cc *grpc.ClientConn) GalaxyCacheClient { - return &galaxyCacheClient{cc} -} +var ( + file_galaxycache_proto_rawDescOnce sync.Once + file_galaxycache_proto_rawDescData = file_galaxycache_proto_rawDesc +) -func (c *galaxyCacheClient) GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) { - out := new(GetResponse) - err := c.cc.Invoke(ctx, "/galaxycachepb.GalaxyCache/GetFromPeer", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil +func file_galaxycache_proto_rawDescGZIP() []byte { + file_galaxycache_proto_rawDescOnce.Do(func() { + file_galaxycache_proto_rawDescData = protoimpl.X.CompressGZIP(file_galaxycache_proto_rawDescData) + }) + return file_galaxycache_proto_rawDescData } -// GalaxyCacheServer is the server API for GalaxyCache service. -type GalaxyCacheServer interface { - GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) +var file_galaxycache_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_galaxycache_proto_goTypes = []interface{}{ + (*GetRequest)(nil), // 0: galaxycachepb.GetRequest + (*GetResponse)(nil), // 1: galaxycachepb.GetResponse } - -func RegisterGalaxyCacheServer(s *grpc.Server, srv GalaxyCacheServer) { - s.RegisterService(&_GalaxyCache_serviceDesc, srv) +var file_galaxycache_proto_depIdxs = []int32{ + 0, // 0: galaxycachepb.GalaxyCache.GetFromPeer:input_type -> galaxycachepb.GetRequest + 1, // 1: galaxycachepb.GalaxyCache.GetFromPeer:output_type -> galaxycachepb.GetResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name } -func _GalaxyCache_GetFromPeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GetRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(GalaxyCacheServer).GetFromPeer(ctx, in) +func init() { file_galaxycache_proto_init() } +func file_galaxycache_proto_init() { + if File_galaxycache_proto != nil { + return } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/galaxycachepb.GalaxyCache/GetFromPeer", + if !protoimpl.UnsafeEnabled { + file_galaxycache_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_galaxycache_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(GalaxyCacheServer).GetFromPeer(ctx, req.(*GetRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _GalaxyCache_serviceDesc = grpc.ServiceDesc{ - ServiceName: "galaxycachepb.GalaxyCache", - HandlerType: (*GalaxyCacheServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetFromPeer", - Handler: _GalaxyCache_GetFromPeer_Handler, + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_galaxycache_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "galaxycache.proto", + GoTypes: file_galaxycache_proto_goTypes, + DependencyIndexes: file_galaxycache_proto_depIdxs, + MessageInfos: file_galaxycache_proto_msgTypes, + }.Build() + File_galaxycache_proto = out.File + file_galaxycache_proto_rawDesc = nil + file_galaxycache_proto_goTypes = nil + file_galaxycache_proto_depIdxs = nil } diff --git a/galaxycachepb/galaxycache.proto b/galaxycachepb/galaxycache.proto index a30fb340..04d4b053 100644 --- a/galaxycachepb/galaxycache.proto +++ b/galaxycachepb/galaxycache.proto @@ -17,17 +17,22 @@ limitations under the License. syntax = "proto3"; package galaxycachepb; +option go_package = "./galaxycachepb"; + message GetRequest { string galaxy = 1; + // Deprecated: please use the keys field. `key` has precendence over `keys`. string key = 2; // not actually required/guaranteed to be UTF-8 + repeated string keys = 3; } message GetResponse { bytes value = 1; double minute_qps = 2; + int64 expire = 3; } service GalaxyCache { rpc GetFromPeer(GetRequest) returns (GetResponse) {} -} \ No newline at end of file +} diff --git a/galaxycachepb/galaxycache_grpc.pb.go b/galaxycachepb/galaxycache_grpc.pb.go new file mode 100644 index 00000000..3719ccef --- /dev/null +++ b/galaxycachepb/galaxycache_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package galaxycachepb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// GalaxyCacheClient is the client API for GalaxyCache service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GalaxyCacheClient interface { + GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) +} + +type galaxyCacheClient struct { + cc grpc.ClientConnInterface +} + +func NewGalaxyCacheClient(cc grpc.ClientConnInterface) GalaxyCacheClient { + return &galaxyCacheClient{cc} +} + +func (c *galaxyCacheClient) GetFromPeer(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) { + out := new(GetResponse) + err := c.cc.Invoke(ctx, "/galaxycachepb.GalaxyCache/GetFromPeer", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GalaxyCacheServer is the server API for GalaxyCache service. +// All implementations must embed UnimplementedGalaxyCacheServer +// for forward compatibility +type GalaxyCacheServer interface { + GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) + mustEmbedUnimplementedGalaxyCacheServer() +} + +// UnimplementedGalaxyCacheServer must be embedded to have forward compatible implementations. +type UnimplementedGalaxyCacheServer struct { +} + +func (UnimplementedGalaxyCacheServer) GetFromPeer(context.Context, *GetRequest) (*GetResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetFromPeer not implemented") +} +func (UnimplementedGalaxyCacheServer) mustEmbedUnimplementedGalaxyCacheServer() {} + +// UnsafeGalaxyCacheServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GalaxyCacheServer will +// result in compilation errors. +type UnsafeGalaxyCacheServer interface { + mustEmbedUnimplementedGalaxyCacheServer() +} + +func RegisterGalaxyCacheServer(s grpc.ServiceRegistrar, srv GalaxyCacheServer) { + s.RegisterService(&GalaxyCache_ServiceDesc, srv) +} + +func _GalaxyCache_GetFromPeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GalaxyCacheServer).GetFromPeer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/galaxycachepb.GalaxyCache/GetFromPeer", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GalaxyCacheServer).GetFromPeer(ctx, req.(*GetRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// GalaxyCache_ServiceDesc is the grpc.ServiceDesc for GalaxyCache service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var GalaxyCache_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "galaxycachepb.GalaxyCache", + HandlerType: (*GalaxyCacheServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetFromPeer", + Handler: _GalaxyCache_GetFromPeer_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "galaxycache.proto", +} diff --git a/go.mod b/go.mod index 12af9092..16977b14 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ go 1.12 require ( github.com/golang/protobuf v1.4.3 + github.com/stretchr/testify v1.7.4 go.opencensus.io v0.22.5 golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect google.golang.org/grpc v1.35.0 + google.golang.org/protobuf v1.25.0 ) diff --git a/go.sum b/go.sum index 8f329b03..4dc870b1 100644 --- a/go.sum +++ b/go.sum @@ -4,17 +4,19 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -27,30 +29,46 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.4 h1:wZRexSlwd7ZXfKINDLsO4r7WBt3gTKONc6K/VesHvHM= +github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -58,10 +76,13 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd h1:r7DufRZuZbWB7j439YfAzP8RPDa9unLkpwQKUYbIMPI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -73,18 +94,22 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb h1:i1Ppqkc3WQXikh8bXiwHqAN5Rv3/qDCcRk0/Otx73BY= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -103,5 +128,8 @@ google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/grpc/grpc_test.go b/grpc/grpc_test.go index 625aee7e..5b791f6b 100644 --- a/grpc/grpc_test.go +++ b/grpc/grpc_test.go @@ -24,6 +24,7 @@ import ( "strconv" "sync" "testing" + "time" gc "github.com/vimeo/galaxycache" @@ -79,10 +80,15 @@ func TestGRPCPeerServer(t *testing.T) { if err := g.Get(ctx, key, &value); err != nil { t.Fatal(err) } - if string(value) != ":"+key { - t.Errorf("Unexpected value: Get(%q) = %q, expected %q", key, value, ":"+key) + + ret, _, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + if string(ret) != ":"+key { + t.Errorf("Unexpected value: Get(%q) = %q, expected %q", key, ret, ":"+key) } - t.Logf("Get key=%q, value=%q (peer:key)", key, value) + t.Logf("Get key=%q, value=%q (peer:key)", key, ret) } cancel() wg.Wait() @@ -104,8 +110,7 @@ func runTestPeerGRPCServer(ctx context.Context, t testing.TB, addresses []string } getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error { - dest.UnmarshalBinary([]byte(":" + key)) - return nil + return dest.UnmarshalBinary([]byte(":"+key), time.Now().Add(5*time.Minute)) }) universe.NewGalaxy("peerFetchTest", 1<<20, getter) wg.Add(1) diff --git a/grpc/grpcclient.go b/grpc/grpcclient.go index d39f4c94..78982c71 100644 --- a/grpc/grpcclient.go +++ b/grpc/grpcclient.go @@ -18,7 +18,9 @@ package grpc import ( "context" + "time" + "github.com/vimeo/galaxycache" gc "github.com/vimeo/galaxycache" pb "github.com/vimeo/galaxycache/galaxycachepb" "go.opencensus.io/plugin/ocgrpc" @@ -70,18 +72,18 @@ func (gp *GRPCFetchProtocol) NewFetcher(address string) (gc.RemoteFetcher, error // Fetch here implements the RemoteFetcher interface for // sending Gets to peers over an RPC connection -func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { +func (g *grpcFetcher) Fetch(ctx context.Context, galaxy string, keys []string) ([]galaxycache.ValueWithTTL, error) { span := trace.FromContext(ctx) span.Annotatef(nil, "fetching from %s; connection state %s", g.address, g.conn.GetState()) resp, err := g.client.GetFromPeer(ctx, &pb.GetRequest{ Galaxy: galaxy, - Key: key, + Keys: keys, }) if err != nil { - return nil, status.Errorf(status.Code(err), "Failed to fetch from peer over RPC [%q, %q]: %s", galaxy, g.address, err) + return []galaxycache.ValueWithTTL{}, status.Errorf(status.Code(err), "Failed to fetch from peer over RPC [%q, %q]: %s", galaxy, g.address, err) } - return resp.Value, nil + return []galaxycache.ValueWithTTL{{Data: resp.Value, TTL: time.UnixMilli(resp.Expire)}}, nil } // Close here implements the RemoteFetcher interface for diff --git a/grpc/grpcserver.go b/grpc/grpcserver.go index f4b8faf9..697341cb 100644 --- a/grpc/grpcserver.go +++ b/grpc/grpcserver.go @@ -20,6 +20,7 @@ import ( "context" gc "github.com/vimeo/galaxycache" + "github.com/vimeo/galaxycache/galaxycachepb" pb "github.com/vimeo/galaxycache/galaxycachepb" "google.golang.org/grpc" @@ -31,6 +32,7 @@ import ( // interface generated by the GalaxyCache pb service type serviceImpl struct { universe *gc.Universe + galaxycachepb.UnimplementedGalaxyCacheServer } // RegisterGRPCServer registers the given grpc.Server with @@ -53,10 +55,23 @@ func (gp *serviceImpl) GetFromPeer(ctx context.Context, req *pb.GetRequest) (*pb galaxy.Stats.ServerRequests.Add(1) // keep track of the num of req var value unsafeByteCodec - err := galaxy.Get(ctx, req.Key, &value) + + var keys []string + //lint:ignore SA1019 Remove usage of req.Key once it is removed from the gRPC API. + if len(req.Key) > 0 { + keys = []string{req.Key} + } else { + keys = req.Keys + } + err := galaxy.Get(ctx, keys[0], &value) if err != nil { return nil, status.Errorf(status.Code(err), "Failed to retrieve [%s]: %v", req, err) } - return &pb.GetResponse{Value: value}, nil + ret, expTm, err := value.MarshalBinary() + if err != nil { + return nil, status.Errorf(status.Code(err), "Failed to marshal [%s]: %v", req, err) + } + + return &pb.GetResponse{Value: ret, Expire: expTm.UnixMilli()}, nil } diff --git a/grpc/unsafe_byte_codec.go b/grpc/unsafe_byte_codec.go index 1ff1d7d0..c215cd76 100644 --- a/grpc/unsafe_byte_codec.go +++ b/grpc/unsafe_byte_codec.go @@ -1,17 +1,23 @@ package grpc +import "time" + // unsafeByteCodec is a byte slice type that implements Codec -type unsafeByteCodec []byte +type unsafeByteCodec struct { + bytes []byte + expire time.Time +} // MarshalBinary returns the contained byte-slice -func (c *unsafeByteCodec) MarshalBinary() ([]byte, error) { - return *c, nil +func (c *unsafeByteCodec) MarshalBinary() ([]byte, time.Time, error) { + return c.bytes, c.expire, nil } // UnmarshalBinary to provided data so they share the same backing array // this is a generally unsafe performance optimization, but safe in the context // of the gRPC server. -func (c *unsafeByteCodec) UnmarshalBinary(data []byte) error { - *c = data +func (c *unsafeByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { + c.bytes = data + c.expire = expire return nil } diff --git a/hotcache.go b/hotcache.go index 9521734a..f7a9de42 100644 --- a/hotcache.go +++ b/hotcache.go @@ -57,14 +57,15 @@ type keyStats struct { dQPS dampedQPS } -func newValWithStat(data []byte, kStats *keyStats) *valWithStat { +func newValWithStat(data []byte, kStats *keyStats, expire time.Time) *valWithStat { if kStats == nil { kStats = &keyStats{dampedQPS{period: time.Second}} } return &valWithStat{ - data: data, - stats: kStats, + data: data, + stats: kStats, + expire: expire, } } @@ -126,19 +127,19 @@ func (d *dampedQPS) val(now time.Time) float64 { return d.curDQPS } -func (g *Galaxy) addNewToCandidateCache(key string) *keyStats { +func (g *Galaxy) addNewToCandidateCache(key string, expire time.Time) *keyStats { kStats := &keyStats{ dQPS: dampedQPS{ period: time.Second, }, } - g.candidateCache.addToCandidateCache(key, kStats) + g.candidateCache.addToCandidateCache(key, kStats, expire) return kStats } -func (c *cache) addToCandidateCache(key string, kStats *keyStats) { +func (c *cache) addToCandidateCache(key string, kStats *keyStats, expire time.Time) { c.mu.Lock() defer c.mu.Unlock() - c.lru.Add(key, kStats) + c.lru.Add(key, kStats, expire) } diff --git a/http/http.go b/http/http.go index 928dad0d..b04abdd0 100644 --- a/http/http.go +++ b/http/http.go @@ -19,11 +19,18 @@ package http import ( "context" "fmt" + "io" "io/ioutil" + "mime" + "mime/multipart" "net/http" + "net/textproto" "net/url" + "strconv" "strings" + "time" + "github.com/vimeo/galaxycache" gc "github.com/vimeo/galaxycache" "go.opencensus.io/plugin/ochttp" @@ -32,6 +39,9 @@ import ( const defaultBasePath = "/_galaxycache/" +// When the retrieved value should expire. Unix timestamp in milliseconds. +const ttlHeader = "X-Galaxycache-Expire" + // HTTPFetchProtocol specifies HTTP specific options for HTTP-based // peer communication type HTTPFetchProtocol struct { @@ -125,17 +135,7 @@ func RegisterHTTPHandler(universe *gc.Universe, opts *HTTPOptions, serveMux *htt } } -func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // Parse request. - if !strings.HasPrefix(r.URL.Path, h.basePath) { - panic("HTTPHandler serving unexpected path: " + r.URL.Path) - } - strippedPath := r.URL.Path[len(h.basePath):] - needsUnescaping := false - if r.URL.RawPath != "" && r.URL.RawPath != r.URL.Path { - strippedPath = r.URL.RawPath[len(h.basePath):] - needsUnescaping = true - } +func (h *HTTPHandler) singleKeyServe(w http.ResponseWriter, r *http.Request, strippedPath string, needsUnescaping bool) { parts := strings.SplitN(strippedPath, "/", 2) if len(parts) != 2 { http.Error(w, "bad request", http.StatusBadRequest) @@ -169,11 +169,15 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO: remove galaxy.Stats from here galaxy.Stats.ServerRequests.Add(1) - stats.RecordWithOptions( + if err := stats.RecordWithOptions( ctx, stats.WithMeasurements(gc.MServerRequests.M(1)), stats.WithRecorder(h.recorder), - ) + ); err != nil { + http.Error(w, "failed to record stats", http.StatusInternalServerError) + return + } + var value gc.ByteCodec err := galaxy.Get(ctx, key, &value) if err != nil { @@ -181,7 +185,99 @@ func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } w.Header().Set("Content-Type", "application/octet-stream") - w.Write(value) + b, expTm, err := value.MarshalBinary() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set(ttlHeader, fmt.Sprintf("%d", expTm.UnixMilli())) + _, err = w.Write(b) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *HTTPHandler) multipleKeyServe(w http.ResponseWriter, r *http.Request) { + galaxyName := r.Header["Galaxy"][0] + keys := r.Header["Key"] + + // Fetch the value for this galaxy/key. + galaxy := h.universe.GetGalaxy(galaxyName) + if galaxy == nil { + http.Error(w, "no such galaxy: "+galaxyName, http.StatusNotFound) + return + } + + mw := multipart.NewWriter(w) + w.Header().Add("Content-Type", mw.FormDataContentType()) + + for _, key := range keys { + ctx := r.Context() + + // TODO: remove galaxy.Stats from here + galaxy.Stats.ServerRequests.Add(1) + if err := stats.RecordWithOptions( + ctx, + stats.WithMeasurements(gc.MServerRequests.M(1)), + stats.WithRecorder(h.recorder), + ); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + var value gc.ByteCodec + err := galaxy.Get(ctx, key, &value) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + b, expTm, err := value.MarshalBinary() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + partw, err := mw.CreatePart(textproto.MIMEHeader{ + ttlHeader: []string{fmt.Sprintf("%d", expTm.UnixMilli())}, + "Content-Length": []string{fmt.Sprintf("%d", len(b))}, + }) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(fmt.Sprintf("failed to create part: %s", err.Error()))) + return + } + _, err = partw.Write(b) + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(fmt.Sprintf("failed to write part: %s", err.Error()))) + return + } + } + err := mw.Close() + if err != nil { + w.WriteHeader(500) + _, _ = w.Write([]byte(fmt.Sprintf("failed to close multi-part writer: %s", err.Error()))) + return + } +} + +func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, h.basePath) { + panic("HTTPHandler serving unexpected path: " + r.URL.Path) + } + strippedPath := r.URL.Path[len(h.basePath):] + needsUnescaping := false + if r.URL.RawPath != "" && r.URL.RawPath != r.URL.Path { + strippedPath = r.URL.RawPath[len(h.basePath):] + needsUnescaping = true + } + if strippedPath != "" { + h.singleKeyServe(w, r, strippedPath, needsUnescaping) + return + } + h.multipleKeyServe(w, r) + } type httpFetcher struct { @@ -190,30 +286,67 @@ type httpFetcher struct { } // Fetch here implements the RemoteFetcher interface for sending a GET request over HTTP to a peer -func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, key string) ([]byte, error) { - u := fmt.Sprintf( - "%v%v/%v", - h.baseURL, - url.PathEscape(galaxy), - url.PathEscape(key), - ) - req, err := http.NewRequest("GET", u, nil) +func (h *httpFetcher) Fetch(ctx context.Context, galaxy string, keys []string) ([]galaxycache.ValueWithTTL, error) { + req, err := http.NewRequest("GET", h.baseURL, http.NoBody) if err != nil { - return nil, err + return []galaxycache.ValueWithTTL{}, err } + req.Header["Key"] = keys + req.Header["Galaxy"] = []string{galaxy} + res, err := h.transport.RoundTrip(req.WithContext(ctx)) if err != nil { - return nil, err + return []galaxycache.ValueWithTTL{}, err } defer res.Body.Close() if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("server returned HTTP response status code: %v", res.Status) + return []galaxycache.ValueWithTTL{}, fmt.Errorf("server returned HTTP response status code: %v", res.Status) + } + + _, params, _ := mime.ParseMediaType(res.Header.Get("Content-Type")) + + mr := multipart.NewReader(res.Body, params["boundary"]) + // TODO(GiedriusS): convert this into a loop. + part, err := mr.NextPart() + if err != nil { + return []galaxycache.ValueWithTTL{}, err + } + lengthHeader := part.Header["Content-Length"] + + var data []byte + + if len(lengthHeader) > 0 { + lengthVal := lengthHeader[0] + length, err := strconv.ParseInt(string(lengthVal), 10, 64) + if err != nil { + return []galaxycache.ValueWithTTL{}, fmt.Errorf("got non-integer Content-Length header (%v): %v", lengthHeader, err) + } + dataBuffer := make([]byte, length) + + _, err = part.Read(dataBuffer) + if err != nil && err != io.EOF { + return []galaxycache.ValueWithTTL{}, fmt.Errorf("reading part: %v", err) + } + + data = dataBuffer + } else { + dataBuffer, err := ioutil.ReadAll(part) + if err != nil { + return []galaxycache.ValueWithTTL{}, fmt.Errorf("reading response body: %v", err) + } + + data = dataBuffer + } + + expireStr := part.Header.Get(ttlHeader) + if len(expireStr) == 0 { + return []galaxycache.ValueWithTTL{}, fmt.Errorf("failed reading TTL header %s", ttlHeader) } - data, err := ioutil.ReadAll(res.Body) + expire, err := strconv.ParseInt(string(expireStr), 10, 64) if err != nil { - return nil, fmt.Errorf("reading response body: %v", err) + return []galaxycache.ValueWithTTL{}, fmt.Errorf("parsing TTL header %s: %w", ttlHeader, err) } - return data, nil + return []galaxycache.ValueWithTTL{{Data: data, TTL: time.UnixMilli(expire)}}, nil } // Close here implements the RemoteFetcher interface for closing (does nothing for HTTP) diff --git a/http/http_test.go b/http/http_test.go index 5c3371b1..00e6ce82 100644 --- a/http/http_test.go +++ b/http/http_test.go @@ -23,21 +23,15 @@ import ( "net/http" "strconv" "strings" - "sync" "testing" + "time" + "github.com/stretchr/testify/require" gc "github.com/vimeo/galaxycache" "go.opencensus.io/plugin/ochttp" - "go.opencensus.io/stats/view" ) -type testStatsExporter struct { - mu sync.Mutex - data []*view.Data - t *testing.T -} - func TestHTTPHandler(t *testing.T) { const ( @@ -86,11 +80,21 @@ func TestHTTPHandler(t *testing.T) { if err := g.Get(ctx, key, &value); err != nil { t.Fatal(err) } - if suffix := ":" + key; !strings.HasSuffix(string(value), suffix) { - t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix) + ret, expTm, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + if expTm.Equal(time.Time{}) { + t.Fatal("expiry time must be set") + } + if suffix := ":" + key; !strings.HasSuffix(string(ret), suffix) { + t.Errorf("Get(%q) = %q, want value ending in %q", key, ret, suffix) } - t.Logf("Get key=%q, value=%q (peer:key)", key, value) + t.Logf("Get key=%q, value=%q (peer:key)", key, ret) } + + currentExp := time.Now().Add(2 * time.Second) + time.Sleep(5 * time.Second) // Try it again, this time with a slash in the middle to ensure we're // handling those characters properly for _, key := range testKeys(nGets) { @@ -99,10 +103,19 @@ func TestHTTPHandler(t *testing.T) { if err := g.Get(ctx, testKey, &value); err != nil { t.Fatal(err) } - if suffix := ":" + testKey; !strings.HasSuffix(string(value), suffix) { - t.Errorf("Get(%q) = %q, want value ending in %q", key, value, suffix) + ret, expTm, err := value.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + // Ensure that the keys were regenerated. + if expTm.Before(currentExp) { + t.Fatalf("expected key to expire after the current time i.e. it should be regenerated (%s, %s)", expTm, currentExp) + } + if suffix := ":" + testKey; !strings.HasSuffix(string(ret), suffix) { + t.Errorf("Get(%q) = %q, want value ending in %q", key, ret, suffix) } - t.Logf("Get key=%q, value=%q (peer:key)", testKey, value) + t.Logf("Get key=%q, value=%q (peer:key)", testKey, ret) } }) @@ -119,8 +132,7 @@ func makeHTTPServerUniverse(ctx context.Context, t testing.TB, galaxyName string t.Errorf("Error setting peers: %s", err) } getter := gc.GetterFunc(func(ctx context.Context, key string, dest gc.Codec) error { - dest.UnmarshalBinary([]byte(":" + key)) - return nil + return dest.UnmarshalBinary([]byte(":"+key), time.Now().Add(2*time.Second)) }) universe.NewGalaxy(galaxyName, 1<<20, getter) newServer := http.Server{Handler: wrappedHandler} @@ -132,7 +144,7 @@ func makeHTTPServerUniverse(ctx context.Context, t testing.TB, galaxyName string }() <-ctx.Done() - newServer.Shutdown(ctx) + require.NoError(t, newServer.Shutdown(context.Background())) } func testKeys(n int) (keys []string) { diff --git a/lru/lru.go b/lru/lru.go index a9f42c3f..11176448 100644 --- a/lru/lru.go +++ b/lru/lru.go @@ -19,6 +19,7 @@ package lru // import "github.com/vimeo/galaxycache/lru" import ( "container/list" + "time" ) // Cache is an LRU cache. It is not safe for concurrent access. @@ -39,8 +40,9 @@ type Cache struct { type Key interface{} type entry struct { - key Key - value interface{} + key Key + value interface{} + expire time.Time } // New creates a new Cache. @@ -55,7 +57,7 @@ func New(maxEntries int) *Cache { } // Add adds a value to the cache. -func (c *Cache) Add(key Key, value interface{}) { +func (c *Cache) Add(key Key, value interface{}, expire time.Time) { if c.cache == nil { c.cache = make(map[interface{}]*list.Element) c.ll = list.New() @@ -65,7 +67,7 @@ func (c *Cache) Add(key Key, value interface{}) { ele.Value.(*entry).value = value return } - ele := c.ll.PushFront(&entry{key, value}) + ele := c.ll.PushFront(&entry{key, value, expire}) c.cache[key] = ele if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { c.RemoveOldest() @@ -78,8 +80,15 @@ func (c *Cache) Get(key Key) (value interface{}, ok bool) { return } if ele, hit := c.cache[key]; hit { + entry := ele.Value.(*entry) + // If the entry has expired, remove it from the cache + if !entry.expire.IsZero() && entry.expire.Before(time.Now()) { + c.removeElement(ele) + return nil, false + } + c.ll.MoveToFront(ele) - return ele.Value.(*entry).value, true + return entry.value, true } return } diff --git a/lru/lru_test.go b/lru/lru_test.go index 870fe38c..16622f59 100644 --- a/lru/lru_test.go +++ b/lru/lru_test.go @@ -19,6 +19,7 @@ package lru import ( "fmt" "testing" + "time" ) type simpleStruct struct { @@ -48,7 +49,7 @@ func TestGet(t *testing.T) { for _, tt := range getTests { lru := New(0) - lru.Add(tt.keyToAdd, 1234) + lru.Add(tt.keyToAdd, 1234, time.Time{}) val, ok := lru.Get(tt.keyToGet) if ok != tt.expectedOk { t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) @@ -60,7 +61,7 @@ func TestGet(t *testing.T) { func TestRemove(t *testing.T) { lru := New(0) - lru.Add("myKey", 1234) + lru.Add("myKey", 1234, time.Time{}) if val, ok := lru.Get("myKey"); !ok { t.Fatal("TestRemove returned no match") } else if val != 1234 { @@ -82,7 +83,7 @@ func TestEvict(t *testing.T) { lru := New(20) lru.OnEvicted = onEvictedFun for i := 0; i < 22; i++ { - lru.Add(fmt.Sprintf("myKey%d", i), 1234) + lru.Add(fmt.Sprintf("myKey%d", i), 1234, time.Time{}) } if len(evictedKeys) != 2 { @@ -95,3 +96,28 @@ func TestEvict(t *testing.T) { t.Fatalf("got %v in second evicted key; want %s", evictedKeys[1], "myKey1") } } + +func TestExpire(t *testing.T) { + var tests = []struct { + name string + key interface{} + expectedOk bool + expire time.Duration + wait time.Duration + }{ + {"not-expired", "myKey", true, time.Second * 1, time.Duration(0)}, + {"expired", "expiredKey", false, time.Millisecond * 100, time.Millisecond * 150}, + } + + for _, tt := range tests { + lru := New(0) + lru.Add(tt.key, 1234, time.Now().Add(tt.expire)) + time.Sleep(tt.wait) + val, ok := lru.Get(tt.key) + if ok != tt.expectedOk { + t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) + } else if ok && val != 1234 { + t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val) + } + } +} diff --git a/peers.go b/peers.go index bf86733c..a3362f67 100644 --- a/peers.go +++ b/peers.go @@ -28,17 +28,24 @@ import ( "errors" "fmt" "sync" + "time" "github.com/vimeo/galaxycache/consistenthash" ) const defaultReplicas = 50 +type ValueWithTTL struct { + Data []byte + TTL time.Time +} + // RemoteFetcher is the interface that must be implemented to fetch from // other peers; the PeerPicker contains a map of these fetchers corresponding // to each other peer address type RemoteFetcher interface { - Fetch(context context.Context, galaxy string, key string) ([]byte, error) + // The value and when it should expire. + Fetch(context context.Context, galaxy string, keys []string) ([]ValueWithTTL, error) // Close closes a client-side connection (may be a nop) Close() error } @@ -166,8 +173,8 @@ func (n *NullFetchProtocol) NewFetcher(url string) (RemoteFetcher, error) { type nullFetchFetcher struct{} -func (n *nullFetchFetcher) Fetch(context context.Context, galaxy string, key string) ([]byte, error) { - return nil, errors.New("empty fetcher") +func (n *nullFetchFetcher) Fetch(context context.Context, galaxy string, keys []string) ([]ValueWithTTL, error) { + return []ValueWithTTL{}, errors.New("empty fetcher") } // Close closes a client-side connection (may be a nop) diff --git a/proto.sh b/proto.sh new file mode 100755 index 00000000..4927f6d4 --- /dev/null +++ b/proto.sh @@ -0,0 +1,2 @@ +#!/bin/bash +protoc --go-grpc_out=. --proto_path=galaxycachepb --go_out=galaxycachepb --go_opt=paths=source_relative galaxycache.proto