Skip to content

Commit

Permalink
read performance (#42)
Browse files Browse the repository at this point in the history
* use striped lossy buffer as read buffer

* remove mpsc

* fix lint

* Use RBMutex as shard mutex

* add striped counter
  • Loading branch information
Yiling-J authored Aug 12, 2024
1 parent 0f4aee0 commit b78ab76
Show file tree
Hide file tree
Showing 15 changed files with 703 additions and 178 deletions.
2 changes: 1 addition & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestZeroDequeFrequency(t *testing.T) {
// 999 is evicted automatically, because tail entry in slru has frequency 1
// but 999 frequency is 0
// so increase 999 frequency
for i := 0; i < 128; i++ {
for i := 0; i < 1280; i++ {
_, ok := client.Get(999)
require.False(t, ok)
}
Expand Down
167 changes: 167 additions & 0 deletions internal/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) 2024 Yiling-J. All rights reserved.
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"runtime"
"sync/atomic"
"unsafe"

"github.com/Yiling-J/theine-go/internal/xruntime"
)

const (
// The maximum number of elements per buffer.
capacity = 16
mask = uint64(capacity - 1)
)

func castToPointer[K comparable, V any](ptr unsafe.Pointer) *ReadBufItem[K, V] {
return (*ReadBufItem[K, V])(ptr)
}

// PolicyBuffers is the set of buffers returned by the lossy buffer.
type PolicyBuffers[K comparable, V any] struct {
Returned []ReadBufItem[K, V]
}

// Buffer is a circular ring buffer stores the elements being transferred by the producers to the consumer.
// The monotonically increasing count of reads and writes allow indexing sequentially to the next
// element location based upon a power-of-two sizing.
//
// The producers race to read the counts, check if there is available capacity, and if so then try
// once to CAS to the next write count. If the increment is successful then the producer lazily
// publishes the element. The producer does not retry or block when unsuccessful due to a failed
// CAS or the buffer being full.
//
// The consumer reads the counts and takes the available elements. The clearing of the elements
// and the next read count are lazily set.
//
// This implementation is striped to further increase concurrency.
type Buffer[K comparable, V any] struct {
head atomic.Uint64
// headPadding
_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
tail atomic.Uint64
// tailPadding
_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
returned unsafe.Pointer
// returnedPadding
_ [xruntime.CacheLineSize - 8]byte
policyBuffers unsafe.Pointer
// returnedSlicePadding
_ [xruntime.CacheLineSize - 8]byte
buffer [capacity]unsafe.Pointer
}

// New creates a new lossy Buffer.
func NewBuffer[K comparable, V any]() *Buffer[K, V] {
pb := &PolicyBuffers[K, V]{
Returned: make([]ReadBufItem[K, V], 0, capacity),
}
b := &Buffer[K, V]{
policyBuffers: unsafe.Pointer(pb),
}
b.returned = b.policyBuffers
return b
}

// Add lazily publishes the item to the consumer.
//
// item may be lost due to contention.
func (b *Buffer[K, V]) Add(n ReadBufItem[K, V]) *PolicyBuffers[K, V] {
head := b.head.Load()
tail := b.tail.Load()
size := tail - head
if size >= capacity {
// full buffer
return nil
}
if b.tail.CompareAndSwap(tail, tail+1) {
// success
index := int(tail & mask)
atomic.StorePointer(&b.buffer[index], unsafe.Pointer(&ReadBufItem[K, V]{
entry: n.entry,
hash: n.hash,
}))
if size == capacity-1 {
// try return new buffer
if !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
// somebody already get buffer
return nil
}

pb := (*PolicyBuffers[K, V])(b.policyBuffers)
for i := 0; i < capacity; i++ {
index := int(head & mask)
v := atomic.LoadPointer(&b.buffer[index])
if v != nil {
// published
pb.Returned = append(pb.Returned, *castToPointer[K, V](v))
// release
atomic.StorePointer(&b.buffer[index], nil)
}
head++
}

b.head.Store(head)
return pb
}
}

// failed
return nil
}

