Skip to content

Commit

Permalink
enhance: add some apis in Restful (#38733)
Browse files Browse the repository at this point in the history
add some apis in Restful #38709 

- alter/drop collection properties
- alter/drop index properties
- alter collection field properties
- refresh load

Signed-off-by: lixinguo <[email protected]>
Co-authored-by: lixinguo <[email protected]>
  • Loading branch information
smellthemoon and lixinguo authored Dec 31, 2024
1 parent 3822819 commit 6b10583
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 11 deletions.
24 changes: 14 additions & 10 deletions internal/distributed/proxy/httpserver/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ import (
// v2
const (
// --- category ---
DataBaseCategory = "/databases/"
CollectionCategory = "/collections/"
EntityCategory = "/entities/"
PartitionCategory = "/partitions/"
UserCategory = "/users/"
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
ImportJobCategory = "/jobs/import/"
PrivilegeGroupCategory = "/privilege_groups/"
DataBaseCategory = "/databases/"
CollectionCategory = "/collections/"
EntityCategory = "/entities/"
PartitionCategory = "/partitions/"
UserCategory = "/users/"
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
ImportJobCategory = "/jobs/import/"
PrivilegeGroupCategory = "/privilege_groups/"
CollectionFieldCategory = "/collections/fields/"

ListAction = "list"
HasAction = "has"
Expand All @@ -45,6 +46,7 @@ const (
LoadStateAction = "get_load_state"
RenameAction = "rename"
LoadAction = "load"
RefreshLoadAction = "refresh_load"
ReleaseAction = "release"
QueryAction = "query"
GetAction = "get"
Expand All @@ -63,6 +65,8 @@ const (
GrantPrivilegeActionV2 = "grant_privilege_v2"
RevokePrivilegeActionV2 = "revoke_privilege_v2"
AlterAction = "alter"
AlterPropertiesAction = "alter_properties"
DropPropertiesAction = "drop_properties"
GetProgressAction = "get_progress" // deprecated, keep it for compatibility, use `/v2/vectordb/jobs/import/describe` instead
AddPrivilegesToGroupAction = "add_privileges_to_group"
RemovePrivilegesFromGroupAction = "remove_privileges_from_group"
Expand Down
122 changes: 122 additions & 0 deletions internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
router.POST(CollectionCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.dropCollection))))
router.POST(CollectionCategory+RenameAction, timeoutMiddleware(wrapperPost(func() any { return &RenameCollectionReq{} }, wrapperTraceLog(h.renameCollection))))
router.POST(CollectionCategory+LoadAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.loadCollection))))
router.POST(CollectionCategory+RefreshLoadAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.refreshLoadCollection))))
router.POST(CollectionCategory+ReleaseAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.releaseCollection))))
router.POST(CollectionCategory+AlterPropertiesAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionReqWithProperties{} }, wrapperTraceLog(h.alterCollectionProperties))))
router.POST(CollectionCategory+DropPropertiesAction, timeoutMiddleware(wrapperPost(func() any { return &DropCollectionPropertiesReq{} }, wrapperTraceLog(h.dropCollectionProperties))))

router.POST(CollectionFieldCategory+AlterPropertiesAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionFieldReqWithParams{} }, wrapperTraceLog(h.alterCollectionFieldProperties))))

router.POST(DataBaseCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReqWithProperties{} }, wrapperTraceLog(h.createDatabase))))
router.POST(DataBaseCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &DatabaseReqRequiredName{} }, wrapperTraceLog(h.dropDatabase))))
Expand Down Expand Up @@ -170,6 +175,8 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
router.POST(IndexCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &IndexParamReq{} }, wrapperTraceLog(h.createIndex))))
// todo cannot drop index before release it ?
router.POST(IndexCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &IndexReq{} }, wrapperTraceLog(h.dropIndex))))
router.POST(IndexCategory+AlterPropertiesAction, timeoutMiddleware(wrapperPost(func() any { return &IndexReqWithProperties{} }, wrapperTraceLog(h.alterIndexProperties))))
router.POST(IndexCategory+DropPropertiesAction, timeoutMiddleware(wrapperPost(func() any { return &DropIndexPropertiesReq{} }, wrapperTraceLog(h.dropIndexProperties))))

