Skip to content

Commit

Permalink
fix: use new path for streamingnode recovery info (#38516)
Browse files Browse the repository at this point in the history
issue: #38399

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 17, 2024
1 parent e19a4f7 commit 8916fbf
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 10 deletions.
5 changes: 5 additions & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,12 @@ type StreamingCoordCataLog interface {

// StreamingNodeCataLog is the interface for streamingnode catalog
type StreamingNodeCataLog interface {
// WAL select the wal related recovery infos.
// Which must give the pchannel name.

// ListSegmentAssignment list all segment assignments for the wal.
ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error)

// SaveSegmentAssignments save the segment assignments for the wal.
SaveSegmentAssignments(ctx context.Context, pChannelName string, infos []*streamingpb.SegmentAssignmentMeta) error
}
7 changes: 4 additions & 3 deletions internal/metastore/kv/streamingnode/constant.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package streamingnode

const (
MetaPrefix = "streamingnode-meta"
SegmentAssignMeta = MetaPrefix + "/segment-assign"
SegmentAssignSubFolder = "s"
MetaPrefix = "streamingnode-meta"

DirectoryWAL = "wal"
DirectorySegmentAssign = "segment-assign"
)
31 changes: 24 additions & 7 deletions internal/metastore/kv/streamingnode/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,22 @@ import (
"github.com/milvus-io/milvus/pkg/util/etcd"
)

// NewCataLog creates a new catalog instance
// NewCataLog creates a new streaming-node catalog instance.
// It's used to persist the recovery info for a streaming node and wal.
// The catalog is shown as following:
// streamingnode-meta
// └── wal
//
// ├── pchannel-1
// │   └── segment-assign
// │   ├── 456398247934
// │   ├── 456398247936
// │   └── 456398247939
// └── pchannel-2
// └── segment-assign
// ├── 456398247934
// ├── 456398247935
// └── 456398247938
func NewCataLog(metaKV kv.MetaKv) metastore.StreamingNodeCataLog {
return &catalog{
metaKV: metaKV,
Expand All @@ -27,6 +42,7 @@ type catalog struct {
metaKV kv.MetaKv
}

// ListSegmentAssignment lists the segment assignment info of the pchannel.
func (c *catalog) ListSegmentAssignment(ctx context.Context, pChannelName string) ([]*streamingpb.SegmentAssignmentMeta, error) {
prefix := buildSegmentAssignmentMetaPath(pChannelName)
keys, values, err := c.metaKV.LoadWithPrefix(ctx, prefix)
Expand Down Expand Up @@ -81,15 +97,16 @@ func (c *catalog) SaveSegmentAssignments(ctx context.Context, pChannelName strin
}

// buildSegmentAssignmentMetaPath builds the path for segment assignment
// streamingnode-meta/segment-assign/${pChannelName}
func buildSegmentAssignmentMetaPath(pChannelName string) string {
// !!! bad implementation here, but we can't make compatibility for underlying meta kv.
// underlying meta kv will remove the last '/' of the path, cause the pchannel lost.
// So we add a special sub path to avoid this.
return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder) + "/"
return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign) + "/"
}

// buildSegmentAssignmentMetaPathOfSegment builds the path for segment assignment
func buildSegmentAssignmentMetaPathOfSegment(pChannelName string, segmentID int64) string {
return path.Join(SegmentAssignMeta, pChannelName, SegmentAssignSubFolder, strconv.FormatInt(segmentID, 10))
return path.Join(buildWALDirectory(pChannelName), DirectorySegmentAssign, strconv.FormatInt(segmentID, 10))
}

// buildWALDirectory builds the path for wal directory
func buildWALDirectory(pchannelName string) string {
return path.Join(MetaPrefix, DirectoryWAL, pchannelName) + "/"
}
11 changes: 11 additions & 0 deletions internal/metastore/kv/streamingnode/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ func TestCatalog(t *testing.T) {
})
assert.NoError(t, err)
}

func TestBuildDirectory(t *testing.T) {
assert.Equal(t, "streamingnode-meta/wal/p1/", buildWALDirectory("p1"))
assert.Equal(t, "streamingnode-meta/wal/p2/", buildWALDirectory("p2"))

assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/", buildSegmentAssignmentMetaPath("p1"))
assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/", buildSegmentAssignmentMetaPath("p2"))

assert.Equal(t, "streamingnode-meta/wal/p1/segment-assign/1", buildSegmentAssignmentMetaPathOfSegment("p1", 1))
assert.Equal(t, "streamingnode-meta/wal/p2/segment-assign/2", buildSegmentAssignmentMetaPathOfSegment("p2", 2))
}

0 comments on commit 8916fbf

Please sign in to comment.