Skip to content

Commit

Permalink
Merge pull request #13 from vinted/vinted/zygis/redis-for-boost
Browse files Browse the repository at this point in the history
boost: generate cache key and hit redis in vtgate
  • Loading branch information
DeathBorn authored Mar 27, 2024
2 parents fe4f52b + b06b768 commit d3e8953
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 11 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil, nil)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package boost
type Columns map[string]string

type PlanConfig struct {
IsBoosted bool
BoostColumns Columns
IsBoosted bool
Columns Columns
Table string
}

func NonBoostedPlanConfig() *PlanConfig {
return &PlanConfig{
IsBoosted: false,
BoostColumns: Columns{},
IsBoosted: false,
Columns: Columns{},
}
}

Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/acl"
Expand Down Expand Up @@ -103,6 +104,7 @@ type Executor struct {
schemaTracker SchemaInfo

queryFilterConfigs *boost.QueryFilterConfigs
boostCache *redis.Cache
}

var executorOnce sync.Once
Expand All @@ -122,6 +124,7 @@ func NewExecutor(
cacheCfg *cache.Config,
schemaTracker SchemaInfo,
queryFilterConfigs *boost.QueryFilterConfigs,
boostCache *redis.Cache,
) *Executor {
e := &Executor{
serv: serv,
Expand All @@ -135,6 +138,7 @@ func NewExecutor(
streamSize: streamSize,
schemaTracker: schemaTracker,
queryFilterConfigs: queryFilterConfigs,
boostCache: boostCache,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1257,8 +1261,9 @@ func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, in

if keysMatch(columns, filterConfig.Columns) {
return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
IsBoosted: true,
Columns: columns,
Table: tableName,
}
}
}
Expand Down
37 changes: 36 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package vtgate

import (
"context"
"fmt"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -121,7 +124,17 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
}

// Check if boosted and hit Redis
// plan.BoostPlanConfig.IsBoosted == true

if plan.BoostPlanConfig != nil && plan.BoostPlanConfig.IsBoosted {
cacheKey := cacheKey(plan.BoostPlanConfig, bindVars)

fmt.Println("Cache Key: ", cacheKey)

redisResults, err := e.boostCache.Get(cacheKey)

fmt.Println("Redis Results: ", redisResults)
fmt.Println("Error: ", err)
}

statementTypeResult, sqlResult, err := e.executePlan(ctx, plan, vcursor, bindVars, execStart)(logStats, safeSession)

Expand All @@ -130,6 +143,28 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
return statementTypeResult, sqlResult, err
}

func cacheKey(config *boost.PlanConfig, vars map[string]*querypb.BindVariable) string {
return redis.GenerateCacheKey(cacheKeyParams(config, vars)...)
}

func cacheKeyParams(boostConfig *boost.PlanConfig, vars map[string]*querypb.BindVariable) []string {
var allColumns []string
var allValues []string

for key, vtgValueKey := range boostConfig.Columns {
allColumns = append(allColumns, key)

var byteArray = vars[vtgValueKey].Value
var stringValue = string(byteArray)

allValues = append(allValues, stringValue)
}

tail := append(allColumns, allValues...)

return append([]string{boostConfig.Table}, tail...)
}

func (e *Executor) startTxIfNecessary(ctx context.Context, safeSession *SafeSession) error {
if !safeSession.Autocommit && !safeSession.InTransaction() {
if err := e.txConn.Begin(ctx, safeSession); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -216,8 +217,9 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa
}

queryFilterConfigs, _ := boost.Load(boostQueryFilterConfigPath)
boostCache := redis.NewCache()

executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si, queryFilterConfigs)
executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si, queryFilterConfigs, boostCache)

// connect the schema tracker with the vschema manager
if *enableSchemaChangeSignal {
Expand Down Expand Up @@ -619,11 +621,12 @@ func LegacyInit(ctx context.Context, hc discovery.LegacyHealthCheck, serv srvtop
MaxMemoryUsage: *queryPlanCacheMemory,
LFU: *queryPlanCacheLFU,
}

queryFilterConfigs := &boost.QueryFilterConfigs{}
boostCache := redis.NewCache()

rpcVTGate = &VTGate{
executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil, queryFilterConfigs),
executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil, queryFilterConfigs, boostCache),
resolver: resolver,
vsm: vsm,
txConn: tc,
Expand Down

0 comments on commit d3e8953

Please sign in to comment.