router.POST(AliasCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.listAlias))))
router.POST(AliasCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.describeAlias))))
Expand Down Expand Up @@ -598,12 +605,26 @@ func (h *HandlersV2) renameCollection(ctx context.Context, c *gin.Context, anyRe
return resp, err
}

func (h *HandlersV2) refreshLoadCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: getter.GetCollectionName(),
Refresh: true,
}
return h.loadCollectionInternal(ctx, c, req, dbName)
}

func (h *HandlersV2) loadCollection(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
getter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: getter.GetCollectionName(),
}
return h.loadCollectionInternal(ctx, c, req, dbName)
}

func (h *HandlersV2) loadCollectionInternal(ctx context.Context, c *gin.Context, req *milvuspb.LoadCollectionRequest, dbName string) (interface{}, error) {
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/LoadCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.LoadCollection(reqCtx, req.(*milvuspb.LoadCollectionRequest))
Expand All @@ -630,6 +651,68 @@ func (h *HandlersV2) releaseCollection(ctx context.Context, c *gin.Context, anyR
return resp, err
}

func (h *HandlersV2) alterCollectionProperties(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionReqWithProperties)
req := &milvuspb.AlterCollectionRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
}
properties := make([]*commonpb.KeyValuePair, 0, len(httpReq.Properties))
for key, value := range httpReq.Properties {
properties = append(properties, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", value)})
}
req.Properties = properties

c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterCollection(reqCtx, req.(*milvuspb.AlterCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) dropCollectionProperties(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*DropCollectionPropertiesReq)
req := &milvuspb.AlterCollectionRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
DeleteKeys: httpReq.DeleteKeys,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterCollection", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterCollection(reqCtx, req.(*milvuspb.AlterCollectionRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) alterCollectionFieldProperties(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*CollectionFieldReqWithParams)
req := &milvuspb.AlterCollectionFieldRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
FieldName: httpReq.FieldName,
}
properties := make([]*commonpb.KeyValuePair, 0, len(httpReq.FieldParams))
for key, value := range httpReq.FieldParams {
properties = append(properties, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", value)})
}
req.Properties = properties

c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterCollectionField", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterCollectionField(reqCtx, req.(*milvuspb.AlterCollectionFieldRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

// copy from internal/proxy/task_query.go
func matchCountRule(outputs []string) bool {
return len(outputs) == 1 && strings.ToLower(strings.TrimSpace(outputs[0])) == "count(*)"
Expand Down Expand Up @@ -2144,6 +2227,45 @@ func (h *HandlersV2) dropIndex(ctx context.Context, c *gin.Context, anyReq any,
return resp, err
}

func (h *HandlersV2) alterIndexProperties(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*IndexReqWithProperties)
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
}
extraParams := make([]*commonpb.KeyValuePair, 0, len(httpReq.Properties))
for key, value := range httpReq.Properties {
extraParams = append(extraParams, &commonpb.KeyValuePair{Key: key, Value: fmt.Sprintf("%v", value)})
}
req.ExtraParams = extraParams

c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterIndex(reqCtx, req.(*milvuspb.AlterIndexRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) dropIndexProperties(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*DropIndexPropertiesReq)
req := &milvuspb.AlterIndexRequest{
DbName: dbName,
CollectionName: httpReq.CollectionName,
DeleteKeys: httpReq.DeleteKeys,
}
c.Set(ContextRequest, req)
resp, err := wrapperProxyWithLimit(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AlterIndex", true, h.proxy, func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AlterIndex(reqCtx, req.(*milvuspb.AlterIndexRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) listAlias(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
req := &milvuspb.ListAliasesRequest{
Expand Down
Loading

0 comments on commit 6b10583

Please sign in to comment.