diff --git a/internal/metastore/catalog.go b/internal/metastore/catalog.go index e036d0d0fcfd1..4598dea4bf678 100644 --- a/internal/metastore/catalog.go +++ b/internal/metastore/catalog.go @@ -214,7 +214,12 @@ type StreamingCoordCataLog interface { // StreamingNodeCataLog is the interface for streamingnode catalog type StreamingNodeCataLog interface { + // WAL select the wal related recovery infos. + // Which must give the pchannel name. + + // ListSegmentAssignment list all segment assignments for the wal. ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) + // SaveSegmentAssignments save the segment assignments for the wal. SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error } diff --git a/internal/metastore/kv/streamingnode/constant.go b/internal/metastore/kv/streamingnode/constant.go index d1bf796f286c4..1c83ba17432b2 100644 --- a/internal/metastore/kv/streamingnode/constant.go +++ b/internal/metastore/kv/streamingnode/constant.go @@ -1,7 +1,8 @@ package streamingnode const ( - MetaPrefix = "streamingnode-meta" - SegmentAssignMeta = MetaPrefix + "/segment-assign" - SegmentAssignSubFolder = "s" + MetaPrefix = "streamingnode-meta" + + DirectoryWAL = "wal" + DirectorySegmentAssign = "segment-assign" ) diff --git a/internal/metastore/kv/streamingnode/kv_catalog.go b/internal/metastore/kv/streamingnode/kv_catalog.go index 2d279ab6ea6ca..2e474738cfdef 100644 --- a/internal/metastore/kv/streamingnode/kv_catalog.go +++ b/internal/metastore/kv/streamingnode/kv_catalog.go @@ -15,7 +15,22 @@ import ( "github.com/milvus-io/milvus/pkg/util/etcd" ) -// NewCataLog creates a new catalog instance +// NewCataLog creates a new streaming-node catalog instance. +// It's used to persist the recovery info for a streaming node and wal. +// The catalog is shown as following: +// streamingnode-meta +// └── wal +// +// ├── pchannel-1 +// │   └── segment-assign +// │   ├── 456398247934 +// │   ├── 456398247936 +// │   └── 456398247939 +// └── pchannel-2 +// └── segment-assign +// ├── 456398247934 +// ├── 456398247935 +// └── 456398247938 func NewCataLog(metaKV kv.MetaKv) metastore.StreamingNodeCataLog { return &catalog{ metaKV: metaKV, @@ -27,6 +42,7 @@ type catalog struct { metaKV kv.MetaKv } +// ListSegmentAssignment lists the segment assignment info of the pchannel. func (c *catalog) ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) { prefix := buildSegmentAssignmentMetaPath(pChannelName) keys, values, err := c.metaKV.LoadWithPrefix(ctx, prefix) @@ -81,15 +97,16 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin } // buildSegmentAssignmentMetaPath builds the path for segment assignment -// streamingnode-meta/segment-assign/${pChannelName} func buildSegmentAssignmentMetaPath(pChannelName string) string { - // !!! bad implementation here, but we can't make compatibility for underlying meta kv. - // underlying meta kv will remove the last '/' of the path, cause the pchannel lost. - // So we add a special sub path to avoid this. - return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder) + "/" + return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign) + "/" } // buildSegmentAssignmentMetaPathOfSegment builds the path for segment assignment func buildSegmentAssignmentMetaPathOfSegment(pChannelName string, segmentID int64) string { - return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder, strconv.FormatInt(segmentID, 10)) + return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign, strconv.FormatInt(segmentID, 10)) +} + +// buildWALDirectory builds the path for wal directory +func buildWALDirectory(pchannelName string) string { + return path.Join(MetaPrefix, DirectoryWAL, pchannelName) + "/" } diff --git a/internal/metastore/kv/streamingnode/kv_catalog_test.go b/internal/metastore/kv/streamingnode/kv_catalog_test.go index 35e8769664f57..4ebdf5a09ca9b 100644 --- a/internal/metastore/kv/streamingnode/kv_catalog_test.go +++ b/internal/metastore/kv/streamingnode/kv_catalog_test.go @@ -41,3 +41,14 @@ func TestCatalog(t *testing.T) { }) assert.NoError(t, err) } + +func TestBuildDirectory(t *testing.T) { + assert.Equal(t, "streamingnode-meta/wal/p1/", buildWALDirectory("p1")) + assert.Equal(t, "streamingnode-meta/wal/p2/", buildWALDirectory("p2")) + + assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/", buildSegmentAssignmentMetaPath("p1")) + assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/", buildSegmentAssignmentMetaPath("p2")) + + assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/1", buildSegmentAssignmentMetaPathOfSegment("p1", 1)) + assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/2", buildSegmentAssignmentMetaPathOfSegment("p2", 2)) +}