Skip to content

Commit

Permalink
fix: parse error given duplicated plan cache key (#37334)
Browse files Browse the repository at this point in the history
See: #37016

---------

Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu authored Nov 7, 2024
1 parent 00f6d0e commit e47bf21
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 56 deletions.
88 changes: 51 additions & 37 deletions internal/parser/planparserv2/plan_parser_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,68 +12,82 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
planparserv2 "github.com/milvus-io/milvus/internal/parser/planparserv2/generated"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// exprParseKey is used to cache the parse result. Currently only collectionName is used besides the expr string, which implies
// that the same collectionName will have the same schema and thus the same parse result. In the future, if there is a case where the
// schema changes without changing the collectionName, we need to change the cache key.
type exprParseKey struct {
	// collectionName and expr together form the comparable cache key: the
	// collection name read from the schema helper plus the raw expression text.
	collectionName, expr string
}

var exprCache = expirable.NewLRU[exprParseKey, any](256, nil, time.Minute*10)

func handleExpr(schema *typeutil.SchemaHelper, exprStr string) interface{} {
parseKey := exprParseKey{collectionName: schema.GetCollectionName(), expr: exprStr}
val, ok := exprCache.Get(parseKey)
if !ok {
exprStr = convertHanToASCII(exprStr)
val = handleExprWithErrorListener(schema, exprStr, &errorListenerImpl{})
// Note that the errors will be cached, too.
exprCache.Add(parseKey, val)
// Package-level state shared by the expression-parsing helpers.
var (
// exprCache memoizes parse outcomes keyed by the raw expression string. A stored value is either the parsed
// planparserv2.IExprContext or the error produced while parsing; handleInternal type-switches on it when reading.
// NOTE(review): the arguments look like (capacity 1024, no eviction callback, 10-minute TTL) — confirm against
// the expirable.NewLRU signature.
exprCache = expirable.NewLRU[string, any](1024, nil, time.Minute*10)
// trueLiteral is a Bool-typed expression built from alwaysTrueExpr(); handleExpr returns this single shared
// instance for an empty expression string.
trueLiteral = &ExprWithType{
dataType: schemapb.DataType_Bool,
expr: alwaysTrueExpr(),
}
)

return val
}

func handleExprWithErrorListener(schema *typeutil.SchemaHelper, exprStr string, errorListener errorListener) interface{} {
if isEmptyExpression(exprStr) {
return &ExprWithType{
dataType: schemapb.DataType_Bool,
expr: alwaysTrueExpr(),
func handleInternal(exprStr string) (ast planparserv2.IExprContext, err error) {
val, ok := exprCache.Get(exprStr)
if ok {
switch v := val.(type) {
case planparserv2.IExprContext:
return v, nil
case error:
return nil, v
default:
return nil, fmt.Errorf("unknown cache error: %v", v)
}
}

inputStream := antlr.NewInputStream(exprStr)
lexer := getLexer(inputStream, errorListener)
if errorListener.Error() != nil {
return errorListener.Error()
// Note that the errors will be cached, too.
defer func() {
if err != nil {
exprCache.Add(exprStr, err)
}
}()
exprNormal := convertHanToASCII(exprStr)
listener := &errorListenerImpl{}

inputStream := antlr.NewInputStream(exprNormal)
lexer := getLexer(inputStream, listener)
if err = listener.Error(); err != nil {
return
}

parser := getParser(lexer, errorListener)
if errorListener.Error() != nil {
return errorListener.Error()
parser := getParser(lexer, listener)
if err = listener.Error(); err != nil {
return
}

ast := parser.Expr()
if errorListener.Error() != nil {
return errorListener.Error()
ast = parser.Expr()
if err = listener.Error(); err != nil {
return
}

if parser.GetCurrentToken().GetTokenType() != antlr.TokenEOF {
log.Info("invalid expression", zap.String("expr", exprStr))
return fmt.Errorf("invalid expression: %s", exprStr)
err = fmt.Errorf("invalid expression: %s", exprStr)
return
}

// lexer & parser won't be used by this thread, can be put into pool.
putLexer(lexer)
putParser(parser)

exprCache.Add(exprStr, ast)
return
}

// handleExpr turns an expression string into the result of visiting its parse
// tree with a ParserVisitor bound to schema.
//
// The return value is one of:
//   - trueLiteral, when isEmptyExpression reports exprStr as empty;
//   - the error returned by handleInternal, when parsing fails;
//   - whatever the visitor produces for the parsed tree otherwise.
func handleExpr(schema *typeutil.SchemaHelper, exprStr string) interface{} {
	// Empty input short-circuits before any parsing or cache lookup happens.
	if isEmptyExpression(exprStr) {
		return trueLiteral
	}

	tree, parseErr := handleInternal(exprStr)
	if parseErr != nil {
		// Errors are handed back as the plain return value, not via a second result.
		return parseErr
	}

	// The tree comes from handleInternal and does not depend on schema; the
	// schema is supplied only through the visitor created here.
	return tree.Accept(NewParserVisitor(schema))
}
Expand Down
34 changes: 15 additions & 19 deletions internal/parser/planparserv2/plan_parser_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func Test_FixErrorListenerNotRemoved(t *testing.T) {

normal := "1 < Int32Field < (Int16Field)"
for i := 0; i < 10; i++ {
err := handleExprWithErrorListener(schemaHelper, normal, &errorListenerTest{})
err := handleExpr(schemaHelper, normal)
err1, ok := err.(error)
assert.True(t, ok)
assert.Error(t, err1)
Expand Down Expand Up @@ -1379,25 +1379,21 @@ func BenchmarkPlanCache(b *testing.B) {

b.ResetTimer()

for i := 0; i < b.N; i++ {
r := handleExpr(schemaHelper, "array_length(ArrayField) == 10")
err := getError(r)
assert.NoError(b, err)
}
}

func BenchmarkNoPlanCache(b *testing.B) {
schema := newTestSchema()
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(b, err)

b.ResetTimer()
b.Run("cached", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r := handleExpr(schemaHelper, "array_length(ArrayField) == 10")
err := getError(r)
assert.NoError(b, err)
}
})

for i := 0; i < b.N; i++ {
r := handleExpr(schemaHelper, fmt.Sprintf("array_length(ArrayField) == %d", i))
err := getError(r)
assert.NoError(b, err)
}
b.Run("uncached", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r := handleExpr(schemaHelper, fmt.Sprintf("array_length(ArrayField) == %d", i))
err := getError(r)
assert.NoError(b, err)
}
})
}

func randomChineseString(length int) string {
Expand Down

0 comments on commit e47bf21

Please sign in to comment.