diff --git a/client/bulkwriter/bulk_import.go b/client/bulkwriter/bulk_import.go index 8b0d8348edc4c..16813600b5eb9 100644 --- a/client/bulkwriter/bulk_import.go +++ b/client/bulkwriter/bulk_import.go @@ -25,11 +25,13 @@ import ( "net/http" ) +// ResponseBase is the common Milvus RESTful response struct. type ResponseBase struct { Status int `json:"status"` Message string `json:"message"` } +// CheckStatus checks the response status and returns an error if the status is not ok. func (b ResponseBase) CheckStatus() error { if b.Status != 0 { return fmt.Errorf("bulk import return error, status: %d, message: %s", b.Status, b.Message) @@ -39,9 +41,10 @@ func (b ResponseBase) CheckStatus() error { type BulkImportOption struct { // milvus params - URI string `json:"-"` - CollectionName string `json:"collectionName"` - Files [][]string `json:"files"` + URL string `json:"-"` + CollectionName string `json:"collectionName"` + // optional in cloud api, use object url instead + Files [][]string `json:"files,omitempty"` // optional params PartitionName string `json:"partitionName,omitempty"` APIKey string `json:"-"` @@ -77,12 +80,13 @@ func (opt *BulkImportOption) WithOption(key, value string) *BulkImportOption { return opt } +// NewBulkImportOption returns a BulkImportOption for the Milvus bulk import API. func NewBulkImportOption(uri string, collectionName string, files [][]string, ) *BulkImportOption { return &BulkImportOption{ - URI: uri, + URL: uri, CollectionName: collectionName, Files: files, } @@ -91,7 +95,6 @@ func NewBulkImportOption(uri string, // NewCloudBulkImportOption returns import option for cloud import API. 
func NewCloudBulkImportOption(uri string, collectionName string, - files [][]string, apiKey string, objectURL string, clusterID string, @@ -99,9 +102,8 @@ func NewCloudBulkImportOption(uri string, secretKey string, ) *BulkImportOption { return &BulkImportOption{ - URI: uri, + URL: uri, CollectionName: collectionName, - Files: files, APIKey: apiKey, ObjectURL: objectURL, ClusterID: clusterID, @@ -119,7 +121,7 @@ type BulkImportResponse struct { // BulkImport is the API wrapper for restful import API. func BulkImport(ctx context.Context, option *BulkImportOption) (*BulkImportResponse, error) { - url := option.URI + "/v2/vectordb/jobs/import/create" + url := option.URL + "/v2/vectordb/jobs/import/create" bs, err := option.GetRequest() if err != nil { return nil, err @@ -142,12 +144,17 @@ func BulkImport(ctx context.Context, option *BulkImportOption) (*BulkImportRespo } type ListImportJobsOption struct { - URI string `json:"-"` + URL string `json:"-"` CollectionName string `json:"collectionName"` ClusterID string `json:"clusterId,omitempty"` APIKey string `json:"-"` - PageSize int `json:"pageSize"` - CurrentPage int `json:"currentPage"` + PageSize int `json:"pageSize,omitempty"` + CurrentPage int `json:"currentPage,omitempty"` +} + +func (opt *ListImportJobsOption) WithAPIKey(key string) *ListImportJobsOption { + opt.APIKey = key + return opt } func (opt *ListImportJobsOption) WithPageSize(pageSize int) *ListImportJobsOption { @@ -166,7 +173,7 @@ func (opt *ListImportJobsOption) GetRequest() ([]byte, error) { func NewListImportJobsOption(uri string, collectionName string) *ListImportJobsOption { return &ListImportJobsOption{ - URI: uri, + URL: uri, CollectionName: collectionName, CurrentPage: 1, PageSize: 10, @@ -191,7 +198,7 @@ type ImportJobRecord struct { } func ListImportJobs(ctx context.Context, option *ListImportJobsOption) (*ListImportJobsResponse, error) { - url := option.URI + "/v2/vectordb/jobs/import/list" + url := option.URL + 
"/v2/vectordb/jobs/import/list" bs, err := option.GetRequest() if err != nil { return nil, err @@ -214,7 +221,7 @@ func ListImportJobs(ctx context.Context, option *ListImportJobsOption) (*ListImp } type GetImportProgressOption struct { - URI string `json:"-"` + URL string `json:"-"` JobID string `json:"jobId"` // optional ClusterID string `json:"clusterId"` @@ -232,14 +239,14 @@ func (opt *GetImportProgressOption) WithAPIKey(key string) *GetImportProgressOpt func NewGetImportProgressOption(uri string, jobID string) *GetImportProgressOption { return &GetImportProgressOption{ - URI: uri, + URL: uri, JobID: jobID, } } func NewCloudGetImportProgressOption(uri string, jobID string, apiKey string, clusterID string) *GetImportProgressOption { return &GetImportProgressOption{ - URI: uri, + URL: uri, JobID: jobID, APIKey: apiKey, ClusterID: clusterID, @@ -275,7 +282,7 @@ type ImportProgressDetail struct { } func GetImportProgress(ctx context.Context, option *GetImportProgressOption) (*GetImportProgressResponse, error) { - url := option.URI + "/v2/vectordb/jobs/import/describe" + url := option.URL + "/v2/vectordb/jobs/import/describe" bs, err := option.GetRequest() if err != nil { diff --git a/client/bulkwriter/bulk_import_test.go b/client/bulkwriter/bulk_import_test.go index 5a45ee3463dea..7f3fb26d4f797 100644 --- a/client/bulkwriter/bulk_import_test.go +++ b/client/bulkwriter/bulk_import_test.go @@ -33,12 +33,19 @@ type BulkImportSuite struct { func (s *BulkImportSuite) TestBulkImport() { s.Run("normal_case", func() { svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + authHeader := req.Header.Get("Authorization") + s.Equal("Bearer root:Milvus", authHeader) s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create")) rw.Write([]byte(`{"status":0, "data":{"jobId": "123"}}`)) })) defer svr.Close() - resp, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", 
"files/b.json"}})) + resp, err := BulkImport(context.Background(), + NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}}). + WithPartition("_default"). + WithOption("backup", "true"). + WithAPIKey("root:Milvus"), + ) s.NoError(err) s.EqualValues(0, resp.Status) s.Equal("123", resp.Data.JobID) @@ -46,6 +53,7 @@ func (s *BulkImportSuite) TestBulkImport() { s.Run("svr_error", func() { svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + // rw. rw.WriteHeader(http.StatusInternalServerError) rw.Write([]byte(`interal server error`)) })) @@ -58,24 +66,38 @@ func (s *BulkImportSuite) TestBulkImport() { s.Run("status_error", func() { svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/create")) - rw.Write([]byte(`{"status":1100, "message": "import job failed"`)) + rw.Write([]byte(`{"status":1100, "message": "import job failed"}`)) })) defer svr.Close() _, err := BulkImport(context.Background(), NewBulkImportOption(svr.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}})) s.Error(err) }) + + s.Run("server_closed", func() { + svr2 := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {})) + svr2.Close() + _, err := BulkImport(context.Background(), NewBulkImportOption(svr2.URL, "hello_milvus", [][]string{{"files/a.json", "files/b.json"}})) + s.Error(err) + }) } func (s *BulkImportSuite) TestListImportJobs() { s.Run("normal_case", func() { svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + authHeader := req.Header.Get("Authorization") + s.Equal("Bearer root:Milvus", authHeader) s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/list")) rw.Write([]byte(`{"status":0, "data":{"records": [{"jobID": "abc", "collectionName": "hello_milvus", "state":"Importing", "progress": 50}]}}`)) })) defer 
svr.Close() - resp, err := ListImportJobs(context.Background(), NewListImportJobsOption(svr.URL, "hello_milvus")) + resp, err := ListImportJobs(context.Background(), + NewListImportJobsOption(svr.URL, "hello_milvus"). + WithPageSize(10). + WithCurrentPage(1). + WithAPIKey("root:Milvus"), + ) s.NoError(err) s.EqualValues(0, resp.Status) if s.Len(resp.Data.Records, 1) { @@ -101,12 +123,17 @@ func (s *BulkImportSuite) TestListImportJobs() { func (s *BulkImportSuite) TestGetImportProgress() { s.Run("normal_case", func() { svr := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + authHeader := req.Header.Get("Authorization") + s.Equal("Bearer root:Milvus", authHeader) s.True(strings.Contains(req.URL.Path, "/v2/vectordb/jobs/import/describe")) rw.Write([]byte(`{"status":0, "data":{"collectionName": "hello_milvus","jobId":"abc", "state":"Importing", "progress": 50, "importedRows": 20000,"totalRows": 40000, "details":[{"fileName": "files/a.json", "fileSize": 64312, "progress": 100, "state": "Completed"}, {"fileName":"files/b.json", "fileSize":52912, "progress":0, "state":"Importing"}]}}`)) })) defer svr.Close() - resp, err := GetImportProgress(context.Background(), NewGetImportProgressOption(svr.URL, "abc")) + resp, err := GetImportProgress(context.Background(), + NewGetImportProgressOption(svr.URL, "abc"). + WithAPIKey("root:Milvus"), + ) s.NoError(err) s.EqualValues(0, resp.Status) s.Equal("hello_milvus", resp.Data.CollectionName)