Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed connection issue caused when using different types of origin and target clusters #334

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [5.1.3] - 2024-11-27
- Bug fix: Fixed connection issue caused when using different types of origin and target clusters (e.g. Cassandra/DSE with host/port and Astra with SCB).

## [5.1.2] - 2024-11-26
- Bug fix: SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause connection exception on that worker node. Added a static async SCB delete delay to address such issues.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
// TODO: Remove this code block after a few releases, its only added for backward compatibility
try {
this.session.execute("ALTER TABLE " + cdmKsTabInfo + " ADD status TEXT");
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'status' already exists in table {}", cdmKsTabInfo);
}
try {
this.session.execute("ALTER TABLE " + cdmKsTabDetails + " ADD run_info TEXT");
} catch (Exception e) {
// ignore if column already exists
logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
} catch (Exception e) { // ignore if column already exists
logger.debug("Column 'run_info' already exists in table {}", cdmKsTabDetails);
}

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
Expand Down
12 changes: 5 additions & 7 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.datastax.cdm.job

import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
Expand All @@ -40,7 +40,6 @@ abstract class BaseJob[T: ClassTag] extends App {

var spark: SparkSession = _
var sContext: SparkContext = _
var sc: SparkConf = _
var propertyHelper: PropertyHelper = _

var consistencyLevel: String = _
Expand Down Expand Up @@ -69,8 +68,7 @@ abstract class BaseJob[T: ClassTag] extends App {
.appName(jobName)
.getOrCreate()
sContext = spark.sparkContext
sc = sContext.getConf
propertyHelper = PropertyHelper.getInstance(sc);
propertyHelper = PropertyHelper.getInstance(sContext.getConf);

runId = propertyHelper.getLong(KnownProperties.RUN_ID)
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
Expand All @@ -79,9 +77,9 @@ abstract class BaseJob[T: ClassTag] extends App {
runId = System.nanoTime();
}
consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
connectionFetcher = new ConnectionFetcher(sc, propertyHelper)
originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId)
targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId)
connectionFetcher = new ConnectionFetcher(propertyHelper)
originConnection = connectionFetcher.getConnection(sContext.getConf, Side.ORIGIN, consistencyLevel, runId)
targetConnection = connectionFetcher.getConnection(sContext.getConf, Side.TARGET, consistencyLevel, runId)

val hasRandomPartitioner: Boolean = {
val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.datastax.cdm.data.DataUtility.generateSCB
import com.datastax.cdm.data.PKFactory.Side

// TODO: CDM-31 - add localDC configuration support
class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) extends Serializable {
class ConnectionFetcher(propertyHelper: IPropertyHelper) extends Serializable {
val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

def getConnectionDetails(side: Side): ConnectionDetails = {
Expand Down Expand Up @@ -63,7 +63,7 @@ class ConnectionFetcher(config: SparkConf, propertyHelper: IPropertyHelper) exte
}
}

def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
def getConnection(config: SparkConf, side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
val connectionDetails = getConnectionDetails(side)

logger.info("PARAM -- SSL Enabled: "+connectionDetails.sslEnabled);
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/com/datastax/cdm/job/DiffData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.datastax.cdm.job

import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object DiffData extends BasePartitionJob {
Expand All @@ -34,6 +34,8 @@ object DiffData extends BasePartitionJob {
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")

val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcTargetConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)
Expand All @@ -42,8 +44,8 @@ object DiffData extends BasePartitionJob {

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
}
originConnection.withSessionDo(originSession =>
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/datastax/cdm/job/GuardrailCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.datastax.cdm.job

import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object GuardrailCheck extends BasePartitionJob {
Expand All @@ -32,13 +32,14 @@ object GuardrailCheck extends BasePartitionJob {
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")

val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), 0)
}
originConnection.withSessionDo(originSession =>
bcJobFactory.value.getInstance(originSession, null, bcPropHelper.value)
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/com/datastax/cdm/job/Migrate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.datastax.cdm.job
import com.datastax.cdm.feature.TrackRun
import com.datastax.cdm.job.CDMMetricsAccumulator
import com.datastax.cdm.data.PKFactory.Side
import com.datastax.cdm.properties.{KnownProperties, PropertyHelper}
import com.datastax.cdm.properties.KnownProperties
import com.datastax.cdm.job.IJobSessionFactory.JobType

object Migrate extends BasePartitionJob {
Expand All @@ -34,7 +34,9 @@ object Migrate extends BasePartitionJob {
jobFactory.getInstance(originSession, targetSession, propertyHelper).initCdmRun(runId, prevRunId, parts, trackRunFeature, jobType)))
var ma = new CDMMetricsAccumulator(jobType)
sContext.register(ma, "CDMMetricsAccumulator")


val bcOriginConfig = sContext.broadcast(sContext.getConf)
val bcTargetConfig = sContext.broadcast(sContext.getConf)
val bcConnectionFetcher = sContext.broadcast(connectionFetcher)
val bcPropHelper = sContext.broadcast(propertyHelper)
val bcJobFactory = sContext.broadcast(jobFactory)
Expand All @@ -43,8 +45,8 @@ object Migrate extends BasePartitionJob {

slices.foreach(slice => {
if (null == originConnection) {
originConnection = bcConnectionFetcher.value.getConnection(Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
originConnection = bcConnectionFetcher.value.getConnection(bcOriginConfig.value, Side.ORIGIN, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
targetConnection = bcConnectionFetcher.value.getConnection(bcTargetConfig.value, Side.TARGET, bcPropHelper.value.getString(KnownProperties.READ_CL), bcRunId.value)
trackRunFeature = targetConnection.withSessionDo(targetSession => new TrackRun(targetSession, bcKeyspaceTableValue.value))
}
originConnection.withSessionDo(originSession =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setup() {
commonSetupWithoutDefaultClassVariables();
MockitoAnnotations.openMocks(this);

cf = new ConnectionFetcher(conf, propertyHelper);
cf = new ConnectionFetcher(propertyHelper);
}

@Test
Expand Down