Skip to content

Commit

Permalink
Add context parameter to ClusterStore methods
Browse files Browse the repository at this point in the history
Updated all methods of the ClusterStore interface to include a context.Context parameter for better control over request lifecycles. This change affects Set, Delete, Get, Scan, and the newly introduced SetMulti method.
  • Loading branch information
ehsannm committed Nov 28, 2024
1 parent e6c2d60 commit 84d5db7
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 22 deletions.
16 changes: 10 additions & 6 deletions kit/ctx_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kit

import "time"
import (
"context"
"time"
)

// Store identifies a key-value store, which its value doesn't have any limitation.
type Store interface {
Expand All @@ -15,13 +18,14 @@ type Store interface {
// different instances of the cluster. Also, the value type is only string.
type ClusterStore interface {
// Set creates/updates a key-value pair in the cluster
Set(key, value string, ttl time.Duration) error
Set(ctx context.Context, key, value string, ttl time.Duration) error
SetMulti(ctx context.Context, kv map[string]string, ttl time.Duration) error
// Delete deletes the key-value pair from the cluster
Delete(key string) error
Delete(ctx context.Context, key string) error
// Get returns the value bind to the key
Get(key string) (string, error)
Get(ctx context.Context, key string) (string, error)
// Scan scans through the keys which have the prefix.
// If callback returns `false`, then the scan is aborted.
Scan(prefix string, cb func(string) bool) error
ScanWithValue(prefix string, cb func(string, string) bool) error
Scan(ctx context.Context, prefix string, cb func(string) bool) error
ScanWithValue(ctx context.Context, prefix string, cb func(string, string) bool) error
}
22 changes: 17 additions & 5 deletions kit/edge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (t *testCluster) Store() kit.ClusterStore {
return t
}

func (t *testCluster) Set(key, value string, _ time.Duration) error {
func (t *testCluster) Set(ctx context.Context, key, value string, _ time.Duration) error {
if t.kv == nil {
t.kv = map[string]string{}
}
Expand All @@ -158,7 +158,19 @@ func (t *testCluster) Set(key, value string, _ time.Duration) error {
return nil
}

func (t *testCluster) Delete(key string) error {
func (t *testCluster) SetMulti(ctx context.Context, kv map[string]string, ttl time.Duration) error {
if t.kv == nil {
t.kv = map[string]string{}
}

for k, v := range kv {
t.kv[k] = v
}

return nil
}

func (t *testCluster) Delete(ctx context.Context, key string) error {
if t.kv == nil {
t.kv = map[string]string{}
}
Expand All @@ -168,11 +180,11 @@ func (t *testCluster) Delete(key string) error {
return nil
}

func (t *testCluster) Get(key string) (string, error) {
func (t *testCluster) Get(ctx context.Context, key string) (string, error) {
return t.kv[key], nil
}

func (t *testCluster) Scan(prefix string, cb func(key string) bool) error {
func (t *testCluster) Scan(ctx context.Context, prefix string, cb func(key string) bool) error {
for k := range t.kv {
if strings.HasPrefix(k, prefix) {
if !cb(k) {
Expand All @@ -184,7 +196,7 @@ func (t *testCluster) Scan(prefix string, cb func(key string) bool) error {
return nil
}

func (t *testCluster) ScanWithValue(prefix string, cb func(string, string) bool) error {
func (t *testCluster) ScanWithValue(ctx context.Context, prefix string, cb func(string, string) bool) error {
for k, v := range t.kv {
if strings.HasPrefix(k, prefix) {
if !cb(k, v) {
Expand Down
34 changes: 23 additions & 11 deletions std/clusters/rediscluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/redis/go-redis/v9"
)

// KeepTTL is used in Set method of the ClusterStore interface.
// KeepTTL can be used in `Set` method of the ClusterStore interface.
const KeepTTL = -1

type cluster struct {
Expand Down Expand Up @@ -118,30 +118,43 @@ func (c *cluster) Store() kit.ClusterStore {
return c
}

func (c *cluster) Set(key, value string, ttl time.Duration) error {
func (c *cluster) Set(ctx context.Context, key, value string, ttl time.Duration) error {
return c.rc.Set(
context.Background(),
ctx,
fmt.Sprintf("%s:kv:%s", c.prefix, key), value,
ttl,
).Err()
}

func (c *cluster) Delete(key string) error {
func (c *cluster) SetMulti(ctx context.Context, m map[string]string, ttl time.Duration) error {
pipe := c.rc.Pipeline()
for k, v := range m {
pipe.Set(
ctx,
fmt.Sprintf("%s:kv:%s", c.prefix, k), v,
ttl,
)
}
_, err := pipe.Exec(ctx)

return err
}

func (c *cluster) Delete(ctx context.Context, key string) error {
return c.rc.Del(
context.Background(),
ctx,
fmt.Sprintf("%s:kv:%s", c.prefix, key),
).Err()
}

func (c *cluster) Get(key string) (string, error) {
func (c *cluster) Get(ctx context.Context, key string) (string, error) {
return c.rc.Get(
context.Background(),
ctx,
fmt.Sprintf("%s:kv:%s", c.prefix, key),
).Result()
}

func (c *cluster) Scan(prefix string, cb func(string) bool) error {
ctx := context.Background()
func (c *cluster) Scan(ctx context.Context, prefix string, cb func(string) bool) error {
iter := c.rc.Scan(ctx, 0, fmt.Sprintf("%s:kv:%s*", c.prefix, prefix), 512).Iterator()

for iter.Next(ctx) {
Expand All @@ -153,8 +166,7 @@ func (c *cluster) Scan(prefix string, cb func(string) bool) error {
return nil
}

func (c *cluster) ScanWithValue(prefix string, cb func(string, string) bool) error {
ctx := context.Background()
func (c *cluster) ScanWithValue(ctx context.Context, prefix string, cb func(string, string) bool) error {
iter := c.rc.Scan(ctx, 0, fmt.Sprintf("%s:kv:%s*", c.prefix, prefix), 512).Iterator()

for iter.Next(ctx) {
Expand Down

0 comments on commit 84d5db7

Please sign in to comment.