diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java index 75ec9526..945c1918 100644 --- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java +++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java @@ -75,10 +75,12 @@ public boolean loadProperties(IPropertyHelper propertyHelper) { logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.FILTER_WRITETS_MAX, filterMax, Instant.ofEpochMilli(filterMax / 1000)); } + isValid = validateProperties(); isEnabled = isValid && ((null != ttlNames && !ttlNames.isEmpty()) || (null != writetimeNames && !writetimeNames.isEmpty()) + || autoTTLNames || autoWritetimeNames || customWritetime > 0); isLoaded = true; @@ -309,4 +311,12 @@ private void validateFilterRangeProperties() { isValid = false; } } + + public List getTtlNames() { + return ttlNames; + } + + public List getWritetimeNames() { + return writetimeNames; + } } diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index b9b0cda1..3dabefb0 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -101,9 +101,17 @@ spark.cdm.connect.target.password cassandra # columns need to be listed. #----------------------------------------------------------------------------------------------------------- spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name + +# Max TTL value of all non-PK columns will be used for insert on target #spark.cdm.schema.origin.column.ttl.automatic true + +# Max TTL value of specified non-PK columns will be used for insert on target (overrides automatic setting) #spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,... + +# Max WRITETIME value of all non-PK columns will be used for insert on target #spark.cdm.schema.origin.column.writetime.automatic true + +# Max WRITETIME value of specified non-PK columns will be used for insert on target (overrides automatic setting) #spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,... #spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,... diff --git a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java b/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java index bfce7bf0..c9e34c52 100644 --- a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java +++ b/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java @@ -114,6 +114,8 @@ public void smokeTest_disabledFeature() { when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE); assertAll( () -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"), @@ -126,6 +128,49 @@ public void smokeTest_disabledFeature() { ); } + + @Test + public void smokeTest_enabledFeature_withOnlyWritetimeAuto() { + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.TRUE); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE); + + assertAll( + () -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"), + () -> assertTrue(feature.initializeAndValidate(originTable, targetTable), "initializeAndValidate"), + () -> assertTrue(feature.isEnabled(), "feature should be enabled"), + () -> assertEquals(0L, feature.getCustomWritetime(), "customWritetime is not set"), + () -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"), + () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns with Auto set"), + () -> assertFalse(feature.hasTTLColumns(), "hasTTLColumns") + ); + } + + @Test + public void smokeTest_enabledFeature_withOnlyTTLAuto() { + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.TRUE); + + assertAll( + () -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"), + () -> assertTrue(feature.initializeAndValidate(originTable, targetTable), "initializeAndValidate"), + () -> assertTrue(feature.isEnabled(), "feature should be enabled"), + () -> assertEquals(0L, feature.getCustomWritetime(), "customWritetime is not set"), + () -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"), + () -> assertFalse(feature.hasWritetimeColumns(), "hasWritetimeColumns"), + () -> assertTrue(feature.hasTTLColumns(), "hasTTLColumns with ttlAuto set") + ); + } + @Test public void smoke_writetimeWithoutTTL() { when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); @@ -250,7 +295,7 @@ public void counter_unconfigured() { assertAll( () -> assertFalse(feature.isEnabled(), "isEnabled"), - () -> assertTrue(feature.isValid, "isValid") + () -> assertFalse(feature.isValid, "isValid") ); } @@ -497,4 +542,52 @@ public void customWriteTime_withAutoWritetime() { verify(originTable, times(0)).extendColumns(any(),any()); } + + @Test + public void specifiedColumnsOverrideAuto(){ + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(Arrays.asList("writetime_ttl_col")); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(Arrays.asList("writetime_ttl_col")); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(true); + + feature.loadProperties(propertyHelper); + feature.initializeAndValidate(originTable, targetTable); + assertAll( + () -> assertTrue(feature.hasTTLColumns(), "has TTL columns"), + () -> assertTrue(feature.hasWritetimeColumns(), "has Writetime columns"), + () -> assertEquals(1, feature.getWritetimeNames().size(), "has exactly 1 Writetime column"), + () -> assertEquals(1, feature.getTtlNames().size(), "has exactly 1 TTL column"), + () -> assertTrue(feature.isEnabled, "feature is enabled") + ); + + //gets called once for adding TTL(..) columns and once for adding WRITETIME(..) columns + verify(originTable, times(2)).extendColumns(any(),any()); + } + + @Test + public void autoFlagResultsInAllValueColumnsBeingUsed(){ + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); + when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); + when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(true); + + feature.loadProperties(propertyHelper); + feature.initializeAndValidate(originTable, targetTable); + assertAll( + () -> assertTrue(feature.hasTTLColumns(), "has TTL columns"), + () -> assertTrue(feature.hasWritetimeColumns(), "has Writetime columns"), + () -> assertEquals(3, feature.getWritetimeNames().size(), "has exactly 1 Writetime column"), + () -> assertEquals(3, feature.getTtlNames().size(), "has exactly 1 TTL column"), + () -> assertTrue(feature.isEnabled, "feature is enabled") + ); + + //gets called once for adding TTL(..) columns and once for adding WRITETIME(..) columns + verify(originTable, times(2)).extendColumns(any(),any()); + } }