Skip to content

Commit

Permalink
enhance watch chan async trace
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 27, 2024
1 parent 4df444e commit f8aefe8
Show file tree
Hide file tree
Showing 63 changed files with 625 additions and 428 deletions.
13 changes: 10 additions & 3 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datacoord

import (
"context"
"fmt"

"go.uber.org/zap"
Expand Down Expand Up @@ -44,7 +45,8 @@ type RWChannel interface {
UpdateWatchInfo(info *datapb.ChannelWatchInfo)
}

func NewRWChannel(name string,
func NewRWChannel(ctx context.Context,
name string,
collectionID int64,
startPos []*commonpb.KeyDataPair,
schema *schemapb.CollectionSchema,
Expand All @@ -58,6 +60,7 @@ func NewRWChannel(name string,
Schema: schema,
CreateTimestamp: createTs,
DBProperties: dbProperties,
currentCtx: ctx,
}
}

Expand Down Expand Up @@ -140,11 +143,12 @@ type StateChannel struct {

currentState ChannelState
assignedNode int64
currentCtx context.Context
}

var _ RWChannel = (*StateChannel)(nil)

func NewStateChannel(ch RWChannel) *StateChannel {
func NewStateChannel(ctx context.Context, ch RWChannel) *StateChannel {
c := &StateChannel{
Name: ch.GetName(),
CollectionID: ch.GetCollectionID(),
Expand All @@ -155,20 +159,22 @@ func NewStateChannel(ch RWChannel) *StateChannel {
DBProperties: ch.GetDBProperties(),

assignedNode: bufferID,
currentCtx: ctx,
}

c.setState(Standby)
return c
}

func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *StateChannel {
func NewStateChannelByWatchInfo(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) *StateChannel {
c := &StateChannel{
Name: info.GetVchan().GetChannelName(),
CollectionID: info.GetVchan().GetCollectionID(),
Schema: info.GetSchema(),
DBProperties: info.GetDbProperties(),
Info: info,
assignedNode: nodeID,
currentCtx: ctx,
}

switch info.GetState() {
Expand Down Expand Up @@ -246,6 +252,7 @@ func (c *StateChannel) Clone() *StateChannel {

currentState: c.currentState,
assignedNode: c.assignedNode,
currentCtx: c.currentCtx,
}
}

Expand Down
Loading

0 comments on commit f8aefe8

Please sign in to comment.