Merge branch '2.4' of https://github.com/milvus-io/milvus into 2411-check-general-2.4
bigsheeper committed Dec 3, 2024
2 parents 03b740d + 03f01d8 commit fd44d61
Showing 43 changed files with 589 additions and 426 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -550,7 +550,7 @@ dataCoord:
# level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
# mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.
taskPrioritizer: default
- taskQueueCapacity: 256 # compaction task queue size
+ taskQueueCapacity: 100000 # compaction task queue size
rpcTimeout: 10
maxParallelTaskNum: 10
mix:
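This hunk raises the compaction queue capacity from 256 to 100000; together with the compaction.go change later in this diff, a full queue now rejects submissions with an error instead of silently dropping work. The comments above describe the two taskPrioritizer orderings. Below is a minimal sketch of that ordering logic only, with hypothetical type and function names (not Milvus's implementation):

```go
package main

import (
	"fmt"
	"sort"
)

// CompactionKind mirrors the three kinds named in the config comments;
// the identifiers here are illustrative, not Milvus's actual types.
type CompactionKind string

const (
	KindL0         CompactionKind = "L0"
	KindMix        CompactionKind = "mix"
	KindClustering CompactionKind = "clustering"
)

// rank returns the position a policy assigns to each kind:
// "level" runs L0 first, then mix, then clustering;
// "mix" runs mix first, then L0, then clustering.
func rank(policy string, k CompactionKind) int {
	order := map[string][]CompactionKind{
		"level": {KindL0, KindMix, KindClustering},
		"mix":   {KindMix, KindL0, KindClustering},
	}
	for i, kind := range order[policy] {
		if kind == k {
			return i
		}
	}
	return len(order[policy])
}

func main() {
	pending := []CompactionKind{KindClustering, KindMix, KindL0, KindMix}
	sort.SliceStable(pending, func(i, j int) bool {
		return rank("level", pending[i]) < rank("level", pending[j])
	})
	fmt.Println(pending) // [L0 mix mix clustering]
}
```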
31 changes: 28 additions & 3 deletions internal/core/src/common/File.h
@@ -11,11 +11,13 @@

#pragma once

#include <cstdio>
#include <string>
#include "common/EasyAssert.h"
#include "common/Types.h"
#include "fmt/core.h"
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

namespace milvus {
@@ -27,8 +29,8 @@ class File {
file.fd_ = -1;
}
~File() {
- if (fd_ >= 0) {
-     close(fd_);
+ if (fs_ != nullptr) {
+     fclose(fs_);
}
}

@@ -63,22 +65,45 @@ class File {
return write(fd_, &value, sizeof(value));
}

ssize_t
FWrite(const void* buf, size_t size) {
return fwrite(buf, sizeof(char), size, fs_);
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
ssize_t
FWriteInt(T value) {
return fwrite(&value, 1, sizeof(value), fs_);
}

int
FFlush() {
return fflush(fs_);
}

offset_t
Seek(offset_t offset, int whence) {
return lseek(fd_, offset, whence);
}

void
Close() {
- close(fd_);
+ fclose(fs_);
+ fs_ = nullptr;
fd_ = -1;
}

private:
explicit File(int fd, const std::string& filepath)
: fd_(fd), filepath_(filepath) {
fs_ = fdopen(fd_, "wb+");
AssertInfo(fs_ != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
}
int fd_{-1};
FILE* fs_;
std::string filepath_;
};
} // namespace milvus
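
The File class now wraps its descriptor with fdopen and exposes buffered FWrite/FWriteInt/FFlush alongside the unbuffered Write path, so sequences of tiny writes (such as 4-byte length prefixes) are coalesced in a user-space buffer rather than each paying a write(2) syscall; fflush/fclose drain the buffer. A rough Go analogue of the same buffering pattern, for illustration only (file name made up):

```go
package main

import (
	"bufio"
	"encoding/binary"
	"log"
	"os"
)

func main() {
	f, err := os.Create("/tmp/field_data.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Like fdopen(fd, "wb+"): wrap the raw descriptor in a buffered writer,
	// so small writes accumulate in memory instead of hitting the kernel.
	w := bufio.NewWriter(f)

	for _, s := range []string{"alpha", "beta", "gamma"} {
		// Analogue of FWriteInt<uint32_t>: a 4-byte length prefix.
		if err := binary.Write(w, binary.LittleEndian, uint32(len(s))); err != nil {
			log.Fatal(err)
		}
		// Analogue of FWrite: the payload bytes.
		if _, err := w.WriteString(s); err != nil {
			log.Fatal(err)
		}
	}

	// Analogue of FFlush: nothing reaches the file until the buffer drains.
	if err := w.Flush(); err != nil {
		log.Fatal(err)
	}
}
```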
17 changes: 9 additions & 8 deletions internal/core/src/mmap/Utils.h
@@ -101,12 +101,12 @@ WriteFieldData(File& file,
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written_data_size =
- file.WriteInt<uint32_t>(uint32_t(str->size()));
+ file.FWriteInt<uint32_t>(uint32_t(str->size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
- auto written_data = file.Write(str->data(), str->size());
+ auto written_data = file.FWrite(str->data(), str->size());
if (written_data < str->size()) {
THROW_FILE_WRITE_ERROR
}
@@ -120,14 +120,14 @@
indices.push_back(total_written);
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
- ssize_t written_data_size =
-     file.WriteInt<uint32_t>(uint32_t(padded_string.size()));
+ ssize_t written_data_size = file.FWriteInt<uint32_t>(
+     uint32_t(padded_string.size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
ssize_t written_data =
- file.Write(padded_string.data(), padded_string.size());
+ file.FWrite(padded_string.data(), padded_string.size());
if (written_data < padded_string.size()) {
THROW_FILE_WRITE_ERROR
}
@@ -141,7 +141,7 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto array = static_cast<const Array*>(data->RawValue(i));
ssize_t written =
- file.Write(array->data(), array->byte_size());
+ file.FWrite(array->data(), array->byte_size());
if (written < array->byte_size()) {
THROW_FILE_WRITE_ERROR
}
@@ -157,7 +157,7 @@ WriteFieldData(File& file,
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));
ssize_t written =
- file.Write(vec->data(), vec->data_byte_size());
+ file.FWrite(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;
}
@@ -172,7 +172,7 @@ WriteFieldData(File& file,
}
} else {
// write as: data|data|data|data|data|data......
- size_t written = file.Write(data->Data(), data->Size());
+ size_t written = file.FWrite(data->Data(), data->Size());
if (written < data->Size()) {
THROW_FILE_WRITE_ERROR
}
Expand All @@ -181,5 +181,6 @@ WriteFieldData(File& file,
total_written += data->Size(i);
}
}
file.FFlush();
}
} // namespace milvus
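
WriteFieldData lays variable-length values out as a uint32 size followed by the raw bytes, records the starting offset of each value, checks every write's return count for short writes, and (with this change) flushes the buffered stream once after the batch. A compact sketch of that bookkeeping under assumed types — not the actual Milvus serialization:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"log"
)

// writeRecords writes each record as a uint32 size followed by its payload
// and returns the byte offset at which each record starts -- the same
// bookkeeping WriteFieldData keeps in its indices vector.
func writeRecords(w *bufio.Writer, records [][]byte) ([]int64, error) {
	offsets := make([]int64, 0, len(records))
	var written int64
	for _, rec := range records {
		offsets = append(offsets, written)
		if err := binary.Write(w, binary.LittleEndian, uint32(len(rec))); err != nil {
			return nil, err
		}
		written += 4
		n, err := w.Write(rec)
		if err != nil || n < len(rec) { // mirror the short-write checks in the diff
			return nil, fmt.Errorf("short write: %d < %d: %v", n, len(rec), err)
		}
		written += int64(n)
	}
	// Flush once after the whole batch, like file.FFlush() at the end.
	return offsets, w.Flush()
}

func main() {
	var buf bytes.Buffer
	w := bufio.NewWriter(&buf)
	offsets, err := writeRecords(w, [][]byte{[]byte("ab"), []byte("cdef")})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(offsets, buf.Len()) // [0 6] 14
}
```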
30 changes: 24 additions & 6 deletions internal/datacoord/compaction.go
@@ -335,19 +335,30 @@ func (c *compactionPlanHandler) loadMeta() {
zap.String("state", task.GetState().String()))
continue
} else {
// TODO: how to deal with the create failed tasks, leave it in meta forever?
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
log.Info("compactionPlanHandler loadMeta create compactionTask failed, try to clean it",
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
continue
}
if t.NeedReAssignNodeID() {
- c.submitTask(t)
+ if err = c.submitTask(t); err != nil {
log.Info("compactionPlanHandler loadMeta submit task failed, try to clean it",
zap.Int64("planID", task.GetPlanID()),
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
// ignore the drop error
c.meta.DropCompactionTask(task)
continue
}
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
@@ -537,11 +548,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.executingGuard.Unlock()
}

- func (c *compactionPlanHandler) submitTask(t CompactionTask) {
+ func (c *compactionPlanHandler) submitTask(t CompactionTask) error {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
- c.queueTasks.Enqueue(t)
+ if err := c.queueTasks.Enqueue(t); err != nil {
return err
}
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
return nil
}

// restoreTask used to restore Task from etcd
@@ -592,7 +606,11 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
- c.submitTask(t)
+ if err = c.submitTask(t); err != nil {
log.Warn("submit compaction task failed", zap.Error(err))
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
return err
}
log.Info("Compaction plan submitted")
return nil
}
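submitTask now returns the Enqueue error instead of swallowing it, and enqueueCompaction rolls back the segments' compacting flag when submission fails. A minimal sketch of that contract — a bounded queue that rejects when full, and a caller that undoes its side effects — using hypothetical names, not the datacoord types:

```go
package main

import (
	"errors"
	"fmt"
)

var ErrQueueFull = errors.New("compaction task queue is full")

// BoundedQueue rejects rather than blocks once capacity is reached,
// mirroring what queueTasks.Enqueue does after this change.
type BoundedQueue struct {
	tasks    []string
	capacity int
}

func (q *BoundedQueue) Enqueue(t string) error {
	if len(q.tasks) >= q.capacity {
		return ErrQueueFull
	}
	q.tasks = append(q.tasks, t)
	return nil
}

// submit propagates the error so the caller can undo side effects
// (compare: c.meta.SetSegmentsCompacting(..., false) in the diff).
func submit(q *BoundedQueue, t string, rollback func()) error {
	if err := q.Enqueue(t); err != nil {
		rollback()
		return err
	}
	return nil
}

func main() {
	q := &BoundedQueue{capacity: 1}
	fmt.Println(submit(q, "plan-1", func() {})) // <nil>
	fmt.Println(submit(q, "plan-2", func() {
		fmt.Println("rollback: mark segments not compacting")
	})) // rollback runs, then: compaction task queue is full
}
```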
5 changes: 4 additions & 1 deletion internal/datacoord/compaction_task_clustering.go
@@ -223,14 +223,16 @@ func (t *clusteringCompactionTask) processExecuting() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
log.Info("processExecuting clustering compaction", zap.Bool("result nil", result == nil), zap.Error(err))
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// setNodeID(NullNodeID) to trigger reassign node ID
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return err
}
log.Info("compaction result", zap.Any("result", result.String()))
log.Debug("compaction result", zap.String("result state", result.GetState().String()),
zap.Int("result segments num", len(result.GetSegments())), zap.Int("result string length", len(result.String())))
switch result.GetState() {
case datapb.CompactionTaskState_completed:
t.result = result
@@ -504,6 +506,7 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.CompactionTask = task
log.Info("updateAndSaveTaskMeta success", zap.String("task state", t.CompactionTask.State.String()))
return nil
}

47 changes: 47 additions & 0 deletions internal/datacoord/compaction_test.go
@@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -709,6 +710,52 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
s.Equal(1, info.failedCnt)
}

func (s *CompactionPlanHandlerSuite) TestCompactionQueueFull() {
s.SetupTest()
paramtable.Get().Save("dataCoord.compaction.taskQueueCapacity", "1")
defer paramtable.Get().Reset("dataCoord.compaction.taskQueueCapacity")

s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)

t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
TriggerID: 1,
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_executing,
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
}
t1.plan = &datapb.CompactionPlan{
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.NoError(s.handler.submitTask(t1))

t2 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
TriggerID: 1,
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
State: datapb.CompactionTaskState_completed,
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
}
t2.plan = &datapb.CompactionPlan{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
}

s.Error(s.handler.submitTask(t2))
}

func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
2 changes: 1 addition & 1 deletion internal/datacoord/index_meta.go
@@ -799,7 +799,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle
coll, ok := collections[segmentIdx.CollectionID]
if ok {
metrics.DataCoordStoredIndexFilesSize.WithLabelValues(coll.DatabaseName, coll.Schema.GetName(),
- fmt.Sprint(segmentIdx.CollectionID)).Set(float64(segmentIdx.IndexSize))
+ fmt.Sprint(segmentIdx.CollectionID)).Add(float64(segmentIdx.IndexSize))
total += segmentIdx.IndexSize
}
}
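The Set-to-Add switch fixes an aggregation bug: with a per-collection label, Set overwrites the gauge on every segment, so the metric ends up reporting only the last segment's index size rather than the collection total. Resetting once per refresh pass and accumulating with Add yields the intended sum — the same pattern the meta.go hunk below applies to binlog size and file count. A sketch with the Prometheus Go client (metric name illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	indexSize := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "stored_index_files_size"}, // illustrative name
		[]string{"collection"},
	)

	segments := []float64{100, 250, 50} // three segments of one collection

	// Reset once per pass, then accumulate with Add: the gauge ends at the
	// collection total (400). Set would leave only the last value (50).
	indexSize.Reset()
	for _, sz := range segments {
		indexSize.WithLabelValues("col-1").Add(sz)
	}

	m := &dto.Metric{}
	if err := indexSize.WithLabelValues("col-1").Write(m); err != nil {
		panic(err)
	}
	fmt.Println(m.GetGauge().GetValue()) // 400
}
```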
21 changes: 20 additions & 1 deletion internal/datacoord/meta.go
@@ -387,6 +387,22 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
return m.getNumRowsOfCollectionUnsafe(collectionID)
}

func getBinlogFileCount(s *datapb.SegmentInfo) int {
statsFieldFn := func(fieldBinlogs []*datapb.FieldBinlog) int {
cnt := 0
for _, fbs := range fieldBinlogs {
cnt += len(fbs.Binlogs)
}
return cnt
}

cnt := 0
cnt += statsFieldFn(s.GetBinlogs())
cnt += statsFieldFn(s.GetStatslogs())
cnt += statsFieldFn(s.GetDeltalogs())
return cnt
}

func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
info := &metricsinfo.DataCoordQuotaMetrics{}
m.RLock()
@@ -400,6 +416,7 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
segments := m.segments.GetSegments()
var total int64
metrics.DataCoordStoredBinlogSize.Reset()
metrics.DataCoordSegmentBinLogFileCount.Reset()
for _, segment := range segments {
segmentSize := segment.getSegmentSize()
if isSegmentHealthy(segment) && !segment.GetIsImporting() {
@@ -416,7 +433,9 @@ func (m *meta) GetQuotaInfo() *metricsinfo.DataCoordQuotaMetrics {
coll, ok := m.collections[segment.GetCollectionID()]
if ok {
metrics.DataCoordStoredBinlogSize.WithLabelValues(coll.DatabaseName,
- fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Set(float64(segmentSize))
+ fmt.Sprint(segment.GetCollectionID()), segment.GetState().String()).Add(float64(segmentSize))
metrics.DataCoordSegmentBinLogFileCount.WithLabelValues(
fmt.Sprint(segment.GetCollectionID())).Add(float64(getBinlogFileCount(segment.SegmentInfo)))
} else {
log.Warn("not found database name", zap.Int64("collectionID", segment.GetCollectionID()))
}
4 changes: 2 additions & 2 deletions internal/datacoord/session_manager.go
@@ -230,8 +230,8 @@ func (c *SessionManagerImpl) SyncSegments(ctx context.Context, nodeID int64, req
zap.Int64("planID", req.GetPlanID()),
)

- ctx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
- cli, err := c.getClient(ctx, nodeID)
+ childCtx, cancel := context.WithTimeout(context.Background(), Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second))
+ cli, err := c.getClient(childCtx, nodeID)
cancel()
if err != nil {
log.Warn("failed to get client", zap.Error(err))
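Renaming the timeout context to childCtx fixes a shadowing bug: the old code rebound ctx to a context that is cancelled immediately after getClient returns, so every later RPC in SyncSegments ran against an already-cancelled context. A distilled sketch of the pitfall (the calls are hypothetical stand-ins):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func check(ctx context.Context, name string) {
	if err := ctx.Err(); err != nil {
		fmt.Println(name, "->", err)
		return
	}
	fmt.Println(name, "-> ok")
}

func main() {
	ctx := context.Background()

	// Buggy version: `ctx, cancel := ...` shadows the outer ctx, and the
	// cancel() right after the client lookup kills every later call.
	func(ctx context.Context) {
		ctx, cancel := context.WithTimeout(ctx, time.Second)
		check(ctx, "getClient")
		cancel()
		check(ctx, "later RPC (buggy)") // context canceled
	}(ctx)

	// Fixed version: a separately named childCtx confines the timeout
	// to the client lookup, as the diff does.
	func(ctx context.Context) {
		childCtx, cancel := context.WithTimeout(ctx, time.Second)
		check(childCtx, "getClient")
		cancel()
		check(ctx, "later RPC (fixed)") // ok
	}(ctx)
}
```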
13 changes: 6 additions & 7 deletions internal/datanode/broker/datacoord.go
@@ -65,12 +65,10 @@ func (dc *dataCoordBroker) ReportTimeTick(ctx context.Context, msgs []*msgpb.Dat
}

func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*datapb.SegmentInfo, error) {
- ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
- defer cancel()

log := log.Ctx(ctx).With(zap.Int64s("segments", ids))

getSegmentInfo := func(ids []int64) (*datapb.GetSegmentInfoResponse, error) {
+ ctx, cancel := context.WithTimeout(ctx, paramtable.Get().DataCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
+ defer cancel()

infoResp, err := dc.client.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SegmentInfo),
@@ -80,12 +78,12 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*
IncludeUnHealthy: true,
})
if err := merr.CheckRPCCall(infoResp, err); err != nil {
log.Warn("Fail to get SegmentInfo by ids from datacoord", zap.Error(err))
log.Warn("fail to get SegmentInfo by ids from datacoord", zap.Int64s("segmentIDs", ids), zap.Error(err))
return nil, err
}
err = binlog.DecompressMultiBinLogs(infoResp.GetInfos())
if err != nil {
log.Warn("Fail to DecompressMultiBinLogs", zap.Error(err))
log.Warn("fail to DecompressMultiBinLogs", zap.Int64s("segmentIDs", ids), zap.Error(err))
return nil, err
}
return infoResp, nil
@@ -99,6 +97,7 @@ func (dc *dataCoordBroker) GetSegmentInfo(ctx context.Context, ids []int64) ([]*

resp, err := getSegmentInfo(ids[startIdx:endIdx])
if err != nil {
log.Warn("fail to get SegmentInfo", zap.Int("total segment num", len(ids)), zap.Int("returned num", startIdx))
return nil, err
}
ret = append(ret, resp.GetInfos()...)
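Two fixes in this file: the timeout now reads the DataCoord broker config rather than the QueryCoord one, and it is created inside the getSegmentInfo closure, so each paged RPC gets a fresh deadline instead of all pages sharing one shrinking budget. A sketch of the per-batch deadline pattern (fetchBatch is a hypothetical stand-in for the RPC):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchBatch stands in for one GetSegmentInfo RPC.
func fetchBatch(ctx context.Context, ids []int64) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulated RPC latency
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func getAll(ctx context.Context, ids []int64, batchSize int) error {
	for start := 0; start < len(ids); start += batchSize {
		end := start + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		// Fresh deadline per batch, as the closure now does; a single
		// loop-wide deadline would leave late pages almost no budget.
		callCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
		err := fetchBatch(callCtx, ids[start:end])
		cancel()
		if err != nil {
			return fmt.Errorf("batch starting at %d: %w", start, err)
		}
	}
	return nil
}

func main() {
	fmt.Println(getAll(context.Background(), make([]int64, 10), 3)) // <nil>
}
```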
[Diff for the remaining 33 changed files not loaded.]
