Make Writetime filter + custom writetimestamp work correctly when used together (#331)

* Fixed issue #327, i.e., the writetime filter did not work as expected when a custom writetimestamp was also used (see the sketch after this list).
* Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.
* Apply suggestions from code review
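
A minimal stand-alone sketch of the corrected behaviour (simplified stand-in code, not the actual CDM classes): the writetime filter is evaluated against the origin row's own largest writetime, while a configured custom writetimestamp only overrides the timestamp written to the target.

// Illustrative sketch only; names are simplified stand-ins for the CDM implementation.
public class WritetimeSemanticsSketch {

    // Filter decision: compare the row's actual largest writetime against the filter range.
    static boolean shouldFilter(Long originLargestWritetime, long minFilter, long maxFilter) {
        if (originLargestWritetime == null) {
            return false; // nothing to compare against, keep the row
        }
        return originLargestWritetime < minFilter || originLargestWritetime > maxFilter;
    }

    // Write path: a configured custom writetime (> 0) overrides the origin value,
    // but only when the record is written to the target.
    static long timestampToWrite(long customWritetime, long originLargestWritetime) {
        return customWritetime > 0 ? customWritetime : originLargestWritetime;
    }

    public static void main(String[] args) {
        long customWritetime = 1732233600000000L; // arbitrary example value, in microseconds
        long rowWritetime = 1500L;

        // Before this fix, the filter compared its min/max bounds against the constant
        // custom writetime rather than the row's own writetime.
        System.out.println(shouldFilter(rowWritetime, 1000L, 2000L));        // false -> row is migrated
        System.out.println(timestampToWrite(customWritetime, rowWritetime)); // written with the custom timestamp
    }
}

The diffs to OriginSelectStatement, WritetimeTTL and PKFactory below separate these two concerns along exactly these lines.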

---------

Co-authored-by: Madhavan <[email protected]>
pravinbhat and msmygit authored Nov 22, 2024
1 parent 029bddd commit 266ddc7
Showing 12 changed files with 30 additions and 52 deletions.
3 changes: 0 additions & 3 deletions PERF/cdm-v3.properties
@@ -80,9 +80,6 @@ spark.batchSize 10
# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % OF ROWS (NOT 100%)
#spark.coveragePercent 100

# ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT
#spark.printStatsAfter 100000

# ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM
#spark.consistency.read LOCAL_QUORUM
#spark.consistency.write LOCAL_QUORUM
3 changes: 0 additions & 3 deletions PERF/cdm-v4.properties
@@ -164,8 +164,6 @@ spark.cdm.autocorrect.mismatch false
# .read : Default is LOCAL_QUORUM. Read consistency from Origin, and also from Target
# when records are read for comparison purposes.
# .write : Default is LOCAL_QUORUM. Write consistency to Target.
# .printStatsAfter : Default is 100000. Number of rows of processing after which a progress log
# entry will be made.
# .fetchSizeInRows : Default is 1000. This affects the frequency of reads from Origin, and also the
# frequency of flushes to Target.
# .error.limit : Default is 0. Controls how many errors a thread may encounter during MigrateData
@@ -179,7 +177,6 @@ spark.cdm.perfops.readRateLimit 5000
spark.cdm.perfops.writeRateLimit 5000
#spark.cdm.perfops.consistency.read LOCAL_QUORUM
#spark.cdm.perfops.consistency.write LOCAL_QUORUM
#spark.cdm.perfops.printStatsAfter 100000
#spark.cdm.perfops.fetchSizeInRows 1000
#spark.cdm.perfops.error.limit 0

4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
# Release Notes
## [5.1.1] - 2024-11-22
- Bug fix: Writetime filter does not work as expected when custom writetimestamp is also used (issue #327).
- Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.

## [5.1.0] - 2024-11-15
- Improves metrics output by producing stats labels in an intuitive and consistent order
- Refactored JobCounter by removing any references to `thread` or `global` as CDM operations are now isolated within partition-ranges (`parts`). Each such `part` is then parallelly processed and aggregated by Spark.
3 changes: 0 additions & 3 deletions SIT/regression/03_performance/migrate.properties
@@ -20,6 +20,3 @@ spark.cdm.schema.target.keyspaceTable target.regression_performance

spark.cdm.perfops.numParts 32
spark.cdm.perfops.batchSize 1

spark.cdm.perfops.printStatsAfter 450
spark.cdm.perfops.printStatsPerPart true
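
With `printStatsAfter`/`printStatsPerPart` removed from this regression configuration, run progress is tracked via the `trackRun` feature instead. A minimal sketch of what that configuration might look like (only `spark.cdm.trackRun.runId` appears later in this commit, so the enabling flag name below is an assumption, not taken from this diff):

# Sketch only: enable run tracking instead of periodic stats logging (flag name assumed)
spark.cdm.trackRun true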
@@ -41,6 +41,7 @@ public abstract class OriginSelectStatement extends BaseCdmStatement {
private final Boolean filterColumnEnabled;
private final Integer filterColumnIndex;
private final String filterColumnString;
private final WritetimeTTL writetimeTTLFeature;

public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
super(propertyHelper, session);
@@ -50,7 +51,7 @@ public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession ses
throw new RuntimeException("No columns found in table " + cqlTable.getTableName());
}

WritetimeTTL writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
this.writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
if (null != writetimeTTLFeature && writetimeTTLFeature.isEnabled()
&& writetimeTTLFeature.hasWriteTimestampFilter()) {
writeTimestampFilterEnabled = true;
@@ -114,14 +115,15 @@ public boolean shouldFilterRecord(Record record) {
}

if (this.writeTimestampFilterEnabled) {
// only process rows greater than writeTimeStampFilter
Long originWriteTimeStamp = record.getPk().getWriteTimestamp();
// only process rows within the writeTimeStampFilter
Long originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow());
if (null == originWriteTimeStamp) {
return false;
}
if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
if (logger.isInfoEnabled())
logger.info("Timestamp filter removing: {}", record.getPk());
logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", record.getPk(),
originWriteTimeStamp);
return true;
}
}
12 changes: 10 additions & 2 deletions src/main/java/com/datastax/cdm/data/PKFactory.java
@@ -106,8 +106,16 @@ public EnhancedPK getTargetPK(Row originRow) {
Long originWriteTimeStamp = null;
Integer originTTL = null;
if (FeatureFactory.isEnabled(writetimeTTLFeature)) {
originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
originTTL = writetimeTTLFeature.getLargestTTL(originRow);
if (writetimeTTLFeature.getCustomWritetime() > 0) {
originWriteTimeStamp = writetimeTTLFeature.getCustomWritetime();
} else {
originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
}
if (writetimeTTLFeature.getCustomTTL() > 0) {
originTTL = writetimeTTLFeature.getCustomTTL().intValue();
} else {
originTTL = writetimeTTLFeature.getLargestTTL(originRow);
}
}
if (explodeMapTargetKeyColumnIndex < 0) {
return new EnhancedPK(this, newValues, getPKClasses(Side.TARGET), originTTL, originWriteTimeStamp);
11 changes: 3 additions & 8 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -234,10 +234,7 @@ public boolean hasWritetimeColumns() {

public Long getLargestWriteTimeStamp(Row row) {
if (logDebug)
logger.debug("getLargestWriteTimeStamp: customWritetime={}, writetimeSelectColumnIndexes={}",
customWritetime, writetimeSelectColumnIndexes);
if (this.customWritetime > 0)
return this.customWritetime;
logger.debug("getLargestWriteTimeStamp: writetimeSelectColumnIndexes={}", writetimeSelectColumnIndexes);
if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
return null;

@@ -262,9 +259,7 @@ private OptionalLong getMaxWriteTimeStamp(Row row) {

public Integer getLargestTTL(Row row) {
if (logDebug)
logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
if (this.customTTL > 0)
return this.customTTL.intValue();
logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
return null;

@@ -317,7 +312,7 @@ private void validateTTLColumns(CqlTable originTable) {
}

private void validateWritetimeColumns(CqlTable originTable) {
if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) {
if (writetimeNames == null || writetimeNames.isEmpty()) {
return;
}

10 changes: 0 additions & 10 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -40,7 +40,6 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected EnhancedSession originSession;
protected EnhancedSession targetSession;
protected Guardrail guardrailFeature;
protected Long printStatsAfter;
protected TrackRun trackRunFeature;
protected long runId;

@@ -56,15 +55,6 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
return;
}

this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of "
+ KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER,
KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
}

rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));

@@ -123,8 +123,6 @@ public enum PropertyType {
public static final String READ_CL = "spark.cdm.perfops.consistency.read";
public static final String WRITE_CL = "spark.cdm.perfops.consistency.write";
public static final String PERF_FETCH_SIZE = "spark.cdm.perfops.fetchSizeInRows";
public static final String PRINT_STATS_AFTER = "spark.cdm.perfops.printStatsAfter";
public static final String PRINT_STATS_PER_PART = "spark.cdm.perfops.printStatsPerPart";

static {
types.put(AUTOCORRECT_MISSING, PropertyType.BOOLEAN);
@@ -153,10 +151,6 @@ public enum PropertyType {
defaults.put(READ_CL, "LOCAL_QUORUM");
types.put(WRITE_CL, PropertyType.STRING);
defaults.put(WRITE_CL, "LOCAL_QUORUM");
types.put(PRINT_STATS_AFTER, PropertyType.NUMBER);
defaults.put(PRINT_STATS_AFTER, "100000");
types.put(PRINT_STATS_PER_PART, PropertyType.BOOLEAN);
defaults.put(PRINT_STATS_PER_PART, "false");
types.put(PERF_FETCH_SIZE, PropertyType.NUMBER);
defaults.put(PERF_FETCH_SIZE, "1000");
}
5 changes: 0 additions & 5 deletions src/resources/cdm-detailed.properties
@@ -227,9 +227,6 @@ spark.cdm.trackRun.runId <auto-generated-unique-long-va
# .read : Default is LOCAL_QUORUM. Read consistency from Origin, and also from Target
# when records are read for comparison purposes.
# .write : Default is LOCAL_QUORUM. Write consistency to Target.
# .printStatsAfter : Default is 100000. Number of rows of processing after which a progress log
# entry will be made.
# .printStatsPerPart : Default is false. Print statistics for each part after it is processed.
# .fetchSizeInRows : Default is 1000. This affects the frequency of reads from Origin, and also the
# frequency of flushes to Target. A larger value will reduce the number of reads
# and writes, but will increase the memory requirements.
@@ -240,8 +237,6 @@ spark.cdm.perfops.ratelimit.origin 20000
spark.cdm.perfops.ratelimit.target 20000
#spark.cdm.perfops.consistency.read LOCAL_QUORUM
#spark.cdm.perfops.consistency.write LOCAL_QUORUM
#spark.cdm.perfops.printStatsAfter 100000
#spark.cdm.perfops.printStatsPerPart false
#spark.cdm.perfops.fetchSizeInRows 1000
#spark.cdm.perfops.errorLimit 0
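
As a small usage illustration of the settings described in the comment block above (values are arbitrary examples, not recommendations), the defaults could be overridden like so:

spark.cdm.perfops.consistency.read LOCAL_ONE
spark.cdm.perfops.consistency.write LOCAL_QUORUM
spark.cdm.perfops.fetchSizeInRows 2000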

@@ -62,19 +62,19 @@ public void writetime_filter() {

assertAll(
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(1500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(1500L);
assertFalse(originSelectStatement.shouldFilterRecord(record), "timestamp is range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(500L);
assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp below range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(2500L);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(2500L);
assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp above range");
},
() -> {
when(record.getPk().getWriteTimestamp()).thenReturn(null);
when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(null);
assertFalse(originSelectStatement.shouldFilterRecord(record), "null timestamp");
}
);
7 changes: 3 additions & 4 deletions src/test/java/com/datastax/cdm/feature/WritetimeTTLTest.java
@@ -393,7 +393,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {
feature.loadProperties(propertyHelper);
feature.initializeAndValidate(originTable, targetTable);
Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
assertEquals(customWritetime, largestWritetime);
assertEquals(3000L, largestWritetime);
}

@Test
@@ -580,10 +580,9 @@ public void customWriteTime_withAutoWritetime() {

assertAll(
() -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns")
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns"),
() -> assertEquals(12345L, feature.getCustomWritetime(), "hasWritetimeColumns")
);

verify(originTable, times(0)).extendColumns(any(),any());
}

@Test