Skip to content

Commit

Permalink
WIP: tt and cdc relation
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Nov 22, 2024
1 parent d06f3ca commit c3a3c17
Show file tree
Hide file tree
Showing 48 changed files with 656 additions and 234 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241111062829-6de3d96f664f
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241118153429-bf88e9c3d154
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241111062829-6de3d96f664f h1:yLxT8NH0ixUOJMqJuk0xvGf0cKsr+N2xibyTat256PI=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241111062829-6de3d96f664f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241118153429-bf88e9c3d154 h1:8RBRPdeQAqDinQGEk2h85RcOvCneR5m4cZujMFtbj5M=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20241118153429-bf88e9c3d154/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
16 changes: 16 additions & 0 deletions internal/datacoord/broker/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
Expand All @@ -39,6 +40,7 @@ type Broker interface {
ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error)
ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error)
HasCollection(ctx context.Context, collectionID int64) (bool, error)
DescribeDatabase(ctx context.Context, databaseName string) (*rootcoordpb.DescribeDatabaseResponse, error)
}

type coordinatorBroker struct {
Expand Down Expand Up @@ -151,3 +153,17 @@ func (b *coordinatorBroker) HasCollection(ctx context.Context, collectionID int6
}
return err == nil, err
}

func (b *coordinatorBroker) DescribeDatabase(ctx context.Context, databaseName string) (*rootcoordpb.DescribeDatabaseResponse, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := b.rootCoord.DescribeDatabase(ctx, &rootcoordpb.DescribeDatabaseRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_DescribeDatabase)),
DbName: databaseName,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("failed to DescribeDatabase", zap.Error(err))
return nil, err
}
return resp, nil
}
24 changes: 13 additions & 11 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,23 +719,25 @@ func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
collInfo, err := m.h.GetCollection(ctx, ch.GetCollectionID())
if err != nil {
cancel()
return err
}
cancel()
schema := ch.GetSchema()
if schema == nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
collInfo, err := m.h.GetCollection(ctx, ch.GetCollectionID())
if err != nil {
return err
}
schema = collInfo.Schema
}

info := &datapb.ChannelWatchInfo{
Vchan: reduceVChanSize(vcInfo),
StartTs: startTs,
State: inferStateByOpType(op.Type),
Schema: schema,
OpID: opID,
Vchan: reduceVChanSize(vcInfo),
StartTs: startTs,
State: inferStateByOpType(op.Type),
Schema: schema,
OpID: opID,
DbProperties: collInfo.DatabaseProperties,
}
ch.UpdateWatchInfo(info)
}
Expand Down
65 changes: 36 additions & 29 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,16 @@ type segMetricMutation struct {
}

type collectionInfo struct {
ID int64
Schema *schemapb.CollectionSchema
Partitions []int64
StartPositions []*commonpb.KeyDataPair
Properties map[string]string
CreatedAt Timestamp
DatabaseName string
DatabaseID int64
VChannelNames []string
ID int64
Schema *schemapb.CollectionSchema
Partitions []int64
StartPositions []*commonpb.KeyDataPair
Properties map[string]string
CreatedAt Timestamp
DatabaseName string
DatabaseID int64
DatabaseProperties []*commonpb.KeyValuePair
VChannelNames []string
}

