-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
*: add TTL support #25
base: master
Are you sure you want to change the base?
Changes from 3 commits
aeb97c6
3a32041
ab6f170
7265abb
1b81014
6a89f51
e89e7e2
9b92dfa
323e89e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,11 +16,13 @@ limitations under the License. | |
|
||
package galaxycache | ||
|
||
import "time" | ||
|
||
// Codec includes both the BinaryMarshaler and BinaryUnmarshaler | ||
// interfaces | ||
type Codec interface { | ||
MarshalBinary() ([]byte, error) | ||
UnmarshalBinary(data []byte) error | ||
MarshalBinary() ([]byte, time.Time, error) | ||
UnmarshalBinary(data []byte, expire time.Time) error | ||
} | ||
|
||
// Note: to ensure that unmarshaling is a read-only operation, bytes | ||
|
@@ -32,48 +34,60 @@ func cloneBytes(b []byte) []byte { | |
} | ||
|
||
// ByteCodec is a byte slice type that implements Codec | ||
type ByteCodec []byte | ||
type ByteCodec struct { | ||
bytes []byte | ||
expire time.Time | ||
} | ||
Comment on lines
+37
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change breaks the common use-case of This change makes it impossible to extract the value out for anyone outside this package. |
||
|
||
// MarshalBinary on a ByteCodec returns the bytes | ||
func (c *ByteCodec) MarshalBinary() ([]byte, error) { | ||
return *c, nil | ||
func (c *ByteCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return c.bytes, c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a ByteCodec sets the ByteCodec to | ||
// a copy of the provided data | ||
func (c *ByteCodec) UnmarshalBinary(data []byte) error { | ||
*c = cloneBytes(data) | ||
func (c *ByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.bytes = cloneBytes(data) | ||
c.expire = expire | ||
return nil | ||
} | ||
|
||
// CopyingByteCodec is a byte slice type that implements Codec | ||
// and returns a copy of the bytes when marshaled | ||
type CopyingByteCodec []byte | ||
type CopyingByteCodec struct { | ||
bytes []byte | ||
expire time.Time | ||
} | ||
|
||
// MarshalBinary on a CopyingByteCodec returns a copy of the bytes | ||
func (c *CopyingByteCodec) MarshalBinary() ([]byte, error) { | ||
return cloneBytes(*c), nil | ||
func (c *CopyingByteCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return cloneBytes(c.bytes), c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a CopyingByteCodec sets the ByteCodec to | ||
// a copy of the provided data | ||
func (c *CopyingByteCodec) UnmarshalBinary(data []byte) error { | ||
*c = cloneBytes(data) | ||
func (c *CopyingByteCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.bytes = cloneBytes(data) | ||
c.expire = expire | ||
return nil | ||
} | ||
|
||
// StringCodec is a string type that implements Codec | ||
type StringCodec string | ||
type StringCodec struct { | ||
str string | ||
expire time.Time | ||
} | ||
|
||
// MarshalBinary on a StringCodec returns the bytes underlying | ||
// the string | ||
func (c *StringCodec) MarshalBinary() ([]byte, error) { | ||
return []byte(*c), nil | ||
func (c *StringCodec) MarshalBinary() ([]byte, time.Time, error) { | ||
return []byte(c.str), c.expire, nil | ||
} | ||
|
||
// UnmarshalBinary on a StringCodec sets the StringCodec to | ||
// a stringified copy of the provided data | ||
func (c *StringCodec) UnmarshalBinary(data []byte) error { | ||
*c = StringCodec(data) | ||
func (c *StringCodec) UnmarshalBinary(data []byte, expire time.Time) error { | ||
c.str = string(data) | ||
c.expire = expire | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -436,7 +436,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { | |
value.stats.touch() | ||
g.recordRequest(ctx, hlvl, false) | ||
g.recordStats(ctx, nil, MValueLength.M(int64(len(value.data)))) | ||
return dest.UnmarshalBinary(value.data) | ||
return dest.UnmarshalBinary(value.data, value.expire) | ||
} | ||
|
||
span.Annotatef([]trace.Attribute{trace.BoolAttribute("cache_hit", false)}, "Cache miss") | ||
|
@@ -456,7 +456,7 @@ func (g *Galaxy) Get(ctx context.Context, key string, dest Codec) error { | |
if destPopulated { | ||
return nil | ||
} | ||
return dest.UnmarshalBinary(value.data) | ||
return dest.UnmarshalBinary(value.data, value.expire) | ||
} | ||
|
||
type valWithLevel struct { | ||
|
@@ -525,7 +525,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
// probably boring (normal task movement), so not | ||
// worth logging I imagine. | ||
} | ||
data, err := g.getLocally(ctx, key, dest) | ||
data, expTm, err := g.getLocally(ctx, key, dest) | ||
if err != nil { | ||
g.Stats.BackendLoadErrors.Add(1) | ||
g.recordStats(ctx, nil, MBackendLoadErrors.M(1)) | ||
|
@@ -535,7 +535,7 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
g.Stats.CoalescedBackendLoads.Add(1) | ||
g.recordStats(ctx, nil, MCoalescedBackendLoads.M(1)) | ||
destPopulated = true // only one caller of load gets this return value | ||
value = newValWithStat(data, nil) | ||
value = newValWithStat(data, nil, expTm) | ||
g.populateCache(ctx, key, value, &g.mainCache) | ||
return &valWithLevel{value, hitBackend, authoritative, peerErr, err}, nil | ||
}) | ||
|
@@ -548,22 +548,22 @@ func (g *Galaxy) load(ctx context.Context, key string, dest Codec) (value *valWi | |
return | ||
} | ||
|
||
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, error) { | ||
func (g *Galaxy) getLocally(ctx context.Context, key string, dest Codec) ([]byte, time.Time, error) { | ||
err := g.getter.Get(ctx, key, dest) | ||
if err != nil { | ||
return nil, err | ||
return nil, time.Time{}, err | ||
} | ||
return dest.MarshalBinary() | ||
} | ||
|
||
func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string) (*valWithStat, error) { | ||
data, err := peer.Fetch(ctx, g.name, key) | ||
data, expire, err := peer.Fetch(ctx, g.name, key) | ||
if err != nil { | ||
return nil, err | ||
} | ||
vi, ok := g.candidateCache.get(key) | ||
if !ok { | ||
vi = g.addNewToCandidateCache(key) | ||
vi = g.addNewToCandidateCache(key, expire) | ||
} | ||
|
||
g.maybeUpdateHotCacheStats() // will update if at least a second has passed since the last update | ||
|
@@ -573,7 +573,7 @@ func (g *Galaxy) getFromPeer(ctx context.Context, peer RemoteFetcher, key string | |
KeyQPS: kStats.val(), | ||
HCStats: g.hcStatsWithTime.hcs, | ||
} | ||
value := newValWithStat(data, kStats) | ||
value := newValWithStat(data, kStats, expire) | ||
if g.opts.promoter.ShouldPromote(key, value.data, stats) { | ||
g.populateCache(ctx, key, value, &g.hotCache) | ||
} | ||
|
@@ -692,8 +692,9 @@ func (c *cache) stats() CacheStats { | |
} | ||
|
||
type valWithStat struct { | ||
data []byte | ||
stats *keyStats | ||
data []byte | ||
stats *keyStats | ||
expire time.Time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a bit memory-inefficient to add the expiration time to both to the LRU's value struct and the value we're storing inside the LRU. |
||
} | ||
|
||
// sizeOfValWithStats returns the total size of the value in the hot/main | ||
|
@@ -704,21 +705,21 @@ func (v *valWithStat) size() int64 { | |
return int64(unsafe.Sizeof(*v.stats)) + int64(len(v.data)) + int64(unsafe.Sizeof(v)) + int64(unsafe.Sizeof(*v)) | ||
} | ||
|
||
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats)) { | ||
func (c *cache) setLRUOnEvicted(f func(key string, kStats *keyStats, ttl time.Time)) { | ||
c.lru.OnEvicted = func(key lru.Key, value interface{}) { | ||
val := value.(*valWithStat) | ||
c.nbytes -= int64(len(key.(string))) + val.size() | ||
c.nevict++ | ||
if f != nil { | ||
f(key.(string), val.stats) | ||
f(key.(string), val.stats, val.expire) | ||
} | ||
} | ||
} | ||
|
||
func (c *cache) add(key string, value *valWithStat) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
c.lru.Add(key, value) | ||
c.lru.Add(key, value, value.expire) | ||
c.nbytes += int64(len(key)) + value.size() | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very disruptive way to side-channel the expiration time around that breaks all existing
Codec
implementations as well as allBackendGetter
implementations.