Skip to content

Commit

Permalink
Support Watch function in redis cache (#237)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlcarl authored Sep 8, 2023
1 parent d015fcf commit 6022cf2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
1 change: 1 addition & 0 deletions cache/redis/client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type redisClient interface {
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
Del(ctx context.Context, keys ...string) *redis.IntCmd
Pipeline() redis.Pipeliner
Watch(ctx context.Context, fn func(tx *redis.Tx) error, keys ...string) error

Ping(ctx context.Context) *redis.StatusCmd
Scan(ctx context.Context, cursor uint64, match string, count int64) *redis.ScanCmd
Expand Down
4 changes: 4 additions & 0 deletions cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (r *Redis) Delete(ctx context.Context, key string) error {
return nil
}

func (r *Redis) Watch(ctx context.Context, fn func(tx *redis.Tx) error, keys ...string) error {
return r.client.Watch(ctx, fn, keys...)
}

func (r *Redis) IsAvailable(ctx context.Context) bool {
return r.client.Ping(ctx).Err() == nil
}
Expand Down
35 changes: 35 additions & 0 deletions cache/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/alicebob/miniredis/v2"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -214,6 +215,40 @@ func TestRedis_Delete(t *testing.T) {
}
}

func TestRedis_Watch(t *testing.T) {
redisInitFns := []redisInitFn{redisInit, redisClusterInit}
for _, redisInit := range redisInitFns {
t.Run("", func(t *testing.T) {
r, err := redisInit(t)
assert.Nil(t, err)

ctx := context.TODO()
key := "test"
err = r.Watch(ctx, func(tx *redis.Tx) error {
n, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
return err
}

// Actual operation (local in optimistic lock).
n++

// Operation is commited only if the watched keys remain unchanged.
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, fmt.Sprintf("%d", n), 0)
return nil
})
return err
}, key)
assert.Nil(t, err)

newValue, err := r.GetBytes(context.TODO(), key)
assert.Nil(t, err)
assert.Equal(t, "1", string(newValue))
})
}
}

func TestRedis_IsAvailable(t *testing.T) {
redisInitFns := []redisInitFn{redisInit, redisClusterInit}
for _, redisInit := range redisInitFns {
Expand Down

0 comments on commit 6022cf2

Please sign in to comment.