Skip to content

Commit

Permalink
feat: Add levelzero compaction in DN (#28470)
Browse files Browse the repository at this point in the history
See also: #27606

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Nov 30, 2023
1 parent d694405 commit aae7e62
Show file tree
Hide file tree
Showing 14 changed files with 1,215 additions and 37 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks
Expand Down
24 changes: 10 additions & 14 deletions internal/datanode/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type iterator = storage.Iterator

type compactor interface {
complete()
// compact() (*datapb.CompactionResult, error)
compact() (*datapb.CompactionPlanResult, error)
injectDone()
stop()
Expand All @@ -79,9 +78,8 @@ type compactionTask struct {
ctx context.Context
cancel context.CancelFunc

done chan struct{}
tr *timerecord.TimeRecorder
chunkManager storage.ChunkManager
done chan struct{}
tr *timerecord.TimeRecorder
}

func newCompactionTask(
Expand All @@ -92,22 +90,20 @@ func newCompactionTask(
syncMgr syncmgr.SyncManager,
alloc allocator.Allocator,
plan *datapb.CompactionPlan,
chunkManager storage.ChunkManager,
) *compactionTask {
ctx1, cancel := context.WithCancel(ctx)
return &compactionTask{
ctx: ctx1,
cancel: cancel,

downloader: dl,
uploader: ul,
syncMgr: syncMgr,
metaCache: metaCache,
Allocator: alloc,
plan: plan,
tr: timerecord.NewTimeRecorder("compactionTask"),
chunkManager: chunkManager,
done: make(chan struct{}, 1),
downloader: dl,
uploader: ul,
syncMgr: syncMgr,
metaCache: metaCache,
Allocator: alloc,
plan: plan,
tr: timerecord.NewTimeRecorder("levelone compaction"),
done: make(chan struct{}, 1),
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}

task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil)
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Channel: "channelname",
}

task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan, nil)
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, metaCache, syncMgr, alloc, plan)
result, err := task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)
Expand Down
80 changes: 80 additions & 0 deletions internal/datanode/io/binlog_io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io

import (
"context"
"path"

"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/retry"
)

type BinlogIO interface {
Download(ctx context.Context, paths []string) ([][]byte, error)
Upload(ctx context.Context, kvs map[string][]byte) error
// JoinFullPath returns the full path by join the paths with the chunkmanager's rootpath
JoinFullPath(paths ...string) string
}

type BinlogIoImpl struct {
storage.ChunkManager
pool *conc.Pool[any]
}

func NewBinlogIO(cm storage.ChunkManager, ioPool *conc.Pool[any]) BinlogIO {
return &BinlogIoImpl{cm, ioPool}
}

func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) {
future := b.pool.Submit(func() (any, error) {
var vs [][]byte
var err error

err = retry.Do(ctx, func() error {
vs, err = b.MultiRead(ctx, paths)
return err
})

return vs, err
})

vs, err := future.Await()
if err != nil {
return nil, err
}

return vs.([][]byte), nil
}

func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error {
future := b.pool.Submit(func() (any, error) {
err := retry.Do(ctx, func() error {
return b.MultiWrite(ctx, kvs)
})

return nil, err
})

_, err := future.Await()
return err
}

func (b *BinlogIoImpl) JoinFullPath(paths ...string) string {
return path.Join(b.ChunkManager.RootPath(), path.Join(paths...))
}
73 changes: 73 additions & 0 deletions internal/datanode/io/binlog_io_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io

import (
"path"
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"

"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/conc"
)

const binlogIOTestDir = "/tmp/milvus_test/binlog_io"

func TestBinlogIO(t *testing.T) {
suite.Run(t, new(BinlogIOSuite))
}

type BinlogIOSuite struct {
suite.Suite

cm storage.ChunkManager
b BinlogIO
}

func (s *BinlogIOSuite) SetupTest() {
pool := conc.NewDefaultPool[any]()

s.cm = storage.NewLocalChunkManager(storage.RootPath(binlogIOTestDir))

s.b = NewBinlogIO(s.cm, pool)
}

func (s *BinlogIOSuite) TeardownTest() {
ctx := context.Background()
s.cm.RemoveWithPrefix(ctx, s.cm.RootPath())
}

func (s *BinlogIOSuite) TestUploadDownload() {
kvs := map[string][]byte{
path.Join(binlogIOTestDir, "a/b/c"): {1, 255, 255},
path.Join(binlogIOTestDir, "a/b/d"): {1, 255, 255},
}

ctx := context.Background()
err := s.b.Upload(ctx, kvs)
s.NoError(err)

vs, err := s.b.Download(ctx, lo.Keys(kvs))
s.NoError(err)
s.ElementsMatch(lo.Values(kvs), vs)
}

func (s *BinlogIOSuite) TestJoinFullPath() {
tests := []struct {
description string
inPaths []string
outPath string
}{
{"no input", nil, path.Join(binlogIOTestDir)},
{"input one", []string{"a"}, path.Join(binlogIOTestDir, "a")},
{"input two", []string{"a", "b"}, path.Join(binlogIOTestDir, "a/b")},
}

for _, test := range tests {
s.Run(test.description, func() {
out := s.b.JoinFullPath(test.inPaths...)
s.Equal(test.outPath, out)
})
}
}
Loading

0 comments on commit aae7e62

Please sign in to comment.