From 3f9b3c556a38bc7422d70a9e2e6b541f038fe278 Mon Sep 17 00:00:00 2001 From: binbin lv Date: Tue, 24 Dec 2024 15:47:05 +0800 Subject: [PATCH] test: Add test cases for rbac v2 Signed-off-by: binbin lv --- .../python_client/base/collection_wrapper.py | 4 + tests/python_client/base/database_wrapper.py | 12 + .../base/high_level_api_wrapper.py | 37 +- tests/python_client/base/partition_wrapper.py | 8 + tests/python_client/base/utility_wrapper.py | 42 + tests/python_client/common/common_type.py | 23 + tests/python_client/conftest.py | 4 +- .../milvus_client/test_milvus_client_alter.py | 4 +- tests/python_client/testcases/test_utility.py | 1536 +++++++++++++++++ 9 files changed, 1662 insertions(+), 8 deletions(-) diff --git a/tests/python_client/base/collection_wrapper.py b/tests/python_client/base/collection_wrapper.py index 2fae11cbec0df..b8d1c7f1c0e18 100644 --- a/tests/python_client/base/collection_wrapper.py +++ b/tests/python_client/base/collection_wrapper.py @@ -58,6 +58,10 @@ def is_empty(self): self.flush() return self.collection.is_empty + @property + def is_empty_without_flush(self): + return self.collection.is_empty + @property def num_entities(self): self.flush() diff --git a/tests/python_client/base/database_wrapper.py b/tests/python_client/base/database_wrapper.py index 3bbaf897af46e..87b5d1295c274 100644 --- a/tests/python_client/base/database_wrapper.py +++ b/tests/python_client/base/database_wrapper.py @@ -33,3 +33,15 @@ def list_database(self, using="default", timeout=None, check_task=None, check_it response, is_succ = api_request([self.database.list_database, using, timeout]) check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ).run() return response, check_result + + def set_properties(self, db_name: str, properties: dict, using="default", timeout=None, check_task=None, check_items=None): + func_name = sys._getframe().f_code.co_name + response, is_succ = api_request([self.database.set_properties, db_name, properties, using, timeout]) + check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ).run() + return response, check_result + + def describe_database(self, db_name: str, using="default", timeout=None, check_task=None, check_items=None): + func_name = sys._getframe().f_code.co_name + response, is_succ = api_request([self.database.describe_database, db_name, using, timeout]) + check_result = ResponseChecker(response, func_name, check_task, check_items, is_succ).run() + return response, check_result diff --git a/tests/python_client/base/high_level_api_wrapper.py b/tests/python_client/base/high_level_api_wrapper.py index 4dc62e17f2792..115ce54655dab 100644 --- a/tests/python_client/base/high_level_api_wrapper.py +++ b/tests/python_client/base/high_level_api_wrapper.py @@ -665,6 +665,39 @@ def revoke_privilege(self, role_name, object_type, privilege, object_name, db_na object_name=object_name, db_name=db_name, **kwargs).run() return res, check_result + def create_privilege_group(self, privilege_group: str, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.milvus_client.create_privilege_group, privilege_group], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def drop_privilege_group(self, privilege_group: str, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.milvus_client.drop_privilege_group, privilege_group], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def list_privilege_groups(self, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.milvus_client.list_privilege_groups], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def add_privileges_to_group(self, privilege_group: str, privileges: list, check_task=None, check_items=None, + **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.milvus_client.add_privileges_to_group, privilege_group, privileges], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def remove_privileges_from_group(self, privilege_group: str, privileges: list, check_task=None, check_items=None, + **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.milvus_client.remove_privileges_from_group, privilege_group, privileges], + **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + @trace() def alter_index_properties(self, client, collection_name, index_name, properties, timeout=None, check_task=None, check_items=None, **kwargs): @@ -785,7 +818,3 @@ def list_databases(self, client, timeout=None, check_task=None, check_items=None check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() return res, check_result - - - - diff --git a/tests/python_client/base/partition_wrapper.py b/tests/python_client/base/partition_wrapper.py index 3d22978de105d..50aaa277268ca 100644 --- a/tests/python_client/base/partition_wrapper.py +++ b/tests/python_client/base/partition_wrapper.py @@ -38,11 +38,19 @@ def is_empty(self): self.flush() return self.partition.is_empty if self.partition else None + @property + def is_empty_without_flush(self): + return self.partition.is_empty if self.partition else None + @property def num_entities(self): self.flush() return self.partition.num_entities if self.partition else None + @property + def num_entities_without_flush(self): + return self.partition.num_entities if self.partition else None + def drop(self, check_task=None, check_items=None, **kwargs): timeout = kwargs.get("timeout", TIMEOUT) kwargs.update({"timeout": timeout}) diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index c48d736df63fa..7a8d7fd064328 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -471,6 +471,18 @@ def role_revoke(self, object: str, object_name: str, privilege: str, db_name: st check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() return res, check_result + def role_grant_v2(self, privilege: str, collection_name: str, db_name: str = None, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.grant_v2, privilege, collection_name, db_name], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def role_revoke_v2(self, privilege: str, collection_name: str, db_name: str = None, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.revoke_v2, privilege, collection_name, db_name], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + def role_list_grant(self, object: str, object_name: str, db_name: str = "", check_task=None, check_items=None, **kwargs): func_name = sys._getframe().f_code.co_name res, check = api_request([self.role.list_grant, object, object_name, db_name], **kwargs) @@ -558,3 +570,33 @@ def list_indexes(self, collection_name, using="default", timeout=None, check_tas check_result = ResponseChecker(res, func_name, check_task, check_items, check, collection_name=collection_name, using=using, timeout=timeout, **kwargs).run() return res, check_result + + def create_privilege_group(self, privilege_group: str, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.create_privilege_group, privilege_group], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def drop_privilege_group(self, privilege_group: str, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.drop_privilege_group, privilege_group], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def list_privilege_groups(self, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.list_privilege_groups], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def add_privileges_to_group(self, privilege_group: str, privileges: list, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.add_privileges_to_group, privilege_group, privileges], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result + + def remove_privileges_from_group(self, privilege_group: str, privileges: list, check_task=None, check_items=None, **kwargs): + func_name = sys._getframe().f_code.co_name + res, check = api_request([self.role.remove_privileges_from_group, privilege_group, privileges], **kwargs) + check_result = ResponseChecker(res, func_name, check_task, check_items, check, **kwargs).run() + return res, check_result \ No newline at end of file diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index bae1c122b57c1..77ed742179d36 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -258,6 +258,29 @@ default_diskann_search_params = {"params": {"search_list": 30}} default_sparse_search_params = {"metric_type": "IP", "params": {"drop_ratio_search": "0.2"}} default_text_sparse_search_params = {"metric_type": "BM25", "params": {}} +built_in_privilege_groups = ["CollectionReadWrite", "CollectionReadOnly", "CollectionAdmin", + "DatabaseReadWrite", "DatabaseReadOnly", "DatabaseAdmin", + "ClusterReadWrite", "ClusterReadOnly", "ClusterAdmin"] +privilege_group_privilege_dict = {"Query": False, "Search": False, "GetLoadState": False, + "GetLoadingProgress": False, "HasPartition": False, "ShowPartitions": False, + "ShowCollections": False, "ListAliases": False, "ListDatabases": False, + "DescribeDatabase": False, "DescribeAlias": False, "GetStatistics": False, + "CreateIndex": False, "DropIndex": False, "CreatePartition": False, + "DropPartition": False, "Load": False, "Release": False, + "Insert": False, "Delete": False, "Upsert": False, + "Import": False, "Flush": False, "Compaction": False, + "LoadBalance": False, "RenameCollection": False, "CreateAlias": False, + "DropAlias": False, "CreateCollection": False, "DropCollection": False, + "CreateOwnership": False, "DropOwnership": False, "SelectOwnership": False, + "ManageOwnership": False, "UpdateUser": False, "SelectUser": False, + "CreateResourceGroup": False, "DropResourceGroup": False, + "UpdateResourceGroups": False, + "DescribeResourceGroup": False, "ListResourceGroups": False, "TransferNode": False, + "TransferReplica": False, "CreateDatabase": False, "DropDatabase": False, + "AlterDatabase": False, "FlushAll": False, "ListPrivilegeGroups": False, + "CreatePrivilegeGroup": False, "DropPrivilegeGroup": False, + "OperatePrivilegeGroup": False} + class CheckTasks: """ The name of the method used to check the result """ diff --git a/tests/python_client/conftest.py b/tests/python_client/conftest.py index 04a03dc71072f..526b1caf96b24 100644 --- a/tests/python_client/conftest.py +++ b/tests/python_client/conftest.py @@ -22,8 +22,8 @@ def pytest_addoption(parser): parser.addoption("--host", action="store", default="localhost", help="service's ip") parser.addoption("--service", action="store", default="", help="service address") parser.addoption("--port", action="store", default=19530, help="service's port") - parser.addoption("--user", action="store", default="", help="user name for connection") - parser.addoption("--password", action="store", default="", help="password for connection") + parser.addoption("--user", action="store", default="root", help="user name for connection") + parser.addoption("--password", action="store", default="Milvus", help="password for connection") parser.addoption("--db_name", action="store", default="default", help="database name for connection") parser.addoption("--secure", type=bool, action="store", default=False, help="secure for connection") parser.addoption("--milvus_ns", action="store", default="chaos-testing", help="milvus_ns") diff --git a/tests/python_client/milvus_client/test_milvus_client_alter.py b/tests/python_client/milvus_client/test_milvus_client_alter.py index 63f0d7f5a70e1..9a15700bfdc6f 100644 --- a/tests/python_client/milvus_client/test_milvus_client_alter.py +++ b/tests/python_client/milvus_client/test_milvus_client_alter.py @@ -70,8 +70,8 @@ def test_milvus_client_alter_index_default(self): client_w.load_collection(client, collection_name) res1 = client_w.describe_index(client, collection_name, index_name=idx_names[0])[0] assert res1.get('mmap.enabled', None) is None - error = {ct.err_code: 999, - ct.err_msg: "can't alter index on loaded collection, please release the collection first"} + error = {ct.err_code: 104, + ct.err_msg: f"can't alter index on loaded collection, please release the collection first: collection already loaded[collection={collection_name}]"} # 1. alter index after load client_w.alter_index_properties(client, collection_name, idx_names[0], properties={"mmap.enabled": True}, check_task=CheckTasks.err_res, check_items=error) diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index 883a2ab4270c0..4608208499d78 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -15,6 +15,8 @@ from common.milvus_sys import MilvusSys from pymilvus.grpc_gen.common_pb2 import SegmentState import random +from pymilvus.client.types import ResourceGroupConfig +import copy prefix = "utility" default_schema = cf.gen_default_collection_schema() @@ -5039,3 +5041,1537 @@ def do_flush_all(): res, _ = cw.query(f'{ct.default_int64_field_name} not in {delete_ids}') assert len(res) == ct.default_nb * 2 - delete_num + + +class TestUtilityNegativeRbacPrivilegeGroup(TestcaseBase): + + def teardown_method(self, method): + """ + teardown method: drop role and user + """ + log.info("[utility_teardown_method] Start teardown utility test cases ...") + + self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user, + password=ct.default_password, secure=cf.param_info.param_secure) + + # drop users + users, _ = self.utility_wrap.list_users(False) + for u in users.groups: + if u.username != ct.default_user: + self.utility_wrap.delete_user(u.username) + user_groups, _ = self.utility_wrap.list_users(False) + assert len(user_groups.groups) == 1 + + role_groups, _ = self.utility_wrap.list_roles(False) + + # drop roles + for role_group in role_groups.groups: + if role_group.role_name not in ['admin', 'public']: + self.utility_wrap.init_role(role_group.role_name) + g_list, _ = self.utility_wrap.role_list_grants() + for g in g_list.groups: + self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + for privilege_group in privilege_groups.groups: + if privilege_group.privilege_group not in ct.built_in_privilege_groups: + self.utility_wrap.drop_privilege_group(privilege_group.privilege_group) + self.utility_wrap.role_drop() + role_groups, _ = self.utility_wrap.list_roles(False) + assert len(role_groups.groups) == 2 + + # drop database + databases, _ = self.database_wrap.list_database() + for db_name in databases: + self.database_wrap.using_database(db_name) + for c_name in self.utility_wrap.list_collections()[0]: + self.utility_wrap.drop_collection(c_name) + + if db_name != ct.default_db: + self.database_wrap.drop_database(db_name) + + super().teardown_method(method) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_create_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: create privilege group with invalid name + method: create privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privilege_group` value {name} is illegal"} + self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_create_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port): + """ + target: create privilege group with invalid name + method: create privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"} + self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip(reason="issue #37842") + @pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglong", "123n", " ", "''", "中文"]) + def test_create_privilege_group_with_privilege_group_name_invalid_value_2(self, name, host, port): + """ + target: create privilege group with invalid name + method: create privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1402, "err_msg": f"{name}: invalid privilege group name[privilegeGroup=the first character " + f"of a privilege group name %s must be an underscore or letter]"} + self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_create_privilege_group_with_built_in_privilege_groups_name(self, host, port): + """ + target: create privilege group with built in privilege groups name + method: create privilege group with built in privilege groups name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + for name in ct.built_in_privilege_groups: + error = {"err_code": 1100, + "err_msg": f"privilege group name [{name}] is defined by built in privileges or privilege groups in system: invalid parameter"} + self.utility_wrap.create_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_drop_privilege_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: drop privilege group with invalid name + method: drop privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privilege_group` value {name} is illegal"} + self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_drop_privilege_group_with_privilege_group_name_invalid_value_1(self, name, host, port): + """ + target: drop privilege group with invalid name + method: drop privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"} + self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.skip(reason="issue #37842") + @pytest.mark.parametrize("name", ["longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglonglong" + "longlonglonglong", "123n", " ", "''", "中文"]) + def test_drop_privilege_group_with_privilege_group_name_invalid_value_2(self, name, host, port): + """ + target: drop privilege group with invalid name + method: drop privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1402, "err_msg": f"{name}: invalid privilege group name[privilegeGroup=the first character " + f"of a privilege group name %s must be an underscore or letter]"} + self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_drop_not_exist_privilege_group(self, host, port): + """ + target: drop same privilege group twice + method: drop same privilege group twice + expected: drop successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_not_exist" + self.utility_wrap.drop_privilege_group(privilege_group=name) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_drop_same_privilege_group_twice(self, host, port): + """ + target: drop same privilege group twice + method: drop same privilege group twice + expected: drop successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert name in privilege_groups_extracted + self.utility_wrap.drop_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert name not in privilege_groups_extracted + self.utility_wrap.drop_privilege_group(privilege_group=name) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_drop_privilege_group_with_built_in_privilege_groups_name(self, host, port): + """ + target: drop privilege group with built in privilege groups name + method: drop privilege group with built in privilege groups name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + for name in ct.built_in_privilege_groups: + self.utility_wrap.drop_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert len(privilege_groups_extracted) == len(ct.built_in_privilege_groups) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_drop_privilege_group_granted(self, host, port): + """ + target: drop the privilege group granted to one role + method: drop the privilege group granted to one role + expected: 1. raise exception + 2. drop successfully after revoke + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role_name = "role_1" + role = self.utility_wrap.init_role(role_name)[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + self.utility_wrap.role_grant_v2(name, collection_w.name) + error = {"err_code": 65535, + "err_msg": f"privilege group [{name}] is used by role [{role_name}], Use REVOKE API to revoke it first"} + self.utility_wrap.drop_privilege_group(privilege_group=name, check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_revoke_v2(name, collection_w.name) + self.utility_wrap.drop_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert name not in privilege_groups_extracted + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_add_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privilege_group` value {name} is illegal"} + self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_add_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port): + """ + target: add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"} + self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"], check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_add_privilege_into_not_exist_privilege_group(self, host, port): + """ + target: add privilege into not exist privilege group + method: add privilege into not exist privilege group + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_not_exist" + error = {"err_code": 1100, + "err_msg": f"there is no privilege group name [{name}] to operate: invalid parameter"} + self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_add_privilege_into_built_in_privilege_group(self, host, port): + """ + target: add privilege into not exist privilege group + method: add privilege into not exist privilege group + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + for name in ct.built_in_privilege_groups: + error = {"err_code": 1100, + "err_msg": f"the privilege group name [{name}] is defined " + f"by built in privilege groups in system: invalid parameter"} + self.utility_wrap.add_privileges_to_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"]) + def test_add_privileges_to_group_with_privilege_invalid_type(self, name, host, port): + """ + target: add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privileges` value {name} is illegal"} + self.utility_wrap.add_privileges_to_group(privilege_group="privilege_group_1", privileges=name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_add_privileges_to_group_with_privilege_invalid_value(self, host, port): + """ + target: add privilege group with invalid name + method: add privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_name = "invalid_privilege" + error = {"err_code": 1100, "err_msg": f"there is no privilege name or privielge group name [{privilege_name}] " + f"defined in system to operate: invalid parameter"} + self.utility_wrap.add_privileges_to_group(privilege_group="privilege_group_1", privileges=[privilege_name], check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_remove_privileges_to_group_with_privilege_group_name_invalid_type(self, name, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privilege_group` value {name} is illegal"} + self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_remove_privileges_to_group_with_privilege_group_name_invalid_value(self, name, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1100, "err_msg": f"privilege group name {name} can only contain numbers, letters and underscores: invalid parameter"} + self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"], check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_remove_privilege_into_not_exist_privilege_group(self, host, port): + """ + target: remove privilege into not exist privilege group + method: remove privilege into not exist privilege group + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_not_exist" + error = {"err_code": 1100, + "err_msg": f"there is no privilege group name [{name}] to operate: invalid parameter"} + self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_remove_privilege_into_built_in_privilege_group(self, host, port): + """ + target: remove privilege into not exist privilege group + method: remove privilege into not exist privilege group + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + for name in ct.built_in_privilege_groups: + error = {"err_code": 1100, + "err_msg": f"the privilege group name [{name}] is defined " + f"by built in privilege groups in system: invalid parameter"} + self.utility_wrap.remove_privileges_from_group(privilege_group=name, privileges=["Insert"], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"]) + def test_remove_privileges_to_group_with_privilege_invalid_type(self, name, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privileges` value {name} is illegal"} + self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_remove_privileges_to_group_with_privilege_invalid_value(self, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_name = "invalid_privilege" + error = {"err_code": 1100, "err_msg": f"there is no privilege name or privielge group name [{privilege_name}] " + f"defined in system to operate: invalid parameter"} + self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=[privilege_name], + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0, "n%$#@!", "test-role", "ff ff"]) + def test_remove_privileges_to_group_with_privilege_invalid_type(self, name, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.init_role("role_1") + error = {"err_code": 1, + "err_msg": f"`privileges` value {name} is illegal"} + self.utility_wrap.remove_privileges_from_group(privilege_group="privilege_group_1", privileges=name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_grant_v2_privilege_invalid_type(self, name, host, port): + """ + target: grant v2 with invalid privilege name + method: grant v2 with invalid privilege name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + error = {"err_code": 1, + "err_msg": f"`privilege` value {name} is illegal"} + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_v2_privilege_invalid_value(self, host, port): + """ + target: grant v2 with invalid privilege name + method: grant v2 with invalid privilege name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_name = "invalid_privilege" + error = {"err_code": 65535, "err_msg": f"not found the privilege name[{privilege_name}]"} + self.utility_wrap.role_grant_v2(privilege=privilege_name, collection_name=collection_w.name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_grant_v2_collection_name_invalid_type(self, name, host, port): + """ + target: grant v2 with invalid collection name + method: grant v2 with invalid collection name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1") + error = {"err_code": 1, "err_msg": f"`privilege` value {name} is illegal"} + self.utility_wrap.role_grant_v2(privilege=name, collection_name=name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_v2_not_exist_collection(self, host, port): + """ + target: grant v2 with not exist collection + method: grant v2 with not exist collection + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + self.utility_wrap.role_grant_v2(privilege=name, collection_name="not_exist_collection") + self.utility_wrap.role_revoke_v2(privilege=name, collection_name="not_exist_collection") + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_grant_v2_db_name_invalid_type(self, name, host, port): + """ + target: grant v2 with invalid collection name + method: grant v2 with invalid collection name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1") + error = {"err_code": 1, "err_msg": f"`db_name` value {name} is illegal"} + self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name", db_name=name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", ["n%$#@!", "test-role", "ff ff"]) + def test_grant_v2_db_name_invalid_value(self, name, host, port): + """ + target: grant v2 with invalid collection name + method: grant v2 with invalid collection name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1") + error = {"err_code": 802, "err_msg": f"database name can only contain numbers, letters and underscores: " + f"invalid database name[database={name}]"} + self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name", + db_name=name, check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_v2_not_exist_db_name(self, host, port): + """ + target: grant v2 with invalid collection name + method: grant v2 with invalid collection name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + self.utility_wrap.create_privilege_group(privilege_group="privilege_group_1") + self.utility_wrap.role_grant_v2(privilege="privilege_group_1", collection_name="collection_name", + db_name="not_exist_db") + self.utility_wrap.role_revoke_v2(privilege="privilege_group_1", collection_name="collection_name", + db_name="not_exist_db") + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("name", [1, 1.0]) + def test_revoke_v2_privilege_invalid_type(self, name, host, port): + """ + target: grant v2 with invalid privilege name + method: grant v2 with invalid privilege name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + error = {"err_code": 1, + "err_msg": f"`privilege` value {name} is illegal"} + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_revoke_v2_privilege_invalid_value(self, host, port): + """ + target: grant v2 with invalid privilege name + method: grant v2 with invalid privilege name + expected: raise exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_name = "invalid_privilege" + error = {"err_code": 65535, "err_msg": f"not found the privilege name[{privilege_name}]"} + self.utility_wrap.role_revoke_v2(privilege=privilege_name, collection_name=collection_w.name, + check_task=CheckTasks.err_res, check_items=error) + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_v2_database_built_in_invalid_collection_name(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + collection_name = collection_w.name + for name in ct.built_in_privilege_groups: + if name.startswith("Database") is not True: + continue + error = {"err_code": 1100, "err_msg": f"collectionName should be * for the database " + f"level privilege: {name}: invalid parameter"} + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name, + check_task=CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_v2_cluster_built_in_invalid_collection_name(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + for name in ct.built_in_privilege_groups: + if name.startswith("Cluster") is not True: + continue + error = {"err_code": 1100, "err_msg": f"dbName and collectionName should be * for the cluster " + f"level privilege: {name}: invalid parameter"} + self.utility_wrap.role_grant_v2(privilege=name, collection_name="*", db_name="default", + check_task=CheckTasks.err_res, check_items=error) + self.utility_wrap.role_grant_v2(privilege=name, collection_name="*", + check_task=CheckTasks.err_res, check_items=error) + + +class TestUtilityPositiveRbacPrivilegeGroup(TestcaseBase): + + def teardown_method(self, method): + """ + teardown method: drop role and user + """ + log.info("[utility_teardown_method] Start teardown utility test cases ...") + + self.connection_wrap.connect(host=cf.param_info.param_host, port=cf.param_info.param_port, user=ct.default_user, + password=ct.default_password, secure=cf.param_info.param_secure) + + # drop users + users, _ = self.utility_wrap.list_users(False) + for u in users.groups: + if u.username != ct.default_user: + self.utility_wrap.delete_user(u.username) + user_groups, _ = self.utility_wrap.list_users(False) + assert len(user_groups.groups) == 1 + + role_groups, _ = self.utility_wrap.list_roles(False) + + # drop roles + for role_group in role_groups.groups: + if role_group.role_name not in ['admin', 'public']: + self.utility_wrap.init_role(role_group.role_name) + g_list, _ = self.utility_wrap.role_list_grants() + for g in g_list.groups: + # self.utility_wrap.role_revoke(g.object, g.object_name, g.privilege) + if g.privilege.startswith("Cluster"): + self.utility_wrap.role_revoke_v2(g.privilege, "*", "*") + else: + self.utility_wrap.role_revoke_v2(g.privilege, g.object_name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + for privilege_group in privilege_groups.groups: + if privilege_group.privilege_group not in ct.built_in_privilege_groups: + self.utility_wrap.drop_privilege_group(privilege_group.privilege_group) + self.utility_wrap.role_drop() + role_groups, _ = self.utility_wrap.list_roles(False) + assert len(role_groups.groups) == 2 + + # drop database + databases, _ = self.database_wrap.list_database() + for db_name in databases: + self.database_wrap.using_database(db_name) + for c_name in self.utility_wrap.list_collections()[0]: + self.utility_wrap.drop_collection(c_name) + + if db_name != ct.default_db: + self.database_wrap.drop_database(db_name) + + super().teardown_method(method) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_create_privilege_groups(self, host, port): + """ + target: create valid privilege groups + method: create valid privilege groups + expected: create successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name_1 = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name_1) + name_2 = "privilege_group_2" + self.utility_wrap.create_privilege_group(privilege_group=name_2) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert name_1 in privilege_groups_extracted + assert name_2 in privilege_groups_extracted + + @pytest.mark.tags(CaseLabel.RBAC) + def test_create_large_numbers_privilege_groups(self, host, port): + """ + target: create large numbers of privilege groups + method: create 100 privilege groups + expected: create successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + number = 100 + # 1. create privilege groups + for i in range(number): + name = f"privilege_group_{i}" + self.utility_wrap.create_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + for i in range(number): + name = f"privilege_group_{i}" + assert name in privilege_groups_extracted + # 2. drop privilege groups + for i in range(number): + name = f"privilege_group_{i}" + self.utility_wrap.drop_privilege_group(privilege_group=name) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + for i in range(number): + name = f"privilege_group_{i}" + assert name not in privilege_groups_extracted + + @pytest.mark.tags(CaseLabel.RBAC) + def test_drop_privilege_groups(self, host, port): + """ + target: drop valid privilege groups + method: create valid privilege groups + expected: drop successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name_1 = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name_1) + name_2 = "privilege_group_2" + self.utility_wrap.create_privilege_group(privilege_group=name_2) + self.utility_wrap.drop_privilege_group(privilege_group=name_1) + self.utility_wrap.drop_privilege_group(privilege_group=name_2) + privilege_groups = self.utility_wrap.list_privilege_groups()[0] + privilege_groups_extracted = [] + for privilege_group in privilege_groups.groups: + privilege_groups_extracted.append(privilege_group.privilege_group) + assert name_1 not in privilege_groups_extracted + assert name_2 not in privilege_groups_extracted + + @pytest.mark.tags(CaseLabel.RBAC) + def test_add_privileges_to_group(self, host, port): + """ + target: add privilege group with invalid name + method: add privilege group with invalid name + expected: no exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name_1 = "privilege_group_1" + name_2 = "privilege_group_2" + self.utility_wrap.create_privilege_group(privilege_group=name_1) + self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.create_privilege_group(privilege_group=name_2) + self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Insert"]) + self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Insert"]) + self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Search"]) + self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Search"]) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_remove_privileges_to_group(self, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: no exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name_1 = "privilege_group_1" + name_2 = "privilege_group_2" + self.utility_wrap.create_privilege_group(privilege_group=name_1) + self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.add_privileges_to_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.remove_privileges_from_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.remove_privileges_from_group(privilege_group=name_1, privileges=["Insert"]) + self.utility_wrap.create_privilege_group(privilege_group=name_2) + self.utility_wrap.add_privileges_to_group(privilege_group=name_2, privileges=["Search"]) + self.utility_wrap.remove_privileges_from_group(privilege_group=name_2, privileges=["Search"]) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_list_built_in_privilege_groups(self, host, port): + """ + target: remove privilege group with invalid name + method: remove privilege group with invalid name + expected: no exception + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + res = self.utility_wrap.list_privilege_groups()[0] + for privilege_groups in res.groups: + if privilege_groups.privilege_group == "CollectionReadOnly": + assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState', 'GetLoadState', + 'GetLoadingProgress', 'HasPartition', 'ShowPartitions', + 'DescribeCollection', 'DescribeAlias', 'GetStatistics', 'ListAliases') + elif privilege_groups.privilege_group == "CollectionReadWrite": + assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState', 'GetLoadState', + 'GetLoadingProgress', 'HasPartition', 'ShowPartitions', + 'DescribeCollection', 'DescribeAlias', 'GetStatistics', 'ListAliases', + 'Load', 'Release', 'Insert', 'Delete', 'Upsert', 'Import', 'Flush', + 'Compaction', 'LoadBalance', 'CreateIndex', 'DropIndex', + 'CreatePartition', 'DropPartition') + elif privilege_groups.privilege_group == "CollectionAdmin": + assert privilege_groups.privileges == ('Query', 'Search', 'IndexDetail', 'GetFlushState', + 'GetLoadState', 'GetLoadingProgress', 'HasPartition', + 'ShowPartitions', 'DescribeCollection', 'DescribeAlias', + 'GetStatistics', 'ListAliases', 'Load', 'Release', 'Insert', + 'Delete', 'Upsert', 'Import', 'Flush', 'Compaction', 'LoadBalance', + 'CreateIndex', 'DropIndex', 'CreatePartition', 'DropPartition', + 'CreateAlias', 'DropAlias') + elif privilege_groups.privilege_group == "DatabaseReadOnly": + assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase') + elif privilege_groups.privilege_group == "DatabaseReadWrite": + assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase', 'AlterDatabase') + elif privilege_groups.privilege_group == "DatabaseAdmin": + assert privilege_groups.privileges == ('ShowCollections', 'DescribeDatabase', 'AlterDatabase', + 'CreateCollection', 'DropCollection') + elif privilege_groups.privilege_group == "ClusterReadOnly": + assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup', + 'ListResourceGroups', 'ListPrivilegeGroups') + elif privilege_groups.privilege_group == "ClusterReadWrite": + assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup', + 'ListResourceGroups', 'ListPrivilegeGroups', 'FlushAll', + 'TransferNode', 'TransferReplica', 'UpdateResourceGroups') + elif privilege_groups.privilege_group == "ClusterAdmin": + assert privilege_groups.privileges == ('ListDatabases', 'SelectOwnership', 'SelectUser', 'DescribeResourceGroup', + 'ListResourceGroups', 'ListPrivilegeGroups', 'FlushAll', + 'TransferNode', 'TransferReplica', 'UpdateResourceGroups', + 'BackupRBAC', 'RestoreRBAC', 'CreateDatabase', 'DropDatabase', + 'CreateOwnership', 'DropOwnership', 'ManageOwnership', + 'CreateResourceGroup', 'DropResourceGroup', 'UpdateUser', + 'RenameCollection', 'CreatePrivilegeGroup', 'DropPrivilegeGroup', + 'OperatePrivilegeGroup') + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_revoke_v2_normal(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name) + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name) + res = self.utility_wrap.role_list_grants()[0] + is_exist = False + for group in res.groups: + if group.privilege == name: + assert group.object == "Global" + assert group.object_name == collection_w.name + assert group.db_name == "default" + assert group.role_name == "role_1" + assert group.grantor_name == "root" + is_exist = True + assert is_exist == True + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name) + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name) + res = self.utility_wrap.role_list_grants()[0] + not_exist = True + for group in res.groups: + if group.privilege == name: + not_exist = False + assert not_exist == True + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_revoke_v2_another_db(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + new_db = "db_1" + self.database_wrap.create_database(new_db) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + name = "privilege_group_1" + self.utility_wrap.create_privilege_group(privilege_group=name) + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_w.name, db_name=new_db) + res = self.utility_wrap.role_list_grants(db_name=new_db)[0] + is_exist = False + for group in res.groups: + if group.privilege == name: + assert group.object == "Global" + assert group.object_name == collection_w.name + assert group.db_name == new_db + assert group.role_name == "role_1" + assert group.grantor_name == "root" + is_exist = True + assert is_exist == True + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_w.name, db_name=new_db) + res = self.utility_wrap.role_list_grants(db_name=new_db)[0] + not_exist = True + for group in res.groups: + if group.privilege == name: + not_exist = False + assert not_exist == True + res = self.utility_wrap.role_list_grants(db_name='default')[0] + not_exist = True + for group in res.groups: + if group.privilege == name: + not_exist = False + assert not_exist == True + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_revoke_v2_privilege(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + collection_name = collection_w.name + db_name = "default" + for name in ct.built_in_privilege_groups: + if name.startswith("Database"): + collection_name = "*" + if name.startswith("Cluster"): + collection_name = "*" + db_name = "*" + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name, db_name=db_name) + res = self.utility_wrap.role_list_grants()[0] + is_exist = False + for group in res.groups: + if group.privilege == name: + assert group.object == "Global" + assert group.object_name == collection_name + assert group.db_name == db_name + assert group.role_name == "role_1" + assert group.grantor_name == "root" + is_exist = True + assert is_exist == True + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_name, db_name=db_name) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_revoke_v2_built_in_privilege_group(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + role = self.utility_wrap.init_role("role_1")[0] + role.create() + collection_name = collection_w.name + db_name = "default" + for name in ct.built_in_privilege_groups: + if name.startswith("Database"): + collection_name = "*" + if name.startswith("Cluster"): + collection_name = "*" + db_name = "*" + self.utility_wrap.role_grant_v2(privilege=name, collection_name=collection_name, db_name=db_name) + res = self.utility_wrap.role_list_grants()[0] + is_exist = False + for group in res.groups: + if group.privilege == name: + assert group.object == "Global" + assert group.object_name == collection_name + assert group.db_name == db_name + assert group.role_name == "role_1" + assert group.grantor_name == "root" + is_exist = True + assert is_exist == True + self.utility_wrap.role_revoke_v2(privilege=name, collection_name=collection_name, db_name=db_name) + + @pytest.mark.tags(CaseLabel.RBAC) + def test_grant_revoke_v2_duplicate_privilege_and_privilege_group(self, host, port): + """ + target: grant/revoke v2 normal case + method: grant/revoke v2 normal case + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + role = self.utility_wrap.init_role("role_1")[0] + role.create() + self.utility_wrap.role_grant_v2(privilege="CreatePrivilegeGroup", collection_name="*", db_name="*") + self.utility_wrap.role_grant_v2(privilege="ClusterAdmin", collection_name="*", db_name="*") + res = self.utility_wrap.role_list_grants()[0] + privilege_number = 0 + for group in res.groups: + if group.privilege == "CreatePrivilegeGroup": + assert group.object == "Global" + assert group.object_name == "*" + assert group.db_name == "*" + assert group.role_name == "role_1" + assert group.grantor_name == "root" + privilege_number += 1 + if group.privilege == "ClusterAdmin": + assert group.object == "Global" + assert group.object_name == "*" + assert group.db_name == "*" + assert group.role_name == "role_1" + assert group.grantor_name == "root" + privilege_number += 1 + assert privilege_number == 2 + new_user = "user1" + self.utility_wrap.create_user(user=new_user, password=ct.default_password) + self.utility_wrap.role_add_user(new_user) + self.connection_wrap.connect(host=host, port=port, user=new_user, + password=ct.default_password) + self.utility_wrap.create_privilege_group("privilege_group_1") + self.utility_wrap.drop_privilege_group("privilege_group_1") + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + self.utility_wrap.role_revoke_v2(privilege="CreatePrivilegeGroup", collection_name="*", db_name="*") + self.connection_wrap.connect(host=host, port=port, user=new_user, + password=ct.default_password) + self.utility_wrap.create_privilege_group("privilege_group_1") + self.utility_wrap.drop_privilege_group("privilege_group_1") + self.utility_wrap.role_revoke_v2(privilege="ClusterAdmin", collection_name="*", db_name="*") + + @pytest.mark.tags(CaseLabel.RBAC) + @pytest.mark.parametrize("privilege_group_name", ct.built_in_privilege_groups) + def test_built_in_privilege_groups(self, host, port, privilege_group_name): + """ + target: test all the built in privilege groups + method: test all the built in privilege groups + expected: grant successfully + """ + self.connection_wrap.connect(host=host, port=port, user=ct.default_user, + password=ct.default_password, check_task=ct.CheckTasks.ccr) + collection_w = self.init_collection_general(prefix)[0] + partition_w = self.init_partition_wrap(collection_w, "_default") + collection_w_1 = self.init_collection_general(prefix+'new')[0] + collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index) + collection_w.load() + role = self.utility_wrap.init_role("role_1")[0] + role.create() + collection_name = collection_w.name + db_name = "default" + if privilege_group_name.startswith("Database"): + collection_name = "*" + if privilege_group_name.startswith("Cluster"): + collection_name = "*" + db_name = "*" + self.utility_wrap.role_grant_v2(privilege=privilege_group_name, collection_name=collection_name, db_name=db_name) + new_user = "user1" + self.utility_wrap.create_user(user=new_user, password=ct.default_password) + new_user_1 = "user2" + self.utility_wrap.create_user(user=new_user_1, password=ct.default_password) + self.utility_wrap.role_add_user(new_user) + privilege_group_privilege_dict = copy.deepcopy(ct.privilege_group_privilege_dict) + res = self.utility_wrap.list_privilege_groups()[0] + for privilege_groups in res.groups: + if privilege_groups.privilege_group == privilege_group_name: + for single_privilege in privilege_groups.privileges: + if single_privilege in privilege_group_privilege_dict.keys(): + privilege_group_privilege_dict[single_privilege] = True + break + # check all the granted privileges using api for the built in privilege group + self.connection_wrap.connect(host=host, port=port, user=new_user, password=ct.default_password) + # wait to make sure the grant takes effect + time.sleep(20) + # PrivilegeQuery + if privilege_group_privilege_dict["Query"]: + res = collection_w.query(expr="int64==0")[0] + else: + res = collection_w.query(expr="int64==0", check_task=CheckTasks.check_permission_deny) + # PrivilegeSearch + vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(1)] + if privilege_group_privilege_dict["Search"]: + collection_w.search(vectors, ct.default_float_vec_field_name, {}, 1, + check_task=CheckTasks.check_search_results, + check_items={ + "nq": 1, + "limit": 0}) + else: + collection_w.search(vectors, ct.default_float_vec_field_name, {}, 1, + check_task=CheckTasks.check_permission_deny) + # PrivilegeGetLoadState + if privilege_group_privilege_dict["GetLoadState"]: + res = self.utility_wrap.load_state(collection_name=collection_w.name) + res = self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name) + else: + res = self.utility_wrap.load_state(collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + res = self.utility_wrap.wait_for_loading_complete(collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + # PrivilegeGetLoadingProgress + if privilege_group_privilege_dict["GetLoadingProgress"]: + res = self.utility_wrap.loading_progress(collection_name=collection_w.name) + else: + res = self.utility_wrap.loading_progress(collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + # PrivilegeHasPartition + if privilege_group_privilege_dict["HasPartition"]: + res = collection_w.has_partition(partition_name="_default")[0] + assert res == True + res = partition_w.name + res = partition_w.is_empty_without_flush + res = partition_w.num_entities_without_flush + else: + res = collection_w.has_partition(partition_name="_default", + check_task=CheckTasks.check_permission_deny) + try: + partition_w = self.init_partition_wrap(collection_w, "_default") + res = partition_w.name + res = partition_w.is_empty_without_flush + res = partition_w.num_entities_without_flush + except Exception as e: + log.info(e) + # PrivilegeShowPartitions + if privilege_group_privilege_dict["ShowPartitions"]: + res = collection_w.partitions + else: + try: + res = collection_w.partitions + except Exception as e: + log.info(e) + # PrivilegeShowCollections + # if privilege_group_privilege_dict["ShowCollections"]: + res = self.utility_wrap.list_collections()[0] + assert len(res) == 0 + # PrivilegeListAliases + res = self.utility_wrap.list_aliases(collection_name=collection_w.name) + # PrivilegeListDatabases + if privilege_group_privilege_dict["ListDatabases"]: + res = self.database_wrap.list_database()[0] + assert len(res) == 1 + assert res[0] == "default" + else: + res = self.database_wrap.list_database()[0] + assert len(res) == 1 + assert res[0] == "default" + # PrivilegeDescribeDatabase + if privilege_group_privilege_dict["DescribeDatabase"]: + res = self.database_wrap.describe_database(db_name="default") + else: + self.database_wrap.describe_database(db_name="default", check_task=CheckTasks.check_permission_deny) + # PrivilegeDescribeAlias + res = collection_w.aliases + # PrivilegeGetStatistics + if privilege_group_privilege_dict["GetStatistics"]: + res = collection_w.is_empty_without_flush + res = collection_w.num_entities_without_flush + else: + try: + res = collection_w.is_empty_without_flush + except Exception as e: + log.info(e) + try: + res = collection_w.num_entities_without_flush + except Exception as e: + log.info(e) + # PrivilegeDropIndex + if privilege_group_privilege_dict["DropIndex"]: + collection_w.release() + collection_w.drop_index(index_name=ct.default_float_vec_field_name) + else: + collection_w.drop_index(index_name=ct.default_float_vec_field_name, check_task=CheckTasks.check_permission_deny) + # PrivilegeCreateIndex + if privilege_group_privilege_dict["CreateIndex"]: + collection_w.create_index(field_name=ct.default_float_vec_field_name, index_name="index1") + collection_w.alter_index(index_name="index1", extra_params={}) + collection_w.load() + else: + collection_w.create_index(field_name=ct.default_float_vec_field_name, check_task=CheckTasks.check_permission_deny) + collection_w.alter_index(index_name="index1", extra_params={}, check_task=CheckTasks.check_permission_deny) + # PrivilegeCreatePartition + if privilege_group_privilege_dict["CreatePartition"]: + collection_w.create_partition("partition") + else: + collection_w.create_partition("partition", check_task=CheckTasks.check_permission_deny) + # PrivilegeDropPartition + if privilege_group_privilege_dict["DropPartition"]: + collection_w.release() + collection_w.drop_partition("partition") + collection_w.load() + else: + collection_w.drop_partition("partition", check_task=CheckTasks.check_permission_deny) + # PrivilegeLoad + if privilege_group_privilege_dict["Load"]: + collection_w.load() + else: + collection_w.load(check_task=CheckTasks.check_permission_deny) + # PrivilegeRelease + if privilege_group_privilege_dict["Release"]: + collection_w.release() + else: + collection_w.release(check_task=CheckTasks.check_permission_deny) + # PrivilegeInsert + data = cf.gen_default_dataframe_data() + if privilege_group_privilege_dict["Insert"]: + collection_w.insert(data=data) + else: + collection_w.insert(data=data, check_task=CheckTasks.check_permission_deny) + # PrivilegeDelete + if privilege_group_privilege_dict["Delete"]: + collection_w.delete(expr="int64==10") + else: + collection_w.delete(expr="int64==10", check_task=CheckTasks.check_permission_deny) + # PrivilegeUpsert + if privilege_group_privilege_dict["Upsert"]: + collection_w.upsert(data=data) + else: + collection_w.upsert(data=data, check_task=CheckTasks.check_permission_deny) + # PrivilegeImport + if privilege_group_privilege_dict["Import"]: + self.utility_wrap.do_bulk_insert(collection_name=collection_w.name, files=[""], + check_task=CheckTasks.err_res, + check_items={"err_code": 2100, + "err_msg": "unexpected file type, files=[]: importing data failed"}) + else: + self.utility_wrap.do_bulk_insert(collection_name=collection_w.name, files=[""], + check_task=CheckTasks.check_permission_deny) + # PrivilegeFlush + if privilege_group_privilege_dict["Flush"]: + collection_w.flush() + partition_w.flush() + else: + collection_w.flush(check_task=CheckTasks.check_permission_deny) + partition_w.flush(check_task=CheckTasks.check_permission_deny) + # PrivilegeCompaction + if privilege_group_privilege_dict["Compaction"]: + collection_w.compact() + else: + collection_w.compact(check_task=CheckTasks.check_permission_deny) + if privilege_group_privilege_dict["LoadBalance"]: + collection_w.load() + self.utility_wrap.load_balance(collection_name=collection_w.name, src_node_id=1, + dst_node_ids=[2, 3], sealed_segment_ids=[2, 3], + check_task=CheckTasks.err_res, + check_items={"err_code": 901, + "err_msg": "destination node not found in " + "the same replica: node not found[node=2]"}) + else: + self.utility_wrap.load_balance(collection_name=collection_w.name, src_node_id=1, + dst_node_ids=[2, 3], sealed_segment_ids=[2, 3], + check_task=CheckTasks.check_permission_deny) + # PrivilegeRenameCollection + if privilege_group_privilege_dict["RenameCollection"]: + self.utility_wrap.rename_collection(old_collection_name=collection_w.name, + new_collection_name=collection_w.name+'new') + self.utility_wrap.rename_collection(old_collection_name=collection_w.name+'new', + new_collection_name=collection_w.name) + else: + self.utility_wrap.rename_collection(old_collection_name=collection_w.name, + new_collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + # PrivilegeCreateAlias + if privilege_group_privilege_dict["CreateAlias"]: + self.utility_wrap.create_alias(collection_name=collection_w.name, alias="alias") + self.utility_wrap.alter_alias(collection_name=collection_w.name, alias="alias") + else: + self.utility_wrap.create_alias(collection_name=collection_w.name, alias="alias", + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.alter_alias(collection_name=collection_w.name, alias="alias", + check_task=CheckTasks.check_permission_deny) + # PrivilegeDropAlias + if privilege_group_privilege_dict["DropAlias"]: + self.utility_wrap.drop_alias(alias="alias") + else: + self.utility_wrap.drop_alias(alias="alias", check_task=CheckTasks.check_permission_deny) + # PrivilegeCreateCollection + if privilege_group_privilege_dict["CreateCollection"]: + collection_w_new = self.init_collection_general(prefix, is_index=False, is_flush=False)[0] + collection_w_new.set_properties({"collection.ttl.seconds": 60}) + else: + try: + self.init_collection_general(prefix)[0] + except Exception as e: + log.info(e) + collection_w.set_properties({"collection.ttl.seconds": 60}, check_task=CheckTasks.check_permission_deny) + # PrivilegeDropCollection + if privilege_group_privilege_dict["DropCollection"]: + collection_w_new.drop() + self.utility_wrap.drop_collection(collection_w_new.name) + else: + collection_w.drop(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.drop_collection(collection_w.name, check_task=CheckTasks.check_permission_deny) + # PrivilegeCreateOwnership + if privilege_group_privilege_dict["CreateOwnership"]: + role_1 = self.utility_wrap.init_role("role_new")[0] + role_1.create() + self.utility_wrap.create_user(user="user3", password=ct.default_password) + else: + role_1 = self.utility_wrap.init_role("role_new")[0] + self.utility_wrap.create_role(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.create_user(user="user3", password=ct.default_password, + check_task=CheckTasks.check_permission_deny) + # PrivilegeDropOwnership + if privilege_group_privilege_dict["DropOwnership"]: + role_1 = self.utility_wrap.init_role("role_new")[0] + role_1.drop() + self.utility_wrap.delete_user("user3") + else: + role_1 = self.utility_wrap.init_role("role_new")[0] + self.utility_wrap.role_drop(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.delete_user("user3", check_task=CheckTasks.check_permission_deny) + # PrivilegeSelectOwnership + role = self.utility_wrap.init_role("role_1")[0] + if privilege_group_privilege_dict["SelectOwnership"]: + self.utility_wrap.role_get_users() + self.utility_wrap.role_is_exist() + self.utility_wrap.list_usernames() + self.utility_wrap.list_roles(True) + res = self.utility_wrap.role_list_grants()[0] + else: + self.utility_wrap.role_get_users(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_is_exist(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.list_usernames(check_task=CheckTasks.check_permission_deny) + self.utility_wrap.list_roles(True, check_task=CheckTasks.check_permission_deny) + # default own the privilege of the role itself + res = self.utility_wrap.role_list_grants()[0] + # PrivilegeManageOwnership + if privilege_group_privilege_dict["ManageOwnership"]: + role = self.utility_wrap.init_role("role_1")[0] + self.utility_wrap.role_grant(object="Collection", object_name=collection_w.name, privilege="Insert") + self.utility_wrap.role_revoke(object="Collection", object_name=collection_w.name, privilege="Insert") + self.utility_wrap.role_grant_v2(privilege="Insert", collection_name=collection_w.name) + self.utility_wrap.role_revoke_v2(privilege="Insert", collection_name=collection_w.name) + self.utility_wrap.create_user("user3", "Milvus") + self.utility_wrap.role_add_user(username="user3") + self.utility_wrap.role_remove_user(username="user3") + else: + role = self.utility_wrap.init_role("role_1")[0] + self.utility_wrap.role_grant(object="Collection", object_name=collection_w.name, privilege="Insert", + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_revoke(object="Collection", object_name=collection_w.name, privilege="Insert", + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_grant_v2(privilege="Insert", collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_revoke_v2(privilege="Insert", collection_name=collection_w.name, + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_add_user(username="user3", + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.role_remove_user(username="user3", + check_task=CheckTasks.check_permission_deny) + # PrivilegeUpdateUser + if privilege_group_privilege_dict["UpdateUser"]: + self.utility_wrap.reset_password(user=new_user_1, old_password=ct.default_password, + new_password=ct.default_password) + self.utility_wrap.update_password(user=new_user_1, old_password=ct.default_password, + new_password=ct.default_password) + else: + self.utility_wrap.reset_password(user=new_user_1, old_password=ct.default_password, + new_password=ct.default_password, + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.update_password(user=new_user_1, old_password=ct.default_password, + new_password=ct.default_password, + check_task=CheckTasks.check_permission_deny) + # PrivilegeSelectUser + self.connection_wrap.connect(host=host, port=port, user=new_user, password=ct.default_password) + if privilege_group_privilege_dict["SelectUser"]: + self.utility_wrap.list_user(username="user1", include_role_info=True) + res = self.utility_wrap.list_users(include_role_info=True)[0] + assert len(res.groups) > 1 + else: + res = self.utility_wrap.list_user(username="user1", include_role_info=True)[0] + assert len(res.groups) == 1 + self.utility_wrap.list_users(include_role_info=True, check_task=CheckTasks.check_permission_deny)[0] + # PrivilegeCreateResourceGroup + if privilege_group_privilege_dict["CreateResourceGroup"]: + self.utility_wrap.create_resource_group(name="rg1") + else: + self.utility_wrap.create_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny) + # PrivilegeDropResourceGroup + if privilege_group_privilege_dict["DropResourceGroup"]: + self.utility_wrap.drop_resource_group(name="rg1") + else: + self.utility_wrap.drop_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny) + # PrivilegeUpdateResourceGroups + if privilege_group_privilege_dict["UpdateResourceGroups"]: + self.utility_wrap.update_resource_group(config={"resource_group_1": ResourceGroupConfig(requests={"node_num": 1})}, + check_task=CheckTasks.err_res, + check_items={"err_code": 300, + "err_msg": "resource group not found[rg=resource_group_1]"}) + else: + log.info("binbin6") + self.utility_wrap.update_resource_group(config={"resource_group_1": ResourceGroupConfig(requests={"node_num": 1})}, + check_task=CheckTasks.check_permission_deny) + # PrivilegeDescribeResourceGroup + if privilege_group_privilege_dict["DescribeResourceGroup"]: + self.utility_wrap.describe_resource_group(name="rg1", + check_task=CheckTasks.err_res, + check_items={"err_code": 300, + "err_msg": "resource group not found[rg=rg1]"}) + else: + self.utility_wrap.describe_resource_group(name="rg1", check_task=CheckTasks.check_permission_deny) + # PrivilegeListResourceGroups + if privilege_group_privilege_dict["ListResourceGroups"]: + self.utility_wrap.list_resource_groups() + else: + self.utility_wrap.list_resource_groups(check_task=CheckTasks.check_permission_deny) + # PrivilegeTransferNode + if privilege_group_privilege_dict["TransferNode"]: + self.utility_wrap.transfer_node(source="source", target="target", num_node=2, + check_task=CheckTasks.err_res, + check_items={"err_code": 300, + "err_msg": "resource group not found[rg=source]"} + ) + else: + self.utility_wrap.transfer_node(source="source", target="target", + num_node=2, check_task=CheckTasks.check_permission_deny) + # PrivilegeTransferReplica + if privilege_group_privilege_dict["TransferReplica"]: + self.utility_wrap.transfer_replica(source="source", target="target", + collection_name=collection_w.name, num_replica=2, + check_task=CheckTasks.err_res, + check_items={"err_code": 300, + "err_msg": "resource group not found[rg=source]"}) + else: + self.utility_wrap.transfer_replica(source="source", target="target", + collection_name=collection_w.name, num_replica=2, + check_task=CheckTasks.check_permission_deny) + # PrivilegeCreateDatabase + if privilege_group_privilege_dict["CreateDatabase"]: + self.database_wrap.create_database(db_name="new_db") + else: + self.database_wrap.create_database(db_name="new_db", check_task=CheckTasks.check_permission_deny) + # PrivilegeDropDatabase + if privilege_group_privilege_dict["DropDatabase"]: + self.database_wrap.drop_database(db_name="new_db") + else: + self.database_wrap.drop_database(db_name="default", check_task=CheckTasks.check_permission_deny) + # PrivilegeAlterDatabase + if privilege_group_privilege_dict["AlterDatabase"]: + self.database_wrap.set_properties(db_name="default", properties={}, + check_task=CheckTasks.err_res, + check_items={"err_code": 65535, + "err_msg": "alter database requires either properties " + "or deletekeys to modify or delete keys, " + "both cannot be empty"}) + else: + self.database_wrap.set_properties(db_name="default", properties={}, check_task=CheckTasks.check_permission_deny) + # PrivilegeFlushAll + if privilege_group_privilege_dict["FlushAll"]: + self.utility_wrap.flush_all() + else: + self.utility_wrap.flush_all(check_task=CheckTasks.check_permission_deny) + # PrivilegeListPrivilegeGroups + if privilege_group_privilege_dict["ListPrivilegeGroups"]: + self.utility_wrap.list_privilege_groups() + else: + self.utility_wrap.list_privilege_groups(check_task=CheckTasks.check_permission_deny) + # PrivilegeCreatePrivilegeGroup + if privilege_group_privilege_dict["CreatePrivilegeGroup"]: + self.utility_wrap.create_privilege_group("privilege_group_1") + else: + self.utility_wrap.create_privilege_group("privilege_group_1", check_task=CheckTasks.check_permission_deny) + # PrivilegeOperatePrivilegeGroup + if privilege_group_privilege_dict["OperatePrivilegeGroup"]: + self.utility_wrap.add_privileges_to_group("privilege_group_1", ["Insert"]) + self.utility_wrap.remove_privileges_from_group("privilege_group_1", ["Insert"]) + else: + self.utility_wrap.add_privileges_to_group("privilege_group_1", ["Insert"], + check_task=CheckTasks.check_permission_deny) + self.utility_wrap.remove_privileges_from_group("privilege_group_1", ["Insert"], + check_task=CheckTasks.check_permission_deny) + # PrivilegeDropPrivilegeGroup + if privilege_group_privilege_dict["DropPrivilegeGroup"]: + self.utility_wrap.drop_privilege_group("privilege_group_1") + else: + self.utility_wrap.drop_privilege_group("privilege_group_1", check_task=CheckTasks.check_permission_deny)