Skip to content

Commit

Permalink
fix drop catalog error
Browse files Browse the repository at this point in the history
  • Loading branch information
mchades committed Dec 4, 2024
1 parent 0327979 commit 9af48e9
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
Expand Down Expand Up @@ -136,8 +137,8 @@ public void startup() throws IOException, SQLException {

mysqlService = new MysqlService(MYSQL_CONTAINER, TEST_DB_NAME);
createMetalake();
createCatalog();
createSchema();
catalog = createCatalog(catalogName);
createSchema(catalog, schemaName);
}

@AfterAll
Expand All @@ -153,7 +154,7 @@ public void stop() {
@AfterEach
public void resetSchema() {
clearTableAndSchema();
createSchema();
createSchema(catalog, schemaName);
}

private void clearTableAndSchema() {
Expand All @@ -176,7 +177,7 @@ private void createMetalake() {
metalake = loadMetalake;
}

private void createCatalog() throws SQLException {
private Catalog createCatalog(String catalogName) throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();

catalogProperties.put(
Expand All @@ -196,10 +197,10 @@ private void createCatalog() throws SQLException {
Catalog loadCatalog = metalake.loadCatalog(catalogName);
Assertions.assertEquals(createdCatalog, loadCatalog);

catalog = loadCatalog;
return loadCatalog;
}

private void createSchema() {
private void createSchema(Catalog catalog, String schemaName) {
Map<String, String> prop = Maps.newHashMap();

Schema createdSchema = catalog.asSchemas().createSchema(schemaName, schema_comment, prop);
Expand Down Expand Up @@ -257,6 +258,25 @@ private Map<String, String> createProperties() {
return properties;
}

@Test
void testDropCatalog() throws SQLException {
// test drop catalog with legacy entity
String catalogName = GravitinoITUtils.genRandomName("drop_catalog_it");
Catalog catalog = createCatalog(catalogName);
String schemaName = GravitinoITUtils.genRandomName("drop_catalog_it");
createSchema(catalog, schemaName);

metalake.disableCatalog(catalogName);
Assertions.assertThrows(
NonEmptyCatalogException.class, () -> metalake.dropCatalog(catalogName));

// drop database externally
String sql = String.format("DROP DATABASE %s", schemaName);
mysqlService.executeQuery(sql);

Assertions.assertTrue(metalake.dropCatalog(catalogName));
}

@Test
void testTestConnection() throws SQLException {
Map<String, String> catalogProperties = Maps.newHashMap();
Expand Down
60 changes: 46 additions & 14 deletions core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -643,24 +643,25 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
"Catalog %s is in use, please disable it first or use force option", ident);
}

List<SchemaEntity> schemas =
store.list(
Namespace.of(ident.namespace().level(0), ident.name()),
SchemaEntity.class,
EntityType.SCHEMA);
Namespace schemaNamespace = Namespace.of(ident.namespace().level(0), ident.name());
CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);

List<SchemaEntity> schemaEntities =
store.list(schemaNamespace, SchemaEntity.class, EntityType.SCHEMA);
CatalogEntity catalogEntity = store.get(ident, EntityType.CATALOG, CatalogEntity.class);

if (!schemas.isEmpty() && !force) {
// the Kafka catalog is special, it includes a default schema
if (!catalogEntity.getProvider().equals("kafka") || schemas.size() > 1) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force option", ident);
}
if (haveAvailableSchemas(schemaEntities, catalogEntity, catalogWrapper) && !force) {
throw new NonEmptyCatalogException(
"Catalog %s has schemas, please drop them first or use force option", ident);
}

CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
if (includeManagedEntities(catalogEntity)) {
schemas.forEach(
// code reach here in two cases:
// 1. the catalog does not have available schemas
// 2. the catalog has available schemas, and force is true
// for case 1, the forEach block can drop them without any side effect
// for case 2, the forEach block will drop all managed sub-entities
schemaEntities.forEach(
schema -> {
try {
catalogWrapper.doWithSchemaOps(
Expand All @@ -677,11 +678,42 @@ public boolean dropCatalog(NameIdentifier ident, boolean force)
} catch (NoSuchMetalakeException | NoSuchCatalogException ignored) {
return false;

} catch (IOException e) {
} catch (GravitinoRuntimeException e) {
throw e;

} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean haveAvailableSchemas(
List<SchemaEntity> schemaEntities, CatalogEntity catalogEntity, CatalogWrapper catalogWrapper)
throws Exception {
if (schemaEntities.isEmpty()) {
return false;
}

if (schemaEntities.size() == 1 && catalogEntity.getProvider().equals("kafka")) {
return false;
}

NameIdentifier[] allSchemas =
catalogWrapper.doWithSchemaOps(
schemaOps ->
schemaOps.listSchemas(
Namespace.of(catalogEntity.namespace().level(0), catalogEntity.name())));
if (allSchemas.length == 0) {
return false;
}

Set<String> availableSchemaNames =
Arrays.stream(allSchemas).map(NameIdentifier::name).collect(Collectors.toSet());

// some schemas are dropped externally, but still exist in the entity store, those schemas are
// invalid
return schemaEntities.stream().map(SchemaEntity::name).anyMatch(availableSchemaNames::contains);
}

private boolean includeManagedEntities(CatalogEntity catalogEntity) {
return catalogEntity.getType().equals(FILESET);
}
Expand Down

0 comments on commit 9af48e9

Please sign in to comment.