Skip to content

Commit

Permalink
Fixed issue #327 i.e. writetime filter does not work as expected when…
Browse files Browse the repository at this point in the history
… custom writetimestamp is also used.
  • Loading branch information
pravinbhat committed Nov 21, 2024
1 parent 524dd9a commit bb9fa5b
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public abstract class OriginSelectStatement extends BaseCdmStatement {
private final Boolean filterColumnEnabled;
private final Integer filterColumnIndex;
private final String filterColumnString;
private final WritetimeTTL writetimeTTLFeature;

public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
super(propertyHelper, session);
Expand All @@ -50,7 +51,7 @@ public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession ses
throw new RuntimeException("No columns found in table " + cqlTable.getTableName());
}

WritetimeTTL writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
this.writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
if (null != writetimeTTLFeature && writetimeTTLFeature.isEnabled()
&& writetimeTTLFeature.hasWriteTimestampFilter()) {
writeTimestampFilterEnabled = true;
Expand Down Expand Up @@ -114,14 +115,15 @@ public boolean shouldFilterRecord(Record record) {
}

if (this.writeTimestampFilterEnabled) {
// only process rows greater than writeTimeStampFilter
Long originWriteTimeStamp = record.getPk().getWriteTimestamp();
// only process rows within the writeTimeStampFilter
Long originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow());
if (null == originWriteTimeStamp) {
return false;
}
if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
if (logger.isInfoEnabled())
logger.info("Timestamp filter removing: {}", record.getPk());
logger.info("Timestamp filter removing record: {} with WriteTimeStamp: {}", record.getPk(),
originWriteTimeStamp);
return true;
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/datastax/cdm/data/PKFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,16 @@ public EnhancedPK getTargetPK(Row originRow) {
Long originWriteTimeStamp = null;
Integer originTTL = null;
if (FeatureFactory.isEnabled(writetimeTTLFeature)) {
originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
originTTL = writetimeTTLFeature.getLargestTTL(originRow);
if (writetimeTTLFeature.getCustomWritetime() > 0) {
originWriteTimeStamp = writetimeTTLFeature.getCustomWritetime();
} else {
originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
}
if (writetimeTTLFeature.getCustomTTL() > 0) {
originTTL = writetimeTTLFeature.getCustomTTL().intValue();
} else {
originTTL = writetimeTTLFeature.getLargestTTL(originRow);
}
}
if (explodeMapTargetKeyColumnIndex < 0) {
return new EnhancedPK(this, newValues, getPKClasses(Side.TARGET), originTTL, originWriteTimeStamp);
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,7 @@ public boolean hasWritetimeColumns() {

public Long getLargestWriteTimeStamp(Row row) {
if (logDebug)
logger.debug("getLargestWriteTimeStamp: customWritetime={}, writetimeSelectColumnIndexes={}",
customWritetime, writetimeSelectColumnIndexes);
if (this.customWritetime > 0)
return this.customWritetime;
logger.debug("getLargestWriteTimeStamp: writetimeSelectColumnIndexes={}", writetimeSelectColumnIndexes);
if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
return null;

Expand All @@ -262,9 +259,7 @@ private OptionalLong getMaxWriteTimeStamp(Row row) {

public Integer getLargestTTL(Row row) {
if (logDebug)
logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
if (this.customTTL > 0)
return this.customTTL.intValue();
logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
return null;

Expand Down Expand Up @@ -317,7 +312,7 @@ private void validateTTLColumns(CqlTable originTable) {
}

private void validateWritetimeColumns(CqlTable originTable) {
if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) {
if (writetimeNames == null || writetimeNames.isEmpty()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ public void writetime_filter() {

assertAll(
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(1500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(1500L);
assertFalse(originSelectStatement.shouldFilterRecord(record), "timestamp is range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(500L);
assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp below range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(2500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(2500L);
assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp above range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(null);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(null);
assertFalse(originSelectStatement.shouldFilterRecord(record), "null timestamp");
}
);
Expand Down
7 changes: 3 additions & 4 deletions src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {
feature.loadProperties(propertyHelper);
feature.initializeAndValidate(originTable, targetTable);
Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
assertEquals(customWritetime, largestWritetime);
assertEquals(3000L, largestWritetime);
}

@Test
Expand Down Expand Up @@ -580,10 +580,9 @@ public void customWriteTime_withAutoWritetime() {

assertAll(
() -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns")
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns"),
() -> assertEquals(12345L, feature.getCustomWritetime(), "hasWritetimeColumns")
);

verify(originTable, times(0)).extendColumns(any(),any());
}

@Test
Expand Down

0 comments on commit bb9fa5b

Please sign in to comment.