Merge pull request #261 from guofei/output-partition-option
Add options to output partition range
pravinbhat authored May 31, 2024
2 parents ed0d750 + b8e7be0 commit 6fd0a83
Showing 13 changed files with 142 additions and 36 deletions.
26 changes: 23 additions & 3 deletions README.md
@@ -96,14 +96,34 @@ Each line above represents a partition-range (`min,max`). Alternatively, you can
```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.tokenrange.partitionFile="/<path-to-file>/<csv-input-filename>" \
--conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```
This mode is specifically useful to process a subset of partition-ranges that may have failed during a previous run.
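
The input CSV simply contains one `min,max` token pair per line. A minimal illustrative example (the token values below are made up and assume the full Murmur3 token range split into four pieces):
```
-9223372036854775808,-4611686018427387905
-4611686018427387904,-1
0,4611686018427387903
4611686018427387904,9223372036854775807
```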

> **Note:**
> A file named `./<keyspacename>.<tablename>_partitions.csv` is auto generated by the Migration & Validation jobs in the above format containing any failed partition ranges. No file is created if there are no failed partitions. You can use this file as an input to process any failed partition in a following run.
A file named `./<keyspacename>.<tablename>_partitions.csv` is auto-generated by the Migration & Validation jobs in the above format, containing any failed partition ranges. No file is created if there are no failed partitions. This file can be used as input to process any failed partitions in a subsequent run. You can also specify a different output file using the `spark.cdm.tokenrange.partitionFile.output` option.
```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
--conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

For the Data-Validation step, use the conf option `--conf spark.cdm.tokenrange.partitionFile.appendOnDiff` as shown below. This causes the partition range to be written to the output file whenever differences are found, not just on failures.
```
./spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
--conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
--conf spark.cdm.tokenrange.partitionFile.appendOnDiff=true \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

If `spark.cdm.tokenrange.partitionFile.input` or `spark.cdm.tokenrange.partitionFile.output` are not specified, the system will use `./<keyspacename>.<tablename>_partitions.csv` as the default file.
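
These options can also be set in the properties file passed via `--properties-file` instead of on the `spark-submit` command line. A minimal sketch, using hypothetical paths:
```
spark.cdm.tokenrange.partitionFile.input        /path/to/input_partitions.csv
spark.cdm.tokenrange.partitionFile.output       /path/to/output_partitions.csv
spark.cdm.tokenrange.partitionFile.appendOnDiff true
```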

# Perform large-field Guardrail violation checks
- The tool can be used to identify large fields in a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) by using `--class com.datastax.cdm.job.GuardrailCheck` as shown below
@@ -18,5 +18,6 @@ spark.cdm.perfops.numParts 1
spark.cdm.autocorrect.missing true
spark.cdm.autocorrect.mismatch true

spark.cdm.tokenrange.partitionFile ./partitions.csv
spark.cdm.tokenrange.partitionFile.input ./partitions.csv
spark.cdm.tokenrange.partitionFile.output ./partitions.csv

7 changes: 5 additions & 2 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -34,7 +34,9 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected EnhancedSession targetSession;
protected Guardrail guardrailFeature;
protected boolean guardrailEnabled;
protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
protected boolean appendPartitionOnDiff = SplitPartitions.appendPartitionOnDiff(propertyHelper);
protected String partitionFileInput = SplitPartitions.getPartitionFileInput(propertyHelper);
protected String partitionFileOutput = SplitPartitions.getPartitionFileOutput(propertyHelper);
protected JobCounter jobCounter;
protected Long printStatsAfter;

@@ -62,7 +64,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);

logger.info("PARAM -- Max Retries: {}", maxRetries);
logger.info("PARAM -- Partition file: {}", partitionFile);
logger.info("PARAM -- Partition file input: {}", partitionFileInput);
logger.info("PARAM -- Partition file output: {}", partitionFileOutput);
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

2 changes: 1 addition & 1 deletion src/main/java/com/datastax/cdm/job/BaseJobSession.java
@@ -99,7 +99,7 @@ private void appendToFile(String filePath, String content) throws IOException {
StandardOpenOption.APPEND);
}

protected void logFailedPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
protected void logPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
try {
appendToFile(partitionFile, min + "," + max);
} catch (Exception ee) {
2 changes: 1 addition & 1 deletion src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -125,7 +125,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
} catch (Exception e) {
if (attempts == maxAttempts) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
logFailedPartitionsInFile(partitionFile, min, max);
logPartitionsInFile(partitionFileOutput, min, max);
}
logger.error("Error occurred during Attempt#: {}", attempts, e);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
33 changes: 25 additions & 8 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
@@ -112,6 +113,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
boolean done = false;
AtomicBoolean hasDiff = new AtomicBoolean(false);
int maxAttempts = maxRetries + 1;
for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
try {
@@ -152,19 +154,27 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
r.setAsyncTargetRow(targetResult);
recordsToDiff.add(r);
if (recordsToDiff.size() > fetchSizeInRows) {
diffAndClear(recordsToDiff);
if(diffAndClear(recordsToDiff)) {
hasDiff.set(true);
}
}
} // targetRecord!=null
} // recordSet iterator
} // shouldFilterRecord
});
diffAndClear(recordsToDiff);
if (diffAndClear(recordsToDiff)) {
hasDiff.set(true);
}
done = true;

if (hasDiff.get() && appendPartitionOnDiff) {
logPartitionsInFile(partitionFileOutput, min, max);
}
} catch (Exception e) {
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
Thread.currentThread().getId(), min, max, attempts, e);
if (attempts == maxAttempts) {
logFailedPartitionsInFile(partitionFile, min, max);
logPartitionsInFile(partitionFileOutput, min, max);
}
} finally {
jobCounter.globalIncrement();
@@ -173,18 +183,22 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
}
}

private void diffAndClear(List<Record> recordsToDiff) {
private boolean diffAndClear(List<Record> recordsToDiff) {
boolean isDiff = false;
for (Record record : recordsToDiff) {
try {
diff(record);
if (diff(record)) {
isDiff = true;
}
} catch (Exception e) {
logger.error("Could not perform diff for key {}: {}", record.getPk(), e);
}
}
recordsToDiff.clear();
return isDiff;
}

private void diff(Record record) {
private boolean diff(Record record) {
EnhancedPK originPK = record.getPk();
Row originRow = record.getOriginRow();
Row targetRow = record.getTargetRow();
@@ -194,7 +208,7 @@ private void diff(Record record) {
logger.error("Missing target row found for key: {}", record.getPk());
if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk());
return;
return true;
}

//correct data
@@ -204,7 +218,7 @@ private void diff(Record record) {
jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING);
logger.error("Inserted missing row in target: {}", record.getPk());
}
return;
return true;
}

String diffData = isDifferent(originPK, originRow, targetRow);
@@ -218,8 +232,11 @@ private void diff(Record record) {
jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH);
logger.error("Corrected mismatch row in target: {}", record.getPk());
}

return true;
} else {
jobCounter.threadIncrement(JobCounter.CounterType.VALID);
return false;
}
}

21 changes: 16 additions & 5 deletions src/main/java/com/datastax/cdm/job/SplitPartitions.java
@@ -168,13 +168,24 @@ private static BufferedReader getfileReader(String fileName) {
}
}

public static String getPartitionFile(PropertyHelper propertyHelper) {
String filePath = propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
if (StringUtils.isAllBlank(filePath)) {
filePath = "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
public static boolean appendPartitionOnDiff(PropertyHelper propertyHelper) {
return Boolean.TRUE.equals(propertyHelper.getBoolean(KnownProperties.TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF));
}

public static String getPartitionFileInput(PropertyHelper propertyHelper) {
if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT))) {
return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT);
}

return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
}

public static String getPartitionFileOutput(PropertyHelper propertyHelper) {
if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT))) {
return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT);
}

return filePath;
return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
}

public static class PKRows implements Serializable {
11 changes: 8 additions & 3 deletions src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -152,12 +152,17 @@ public enum PropertyType {
}

//==========================================================================
// Error handling
// Partition File
//==========================================================================
public static final String TOKEN_RANGE_PARTITION_FILE = "spark.cdm.tokenrange.partitionFile";
public static final String TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF = "spark.cdm.tokenrange.partitionFile.appendOnDiff";
public static final String TOKEN_RANGE_PARTITION_FILE_INPUT = "spark.cdm.tokenrange.partitionFile.input";
public static final String TOKEN_RANGE_PARTITION_FILE_OUTPUT = "spark.cdm.tokenrange.partitionFile.output";
static {
types.put(TOKEN_RANGE_PARTITION_FILE, PropertyType.STRING);
types.put(TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF, PropertyType.BOOLEAN);
types.put(TOKEN_RANGE_PARTITION_FILE_INPUT, PropertyType.STRING);
types.put(TOKEN_RANGE_PARTITION_FILE_OUTPUT, PropertyType.STRING);
}

//==========================================================================
// Guardrails and Transformations
//==========================================================================
6 changes: 3 additions & 3 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
@@ -15,7 +15,7 @@
*/
package com.datastax.cdm.job

import com.datastax.cdm.job.SplitPartitions.getPartitionFile
import com.datastax.cdm.job.SplitPartitions.getPartitionFileInput
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
@@ -53,7 +53,7 @@ abstract class BaseJob[T: ClassTag] extends App {

var originConnection: CassandraConnector = _
var targetConnection: CassandraConnector = _
var partitionFileName: String = ""
var partitionFileNameInput: String = ""

def setup(jobName: String, jobFactory: IJobSessionFactory[T]): Unit = {
logBanner(jobName + " - Starting")
@@ -66,7 +66,7 @@ abstract class BaseJob[T: ClassTag] extends App {
sContext = spark.sparkContext
sc = sContext.getConf
propertyHelper = PropertyHelper.getInstance(sc);
this.partitionFileName = getPartitionFile(propertyHelper);
this.partitionFileNameInput = getPartitionFileInput(propertyHelper);

consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
2 changes: 1 addition & 1 deletion src/main/scala/com/datastax/cdm/job/BasePKJob.scala
@@ -21,6 +21,6 @@ abstract class BasePKJob extends BaseJob[SplitPartitions.PKRows] {
override def getParts(pieces: Int): util.Collection[SplitPartitions.PKRows] = {
// This takes a file with N rows and divides it into pieces of size N/pieces
// Each PKRows object contains a list of Strings that contain the PK to be parsed
SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileName)
SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileNameInput)
}
}
4 changes: 2 additions & 2 deletions src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
@@ -20,10 +20,10 @@ import scala.reflect.io.File

abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] {
override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = {
if (!File(this.partitionFileName).exists) {
if (!File(this.partitionFileNameInput).exists) {
SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent)
} else {
SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileName)
SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileNameInput)
}
}

25 changes: 19 additions & 6 deletions src/resources/cdm-detailed.properties
@@ -151,17 +151,30 @@ spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name
# 5323. The corresponding counter in Origin is also 5323. At some point, the Target
# counter gets DELETEd. Should the .missing record be re-inserted before
# the DELETE gets tombstoned, the counter will zombie back to life, and the
# counter will become 5323+5323 = 10646.
# counter will become 5323+5323 = 10646.

# spark.cdm.tokenrange
# .partitionFile
# .input : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
# input when applicable. If the file exists, only the partition ranges
# in this file will be Migrated or Validated.
# spark.cdm.tokenrange
# .partitionFile
# .output : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
# output when applicable. Partition ranges that fail with exceptions during Migration or
# Validation are logged to this file; if `spark.cdm.tokenrange.partitionFile.appendOnDiff`
# is set to true, partition ranges with differences are appended as well.
# spark.cdm.tokenrange
# .partitionFile : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
# input as well as output when applicable. If the file exists, only the partition ranges
# in this file will be Migrated or Validated. Similarly, if exceptions occur during
# Migrating or Validation, partition ranges with exceptions will be logged to this file.
# .partitionFile
# .appendOnDiff : Default is false. If set to true, partition ranges with any differences found during
# Validation will also be written to the `spark.cdm.tokenrange.partitionFile.output` file.
#-----------------------------------------------------------------------------------------------------------
spark.cdm.autocorrect.missing false
spark.cdm.autocorrect.mismatch false
#spark.cdm.autocorrect.missing.counter false
#spark.cdm.tokenrange.partitionFile /tokenrange/exception/path/keyspace.tablename_partitions.csv
#spark.cdm.tokenrange.partitionFile.input /tokenrange/path/input/keyspace.tablename_partitions.csv
#spark.cdm.tokenrange.partitionFile.output /tokenrange/path/output/keyspace.tablename_partitions.csv
#spark.cdm.tokenrange.partitionFile.appendOnDiff false

#===========================================================================================================
# Performance and Operations Parameters affecting throughput, error handling, and similar concerns.