Skip to content

Commit

Permalink
Short-lived & unique SCB for each run (#317)
Browse files Browse the repository at this point in the history
* Made SCB much short-lived when its generated by CDM for security reasons.

* Made short-lived SCB also unique when using run-id to avoid overlap with other parallel runs.

* updated release notes
  • Loading branch information
pravinbhat authored Oct 11, 2024
1 parent fddce4e commit cd54ca7
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 23 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [4.5.1] - 2024-10-11
- Made CDM generated SCB unique & much short-lived when using the TLS option to connect to Astra more securely.

## [4.5.0] - 2024-10-03
- Upgraded to use log4j 2.x and included a template properties file that will help separate general logs from CDM class specific logs including a separate log for rows identified by `DiffData` (Validation) errors.
- Upgraded to use Spark `3.5.3`.
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/com/datastax/cdm/data/DataUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import com.datastax.cdm.schema.CqlTable;

public class DataUtility {
public static final Logger logger = LoggerFactory.getLogger(CqlConversion.class);
public static final Logger logger = LoggerFactory.getLogger(DataUtility.class.getName());

protected static final String SCB_FILE_NAME = "_temp_cdm_scb_do_not_touch.zip";

public static boolean diff(Object obj1, Object obj2) {
Expand Down Expand Up @@ -151,19 +152,19 @@ public static String getMyClassMethodLine(Exception e) {
return "Unknown";
}

public static void deleteGeneratedSCB() {
File file = new File(PKFactory.Side.ORIGIN + SCB_FILE_NAME);
public static void deleteGeneratedSCB(long runId) {
File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME);
if (file.exists()) {
file.delete();
}
file = new File(PKFactory.Side.TARGET + SCB_FILE_NAME);
file = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME);
if (file.exists()) {
file.delete();
}
}

public static File generateSCB(String host, String port, String trustStorePassword, String trustStorePath,
String keyStorePassword, String keyStorePath, PKFactory.Side side) throws IOException {
String keyStorePassword, String keyStorePath, PKFactory.Side side, long runId) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream("config.json");
String scbJson = new StringBuilder("{\"host\": \"").append(host).append("\", \"port\": ").append(port)
.append(", \"keyStoreLocation\": \"./identity.jks\", \"keyStorePassword\": \"").append(keyStorePassword)
Expand All @@ -175,7 +176,8 @@ public static File generateSCB(String host, String port, String trustStorePasswo
FilePathAndNewName configFileWithName = new FilePathAndNewName(configFile, "config.json");
FilePathAndNewName keyFileWithName = new FilePathAndNewName(new File(keyStorePath), "identity.jks");
FilePathAndNewName trustFileWithName = new FilePathAndNewName(new File(trustStorePath), "trustStore.jks");
File zipFile = zip(Arrays.asList(configFileWithName, keyFileWithName, trustFileWithName), side + SCB_FILE_NAME);
File zipFile = zip(Arrays.asList(configFileWithName, keyFileWithName, trustFileWithName),
side + "_" + Long.toString(runId) + SCB_FILE_NAME);
configFile.delete();

return zipFile;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/datastax/cdm/job/AbstractJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ public synchronized void initCdmRun(long runId, long prevRunId, Collection<Split
this.trackRunFeature = trackRunFeature;
if (null != trackRunFeature)
trackRunFeature.initCdmRun(runId, prevRunId, parts, runType);
DataUtility.deleteGeneratedSCB(runId);
}

public synchronized void printCounts(boolean isFinal) {
if (isFinal) {
jobCounter.printFinal(runId, trackRunFeature);
DataUtility.deleteGeneratedSCB();
} else {
jobCounter.printProgress();
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/com/datastax/cdm/job/BaseJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,16 @@ abstract class BaseJob[T: ClassTag] extends App {
sc = sContext.getConf
propertyHelper = PropertyHelper.getInstance(sc);

runId = propertyHelper.getLong(KnownProperties.RUN_ID)
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
trackRun = if (0 != prevRunId || 0 != runId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
if (trackRun == true && runId == 0) {
runId = System.nanoTime();
}
consistencyLevel = propertyHelper.getString(KnownProperties.READ_CL)
val connectionFetcher = new ConnectionFetcher(sContext, propertyHelper)
originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel)
targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel)
originConnection = connectionFetcher.getConnection(Side.ORIGIN, consistencyLevel, runId)
targetConnection = connectionFetcher.getConnection(Side.TARGET, consistencyLevel, runId)

val hasRandomPartitioner: Boolean = {
val partitionerName = originConnection.withSessionDo(_.getMetadata.getTokenMap.get().getPartitionerName)
Expand All @@ -82,12 +88,6 @@ abstract class BaseJob[T: ClassTag] extends App {
maxPartition = getMaxPartition(propertyHelper.getString(KnownProperties.PARTITION_MAX), hasRandomPartitioner)
coveragePercent = propertyHelper.getInteger(KnownProperties.TOKEN_COVERAGE_PERCENT)
numSplits = propertyHelper.getInteger(KnownProperties.PERF_NUM_PARTS)
runId = propertyHelper.getLong(KnownProperties.RUN_ID)
prevRunId = propertyHelper.getLong(KnownProperties.PREV_RUN_ID)
trackRun = if (0 != prevRunId || 0 != runId) true else propertyHelper.getBoolean(KnownProperties.TRACK_RUN)
if (trackRun == true && runId == 0) {
runId = System.nanoTime();
}

abstractLogger.info("PARAM -- Min Partition: " + minPartition)
abstractLogger.info("PARAM -- Max Partition: " + maxPartition)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/datastax/cdm/job/ConnectionFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ConnectionFetcher(sparkContext: SparkContext, propertyHelper: IPropertyHel
}
}

def getConnection(side: Side, consistencyLevel: String): CassandraConnector = {
def getConnection(side: Side, consistencyLevel: String, runId: Long): CassandraConnector = {
val connectionDetails = getConnectionDetails(side)
val config: SparkConf = sparkContext.getConf

Expand All @@ -81,7 +81,7 @@ class ConnectionFetcher(sparkContext: SparkContext, propertyHelper: IPropertyHel

val scbFile = generateSCB(connectionDetails.host, connectionDetails.port,
connectionDetails.trustStorePassword, connectionDetails.trustStorePath,
connectionDetails.keyStorePassword, connectionDetails.keyStorePath, side)
connectionDetails.keyStorePassword, connectionDetails.keyStorePath, side, runId)
return CassandraConnector(config
.set("spark.cassandra.auth.username", connectionDetails.username)
.set("spark.cassandra.auth.password", connectionDetails.password)
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/datastax/cdm/data/DataUtilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,24 +166,24 @@ public void getMyClassMethodLineTestUnknown() {
@Test
public void generateSCBOrigin() throws IOException {
File scb = DataUtility.generateSCB("localhost", "9042", "trust123", "./pom.xml", "key123", "./pom.xml",
PKFactory.Side.ORIGIN);
PKFactory.Side.ORIGIN, 0);
assertNotNull(scb);
File file = new File(PKFactory.Side.ORIGIN + DataUtility.SCB_FILE_NAME);
File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
assertTrue(file.exists());

DataUtility.deleteGeneratedSCB();
DataUtility.deleteGeneratedSCB(0);
assertFalse(file.exists());
}

@Test
public void generateSCBTarget() throws IOException {
File scb = DataUtility.generateSCB("localhost", "9042", "trust123", "./pom.xml", "key123", "./pom.xml",
PKFactory.Side.TARGET);
PKFactory.Side.TARGET, 0);
assertNotNull(scb);
File file = new File(PKFactory.Side.TARGET + DataUtility.SCB_FILE_NAME);
File file = new File(PKFactory.Side.TARGET + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME);
assertTrue(file.exists());

DataUtility.deleteGeneratedSCB();
DataUtility.deleteGeneratedSCB(0);
assertFalse(file.exists());
}
}

0 comments on commit cd54ca7

Please sign in to comment.