// NewMeta creates meta from provided `kv.TxnKV`
Expand Down Expand Up @@ -244,12 +245,16 @@ func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker
return err
}
for _, dbName := range resp.GetDbNames() {
resp, err := broker.ShowCollections(ctx, dbName)
dbResp, err := broker.DescribeDatabase(ctx, dbName)
if err != nil {
return err
}
for _, collectionID := range resp.GetCollectionIds() {
resp, err := broker.DescribeCollectionInternal(ctx, collectionID)
collectionsResp, err := broker.ShowCollections(ctx, dbName)
if err != nil {
return err
}
for _, collectionID := range collectionsResp.GetCollectionIds() {
descResp, err := broker.DescribeCollectionInternal(ctx, collectionID)
if err != nil {
return err
}
Expand All @@ -258,15 +263,16 @@ func (m *meta) reloadCollectionsFromRootcoord(ctx context.Context, broker broker
return err
}
collection := &collectionInfo{
ID: collectionID,
Schema: resp.GetSchema(),
Partitions: partitionIDs,
StartPositions: resp.GetStartPositions(),
Properties: funcutil.KeyValuePair2Map(resp.GetProperties()),
CreatedAt: resp.GetCreatedTimestamp(),
DatabaseName: resp.GetDbName(),
DatabaseID: resp.GetDbId(),
VChannelNames: resp.GetVirtualChannelNames(),
ID: collectionID,
Schema: descResp.GetSchema(),
Partitions: partitionIDs,
StartPositions: descResp.GetStartPositions(),
Properties: funcutil.KeyValuePair2Map(descResp.GetProperties()),
CreatedAt: descResp.GetCreatedTimestamp(),
DatabaseName: descResp.GetDbName(),
DatabaseID: descResp.GetDbId(),
DatabaseProperties: dbResp.GetProperties(),
VChannelNames: descResp.GetVirtualChannelNames(),
}
m.AddCollection(collection)
}
Expand Down Expand Up @@ -330,14 +336,15 @@ func (m *meta) GetClonedCollectionInfo(collectionID UniqueID) *collectionInfo {
clonedProperties := make(map[string]string)
maps.Copy(clonedProperties, coll.Properties)
cloneColl := &collectionInfo{
ID: coll.ID,
Schema: proto.Clone(coll.Schema).(*schemapb.CollectionSchema),
Partitions: coll.Partitions,
StartPositions: common.CloneKeyDataPairs(coll.StartPositions),
Properties: clonedProperties,
DatabaseName: coll.DatabaseName,
DatabaseID: coll.DatabaseID,
VChannelNames: coll.VChannelNames,
ID: coll.ID,
Schema: proto.Clone(coll.Schema).(*schemapb.CollectionSchema),
Partitions: coll.Partitions,
StartPositions: common.CloneKeyDataPairs(coll.StartPositions),
Properties: clonedProperties,
DatabaseName: coll.DatabaseName,
DatabaseID: coll.DatabaseID,
DatabaseProperties: common.CloneKeyValuePairs(coll.DatabaseProperties),
VChannelNames: coll.VChannelNames,
}

return cloneColl
Expand Down
23 changes: 14 additions & 9 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,22 +1202,27 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
if err != nil {
return err
}
dbResp, err := s.broker.DescribeDatabase(ctx, resp.GetDbName())
if err != nil {
return err
}

properties := make(map[string]string)
for _, pair := range resp.Properties {
properties[pair.GetKey()] = pair.GetValue()
}

collInfo := &collectionInfo{
ID: resp.CollectionID,
Schema: resp.Schema,
Partitions: partitionIDs,
StartPositions: resp.GetStartPositions(),
Properties: properties,
CreatedAt: resp.GetCreatedTimestamp(),
DatabaseName: resp.GetDbName(),
DatabaseID: resp.GetDbId(),
VChannelNames: resp.GetVirtualChannelNames(),
ID: resp.CollectionID,
Schema: resp.Schema,
Partitions: partitionIDs,
StartPositions: resp.GetStartPositions(),
Properties: properties,
CreatedAt: resp.GetCreatedTimestamp(),
DatabaseName: resp.GetDbName(),
DatabaseID: resp.GetDbId(),
DatabaseProperties: dbResp.GetProperties(),
VChannelNames: resp.GetVirtualChannelNames(),
}
s.meta.AddCollection(collInfo)
return nil
Expand Down
20 changes: 13 additions & 7 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1555,14 +1555,20 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt

// cache miss and update cache
if clonedColl == nil {
dbResp, err := s.broker.DescribeDatabase(ctx, req.GetSchema().GetDbName())
if err != nil {
return merr.Status(err), nil
}
collInfo := &collectionInfo{
ID: req.GetCollectionID(),
Schema: req.GetSchema(),
Partitions: req.GetPartitionIDs(),
StartPositions: req.GetStartPositions(),
Properties: properties,
DatabaseID: req.GetDbID(),
VChannelNames: req.GetVChannels(),
ID: req.GetCollectionID(),
Schema: req.GetSchema(),
Partitions: req.GetPartitionIDs(),
StartPositions: req.GetStartPositions(),
Properties: properties,
DatabaseID: req.GetDbID(),
DatabaseName: dbResp.GetDbName(),
DatabaseProperties: dbResp.GetProperties(),
VChannelNames: req.GetVChannels(),
}
s.meta.AddCollection(collInfo)
return merr.Success(), nil
Expand Down
8 changes: 7 additions & 1 deletion internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,13 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
return nil, err
}

input, err := createNewInputFromDispatcher(initCtx, pipelineParams.DispClient, info.GetVchan().GetChannelName(), info.GetVchan().GetSeekPosition())
input, err := createNewInputFromDispatcher(initCtx,
pipelineParams.DispClient,
info.GetVchan().GetChannelName(),
info.GetVchan().GetSeekPosition(),
info.GetSchema(),
info.GetDbProperties(),
)
if err != nil {
return nil, err
}
Expand Down
31 changes: 28 additions & 3 deletions internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/flowgraph"
pkgcommon "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/common"
Expand Down Expand Up @@ -57,11 +60,28 @@ func newDmInputNode(dmNodeConfig *nodeConfig, input <-chan *msgstream.MsgPack) *
return node
}

func createNewInputFromDispatcher(initCtx context.Context, dispatcherClient msgdispatcher.Client, vchannel string, seekPos *msgpb.MsgPosition) (<-chan *msgstream.MsgPack, error) {
func createNewInputFromDispatcher(initCtx context.Context,
dispatcherClient msgdispatcher.Client,
vchannel string,
seekPos *msgpb.MsgPosition,
schema *schemapb.CollectionSchema,
dbProperties []*commonpb.KeyValuePair,
) (<-chan *msgstream.MsgPack, error) {
log := log.With(zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("vchannel", vchannel))
replicateID, _ := pkgcommon.GetReplicateID(schema.GetProperties())
if replicateID == "" {
replicateID, _ = pkgcommon.GetReplicateID(dbProperties)
}
replicateConfig := msgstream.GetReplicateConfig(replicateID, schema.GetDbName(), schema.GetName())

if seekPos != nil && len(seekPos.MsgID) != 0 {
input, err := dispatcherClient.Register(initCtx, vchannel, seekPos, common.SubscriptionPositionUnknown)
input, err := dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: seekPos,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: replicateConfig,
})
if err != nil {
return nil, err
}
Expand All @@ -71,7 +91,12 @@ func createNewInputFromDispatcher(initCtx context.Context, dispatcherClient msgd
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))))
return input, err
}
input, err := dispatcherClient.Register(initCtx, vchannel, nil, common.SubscriptionPositionEarliest)
input, err := dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: nil,
SubPos: common.SubscriptionPositionEarliest,
ReplicateConfig: replicateConfig,
})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
func (mtm *mockTtMsgStream) EnableProduce(can bool) {
}

