fix: [2.4] Revert "enhance: Support db for bulkinsert (#37012) (#37017)" #37421

Merged 1 commit on Nov 5, 2024

7 changes: 0 additions & 7 deletions internal/datacoord/import_job.go
@@ -61,12 +61,6 @@ func WithoutJobStates(states ...internalpb.ImportJobState) ImportJobFilter {
}
}

func WithDbID(DbID int64) ImportJobFilter {
return func(job ImportJob) bool {
return job.GetDbID() == DbID
}
}

type UpdateJobAction func(job ImportJob)

func UpdateJobState(state internalpb.ImportJobState) UpdateJobAction {
@@ -106,7 +100,6 @@ func UpdateJobCompleteTime(completeTime string) UpdateJobAction {

type ImportJob interface {
GetJobID() int64
GetDbID() int64
GetCollectionID() int64
GetCollectionName() string
GetPartitionIDs() []int64
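
With WithDbID and GetDbID gone, callers are left with the remaining filter helpers. As a minimal sketch of how these predicate closures compose with GetJobBy (the listJobsForCollection helper and the ImportMeta parameter type are assumptions for illustration, not code in this PR):

```go
// Sketch only: an ImportJobFilter is just a predicate over ImportJob, so a
// caller builds whichever filters it needs and passes them to GetJobBy.
func listJobsForCollection(meta ImportMeta, collectionID int64) []ImportJob {
	if collectionID != 0 {
		// WithCollectionID is the filter that survives this revert.
		return meta.GetJobBy(WithCollectionID(collectionID))
	}
	// No filters: every import job is returned.
	return meta.GetJobBy()
}
```

This mirrors the branch that replaces the filter slice in services.go below.
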
22 changes: 5 additions & 17 deletions internal/datacoord/services.go
@@ -1631,9 +1631,7 @@
Status: merr.Success(),
}

log := log.With(
zap.Int64("dbID", in.GetDbID()),
zap.Int64("collection", in.GetCollectionID()),
log := log.With(zap.Int64("collection", in.GetCollectionID()),
zap.Int64s("partitions", in.GetPartitionIDs()),
zap.Strings("channels", in.GetChannelNames()))
log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions()))
@@ -1692,7 +1690,6 @@
job := &importJob{
ImportJob: &datapb.ImportJob{
JobID: idStart,
DbID: in.GetDbID(),
CollectionID: in.GetCollectionID(),
CollectionName: in.GetCollectionName(),
PartitionIDs: in.GetPartitionIDs(),
@@ -1719,7 +1716,7 @@
}

func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImportProgressRequest) (*internalpb.GetImportProgressResponse, error) {
log := log.With(zap.String("jobID", in.GetJobID()), zap.Int64("dbID", in.GetDbID()))
log := log.With(zap.String("jobID", in.GetJobID()))
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
return &internalpb.GetImportProgressResponse{
Status: merr.Status(err),
@@ -1739,10 +1736,6 @@
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID)))
return resp, nil
}
if job.GetDbID() != 0 && job.GetDbID() != in.GetDbID() {
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d, dbID=%d", jobID, in.GetDbID())))
return resp, nil
}
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta)
resp.State = state
resp.Reason = reason
@@ -1773,14 +1766,11 @@
}

var jobs []ImportJob
filters := make([]ImportJobFilter, 0)
if req.GetDbID() != 0 {
filters = append(filters, WithDbID(req.GetDbID()))
}
if req.GetCollectionID() != 0 {
filters = append(filters, WithCollectionID(req.GetCollectionID()))
jobs = s.importMeta.GetJobBy(WithCollectionID(req.GetCollectionID()))
} else {
jobs = s.importMeta.GetJobBy()
}
jobs = s.importMeta.GetJobBy(filters...)

for _, job := range jobs {
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta)
@@ -1790,7 +1780,5 @@
resp.Progresses = append(resp.Progresses, progress)
resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName())
}
log.Info("ListImports done", zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64("dbID", req.GetDbID()), zap.Any("resp", resp))
return resp, nil
}
40 changes: 3 additions & 37 deletions internal/datacoord/services_test.go
@@ -1711,10 +1711,9 @@ func TestImportV2(t *testing.T) {
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

// db does not exist
// normal case
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 1,
JobID: 0,
Schema: &schemapb.CollectionSchema{},
State: internalpb.ImportJobState_Failed,
@@ -1723,31 +1722,12 @@
err = s.importMeta.AddJob(job)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
DbID: 2,
JobID: "0",
})
assert.NoError(t, err)
assert.True(t, errors.Is(merr.Error(resp.GetStatus()), merr.ErrImportFailed))

// normal case
job = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 1,
JobID: 0,
Schema: &schemapb.CollectionSchema{},
State: internalpb.ImportJobState_Pending,
},
}
err = s.importMeta.AddJob(job)
assert.NoError(t, err)
resp, err = s.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{
DbID: 1,
JobID: "0",
})
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.Equal(t, int64(10), resp.GetProgress())
assert.Equal(t, internalpb.ImportJobState_Pending, resp.GetState())
assert.Equal(t, int64(0), resp.GetProgress())
assert.Equal(t, internalpb.ImportJobState_Failed, resp.GetState())
})

t.Run("ListImports", func(t *testing.T) {
@@ -1770,7 +1750,6 @@
assert.NoError(t, err)
var job ImportJob = &importJob{
ImportJob: &datapb.ImportJob{
DbID: 2,
JobID: 0,
CollectionID: 1,
Schema: &schemapb.CollectionSchema{},
@@ -1787,20 +1766,7 @@
}
err = s.importMeta.AddTask(task)
assert.NoError(t, err)
// db id not match
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: 3,
CollectionID: 1,
})
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.Equal(t, 0, len(resp.GetJobIDs()))
assert.Equal(t, 0, len(resp.GetStates()))
assert.Equal(t, 0, len(resp.GetReasons()))
assert.Equal(t, 0, len(resp.GetProgresses()))
// db id match
resp, err = s.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: 2,
CollectionID: 1,
})
assert.NoError(t, err)
4 changes: 2 additions & 2 deletions internal/distributed/proxy/httpserver/handler_v2.go
@@ -148,8 +148,8 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {

router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &GetImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
router.POST(ImportJobCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
}

type (
9 changes: 3 additions & 6 deletions internal/distributed/proxy/httpserver/request_v2.go
@@ -94,14 +94,11 @@ func (req *ImportReq) GetOptions() map[string]string {
return req.Options
}

type GetImportReq struct {
DbName string `json:"dbName"`
JobID string `json:"jobId" binding:"required"`
type JobIDReq struct {
JobID string `json:"jobId" binding:"required"`
}

func (req *GetImportReq) GetJobID() string { return req.JobID }

func (req *GetImportReq) GetDbName() string { return req.DbName }
func (req *JobIDReq) GetJobID() string { return req.JobID }

type QueryReqV2 struct {
DbName string `json:"dbName"`
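
Since the db name is no longer part of the payload (it is resolved by wrapperCheckDatabase in handler_v2.go above), progress and describe requests carry only the job ID. A rough, self-contained sketch of how such a body binds into JobIDReq; the standalone gin handler and route path here are hypothetical and only illustrate the binding:"required" tag:

```go
package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// JobIDReq mirrors the struct in request_v2.go: jobId is required and
// dbName is no longer part of this payload.
type JobIDReq struct {
	JobID string `json:"jobId" binding:"required"`
}

func main() {
	r := gin.Default()
	// Hypothetical route; the real paths are built from ImportJobCategory above.
	r.POST("/jobs/import/describe", func(c *gin.Context) {
		var req JobIDReq
		// A body like {"jobId": "451112838970328228"} binds successfully; a body
		// without jobId fails validation because of the binding:"required" tag.
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		c.JSON(http.StatusOK, gin.H{"jobId": req.JobID})
	})
	_ = r.Run() // listens on :8080 by default
}
```
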
1 change: 0 additions & 1 deletion internal/proto/internal.proto
@@ -336,7 +336,6 @@ message ImportResponse {
message GetImportProgressRequest {
string db_name = 1;
string jobID = 2;
int64 dbID = 3;
}

message ImportTaskProgress {
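
After this hunk the message carries only db_name and jobID, so internal callers identify a job by its ID alone. A trivial sketch using the generated Go struct (the helper and its values are placeholders):

```go
// Sketch only: dbID no longer exists on the message; db_name is still carried
// for routing and metrics, not for job lookup.
func buildProgressRequest(dbName, jobID string) *internalpb.GetImportProgressRequest {
	return &internalpb.GetImportProgressRequest{
		DbName: dbName,
		JobID:  jobID,
	}
}
```
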
30 changes: 1 addition & 29 deletions internal/proxy/impl.go
@@ -6158,7 +6158,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
return &internalpb.ImportResponse{Status: merr.Status(err)}, nil
}
log := log.Ctx(ctx).With(
zap.String("dbName", req.GetDbName()),
zap.String("collectionName", req.GetCollectionName()),
zap.String("partition name", req.GetPartitionName()),
zap.Any("files", req.GetFiles()),
@@ -6185,11 +6184,6 @@
}
}()

dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
resp.Status = merr.Status(err)
@@ -6300,7 +6294,6 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
}
}
importRequest := &internalpb.ImportRequestInternal{
DbID: dbInfo.dbID,
CollectionID: collectionID,
CollectionName: req.GetCollectionName(),
PartitionIDs: partitionIDs,
@@ -6325,28 +6318,14 @@
}, nil
}
log := log.Ctx(ctx).With(
zap.String("dbName", req.GetDbName()),
zap.String("jobID", req.GetJobID()),
)

resp := &internalpb.GetImportProgressResponse{
Status: merr.Success(),
}

method := "GetImportProgress"
tr := timerecord.NewTimeRecorder(method)
log.Info(rpcReceived(method))

// Fill db id for datacoord.
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
req.DbID = dbInfo.dbID

nodeID := fmt.Sprint(paramtable.GetNodeID())
resp, err = node.dataCoord.GetImportProgress(ctx, req)
resp, err := node.dataCoord.GetImportProgress(ctx, req)
if resp.GetStatus().GetCode() != 0 || err != nil {
log.Warn("get import progress failed", zap.String("reason", resp.GetStatus().GetReason()), zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, metrics.FailLabel, req.GetDbName(), "").Inc()
@@ -6383,11 +6362,6 @@ func (node *Proxy) ListImports(ctx context.Context, req *internalpb.ListImportsR
err error
collectionID UniqueID
)
dbInfo, err := globalMetaCache.GetDatabaseInfo(ctx, req.GetDbName())
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
if req.GetCollectionName() != "" {
collectionID, err = globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
if err != nil {
@@ -6396,9 +6370,7 @@
return resp, nil
}
}

resp, err = node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{
DbID: dbInfo.dbID,
CollectionID: collectionID,
})
if resp.GetStatus().GetCode() != 0 || err != nil {
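
The net effect on the proxy: no GetDatabaseInfo round trip before calling the datacoord. A condensed sketch of the resulting ListImports flow (error handling trimmed, the standalone helper function is illustrative):

```go
// Sketch only: the db lookup is gone, so ListImports reduces to an optional
// collection-name-to-ID resolution followed by a direct datacoord call.
func listImports(ctx context.Context, node *Proxy, req *internalpb.ListImportsRequest) (*internalpb.ListImportsResponse, error) {
	var collectionID UniqueID
	if req.GetCollectionName() != "" {
		id, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
		if err != nil {
			return &internalpb.ListImportsResponse{Status: merr.Status(err)}, nil
		}
		collectionID = id
	}
	// DbID is no longer set on ListImportsRequestInternal.
	return node.dataCoord.ListImports(ctx, &internalpb.ListImportsRequestInternal{
		CollectionID: collectionID,
	})
}
```
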
40 changes: 2 additions & 38 deletions internal/proxy/impl_test.go
@@ -1596,17 +1596,8 @@ func TestProxy_ImportV2(t *testing.T) {
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// no such collection
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc := NewMockCache(t)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, mockErr)
globalMetaCache = mc
rsp, err = node.ImportV2(ctx, &internalpb.ImportRequest{CollectionName: "aaa"})
@@ -1615,7 +1606,6 @@

// get schema failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
@@ -1625,7 +1615,6 @@

// get channel failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
@@ -1650,7 +1639,6 @@

// get partitions failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
@@ -1665,7 +1653,6 @@

// get partitionID failed
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{},
@@ -1678,7 +1665,6 @@

// no file
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
mc.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{},
@@ -1725,18 +1711,7 @@
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.GetImportProgress(ctx, &internalpb.GetImportProgressRequest{})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// normal case
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
globalMetaCache = mc
dataCoord := mocks.NewMockDataCoordClient(t)
dataCoord.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(nil, nil)
node.dataCoord = dataCoord
@@ -1754,19 +1729,8 @@
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())
node.UpdateStateCode(commonpb.StateCode_Healthy)

// no such database
mc := NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, mockErr)
globalMetaCache = mc
rsp, err = node.ListImports(ctx, &internalpb.ListImportsRequest{
CollectionName: "col",
})
assert.NoError(t, err)
assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode())

// normal case
mc = NewMockCache(t)
mc.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 1}, nil)
mc := NewMockCache(t)
mc.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
globalMetaCache = mc
dataCoord := mocks.NewMockDataCoordClient(t)