Merge pull request #173 from datastax/feature/CDM-87
CDM-87 Implemented partition-range exception file generation for Migrate
pravinbhat authored Jun 16, 2023
2 parents 06cc507 + 3b2f683 commit ffcc115
Showing 12 changed files with 129 additions and 165 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -12,7 +12,7 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.
- All migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) are available in the `/assets/` folder of the container

## Install as a JAR file
- Download the latest jar file from the GitHub [packages area here](https://github.com/orgs/datastax/packages?repo_name=cassandra-data-migrator)
- Download the latest jar file from the GitHub [packages area here](https://github.com/datastax/cassandra-data-migrator/packages/1832128)

### Prerequisite
- Install Java 8, as the Spark binaries are compiled with it.
@@ -101,27 +101,27 @@ When running in above mode the tool assumes a `partitions.csv` file to be present
This mode is specifically useful to process a subset of partition ranges that may have failed during a previous run.

> **Note:**
> Here is a quick tip to prepare `partitions.csv` from the log file,
> A file ending with `*_partitions.csv` will be auto-created by the Migration & Validation job in the above format, containing any failed partition ranges. Just rename it as shown below and run the above job.
```
grep "ERROR CopyJobSession: Error with PartitionRange" /path/to/logfile_name.txt | awk '{print $13","$15}' > partitions.csv
mv <keyspace>.<table>_partitions.csv partitions.csv
```
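Each line of `partitions.csv` holds one failed token range as a `min,max` pair (the same format the job writes out, as seen in the `BaseJobSession` change further down). The sketch below shows how such a file could be read back into token bounds; it is hypothetical, not the CDM parser, and the class and method names are illustrative only.
```
import java.io.IOException;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader (not part of CDM): turns each "min,max" line of
// partitions.csv into a pair of BigInteger token bounds.
public class PartitionFileReaderSketch {
    public static List<BigInteger[]> readRanges(String file) throws IOException {
        List<BigInteger[]> ranges = new ArrayList<>();
        for (String line : Files.readAllLines(Paths.get(file))) {
            if (line.trim().isEmpty()) {
                continue;                                   // skip blank lines
            }
            String[] parts = line.split(",");               // "min,max"
            ranges.add(new BigInteger[] {
                    new BigInteger(parts[0].trim()),        // range start token
                    new BigInteger(parts[1].trim()) });     // range end token
        }
        return ranges;
    }
}
```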
# Data validation for specific partition ranges
- You can also use the tool to validate data for specific partition ranges using class option `--class com.datastax.cdm.job.DiffPartitionsFromFile` as shown below,
```
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--master "local[*]" /
--class com.datastax.cdm.job.DiffPartitionsFromFile cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

When running in the above mode, the tool assumes a `partitions.csv` file is present in the current folder.
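Conceptually, validation compares each origin row with the matching target row and records any columns that differ. The sketch below illustrates that idea only; it is not the CDM `DiffPartitionsFromFile` logic, and the map-based row representation is a simplification.
```
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative comparison only (not CDM's diff logic): returns the column names
// whose values are missing or different on the target side.
public class RowDiffSketch {
    public static List<String> mismatchedColumns(Map<String, Object> originRow,
                                                 Map<String, Object> targetRow) {
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, Object> column : originRow.entrySet()) {
            Object targetValue = (targetRow == null) ? null : targetRow.get(column.getKey());
            if (!Objects.equals(column.getValue(), targetValue)) {
                mismatches.add(column.getKey());            // missing row or mismatched value
            }
        }
        return mismatches;
    }
}
```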

# Perform large-field Guardrail violation checks
- The tool can be used to identify large fields from a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) using class option `--class datastax.astra.migrate.Guardrail` as shown below
- The tool can be used to identify large fields from a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) using class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below
```
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--conf spark.cdm.schema.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 /
--master "local[*]" /
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```
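For intuition, the guardrail check boils down to comparing a column's serialized size against the configured `spark.cdm.feature.guardrail.colSizeInKB` threshold. The snippet below illustrates that comparison only; it is not the `GuardrailCheck` implementation, and the class name is hypothetical.
```
import java.nio.charset.StandardCharsets;

// Illustration only (not the CDM GuardrailCheck implementation): flags a column
// value whose serialized size exceeds a threshold expressed in KB, e.g. 10000 KB
// to approximate a 10MB single-field limit.
public class ColumnSizeCheckSketch {
    public static boolean exceedsLimit(byte[] serializedValue, int colSizeInKB) {
        return serializedValue.length > colSizeInKB * 1024L;
    }

    public static void main(String[] args) {
        byte[] smallValue = "example payload".getBytes(StandardCharsets.UTF_8);
        System.out.println(exceedsLimit(smallValue, 10000)); // false for a small value
    }
}
```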
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
# Release Notes
## [4.0.2] - 2023-06-16
- Capture failed partitions in a file for easier reruns
- Optimized mvn to reduce jar size
- Fixed bugs in docs

## [4.0.1] - 2023-06-08
- Fixes broken maven link in docker build process
22 changes: 9 additions & 13 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -1,33 +1,28 @@
package com.datastax.cdm.job;

import com.datastax.cdm.cql.EnhancedSession;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.feature.Feature;
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.feature.Feature;
import com.datastax.cdm.properties.KnownProperties;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJobSession<T> extends BaseJobSession {

public abstract void processSlice(T slice);
public abstract void printCounts(boolean isFinal);

public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

protected EnhancedSession originSession;
protected EnhancedSession targetSession;
protected Guardrail guardrailFeature;
protected boolean guardrailEnabled;

protected String tokenRangeExceptionDir;
protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
this(originSession, targetSession, sc, false);
}

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc, boolean isJobMigrateRowsFromFile) {
super(sc);

@@ -37,7 +32,7 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
if (!propertyHelper.meetsMinimum(KnownProperties.PRINT_STATS_AFTER, printStatsAfter, 1)) {
logger.warn(KnownProperties.PRINT_STATS_AFTER +" must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
logger.warn(KnownProperties.PRINT_STATS_AFTER + " must be greater than 0. Setting to default value of " + KnownProperties.getDefaultAsString(KnownProperties.PRINT_STATS_AFTER));
propertyHelper.setProperty(KnownProperties.PRINT_STATS_AFTER, KnownProperties.getDefault(KnownProperties.PRINT_STATS_AFTER));
printStatsAfter = propertyHelper.getInteger(KnownProperties.PRINT_STATS_AFTER);
}
@@ -47,11 +42,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
maxRetries = propertyHelper.getInteger(KnownProperties.MAX_RETRIES);

tokenRangeExceptionDir = propertyHelper.getString(KnownProperties.TOKEN_RANGE_EXCEPTION_DIR);
exceptionFileName = propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE);

logger.info("PARAM -- Max Retries: {}", maxRetries);
logger.info("PARAM -- Token range exception dir: {}", tokenRangeExceptionDir);
logger.info("PARAM -- Token range exception file name: {}", exceptionFileName);
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

@@ -80,6 +73,9 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
// Guardrail is referenced by many jobs, and is evaluated against the target table
this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK);
this.guardrailEnabled = this.guardrailFeature.isEnabled();

}

public abstract void processSlice(T slice);

public abstract void printCounts(boolean isFinal);
}
48 changes: 37 additions & 11 deletions src/main/java/com/datastax/cdm/job/BaseJobSession.java
@@ -1,37 +1,38 @@
package com.datastax.cdm.job;

import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import com.datastax.cdm.feature.Feature;
import com.datastax.cdm.feature.FeatureFactory;
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

public abstract class BaseJobSession {

public static final String THREAD_CONTEXT_LABEL = "ThreadLabel";
protected static final String NEW_LINE = System.lineSeparator();
protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
protected Map<Featureset, Feature> featureMap;

// Read/Write Rate limiter
// Determine the total throughput for the entire cluster in terms of writes/sec,
// reads/sec
// then do the following to set the values as they are only applicable per JVM
// (hence spark Executor)...
// Rate = Total Throughput (write/read per sec) / Total Executors
protected RateLimiter rateLimiterOrigin;
protected RateLimiter rateLimiterTarget;
protected Integer maxRetries = 10;

protected Integer printStatsAfter = 100000;

protected String tokenRangeExceptionDir;
protected String exceptionFileName;
private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

protected BaseJobSession(SparkConf sc) {
propertyHelper.initializeSparkConf(sc);
@@ -67,4 +68,29 @@ protected String getThreadLabel(BigInteger min, BigInteger max) {
return formattedMin + ":" + formattedMax;
}

private void appendToFile(Path path, String content)
throws IOException {
// if the file does not exist, create it and write; otherwise append
Files.write(path, content.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
}

private void FileAppend(String dir, String fileName, String content) throws IOException {
if (StringUtils.isAllBlank(dir)) {
dir = "./"; // use current folder by default
}
Files.createDirectories(Paths.get(dir));
Path path = Paths.get(dir + "/" + fileName + "_partitions.csv");
appendToFile(path, content + NEW_LINE);
}

protected void logFailedPartitionsInFile(String dir, String fileName, BigInteger min, BigInteger max) {
try {
FileAppend(dir, fileName, min + "," + max);
} catch (Exception ee) {
logger.error("Error occurred while writing to token range file min: {} max: {}", min, max, ee);
}
}

}
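Putting the new helpers together: a failed range for origin table `<keyspace>.<table>` is appended as a `min,max` line to `<dir>/<keyspace>.<table>_partitions.csv`, falling back to the current folder when no directory is configured. The standalone sketch below mirrors that behaviour for illustration; the class name and `main()` usage are hypothetical, not CDM code.
```
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Standalone illustration (hypothetical class, not CDM code) that mirrors the new
// helpers: append each failed token range as a "min,max" line to
// <dir>/<fileName>_partitions.csv, defaulting to the current folder.
public class FailedRangeWriterSketch {
    public static void record(String dir, String fileName, BigInteger min, BigInteger max)
            throws IOException {
        if (dir == null || dir.trim().isEmpty()) {
            dir = "./";                                     // current folder by default
        }
        Files.createDirectories(Paths.get(dir));
        Path path = Paths.get(dir + "/" + fileName + "_partitions.csv");
        String line = min + "," + max + System.lineSeparator();
        Files.write(path, line.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        // Produces ./ks.tbl_partitions.csv containing "<min>,<max>" -- the same kind of
        // file that can be renamed to partitions.csv for a rerun.
        record(null, "ks.tbl", BigInteger.valueOf(Long.MIN_VALUE), BigInteger.ZERO);
    }
}
```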
30 changes: 16 additions & 14 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -3,36 +3,37 @@
import com.datastax.cdm.cql.statement.OriginSelectByPartitionRangeStatement;
import com.datastax.cdm.cql.statement.TargetSelectByPKStatement;
import com.datastax.cdm.cql.statement.TargetUpsertStatement;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.data.Record;
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.data.Record;
import org.apache.logging.log4j.ThreadContext;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;

public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition> {

private static CopyJobSession copyJobSession;
private final PKFactory pkFactory;
private final boolean isCounterTable;
private final Integer fetchSize;
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected AtomicLong readCounter = new AtomicLong(0);
protected AtomicLong skippedCounter = new AtomicLong(0);
protected AtomicLong writeCounter = new AtomicLong(0);
protected AtomicLong errorCounter = new AtomicLong(0);

private TargetUpsertStatement targetUpsertStatement;
private TargetSelectByPKStatement targetSelectByPKStatement;
private final PKFactory pkFactory;
private final boolean isCounterTable;
private Integer batchSize;
private final Integer fetchSize;
private final Integer batchSize;

protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc);
@@ -42,9 +43,9 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
fetchSize = this.originSession.getCqlTable().getFetchSizeInRows();
batchSize = this.originSession.getCqlTable().getBatchSize();

logger.info("CQL -- origin select: {}",this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
logger.info("CQL -- target select: {}",this.targetSession.getTargetSelectByPKStatement().getCQL());
logger.info("CQL -- target upsert: {}",this.targetSession.getTargetUpsertStatement().getCQL());
logger.info("CQL -- origin select: {}", this.originSession.getOriginSelectByPartitionRangeStatement().getCQL());
logger.info("CQL -- target select: {}", this.targetSession.getTargetSelectByPKStatement().getCQL());
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
}

@Override
@@ -53,7 +54,7 @@ public void processSlice(SplitPartitions.Partition slice) {
}

public void getDataAndInsert(BigInteger min, BigInteger max) {
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min,max));
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
boolean done = false;
@@ -127,6 +128,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
writeCounter.addAndGet(flushedWriteCnt);
skippedCounter.addAndGet(skipCnt);
errorCounter.addAndGet(readCnt - flushedWriteCnt - skipCnt);
logFailedPartitionsInFile(tokenRangeExceptionDir,
propertyHelper.getString(KnownProperties.ORIGIN_KEYSPACE_TABLE), min, max);
}
logger.error("Error occurred during Attempt#: {}", attempts, e);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {} -- Attempt# {}",
@@ -182,8 +185,7 @@ private BatchStatement writeAsync(BatchStatement batch, Collection<CompletionSta
return BatchStatement.newInstance(BatchType.UNLOGGED);
}
return batch;
}
else {
} else {
writeResults.add(targetUpsertStatement.executeAsync(boundUpsert));
return batch;
}