From 266ddc709195a5760f83e97eef588c12f858623f Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Fri, 22 Nov 2024 10:17:38 -0500
Subject: [PATCH] Make Writetime filter + custom writetimestamp work correctly
 when used together (#331)

* Fixed issue #327: the writetime filter did not work as expected when a
  custom writetimestamp was also used.

* Removed deprecated properties `printStatsAfter` and `printStatsPerPart`.
  Run metrics should now be tracked using the `trackRun` feature instead.

* Apply suggestions from code review

---------

Co-authored-by: Madhavan
---
 PERF/cdm-v3.properties                                |  3 ---
 PERF/cdm-v4.properties                                |  3 ---
 RELEASE.md                                            |  4 ++++
 SIT/regression/03_performance/migrate.properties      |  3 ---
 .../cdm/cql/statement/OriginSelectStatement.java      | 10 ++++++----
 src/main/java/com/datastax/cdm/data/PKFactory.java    | 12 ++++++++++--
 .../java/com/datastax/cdm/feature/WritetimeTTL.java   | 11 +++--------
 .../com/datastax/cdm/job/AbstractJobSession.java      | 10 ----------
 .../com/datastax/cdm/properties/KnownProperties.java  |  6 ------
 src/resources/cdm-detailed.properties                 |  5 -----
 .../cdm/cql/statement/OriginSelectStatementTest.java  |  8 ++++----
 .../com/datastax/cdm/feature/WritetimeTTLTest.java    |  7 +++----
 12 files changed, 30 insertions(+), 52 deletions(-)

diff --git a/PERF/cdm-v3.properties b/PERF/cdm-v3.properties
index 4d28a545..bc0ea493 100644
--- a/PERF/cdm-v3.properties
+++ b/PERF/cdm-v3.properties
@@ -80,9 +80,6 @@ spark.batchSize 10
 # ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % OF ROWS (NOT 100%)
 #spark.coveragePercent 100
 
-# ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT
-#spark.printStatsAfter 100000
-
 # ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM
 #spark.consistency.read LOCAL_QUORUM
 #spark.consistency.write LOCAL_QUORUM
diff --git a/PERF/cdm-v4.properties b/PERF/cdm-v4.properties
index 6f88489f..94453c08 100644
--- a/PERF/cdm-v4.properties
+++ b/PERF/cdm-v4.properties
@@ -164,8 +164,6 @@ spark.cdm.autocorrect.mismatch false
 # .read             : Default is LOCAL_QUORUM. Read consistency from Origin, and also from Target
 #                     when records are read for comparison purposes.
 # .write            : Default is LOCAL_QUORUM. Write consistency to Target.
-# .printStatsAfter  : Default is 100000. Number of rows of processing after which a progress log
-#                     entry will be made.
 # .fetchSizeInRows  : Default is 1000. This affects the frequency of reads from Origin, and also the
 #                     frequency of flushes to Target.
 # .error.limit      : Default is 0. Controls how many errors a thread may encounter during MigrateData
@@ -179,7 +177,6 @@ spark.cdm.perfops.readRateLimit 5000
 spark.cdm.perfops.writeRateLimit 5000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
-#spark.cdm.perfops.printStatsAfter 100000
 #spark.cdm.perfops.fetchSizeInRows 1000
 #spark.cdm.perfops.error.limit 0
diff --git a/RELEASE.md b/RELEASE.md
index d91de75d..09aea421 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,8 @@
 # Release Notes
+## [5.1.1] - 2024-11-22
+- Bug fix: Writetime filter did not work as expected when a custom writetimestamp was also used (issue #327).
+- Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.
+
 ## [5.1.0] - 2024-11-15
 - Improves metrics output by producing stats labels in an intuitive and consistent order
 - Refactored JobCounter by removing any references to `thread` or `global` as CDM operations are now isolated within partition-ranges (`parts`). Each such `part` is then parallelly processed and aggregated by Spark.
diff --git a/SIT/regression/03_performance/migrate.properties b/SIT/regression/03_performance/migrate.properties
index dd9e992e..ad0d02f6 100644
--- a/SIT/regression/03_performance/migrate.properties
+++ b/SIT/regression/03_performance/migrate.properties
@@ -20,6 +20,3 @@ spark.cdm.schema.target.keyspaceTable target.regression_performance
 
 spark.cdm.perfops.numParts 32
 spark.cdm.perfops.batchSize 1
-
-spark.cdm.perfops.printStatsAfter 450
-spark.cdm.perfops.printStatsPerPart true
\ No newline at end of file
diff --git a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java
index 637ca111..6cb5aa4c 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java
@@ -41,6 +41,7 @@ public abstract class OriginSelectStatement extends BaseCdmStatement {
     private final Boolean filterColumnEnabled;
     private final Integer filterColumnIndex;
     private final String filterColumnString;
+    private final WritetimeTTL writetimeTTLFeature;
 
     public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
         super(propertyHelper, session);
@@ -50,7 +51,7 @@ public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession ses
             throw new RuntimeException("No columns found in table " + cqlTable.getTableName());
         }
 
-        WritetimeTTL writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
+        this.writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
         if (null != writetimeTTLFeature && writetimeTTLFeature.isEnabled()
                 && writetimeTTLFeature.hasWriteTimestampFilter()) {
             writeTimestampFilterEnabled = true;
@@ -114,14 +115,15 @@ public boolean shouldFilterRecord(Record record) {
         }
 
         if (this.writeTimestampFilterEnabled) {
-            // only process rows greater than writeTimeStampFilter
-            Long originWriteTimeStamp = record.getPk().getWriteTimestamp();
+            // only process rows whose writetime falls within the writeTimeStampFilter range
+            Long originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow());
             if (null == originWriteTimeStamp) {
                 return false;
             }
             if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
                 if (logger.isInfoEnabled())
-                    logger.info("Timestamp filter removing: {}", record.getPk());
+                    logger.info("Timestamp filter removing record with primary key: {} and write timestamp: {}",
+                            record.getPk(), originWriteTimeStamp);
                 return true;
             }
         }
diff --git a/src/main/java/com/datastax/cdm/data/PKFactory.java b/src/main/java/com/datastax/cdm/data/PKFactory.java
index 413315cc..e2a6c268 100644
--- a/src/main/java/com/datastax/cdm/data/PKFactory.java
+++ b/src/main/java/com/datastax/cdm/data/PKFactory.java
@@ -106,8 +106,16 @@ public EnhancedPK getTargetPK(Row originRow) {
         Long originWriteTimeStamp = null;
         Integer originTTL = null;
         if (FeatureFactory.isEnabled(writetimeTTLFeature)) {
-            originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
-            originTTL = writetimeTTLFeature.getLargestTTL(originRow);
+            if (writetimeTTLFeature.getCustomWritetime() > 0) {
+                originWriteTimeStamp = writetimeTTLFeature.getCustomWritetime();
+            } else {
+                originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
+            }
+            if (writetimeTTLFeature.getCustomTTL() > 0) {
+                originTTL = writetimeTTLFeature.getCustomTTL().intValue();
+            } else {
+                originTTL = writetimeTTLFeature.getLargestTTL(originRow);
+            }
         }
         if (explodeMapTargetKeyColumnIndex < 0) {
             return new EnhancedPK(this, newValues, getPKClasses(Side.TARGET), originTTL, originWriteTimeStamp);
diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
index c70cbd13..9bc70958 100644
--- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
+++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -234,10 +234,7 @@ public boolean hasWritetimeColumns() {
 
     public Long getLargestWriteTimeStamp(Row row) {
         if (logDebug)
-            logger.debug("getLargestWriteTimeStamp: customWritetime={}, writetimeSelectColumnIndexes={}",
-                    customWritetime, writetimeSelectColumnIndexes);
-        if (this.customWritetime > 0)
-            return this.customWritetime;
+            logger.debug("getLargestWriteTimeStamp: writetimeSelectColumnIndexes={}", writetimeSelectColumnIndexes);
         if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
             return null;
@@ -262,9 +259,7 @@ private OptionalLong getMaxWriteTimeStamp(Row row) {
 
     public Integer getLargestTTL(Row row) {
         if (logDebug)
-            logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
-        if (this.customTTL > 0)
-            return this.customTTL.intValue();
+            logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
         if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
             return null;
@@ -317,7 +312,7 @@ private void validateTTLColumns(CqlTable originTable) {
     }
 
     private void validateWritetimeColumns(CqlTable originTable) {
-        if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) {
+        if (writetimeNames == null || writetimeNames.isEmpty()) {
             return;
         }
diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
index ba641672..b74eced4 100644
--- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -40,7 +40,6 @@ public abstract class AbstractJobSession extends BaseJobSession {
     protected EnhancedSession originSession;
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
-    protected Long printStatsAfter;
    protected TrackRun trackRunFeature;
     protected long runId;
 
@@ -56,15 +55,6 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
             return;
         }
 
-        this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
-        if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
-            logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of "
-                    + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
-            propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER,
-                    KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
-            printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
-        }
-
         rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
         rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));
diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
index 91b332b0..3f54c929 100644
--- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java
+++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -123,8 +123,6 @@ public enum PropertyType {
     public static final String READ_CL = "spark.cdm.perfops.consistency.read";
     public static final String WRITE_CL = "spark.cdm.perfops.consistency.write";
     public static final String PERF_FETCH_SIZE = "spark.cdm.perfops.fetchSizeInRows";
-    public static final String PRINT_STATS_AFTER = "spark.cdm.perfops.printStatsAfter";
-    public static final String PRINT_STATS_PER_PART = "spark.cdm.perfops.printStatsPerPart";
 
     static {
         types.put(AUTOCORRECT_MISSING, PropertyType.BOOLEAN);
@@ -153,10 +151,6 @@ public enum PropertyType {
         defaults.put(READ_CL, "LOCAL_QUORUM");
         types.put(WRITE_CL, PropertyType.STRING);
         defaults.put(WRITE_CL, "LOCAL_QUORUM");
-        types.put(PRINT_STATS_AFTER, PropertyType.NUMBER);
-        defaults.put(PRINT_STATS_AFTER, "100000");
-        types.put(PRINT_STATS_PER_PART, PropertyType.BOOLEAN);
-        defaults.put(PRINT_STATS_PER_PART, "false");
         types.put(PERF_FETCH_SIZE, PropertyType.NUMBER);
         defaults.put(PERF_FETCH_SIZE, "1000");
     }
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index 85867701..677bef16 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -227,9 +227,6 @@ spark.cdm.trackRun.runId
diff --git a/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java
--- a/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/OriginSelectStatementTest.java
                 () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(1500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(1500L);
                     assertFalse(originSelectStatement.shouldFilterRecord(record), "timestamp is range");
                 }, () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(500L);
                     assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp below range");
                 }, () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(2500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(2500L);
                     assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp above range");
                 }, () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(null);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(null);
                     assertFalse(originSelectStatement.shouldFilterRecord(record), "null timestamp");
                 }
         );
diff --git a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
index 6841dbde..ba96603a 100644
--- a/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
+++ b/src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
@@ -393,7 +393,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {
         feature.loadProperties(propertyHelper);
         feature.initializeAndValidate(originTable, targetTable);
         Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
-        assertEquals(customWritetime, largestWritetime);
+        assertEquals(3000L, largestWritetime);
     }
 
     @Test
@@ -580,10 +580,9 @@ public void customWriteTime_withAutoWritetime() {
 
         assertAll(
                 () -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
-                () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns")
+                () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns"),
+                () -> assertEquals(12345L, feature.getCustomWritetime(), "getCustomWritetime")
         );
-
-        verify(originTable, times(0)).extendColumns(any(),any());
     }
 
     @Test
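
In brief, the behavior this patch establishes: the writetime filter now compares
against the row's actual largest writetime (a custom writetime no longer
short-circuits getLargestWriteTimeStamp()), while the custom writetime and TTL
are applied only when PKFactory builds the target PK. The following is a
minimal, self-contained Java sketch of that precedence; RowLike, shouldFilter,
and resolveWriteTimestamp are illustrative names for this sketch, not CDM's
actual API.

import java.util.Arrays;
import java.util.OptionalLong;

public class WritetimePrecedenceSketch {

    // Stand-in for an origin row exposing its WRITETIME(...) values.
    interface RowLike {
        long[] writetimes();
    }

    // The filter always uses the row's actual largest writetime; the bug was
    // that a custom writetime used to short-circuit this lookup, so every row
    // carried the same timestamp and the filter stopped filtering.
    static Long largestWriteTimestamp(RowLike row) {
        OptionalLong max = Arrays.stream(row.writetimes()).max();
        if (max.isPresent())
            return max.getAsLong();
        return null;
    }

    // Mirrors the filter decision in shouldFilterRecord(): true means "drop".
    static boolean shouldFilter(RowLike row, long minFilter, long maxFilter) {
        Long wt = largestWriteTimestamp(row);
        if (wt == null)
            return false; // no writetime available: keep the row
        return wt < minFilter || wt > maxFilter; // outside the window: drop
    }

    // Mirrors the new getTargetPK() logic: the custom writetime takes effect
    // only when building the value that will be written to Target.
    static Long resolveWriteTimestamp(RowLike row, long customWritetime) {
        if (customWritetime > 0)
            return customWritetime;
        return largestWriteTimestamp(row);
    }

    public static void main(String[] args) {
        RowLike row = () -> new long[] { 1_000L, 1_500L };

        // The filter decision ignores the custom writetime...
        System.out.println(shouldFilter(row, 1_000L, 2_000L)); // false: 1500 is in the window
        // ...while the timestamp written to Target honors it.
        System.out.println(resolveWriteTimestamp(row, 9_999L)); // 9999
    }
}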