Commit

Merge pull request #226 from arvy/fix-auto-ttlWritime-issue220
issue-220: Fixes ttl-writetime auto flag handling
mfmaher2 authored Dec 12, 2023
2 parents 1e803b2 + 7ba9aac commit 9361b79
Showing 3 changed files with 112 additions and 1 deletion.
10 changes: 10 additions & 0 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -75,10 +75,12 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
logger.info("PARAM -- {}: {} datetime is {} ", KnownProperties.FILTER_WRITETS_MAX, filterMax, Instant.ofEpochMilli(filterMax / 1000));
}


isValid = validateProperties();
isEnabled = isValid &&
((null != ttlNames && !ttlNames.isEmpty())
|| (null != writetimeNames && !writetimeNames.isEmpty())
|| autoTTLNames || autoWritetimeNames
|| customWritetime > 0);

isLoaded = true;
@@ -309,4 +311,12 @@ private void validateFilterRangeProperties() {
isValid = false;
}
}

public List<String> getTtlNames() {
return ttlNames;
}

public List<String> getWritetimeNames() {
return writetimeNames;
}
}
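
These two getters are what the new tests below use to check which columns the feature actually resolved. A hypothetical caller-side sketch of them (the wiring of propertyHelper, originTable and targetTable is assumed to be set up as in TTLAndWritetimeTest; the println output is illustrative, not part of this commit):

// Hypothetical usage of the getters added above; assumes propertyHelper, originTable and
// targetTable are prepared the same way the tests below prepare them.
WritetimeTTL feature = new WritetimeTTL();
if (feature.loadProperties(propertyHelper) && feature.initializeAndValidate(originTable, targetTable)) {
    System.out.println("TTL columns in use: " + feature.getTtlNames());
    System.out.println("WRITETIME columns in use: " + feature.getWritetimeNames());
}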
8 changes: 8 additions & 0 deletions src/resources/cdm-detailed.properties
@@ -101,9 +101,17 @@ spark.cdm.connect.target.password cassandra
# columns need to be listed.
#-----------------------------------------------------------------------------------------------------------
spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name

# Max TTL value of all non-PK columns will be used for insert on target
#spark.cdm.schema.origin.column.ttl.automatic true

# Max TTL value of specified non-PK columns will be used for insert on target (overrides automatic setting)
#spark.cdm.schema.origin.column.ttl.names data_col1,data_col2,...

# Max WRITETIME value of all non-PK columns will be used for insert on target
#spark.cdm.schema.origin.column.writetime.automatic true

# Max WRITETIME value of specified non-PK columns will be used for insert on target (overrides automatic setting)
#spark.cdm.schema.origin.column.writetime.names data_col1,data_col2,...
#spark.cdm.schema.origin.column.names.to.target partition_col1:partition_col_1,partition_col2:partition_col_2,...
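
The two new automatic flags interact with the corresponding *.names properties: explicitly listed names take precedence, and an automatic flag on its own falls back to every non-PK column, which is what the new tests specifiedColumnsOverrideAuto and autoFlagResultsInAllValueColumnsBeingUsed below exercise. A minimal, hypothetical Java sketch of that precedence (resolveColumns and its class are illustrative helpers, not code from this commit):

import java.util.Collections;
import java.util.List;

class TtlWritetimeColumnResolutionSketch {
    // Illustrative only: explicit *.names win; otherwise *.automatic=true means
    // "all non-PK value columns", and neither being set leaves the list empty.
    static List<String> resolveColumns(List<String> configuredNames, boolean autoFlag, List<String> allValueColumns) {
        if (configuredNames != null && !configuredNames.isEmpty()) {
            return configuredNames;   // names override the automatic setting
        }
        return autoFlag ? allValueColumns : Collections.emptyList();
    }
}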

95 changes: 94 additions & 1 deletion src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java
@@ -114,6 +114,8 @@ public void smokeTest_disabledFeature() {
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);

assertAll(
() -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"),
@@ -126,6 +128,49 @@ public void smokeTest_disabledFeature() {
);
}


@Test
public void smokeTest_enabledFeature_withOnlyWritetimeAuto() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.TRUE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);

assertAll(
() -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"),
() -> assertTrue(feature.initializeAndValidate(originTable, targetTable), "initializeAndValidate"),
() -> assertTrue(feature.isEnabled(), "feature should be enabled"),
() -> assertEquals(0L, feature.getCustomWritetime(), "customWritetime is not set"),
() -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns with Auto set"),
() -> assertFalse(feature.hasTTLColumns(), "hasTTLColumns")
);
}

@Test
public void smokeTest_enabledFeature_withOnlyTTLAuto() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.TRUE);

assertAll(
() -> assertTrue(feature.loadProperties(propertyHelper), "loadProperties"),
() -> assertTrue(feature.initializeAndValidate(originTable, targetTable), "initializeAndValidate"),
() -> assertTrue(feature.isEnabled(), "feature should be enabled"),
() -> assertEquals(0L, feature.getCustomWritetime(), "customWritetime is not set"),
() -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertFalse(feature.hasWritetimeColumns(), "hasWritetimeColumns"),
() -> assertTrue(feature.hasTTLColumns(), "hasTTLColumns with ttlAuto set")
);
}

@Test
public void smoke_writetimeWithoutTTL() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
@@ -250,7 +295,7 @@ public void counter_unconfigured() {

assertAll(
() -> assertFalse(feature.isEnabled(), "isEnabled"),
() -> assertTrue(feature.isValid, "isValid")
() -> assertFalse(feature.isValid, "isValid")
);
}

@@ -497,4 +542,52 @@ public void customWriteTime_withAutoWritetime() {

verify(originTable, times(0)).extendColumns(any(),any());
}

@Test
public void specifiedColumnsOverrideAuto(){
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(Arrays.<String>asList("writetime_ttl_col"));
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(Arrays.<String>asList("writetime_ttl_col"));
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(true);

feature.loadProperties(propertyHelper);
feature.initializeAndValidate(originTable, targetTable);
assertAll(
() -> assertTrue(feature.hasTTLColumns(), "has TTL columns"),
() -> assertTrue(feature.hasWritetimeColumns(), "has Writetime columns"),
() -> assertEquals(1, feature.getWritetimeNames().size(), "has exactly 1 Writetime column"),
() -> assertEquals(1, feature.getTtlNames().size(), "has exactly 1 TTL column"),
() -> assertTrue(feature.isEnabled, "feature is enabled")
);

//gets called once for adding TTL(..) columns and once for adding WRITETIME(..) columns
verify(originTable, times(2)).extendColumns(any(),any());
}

@Test
public void autoFlagResultsInAllValueColumnsBeingUsed(){
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(true);

feature.loadProperties(propertyHelper);
feature.initializeAndValidate(originTable, targetTable);
assertAll(
() -> assertTrue(feature.hasTTLColumns(), "has TTL columns"),
() -> assertTrue(feature.hasWritetimeColumns(), "has Writetime columns"),
() -> assertEquals(3, feature.getWritetimeNames().size(), "has exactly 3 Writetime columns"),
() -> assertEquals(3, feature.getTtlNames().size(), "has exactly 3 TTL columns"),
() -> assertTrue(feature.isEnabled, "feature is enabled")
);

//gets called once for adding TTL(..) columns and once for adding WRITETIME(..) columns
verify(originTable, times(2)).extendColumns(any(),any());
}
}
