diff --git a/cache_test.go b/cache_test.go
index fa91df6..1ca9c9f 100644
--- a/cache_test.go
+++ b/cache_test.go
@@ -308,7 +308,7 @@ func TestZeroDequeFrequency(t *testing.T) {
 	// 999 is evicted automatically, because tail entry in slru has frequency 1
 	// but 999 frequency is 0
 	// so increase 999 frequency
-	for i := 0; i < 128; i++ {
+	for i := 0; i < 1280; i++ {
 		_, ok := client.Get(999)
 		require.False(t, ok)
 	}
diff --git a/internal/buffer.go b/internal/buffer.go
new file mode 100644
index 0000000..b65c88c
--- /dev/null
+++ b/internal/buffer.go
@@ -0,0 +1,167 @@
+// Copyright (c) 2024 Yiling-J. All rights reserved.
+// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"runtime"
+	"sync/atomic"
+	"unsafe"
+
+	"github.com/Yiling-J/theine-go/internal/xruntime"
+)
+
+const (
+	// The maximum number of elements per buffer.
+	capacity = 16
+	mask     = uint64(capacity - 1)
+)
+
+func castToPointer[K comparable, V any](ptr unsafe.Pointer) *ReadBufItem[K, V] {
+	return (*ReadBufItem[K, V])(ptr)
+}
+
+// PolicyBuffers is the set of buffers returned by the lossy buffer.
+type PolicyBuffers[K comparable, V any] struct {
+	Returned []ReadBufItem[K, V]
+}
+
+// Buffer is a circular ring buffer that stores the elements being transferred by the producers to the consumer.
+// The monotonically increasing count of reads and writes allows indexing sequentially to the next
+// element location based upon a power-of-two sizing.
+//
+// The producers race to read the counts, check if there is available capacity, and if so then try
+// once to CAS to the next write count. If the increment is successful then the producer lazily
+// publishes the element. The producer does not retry or block when unsuccessful due to a failed
+// CAS or the buffer being full.
+//
+// The consumer reads the counts and takes the available elements. The clearing of the elements
+// and the next read count are lazily set.
+//
+// This implementation is striped to further increase concurrency.
+type Buffer[K comparable, V any] struct {
+	head atomic.Uint64
+	// headPadding
+	_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+	tail atomic.Uint64
+	// tailPadding
+	_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+	returned unsafe.Pointer
+	// returnedPadding
+	_ [xruntime.CacheLineSize - 8]byte
+	policyBuffers unsafe.Pointer
+	// returnedSlicePadding
+	_ [xruntime.CacheLineSize - 8]byte
+	buffer [capacity]unsafe.Pointer
+}
+
+// NewBuffer creates a new lossy Buffer.
+func NewBuffer[K comparable, V any]() *Buffer[K, V] {
+	pb := &PolicyBuffers[K, V]{
+		Returned: make([]ReadBufItem[K, V], 0, capacity),
+	}
+	b := &Buffer[K, V]{
+		policyBuffers: unsafe.Pointer(pb),
+	}
+	b.returned = b.policyBuffers
+	return b
+}
+
+// Add lazily publishes the item to the consumer.
+//
+// The item may be lost due to contention.
+func (b *Buffer[K, V]) Add(n ReadBufItem[K, V]) *PolicyBuffers[K, V] {
+	head := b.head.Load()
+	tail := b.tail.Load()
+	size := tail - head
+	if size >= capacity {
+		// full buffer
+		return nil
+	}
+	if b.tail.CompareAndSwap(tail, tail+1) {
+		// success
+		index := int(tail & mask)
+		atomic.StorePointer(&b.buffer[index], unsafe.Pointer(&ReadBufItem[K, V]{
+			entry: n.entry,
+			hash:  n.hash,
+		}))
+		if size == capacity-1 {
+			// try to return a new buffer
+			if !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
+				// somebody already took the buffer
+				return nil
+			}
+
+			pb := (*PolicyBuffers[K, V])(b.policyBuffers)
+			for i := 0; i < capacity; i++ {
+				index := int(head & mask)
+				v := atomic.LoadPointer(&b.buffer[index])
+				if v != nil {
+					// published
+					pb.Returned = append(pb.Returned, *castToPointer[K, V](v))
+					// release
+					atomic.StorePointer(&b.buffer[index], nil)
+				}
+				head++
+			}
+
+			b.head.Store(head)
+			return pb
+		}
+	}
+
+	// failed
+	return nil
+}
+
+// items returns all items in the buffer; used in tests only, to update the policy proactively.
+func (b *Buffer[K, V]) items() []ReadBufItem[K, V] {
+	head := b.head.Load()
+	returned := []ReadBufItem[K, V]{}
+	// collect all published items
+	for _, pt := range b.buffer {
+		v := atomic.LoadPointer(&pt)
+		if v != nil {
+			returned = append(returned, *castToPointer[K, V](v))
+		}
+		head++
+	}
+
+	return returned
+}
+
+// Free returns the processed buffer back and also clears it.
+func (b *Buffer[K, V]) Free() {
+	pb := (*PolicyBuffers[K, V])(b.policyBuffers)
+	for i := 0; i < len(pb.Returned); i++ {
+		pb.Returned[i].entry = nil
+		pb.Returned[i].hash = 0
+	}
+	pb.Returned = pb.Returned[:0]
+	atomic.StorePointer(&b.returned, b.policyBuffers)
+}
+
+// Clear clears the lossy Buffer and returns it to the default state.
+func (b *Buffer[K, V]) Clear() {
+	for !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
+		runtime.Gosched()
+	}
+	for i := 0; i < capacity; i++ {
+		atomic.StorePointer(&b.buffer[i], nil)
+	}
+	b.Free()
+	b.tail.Store(0)
+	b.head.Store(0)
+}
diff --git a/internal/clock/clock.go b/internal/clock/clock.go
index 92c49bf..d480c50 100644
--- a/internal/clock/clock.go
+++ b/internal/clock/clock.go
@@ -1,6 +1,8 @@
 package clock
 
-import "time"
+import (
+	"time"
+)
 
 type Clock struct {
 	Start time.Time
@@ -15,5 +17,5 @@ func (c *Clock) ExpireNano(ttl time.Duration) int64 {
 }
 
 func (c *Clock) SetStart(ts int64) {
-	c.Start = time.Unix(0, ts).UTC()
+	c.Start = time.Unix(0, ts)
 }
diff --git a/internal/counter.go b/internal/counter.go
new file mode 100644
index 0000000..16ef2db
--- /dev/null
+++ b/internal/counter.go
@@ -0,0 +1,116 @@
+// Copyright 2024 Yiling-J
+// Copyright 2024 Andrei Pechkurov
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"github.com/Yiling-J/theine-go/internal/xruntime"
+)
+
+// pool for P tokens
+var ptokenPool sync.Pool
+
+// a P token is used to point at the current OS thread (P)
+// on which the goroutine is run; exact identity of the thread,
+// as well as P migration tolerance, is not important since
+// it's used as a best effort mechanism for assigning
+// concurrent operations (goroutines) to different stripes of
+// the counter
+type ptoken struct {
+	idx uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [xruntime.CacheLineSize - 4]byte
+}
+
+// A Counter is a striped int64 counter.
+//
+// Should be preferred over a single atomically updated int64
+// counter in high contention scenarios.
+//
+// A Counter must not be copied after first use.
+type Counter struct {
+	stripes []cstripe
+	mask    uint32
+}
+
+type cstripe struct {
+	c int64
+	//lint:ignore U1000 prevents false sharing
+	pad [xruntime.CacheLineSize - 8]byte
+}
+
+// NewCounter creates a new Counter instance.
+func NewCounter() *Counter {
+	nstripes := RoundUpPowerOf2(xruntime.Parallelism())
+	c := Counter{
+		stripes: make([]cstripe, nstripes),
+		mask:    nstripes - 1,
+	}
+	return &c
+}
+
+// Inc increments the counter by 1.
+func (c *Counter) Inc() {
+	c.Add(1)
+}
+
+// Dec decrements the counter by 1.
+func (c *Counter) Dec() {
+	c.Add(-1)
+}
+
+// Add adds the delta to the counter.
+func (c *Counter) Add(delta int64) {
+	t, ok := ptokenPool.Get().(*ptoken)
+	if !ok {
+		t = new(ptoken)
+		t.idx = xruntime.Fastrand()
+	}
+	for {
+		stripe := &c.stripes[t.idx&c.mask]
+		cnt := atomic.LoadInt64(&stripe.c)
+		if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
+			break
+		}
+		// Give a try with another randomly selected stripe.
+		t.idx = xruntime.Fastrand()
+	}
+	ptokenPool.Put(t)
+}
+
+// Value returns the current counter value.
+// The returned value may not include all of the latest operations in
+// presence of concurrent modifications of the counter.
+func (c *Counter) Value() int64 {
+	v := int64(0)
+	for i := 0; i < len(c.stripes); i++ {
+		stripe := &c.stripes[i]
+		v += atomic.LoadInt64(&stripe.c)
+	}
+	return v
+}
+
+// Reset resets the counter to zero.
+// This method should only be used when it is known that there are
+// no concurrent modifications of the counter.
+func (c *Counter) Reset() {
+	for i := 0; i < len(c.stripes); i++ {
+		stripe := &c.stripes[i]
+		atomic.StoreInt64(&stripe.c, 0)
+	}
+}
diff --git a/internal/mpsc.go b/internal/mpsc.go
deleted file mode 100644
index 7e61ccc..0000000
--- a/internal/mpsc.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Package mpsc provides an efficient implementation of a multi-producer, single-consumer lock-free queue.
-//
-// The Push function is safe to call from multiple goroutines. The Pop and Empty APIs must only be
-// called from a single, consumer goroutine.
-package internal
-
-// This implementation is based on http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
-
-import (
-	"sync"
-	"sync/atomic"
-	"unsafe"
-)
-
-type node[V any] struct {
-	next *node[V]
-	val  V
-}
-
-type Queue[V any] struct {
-	head, tail *node[V]
-	nodePool   sync.Pool
-}
-
-func NewQueue[V any]() *Queue[V] {
-	q := &Queue[V]{nodePool: sync.Pool{New: func() any {
-		return new(node[V])
-	}}}
-	stub := &node[V]{}
-	q.head = stub
-	q.tail = stub
-	return q
-}
-
-// Push adds x to the back of the queue.
-//
-// Push can be safely called from multiple goroutines
-func (q *Queue[V]) Push(x V) {
-	n := q.nodePool.Get().(*node[V])
-	n.val = x
-	// current producer acquires head node
-	prev := (*node[V])(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.head)), unsafe.Pointer(n)))
-
-	// release node to consumer
-	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&prev.next)), unsafe.Pointer(n))
-}
-
-// Pop removes the item from the front of the queue or nil if the queue is empty
-//
-// Pop must be called from a single, consumer goroutine
-func (q *Queue[V]) Pop() (V, bool) {
-	tail := q.tail
-	next := (*node[V])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) // acquire
-	if next != nil {
-		var null V
-		q.tail = next
-		v := next.val
-		next.val = null
-		tail.next = nil
-		q.nodePool.Put(tail)
-		return v, true
-	}
-	var null V
-	return null, false
-}
-
-// Empty returns true if the queue is empty
-//
-// Empty must be called from a single, consumer goroutine
-func (q *Queue[V]) Empty() bool {
-	tail := q.tail
-	next := (*node[V])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
-	return next == nil
-}
diff --git a/internal/mpsc_test.go b/internal/mpsc_test.go
deleted file mode 100644
index c83d7f5..0000000
--- a/internal/mpsc_test.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package internal
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestQueue_PushPop(t *testing.T) {
-	q := NewQueue[int]()
-
-	q.Push(1)
-	q.Push(2)
-	v, ok := q.Pop()
-	assert.True(t, ok)
-	assert.Equal(t, 1, v)
-	v, ok = q.Pop()
-	assert.True(t, ok)
-	assert.Equal(t, 2, v)
-	_, ok = q.Pop()
-	assert.False(t, ok)
-}
-
-func TestQueue_Empty(t *testing.T) {
-	q := NewQueue[int]()
-	assert.True(t, q.Empty())
-	q.Push(1)
-	assert.False(t, q.Empty())
-}
diff --git a/internal/persistence_test.go b/internal/persistence_test.go
index 6a0120c..f884199 100644
--- a/internal/persistence_test.go
+++ b/internal/persistence_test.go
@@ -2,6 +2,8 @@ package internal
 
 import (
 	"os"
+	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -17,12 +19,21 @@ func TestStorePersistence(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		_, _ = store.Get(i)
 	}
-	store.drainRead()
+
+	for _, buf := range store.stripedBuffer {
+		store.drainRead(buf.items())
+	}
 	// now 0-9 in protected and 10-19 in probation
 	require.Equal(t, 10, store.policy.slru.protected.Len())
 	require.Equal(t, 10, store.policy.slru.probation.Len())
-	require.Equal(t, "9/8/7/6/5/4/3/2/1/0", store.policy.slru.protected.display())
-	require.Equal(t, "19/18/17/16/15/14/13/12/11/10", store.policy.slru.probation.display())
+	require.ElementsMatch(t,
+		strings.Split("9/8/7/6/5/4/3/2/1/0", "/"),
+		strings.Split(store.policy.slru.protected.display(), "/"),
+	)
+	require.ElementsMatch(t,
+		strings.Split("19/18/17/16/15/14/13/12/11/10", "/"),
+		strings.Split(store.policy.slru.probation.display(), "/"),
+	)
 	// add 5 entries to shard deque
 	for i := 20; i < 25; i++ {
 		entry := &Entry[int, int]{
@@ -37,7 +48,9 @@ func TestStorePersistence(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		_, _ = store.Get(5)
 	}
-	store.drainRead()
+	for _, buf := range store.stripedBuffer {
+		store.drainRead(buf.items())
+	}
 	count := store.policy.sketch.Estimate(store.hasher.hash(5))
 	require.True(t, count > 5)
 
@@ -69,7 +82,11 @@ func TestStorePersistence(t *testing.T) {
 	}
 	require.Equal(t, 10, new.policy.slru.protected.Len())
 	require.Equal(t, 10, new.policy.slru.probation.Len())
-	require.Equal(t, "5/9/8/7/6/4/3/2/1/0", new.policy.slru.protected.display())
+
+	require.ElementsMatch(t,
+		strings.Split("9/8/7/6/5/4/3/2/1/0", "/"),
+		strings.Split(store.policy.slru.protected.display(), "/"),
+	)
 	require.Equal(t, "19/18/17/16/15/14/13/12/11/10", new.policy.slru.probation.display())
 
 	count = new.policy.sketch.Estimate(store.hasher.hash(5))
@@ -135,7 +152,9 @@ func TestStorePersistenceResize(t *testing.T) {
 	for i := 0; i < 500; i++ {
 		_, _ = store.Get(i)
 	}
-	store.drainRead()
+	for _, buf := range store.stripedBuffer {
+		store.drainRead(buf.items())
+	}
 	// now 0-499 in protected and 500-999 in probation
 	require.Equal(t, 500, store.policy.slru.protected.Len())
 	require.Equal(t, 500, store.policy.slru.probation.Len())
@@ -157,8 +176,17 @@ func TestStorePersistenceResize(t *testing.T) {
 	require.Equal(t, 80, new.policy.slru.protected.Len())
 	// new cache probation size is 20, should contains latest 20 entries of original probation
 	require.Equal(t, 20, new.policy.slru.probation.Len())
-	expected := "499/498/497/496/495/494/493/492/491/490/489/488/487/486/485/484/483/482/481/480/479/478/477/476/475/474/473/472/471/470/469/468/467/466/465/464/463/462/461/460/459/458/457/456/455/454/453/452/451/450/449/448/447/446/445/444/443/442/441/440/439/438/437/436/435/434/433/432/431/430/429/428/427/426/425/424/423/422/421/420"
-	require.Equal(t, expected, new.policy.slru.protected.display())
-	expected = "999/998/997/996/995/994/993/992/991/990/989/988/987/986/985/984/983/982/981/980"
-	require.Equal(t, expected, new.policy.slru.probation.display())
+
+	for _, i := range strings.Split(new.policy.slru.protected.display(), "/") {
+		in, err := strconv.Atoi(i)
+		require.Nil(t, err)
+		require.True(t, in < 500)
+	}
+
+	for _, i := range strings.Split(new.policy.slru.probation.display(), "/") {
+		in, err := strconv.Atoi(i)
+		require.Nil(t, err)
+		require.True(t, in >= 500 && in < 1000)
+	}
+
 }
diff --git a/internal/rbmutex.go b/internal/rbmutex.go
new file mode 100644
index 0000000..4ec5bec
--- /dev/null
+++ b/internal/rbmutex.go
@@ -0,0 +1,205 @@
+// Copyright 2024 Yiling-J
+// Copyright 2024 Andrei Pechkurov
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+// http://www.apache.org/licenses/LICENSE-2.0
+
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/Yiling-J/theine-go/internal/xruntime"
+)
+
+// slow-down guard
+const nslowdown = 7
+
+// pool for reader tokens
+var rtokenPool sync.Pool
+
+// RToken is a reader lock token.
+type RToken struct {
+	slot uint32
+	//lint:ignore U1000 prevents false sharing
+	pad [xruntime.CacheLineSize - 4]byte
+}
+
+// A RBMutex is a reader biased reader/writer mutual exclusion lock.
+// The lock can be held by many readers or a single writer.
+// The zero value for a RBMutex is an unlocked mutex.
+//
+// A RBMutex must not be copied after first use.
+//
+// RBMutex is based on a modified version of BRAVO
+// (Biased Locking for Reader-Writer Locks) algorithm:
+// https://arxiv.org/pdf/1810.01553.pdf
+//
+// RBMutex is a specialized mutex for scenarios, such as caches,
+// where the vast majority of locks are acquired by readers and write
+// lock acquire attempts are infrequent. In such scenarios, RBMutex
+// performs better than sync.RWMutex on large multicore machines.
+//
+// RBMutex extends sync.RWMutex internally and uses it as the "reader
+// bias disabled" fallback, so the same semantics apply. The only
+// noticeable difference is in reader tokens returned from the
+// RLock/RUnlock methods.
+type RBMutex struct {
+	rslots       []rslot
+	rmask        uint32
+	rbias        int32
+	inhibitUntil time.Time
+	rw           sync.RWMutex
+}
+
+type rslot struct {
+	mu int32
+	//lint:ignore U1000 prevents false sharing
+	pad [xruntime.CacheLineSize - 4]byte
+}
+
+// NewRBMutex creates a new RBMutex instance.
+func NewRBMutex() *RBMutex {
+	nslots := RoundUpPowerOf2(xruntime.Parallelism())
+	mu := RBMutex{
+		rslots: make([]rslot, nslots),
+		rmask:  nslots - 1,
+		rbias:  1,
+	}
+	return &mu
+}
+
+// TryRLock tries to lock m for reading without blocking.
+// When TryRLock succeeds, it returns true and a reader token.
+// In case of a failure, a false is returned.
+func (mu *RBMutex) TryRLock() (bool, *RToken) {
+	if t := mu.fastRlock(); t != nil {
+		return true, t
+	}
+	// Optimistic slow path.
+	if mu.rw.TryRLock() {
+		if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
+			atomic.StoreInt32(&mu.rbias, 1)
+		}
+		return true, nil
+	}
+	return false, nil
+}
+
+// RLock locks m for reading and returns a reader token. The
+// token must be used in the later RUnlock call.
+//
+// Should not be used for recursive read locking; a blocked Lock
+// call excludes new readers from acquiring the lock.
+func (mu *RBMutex) RLock() *RToken {
+	if t := mu.fastRlock(); t != nil {
+		return t
+	}
+	// Slow path.
+	mu.rw.RLock()
+	if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
+		atomic.StoreInt32(&mu.rbias, 1)
+	}
+	return nil
+}
+
+func (mu *RBMutex) fastRlock() *RToken {
+	if atomic.LoadInt32(&mu.rbias) == 1 {
+		t, ok := rtokenPool.Get().(*RToken)
+		if !ok {
+			t = new(RToken)
+			t.slot = xruntime.Fastrand()
+		}
+		// Try all available slots to distribute reader threads to slots.
+		for i := 0; i < len(mu.rslots); i++ {
+			slot := t.slot + uint32(i)
+			rslot := &mu.rslots[slot&mu.rmask]
+			rslotmu := atomic.LoadInt32(&rslot.mu)
+			if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) {
+				if atomic.LoadInt32(&mu.rbias) == 1 {
+					// Hot path succeeded.
+					t.slot = slot
+					return t
+				}
+				// The mutex is no longer reader biased. Roll back.
+				atomic.AddInt32(&rslot.mu, -1)
+				rtokenPool.Put(t)
+				return nil
+			}
+			// Contention detected. Give a try with the next slot.
+		}
+	}
+	return nil
+}
+
+// RUnlock undoes a single RLock call. A reader token obtained from
+// the RLock call must be provided. RUnlock does not affect other
+// simultaneous readers. A panic is raised if m is not locked for
+// reading on entry to RUnlock.
+func (mu *RBMutex) RUnlock(t *RToken) {
+	if t == nil {
+		mu.rw.RUnlock()
+		return
+	}
+	if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 {
+		panic("invalid reader state detected")
+	}
+	rtokenPool.Put(t)
+}
+
+// TryLock tries to lock m for writing without blocking.
+func (mu *RBMutex) TryLock() bool {
+	if mu.rw.TryLock() {
+		if atomic.LoadInt32(&mu.rbias) == 1 {
+			atomic.StoreInt32(&mu.rbias, 0)
+			for i := 0; i < len(mu.rslots); i++ {
+				if atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
+					// There is a reader. Roll back.
+					atomic.StoreInt32(&mu.rbias, 1)
+					mu.rw.Unlock()
+					return false
+				}
+			}
+		}
+		return true
+	}
+	return false
+}
+
+// Lock locks m for writing. If the lock is already locked for
+// reading or writing, Lock blocks until the lock is available.
+func (mu *RBMutex) Lock() {
+	mu.rw.Lock()
+	if atomic.LoadInt32(&mu.rbias) == 1 {
+		atomic.StoreInt32(&mu.rbias, 0)
+		start := time.Now()
+		for i := 0; i < len(mu.rslots); i++ {
+			for atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
+				runtime.Gosched()
+			}
+		}
+		mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown)
+	}
+}
+
+// Unlock unlocks m for writing. A panic is raised if m is not locked
+// for writing on entry to Unlock.
+//
+// As with RWMutex, a locked RBMutex is not associated with a
+// particular goroutine. One goroutine may RLock (Lock) a RBMutex and
+// then arrange for another goroutine to RUnlock (Unlock) it.
+func (mu *RBMutex) Unlock() {
+	mu.rw.Unlock()
+}
diff --git a/internal/store.go b/internal/store.go
index 9815a9c..7065c60 100644
--- a/internal/store.go
+++ b/internal/store.go
@@ -9,13 +9,13 @@ import (
 	"math/rand"
 	"runtime"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/gammazero/deque"
 	"github.com/zeebo/xxh3"
 
 	"github.com/Yiling-J/theine-go/internal/bf"
+	"github.com/Yiling-J/theine-go/internal/xruntime"
 )
 
 const (
@@ -33,9 +33,16 @@
 )
 
 var (
-	VersionMismatch = errors.New("version mismatch")
+	VersionMismatch      = errors.New("version mismatch")
+	maxStripedBufferSize int
 )
 
+func init() {
+	parallelism := xruntime.Parallelism()
+	roundedParallelism := int(RoundUpPowerOf2(parallelism))
+	maxStripedBufferSize = 4 * roundedParallelism
+}
+
 type Shard[K comparable, V any] struct {
 	hashmap   map[K]*Entry[K, V]
 	dookeeper *bf.Bloomfilter
@@ -46,7 +53,7 @@ type Shard[K comparable, V any] struct {
 	qsize     uint
 	qlen      int
 	counter   uint
-	mu        sync.RWMutex
+	mu        *RBMutex
 }
 
 func NewShard[K comparable, V any](size uint, qsize uint, doorkeeper bool) *Shard[K, V] {
@@ -57,6 +64,7 @@ func NewShard[K comparable, V any](size uint, qsize uint, doorkeeper bool) *Shar
 		deque:   deque.New[*Entry[K, V]](),
 		group:   NewGroup[K, Loaded[V]](),
 		vgroup:  NewGroup[K, V](),
+		mu:      NewRBMutex(),
 	}
 	if doorkeeper {
 		s.dookeeper = bf.New(0.01)
@@ -105,9 +113,9 @@ type Store[K comparable, V any] struct {
 	kvBuilder     func(entry *Entry[K, V]) dequeKV[K, V]
 	policy        *TinyLfu[K, V]
 	timerwheel    *TimerWheel[K, V]
-	readbuf       *Queue[ReadBufItem[K, V]]
+	stripedBuffer []*Buffer[K, V]
+	mask          uint32
 	cost          func(V) int64
-	readCounter   *atomic.Uint32
 	shards        []*Shard[K, V]
 	cap           uint
 	shardCount    uint
@@ -158,16 +166,21 @@ func NewStore[K comparable, V any](
 		costfn = cost
 	}
 
+	stripedBuffer := make([]*Buffer[K, V], 0, maxStripedBufferSize)
+	for i := 0; i < maxStripedBufferSize; i++ {
+		stripedBuffer = append(stripedBuffer, NewBuffer[K, V]())
+	}
+
 	s := &Store[K, V]{
-		cap:         uint(maxsize),
-		hasher:      hasher,
-		policy:      NewTinyLfu[K, V](uint(policySize), hasher),
-		readCounter: &atomic.Uint32{},
-		readbuf:     NewQueue[ReadBufItem[K, V]](),
-		writebuf:    make(chan WriteBufItem[K, V], writeBufSize),
-		entryPool:   sync.Pool{New: func() any { return &Entry[K, V]{} }},
-		shardCount:  uint(shardCount),
-		doorkeeper:  doorkeeper,
+		cap:           uint(maxsize),
+		hasher:        hasher,
+		policy:        NewTinyLfu[K, V](uint(policySize), hasher),
+		stripedBuffer: stripedBuffer,
+		mask:          uint32(maxStripedBufferSize - 1),
+		writebuf:      make(chan WriteBufItem[K, V], writeBufSize),
+		entryPool:     sync.Pool{New: func() any { return &Entry[K, V]{} }},
+		shardCount:    uint(shardCount),
+		doorkeeper:    doorkeeper,
 		kvBuilder: func(entry *Entry[K, V]) dequeKV[K, V] {
 			return dequeKV[K, V]{
 				k: entry.key,
@@ -205,8 +218,7 @@ func NewStore[K comparable, V any](
 }
 
 func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, bool) {
-	new := s.readCounter.Add(1)
-	shard.mu.RLock()
+	tk := shard.mu.RLock()
 	entry, ok := shard.get(key)
 	var value V
 	if ok {
@@ -218,23 +230,18 @@ func (s *Store[K, V]) getFromShard(key K, hash uint64, shard *Shard[K, V]) (V, b
 			value = entry.value
 		}
 	}
-	shard.mu.RUnlock()
-	switch {
-	case new < MAX_READ_BUFF_SIZE:
-		var send ReadBufItem[K, V]
-		send.hash = hash
-		if ok {
-			send.entry = entry
-		}
-		s.readbuf.Push(send)
-	case new == MAX_READ_BUFF_SIZE:
-		var send ReadBufItem[K, V]
-		send.hash = hash
-		if ok {
-			send.entry = entry
-		}
-		s.readbuf.Push(send)
-		s.drainRead()
-	}
+	shard.mu.RUnlock(tk)
+
+	idx := s.getReadBufferIdx()
+	var send ReadBufItem[K, V]
+	send.hash = hash
+	if ok {
+		send.entry = entry
+	}
+	pb := s.stripedBuffer[idx].Add(send)
+	if pb != nil {
+		s.drainRead(pb.Returned)
+		s.stripedBuffer[idx].Free()
+	}
 	return value, ok
 }
@@ -458,9 +465,9 @@ func (s *Store[K, V]) DeleteWithSecondary(key K) error {
 func (s *Store[K, V]) Len() int {
 	total := 0
 	for _, s := range s.shards {
-		s.mu.RLock()
+		tk := s.mu.RLock()
 		total += s.len()
-		s.mu.RUnlock()
+		s.mu.RUnlock(tk)
 	}
 	return total
 }
@@ -521,25 +528,20 @@ func (s *Store[K, V]) removeEntry(entry *Entry[K, V], reason RemoveReason) {
 	case REMOVED:
 		_, index := s.index(entry.key)
 		shard := s.shards[index]
-		shard.mu.RLock()
+		tk := shard.mu.RLock()
 		kv := s.kvBuilder(entry)
-		shard.mu.RUnlock()
+		shard.mu.RUnlock(tk)
 		_ = s.removalCallback(kv, reason)
 	}
 }
 
-func (s *Store[K, V]) drainRead() {
+func (s *Store[K, V]) drainRead(buffer []ReadBufItem[K, V]) {
 	s.policy.total.Add(MAX_READ_BUFF_SIZE)
 	s.mlock.Lock()
-	for {
-		v, ok := s.readbuf.Pop()
-		if !ok {
-			break
-		}
-		s.policy.Access(v)
+	for _, e := range buffer {
+		s.policy.Access(e)
 	}
 	s.mlock.Unlock()
-	s.readCounter.Store(0)
}
 
 func (s *Store[K, V]) maintance() {
@@ -624,26 +626,26 @@ func (s *Store[K, V]) maintance() {
 func (s *Store[K, V]) Range(f func(key K, value V) bool) {
 	now := s.timerwheel.clock.NowNano()
 	for _, shard := range s.shards {
-		shard.mu.RLock()
+		tk := shard.mu.RLock()
 		for _, entry := range shard.hashmap {
 			expire := entry.expire.Load()
 			if expire != 0 && expire <= now {
 				continue
 			}
 			if !f(entry.key, entry.value) {
-				shard.mu.RUnlock()
+				shard.mu.RUnlock(tk)
 				return
 			}
 		}
-		shard.mu.RUnlock()
+		shard.mu.RUnlock(tk)
 	}
 }
 
 func (s *Store[K, V]) Close() {
 	for _, s := range s.shards {
-		s.mu.RLock()
+		tk := s.mu.RLock()
 		s.hashmap = nil
-		s.mu.RUnlock()
+		s.mu.RUnlock(tk)
 	}
 	s.mlock.Lock()
 	s.closed = true
@@ -652,6 +654,10 @@ func (s *Store[K, V]) Close() {
 	close(s.writebuf)
 }
 
+func (s *Store[K, V]) getReadBufferIdx() int {
+	return int(xruntime.Fastrand() & s.mask)
+}
+
 type StoreMeta struct {
 	Version   uint64
 	StartNano int64
@@ -717,12 +723,12 @@ func (s *Store[K, V]) Persist(version uint64, writer io.Writer) error {
 	s.mlock.Unlock()
 
 	for _, sd := range s.shards {
-		sd.mu.RLock()
+		tk := sd.mu.RLock()
 		err = persistDeque(sd.deque, writer, blockEncoder)
 		if err != nil {
 			return err
 		}
-		sd.mu.RUnlock()
+		sd.mu.RUnlock(tk)
 	}
 
 	// write end block
@@ -744,7 +750,7 @@ func (s *Store[K, V]) insertSimple(entry *Entry[K, V]) {
 
 func (s *Store[K, V]) processSecondary() {
 	for item := range s.secondaryCacheBuf {
-		item.shard.mu.RLock()
+		tk := item.shard.mu.RLock()
 		// first double check key still exists in map,
 		// not exist means key already deleted by Delete API
 		_, exist := item.shard.get(item.entry.key)
@@ -753,7 +759,7 @@ func (s *Store[K, V]) processSecondary() {
 				item.entry.key, item.entry.value,
 				item.entry.cost.Load(), item.entry.expire.Load(),
 			)
-			item.shard.mu.RUnlock()
+			item.shard.mu.RUnlock(tk)
 			if err != nil {
 				s.secondaryCache.HandleAsyncError(err)
 				continue
@@ -767,7 +773,7 @@ func (s *Store[K, V]) processSecondary() {
 				item.shard.mu.Unlock()
 			}
 		} else {
-			item.shard.mu.RUnlock()
+			item.shard.mu.RUnlock(tk)
 		}
 	}
 }
diff --git a/internal/timerwheel.go b/internal/timerwheel.go
index 7bc5b5b..b648e5e 100644
--- a/internal/timerwheel.go
+++ b/internal/timerwheel.go
@@ -29,7 +29,7 @@ type TimerWheel[K comparable, V any] struct {
 }
 
 func NewTimerWheel[K comparable, V any](size uint) *TimerWheel[K, V] {
-	clock := &clock.Clock{Start: time.Now().UTC()}
+	clock := &clock.Clock{Start: time.Now()}
 	buckets := []uint{64, 64, 32, 4, 1}
 	spans := []uint{
 		next2Power(uint((1 * time.Second).Nanoseconds())),
diff --git a/internal/tlfu.go b/internal/tlfu.go
index 55850f0..acc1e8c 100644
--- a/internal/tlfu.go
+++ b/internal/tlfu.go
@@ -10,8 +10,8 @@ type TinyLfu[K comparable, V any] struct {
 	hasher    *Hasher[K]
 	size      uint
 	counter   uint
-	total     atomic.Uint32
-	hit       atomic.Uint32
+	total     *Counter
+	hit       *Counter
 	hr        float32
 	threshold atomic.Int32
 	lruFactor uint8
@@ -25,6 +25,8 @@ func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V
 		sketch: NewCountMinSketch(),
 		step:   1,
 		hasher: hasher,
+		total:  NewCounter(),
+		hit:    NewCounter(),
 	}
 	// default threshold to -1 so all entries are admitted until cache is full
 	tlfu.threshold.Store(-1)
@@ -32,8 +34,8 @@ func NewTinyLfu[K comparable, V any](size uint, hasher *Hasher[K]) *TinyLfu[K, V
 }
 
 func (t *TinyLfu[K, V]) climb() {
-	total := t.total.Load()
-	hit := t.hit.Load()
+	total := t.total.Value()
+	hit := t.hit.Value()
 	current := float32(hit) / float32(total)
 	delta := current - t.hr
 	var diff int8
@@ -74,8 +76,8 @@ func (t *TinyLfu[K, V]) climb() {
 	}
 	t.threshold.Add(-int32(diff))
 	t.hr = current
-	t.hit.Store(0)
-	t.total.Store(0)
+	t.hit.Reset()
+	t.total.Reset()
 }
 
 func (t *TinyLfu[K, V]) Set(entry *Entry[K, V]) *Entry[K, V] {
diff --git a/internal/utils.go b/internal/utils.go
index 78499f0..983cbde 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -42,3 +42,18 @@ func (h *Hasher[K]) hash(key K) uint64 {
 	}
 	return xxh3.HashString(strKey)
 }
+
+// RoundUpPowerOf2 is based on https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2.
+func RoundUpPowerOf2(v uint32) uint32 {
+	if v == 0 {
+		return 1
+	}
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}
diff --git a/internal/xruntime/rand.go b/internal/xruntime/rand.go
new file mode 100644
index 0000000..cbbff19
--- /dev/null
+++ b/internal/xruntime/rand.go
@@ -0,0 +1,26 @@
+//go:build !go1.22
+
+// Copyright (c) 2024 Yiling-J. All rights reserved.
+// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xruntime
+
+import (
+	_ "unsafe"
+)
+
+//go:noescape
+//go:linkname Fastrand runtime.fastrand
+func Fastrand() uint32
diff --git a/internal/xruntime/rand_1.22.go b/internal/xruntime/rand_1.22.go
new file mode 100644
index 0000000..b2c2dfc
--- /dev/null
+++ b/internal/xruntime/rand_1.22.go
@@ -0,0 +1,26 @@
+//go:build go1.22
+
+// Copyright (c) 2024 Yiling-J. All rights reserved.
+// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xruntime
+
+import (
+	"math/rand/v2"
+)
+
+func Fastrand() uint32 {
+	return rand.Uint32()
+}
diff --git a/internal/xruntime/xruntime.go b/internal/xruntime/xruntime.go
new file mode 100644
index 0000000..09f5b55
--- /dev/null
+++ b/internal/xruntime/xruntime.go
@@ -0,0 +1,35 @@
+// Copyright (c) 2024 Yiling-J. All rights reserved.
+// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xruntime
+
+import (
+	"runtime"
+)
+
+const (
+	// CacheLineSize is useful for preventing false sharing.
+	CacheLineSize = 64
+)
+
+// Parallelism returns the maximum possible number of concurrently running goroutines.
+func Parallelism() uint32 {
+	maxProcs := uint32(runtime.GOMAXPROCS(0))
+	numCPU := uint32(runtime.NumCPU())
+	if maxProcs < numCPU {
+		return maxProcs
+	}
+	return numCPU
+}
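
Note: the snippet below is an illustrative, single-goroutine sketch of the striped, lossy read-buffer pattern that buffer.go and store.go introduce above; it is not part of the patch. The stripe type, the stripeCap constant, and the printed "drain" step are simplified, hypothetical stand-ins for internal.Buffer, its capacity, and Store.drainRead.

package main

import (
	"fmt"
	"math/rand/v2"
)

const stripeCap = 16 // mirrors the per-buffer capacity used in buffer.go

// stripe is a simplified, non-concurrent stand-in for internal.Buffer.
type stripe struct{ items []int }

// add records a read and returns the full batch once the stripe fills up.
// In the real Buffer this step is lossy: a full stripe or a failed CAS
// simply drops the item instead of blocking the reader.
func (s *stripe) add(v int) []int {
	s.items = append(s.items, v)
	if len(s.items) < stripeCap {
		return nil
	}
	batch := s.items
	s.items = nil
	return batch
}

func main() {
	// The patch sizes the real stripe set to 4 * RoundUpPowerOf2(Parallelism()).
	stripes := make([]*stripe, 4)
	for i := range stripes {
		stripes[i] = &stripe{}
	}
	mask := uint32(len(stripes) - 1)
	for key := 0; key < 100; key++ {
		idx := rand.Uint32() & mask // getReadBufferIdx analogue
		if batch := stripes[idx].add(key); batch != nil {
			fmt.Println("drain into policy:", batch) // drainRead + Free analogue
		}
	}
}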