Skip to content

Commit

Permalink
refine index node (#18545)
Browse files Browse the repository at this point in the history
Signed-off-by: Zach41 <[email protected]>
  • Loading branch information
Zach41 authored Aug 9, 2022
1 parent 4edc8d3 commit d3c478f
Show file tree
Hide file tree
Showing 24 changed files with 3,219 additions and 2,225 deletions.
137 changes: 69 additions & 68 deletions internal/core/src/pb/common.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion internal/core/src/pb/common.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions internal/distributed/indexnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexnodepb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
Expand Down Expand Up @@ -232,13 +233,24 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt
return s.indexnode.GetStatisticsChannel(ctx)
}

// CreateIndex sends the create index request to IndexNode.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
return s.indexnode.CreateIndex(ctx, req)
// CreateJob sends the create index request to IndexNode.
func (s *Server) CreateJob(ctx context.Context, req *indexnodepb.CreateJobRequest) (*commonpb.Status, error) {
return s.indexnode.CreateJob(ctx, req)
}

func (s *Server) GetTaskSlots(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) {
return s.indexnode.GetTaskSlots(ctx, req)
// QueryJobs querys index jobs statues
func (s *Server) QueryJobs(ctx context.Context, req *indexnodepb.QueryJobsRequest) (*indexnodepb.QueryJobsRespond, error) {
return s.indexnode.QueryJobs(ctx, req)
}

// DropJobs drops index build jobs
func (s *Server) DropJobs(ctx context.Context, req *indexnodepb.DropJobsRequest) (*commonpb.Status, error) {
return s.indexnode.DropJobs(ctx, req)
}

// GetJobNum gets indexnode's job statisctics
func (s *Server) GetJobStats(ctx context.Context, req *indexnodepb.GetJobStatsRequest) (*indexnodepb.GetJobStatsRespond, error) {
return s.indexnode.GetJobStats(ctx, req)
}

// GetMetrics gets the metrics info of IndexNode.
Expand Down
39 changes: 39 additions & 0 deletions internal/indexnode/chunk_mgr_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package indexnode

import (
"context"
"fmt"
"sync"

"github.com/milvus-io/milvus/internal/storage"
)

type StorageFactory interface {
NewChunkManager(ctx context.Context, bucket, storageAccessKey string) (storage.ChunkManager, error)
}

type chunkMgr struct {
cached sync.Map
}

func (m *chunkMgr) NewChunkManager(ctx context.Context, bucket, storageAccessKey string) (storage.ChunkManager, error) {
key := m.cacheKey(bucket, storageAccessKey)
if v, ok := m.cached.Load(key); ok {
return v.(storage.ChunkManager), nil
}
opts := []storage.Option{
storage.AccessKeyID(storageAccessKey),
storage.BucketName(bucket),
}
factory := storage.NewChunkManagerFactory("local", "minio", opts...)
mgr, err := factory.NewVectorStorageChunkManager(ctx)
if err != nil {
return nil, err
}
v, _ := m.cached.LoadOrStore(key, mgr)
return v.(storage.ChunkManager), nil
}

func (m *chunkMgr) cacheKey(bucket, storageAccessKey string) string {
return fmt.Sprintf("%s/%s", bucket, storageAccessKey)
}
Loading

0 comments on commit d3c478f

Please sign in to comment.