diff --git a/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java index 048a3ba5..7ec15a3f 100644 --- a/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java +++ b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java @@ -25,15 +25,18 @@ import com.expediagroup.beekeeper.cleanup.metadata.CleanerClient; import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory; import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException; +import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate; public class IcebergValidator { private static final Logger log = LoggerFactory.getLogger(IcebergValidator.class); private final CleanerClientFactory cleanerClientFactory; + private final IsIcebergTablePredicate isIcebergTablePredicate; public IcebergValidator(CleanerClientFactory cleanerClientFactory) { this.cleanerClientFactory = cleanerClientFactory; + this.isIcebergTablePredicate = new IsIcebergTablePredicate(); } /** @@ -46,10 +49,9 @@ public IcebergValidator(CleanerClientFactory cleanerClientFactory) { */ public void throwExceptionIfIceberg(String databaseName, String tableName) { try (CleanerClient client = cleanerClientFactory.newInstance()) { - Map parameters = client.getTableProperties(databaseName, tableName); - String tableType = parameters.getOrDefault("table_type", "").toLowerCase(); - String metadataLocation = parameters.getOrDefault("metadata_location", "").toLowerCase(); - if (tableType.contains("iceberg") || !metadataLocation.isEmpty()) { + Map tableParameters = client.getTableProperties(databaseName, tableName); + + if (isIcebergTablePredicate.test(tableParameters)) { throw new BeekeeperIcebergException( format("Iceberg table %s.%s is not currently supported in Beekeeper.", databaseName, tableName)); } diff --git a/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java new file mode 100644 index 00000000..fb180d94 --- /dev/null +++ b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.core.predicate; + +import java.util.Map; +import java.util.function.Predicate; + +public class IsIcebergTablePredicate implements Predicate> { + + private static final String METADATA_LOCATION_KEY = "metadata_location"; + private static final String TABLE_TYPE_KEY = "table_type"; + private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg"; + + @Override + public boolean test(Map tableParameters) { + if (tableParameters == null) { + return false; + } + + String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY, "").trim(); + String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY, ""); + + boolean hasMetadataLocation = !metadataLocation.isEmpty(); + boolean isIcebergType = tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE); + + return hasMetadataLocation || isIcebergType; + } +} diff --git a/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java b/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java new file mode 100644 index 00000000..fcd1b7dc --- /dev/null +++ b/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.core.predicate; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class IsIcebergTablePredicateTest { + + private IsIcebergTablePredicate predicate; + + @BeforeEach + void setUp() { + predicate = new IsIcebergTablePredicate(); + } + + @Test + void testNullTableParameters() { + assertFalse(predicate.test(null)); + } + + @Test + void testEmptyTableParameters() { + Map tableParameters = new HashMap<>(); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testNoMetadataLocationOrTableType() { + Map tableParameters = Map.of("some_key", "some_value"); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testHasMetadataLocation() { + Map tableParameters = Map.of("metadata_location", "some/location/path"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testHasIcebergTableType() { + Map tableParameters = Map.of("table_type", "ICEBERG"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testBothMetadataLocationAndTableType() { + Map tableParameters = Map.of( + "metadata_location", "some/location/path", + "table_type", "iceberg"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testCaseInsensitiveIcebergType() { + Map tableParameters = Map.of("table_type", "IcEbErG"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testWhitespaceInMetadataLocation() { + Map tableParameters = Map.of("metadata_location", " "); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testIrrelevantTableType() { + Map tableParameters = Map.of("table_type", "hive"); + assertFalse(predicate.test(tableParameters)); + } +} diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java index 5b0728fa..70a9fde2 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java @@ -21,38 +21,25 @@ import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent; import com.expediagroup.beekeeper.core.model.LifecycleEventType; +import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate; -import java.util.Locale; import java.util.Map; @Component public class IcebergTableListenerEventFilter implements ListenerEventFilter { private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class); - - private static final String METADATA_LOCATION_KEY = "metadata_location"; - private static final String TABLE_TYPE_KEY = "table_type"; - private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg"; + private final IsIcebergTablePredicate isIcebergPredicate = new IsIcebergTablePredicate(); @Override public boolean isFiltered(ListenerEvent event, LifecycleEventType type) { Map tableParameters = event.getTableParameters(); - if (tableParameters != null) { - String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY, ""); - String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY, ""); - - boolean hasMetadataLocation = !metadataLocation.trim().isEmpty(); - boolean isIcebergType = tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE); - - if (hasMetadataLocation || isIcebergType) { - log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.", - event.getDbName(), event.getTableName()); - return true; - } + if (isIcebergPredicate.test(tableParameters)) { + log.info("Iceberg table '{}.{}' is not currently supported in Beekeeper.", + event.getDbName(), event.getTableName()); + return true; } return false; } } - -