From bb9fa5b881561dc32c675b1bdb2ec6a5537cc722 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Thu, 21 Nov 2024 08:19:07 -0500 Subject: [PATCH] Fixed issue #327 i.e. writetime filter does not work as expected when custom writetimestamp is also used. --- .../cdm/cql/statement/OriginSelectStatement.java | 10 ++++++---- src/main/java/com/datastax/cdm/data/PKFactory.java | 12 ++++++++++-- .../java/com/datastax/cdm/feature/WritetimeTTL.java | 11 +++-------- .../cdm/cql/statement/OriginSelectStatementTest.java | 8 ++++---- .../com/datastax/cdm/feature/WritetimeTTLTest.java | 7 +++---- 5 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java index 637ca111..1a2b70fa 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java @@ -41,6 +41,7 @@ public abstract class OriginSelectStatement extends BaseCdmStatement { private final Boolean filterColumnEnabled; private final Integer filterColumnIndex; private final String filterColumnString; + private final WritetimeTTL writetimeTTLFeature; public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession session) { super(propertyHelper, session); @@ -50,7 +51,7 @@ public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession ses throw new RuntimeException("No columns found in table " + cqlTable.getTableName()); } - WritetimeTTL writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL); + this.writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL); if (null != writetimeTTLFeature && writetimeTTLFeature.isEnabled() && writetimeTTLFeature.hasWriteTimestampFilter()) { writeTimestampFilterEnabled = true; @@ -114,14 +115,15 @@ public boolean shouldFilterRecord(Record record) { } if (this.writeTimestampFilterEnabled) { - // only process rows greater than writeTimeStampFilter - Long originWriteTimeStamp = record.getPk().getWriteTimestamp(); + // only process rows within the writeTimeStampFilter + Long originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow()); if (null == originWriteTimeStamp) { return false; } if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) { if (logger.isInfoEnabled()) - logger.info("Timestamp filter removing: {}", record.getPk()); + logger.info("Timestamp filter removing record: {} with WriteTimeStamp: {}", record.getPk(), + originWriteTimeStamp); return true; } } diff --git a/src/main/java/com/datastax/cdm/data/PKFactory.java b/src/main/java/com/datastax/cdm/data/PKFactory.java index 413315cc..e2a6c268 100644 --- a/src/main/java/com/datastax/cdm/data/PKFactory.java +++ b/src/main/java/com/datastax/cdm/data/PKFactory.java @@ -106,8 +106,16 @@ public EnhancedPK getTargetPK(Row originRow) { Long originWriteTimeStamp = null; Integer originTTL = null; if (FeatureFactory.isEnabled(writetimeTTLFeature)) { - originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow); - originTTL = writetimeTTLFeature.getLargestTTL(originRow); + if (writetimeTTLFeature.getCustomWritetime() > 0) { + originWriteTimeStamp = writetimeTTLFeature.getCustomWritetime(); + } else { + originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow); + } + if (writetimeTTLFeature.getCustomTTL() > 0) { + originTTL = writetimeTTLFeature.getCustomTTL().intValue(); + } else { + originTTL = writetimeTTLFeature.getLargestTTL(originRow); + } } if (explodeMapTargetKeyColumnIndex < 0) { return new EnhancedPK(this, newValues, getPKClasses(Side.TARGET), originTTL, originWriteTimeStamp); diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java index c70cbd13..9bc70958 100644 --- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java +++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java @@ -234,10 +234,7 @@ public boolean hasWritetimeColumns() { public Long getLargestWriteTimeStamp(Row row) { if (logDebug) - logger.debug("getLargestWriteTimeStamp: customWritetime={}, writetimeSelectColumnIndexes={}", - customWritetime, writetimeSelectColumnIndexes); - if (this.customWritetime > 0) - return this.customWritetime; + logger.debug("getLargestWriteTimeStamp: writetimeSelectColumnIndexes={}", writetimeSelectColumnIndexes); if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty()) return null; @@ -262,9 +259,7 @@ private OptionalLong getMaxWriteTimeStamp(Row row) { public Integer getLargestTTL(Row row) { if (logDebug) - logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes); - if (this.customTTL > 0) - return this.customTTL.intValue(); + logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes); if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty()) return null; @@ -317,7 +312,7 @@ private void validateTTLColumns(CqlTable originTable) { } private void validateWritetimeColumns(CqlTable originTable) { - if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) { + if (writetimeNames == null || writetimeNames.isEmpty()) { return; } diff --git a/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java index eb3d88af..fcccb208 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java @@ -62,19 +62,19 @@ public void writetime_filter() { assertAll( () -> { - when(record.getPk().getWriteTimestamp()).thenReturn(1500L); + when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(1500L); assertFalse(originSelectStatement.shouldFilterRecord(record), "timestamp is range"); }, () -> { - when(record.getPk().getWriteTimestamp()).thenReturn(500L); + when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(500L); assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp below range"); }, () -> { - when(record.getPk().getWriteTimestamp()).thenReturn(2500L); + when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(2500L); assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp above range"); }, () -> { - when(record.getPk().getWriteTimestamp()).thenReturn(null); + when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(null); assertFalse(originSelectStatement.shouldFilterRecord(record), "null timestamp"); } ); diff --git a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java index 6841dbde..ba96603a 100644 --- a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java +++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java @@ -393,7 +393,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() { feature.loadProperties(propertyHelper); feature.initializeAndValidate(originTable, targetTable); Long largestWritetime = feature.getLargestWriteTimeStamp(originRow); - assertEquals(customWritetime, largestWritetime); + assertEquals(3000L, largestWritetime); } @Test @@ -580,10 +580,9 @@ public void customWriteTime_withAutoWritetime() { assertAll( () -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"), - () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns") + () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns"), + () -> assertEquals(12345L, feature.getCustomWritetime(), "hasWritetimeColumns") ); - - verify(originTable, times(0)).extendColumns(any(),any()); } @Test