diff --git a/README.md b/README.md
index 80eb362e..beff1f8b 100644
--- a/README.md
+++ b/README.md
@@ -96,14 +96,34 @@ Each line above represents a partition-range (`min,max`). Alternatively, you can
 ```
 ./spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
- --conf spark.cdm.tokenrange.partitionFile="/<path-to-file>/<csv-input-filename>" \
+ --conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
 --master "local[*]" --driver-memory 25G --executor-memory 25G \
 --class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
 ```
 This mode is specifically useful to processes a subset of partition-ranges that may have failed during a previous run.
 
-> **Note:**
-> A file named `./<keyspacename>.<tablename>_partitions.csv` is auto generated by the Migration & Validation jobs in the above format containing any failed partition ranges. No file is created if there are no failed partitions. You can use this file as an input to process any failed partition in a following run.
+A file named `./<keyspacename>.<tablename>_partitions.csv` is auto-generated by the Migration & Validation jobs in the above format, containing any failed partition ranges. No file is created if there are no failed partitions. This file can be used as input to process any failed partitions in a subsequent run. You can also specify a different output file using the `spark.cdm.tokenrange.partitionFile.output` option.
+```
+./spark-submit --properties-file cdm.properties \
+ --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
+ --conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
+ --conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
+ --master "local[*]" --driver-memory 25G --executor-memory 25G \
+ --class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+```
+
+For the Data-Validation step, use the option `--conf spark.cdm.tokenrange.partitionFile.appendOnDiff=true` as shown below. This causes a partition range to be written to the output file whenever differences are found, not only when failures occur.
+```
+./spark-submit --properties-file cdm.properties \
+ --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
+ --conf spark.cdm.tokenrange.partitionFile.input="/<path-to-file>/<csv-input-filename>" \
+ --conf spark.cdm.tokenrange.partitionFile.output="/<path-to-file>/<csv-output-filename>" \
+ --conf spark.cdm.tokenrange.partitionFile.appendOnDiff=true \
+ --master "local[*]" --driver-memory 25G --executor-memory 25G \
+ --class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
+```
+
+If `spark.cdm.tokenrange.partitionFile.input` or `spark.cdm.tokenrange.partitionFile.output` is not specified, the system will use `./<keyspacename>.<tablename>_partitions.csv` as the default file.
 
 # Perform large-field Guardrail violation checks
 - The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) `--class com.datastax.cdm.job.GuardrailCheck` as shown below
diff --git a/SIT/features/06_partition_range/migrate_with_partitionfile.properties b/SIT/features/06_partition_range/migrate_with_partitionfile.properties
index e7b3927f..706f85ca 100644
--- a/SIT/features/06_partition_range/migrate_with_partitionfile.properties
+++ b/SIT/features/06_partition_range/migrate_with_partitionfile.properties
@@ -18,5 +18,6 @@ spark.cdm.perfops.numParts 1
 
 spark.cdm.autocorrect.missing true
 spark.cdm.autocorrect.mismatch true
 
-spark.cdm.tokenrange.partitionFile ./partitions.csv
+spark.cdm.tokenrange.partitionFile.input ./partitions.csv
+spark.cdm.tokenrange.partitionFile.output ./partitions.csv
diff --git a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
index 28a1a25d..43350bd9 100644
--- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -34,7 +34,9 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected EnhancedSession targetSession;
     protected Guardrail guardrailFeature;
     protected boolean guardrailEnabled;
-    protected String partitionFile = SplitPartitions.getPartitionFile(propertyHelper);
+    protected boolean appendPartitionOnDiff = SplitPartitions.appendPartitionOnDiff(propertyHelper);
+    protected String partitionFileInput = SplitPartitions.getPartitionFileInput(propertyHelper);
+    protected String partitionFileOutput = SplitPartitions.getPartitionFileOutput(propertyHelper);
     protected JobCounter jobCounter;
     protected Long printStatsAfter;
 
@@ -62,7 +64,8 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
 
         maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);
         logger.info("PARAM -- Max Retries: {}", maxRetries);
-        logger.info("PARAM -- Partition file: {}", partitionFile);
+        logger.info("PARAM -- Partition file input: {}", partitionFileInput);
+        logger.info("PARAM -- Partition file output: {}", partitionFileOutput);
         logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
         logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());
 
diff --git a/src/main/java/com/datastax/cdm/job/BaseJobSession.java b/src/main/java/com/datastax/cdm/job/BaseJobSession.java
index 962a564c..f1c450c3 100644
--- a/src/main/java/com/datastax/cdm/job/BaseJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/BaseJobSession.java
@@ -99,7 +99,7 @@ private void appendToFile(String filePath, String content) throws IOException {
                 StandardOpenOption.APPEND);
     }
 
-    protected void logFailedPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
+    protected void logPartitionsInFile(String partitionFile, BigInteger min, BigInteger max) {
         try {
             appendToFile(partitionFile, min + "," + max);
         } catch (Exception ee) {
diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java
index 4a94b804..27b8bc39 100644
--- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -125,7 +125,7 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
             } catch (Exception e) {
                 if (attempts == maxAttempts) {
                     jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
-                    logFailedPartitionsInFile(partitionFile, min, max);
+                    logPartitionsInFile(partitionFileOutput, min, max);
                 }
                 logger.error("Error occurred during Attempt#: {}", attempts, e);
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java
index 77801998..1f9a7866 100644
--- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -42,6 +42,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
@@ -112,6 +113,7 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         boolean done = false;
+        AtomicBoolean hasDiff = new AtomicBoolean(false);
         int maxAttempts = maxRetries + 1;
         for (int attempts = 1; attempts <= maxAttempts && !done; attempts++) {
             try {
@@ -152,19 +154,27 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
                                 r.setAsyncTargetRow(targetResult);
                                 recordsToDiff.add(r);
                                 if (recordsToDiff.size() > fetchSizeInRows) {
-                                    diffAndClear(recordsToDiff);
+                                    if(diffAndClear(recordsToDiff)) {
+                                        hasDiff.set(true);
+                                    }
                                 }
                             } // targetRecord!=null
                         } // recordSet iterator
                     } // shouldFilterRecord
                 });
-                diffAndClear(recordsToDiff);
+                if (diffAndClear(recordsToDiff)) {
+                    hasDiff.set(true);
+                }
                 done = true;
+
+                if (hasDiff.get() && appendPartitionOnDiff) {
+                    logPartitionsInFile(partitionFileOutput, min, max);
+                }
             } catch (Exception e) {
                 logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
                         Thread.currentThread().getId(), min, max, attempts, e);
                 if (attempts == maxAttempts) {
-                    logFailedPartitionsInFile(partitionFile, min, max);
+                    logPartitionsInFile(partitionFileOutput, min, max);
                 }
             } finally {
                 jobCounter.globalIncrement();
@@ -173,18 +183,22 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
         }
     }
 
-    private void diffAndClear(List<Record> recordsToDiff) {
+    private boolean diffAndClear(List<Record> recordsToDiff) {
+        boolean isDiff = false;
         for (Record record : recordsToDiff) {
             try {
-                diff(record);
+                if (diff(record)) {
+                    isDiff = true;
+                }
             } catch (Exception e) {
                 logger.error("Could not perform diff for key {}: {}", record.getPk(), e);
             }
         }
         recordsToDiff.clear();
+        return isDiff;
     }
 
-    private void diff(Record record) {
+    private boolean diff(Record record) {
         EnhancedPK originPK = record.getPk();
         Row originRow = record.getOriginRow();
         Row targetRow = record.getTargetRow();
@@ -194,7 +208,7 @@ private void diff(Record record) {
             logger.error("Missing target row found for key: {}", record.getPk());
             if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) {
                 logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", KnownProperties.AUTOCORRECT_MISSING, KnownProperties.AUTOCORRECT_MISSING_COUNTER, record.getPk());
-                return;
+                return true;
             }
 
             //correct data
@@ -204,7 +218,7 @@ private void diff(Record record) {
                 jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING);
                 logger.error("Inserted missing row in target: {}", record.getPk());
             }
-            return;
+            return true;
         }
 
         String diffData = isDifferent(originPK, originRow, targetRow);
@@ -218,8 +232,11 @@ private void diff(Record record) {
                 jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH);
                 logger.error("Corrected mismatch row in target: {}", record.getPk());
             }
+
+            return true;
         } else {
             jobCounter.threadIncrement(JobCounter.CounterType.VALID);
+            return false;
         }
     }
 
diff --git a/src/main/java/com/datastax/cdm/job/SplitPartitions.java b/src/main/java/com/datastax/cdm/job/SplitPartitions.java
index 604ad59e..9e716795 100644
--- a/src/main/java/com/datastax/cdm/job/SplitPartitions.java
+++ b/src/main/java/com/datastax/cdm/job/SplitPartitions.java
@@ -168,13 +168,24 @@ private static BufferedReader getfileReader(String fileName) {
         }
     }
 
-    public static String getPartitionFile(PropertyHelper propertyHelper) {
-        String filePath = propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE);
-        if (StringUtils.isAllBlank(filePath)) {
-            filePath = "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
+    public static boolean appendPartitionOnDiff(PropertyHelper propertyHelper) {
+        return Boolean.TRUE.equals(propertyHelper.getBoolean(KnownProperties.TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF));
+    }
+
+    public static String getPartitionFileInput(PropertyHelper propertyHelper) {
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_INPUT);
+        }
+
+        return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
+    }
+
+    public static String getPartitionFileOutput(PropertyHelper propertyHelper) {
+        if (!StringUtils.isAllBlank(propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT))) {
+            return propertyHelper.getString(KnownProperties.TOKEN_RANGE_PARTITION_FILE_OUTPUT);
         }
 
-        return filePath;
+        return "./" + propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE) + "_partitions.csv";
     }
 
     public static class PKRows implements Serializable {
diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
index b027c5a2..fb0ef45a 100644
--- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java
+++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -152,12 +152,17 @@ public enum PropertyType {
     }
 
     //==========================================================================
-    // Error handling
+    // Partition File
    //==========================================================================
-    public static final String TOKEN_RANGE_PARTITION_FILE = "spark.cdm.tokenrange.partitionFile";
+    public static final String TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF = "spark.cdm.tokenrange.partitionFile.appendOnDiff";
+    public static final String TOKEN_RANGE_PARTITION_FILE_INPUT = "spark.cdm.tokenrange.partitionFile.input";
+    public static final String TOKEN_RANGE_PARTITION_FILE_OUTPUT = "spark.cdm.tokenrange.partitionFile.output";
     static {
-        types.put(TOKEN_RANGE_PARTITION_FILE, PropertyType.STRING);
+        types.put(TOKEN_RANGE_PARTITION_FILE_APPEND_ON_DIFF, PropertyType.BOOLEAN);
+        types.put(TOKEN_RANGE_PARTITION_FILE_INPUT, PropertyType.STRING);
+        types.put(TOKEN_RANGE_PARTITION_FILE_OUTPUT, PropertyType.STRING);
     }
+
     //==========================================================================
     // Guardrails and Transformations
     //==========================================================================
diff --git a/src/main/scala/com/datastax/cdm/job/BaseJob.scala b/src/main/scala/com/datastax/cdm/job/BaseJob.scala
index 9c72b9c1..83c87f84 100644
--- a/src/main/scala/com/datastax/cdm/job/BaseJob.scala
+++ b/src/main/scala/com/datastax/cdm/job/BaseJob.scala
@@ -15,7 +15,7 @@
  */
 package com.datastax.cdm.job
 
-import com.datastax.cdm.job.SplitPartitions.getPartitionFile
+import com.datastax.cdm.job.SplitPartitions.getPartitionFileInput
 import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
 import com.datastax.spark.connector.cql.CassandraConnector
 import org.apache.spark.{SparkConf, SparkContext}
@@ -53,7 +53,7 @@ abstract class BaseJob[T: ClassTag] extends App {
 
   var originConnection: CassandraConnector = _
   var targetConnection: CassandraConnector = _
-  var partitionFileName: String = ""
+  var partitionFileNameInput: String = ""
 
   def setup(jobName: String, jobFactory: IJobSessionFactory[T]): Unit = {
     logBanner(jobName + " - Starting")
@@ -66,7 +66,7 @@ abstract class BaseJob[T: ClassTag] extends App {
     sContext = spark.sparkContext
     sc = sContext.getConf
     propertyHelper = PropertyHelper.getInstance(sc);
-    this.partitionFileName = getPartitionFile(propertyHelper);
+    this.partitionFileNameInput = getPartitionFileInput(propertyHelper);
     consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
 
     val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
diff --git a/src/main/scala/com/datastax/cdm/job/BasePKJob.scala b/src/main/scala/com/datastax/cdm/job/BasePKJob.scala
index 7d03b99e..dcbb192d 100644
--- a/src/main/scala/com/datastax/cdm/job/BasePKJob.scala
+++ b/src/main/scala/com/datastax/cdm/job/BasePKJob.scala
@@ -21,6 +21,6 @@ abstract class BasePKJob extends BaseJob[SplitPartitions.PKRows] {
   override def getParts(pieces: Int): util.Collection[SplitPartitions.PKRows] = {
     // This takes a file with N rows and divides it into pieces of size N/pieces
     // Each PKRows object contains a list of Strings that contain the PK to be parsed
-    SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileName)
+    SplitPartitions.getRowPartsFromFile(pieces, this.partitionFileNameInput)
   }
 }
\ No newline at end of file
diff --git a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
index 5329bd04..4fc0c29e 100644
--- a/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
+++ b/src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
@@ -20,10 +20,10 @@
 import scala.reflect.io.File
 
 abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] {
   override def getParts(pieces: Int): util.Collection[SplitPartitions.Partition] = {
-    if (!File(this.partitionFileName).exists) {
+    if (!File(this.partitionFileNameInput).exists) {
       SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent)
     } else {
-      SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileName)
+      SplitPartitions.getSubPartitionsFromFile(pieces, this.partitionFileNameInput)
     }
   }
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index 54d4d95f..f237bd86 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -151,17 +151,30 @@ spark.cdm.schema.origin.keyspaceTable keyspace_name.table_name
 #       5323. The corresponding counter in Origin is also 5323. At some point, the Target
 #       counter gets DELETEd. Should the .missing record be re-inserted before
 #       the DELETE gets tombstoned, the counter will zombie back to life, and the
-#       counter will become 5323+5323 = 10646.
+#       counter will become 5323+5323 = 10646.
+
+# spark.cdm.tokenrange
+#   .partitionFile
+#   .input        : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
+#                   input when applicable. If the file exists, only the partition ranges
+#                   in this file will be Migrated or Validated.
+# spark.cdm.tokenrange
+#   .partitionFile
+#   .output       : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
+#                   output when applicable. If exceptions occur during Migrating or Validation,
+#                   or if `spark.cdm.tokenrange.partitionFile.appendOnDiff` is set to true,
+#                   partition ranges with exceptions will be logged to this file.
 # spark.cdm.tokenrange
-#   .partitionFile : Default is "./<keyspace>.<tablename>_partitions.csv". Note, this file is used as
-#                    input as well as output when applicable. If the file exists, only the partition ranges
-#                    in this file will be Migrated or Validated. Similarly, if exceptions occur during
-#                    Migrating or Validation, partition ranges with exceptions will be logged to this file.
+#   .partitionFile
+#   .appendOnDiff : Default is false. If set to true, any partition range that contains differences is
+#                   also written to `spark.cdm.tokenrange.partitionFile.output`, not just failed ranges.
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.autocorrect.missing                     false
 spark.cdm.autocorrect.mismatch                    false
 #spark.cdm.autocorrect.missing.counter            false
-#spark.cdm.tokenrange.partitionFile               /tokenrange/exception/path/keyspace.tablename_partitions.csv
+#spark.cdm.tokenrange.partitionFile.input         /tokenrange/path/input/keyspace.tablename_partitions.csv
+#spark.cdm.tokenrange.partitionFile.output        /tokenrange/path/output/keyspace.tablename_partitions.csv
+#spark.cdm.tokenrange.partitionFile.appendOnDiff  false
 
 #===========================================================================================================
 # Performance and Operations Parameters affecting throughput, error handling, and similar concerns.
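For reference (editorial illustration, not part of the patch above): a minimal sketch of how the three settings documented in `cdm-detailed.properties` might be combined in a `cdm.properties` file for a validation run. The file paths below are placeholders, not values taken from this change.
```
# Hypothetical cdm.properties fragment; paths are illustrative placeholders.
spark.cdm.schema.origin.keyspaceTable             keyspace_name.table_name

# Only the ranges listed in this file are processed (falls back to
# ./<keyspace>.<tablename>_partitions.csv when the property is unset).
spark.cdm.tokenrange.partitionFile.input          /path/to/input/keyspace.tablename_partitions.csv

# Failed ranges (and, with appendOnDiff, differing ranges) are appended here.
spark.cdm.tokenrange.partitionFile.output         /path/to/output/keyspace.tablename_partitions.csv

# During validation, also record ranges that contain differences.
spark.cdm.tokenrange.partitionFile.appendOnDiff   true
```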
diff --git a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java
index bf6398b9..bbe1fdbd 100644
--- a/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java
+++ b/src/test/java/com/datastax/cdm/job/SplitPartitionsTest.java
@@ -14,6 +14,10 @@
 package com.datastax.cdm.job;
 
+import com.datastax.cdm.properties.PropertyHelper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
@@ -25,6 +29,10 @@
 import static org.junit.jupiter.api.Assertions.*;
 
 public class SplitPartitionsTest {
+    @AfterEach
+    void tearDown() {
+        PropertyHelper.destroyInstance();
+    }
 
     @Test
     void getRandomSubPartitionsTest() {
@@ -90,4 +98,32 @@ void PartitionMinMaxValidMinMaxTest() {
         assertEquals(BigInteger.valueOf(-507900353496146534l), (new SplitPartitions.PartitionMinMax(" -507900353496146534, 456")).min);
         assertEquals(BigInteger.valueOf(9101008634499147643l), (new SplitPartitions.PartitionMinMax(" -507900353496146534,9101008634499147643")).max);
     }
+
+    @Test
+    void appendPartitionOnDiff() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        assertFalse(SplitPartitions.appendPartitionOnDiff(helper));
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.appendOnDiff", true);
+        assertTrue(SplitPartitions.appendPartitionOnDiff(helper));
+    }
+
+    @Test
+    void getPartitionFileInput() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        helper.setProperty("spark.cdm.schema.origin.keyspaceTable", "tb");
+        assertEquals("./tb_partitions.csv", SplitPartitions.getPartitionFileInput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.input", "./file_input.csv");
+        assertEquals("./file_input.csv", SplitPartitions.getPartitionFileInput(helper));
+    }
+
+    @Test
+    void getPartitionFileOutput() {
+        PropertyHelper helper = PropertyHelper.getInstance();
+        helper.setProperty("spark.cdm.schema.origin.keyspaceTable", "tb");
+        assertEquals("./tb_partitions.csv", SplitPartitions.getPartitionFileOutput(helper));
+
+        helper.setProperty("spark.cdm.tokenrange.partitionFile.output", "./file_output.csv");
+        assertEquals("./file_output.csv", SplitPartitions.getPartitionFileOutput(helper));
+    }
 }
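To illustrate how the new input/output file names resolve at runtime, here is a small sketch built on the same `PropertyHelper` and `SplitPartitions` calls exercised by the tests above. The class itself is hypothetical and the keyspace/table and file names are example values only.
```
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.cdm.properties.PropertyHelper;

// Hypothetical usage sketch, not part of the patch; it mirrors SplitPartitionsTest.
public class PartitionFileResolutionExample {
    public static void main(String[] args) {
        PropertyHelper helper = PropertyHelper.getInstance();
        helper.setProperty("spark.cdm.schema.origin.keyspaceTable", "ks.tbl");

        // With neither property set, input and output both fall back to the
        // "./<keyspace>.<tablename>_partitions.csv" default.
        System.out.println(SplitPartitions.getPartitionFileInput(helper));  // ./ks.tbl_partitions.csv
        System.out.println(SplitPartitions.getPartitionFileOutput(helper)); // ./ks.tbl_partitions.csv

        // Explicit settings override the default independently for each side.
        helper.setProperty("spark.cdm.tokenrange.partitionFile.input", "./retry_input.csv");
        helper.setProperty("spark.cdm.tokenrange.partitionFile.output", "./diff_output.csv");
        System.out.println(SplitPartitions.getPartitionFileInput(helper));  // ./retry_input.csv
        System.out.println(SplitPartitions.getPartitionFileOutput(helper)); // ./diff_output.csv

        // appendOnDiff defaults to false and must be enabled explicitly.
        System.out.println(SplitPartitions.appendPartitionOnDiff(helper));  // false
        helper.setProperty("spark.cdm.tokenrange.partitionFile.appendOnDiff", true);
        System.out.println(SplitPartitions.appendPartitionOnDiff(helper));  // true

        PropertyHelper.destroyInstance();
    }
}
```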