// Load all items in buffer, used in test only to update policy proactive proactively
func (b *Buffer[K, V]) items() []ReadBufItem[K, V] {
head := b.head.Load()
returned := []ReadBufItem[K, V]{}
// try return new buffer
for _, pt := range b.buffer {
v := atomic.LoadPointer(&pt)
if v != nil {
returned = append(returned, *castToPointer[K, V](v))
}
head++
}

return returned
}

// Free returns the processed buffer back and also clears it.
func (b *Buffer[K, V]) Free() {
pb := (*PolicyBuffers[K, V])(b.policyBuffers)
for i := 0; i < len(pb.Returned); i++ {
pb.Returned[i].entry = nil
pb.Returned[i].hash = 0
}
pb.Returned = pb.Returned[:0]
atomic.StorePointer(&b.returned, b.policyBuffers)
}

// Clear clears the lossy Buffer and returns it to the default state.
func (b *Buffer[K, V]) Clear() {
for !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
runtime.Gosched()
}
for i := 0; i < capacity; i++ {
atomic.StorePointer(&b.buffer[i], nil)
}
b.Free()
b.tail.Store(0)
b.head.Store(0)
}
6 changes: 4 additions & 2 deletions internal/clock/clock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clock

import "time"
import (
"time"
)

type Clock struct {
Start time.Time
Expand All @@ -15,5 +17,5 @@ func (c *Clock) ExpireNano(ttl time.Duration) int64 {
}

func (c *Clock) SetStart(ts int64) {
c.Start = time.Unix(0, ts).UTC()
c.Start = time.Unix(0, ts)
}
116 changes: 116 additions & 0 deletions internal/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2024 Yiling-J
// Copyright 2024 Andrei Pechkurov

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"sync"
"sync/atomic"

"github.com/Yiling-J/theine-go/internal/xruntime"
)

// pool for P tokens
var ptokenPool sync.Pool

// a P token is used to point at the current OS thread (P)
// on which the goroutine is run; exact identity of the thread,
// as well as P migration tolerance, is not important since
// it's used to as a best effort mechanism for assigning
// concurrent operations (goroutines) to different stripes of
// the counter
type ptoken struct {
idx uint32
//lint:ignore U1000 prevents false sharing
pad [xruntime.CacheLineSize - 4]byte
}

// A Counter is a striped int64 counter.
//
// Should be preferred over a single atomically updated int64
// counter in high contention scenarios.
//
// A Counter must not be copied after first use.
type Counter struct {
stripes []cstripe
mask uint32
}

type cstripe struct {
c int64
//lint:ignore U1000 prevents false sharing
pad [xruntime.CacheLineSize - 8]byte
}

// NewCounter creates a new Counter instance.
func NewCounter() *Counter {
nstripes := RoundUpPowerOf2(xruntime.Parallelism())
c := Counter{
stripes: make([]cstripe, nstripes),
mask: nstripes - 1,
}
return &c
}

// Inc increments the counter by 1.
func (c *Counter) Inc() {
c.Add(1)
}

// Dec decrements the counter by 1.
func (c *Counter) Dec() {
c.Add(-1)
}

// Add adds the delta to the counter.
func (c *Counter) Add(delta int64) {
t, ok := ptokenPool.Get().(*ptoken)
if !ok {
t = new(ptoken)
t.idx = xruntime.Fastrand()
}
for {
stripe := &c.stripes[t.idx&c.mask]
cnt := atomic.LoadInt64(&stripe.c)
if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) {
break
}
// Give a try with another randomly selected stripe.
t.idx = xruntime.Fastrand()
}
ptokenPool.Put(t)
}

// Value returns the current counter value.
// The returned value may not include all of the latest operations in
// presence of concurrent modifications of the counter.
func (c *Counter) Value() int64 {
v := int64(0)
for i := 0; i < len(c.stripes); i++ {
stripe := &c.stripes[i]
v += atomic.LoadInt64(&stripe.c)
}
return v
}

// Reset resets the counter to zero.
// This method should only be used when it is known that there are
// no concurrent modifications of the counter.
func (c *Counter) Reset() {
for i := 0; i < len(c.stripes); i++ {
stripe := &c.stripes[i]
atomic.StoreInt64(&stripe.c, 0)
}
}
74 changes: 0 additions & 74 deletions internal/mpsc.go

This file was deleted.

Loading

0 comments on commit b78ab76

Please sign in to comment.