Skip to content

Commit

Permalink
Support access log dynamic config and prepare for support resutful in…
Browse files Browse the repository at this point in the history
…terface access log

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Mar 8, 2024
1 parent d19a82e commit 996eec2
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 371 deletions.
7 changes: 4 additions & 3 deletions internal/proxy/accesslog/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proxy/accesslog/info"
"github.com/milvus-io/milvus/internal/proxy/connection"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -64,7 +65,7 @@ func BenchmarkAccesslog(b *testing.B) {
Params.Save(Params.ProxyCfg.AccessLog.Enable.Key, "true")
Params.Save(Params.ProxyCfg.AccessLog.Filename.Key, "")
Params.Save(Params.CommonCfg.ClusterPrefix.Key, "in-test")
initAccessLogger(&Params.ProxyCfg.AccessLog, &Params.MinioCfg)
InitAccessLogger(Params)
paramtable.Get().CommonCfg.ClusterPrefix.GetValue()

clientInfo := &commonpb.ClientInfo{
Expand All @@ -81,9 +82,9 @@ func BenchmarkAccesslog(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
data := datas[i%len(datas)]
accessInfo := NewGrpcAccessInfo(ctx, rpcInfo, data.req)
accessInfo := info.NewGrpcAccessInfo(ctx, rpcInfo, data.req)
accessInfo.UpdateCtx(ctx)
accessInfo.SetResult(data.resp, data.err)
accessInfo.Write()
_globalL.Write(accessInfo)
}
}
61 changes: 31 additions & 30 deletions internal/proxy/accesslog/formater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/status"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proxy/accesslog/info"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/crypto"
Expand Down Expand Up @@ -103,28 +104,28 @@ func (s *LogFormatterSuite) TestFormatNames() {
formatter := NewFormatter(fmt)

for _, req := range s.reqs {
info := NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(info)
s.False(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(i)
s.False(strings.Contains(fs, info.Unknown))
}

info := NewGrpcAccessInfo(s.ctx, s.serverinfo, nil)
fs := formatter.Format(info)
s.True(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(s.ctx, s.serverinfo, nil)
fs := formatter.Format(i)
s.True(strings.Contains(fs, info.Unknown))
}

func (s *LogFormatterSuite) TestFormatTime() {
fmt := "$time_now: $time_start: $time_end: $time_cost: $time_now"
formatter := NewFormatter(fmt)

for id, req := range s.reqs {
info := NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(info)
s.True(strings.Contains(fs, unknownString))
info.UpdateCtx(s.ctx)
info.SetResult(s.resps[id], s.errs[id])
fs = formatter.Format(info)
s.False(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(i)
s.True(strings.Contains(fs, info.Unknown))
i.UpdateCtx(s.ctx)
i.SetResult(s.resps[id], s.errs[id])
fs = formatter.Format(i)
s.False(strings.Contains(fs, info.Unknown))
}
}

Expand All @@ -133,34 +134,34 @@ func (s *LogFormatterSuite) TestFormatUserInfo() {
formatter := NewFormatter(fmt)

for _, req := range s.reqs {
info := NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(info)
s.False(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(i)
s.False(strings.Contains(fs, info.Unknown))
}

// test unknown
info := NewGrpcAccessInfo(context.Background(), &grpc.UnaryServerInfo{}, nil)
fs := formatter.Format(info)
s.True(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(context.Background(), &grpc.UnaryServerInfo{}, nil)
fs := formatter.Format(i)
s.True(strings.Contains(fs, info.Unknown))
}

func (s *LogFormatterSuite) TestFormatMethodInfo() {
fmt := "$method_name: $method_status $trace_id"
formatter := NewFormatter(fmt)

metaContext := metadata.AppendToOutgoingContext(s.ctx, clientRequestIDKey, s.traceID)
metaContext := metadata.AppendToOutgoingContext(s.ctx, info.ClientRequestIDKey, s.traceID)
for _, req := range s.reqs {
info := NewGrpcAccessInfo(metaContext, s.serverinfo, req)
fs := formatter.Format(info)
i := info.NewGrpcAccessInfo(metaContext, s.serverinfo, req)
fs := formatter.Format(i)
log.Info(fs)
s.True(strings.Contains(fs, s.traceID))
}

traceContext, traceSpan := otel.Tracer(typeutil.ProxyRole).Start(s.ctx, "test")
trueTraceID := traceSpan.SpanContext().TraceID().String()
for _, req := range s.reqs {
info := NewGrpcAccessInfo(traceContext, s.serverinfo, req)
fs := formatter.Format(info)
i := info.NewGrpcAccessInfo(traceContext, s.serverinfo, req)
fs := formatter.Format(i)
log.Info(fs)
s.True(strings.Contains(fs, trueTraceID))
}
Expand All @@ -171,13 +172,13 @@ func (s *LogFormatterSuite) TestFormatMethodResult() {
formatter := NewFormatter(fmt)

for id, req := range s.reqs {
info := NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(info)
s.True(strings.Contains(fs, unknownString))
i := info.NewGrpcAccessInfo(s.ctx, s.serverinfo, req)
fs := formatter.Format(i)
s.True(strings.Contains(fs, info.Unknown))

info.SetResult(s.resps[id], s.errs[id])
fs = formatter.Format(info)
s.False(strings.Contains(fs, unknownString))
i.SetResult(s.resps[id], s.errs[id])
fs = formatter.Format(i)
s.False(strings.Contains(fs, info.Unknown))
}
}

Expand Down
37 changes: 6 additions & 31 deletions internal/proxy/accesslog/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,15 @@ import (
"fmt"
"strings"

"github.com/milvus-io/milvus/internal/proxy/accesslog/info"
"github.com/milvus-io/milvus/pkg/util/merr"
)

const (
unknownString = "Unknown"
fomaterkey = "format"
methodKey = "methods"
fomaterkey = "format"
methodKey = "methods"
)

type getMetricFunc func(i *GrpcAccessInfo) string

// supported metrics
var metricFuncMap = map[string]getMetricFunc{
"$method_name": getMethodName,
"$method_status": getMethodStatus,
"$trace_id": getTraceID,
"$user_addr": getAddr,
"$user_name": getUserName,
"$response_size": getResponseSize,
"$error_code": getErrorCode,
"$error_msg": getErrorMsg,
"$database_name": getDbName,
"$collection_name": getCollectionName,
"$partition_name": getPartitionName,
"$time_cost": getTimeCost,
"$time_now": getTimeNow,
"$time_start": getTimeStart,
"$time_end": getTimeEnd,
"$method_expr": getExpr,
"$output_fields": getOutputFields,
"$sdk_version": getSdkVersion,
"$cluster_prefix": getClusterPrefix,
}

var BaseFormatterKey = "base"

// Formaater manager not concurrent safe
Expand Down Expand Up @@ -128,7 +103,7 @@ func (f *Formatter) buildMetric(metric string, prefixs []string) ([]string, []st
func (f *Formatter) build() {
prefixs := []string{f.base}
f.fields = []string{}
for metric := range metricFuncMap {
for metric := range info.MetricFuncMap {
if strings.Contains(f.base, metric) {
f.fields, prefixs = f.buildMetric(metric, prefixs)
}
Expand All @@ -144,8 +119,8 @@ func (f *Formatter) build() {
f.fmt += "\n"
}

func (f *Formatter) Format(i AccessInfo) string {
fieldValues := i.Get(f.fields...)
func (f *Formatter) Format(i info.AccessInfo) string {
fieldValues := info.Get(i, f.fields...)
return fmt.Sprintf(f.fmt, fieldValues...)
}

Expand Down
Loading

0 comments on commit 996eec2

Please sign in to comment.