Skip to content

Commit

Permalink
Implement IsIcebergTablePredicate
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamza Jugon committed Nov 27, 2024
1 parent 1206eb8 commit 5517c7f
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import com.expediagroup.beekeeper.cleanup.metadata.CleanerClient;
import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory;
import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException;
import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate;

public class IcebergValidator {

private static final Logger log = LoggerFactory.getLogger(IcebergValidator.class);

private final CleanerClientFactory cleanerClientFactory;
private final IsIcebergTablePredicate isIcebergTablePredicate;

public IcebergValidator(CleanerClientFactory cleanerClientFactory) {
this.cleanerClientFactory = cleanerClientFactory;
this.isIcebergTablePredicate = new IsIcebergTablePredicate();
}

/**
Expand All @@ -46,10 +49,9 @@ public IcebergValidator(CleanerClientFactory cleanerClientFactory) {
*/
public void throwExceptionIfIceberg(String databaseName, String tableName) {
try (CleanerClient client = cleanerClientFactory.newInstance()) {
Map<String, String> parameters = client.getTableProperties(databaseName, tableName);
String tableType = parameters.getOrDefault("table_type", "").toLowerCase();
String metadataLocation = parameters.getOrDefault("metadata_location", "").toLowerCase();
if (tableType.contains("iceberg") || !metadataLocation.isEmpty()) {
Map<String, String> tableParameters = client.getTableProperties(databaseName, tableName);

if (isIcebergTablePredicate.test(tableParameters)) {
throw new BeekeeperIcebergException(
format("Iceberg table %s.%s is not currently supported in Beekeeper.", databaseName, tableName));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.core.predicate;

import java.util.Map;
import java.util.function.Predicate;

public class IsIcebergTablePredicate implements Predicate<Map<String, String>> {

private static final String METADATA_LOCATION_KEY = "metadata_location";
private static final String TABLE_TYPE_KEY = "table_type";
private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg";

@Override
public boolean test(Map<String, String> tableParameters) {
if (tableParameters == null) {
return false;
}

String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY, "").trim();
String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY, "");

boolean hasMetadataLocation = !metadataLocation.isEmpty();
boolean isIcebergType = tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE);

return hasMetadataLocation || isIcebergType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright (C) 2019-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.core.predicate;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class IsIcebergTablePredicateTest {

private IsIcebergTablePredicate predicate;

@BeforeEach
void setUp() {
predicate = new IsIcebergTablePredicate();
}

@Test
void testNullTableParameters() {
assertFalse(predicate.test(null));
}

@Test
void testEmptyTableParameters() {
Map<String, String> tableParameters = new HashMap<>();
assertFalse(predicate.test(tableParameters));
}

@Test
void testNoMetadataLocationOrTableType() {
Map<String, String> tableParameters = Map.of("some_key", "some_value");
assertFalse(predicate.test(tableParameters));
}

@Test
void testHasMetadataLocation() {
Map<String, String> tableParameters = Map.of("metadata_location", "some/location/path");
assertTrue(predicate.test(tableParameters));
}

@Test
void testHasIcebergTableType() {
Map<String, String> tableParameters = Map.of("table_type", "ICEBERG");
assertTrue(predicate.test(tableParameters));
}

@Test
void testBothMetadataLocationAndTableType() {
Map<String, String> tableParameters = Map.of(
"metadata_location", "some/location/path",
"table_type", "iceberg");
assertTrue(predicate.test(tableParameters));
}

@Test
void testCaseInsensitiveIcebergType() {
Map<String, String> tableParameters = Map.of("table_type", "IcEbErG");
assertTrue(predicate.test(tableParameters));
}

@Test
void testWhitespaceInMetadataLocation() {
Map<String, String> tableParameters = Map.of("metadata_location", " ");
assertFalse(predicate.test(tableParameters));
}

@Test
void testIrrelevantTableType() {
Map<String, String> tableParameters = Map.of("table_type", "hive");
assertFalse(predicate.test(tableParameters));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,25 @@

import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent;
import com.expediagroup.beekeeper.core.model.LifecycleEventType;
import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate;

import java.util.Locale;
import java.util.Map;

@Component
public class IcebergTableListenerEventFilter implements ListenerEventFilter {

private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class);

private static final String METADATA_LOCATION_KEY = "metadata_location";
private static final String TABLE_TYPE_KEY = "table_type";
private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg";
private final IsIcebergTablePredicate isIcebergPredicate = new IsIcebergTablePredicate();

@Override
public boolean isFiltered(ListenerEvent event, LifecycleEventType type) {
Map<String, String> tableParameters = event.getTableParameters();

if (tableParameters != null) {
String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY, "");
String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY, "");

boolean hasMetadataLocation = !metadataLocation.trim().isEmpty();
boolean isIcebergType = tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE);

if (hasMetadataLocation || isIcebergType) {
log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.",
event.getDbName(), event.getTableName());
return true;
}
if (isIcebergPredicate.test(tableParameters)) {
log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.",
event.getDbName(), event.getTableName());
return true;
}
return false;
}
}


0 comments on commit 5517c7f

Please sign in to comment.