Implemented support for custom runId
pravinbhat committed Sep 11, 2024
1 parent 6ddc534 commit 7cfe5d5
Showing 14 changed files with 70 additions and 55 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
@@ -1,4 +1,7 @@
 # Release Notes
+## [4.3.10] - 2024-09-11
+- Added property `spark.cdm.trackRun.runId` to support a custom unique identifier for the current run. This can be used by wrapper scripts to pass a known `runId` and then use it to query the `cdm_run_info` and `cdm_run_details` tables.
+
 ## [4.3.9] - 2024-09-11
 - Added new `status` value of `DIFF_CORRECTED` on `cdm_run_details` table to specifically mark partitions that were corrected during the CDM validation run.
 - Upgraded Validation job to skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`.
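As an illustration of the new property, a wrapper can pass its own `runId` and later read the run's outcome back from the tracking tables. The sketch below is not part of this commit: it assumes the DataStax Java driver 4.x, a reachable cluster, and a hypothetical `cdm` keyspace holding the `cdm_run_info` table (column names taken from the statements in this commit).

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class RunStatusCheck {
    public static void main(String[] args) {
        long runId = Long.parseLong(args[0]); // the value passed as spark.cdm.trackRun.runId
        String tableName = args[1];           // e.g. "ks.table1" -- format is an assumption

        try (CqlSession session = CqlSession.builder().build()) {
            // "cdm" is an assumed keyspace name, for illustration only.
            Row row = session.execute(
                    "SELECT status, run_info FROM cdm.cdm_run_info WHERE table_name = ? AND run_id = ?",
                    tableName, runId).one();
            System.out.println(row == null ? "run not found"
                    : "Run " + runId + ": " + row.getString("status") + " / " + row.getString("run_info"));
        }
    }
}
```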
@@ -37,8 +37,6 @@ public class TargetUpsertRunDetailsStatement {
     private CqlSession session;
     private String keyspaceName;
     private String tableName;
-    private long runId;
-    private long prevRunId;
     private BoundStatement boundInitInfoStatement;
     private BoundStatement boundInitStatement;
     private BoundStatement boundEndInfoStatement;
@@ -70,6 +68,8 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
                 // ignore if column already exists
                 logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
             }
+            this.session.execute("create table if not exists " + cdmKsTabDetails
+                    + " (table_name text, run_id bigint, start_time timestamp, token_min bigint, token_max bigint, status text, primary key ((table_name, run_id), token_min))");

         boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
                 + " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)");
@@ -88,7 +88,6 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
     }

     public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
-        this.prevRunId = prevRunId;
         final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
         if (prevRunId == 0) {
             return pendingParts;
@@ -117,31 +116,34 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
         return pendingParts;
     }

-    public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
-        runId = System.currentTimeMillis();
+    public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
+        ResultSet rsInfo = session
+                .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId));
+        if (null != rsInfo.one()) {
+            throw new RuntimeException("Run id " + runId + " already exists for table " + tableName);
+        }
         session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
                 .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
                 .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
-        parts.forEach(part -> initCdmRun(part));
+        parts.forEach(part -> initCdmRun(runId, part));
         session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
                 .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
                 .setString("status", TrackRun.RUN_STATUS.STARTED.toString()));
-        return runId;
     }

-    private void initCdmRun(Partition partition) {
+    private void initCdmRun(long runId, Partition partition) {
         session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId)
                 .setLong("token_min", partition.getMin().longValue())
                 .setLong("token_max", partition.getMax().longValue())
                 .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
     }

-    public void endCdmRun(String runInfo) {
+    public void endCdmRun(long runId, String runInfo) {
         session.execute(boundEndInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
                 .setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString()));
     }

-    public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) {
+    public void updateCdmRun(long runId, BigInteger min, TrackRun.RUN_STATUS status) {
         if (TrackRun.RUN_STATUS.STARTED.equals(status)) {
             session.execute(boundUpdateStartStatement.setString("table_name", tableName).setLong("run_id", runId)
                     .setLong("token_min", min.longValue()).setString("status", status.toString()));
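The `create table if not exists` above also fixes the shape of `cdm_run_details`: partition key `(table_name, run_id)` with `token_min` as clustering column, so one query returns every token range of a specific run. A hedged sketch of a monitoring query over that table (the `cdm` keyspace and the `table_name` value are illustrative assumptions):

```java
import com.datastax.oss.driver.api.core.CqlSession;

public class PendingRangesReport {
    public static void main(String[] args) {
        long runId = Long.parseLong(args[0]);

        try (CqlSession session = CqlSession.builder().build()) {
            // Lists token ranges of the run that have not reached PASS yet.
            session.execute(
                    "SELECT token_min, token_max, status FROM cdm.cdm_run_details "
                            + "WHERE table_name = ? AND run_id = ?", "ks.table1", runId)
                    .forEach(row -> {
                        if (!"PASS".equals(row.getString("status"))) {
                            System.out.printf("[%d, %d] -> %s%n", row.getLong("token_min"),
                                    row.getLong("token_max"), row.getString("status"));
                        }
                    });
        }
    }
}
```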
14 changes: 6 additions & 8 deletions src/main/java/com/datastax/cdm/feature/TrackRun.java
@@ -49,18 +49,16 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
         return pendingParts;
     }

-    public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
-        long runId = runStatement.initCdmRun(parts, runType);
+    public void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
+        runStatement.initCdmRun(runId, prevRunId, parts, runType);
         logger.info("###################### Run Id for this job is: {} ######################", runId);
-
-        return runId;
     }

-    public void updateCdmRun(BigInteger min, RUN_STATUS status) {
-        runStatement.updateCdmRun(min, status);
+    public void updateCdmRun(long runId, BigInteger min, RUN_STATUS status) {
+        runStatement.updateCdmRun(runId, min, status);
     }

-    public void endCdmRun(String runInfo) {
-        runStatement.endCdmRun(runInfo);
+    public void endCdmRun(long runId, String runInfo) {
+        runStatement.endCdmRun(runId, runInfo);
     }
 }
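With `runId` now a parameter rather than internal state, `TrackRun` carries no per-run fields and the caller owns the identifier for the whole lifecycle. A minimal sketch of the call order, using only signatures visible in this diff (how the `TrackRun` instance is constructed is out of scope here):

```java
import java.math.BigInteger;
import java.util.Collection;

import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.job.SplitPartitions;

public class TrackRunLifecycleSketch {
    // Illustrative only; initCdmRun throws if this runId was already used for the table.
    static void runOnce(TrackRun trackRun, long runId, long prevRunId,
            Collection<SplitPartitions.Partition> parts, BigInteger min) {
        trackRun.initCdmRun(runId, prevRunId, parts, TrackRun.RUN_TYPE.MIGRATE); // NOT_STARTED, then STARTED
        trackRun.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);          // a token range was picked up
        trackRun.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);             // ...and completed
        trackRun.endCdmRun(runId, "Read: 100; Write: 100");                      // run-level summary string
    }
}
```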
10 changes: 8 additions & 2 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
@@ -41,6 +41,7 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
     protected JobCounter jobCounter;
     protected Long printStatsAfter;
     protected TrackRun trackRunFeature;
+    protected long runId;

     protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
         this(originSession, targetSession, sc, false);
@@ -102,12 +103,17 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,

     public abstract void processSlice(T slice);

-    public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
+    public synchronized void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts,
+            TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
+        this.runId = runId;
         this.trackRunFeature = trackRunFeature;
+        if (null != trackRunFeature)
+            trackRunFeature.initCdmRun(runId, prevRunId, parts, runType);
     }

     public synchronized void printCounts(boolean isFinal) {
         if (isFinal) {
-            jobCounter.printFinal(trackRunFeature);
+            jobCounter.printFinal(runId, trackRunFeature);
         } else {
             jobCounter.printProgress();
         }
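Because the base class now stores the `runId` and makes the `TrackRun` call itself, the per-job `initCdmRun` overrides in the sessions below become dead code and are deleted. The driver-side sequence, sketched under the assumption of an already-constructed session (a sketch, not code from this commit):

```java
import java.util.Collection;

import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.job.AbstractJobSession;
import com.datastax.cdm.job.SplitPartitions;

public class JobFlowSketch {
    // One initCdmRun up front, then per-slice work, then final counts keyed by the same runId.
    static void run(AbstractJobSession<SplitPartitions.Partition> session, long runId, long prevRunId,
            Collection<SplitPartitions.Partition> parts, TrackRun trackRun) {
        session.initCdmRun(runId, prevRunId, parts, trackRun, TrackRun.RUN_TYPE.MIGRATE);
        parts.forEach(session::processSlice);
        session.printCounts(true); // forwards runId to JobCounter.printFinal
    }
}
```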
12 changes: 3 additions & 9 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
@@ -70,17 +70,11 @@ public void processSlice(SplitPartitions.Partition slice) {
         this.getDataAndInsert(slice.getMin(), slice.getMax());
     }

-    public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
-        this.trackRunFeature = trackRunFeature;
-        if (null != trackRunFeature)
-            trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.MIGRATE);
-    }
-
     private void getDataAndInsert(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         if (null != trackRunFeature)
-            trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);
+            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);

         BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
         String guardrailCheck;
@@ -139,13 +133,13 @@ private void getDataAndInsert(BigInteger min, BigInteger max) {
                     jobCounter.getCount(JobCounter.CounterType.UNFLUSHED));
             jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
             if (null != trackRunFeature)
-                trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);
         } catch (Exception e) {
             jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
                     jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
                             - jobCounter.getCount(JobCounter.CounterType.SKIPPED));
             if (null != trackRunFeature)
-                trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL);
             logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
                     Thread.currentThread().getId(), min, max, e);
             logger.error("Error stats " + jobCounter.getThreadCounters(false));
18 changes: 5 additions & 13 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -17,7 +17,6 @@

 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
@@ -124,18 +123,11 @@ public void processSlice(SplitPartitions.Partition slice) {
         this.getDataAndDiff(slice.getMin(), slice.getMax());
     }

-    @Override
-    public synchronized void initCdmRun(Collection<SplitPartitions.Partition> parts, TrackRun trackRunFeature) {
-        this.trackRunFeature = trackRunFeature;
-        if (null != trackRunFeature)
-            trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.DIFF_DATA);
-    }
-
     private void getDataAndDiff(BigInteger min, BigInteger max) {
         ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
         logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
         if (null != trackRunFeature)
-            trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED);
+            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);

         AtomicBoolean hasDiff = new AtomicBoolean(false);
         try {
@@ -196,12 +188,12 @@ private void getDataAndDiff(BigInteger min, BigInteger max) {
                             .getCount(JobCounter.CounterType.CORRECTED_MISSING)
                     && jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter
                             .getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) {
-                trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF_CORRECTED);
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF_CORRECTED);
             } else {
-                trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
+                trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF);
             }
         } else if (null != trackRunFeature) {
-            trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
+            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS);
         }
     } catch (Exception e) {
         jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
@@ -212,7 +204,7 @@ private void getDataAndDiff(BigInteger min, BigInteger max) {
         logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
                 Thread.currentThread().getId(), min, max, e);
         if (null != trackRunFeature)
-            trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
+            trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL);
     } finally {
         jobCounter.globalIncrement();
         printCounts(false);
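The status decision above is easy to restate: a range with differences becomes `DIFF_CORRECTED` only when every missing row and every mismatch was corrected, otherwise it stays `DIFF`. A distilled, hypothetical restatement of that branch (the commit's actual logic is the Java above):

```java
import com.datastax.cdm.feature.TrackRun;

public class DiffStatusSketch {
    // Hypothetical helper mirroring DiffJobSession's branch, for readability only.
    static TrackRun.RUN_STATUS statusFor(long missing, long correctedMissing,
            long mismatch, long correctedMismatch) {
        boolean fullyCorrected = missing == correctedMissing && mismatch == correctedMismatch;
        return fullyCorrected ? TrackRun.RUN_STATUS.DIFF_CORRECTED : TrackRun.RUN_STATUS.DIFF;
    }
}
```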
4 changes: 2 additions & 2 deletions src/main/java/com/datastax/cdm/job/JobCounter.java
@@ -178,7 +178,7 @@ protected void printAndLogProgress(String message, boolean global) {
         logger.info(fullMessage);
     }

-    public void printFinal(TrackRun trackRunFeature) {
+    public void printFinal(long runId, TrackRun trackRunFeature) {
         if (null != trackRunFeature) {
             StringBuilder sb = new StringBuilder();
             if (counterMap.containsKey(CounterType.READ))
@@ -202,7 +202,7 @@ public void printFinal(TrackRun trackRunFeature) {
             if (counterMap.containsKey(CounterType.LARGE))
                 sb.append("; Large: " + counterMap.get(CounterType.LARGE).getGlobalCounter());

-            trackRunFeature.endCdmRun(sb.toString());
+            trackRunFeature.endCdmRun(runId, sb.toString());
         }
         logger.info("################################################################################################");
         if (counterMap.containsKey(CounterType.READ))
@@ -109,6 +109,7 @@ public enum PropertyType {
     public static final String AUTOCORRECT_MISMATCH = "spark.cdm.autocorrect.mismatch"; // false
     public static final String AUTOCORRECT_MISSING_COUNTER = "spark.cdm.autocorrect.missing.counter"; // false
     public static final String TRACK_RUN = "spark.cdm.trackRun";
+    public static final String RUN_ID = "spark.cdm.trackRun.runId";
     public static final String PREV_RUN_ID = "spark.cdm.trackRun.previousRunId";

     public static final String PERF_NUM_PARTS = "spark.cdm.perfops.numParts"; // 5000, was spark.splitSize
@@ -131,6 +132,8 @@
         defaults.put(AUTOCORRECT_MISSING_COUNTER, "false");
         types.put(TRACK_RUN, PropertyType.BOOLEAN);
         defaults.put(TRACK_RUN, "false");
+        types.put(RUN_ID, PropertyType.NUMBER);
+        defaults.put(RUN_ID, "0");
         types.put(PREV_RUN_ID, PropertyType.NUMBER);
         defaults.put(PREV_RUN_ID, "0");
12 changes: 10 additions & 2 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
@@ -47,6 +47,7 @@ abstract class BaseJob[T: ClassTag] extends App {
   var coveragePercent: Int = _
   var numSplits: Int = _
   var trackRun: Boolean = _
+  var runId: Long = _
   var prevRunId: Long = _

   var parts: util.Collection[T] = _
@@ -80,14 +81,21 @@ abstract class BaseJob[T: ClassTag] extends App {
     maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner)
     coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT)
     numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
+    runId = propertyHelper.getLong(KnownProperties.RUN_ID)
     prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
-    trackRun = if (0 != prevRunId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
+    trackRun = if (0 != prevRunId || 0 != runId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
+    if (trackRun == true && runId == 0) {
+      runId = System.nanoTime();
+    }

     abstractLogger.info("PARAM -- Min Partition: " + minPartition)
     abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
     abstractLogger.info("PARAM -- Number of Splits : " + numSplits)
     abstractLogger.info("PARAM -- Track Run : " + trackRun)
-    abstractLogger.info("PARAM -- Previous RunId : " + prevRunId)
+    if (trackRun == true) {
+      abstractLogger.info("PARAM -- RunId : " + runId)
+      abstractLogger.info("PARAM -- Previous RunId : " + prevRunId)
+    }
     abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)
     this.parts = getParts(numSplits)
     abstractLogger.info("PARAM Calculated -- Total Partitions: " + parts.size())
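The Scala above sets the precedence for the identifier: a non-zero `spark.cdm.trackRun.runId` wins; otherwise, when tracking is enabled, a `System.nanoTime()` value is generated; and a non-zero `runId` or `previousRunId` now switches tracking on implicitly. A distilled restatement of that rule (a sketch, not the commit's code):

```java
public class RunIdResolutionSketch {
    // Mirrors BaseJob.scala in this commit: an explicit runId wins, otherwise auto-generate.
    static long resolveRunId(long configuredRunId, long prevRunId, boolean trackRunProperty) {
        boolean trackRun = prevRunId != 0 || configuredRunId != 0 || trackRunProperty;
        if (trackRun && configuredRunId == 0) {
            return System.nanoTime(); // auto-generated unique long
        }
        return configuredRunId; // remains 0 when tracking is off
    }
}
```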
4 changes: 3 additions & 1 deletion src/main/scala/com/datastax/cdm/job/DiffData.scala
@@ -15,6 +15,8 @@
  */
 package com.datastax.cdm.job

+import com.datastax.cdm.feature.TrackRun
+
 object DiffData extends BasePartitionJob {
   setup("Data Validation Job", new DiffJobSessionFactory())
   execute()
@@ -24,7 +26,7 @@ object DiffData extends BasePartitionJob {
     if (!parts.isEmpty()) {
       originConnection.withSessionDo(originSession =>
         targetConnection.withSessionDo(targetSession =>
-          jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));
+          jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA)));

       slices.foreach(slice => {
         originConnection.withSessionDo(originSession =>
4 changes: 0 additions & 4 deletions src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala
@@ -22,10 +22,6 @@ object GuardrailCheck extends BasePartitionJob {

   protected def execute(): Unit = {
     if (!parts.isEmpty()) {
-      originConnection.withSessionDo(originSession =>
-        targetConnection.withSessionDo(targetSession =>
-          jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));
-
       slices.foreach(slice => {
         originConnection.withSessionDo(originSession =>
           targetConnection.withSessionDo(targetSession =>
4 changes: 3 additions & 1 deletion src/main/scala/com/datastax/cdm/job/Migrate.scala
@@ -15,6 +15,8 @@
  */
 package com.datastax.cdm.job

+import com.datastax.cdm.feature.TrackRun
+
 object Migrate extends BasePartitionJob {
   setup("Migrate Job", new CopyJobSessionFactory())
   execute()
@@ -24,7 +26,7 @@ object Migrate extends BasePartitionJob {
     if (!parts.isEmpty()) {
       originConnection.withSessionDo(originSession =>
         targetConnection.withSessionDo(targetSession =>
-          jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature)));
+          jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE)));

       slices.foreach(slice => {
         originConnection.withSessionDo(originSession =>
6 changes: 6 additions & 0 deletions src/resources/cdm-detailed.properties
@@ -173,9 +173,15 @@ spark.cdm.autocorrect.missing.counter false
 #  successful (`status != PASS`). A token-range may not be marked as 'PASS' for several
 #  reasons, including if the job gets killed, or if some token-ranges fail due to load on
 #  the cluster (origin or target), or for any other reason.
+#  .runId      : Default is an auto-generated unique long value. When a non-zero value is
+#                provided, it will be used as a custom, unique identifier for the current run.
+#                Note that this id must be numeric and can be any unique java `long` value.
+#                This can be used by wrapper scripts to pass a known `runId` and then use it
+#                to query the `cdm_run_info` and `cdm_run_details` tables.
 #-----------------------------------------------------------------------------------------------------------
 spark.cdm.trackRun false
 spark.cdm.trackRun.previousRunId 0
+spark.cdm.trackRun.runId <auto-generated-unique-long-value>

 #===========================================================================================================
 # Performance and Operations Parameters affecting throughput and similar concerns.
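Putting the three `spark.cdm.trackRun.*` properties together, a wrapper that pins its own identifier might configure a job as in this hedged sketch using Spark's standard `SparkConf` (only the property names come from this commit; the rest is illustrative):

```java
import org.apache.spark.SparkConf;

public class TrackRunConfSketch {
    public static void main(String[] args) {
        long runId = System.currentTimeMillis(); // any unique long the wrapper can record

        SparkConf conf = new SparkConf()
                .set("spark.cdm.trackRun", "true")
                .set("spark.cdm.trackRun.runId", Long.toString(runId))  // new in 4.3.10
                .set("spark.cdm.trackRun.previousRunId", "0");          // non-zero only when resuming

        System.out.println("Launch CDM with runId=" + runId + "; query cdm_run_info with it afterwards.");
    }
}
```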
9 changes: 6 additions & 3 deletions src/test/java/com/datastax/cdm/job/JobCounterTest.java
@@ -101,9 +101,12 @@ public void testPrintFinal() {
         jobCounter.threadIncrement(JobCounter.CounterType.READ, 5);
         jobCounter.globalIncrement();
         // You may use mocking to capture logger outputs
-        jobCounter.printFinal(null);
+        jobCounter.printFinal(0, null);
     }

+    @Captor
+    private ArgumentCaptor<Long> trackRunInfoCaptorLong;
+
     @Captor
     private ArgumentCaptor<String> trackRunInfoCaptor;

@@ -116,8 +119,8 @@ public void testPrintFinalWithRunTracking() {
         jobCounter.threadIncrement(JobCounter.CounterType.LARGE, 42);
         jobCounter.globalIncrement();
         // You may use mocking to capture logger outputs
-        jobCounter.printFinal(trackRun);
-        Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptor.capture());
+        jobCounter.printFinal(0, trackRun);
+        Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptorLong.capture(), trackRunInfoCaptor.capture());
         assertEquals(expected, trackRunInfoCaptor.getValue());
     }