diff --git a/CHANGELOG.md b/CHANGELOG.md index 72016af8..a268ed6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [3.6.0] - 2024-11-28 +### Added +- Added filter for Iceberg tables in `beekeeper-scheduler-apiary` to prevent scheduling paths and metadata for deletion. +- Added `IcebergValidator` to ensure Iceberg tables are identified and excluded from cleanup operations. + ## [3.5.7] - 2024-10-25 ### Changed - Added error handling for bad requests with incorrect sort parameters. diff --git a/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleaner.java b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleaner.java index b4abf607..08db511c 100644 --- a/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleaner.java +++ b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleaner.java @@ -18,6 +18,7 @@ import com.expediagroup.beekeeper.cleanup.metadata.CleanerClient; import com.expediagroup.beekeeper.cleanup.metadata.MetadataCleaner; import com.expediagroup.beekeeper.cleanup.monitoring.DeletedMetadataReporter; +import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator; import com.expediagroup.beekeeper.core.config.MetadataType; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; import com.expediagroup.beekeeper.core.monitoring.TimedTaggable; @@ -25,14 +26,18 @@ public class HiveMetadataCleaner implements MetadataCleaner { private DeletedMetadataReporter deletedMetadataReporter; + private IcebergValidator icebergValidator; - public HiveMetadataCleaner(DeletedMetadataReporter deletedMetadataReporter) { + public HiveMetadataCleaner(DeletedMetadataReporter deletedMetadataReporter, IcebergValidator icebergValidator) { this.deletedMetadataReporter = deletedMetadataReporter; + this.icebergValidator = icebergValidator; } @Override @TimedTaggable("hive-table-deleted") public void dropTable(HousekeepingMetadata housekeepingMetadata, CleanerClient client) { + icebergValidator.throwExceptionIfIceberg(housekeepingMetadata.getDatabaseName(), + housekeepingMetadata.getTableName()); client.dropTable(housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName()); deletedMetadataReporter.reportTaggable(housekeepingMetadata, MetadataType.HIVE_TABLE); } @@ -40,6 +45,8 @@ public void dropTable(HousekeepingMetadata housekeepingMetadata, CleanerClient c @Override @TimedTaggable("hive-partition-deleted") public boolean dropPartition(HousekeepingMetadata housekeepingMetadata, CleanerClient client) { + icebergValidator.throwExceptionIfIceberg(housekeepingMetadata.getDatabaseName(), + housekeepingMetadata.getTableName()); boolean partitionDeleted = client .dropPartition(housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName(), housekeepingMetadata.getPartitionName()); diff --git a/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java new file mode 100644 index 00000000..7ec15a3f --- /dev/null +++ b/beekeeper-cleanup/src/main/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidator.java @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.cleanup.validation; + +import static java.lang.String.format; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.expediagroup.beekeeper.cleanup.metadata.CleanerClient; +import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory; +import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException; +import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate; + +public class IcebergValidator { + + private static final Logger log = LoggerFactory.getLogger(IcebergValidator.class); + + private final CleanerClientFactory cleanerClientFactory; + private final IsIcebergTablePredicate isIcebergTablePredicate; + + public IcebergValidator(CleanerClientFactory cleanerClientFactory) { + this.cleanerClientFactory = cleanerClientFactory; + this.isIcebergTablePredicate = new IsIcebergTablePredicate(); + } + + /** + * Beekeeper currently does not support the Iceberg format. Iceberg tables in the Hive Metastore do not store partition information, + * causing Beekeeper to attempt to clean up the entire table due to the missing information. This method checks if + * the table is an Iceberg table and throws a BeekeeperIcebergException to stop the process. + * + * @param databaseName + * @param tableName + */ + public void throwExceptionIfIceberg(String databaseName, String tableName) { + try (CleanerClient client = cleanerClientFactory.newInstance()) { + Map tableParameters = client.getTableProperties(databaseName, tableName); + + if (isIcebergTablePredicate.test(tableParameters)) { + throw new BeekeeperIcebergException( + format("Iceberg table %s.%s is not currently supported in Beekeeper.", databaseName, tableName)); + } + } catch (Exception e) { + throw new BeekeeperIcebergException( + format("Unexpected exception when identifying if table %s.%s is Iceberg.", databaseName, tableName), e); + } + } +} diff --git a/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/aws/S3DryRunPathCleanerTest.java b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/aws/S3DryRunPathCleanerTest.java index 42adb416..5fab57af 100644 --- a/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/aws/S3DryRunPathCleanerTest.java +++ b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/aws/S3DryRunPathCleanerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2023 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,13 +23,13 @@ import java.time.LocalDateTime; import org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider; -import org.junit.Rule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; @@ -58,20 +58,18 @@ class S3DryRunPathCleanerTest { private HousekeepingPath housekeepingPath; private AmazonS3 amazonS3; private @Mock BytesDeletedReporter bytesDeletedReporter; + private boolean dryRunEnabled = true; private S3PathCleaner s3DryRunPathCleaner; - @Rule + @Container public static LocalStackContainer awsContainer = new LocalStackContainer( DockerImageName.parse("localstack/localstack:0.14.2")).withServices(S3); - static { - awsContainer.start(); - } - public static String S3_ENDPOINT = awsContainer.getEndpointConfiguration(S3).getServiceEndpoint(); @BeforeEach void setUp() { + String S3_ENDPOINT = awsContainer.getEndpointConfiguration(S3).getServiceEndpoint(); amazonS3 = AmazonS3ClientBuilder .standard() .withCredentials(new BasicAWSCredentialsProvider("accesskey", "secretkey")) diff --git a/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleanerTest.java b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleanerTest.java index bf230190..5520b8fb 100644 --- a/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleanerTest.java +++ b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/hive/HiveMetadataCleanerTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2021 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,8 @@ */ package com.expediagroup.beekeeper.cleanup.hive; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -26,7 +28,9 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.expediagroup.beekeeper.cleanup.monitoring.DeletedMetadataReporter; +import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator; import com.expediagroup.beekeeper.core.config.MetadataType; +import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; @ExtendWith(MockitoExtension.class) @@ -35,6 +39,7 @@ public class HiveMetadataCleanerTest { private @Mock HousekeepingMetadata housekeepingMetadata; private @Mock DeletedMetadataReporter deletedMetadataReporter; private @Mock HiveClient hiveClient; + private @Mock IcebergValidator icebergValidator; private HiveMetadataCleaner cleaner; private static final String DATABASE = "database"; @@ -43,14 +48,18 @@ public class HiveMetadataCleanerTest { @BeforeEach public void init() { - cleaner = new HiveMetadataCleaner(deletedMetadataReporter); + cleaner = new HiveMetadataCleaner(deletedMetadataReporter, icebergValidator); } @Test public void typicalDropTable() { when(housekeepingMetadata.getDatabaseName()).thenReturn(DATABASE); when(housekeepingMetadata.getTableName()).thenReturn(TABLE_NAME); + cleaner.dropTable(housekeepingMetadata, hiveClient); + + verify(icebergValidator).throwExceptionIfIceberg(DATABASE, TABLE_NAME); + verify(hiveClient).dropTable(DATABASE, TABLE_NAME); verify(deletedMetadataReporter).reportTaggable(housekeepingMetadata, MetadataType.HIVE_TABLE); } @@ -62,6 +71,9 @@ public void typicalDropPartition() { when(hiveClient.dropPartition(DATABASE, TABLE_NAME, PARTITION_NAME)).thenReturn(true); cleaner.dropPartition(housekeepingMetadata, hiveClient); + + verify(icebergValidator).throwExceptionIfIceberg(DATABASE, TABLE_NAME); + verify(hiveClient).dropPartition(DATABASE, TABLE_NAME, PARTITION_NAME); verify(deletedMetadataReporter).reportTaggable(housekeepingMetadata, MetadataType.HIVE_PARTITION); } @@ -81,4 +93,36 @@ public void tableExists() { cleaner.tableExists(hiveClient, DATABASE, TABLE_NAME); verify(hiveClient).tableExists(DATABASE, TABLE_NAME); } + + @Test + public void doesNotDropTableWhenIcebergTable() { + when(housekeepingMetadata.getDatabaseName()).thenReturn(DATABASE); + when(housekeepingMetadata.getTableName()).thenReturn(TABLE_NAME); + doThrow(new BeekeeperIcebergException("Iceberg table")) + .when(icebergValidator).throwExceptionIfIceberg(DATABASE, TABLE_NAME); + + assertThrows( + BeekeeperIcebergException.class, + () -> cleaner.dropTable(housekeepingMetadata, hiveClient) + ); + + verify(hiveClient, never()).dropTable(DATABASE, TABLE_NAME); + verify(deletedMetadataReporter, never()).reportTaggable(housekeepingMetadata, MetadataType.HIVE_TABLE); + } + + @Test + public void doesNotDropPartitionWhenIcebergTable() { + when(housekeepingMetadata.getDatabaseName()).thenReturn(DATABASE); + when(housekeepingMetadata.getTableName()).thenReturn(TABLE_NAME); + doThrow(new BeekeeperIcebergException("Iceberg table")) + .when(icebergValidator).throwExceptionIfIceberg(DATABASE, TABLE_NAME); + + assertThrows( + BeekeeperIcebergException.class, + () -> cleaner.dropPartition(housekeepingMetadata, hiveClient) + ); + + verify(hiveClient, never()).dropPartition(DATABASE, TABLE_NAME, PARTITION_NAME); + verify(deletedMetadataReporter, never()).reportTaggable(housekeepingMetadata, MetadataType.HIVE_PARTITION); + } } diff --git a/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidatorTest.java b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidatorTest.java new file mode 100644 index 00000000..84eb88f8 --- /dev/null +++ b/beekeeper-cleanup/src/test/java/com/expediagroup/beekeeper/cleanup/validation/IcebergValidatorTest.java @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.cleanup.validation; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Before; +import org.junit.Test; + +import com.expediagroup.beekeeper.cleanup.metadata.CleanerClient; +import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory; +import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException; + +public class IcebergValidatorTest { + + private CleanerClientFactory cleanerClientFactory; + private CleanerClient cleanerClient; + private IcebergValidator icebergValidator; + + @Before + public void setUp() throws Exception { + cleanerClientFactory = mock(CleanerClientFactory.class); + cleanerClient = mock(CleanerClient.class); + when(cleanerClientFactory.newInstance()).thenReturn(cleanerClient); + icebergValidator = new IcebergValidator(cleanerClientFactory); + } + + @Test(expected = BeekeeperIcebergException.class) + public void shouldThrowExceptionWhenTableTypeIsIceberg() throws Exception { + Map properties = new HashMap<>(); + properties.put("table_type", "ICEBERG"); + + when(cleanerClient.getTableProperties("db", "table")).thenReturn(properties); + + icebergValidator.throwExceptionIfIceberg("db", "table"); + verify(cleanerClientFactory).newInstance(); + verify(cleanerClient).close(); + } + + @Test(expected = BeekeeperIcebergException.class) + public void shouldThrowExceptionWhenMetadataIsIceberg() throws Exception { + Map properties = new HashMap<>(); + properties.put("metadata_location", "s3://db/table/metadata/0000.json"); + + when(cleanerClient.getTableProperties("db", "table")).thenReturn(properties); + + icebergValidator.throwExceptionIfIceberg("db", "table"); + } + + @Test + public void shouldNotThrowExceptionForNonIcebergTable() throws Exception { + Map properties = new HashMap<>(); + properties.put("table_type", "HIVE_TABLE"); + + when(cleanerClient.getTableProperties("db", "table")).thenReturn(properties); + + icebergValidator.throwExceptionIfIceberg("db", "table"); + verify(cleanerClientFactory).newInstance(); + verify(cleanerClient).close(); + } + + @Test + public void shouldThrowExceptionWhenOutputFormatIsNull() throws Exception { + Map properties = new HashMap<>(); + properties.put("table_type", null); + properties.put("metadata_location", null); + + when(cleanerClient.getTableProperties("db", "table")).thenReturn(properties); + + assertThatThrownBy(() -> icebergValidator.throwExceptionIfIceberg("db", "table")).isInstanceOf( + BeekeeperIcebergException.class); + } +} diff --git a/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/error/BeekeeperIcebergException.java b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/error/BeekeeperIcebergException.java new file mode 100644 index 00000000..d85be542 --- /dev/null +++ b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/error/BeekeeperIcebergException.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.core.error; + +public class BeekeeperIcebergException extends BeekeeperException { + + private static final long serialVersionUID = 1L; + + public BeekeeperIcebergException(String message, Exception e) { + super(message, e); + } + + public BeekeeperIcebergException(String message, Throwable e) { + super(message, e); + } + + public BeekeeperIcebergException(String message) { + super(message); + } +} diff --git a/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java new file mode 100644 index 00000000..33e067cc --- /dev/null +++ b/beekeeper-core/src/main/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicate.java @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.core.predicate; + +import java.util.Map; +import java.util.function.Predicate; + +public class IsIcebergTablePredicate implements Predicate> { + + private static final String METADATA_LOCATION_KEY = "metadata_location"; + private static final String TABLE_TYPE_KEY = "table_type"; + private static final String TABLE_TYPE_ICEBERG_VALUE = "iceberg"; + + @Override + public boolean test(Map tableParameters) { + if (tableParameters == null || tableParameters.isEmpty()) { + return false; + } + + String metadataLocation = tableParameters.getOrDefault(METADATA_LOCATION_KEY, "").trim(); + String tableType = tableParameters.getOrDefault(TABLE_TYPE_KEY, ""); + + boolean hasMetadataLocation = !metadataLocation.isEmpty(); + boolean isIcebergType = tableType.toLowerCase().contains(TABLE_TYPE_ICEBERG_VALUE); + + return hasMetadataLocation || isIcebergType; + } +} diff --git a/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java b/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java new file mode 100644 index 00000000..fcd1b7dc --- /dev/null +++ b/beekeeper-core/src/test/java/com/expediagroup/beekeeper/core/predicate/IsIcebergTablePredicateTest.java @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.core.predicate; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class IsIcebergTablePredicateTest { + + private IsIcebergTablePredicate predicate; + + @BeforeEach + void setUp() { + predicate = new IsIcebergTablePredicate(); + } + + @Test + void testNullTableParameters() { + assertFalse(predicate.test(null)); + } + + @Test + void testEmptyTableParameters() { + Map tableParameters = new HashMap<>(); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testNoMetadataLocationOrTableType() { + Map tableParameters = Map.of("some_key", "some_value"); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testHasMetadataLocation() { + Map tableParameters = Map.of("metadata_location", "some/location/path"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testHasIcebergTableType() { + Map tableParameters = Map.of("table_type", "ICEBERG"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testBothMetadataLocationAndTableType() { + Map tableParameters = Map.of( + "metadata_location", "some/location/path", + "table_type", "iceberg"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testCaseInsensitiveIcebergType() { + Map tableParameters = Map.of("table_type", "IcEbErG"); + assertTrue(predicate.test(tableParameters)); + } + + @Test + void testWhitespaceInMetadataLocation() { + Map tableParameters = Map.of("metadata_location", " "); + assertFalse(predicate.test(tableParameters)); + } + + @Test + void testIrrelevantTableType() { + Map tableParameters = Map.of("table_type", "hive"); + assertFalse(predicate.test(tableParameters)); + } +} diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java index ccbf19c6..66382669 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest.java @@ -213,6 +213,17 @@ public void expiredMetadataMultipleAlterPartitionTableEvents() throws SQLExcepti assertExpiredMetadata(expiredMetadata.get(1), LOCATION_B, PARTITION_B_NAME); } + @Test + public void expiredMetadataCreateIcebergTableEvent() throws SQLException, IOException, URISyntaxException { + CreateTableSqsMessage createTableSqsMessage = new CreateTableSqsMessage(LOCATION_A, true, true); + amazonSQS.sendMessage(sendMessageRequest(createTableSqsMessage.getFormattedString())); + + await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 0); + + List expiredMetadata = getExpiredMetadata(); + assertThat(expiredMetadata.size()).isEqualTo(0); + } + @Test public void healthCheck() { CloseableHttpClient client = HttpClientBuilder.create().build(); diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperMetadataCleanupIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperMetadataCleanupIntegrationTest.java index d0e52df9..e45ee0b9 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperMetadataCleanupIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperMetadataCleanupIntegrationTest.java @@ -26,6 +26,7 @@ import static com.expediagroup.beekeeper.cleanup.monitoring.DeletedMetadataReporter.METRIC_NAME; import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.DELETED; import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.DISABLED; +import static com.expediagroup.beekeeper.core.model.HousekeepingStatus.SKIPPED; import static com.expediagroup.beekeeper.integration.CommonTestVariables.AWS_REGION; import static com.expediagroup.beekeeper.integration.CommonTestVariables.DATABASE_NAME_VALUE; import static com.expediagroup.beekeeper.integration.CommonTestVariables.LONG_CLEANUP_DELAY_VALUE; @@ -33,6 +34,7 @@ import static com.expediagroup.beekeeper.integration.CommonTestVariables.TABLE_NAME_VALUE; import java.sql.SQLException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -225,6 +227,25 @@ public void cleanupPartitionedTable() throws Exception { assertThat(amazonS3.doesObjectExist(BUCKET, PARTITIONED_OBJECT_KEY)).isFalse(); } + @Test + public void shouldSkipCleanupForIcebergTable() throws Exception { + Map tableProperties = new HashMap<>(); + tableProperties.put("table_type", "ICEBERG"); + + hiveTestUtils.createTableWithProperties( + PARTITIONED_TABLE_PATH, TABLE_NAME_VALUE, true, tableProperties, true); + amazonS3.putObject(BUCKET, PARTITIONED_TABLE_OBJECT_KEY, TABLE_DATA); + + insertExpiredMetadata(PARTITIONED_TABLE_PATH, null); + + await() + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> getExpiredMetadata().get(0).getHousekeepingStatus() == SKIPPED); + + assertThat(metastoreClient.tableExists(DATABASE_NAME_VALUE, TABLE_NAME_VALUE)).isTrue(); + assertThat(amazonS3.doesObjectExist(BUCKET, PARTITIONED_TABLE_OBJECT_KEY)).isTrue(); + } + @Test public void cleanupPartitionButNotTable() throws Exception { Table table = hiveTestUtils.createTable(PARTITIONED_TABLE_PATH, TABLE_NAME_VALUE, true); diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java index 92a3d0c0..1c3fd3a4 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/BeekeeperUnreferencedPathSchedulerApiaryIntegrationTest.java @@ -157,7 +157,7 @@ public void unreferencedAlterPartitionEvent() throws SQLException, IOException, public void unreferencedMultipleAlterPartitionEvent() throws IOException, SQLException, URISyntaxException { List .of(new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation", - "s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true), + "s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true), new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation2", "s3://bucket/table/partitionLocation2", "s3://bucket/table/partitionLocation", true, true)) .forEach(msg -> amazonSQS.sendMessage(sendMessageRequest(msg.getFormattedString()))); diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/CreateTableSqsMessage.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/CreateTableSqsMessage.java index 7e6e31a0..17349ed3 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/CreateTableSqsMessage.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/CreateTableSqsMessage.java @@ -30,4 +30,17 @@ public CreateTableSqsMessage( setTableLocation(tableLocation); setExpired(isExpired); } + + public CreateTableSqsMessage( + String tableLocation, + boolean isIceberg, + boolean isExpired + ) throws IOException, URISyntaxException { + super(CREATE_TABLE); + setTableLocation(tableLocation); + setExpired(isExpired); + if (isIceberg) { + setIceberg(); + } + } } diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/SqsMessage.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/SqsMessage.java index c51b2513..da3412ad 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/SqsMessage.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/model/SqsMessage.java @@ -93,6 +93,12 @@ public void setExpired(boolean isExpired) { tableParameters.add(EXPIRED_DATA_RETENTION_PERIOD_PROPERTY_KEY, new JsonPrimitive(SHORT_CLEANUP_DELAY_VALUE)); } + public void setIceberg() { + JsonObject tableParameters = apiaryEventMessageJsonObject.getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY); + tableParameters.add("table_format", new JsonPrimitive("ICEBERG")); + tableParameters.add("metadata_location", new JsonPrimitive("s3://bucket/metadata")); + } + public void setWhitelisted(boolean isWhitelisted) { String whitelist = isWhitelisted ? eventType.toString() : ""; JsonObject tableParameters = apiaryEventMessageJsonObject.getAsJsonObject(EVENT_TABLE_PARAMETERS_KEY); diff --git a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/utils/HiveTestUtils.java b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/utils/HiveTestUtils.java index 896efe25..b8d66219 100644 --- a/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/utils/HiveTestUtils.java +++ b/beekeeper-integration-tests/src/test/java/com/expediagroup/beekeeper/integration/utils/HiveTestUtils.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; @@ -57,7 +58,7 @@ public Table createTable(String path, String tableName, boolean partitioned) thr } public Table createTable(String path, String tableName, boolean partitioned, boolean withBeekeeperProperty) - throws TException { + throws TException { Table hiveTable = new Table(); hiveTable.setDbName(DATABASE_NAME_VALUE); hiveTable.setTableName(tableName); @@ -88,7 +89,7 @@ public Table createTable(String path, String tableName, boolean partitioned, boo * @param hiveTable Table to add partitions to * @param partitionValues The list of partition values, e.g. ["2020-01-01", "0", "A"] * @throws Exception May be thrown if there is a problem when trying to write the data to the file, or when the client - * adds the partition to the table. + * adds the partition to the table. */ public void addPartitionsToTable(String path, Table hiveTable, List partitionValues) throws Exception { String eventDate = "/event_date=" + partitionValues.get(0); // 2020-01-01 @@ -111,4 +112,35 @@ private Partition newTablePartition(Table hiveTable, List values, URI lo partition.getSd().setLocation(location.toString()); return partition; } + + public Table createTableWithProperties(String path, String tableName, boolean partitioned, + Map tableProperties, boolean withBeekeeperProperty) + throws TException { + Table hiveTable = new Table(); + hiveTable.setDbName(DATABASE_NAME_VALUE); + hiveTable.setTableName(tableName); + hiveTable.setTableType(TableType.EXTERNAL_TABLE.name()); + hiveTable.putToParameters("EXTERNAL", "TRUE"); + + if (tableProperties != null) { + hiveTable.getParameters().putAll(tableProperties); + } + if (withBeekeeperProperty) { + hiveTable.putToParameters(LifecycleEventType.EXPIRED.getTableParameterName(), "true"); + } + if (partitioned) { + hiveTable.setPartitionKeys(PARTITION_COLUMNS); + } + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(DATA_COLUMNS); + sd.setLocation(path); + sd.setParameters(new HashMap<>()); + sd.setOutputFormat(TextOutputFormat.class.getName()); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setSerializationLib("org.apache.hadoop.hive.serde2.OpenCSVSerde"); + hiveTable.setSd(sd); + metastoreClient.createTable(hiveTable); + + return hiveTable; + } } diff --git a/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeans.java b/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeans.java index 353adfd9..60518cd0 100644 --- a/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeans.java +++ b/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeans.java @@ -48,6 +48,7 @@ import com.expediagroup.beekeeper.cleanup.service.CleanupService; import com.expediagroup.beekeeper.cleanup.service.DisableTablesService; import com.expediagroup.beekeeper.cleanup.service.RepositoryCleanupService; +import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator; import com.expediagroup.beekeeper.core.repository.HousekeepingMetadataRepository; import com.expediagroup.beekeeper.metadata.cleanup.handler.ExpiredMetadataHandler; import com.expediagroup.beekeeper.metadata.cleanup.handler.MetadataHandler; @@ -80,8 +81,7 @@ public CloseableMetaStoreClientFactory metaStoreClientFactory() { @Bean Supplier metaStoreClientSupplier( - CloseableMetaStoreClientFactory metaStoreClientFactory, - HiveConf hiveConf) { + CloseableMetaStoreClientFactory metaStoreClientFactory, HiveConf hiveConf) { String name = "beekeeper-metadata-cleanup"; return new HiveMetaStoreClientSupplier(metaStoreClientFactory, hiveConf, name); } @@ -93,6 +93,11 @@ public CleanerClientFactory clientFactory( return new HiveClientFactory(metaStoreClientSupplier, dryRunEnabled); } + @Bean + public IcebergValidator icebergValidator(CleanerClientFactory clientFactory) { + return new IcebergValidator(clientFactory); + } + @Bean public DeletedMetadataReporter deletedMetadataReporter( MeterRegistry meterRegistry, @@ -102,8 +107,8 @@ public DeletedMetadataReporter deletedMetadataReporter( @Bean(name = "hiveTableCleaner") MetadataCleaner metadataCleaner( - DeletedMetadataReporter deletedMetadataReporter) { - return new HiveMetadataCleaner(deletedMetadataReporter); + DeletedMetadataReporter deletedMetadataReporter, IcebergValidator icebergValidator) { + return new HiveMetadataCleaner(deletedMetadataReporter, icebergValidator); } @Bean diff --git a/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/handler/ExpiredMetadataHandler.java b/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/handler/ExpiredMetadataHandler.java index 28a3cfe4..39fe7d5e 100644 --- a/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/handler/ExpiredMetadataHandler.java +++ b/beekeeper-metadata-cleanup/src/main/java/com/expediagroup/beekeeper/metadata/cleanup/handler/ExpiredMetadataHandler.java @@ -32,6 +32,7 @@ import com.expediagroup.beekeeper.cleanup.metadata.CleanerClientFactory; import com.expediagroup.beekeeper.cleanup.metadata.MetadataCleaner; import com.expediagroup.beekeeper.cleanup.path.PathCleaner; +import com.expediagroup.beekeeper.core.error.BeekeeperIcebergException; import com.expediagroup.beekeeper.core.model.HousekeepingMetadata; import com.expediagroup.beekeeper.core.model.HousekeepingStatus; import com.expediagroup.beekeeper.core.repository.HousekeepingMetadataRepository; @@ -77,11 +78,18 @@ public void cleanupMetadata(HousekeepingMetadata housekeepingMetadata, LocalDate if (deleted && !dryRunEnabled) { updateAttemptsAndStatus(housekeepingMetadata, DELETED); } + } catch (BeekeeperIcebergException e) { + updateAttemptsAndStatus(housekeepingMetadata, SKIPPED); + String logMessage = String.format("Table \"%s.%s\" is skipped because it is iceberg or could not be identified.", + housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName()); + log.info(logMessage); + log.debug(logMessage, e); } catch (Exception e) { updateAttemptsAndStatus(housekeepingMetadata, FAILED); - log - .warn("Unexpected exception when deleting metadata for table \"{}.{}\"", - housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName(), e); + String logMessage = String.format("Unexpected exception when deleting metadata for table \"%s.%s\".", + housekeepingMetadata.getDatabaseName(), housekeepingMetadata.getTableName()); + log.info(logMessage); + log.debug(logMessage, e); } } diff --git a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeansTest.java b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeansTest.java index 36085496..e2323868 100644 --- a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeansTest.java +++ b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/context/CommonBeansTest.java @@ -49,6 +49,7 @@ import com.expediagroup.beekeeper.cleanup.service.CleanupService; import com.expediagroup.beekeeper.cleanup.service.DisableTablesService; import com.expediagroup.beekeeper.cleanup.service.RepositoryCleanupService; +import com.expediagroup.beekeeper.cleanup.validation.IcebergValidator; import com.expediagroup.beekeeper.core.repository.HousekeepingMetadataRepository; import com.expediagroup.beekeeper.metadata.cleanup.handler.ExpiredMetadataHandler; import com.expediagroup.beekeeper.metadata.cleanup.service.MetadataDisableTablesService; @@ -76,6 +77,7 @@ public class CommonBeansTest { private @Mock PathCleaner pathCleaner; private @Mock MeterRegistry meterRegistry; private @Mock HiveClientFactory hiveClientFactory; + private @Mock IcebergValidator icebergValidator; @BeforeEach public void awsSetUp() { @@ -122,7 +124,7 @@ public void verifyHiveClient() { @Test public void verifyHiveMetadataCleaner() { DeletedMetadataReporter reporter = commonBeans.deletedMetadataReporter(meterRegistry, false); - MetadataCleaner metadataCleaner = commonBeans.metadataCleaner(reporter); + MetadataCleaner metadataCleaner = commonBeans.metadataCleaner(reporter, icebergValidator); assertThat(metadataCleaner).isInstanceOf(HiveMetadataCleaner.class); } @@ -159,8 +161,7 @@ void verifyS3pathCleaner() { @Test public void verifyExpiredMetadataHandler() { ExpiredMetadataHandler expiredMetadataHandler = commonBeans.expiredMetadataHandler(hiveClientFactory, - metadataRepository, - metadataCleaner, pathCleaner); + metadataRepository, metadataCleaner, pathCleaner); assertThat(expiredMetadataHandler).isInstanceOf(ExpiredMetadataHandler.class); } diff --git a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java index 1a14b3f8..00f8a762 100644 --- a/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java +++ b/beekeeper-metadata-cleanup/src/test/java/com/expediagroup/beekeeper/metadata/cleanup/service/PagingMetadataCleanupServiceTest.java @@ -235,10 +235,10 @@ public void mixOfAllPaths() { } @Test - void metadataCleanerException() { + public void metadataCleanerException() { Mockito .doNothing() - .doThrow(new BeekeeperException("Error")) + .doThrow(new RuntimeException("Error")) .when(metadataCleaner) .dropTable(Mockito.any(HousekeepingMetadata.class), Mockito.any(HiveClient.class)); @@ -270,7 +270,7 @@ void metadataCleanerException() { } @Test - void invalidPaths() { + public void invalidPaths() { List tables = List .of(createHousekeepingMetadata("table1", "s3://invalid", null, SCHEDULED), createHousekeepingMetadata("table2", "s3://invalid/path", "partition", SCHEDULED)); diff --git a/beekeeper-path-cleanup/src/test/java/com/expediagroup/beekeeper/path/cleanup/service/PagingCleanupServiceTest.java b/beekeeper-path-cleanup/src/test/java/com/expediagroup/beekeeper/path/cleanup/service/PagingCleanupServiceTest.java index db654ed4..c75b0d53 100644 --- a/beekeeper-path-cleanup/src/test/java/com/expediagroup/beekeeper/path/cleanup/service/PagingCleanupServiceTest.java +++ b/beekeeper-path-cleanup/src/test/java/com/expediagroup/beekeeper/path/cleanup/service/PagingCleanupServiceTest.java @@ -211,7 +211,6 @@ private HousekeepingPath createEntityHousekeepingPath(String path, HousekeepingS .housekeepingStatus(housekeepingStatus) .creationTimestamp(localNow) .modifiedTimestamp(localNow) - .modifiedTimestamp(localNow) .cleanupDelay(PeriodDuration.of(Duration.parse("P3D"))) .cleanupAttempts(0) .lifecycleType(UNREFERENCED.toString()) diff --git a/beekeeper-scheduler-apiary/pom.xml b/beekeeper-scheduler-apiary/pom.xml index bc492abf..258c3d24 100644 --- a/beekeeper-scheduler-apiary/pom.xml +++ b/beekeeper-scheduler-apiary/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -10,6 +11,7 @@ beekeeper-scheduler-apiary + com.amazonaws @@ -56,8 +58,8 @@ geronimo-jaspic_1.0_spec - org.eclipse.jetty - jetty-util + org.eclipse.jetty + jetty-util org.eclipse.jetty.aggregate diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java index 492017c6..8bfbae44 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeans.java @@ -39,6 +39,7 @@ import com.expediagroup.beekeeper.core.model.LifecycleEventType; import com.expediagroup.beekeeper.scheduler.apiary.filter.EventTypeListenerEventFilter; +import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.LocationOnlyUpdateListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.filter.TableParameterListenerEventFilter; @@ -96,7 +97,8 @@ public MessageEventHandler unreferencedHousekeepingPathMessageEventHandler( new EventTypeListenerEventFilter(eventClasses), new LocationOnlyUpdateListenerEventFilter(), new TableParameterListenerEventFilter(), - new WhitelistedListenerEventFilter() + new WhitelistedListenerEventFilter(), + new IcebergTableListenerEventFilter() ); return new MessageEventHandler(generator, filters); @@ -120,7 +122,8 @@ public MessageEventHandler expiredHousekeepingMetadataMessageEventHandler( List filters = List.of( new EventTypeListenerEventFilter(eventClasses), - new TableParameterListenerEventFilter() + new TableParameterListenerEventFilter(), + new IcebergTableListenerEventFilter() ); return new MessageEventHandler(generator, filters); diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java new file mode 100644 index 00000000..3b2c2b20 --- /dev/null +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilter.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.scheduler.apiary.filter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.stereotype.Component; + +import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent; +import com.expediagroup.beekeeper.core.model.LifecycleEventType; +import com.expediagroup.beekeeper.core.predicate.IsIcebergTablePredicate; + +import java.util.Map; + +@Component +public class IcebergTableListenerEventFilter implements ListenerEventFilter { + + private static final Logger log = LogManager.getLogger(IcebergTableListenerEventFilter.class); + private final IsIcebergTablePredicate isIcebergPredicate = new IsIcebergTablePredicate(); + + @Override + public boolean isFiltered(ListenerEvent event, LifecycleEventType type) { + Map tableParameters = event.getTableParameters(); + + if (isIcebergPredicate.test(tableParameters)) { + log.info("Ignoring table '{}.{}'. Iceberg tables are not supported in Beekeeper.", + event.getDbName(), event.getTableName()); + return true; + } + return false; + } +} diff --git a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java index 44f0e2f2..d07912b2 100644 --- a/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java +++ b/beekeeper-scheduler-apiary/src/main/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandler.java @@ -58,4 +58,8 @@ private boolean shouldFilterMessage(ListenerEvent listenerEvent) { private List generateHousekeepingEntities(ListenerEvent listenerEvent) { return generator.generate(listenerEvent, CLIENT_ID); } + + public List getFilters() { + return filters; + } } diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java index 2aa5f17b..cd1f776a 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/context/CommonBeansTest.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.EnumMap; +import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +33,8 @@ import com.expedia.apiary.extensions.receiver.sqs.messaging.SqsMessageReader; import com.expediagroup.beekeeper.core.model.LifecycleEventType; +import com.expediagroup.beekeeper.scheduler.apiary.filter.IcebergTableListenerEventFilter; +import com.expediagroup.beekeeper.scheduler.apiary.filter.ListenerEventFilter; import com.expediagroup.beekeeper.scheduler.apiary.generator.ExpiredHousekeepingMetadataGenerator; import com.expediagroup.beekeeper.scheduler.apiary.generator.HousekeepingEntityGenerator; import com.expediagroup.beekeeper.scheduler.apiary.generator.UnreferencedHousekeepingPathGenerator; @@ -117,4 +120,18 @@ public void validatePathEventReader() { mock(MessageEventHandler.class)); assertThat(reader).isInstanceOf(BeekeeperEventReader.class); } + + @Test + public void validateUnreferencedHousekeepingPathMessageEventHandlerIncludesIcebergFilter() { + MessageEventHandler handler = commonBeans.unreferencedHousekeepingPathMessageEventHandler(unreferencedHousekeepingPathGenerator); + List filters = handler.getFilters(); + assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class); + } + + @Test + public void validateExpiredHousekeepingMetadataMessageEventHandlerIncludesIcebergFilter() { + MessageEventHandler handler = commonBeans.expiredHousekeepingMetadataMessageEventHandler(expiredHousekeepingMetadataGenerator); + List filters = handler.getFilters(); + assertThat(filters).hasAtLeastOneElementOfType(IcebergTableListenerEventFilter.class); + } } diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java new file mode 100644 index 00000000..66dceebd --- /dev/null +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/filter/IcebergTableListenerEventFilterTest.java @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2019-2024 Expedia, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.expediagroup.beekeeper.scheduler.apiary.filter; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import com.expedia.apiary.extensions.receiver.common.event.ListenerEvent; +import com.expediagroup.beekeeper.core.model.LifecycleEventType; + +public class IcebergTableListenerEventFilterTest { + + private final IcebergTableListenerEventFilter filter = new IcebergTableListenerEventFilter(); + + @Test + public void shouldFilterWhenTableTypeIsIceberg() { + ListenerEvent event = createListenerEventWithTableType("ICEBERG"); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertTrue(isFiltered); + } + + @Test + public void shouldNotFilterWhenTableTypeIsNotIceberg() { + ListenerEvent event = createListenerEventWithTableType("HIVE"); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertFalse(isFiltered); + } + + @Test + public void shouldFilterWhenTableTypeIsIcebergIgnoreCase() { + ListenerEvent event = createListenerEventWithTableType("iceberg"); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertTrue(isFiltered); + } + + @Test + public void shouldFilterWhenMetadataLocationIsPresent() { + ListenerEvent event = createListenerEventWithMetadataLocation("s3://example/path/to/metadata"); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertTrue(isFiltered); + } + + @Test + public void shouldNotFilterWhenMetadataLocationIsEmpty() { + ListenerEvent event = createListenerEventWithMetadataLocation(""); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertFalse(isFiltered); + } + + @Test + public void shouldHandleNullTableParameters() { + ListenerEvent event = createListenerEventWithTableParameters(null); + boolean isFiltered = filter.isFiltered(event, LifecycleEventType.EXPIRED); + assertFalse(isFiltered); + } + + private ListenerEvent createListenerEventWithTableType(String tableType) { + Map tableParameters = new HashMap<>(); + tableParameters.put("table_type", tableType); + return createListenerEventWithTableParameters(tableParameters); + } + + private ListenerEvent createListenerEventWithMetadataLocation(String metadataLocation) { + Map tableParameters = new HashMap<>(); + tableParameters.put("metadata_location", metadataLocation); + return createListenerEventWithTableParameters(tableParameters); + } + + private ListenerEvent createListenerEventWithTableParameters(Map tableParameters) { + return new ListenerEvent() { + @Override + public String getDbName() { + return "test_db"; + } + + @Override + public String getTableName() { + return "test_table"; + } + + @Override + public Map getTableParameters() { + return tableParameters; + } + }; + } +} diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java index 7d753fa5..5041c910 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/ExpiredHousekeepingMetadataGeneratorTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2022 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/UnreferencedHousekeepingPathGeneratorTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/UnreferencedHousekeepingPathGeneratorTest.java index f74bc4c7..c8dc6cd3 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/UnreferencedHousekeepingPathGeneratorTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/generator/UnreferencedHousekeepingPathGeneratorTest.java @@ -18,7 +18,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.fail; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java index 3f1cdd30..9a7f2a92 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/handler/MessageEventHandlerTest.java @@ -16,7 +16,6 @@ package com.expediagroup.beekeeper.scheduler.apiary.handler; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -24,6 +23,7 @@ import java.util.List; +import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; diff --git a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/service/SchedulerApiaryTest.java b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/service/SchedulerApiaryTest.java index 31f48702..4f45aef2 100644 --- a/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/service/SchedulerApiaryTest.java +++ b/beekeeper-scheduler-apiary/src/test/java/com/expediagroup/beekeeper/scheduler/apiary/service/SchedulerApiaryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-2020 Expedia, Inc. + * Copyright (C) 2019-2024 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/pom.xml b/pom.xml index 4683bea5..a2008896 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -50,7 +51,7 @@ 1.27 2.7.9 5.3.25 - 1.17.1 + 1.17.6 11-slim