func (mtm *mockTtMsgStream) SetReplicateID(replicateID string) {
}

func TestNewDmInputNode(t *testing.T) {
assert.Panics(t, func() {
newDmInputNode(&nodeConfig{
Expand Down
4 changes: 4 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Collection struct {
CollectionID int64
Partitions []*Partition
Name string
DBName string
Description string
AutoID bool
Fields []*Field
Expand All @@ -41,6 +42,7 @@ func (c *Collection) Clone() *Collection {
DBID: c.DBID,
CollectionID: c.CollectionID,
Name: c.Name,
DBName: c.DBName,
Description: c.Description,
AutoID: c.AutoID,
Fields: CloneFields(c.Fields),
Expand Down Expand Up @@ -99,6 +101,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
CollectionID: coll.ID,
DBID: coll.DbId,
Name: coll.Schema.Name,
DBName: coll.Schema.DbName,
Description: coll.Schema.Description,
AutoID: coll.Schema.AutoID,
Fields: UnmarshalFieldModels(coll.GetSchema().GetFields()),
Expand Down Expand Up @@ -154,6 +157,7 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
Description: coll.Description,
AutoID: coll.AutoID,
EnableDynamicField: coll.EnableDynamicField,
DbName: coll.DBName,
}

if c.withFields {
Expand Down
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ message ChannelWatchInfo {
// watch progress, deprecated
int32 progress = 6;
int64 opID = 7;
repeated common.KeyValuePair dbProperties = 8;
}

enum CompactionType {
Expand Down
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ message LoadMetaInfo {
string db_name = 5; // Only used for metrics label.
string resource_group = 6; // Only used for metrics label.
repeated int64 load_fields = 7;
repeated common.KeyValuePair db_properties = 8;
}

message WatchDmChannelsRequest {
Expand Down
Loading

0 comments on commit c3a3c17

Please sign in to comment.