diff --git a/RELEASE.md b/RELEASE.md index c5e5a287..74e10506 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,7 +1,10 @@ # Release Notes +## [4.3.10] - 2024-09-12 +- Added property `spark.cdm.trackRun.runId` to support a custom unique identifier for the current run. This can be used by wrapper scripts to pass a known `runId` and then use it to query the `cdm_run_info` and `cdm_run_details` tables. Setting a non-zero `runId` implicitly enables run tracking, and the run is rejected if that `runId` already exists for the table. + ## [4.3.9] - 2024-09-11 - Added new `status` value of `DIFF_CORRECTED` on `cdm_run_details` table to specifically mark partitions that were corrected during the CDM validation run. -- Upgraded Validation job skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`. +- Upgraded Validation job to skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`. ## [4.3.8] - 2024-09-09 - Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized.
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 79d29fe4..639a1333 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -37,8 +37,6 @@ public class TargetUpsertRunDetailsStatement { private CqlSession session; private String keyspaceName; private String tableName; - private long runId; - private long prevRunId; private BoundStatement boundInitInfoStatement; private BoundStatement boundInitStatement; private BoundStatement boundEndInfoStatement; @@ -60,16 +58,18 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) String cdmKsTabInfo = this.keyspaceName + ".cdm_run_info"; String cdmKsTabDetails = this.keyspaceName + ".cdm_run_details"; - this.session.execute("create table if not exists " + cdmKsTabInfo - + " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))"); + this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabInfo + + " (table_name TEXT, run_id BIGINT, run_type TEXT, prev_run_id BIGINT, start_time TIMESTAMP, end_time TIMESTAMP, run_info TEXT, status TEXT, PRIMARY KEY (table_name, run_id))"); // TODO: Remove this code block after a few releases, its only added for backward compatibility try { - this.session.execute("alter table " + cdmKsTabInfo + " add status text"); + this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT"); } catch (Exception e) { // ignore if column already exists logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo); } + this.session.execute("CREATE TABLE IF NOT EXISTS " + cdmKsTabDetails + + " (table_name TEXT, run_id BIGINT, start_time TIMESTAMP, token_min BIGINT, token_max BIGINT, 
status TEXT, PRIMARY KEY ((table_name, run_id), token_min))"); boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo + " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)"); @@ -88,7 +88,6 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) } public Collection getPendingPartitions(long prevRunId) throws RunNotStartedException { - this.prevRunId = prevRunId; final Collection pendingParts = new ArrayList(); if (prevRunId == 0) { return pendingParts; @@ -117,31 +116,34 @@ public Collection getPendingPartitions(long prevRunId return pendingParts; } - public long initCdmRun(Collection parts, RUN_TYPE runType) { - runId = System.currentTimeMillis(); + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + ResultSet rsInfo = session + .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", runId)); + if (null != rsInfo.one()) { + throw new RuntimeException("Run id " + runId + " already exists for table " + tableName); + } session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString())); - parts.forEach(part -> initCdmRun(part)); + parts.forEach(part -> initCdmRun(runId, part)); session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId) .setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId) .setString("status", TrackRun.RUN_STATUS.STARTED.toString())); - return runId; } - private void initCdmRun(Partition partition) { + private void initCdmRun(long runId, Partition partition) { session.execute(boundInitStatement.setString("table_name", tableName).setLong("run_id", runId) .setLong("token_min", partition.getMin().longValue()) .setLong("token_max", 
partition.getMax().longValue()) .setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString())); } - public void endCdmRun(String runInfo) { + public void endCdmRun(long runId, String runInfo) { session.execute(boundEndInfoStatement.setString("table_name", tableName).setLong("run_id", runId) .setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString())); } - public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) { + public void updateCdmRun(long runId, BigInteger min, TrackRun.RUN_STATUS status) { if (TrackRun.RUN_STATUS.STARTED.equals(status)) { session.execute(boundUpdateStartStatement.setString("table_name", tableName).setLong("run_id", runId) .setLong("token_min", min.longValue()).setString("status", status.toString())); diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index a75cd0a2..92b1784f 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -49,18 +49,16 @@ public Collection getPendingPartitions(long prevRunId return pendingParts; } - public long initCdmRun(Collection parts, RUN_TYPE runType) { - long runId = runStatement.initCdmRun(parts, runType); + public void initCdmRun(long runId, long prevRunId, Collection parts, RUN_TYPE runType) { + runStatement.initCdmRun(runId, prevRunId, parts, runType); logger.info("###################### Run Id for this job is: {} ######################", runId); - - return runId; } - public void updateCdmRun(BigInteger min, RUN_STATUS status) { - runStatement.updateCdmRun(min, status); + public void updateCdmRun(long runId, BigInteger min, RUN_STATUS status) { + runStatement.updateCdmRun(runId, min, status); } - public void endCdmRun(String runInfo) { - runStatement.endCdmRun(runInfo); + public void endCdmRun(long runId, String runInfo) { + runStatement.endCdmRun(runId, runInfo); } } diff --git 
a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java index 86c630f4..8d783ce2 100644 --- a/src/main/java/com/datastax/cdm/job/AbstractJobSession.java +++ b/src/main/java/com/datastax/cdm/job/AbstractJobSession.java @@ -41,6 +41,7 @@ public abstract class AbstractJobSession extends BaseJobSession { protected JobCounter jobCounter; protected Long printStatsAfter; protected TrackRun trackRunFeature; + protected long runId; protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) { this(originSession, targetSession, sc, false); @@ -102,12 +103,17 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, public abstract void processSlice(T slice); - public synchronized void initCdmRun(Collection parts, TrackRun trackRunFeature) { + public synchronized void initCdmRun(long runId, long prevRunId, Collection parts, + TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) { + this.runId = runId; + this.trackRunFeature = trackRunFeature; + if (null != trackRunFeature) + trackRunFeature.initCdmRun(runId, prevRunId, parts, runType); } public synchronized void printCounts(boolean isFinal) { if (isFinal) { - jobCounter.printFinal(trackRunFeature); + jobCounter.printFinal(runId, trackRunFeature); } else { jobCounter.printProgress(); } diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index 67ab5fd3..506b71e5 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -70,17 +70,11 @@ public void processSlice(SplitPartitions.Partition slice) { this.getDataAndInsert(slice.getMin(), slice.getMax()); } - public synchronized void initCdmRun(Collection parts, TrackRun trackRunFeature) { - this.trackRunFeature = trackRunFeature; - if (null != trackRunFeature) - trackRunFeature.initCdmRun(parts, 
TrackRun.RUN_TYPE.MIGRATE); - } - private void getDataAndInsert(BigInteger min, BigInteger max) { ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED); BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED); String guardrailCheck; @@ -139,13 +133,13 @@ private void getDataAndInsert(BigInteger min, BigInteger max) { jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS); } catch (Exception e) { jobCounter.threadIncrement(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - jobCounter.getCount(JobCounter.CounterType.SKIPPED)); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL); logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max, e); logger.error("Error stats " + jobCounter.getThreadCounters(false)); diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index c01b97af..21d44605 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -17,7 +17,6 @@ import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; @@ -124,18 +123,11 @@ public 
void processSlice(SplitPartitions.Partition slice) { this.getDataAndDiff(slice.getMin(), slice.getMax()); } - @Override - public synchronized void initCdmRun(Collection parts, TrackRun trackRunFeature) { - this.trackRunFeature = trackRunFeature; - if (null != trackRunFeature) - trackRunFeature.initCdmRun(parts, TrackRun.RUN_TYPE.DIFF_DATA); - } - private void getDataAndDiff(BigInteger min, BigInteger max) { ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max)); logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.STARTED); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED); AtomicBoolean hasDiff = new AtomicBoolean(false); try { @@ -196,12 +188,12 @@ private void getDataAndDiff(BigInteger min, BigInteger max) { .getCount(JobCounter.CounterType.CORRECTED_MISSING) && jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter .getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) { - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF_CORRECTED); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF_CORRECTED); } else { - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF); } } else if (null != trackRunFeature) { - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS); } } catch (Exception e) { jobCounter.threadIncrement(JobCounter.CounterType.ERROR, @@ -212,7 +204,7 @@ private void getDataAndDiff(BigInteger min, BigInteger max) { logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max, e); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL); } finally { 
jobCounter.globalIncrement(); printCounts(false); diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java index 59d595a6..3610fd16 100644 --- a/src/main/java/com/datastax/cdm/job/JobCounter.java +++ b/src/main/java/com/datastax/cdm/job/JobCounter.java @@ -178,7 +178,7 @@ protected void printAndLogProgress(String message, boolean global) { logger.info(fullMessage); } - public void printFinal(TrackRun trackRunFeature) { + public void printFinal(long runId, TrackRun trackRunFeature) { if (null != trackRunFeature) { StringBuilder sb = new StringBuilder(); if (counterMap.containsKey(CounterType.READ)) @@ -202,7 +202,7 @@ public void printFinal(TrackRun trackRunFeature) { if (counterMap.containsKey(CounterType.LARGE)) sb.append("; Large: " + counterMap.get(CounterType.LARGE).getGlobalCounter()); - trackRunFeature.endCdmRun(sb.toString()); + trackRunFeature.endCdmRun(runId, sb.toString()); } logger.info("################################################################################################"); if (counterMap.containsKey(CounterType.READ)) diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java index b0573c0f..c3bdd05d 100644 --- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java +++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java @@ -109,6 +109,7 @@ public enum PropertyType { public static final String AUTOCORRECT_MISMATCH = "spark.cdm.autocorrect.mismatch"; // false public static final String AUTOCORRECT_MISSING_COUNTER = "spark.cdm.autocorrect.missing.counter"; // false public static final String TRACK_RUN = "spark.cdm.trackRun"; + public static final String RUN_ID = "spark.cdm.trackRun.runId"; public static final String PREV_RUN_ID = "spark.cdm.trackRun.previousRunId"; public static final String PERF_NUM_PARTS = "spark.cdm.perfops.numParts"; // 5000, was spark.splitSize @@ -131,6 +132,8 
@@ public enum PropertyType { defaults.put(AUTOCORRECT_MISSING_COUNTER, "false"); types.put(TRACK_RUN, PropertyType.BOOLEAN); defaults.put(TRACK_RUN, "false"); + types.put(RUN_ID, PropertyType.NUMBER); + defaults.put(RUN_ID, "0"); types.put(PREV_RUN_ID, PropertyType.NUMBER); defaults.put(PREV_RUN_ID, "0"); diff --git a/src/main/scala/com/datastax/cdm/job/BaseJob.scala b/src/main/scala/com/datastax/cdm/job/BaseJob.scala index b1143801..0cb3a360 100644 --- a/src/main/scala/com/datastax/cdm/job/BaseJob.scala +++ b/src/main/scala/com/datastax/cdm/job/BaseJob.scala @@ -47,6 +47,7 @@ abstract class BaseJob[T: ClassTag] extends App { var coveragePercent: Int = _ var numSplits: Int = _ var trackRun: Boolean = _ + var runId: Long = _ var prevRunId: Long = _ var parts: util.Collection[T] = _ @@ -80,14 +81,21 @@ abstract class BaseJob[T: ClassTag] extends App { maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner) coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT) numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS) + runId = propertyHelper.getLong(KnownProperties.RUN_ID) prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID) - trackRun = if (0 != prevRunId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN) + trackRun = if (0 != prevRunId || 0 != runId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN) + if (trackRun == true && runId == 0) { + runId = System.nanoTime(); + } abstractLogger.info("PARAM -- Min Partition: " + minPartition) abstractLogger.info("PARAM -- Max Partition: " + maxPartition) abstractLogger.info("PARAM -- Number of Splits : " + numSplits) abstractLogger.info("PARAM -- Track Run : " + trackRun) - abstractLogger.info("PARAM -- Previous RunId : " + prevRunId) + if (trackRun == true) { + abstractLogger.info("PARAM -- RunId : " + runId) + abstractLogger.info("PARAM -- Previous RunId : " + prevRunId) + } 
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent) this.parts = getParts(numSplits) abstractLogger.info("PARAM Calculated -- Total Partitions: " + parts.size()) diff --git a/src/main/scala/com/datastax/cdm/job/DiffData.scala b/src/main/scala/com/datastax/cdm/job/DiffData.scala index b7d61b5b..caaa3cfa 100644 --- a/src/main/scala/com/datastax/cdm/job/DiffData.scala +++ b/src/main/scala/com/datastax/cdm/job/DiffData.scala @@ -15,6 +15,8 @@ */ package com.datastax.cdm.job +import com.datastax.cdm.feature.TrackRun + object DiffData extends BasePartitionJob { setup("Data Validation Job", new DiffJobSessionFactory()) execute() @@ -24,7 +26,7 @@ object DiffData extends BasePartitionJob { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature))); + jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.DIFF_DATA))); slices.foreach(slice => { originConnection.withSessionDo(originSession => diff --git a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala index 80a065e4..6d439cd2 100644 --- a/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala +++ b/src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala @@ -22,10 +22,6 @@ object GuardrailCheck extends BasePartitionJob { protected def execute(): Unit = { if (!parts.isEmpty()) { - originConnection.withSessionDo(originSession => - targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature))); - slices.foreach(slice => { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => diff --git a/src/main/scala/com/datastax/cdm/job/Migrate.scala b/src/main/scala/com/datastax/cdm/job/Migrate.scala index 
c87fd1ff..e88709e2 100644 --- a/src/main/scala/com/datastax/cdm/job/Migrate.scala +++ b/src/main/scala/com/datastax/cdm/job/Migrate.scala @@ -15,6 +15,8 @@ */ package com.datastax.cdm.job +import com.datastax.cdm.feature.TrackRun + object Migrate extends BasePartitionJob { setup("Migrate Job", new CopyJobSessionFactory()) execute() @@ -24,7 +26,7 @@ object Migrate extends BasePartitionJob { if (!parts.isEmpty()) { originConnection.withSessionDo(originSession => targetConnection.withSessionDo(targetSession => - jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(parts, trackRunFeature))); + jobFactory.getInstance(originSession, targetSession, sc).initCdmRun(runId, prevRunId, parts, trackRunFeature, TrackRun.RUN_TYPE.MIGRATE))); slices.foreach(slice => { originConnection.withSessionDo(originSession => diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index 74208bb0..500f579b 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -173,9 +173,15 @@ spark.cdm.autocorrect.missing.counter false # successful (`status != PASS`). A token-range may not be marked as 'PASS' for several # reasons including if the job get killed, or some token-ranges fail due to load on # the cluster (origin or target) or any other reasons. +# .runId : Default is an auto-generated unique long value. When a non-zero value is provided, +# it is used as a custom, unique identifier for the current run and run tracking is +# enabled implicitly. The value must be numeric (any Java `long`) and must not match +# an existing run of the same table. Wrapper scripts can pass a known `runId` and then +# use it to query the `cdm_run_info` and `cdm_run_details` tables.
#----------------------------------------------------------------------------------------------------------- spark.cdm.trackRun false spark.cdm.trackRun.previousRunId 0 +spark.cdm.trackRun.runId #=========================================================================================================== # Performance and Operations Parameters affecting throughput and similar concerns. diff --git a/src/test/java/com/datastax/cdm/job/JobCounterTest.java b/src/test/java/com/datastax/cdm/job/JobCounterTest.java index 49280d8b..9e36a5bc 100644 --- a/src/test/java/com/datastax/cdm/job/JobCounterTest.java +++ b/src/test/java/com/datastax/cdm/job/JobCounterTest.java @@ -101,9 +101,12 @@ public void testPrintFinal() { jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); jobCounter.globalIncrement(); // You may use mocking to capture logger outputs - jobCounter.printFinal(null); + jobCounter.printFinal(0, null); } + @Captor + private ArgumentCaptor trackRunInfoCaptorLong; + @Captor private ArgumentCaptor trackRunInfoCaptor; @@ -116,8 +119,8 @@ public void testPrintFinalWithRunTracking() { jobCounter.threadIncrement(JobCounter.CounterType.LARGE, 42); jobCounter.globalIncrement(); // You may use mocking to capture logger outputs - jobCounter.printFinal(trackRun); - Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptor.capture()); + jobCounter.printFinal(0, trackRun); + Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptorLong.capture(), trackRunInfoCaptor.capture()); assertEquals(expected, trackRunInfoCaptor.getValue()); }