Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
Browse files Browse the repository at this point in the history
…2-json-reader
  • Loading branch information
bigsheeper committed Dec 16, 2023
2 parents b5a0588 + 88b4b8b commit aaa146d
Show file tree
Hide file tree
Showing 55 changed files with 1,856 additions and 294 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/code-checker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
if: ${{ ! startsWith(runner.name, 'self') }} # skip this step if it is self-hosted runner
with:
root-reserve-mb: 20480
swap-size-mb: 1024
Expand Down Expand Up @@ -88,6 +89,7 @@ jobs:
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
if: ${{ ! startsWith(runner.name, 'self') }} # skip this step if it is self-hosted runner
with:
root-reserve-mb: 20480
swap-size-mb: 1024
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ jobs:

- name: Maximize build space
uses: easimon/maximize-build-space@master
if: ${{ ! startsWith(runner.name, 'self') }} # skip this step if it is self-hosted runner
with:
root-reserve-mb: 20480
swap-size-mb: 1024
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/publish-builder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
if: ${{ ! startsWith(runner.name, 'self') }} # skip this step if it is self-hosted runner
with:
root-reserve-mb: 20480
# overprovision-lvm: 'true'
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/publish-gpu-builder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
steps:
- name: Maximize build space
uses: easimon/maximize-build-space@master
if: ${{ ! startsWith(runner.name, 'self') }} # skip this step if it is self-hosted runner
with:
root-reserve-mb: 20480
# overprovision-lvm: 'true'
Expand Down
2 changes: 1 addition & 1 deletion ci/jenkins/Nightly.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ String cron_string = BRANCH_NAME == "master" ? "50 1 * * * " : ""
// Total timeout is 7 hours (this comment previously said 4 hours, chosen so that two nightly runs could fit during the ci)
int total_timeout_minutes = 7 * 60
def imageTag=''
def chart_version='4.0.6'
def chart_version='4.1.10'
pipeline {
triggers {
cron """${cron_timezone}
Expand Down
1 change: 0 additions & 1 deletion internal/core/src/common/RangeSearchHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ ReGenRangeSearchResult(DatasetPtr data_set,
}

// The subscript of p_id and p_dist
#pragma omp parallel for
for (int i = 0; i < nq; i++) {
std::priority_queue<ResultPair, std::vector<ResultPair>, decltype(cmp)>
pq(cmp);
Expand Down
74 changes: 70 additions & 4 deletions internal/datacoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/minio/minio-go/v7"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand All @@ -56,10 +58,17 @@ type garbageCollector struct {
meta *meta
handler Handler

startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
cmdCh chan gcCmd
pauseUntil atomic.Time
}
// gcCmd is a control request delivered to the garbage collector's work loop
// through cmdCh.
type gcCmd struct {
// cmdType selects the action the work loop performs (Pause or Resume).
cmdType datapb.GcCommand
// duration is the requested pause length; it is only read when a Pause
// command is handled.
duration time.Duration
// done is closed by the work loop once the command has been handled, which
// unblocks the caller that issued the command.
done chan struct{}
}

// newGarbageCollector create garbage collector with meta and option
Expand All @@ -71,6 +80,7 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl
handler: handler,
option: opt,
closeCh: make(chan struct{}),
cmdCh: make(chan gcCmd),
}
}

Expand All @@ -88,6 +98,43 @@ func (gc *garbageCollector) start() {
}
}

// Pause asks the work loop to suspend garbage collection for pauseDuration.
//
// When garbage collection is disabled the call is a no-op and returns nil.
// Otherwise it blocks until the work loop has accepted and handled the
// command, or returns ctx.Err() if ctx is done before the command could be
// handed over.
func (gc *garbageCollector) Pause(ctx context.Context, pauseDuration time.Duration) error {
	if !gc.option.enabled {
		log.Info("garbage collection not enabled")
		return nil
	}

	ack := make(chan struct{})
	request := gcCmd{
		cmdType:  datapb.GcCommand_Pause,
		duration: pauseDuration,
		done:     ack,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case gc.cmdCh <- request:
	}

	// The command was received; wait for the work loop to close the ack channel.
	<-ack
	return nil
}

// Resume asks the work loop to clear any active pause.
//
// When garbage collection is disabled it returns a service-unavailable error.
// Otherwise it blocks until the work loop has accepted and handled the
// command, or returns ctx.Err() if ctx is done before the command could be
// handed over.
func (gc *garbageCollector) Resume(ctx context.Context) error {
	if !gc.option.enabled {
		log.Warn("garbage collection not enabled, cannot resume")
		return merr.WrapErrServiceUnavailable("garbage collection not enabled")
	}

	ack := make(chan struct{})
	request := gcCmd{
		cmdType: datapb.GcCommand_Resume,
		done:    ack,
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case gc.cmdCh <- request:
	}

	// The command was received; wait for the work loop to close the ack channel.
	<-ack
	return nil
}

// work contains actual looping check logic
func (gc *garbageCollector) work() {
defer gc.wg.Done()
Expand All @@ -96,11 +143,30 @@ func (gc *garbageCollector) work() {
for {
select {
case <-ticker.C:
if time.Now().Before(gc.pauseUntil.Load()) {
log.Info("garbage collector paused", zap.Time("until", gc.pauseUntil.Load()))
continue
}
gc.clearEtcd()
gc.recycleUnusedIndexes()
gc.recycleUnusedSegIndexes()
gc.scan()
gc.recycleUnusedIndexFiles()
case cmd := <-gc.cmdCh:
switch cmd.cmdType {
case datapb.GcCommand_Pause:
pauseUntil := time.Now().Add(cmd.duration)
if pauseUntil.After(gc.pauseUntil.Load()) {
log.Info("garbage collection paused", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil))
gc.pauseUntil.Store(pauseUntil)
} else {
log.Info("new pause until before current value", zap.Duration("duration", cmd.duration), zap.Time("pauseUntil", pauseUntil), zap.Time("oldPauseUntil", gc.pauseUntil.Load()))
}
case datapb.GcCommand_Resume:
// reset to zero value
gc.pauseUntil.Store(time.Time{})
}
close(cmd.done)
case <-gc.closeCh:
log.Warn("garbage collector quit")
return
Expand Down
132 changes: 132 additions & 0 deletions internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
Expand Down Expand Up @@ -1134,3 +1135,134 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
segB = gc.meta.GetSegment(segID + 1)
assert.Nil(t, segB)
}

// GarbageCollectorSuite is a testify suite for the garbage collector. Its
// fixtures are rebuilt in SetupTest and cleaned up in TearDownTest.
type GarbageCollectorSuite struct {
suite.Suite

// bucketName and rootPath are generated with a random suffix in SetupTest.
bucketName string
rootPath string

// cli and the path slices below are the values returned by initUtOSSEnv.
cli *storage.MinioChunkManager
inserts []string
stats []string
delta []string
others []string

// meta is the in-memory meta created by newMemoryMeta.
meta *meta
}

// SetupTest prepares a randomly named bucket and root path, initialises the
// object storage fixtures through initUtOSSEnv and creates an in-memory meta.
// The test is aborted if either initialisation step fails.
func (s *GarbageCollectorSuite) SetupTest() {
	bucketSuffix := strings.ToLower(funcutil.RandomString(8))
	s.bucketName = `datacoord-ut` + bucketSuffix
	rootSuffix := funcutil.RandomString(8)
	s.rootPath = `gc` + rootSuffix

	cli, inserts, stats, delta, others, envErr := initUtOSSEnv(s.bucketName, s.rootPath, 4)
	s.cli, s.inserts, s.stats, s.delta, s.others = cli, inserts, stats, delta, others
	s.Require().NoError(envErr)

	memMeta, metaErr := newMemoryMeta()
	s.meta = memMeta
	s.Require().NoError(metaErr)
}

// TearDownTest calls cleanupOSS with the client, bucket and root path that
// were set up for the test.
func (s *GarbageCollectorSuite) TearDownTest() {
	client := s.cli.Client
	bucket, root := s.bucketName, s.rootPath
	cleanupOSS(client, bucket, root)
}

// TestPauseResume exercises Pause and Resume on the garbage collector in four
// scenarios: collector disabled, pause followed by resume, a shorter pause
// issued while a longer one is active, and commands issued with a short
// timeout against a collector that was never started.
func (s *GarbageCollectorSuite) TestPauseResume() {
	// buildGC creates a collector; the sub-tests differ only in the enabled flag.
	buildGC := func(enabled bool) *garbageCollector {
		return newGarbageCollector(s.meta, newMockHandler(), GcOption{
			cli:              s.cli,
			enabled:          enabled,
			checkInterval:    time.Millisecond * 10,
			missingTolerance: time.Hour * 24,
			dropTolerance:    time.Hour * 24,
		})
	}

	s.Run("not_enabled", func() {
		collector := buildGC(false)
		collector.start()
		defer collector.close()

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Pause succeeds on a disabled collector, Resume reports an error.
		s.NoError(collector.Pause(ctx, time.Second))
		s.Error(collector.Resume(ctx))
	})

	s.Run("pause_then_resume", func() {
		collector := buildGC(true)
		collector.start()
		defer collector.close()

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		s.NoError(collector.Pause(ctx, time.Minute))
		s.NotZero(collector.pauseUntil.Load())

		s.NoError(collector.Resume(ctx))
		s.Zero(collector.pauseUntil.Load())
	})

	s.Run("pause_before_until", func() {
		collector := buildGC(true)
		collector.start()
		defer collector.close()

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		s.NoError(collector.Pause(ctx, time.Minute))
		firstUntil := collector.pauseUntil.Load()
		s.NotZero(firstUntil)

		// A shorter pause must leave the existing deadline untouched.
		s.NoError(collector.Pause(ctx, time.Second))
		secondUntil := collector.pauseUntil.Load()
		s.Equal(firstUntil, secondUntil)
	})

	s.Run("pause_resume_timeout", func() {
		// The collector is deliberately not started, so nothing receives commands.
		collector := buildGC(true)

		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
		defer cancel()

		s.Error(collector.Pause(ctx, time.Minute))
		s.Zero(collector.pauseUntil.Load())

		s.Error(collector.Resume(ctx))
		s.Zero(collector.pauseUntil.Load())
	})
}

func TestGarbageCollector(t *testing.T) {
suite.Run(t, new(GarbageCollectorSuite))
}
2 changes: 2 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,13 +1041,15 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
CreatedByCompaction: true,
CompactionFrom: compactionFrom,
LastExpireTime: plan.GetStartTime(),
Level: datapb.SegmentLevel_L1,
}
segment := NewSegmentInfo(segmentInfo)
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
zap.Int64("new segment ID", segment.GetID()),
zap.String("new segment level", segment.GetLevel().String()),
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))

Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,8 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
assert.Equal(t, 3, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
assert.Equal(t, 2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
assert.Equal(t, 1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()]))
assert.Equal(t, int64(0), metricMutation.rowCountChange)
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)

Expand Down
38 changes: 38 additions & 0 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,44 @@ func (s *Server) startDataCoord() {
s.compactionViewManager.Start()
}
s.startServerLoop()

// http.Register(&http.Handler{
// Path: "/datacoord/garbage_collection/pause",
// HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
// pauseSeconds := req.URL.Query().Get("pause_seconds")
// seconds, err := strconv.ParseInt(pauseSeconds, 10, 64)
// if err != nil {
// w.WriteHeader(400)
// w.Write([]byte(fmt.Sprintf(`{"msg": "invalid pause seconds(%v)"}`, pauseSeconds)))
// return
// }

// err = s.garbageCollector.Pause(req.Context(), time.Duration(seconds)*time.Second)
// if err != nil {
// w.WriteHeader(500)
// w.Write([]byte(fmt.Sprintf(`{"msg": "failed to pause garbage collection, %s"}`, err.Error())))
// return
// }
// w.WriteHeader(200)
// w.Write([]byte(`{"msg": "OK"}`))
// return
// },
// })
// http.Register(&http.Handler{
// Path: "/datacoord/garbage_collection/resume",
// HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
// err := s.garbageCollector.Resume(req.Context())
// if err != nil {
// w.WriteHeader(500)
// w.Write([]byte(fmt.Sprintf(`{"msg": "failed to resume garbage collection, %s"}`, err.Error())))
// return
// }
// w.WriteHeader(200)
// w.Write([]byte(`{"msg": "OK"}`))
// return
// },
// })

s.afterStart()
s.stateCode.Store(commonpb.StateCode_Healthy)
sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.GetServerID())
Expand Down
Loading

0 comments on commit aaa146d

Please sign in to comment.