Make Writetime filter + custom writetimestamp work correctly when used together #331

Merged · 4 commits · Nov 22, 2024

3 changes: 0 additions & 3 deletions PERF/cdm-v3.properties
@@ -80,9 +80,6 @@ spark.batchSize 10
 # ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE SOME % OF ROWS (NOT 100%)
 #spark.coveragePercent 100

-# ENABLE ONLY IF WANT LOG STATS MORE OR LESS FREQUENTLY THAN DEFAULT
-#spark.printStatsAfter 100000
-
 # ENABLE ONLY IF YOU WANT TO USE READ AND/OR WRITE CONSISTENCY OTHER THAN LOCAL_QUORUM
 #spark.consistency.read LOCAL_QUORUM
 #spark.consistency.write LOCAL_QUORUM
3 changes: 0 additions & 3 deletions PERF/cdm-v4.properties
@@ -164,8 +164,6 @@ spark.cdm.autocorrect.mismatch false
 # .read             : Default is LOCAL_QUORUM. Read consistency from Origin, and also from Target
 #                     when records are read for comparison purposes.
 # .write            : Default is LOCAL_QUORUM. Write consistency to Target.
-# .printStatsAfter  : Default is 100000. Number of rows of processing after which a progress log
-#                     entry will be made.
 # .fetchSizeInRows  : Default is 1000. This affects the frequency of reads from Origin, and also the
 #                     frequency of flushes to Target.
 # .error.limit      : Default is 0. Controls how many errors a thread may encounter during MigrateData
@@ -179,7 +177,6 @@ spark.cdm.perfops.readRateLimit 5000
 spark.cdm.perfops.writeRateLimit 5000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
-#spark.cdm.perfops.printStatsAfter 100000
 #spark.cdm.perfops.fetchSizeInRows 1000
 #spark.cdm.perfops.error.limit 0

4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
 # Release Notes
+## [5.1.1] - 2024-11-22
+- Bug fix: Writetime filter does not work as expected when custom writetimestamp is also used (issue #327).
+- Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead.
+
 ## [5.1.0] - 2024-11-15
 - Improves metrics output by producing stats labels in an intuitive and consistent order
 - Refactored JobCounter by removing any references to `thread` or `global` as CDM operations are now isolated within partition-ranges (`parts`). Each such `part` is then parallelly processed and aggregated by Spark.
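For context on the 5.1.1 bug-fix entry: before this change, `WritetimeTTL.getLargestWriteTimeStamp()` returned the configured custom writetime whenever one was set (see the lines removed in WritetimeTTL.java further down), so the writetime filter effectively compared that single constant against the min/max window instead of each row's real write timestamp, and every row was either kept or dropped wholesale. A minimal standalone sketch of that failure mode, using hypothetical values rather than code from this repository:

```java
// Sketch of the pre-5.1.1 behavior, with hypothetical values.
public class WritetimeFilterBugSketch {
    static final long MIN_FILTER = 1_700_000_000_000_000L;       // window start (epoch microseconds)
    static final long MAX_FILTER = 1_705_000_000_000_000L;       // window end   (epoch microseconds)
    static final long CUSTOM_WRITETIME = 1_710_000_000_000_000L; // custom writetime for all target writes

    // Old behavior: the custom writetime shadowed the row's real writetime.
    static long largestWriteTimestampOld(long rowWritetime) {
        return CUSTOM_WRITETIME > 0 ? CUSTOM_WRITETIME : rowWritetime;
    }

    public static void main(String[] args) {
        long rowWritetime = 1_702_000_000_000_000L; // inside the window, so the row should be migrated
        long compared = largestWriteTimestampOld(rowWritetime);
        boolean filteredOut = compared < MIN_FILTER || compared > MAX_FILTER;
        // Prints true: the constant custom writetime falls outside the window,
        // so every row is dropped regardless of its real write timestamp.
        System.out.println(filteredOut);
    }
}
```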
3 changes: 0 additions & 3 deletions SIT/regression/03_performance/migrate.properties
@@ -20,6 +20,3 @@ spark.cdm.schema.target.keyspaceTable target.regression_performance

 spark.cdm.perfops.numParts 32
 spark.cdm.perfops.batchSize 1
-
-spark.cdm.perfops.printStatsAfter 450
-spark.cdm.perfops.printStatsPerPart true
OriginSelectStatement.java
@@ -41,6 +41,7 @@ public abstract class OriginSelectStatement extends BaseCdmStatement {
     private final Boolean filterColumnEnabled;
     private final Integer filterColumnIndex;
     private final String filterColumnString;
+    private final WritetimeTTL writetimeTTLFeature;

     public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession session) {
         super(propertyHelper, session);
@@ -50,7 +51,7 @@ public OriginSelectStatement(IPropertyHelper propertyHelper, EnhancedSession ses
             throw new RuntimeException("No columns found in table " + cqlTable.getTableName());
         }

-        WritetimeTTL writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
+        this.writetimeTTLFeature = (WritetimeTTL) cqlTable.getFeature(Featureset.WRITETIME_TTL);
         if (null != writetimeTTLFeature && writetimeTTLFeature.isEnabled()
                 && writetimeTTLFeature.hasWriteTimestampFilter()) {
             writeTimestampFilterEnabled = true;
@@ -114,14 +115,15 @@ public boolean shouldFilterRecord(Record record) {
         }

         if (this.writeTimestampFilterEnabled) {
-            // only process rows greater than writeTimeStampFilter
-            Long originWriteTimeStamp = record.getPk().getWriteTimestamp();
+            // only process rows within the writeTimeStampFilter
+            Long originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow());
             if (null == originWriteTimeStamp) {
                 return false;
             }
             if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) {
                 if (logger.isInfoEnabled())
-                    logger.info("Timestamp filter removing: {}", record.getPk());
+                    logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", record.getPk(),
+                            originWriteTimeStamp);
                 return true;
             }
         }
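The key change above is where `originWriteTimeStamp` comes from: the filter now asks the WritetimeTTL feature for the largest actual column writetime of the origin row, instead of `record.getPk().getWriteTimestamp()`, which deliberately carries the custom writetime when one is configured. A minimal standalone sketch of the window predicate itself, assuming a hypothetical filter window of [1000, 2000] (the same boundary behavior the updated unit test further down exercises with 500, 1500 and 2500):

```java
// Minimal sketch of the window check in shouldFilterRecord, with an assumed
// filter window of [1000, 2000]; values and class are illustrative only.
public class WritetimeWindowSketch {
    static final long MIN = 1000L;
    static final long MAX = 2000L;

    // Mirrors the predicate: true means "filter this record out".
    static boolean shouldFilter(Long originWriteTimestamp) {
        if (originWriteTimestamp == null) {
            return false; // no writetime available; record is kept
        }
        return originWriteTimestamp < MIN || originWriteTimestamp > MAX;
    }

    public static void main(String[] args) {
        System.out.println(shouldFilter(1500L)); // false: inside the window, migrated
        System.out.println(shouldFilter(500L));  // true: below the window, skipped
        System.out.println(shouldFilter(2500L)); // true: above the window, skipped
        System.out.println(shouldFilter(null));  // false: kept when no writetime is present
    }
}
```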
12 changes: 10 additions & 2 deletions src/main/java/com/datastax/cdm/data/PKFactory.java
@@ -106,8 +106,16 @@ public EnhancedPK getTargetPK(Row originRow) {
         Long originWriteTimeStamp = null;
         Integer originTTL = null;
         if (FeatureFactory.isEnabled(writetimeTTLFeature)) {
-            originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
-            originTTL = writetimeTTLFeature.getLargestTTL(originRow);
+            if (writetimeTTLFeature.getCustomWritetime() > 0) {
+                originWriteTimeStamp = writetimeTTLFeature.getCustomWritetime();
+            } else {
+                originWriteTimeStamp = writetimeTTLFeature.getLargestWriteTimeStamp(originRow);
+            }
+            if (writetimeTTLFeature.getCustomTTL() > 0) {
+                originTTL = writetimeTTLFeature.getCustomTTL().intValue();
+            } else {
+                originTTL = writetimeTTLFeature.getLargestTTL(originRow);
+            }
         }
         if (explodeMapTargetKeyColumnIndex < 0) {
             return new EnhancedPK(this, newValues, getPKClasses(Side.TARGET), originTTL, originWriteTimeStamp);
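With the short-circuit removed from WritetimeTTL (next file), the custom writetime and custom TTL take effect here, when the target-side primary key is built for the write, rather than inside getLargestWriteTimeStamp. A rough sketch of that precedence as a pure helper, plus a purely illustrative view of how the chosen values land on the target write; the CQL shown is a simplification, not the exact statement CDM builds:

```java
// Illustrative only: the precedence applied in getTargetPK, restated as pure helpers,
// with hypothetical values. The printed CQL is a conceptual simplification.
public class CustomWritetimePrecedenceSketch {
    static Long chooseWritetime(long customWritetime, Long largestRowWritetime) {
        return customWritetime > 0 ? customWritetime : largestRowWritetime;
    }

    static Integer chooseTtl(long customTtl, Integer largestRowTtl) {
        return customTtl > 0 ? (int) customTtl : largestRowTtl;
    }

    public static void main(String[] args) {
        // Custom writetime configured, no custom TTL: writetime is overridden, TTL comes from the row.
        Long writetime = chooseWritetime(1_710_000_000_000_000L, 1_702_000_000_000_000L);
        Integer ttl = chooseTtl(0L, 86_400);
        // Conceptually, the target write then carries these values, e.g.:
        System.out.printf("INSERT INTO target.tbl (...) VALUES (...) USING TTL %d AND TIMESTAMP %d%n",
                ttl, writetime);
    }
}
```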
11 changes: 3 additions & 8 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -234,10 +234,7 @@ public boolean hasWritetimeColumns() {

     public Long getLargestWriteTimeStamp(Row row) {
         if (logDebug)
-            logger.debug("getLargestWriteTimeStamp: customWritetime={}, writetimeSelectColumnIndexes={}",
-                    customWritetime, writetimeSelectColumnIndexes);
-        if (this.customWritetime > 0)
-            return this.customWritetime;
+            logger.debug("getLargestWriteTimeStamp: writetimeSelectColumnIndexes={}", writetimeSelectColumnIndexes);
         if (null == this.writetimeSelectColumnIndexes || this.writetimeSelectColumnIndexes.isEmpty())
             return null;

@@ -262,9 +259,7 @@ private OptionalLong getMaxWriteTimeStamp(Row row) {

     public Integer getLargestTTL(Row row) {
         if (logDebug)
-            logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
-        if (this.customTTL > 0)
-            return this.customTTL.intValue();
+            logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
         if (null == this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty())
             return null;

@@ -317,7 +312,7 @@ private void validateTTLColumns(CqlTable originTable) {
     }

     private void validateWritetimeColumns(CqlTable originTable) {
-        if (writetimeNames == null || writetimeNames.isEmpty() || customWritetime > 0) {
+        if (writetimeNames == null || writetimeNames.isEmpty()) {
             return;
         }

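Net effect of this change: `getLargestWriteTimeStamp()` and `getLargestTTL()` now always report the origin row's actual column metadata, the configured custom values are exposed separately (`getCustomWritetime()`/`getCustomTTL()`) for callers such as PKFactory to apply, and writetime columns are validated even when a custom writetime is set, because the filter still needs them. A self-contained sketch of the new contract using a stand-in class and hypothetical values; the 3000 and 12345 expectations mirror the updated tests at the end of this diff:

```java
// Stand-in class (not the real WritetimeTTL) sketching the new contract:
// "largest" getters reflect the row's writetime columns; custom values are exposed separately.
import java.util.List;

public class WritetimeTtlContractSketch {
    private final long customWritetime;          // 0 means "not configured"
    private final List<Long> columnWritetimes;   // WRITETIME(...) values read from the origin row

    WritetimeTtlContractSketch(long customWritetime, List<Long> columnWritetimes) {
        this.customWritetime = customWritetime;
        this.columnWritetimes = columnWritetimes;
    }

    Long getLargestWriteTimeStamp() {
        // No custom-writetime short-circuit any more.
        return columnWritetimes.stream().max(Long::compare).orElse(null);
    }

    long getCustomWritetime() {
        return customWritetime;
    }

    public static void main(String[] args) {
        WritetimeTtlContractSketch f = new WritetimeTtlContractSketch(12345L, List.of(1000L, 2000L, 3000L));
        System.out.println(f.getLargestWriteTimeStamp()); // 3000, regardless of the custom writetime
        System.out.println(f.getCustomWritetime());       // 12345, applied only when writing to Target
    }
}
```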
10 changes: 0 additions & 10 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -40,7 +40,6 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected EnhancedSession originSession;
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
-    protected Long printStatsAfter;
     protected TrackRun trackRunFeature;
     protected long runId;

@@ -56,15 +55,6 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
             return;
         }

-        this.printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
-        if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1L)) {
-            logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of "
-                    + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
-            propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER,
-                    KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
-            printStatsAfter = propertyHelper.getLong(KnownProperties.PRINT_STATS_AFTER);
-        }
-
         rateLimiterOrigin = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_ORIGIN));
         rateLimiterTarget = RateLimiter.create(propertyHelper.getInteger(KnownProperties.PERF_RATELIMIT_TARGET));

KnownProperties.java
@@ -123,8 +123,6 @@ public enum PropertyType {
     public static final String READ_CL = "spark.cdm.perfops.consistency.read";
     public static final String WRITE_CL = "spark.cdm.perfops.consistency.write";
     public static final String PERF_FETCH_SIZE = "spark.cdm.perfops.fetchSizeInRows";
-    public static final String PRINT_STATS_AFTER = "spark.cdm.perfops.printStatsAfter";
-    public static final String PRINT_STATS_PER_PART = "spark.cdm.perfops.printStatsPerPart";

     static {
         types.put(AUTOCORRECT_MISSING, PropertyType.BOOLEAN);
@@ -153,10 +151,6 @@ public enum PropertyType {
         defaults.put(READ_CL, "LOCAL_QUORUM");
         types.put(WRITE_CL, PropertyType.STRING);
         defaults.put(WRITE_CL, "LOCAL_QUORUM");
-        types.put(PRINT_STATS_AFTER, PropertyType.NUMBER);
-        defaults.put(PRINT_STATS_AFTER, "100000");
-        types.put(PRINT_STATS_PER_PART, PropertyType.BOOLEAN);
-        defaults.put(PRINT_STATS_PER_PART, "false");
         types.put(PERF_FETCH_SIZE, PropertyType.NUMBER);
         defaults.put(PERF_FETCH_SIZE, "1000");
     }
5 changes: 0 additions & 5 deletions src/resources/cdm-detailed.properties
@@ -227,9 +227,6 @@ spark.cdm.trackRun.runId <auto-generated-unique-long-va
 # .read             : Default is LOCAL_QUORUM. Read consistency from Origin, and also from Target
 #                     when records are read for comparison purposes.
 # .write            : Default is LOCAL_QUORUM. Write consistency to Target.
-# .printStatsAfter  : Default is 100000. Number of rows of processing after which a progress log
-#                     entry will be made.
-# .printStatsPerPart : Default is false. Print statistics for each part after it is processed.
 # .fetchSizeInRows  : Default is 1000. This affects the frequency of reads from Origin, and also the
 #                     frequency of flushes to Target. A larger value will reduce the number of reads
 #                     and writes, but will increase the memory requirements.
@@ -240,8 +237,6 @@ spark.cdm.perfops.ratelimit.origin 20000
 spark.cdm.perfops.ratelimit.target 20000
 #spark.cdm.perfops.consistency.read LOCAL_QUORUM
 #spark.cdm.perfops.consistency.write LOCAL_QUORUM
-#spark.cdm.perfops.printStatsAfter 100000
-#spark.cdm.perfops.printStatsPerPart false
 #spark.cdm.perfops.fetchSizeInRows 1000
 #spark.cdm.perfops.errorLimit 0

Unit tests for OriginSelectStatement (writetime filter)
@@ -62,19 +62,19 @@ public void writetime_filter() {

         assertAll(
                 () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(1500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(1500L);
                     assertFalse(originSelectStatement.shouldFilterRecord(record), "timestamp is range");
                 },
                 () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(500L);
                     assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp below range");
                 },
                 () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(2500L);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(2500L);
                     assertTrue(originSelectStatement.shouldFilterRecord(record), "timestamp above range");
                 },
                 () -> {
-                    when(record.getPk().getWriteTimestamp()).thenReturn(null);
+                    when(writetimeTTLFeature.getLargestWriteTimeStamp(record.getOriginRow())).thenReturn(null);
                     assertFalse(originSelectStatement.shouldFilterRecord(record), "null timestamp");
                 }
         );
Unit tests for the WritetimeTTL feature
@@ -393,7 +393,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {
         feature.loadProperties(propertyHelper);
         feature.initializeAndValidate(originTable, targetTable);
         Long largestWritetime = feature.getLargestWriteTimeStamp(originRow);
-        assertEquals(customWritetime, largestWritetime);
+        assertEquals(3000L, largestWritetime);
     }

     @Test
@@ -580,10 +580,9 @@ public void customWriteTime_withAutoWritetime() {

         assertAll(
                 () -> assertFalse(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
-                () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns")
+                () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns"),
+                () -> assertEquals(12345L, feature.getCustomWritetime(), "hasWritetimeColumns")
         );
-
         verify(originTable, times(0)).extendColumns(any(),any());
     }

     @Test