Skip to content

Commit

Permalink
enhance: Add cgo call metrics for load/write API (milvus-io#37405)
Browse files Browse the repository at this point in the history
The cost of cgo API calls is not observable since no metrics are related to them.
This PR adds metrics for some sync cgo calls related to load & write.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Nov 13, 2024
1 parent 25c9699 commit 7933eba
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
11 changes: 11 additions & 0 deletions internal/querynodev2/segments/load_index_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import "C"

import (
"context"
"fmt"
"runtime"
"time"
"unsafe"

"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/cgopb"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -197,6 +200,14 @@ func (li *LoadIndexInfo) loadIndex(ctx context.Context) error {
var status C.CStatus

_, _ = GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"AppendIndexV2",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
status = C.AppendIndexV3(li.cLoadIndexInfo)
} else {
Expand Down
41 changes: 41 additions & 0 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"io"
"runtime"
"strings"
"time"
"unsafe"

"github.com/apache/arrow/go/v12/arrow/array"
Expand Down Expand Up @@ -795,6 +796,14 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
var status C.CStatus

GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"Insert",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.Insert(s.ptr,
cOffset,
cNumOfRows,
Expand Down Expand Up @@ -870,6 +879,14 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
}
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"Delete",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.Delete(s.ptr,
cOffset,
cSize,
Expand Down Expand Up @@ -932,6 +949,14 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {

var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadFieldData",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
Expand Down Expand Up @@ -1010,6 +1035,14 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun

var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadFieldData",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
log.Info("submitted loadFieldData task to load pool")
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
Expand Down Expand Up @@ -1236,6 +1269,14 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
*/
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadDeletedRecord",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.LoadDeletedRecord(s.ptr, loadInfo)
return nil, nil
}).Await()
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const (
lockOp = "lock_op"
loadTypeName = "load_type"
pathLabelName = "path"
cgoNameLabelName = `cgo_name`
cgoTypeLabelName = `cgo_type`

// entities label
LoadedLabel = "loaded"
Expand Down
14 changes: 14 additions & 0 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,19 @@ var (
channelNameLabelName,
},
)

// QueryNodeCGOCallLatency is a histogram of the latency of cgo calls made by
// the query node. Each observation is labeled with the node ID, the name of
// the cgo call, and the cgo call type. Call sites visible in this change
// record the elapsed time in milliseconds and use "Sync" as the call type.
QueryNodeCGOCallLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "cgo_latency",
Help: "latency of each cgo call",
Buckets: buckets,
}, []string{
nodeIDLabelName,
cgoNameLabelName,
cgoTypeLabelName,
})
)

// RegisterQueryNode registers QueryNode metrics
Expand Down Expand Up @@ -859,6 +872,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeSearchHitSegmentNum)
registry.MustRegister(QueryNodeDeleteBufferSize)
registry.MustRegister(QueryNodeDeleteBufferRowNum)
registry.MustRegister(QueryNodeCGOCallLatency)
// Add cgo metrics
RegisterCGOMetrics(registry)
}
Expand Down

0 comments on commit 7933eba

Please sign in to comment.