Skip to content

Commit

Permalink
enhance watch chan async trace
Browse files Browse the repository at this point in the history
Signed-off-by: tinswzy <[email protected]>
  • Loading branch information
tinswzy committed Dec 13, 2024
1 parent 10460ed commit 4c2d9be
Show file tree
Hide file tree
Showing 63 changed files with 619 additions and 424 deletions.
13 changes: 10 additions & 3 deletions internal/datacoord/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datacoord

import (
"context"
"fmt"

"go.uber.org/zap"
Expand All @@ -43,7 +44,8 @@ type RWChannel interface {
UpdateWatchInfo(info *datapb.ChannelWatchInfo)
}

func NewRWChannel(name string,
func NewRWChannel(ctx context.Context,
name string,
collectionID int64,
startPos []*commonpb.KeyDataPair,
schema *schemapb.CollectionSchema,
Expand All @@ -55,6 +57,7 @@ func NewRWChannel(name string,
StartPositions: startPos,
Schema: schema,
CreateTimestamp: createTs,
currentCtx: ctx,
}
}

Expand Down Expand Up @@ -131,11 +134,12 @@ type StateChannel struct {

currentState ChannelState
assignedNode int64
currentCtx context.Context
}

var _ RWChannel = (*StateChannel)(nil)

func NewStateChannel(ch RWChannel) *StateChannel {
func NewStateChannel(ctx context.Context, ch RWChannel) *StateChannel {
c := &StateChannel{
Name: ch.GetName(),
CollectionID: ch.GetCollectionID(),
Expand All @@ -145,19 +149,21 @@ func NewStateChannel(ch RWChannel) *StateChannel {
Info: ch.GetWatchInfo(),

assignedNode: bufferID,
currentCtx: ctx,
}

c.setState(Standby)
return c
}

func NewStateChannelByWatchInfo(nodeID int64, info *datapb.ChannelWatchInfo) *StateChannel {
func NewStateChannelByWatchInfo(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) *StateChannel {
c := &StateChannel{
Name: info.GetVchan().GetChannelName(),
CollectionID: info.GetVchan().GetCollectionID(),
Schema: info.GetSchema(),
Info: info,
assignedNode: nodeID,
currentCtx: ctx,
}

switch info.GetState() {
Expand Down Expand Up @@ -219,6 +225,7 @@ func (c *StateChannel) Clone() *StateChannel {

currentState: c.currentState,
assignedNode: c.assignedNode,
currentCtx: c.currentCtx,
}
}

Expand Down
Loading

0 comments on commit 4c2d9be

Please sign in to comment.