Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/transform fields #289

Merged
merged 12 commits into from
Aug 26, 2024
Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ Note:
- Perform guardrail checks (identify large fields)
- Supports adding `constants` as new columns on `Target`
- Supports expanding `Map` columns on `Origin` into multiple records on `Target`
- Supports extracting a value from a JSON column in `Origin` and mapping it to a specific field on `Target`
- Fully containerized (Docker and K8s friendly)
- SSL Support (including custom cipher algorithms)
- Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
Expand Down
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [4.3.5] - 2024-08-23
- Added feature `spark.cdm.feature.extractJson`, which allows you to extract a JSON value from a column with JSON content in an Origin table and map it to a column in the Target table.

## [4.3.4] - 2024-07-31
- Use `spark.cdm.schema.origin.keyspaceTable` when `spark.cdm.schema.target.keyspaceTable` is missing. Fixes [bug introduced in prior version](https://github.com/datastax/cassandra-data-migrator/issues/284).

Expand Down
17 changes: 15 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>datastax.cdm</groupId>
Expand All @@ -10,7 +12,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.13.14</scala.version>
<scala.main.version>2.13</scala.main.version>
<spark.version>3.5.1</spark.version>
<spark.version>3.5.2</spark.version>
msmygit marked this conversation as resolved.
Show resolved Hide resolved
<connector.version>3.5.1</connector.version>
<cassandra.version>5.0-beta1</cassandra.version>
<junit.version>5.9.1</junit.version>
Expand Down Expand Up @@ -102,6 +104,11 @@
<artifactId>java-driver-query-builder</artifactId>
<version>${java-driver.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand All @@ -123,6 +130,12 @@
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>2.2.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Test Dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,27 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
try {
if (targetIndex== explodeMapKeyIndex) {
bindValue = explodeMapKey;
}
else if (targetIndex== explodeMapValueIndex) {
} else if (targetIndex== explodeMapValueIndex) {
bindValue = explodeMapValue;
}
else {
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
int originIndex = extractJsonFeature.getOriginColumnIndex();
bindValue = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
int originIndex = cqlTable.getCorrespondingIndex(targetIndex);
if (originIndex < 0) // we don't have data to bind for this column; continue to the next targetIndex
continue;
bindValue = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
}

boundStatement = boundStatement.set(currentBindIndex++, bindValue, cqlTable.getBindClass(targetIndex));
}
catch (Exception e) {
logger.error("Error trying to bind value:" + bindValue + " of class:" +(null==bindValue?"unknown":bindValue.getClass().getName())+ " to column:" + targetColumnNames.get(targetIndex) + " of targetDataType:" + targetColumnTypes.get(targetIndex)+ "/" + cqlTable.getBindClass(targetIndex).getName() + " at column index:" + targetIndex + " and bind index: "+ (currentBindIndex-1) + " of statement:" + this.getCQL());
throw e;
} catch (Exception e) {
logger.error("Error trying to bind value:" + bindValue + " of class:"
+ (null == bindValue ? "unknown" : bindValue.getClass().getName()) + " to column:"
+ targetColumnNames.get(targetIndex) + " of targetDataType:"
+ targetColumnTypes.get(targetIndex) + "/" + cqlTable.getBindClass(targetIndex).getName()
+ " at column index:" + targetIndex + " and bind index: " + (currentBindIndex - 1)
+ " of statement:" + this.getCQL());
throw new RuntimeException("Error trying to bind value: ", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
}
else if (targetIndex== explodeMapKeyIndex) {
bindValueTarget = explodeMapKey;
}
else if (targetIndex== explodeMapValueIndex) {
} else if (targetIndex== explodeMapValueIndex) {
bindValueTarget = explodeMapValue;
} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
originIndex = extractJsonFeature.getOriginColumnIndex();
bindValueTarget = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
if (originIndex < 0)
// we don't have data to bind for this column; continue to the next targetIndex
Expand All @@ -89,7 +91,7 @@ else if (targetIndex== explodeMapValueIndex) {
logger.error("Error trying to bind value:" + bindValueTarget + " to column:" +
targetColumnNames.get(targetIndex) + " of targetDataType:" + targetColumnTypes.get(targetIndex) + "/"
+ cqlTable.getBindClass(targetIndex).getName() + " at column index:" + targetIndex);
throw e;
throw new RuntimeException("Error trying to bind value: ", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.cdm.data.Record;
import com.datastax.cdm.feature.ConstantColumns;
import com.datastax.cdm.feature.ExplodeMap;
import com.datastax.cdm.feature.ExtractJson;
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.WritetimeTTL;
import com.datastax.cdm.properties.IPropertyHelper;
Expand Down Expand Up @@ -53,6 +54,8 @@ public abstract class TargetUpsertStatement extends BaseCdmStatement {
protected int explodeMapValueIndex = -1;
private Boolean haveCheckedBindInputsOnce = false;

protected ExtractJson extractJsonFeature;

protected abstract String buildStatement();
protected abstract BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long writeTime, Object explodeMapKey, Object explodeMapValue);

Expand All @@ -61,6 +64,7 @@ public TargetUpsertStatement(IPropertyHelper propertyHelper, EnhancedSession ses

constantColumnFeature = (ConstantColumns) cqlTable.getFeature(Featureset.CONSTANT_COLUMNS);
explodeMapFeature = (ExplodeMap) cqlTable.getFeature(Featureset.EXPLODE_MAP);
extractJsonFeature = (ExtractJson) cqlTable.getFeature(Featureset.EXTRACT_JSON);

setTTLAndWriteTimeBooleans();
targetColumnNames.addAll(cqlTable.getColumnNames(true));
Expand Down
154 changes: 154 additions & 0 deletions src/main/java/com/datastax/cdm/feature/ExtractJson.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.feature;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.properties.IPropertyHelper;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.schema.CqlTable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Feature that extracts a single field from a JSON-encoded text column on the Origin
 * table and binds the extracted value to a designated column on the Target table.
 *
 * Configuration (see {@link KnownProperties}):
 * - {@code EXTRACT_JSON_ORIGIN_COLUMN_NAME}: Origin column containing the JSON document.
 * - {@code EXTRACT_JSON_TARGET_COLUMN_MAPPING}: either {@code targetColumn} (the JSON field
 *   name is assumed to equal the target column name) or {@code jsonField:targetColumn}.
 *
 * The feature is enabled only when both properties resolve to non-empty names and
 * validation succeeds.
 */
public class ExtractJson extends AbstractFeature {
    public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    // ObjectMapper is expensive to create and thread-safe for readValue once configured;
    // hold a single final instance for the lifetime of this feature.
    private final ObjectMapper mapper = new ObjectMapper();

    // Origin side: column holding the JSON document, the JSON field to pull out of it,
    // and the column's resolved index on the origin table (-1 until initialized).
    private String originColumnName = "";
    private String originJsonFieldName = "";
    private Integer originColumnIndex = -1;

    // Target side: column receiving the extracted value and its resolved index (-1 until initialized).
    private String targetColumnName = "";
    private Integer targetColumnIndex = -1;

    /**
     * Reads the origin column name and the target mapping from properties, splitting a
     * {@code jsonField:targetColumn} mapping into its two parts. When no {@code :} is
     * present, the JSON field name defaults to the target column name.
     *
     * @param helper property source; must not be null
     * @return true when properties were loaded and are valid
     * @throws IllegalArgumentException if helper is null
     */
    @Override
    public boolean loadProperties(IPropertyHelper helper) {
        if (null == helper) {
            throw new IllegalArgumentException("helper is null");
        }

        originColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME);
        targetColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_TARGET_COLUMN_MAPPING);
        // Convert columnToFieldMapping to targetColumnName and originJsonFieldName
        if (!targetColumnName.isBlank()) {
            String[] parts = targetColumnName.split("\\:");
            if (parts.length == 2) {
                originJsonFieldName = parts[0];
                targetColumnName = parts[1];
            } else {
                if (parts.length > 2) {
                    // More than one ':' is not a supported mapping; the raw value is kept as-is
                    // (and will fail target-column validation later), but warn the operator now.
                    logger.warn("Mapping {} contains more than one ':'; expected field:column or column",
                            targetColumnName);
                }
                originJsonFieldName = targetColumnName;
            }
        }

        isValid = validateProperties();
        isEnabled = isValid && !originColumnName.isEmpty() && !targetColumnName.isEmpty();
        isLoaded = true;

        return isLoaded && isValid;
    }

    /**
     * Valid states: both names empty (feature off) or both names set. Setting exactly
     * one of the two is a configuration error.
     *
     * @return true when the property combination is consistent
     */
    @Override
    protected boolean validateProperties() {
        if ((null == originColumnName || originColumnName.isEmpty())
                && (null == targetColumnName || targetColumnName.isEmpty()))
            return true;

        if (null == originColumnName || originColumnName.isEmpty()) {
            logger.error("Origin column name is not set when Target ({}) is set", targetColumnName);
            return false;
        }

        if (null == targetColumnName || targetColumnName.isEmpty()) {
            logger.error("Target column name is not set when Origin ({}) is set", originColumnName);
            return false;
        }

        return true;
    }

    /**
     * Resolves the configured column names against the actual Origin and Target tables,
     * recording each column's bind index. Registers the columns on their tables via
     * {@code extendColumns}.
     *
     * @param originTable the origin-side table metadata; must be an origin table
     * @param targetTable the target-side table metadata; must be a target table
     * @return true when initialization succeeded (or the feature is disabled)
     * @throws IllegalArgumentException if a table is null/mis-sided or a configured
     *         column does not exist on its table
     */
    @Override
    public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable) {
        if (null == originTable || null == targetTable) {
            throw new IllegalArgumentException("Origin table and/or Target table is null");
        }
        if (!originTable.isOrigin()) {
            throw new IllegalArgumentException(originTable.getKeyspaceTable() + " is not an origin table");
        }
        if (targetTable.isOrigin()) {
            throw new IllegalArgumentException(targetTable.getKeyspaceTable() + " is not a target table");
        }

        if (!validateProperties()) {
            isEnabled = false;
            return false;
        }
        if (!isEnabled)
            return true;

        // Initialize Origin variables
        List<Class> originBindClasses = originTable.extendColumns(Collections.singletonList(originColumnName));
        if (null == originBindClasses || originBindClasses.size() != 1 || null == originBindClasses.get(0)) {
            throw new IllegalArgumentException("Origin column " + originColumnName
                    + " is not found on the origin table " + originTable.getKeyspaceTable());
        } else {
            this.originColumnIndex = originTable.indexOf(originColumnName);
        }

        // Initialize Target variables
        List<Class> targetBindClasses = targetTable.extendColumns(Collections.singletonList(targetColumnName));
        if (null == targetBindClasses || targetBindClasses.size() != 1 || null == targetBindClasses.get(0)) {
            throw new IllegalArgumentException("Target column " + targetColumnName
                    + " is not found on the target table " + targetTable.getKeyspaceTable());
        } else {
            this.targetColumnIndex = targetTable.indexOf(targetColumnName);
        }

        logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled ? "enabled" : "disabled");
        return true;
    }

    /**
     * Parses the given JSON document and returns the value of the configured field,
     * or null when the input is blank or the field is absent.
     *
     * @param jsonString JSON document read from the origin column (may be null/blank)
     * @return the extracted field value, or null
     * @throws JsonMappingException if the content cannot be mapped to a Map
     * @throws JsonProcessingException if the content is not valid JSON
     */
    public Object extract(String jsonString) throws JsonMappingException, JsonProcessingException {
        if (StringUtils.isNotBlank(jsonString)) {
            return mapper.readValue(jsonString, Map.class).get(originJsonFieldName);
        }

        return null;
    }

    /** @return origin column bind index, or -1 when the feature is disabled */
    public Integer getOriginColumnIndex() {
        return isEnabled ? originColumnIndex : -1;
    }

    /** @return target column bind index, or -1 when the feature is disabled */
    public Integer getTargetColumnIndex() {
        return isEnabled ? targetColumnIndex : -1;
    }

    /** @return target column name, or empty string when the feature is disabled */
    public String getTargetColumnName() {
        return isEnabled ? targetColumnName : "";
    }

    // Reads a property and normalizes it through CqlTable.unFormatName; never returns null.
    private String getColumnName(IPropertyHelper helper, String colName) {
        String columnName = CqlTable.unFormatName(helper.getString(colName));
        return (null == columnName) ? "" : columnName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public static Feature getFeature(Featureset feature) {
case ORIGIN_FILTER: return new OriginFilterCondition();
case CONSTANT_COLUMNS: return new ConstantColumns();
case EXPLODE_MAP: return new ExplodeMap();
case EXTRACT_JSON: return new ExtractJson();
case WRITETIME_TTL: return new WritetimeTTL();
case GUARDRAIL_CHECK: return new Guardrail();
default:
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/datastax/cdm/feature/Featureset.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum Featureset {
ORIGIN_FILTER,
CONSTANT_COLUMNS,
EXPLODE_MAP,
EXTRACT_JSON,
WRITETIME_TTL,
GUARDRAIL_CHECK,
TEST_UNIMPLEMENTED_FEATURE
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,14 @@ private void getDataAndInsert(BigInteger min, BigInteger max) {
jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);

} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.FAIL);
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max);
Thread.currentThread().getId(), min, max, e);
logger.error("Error stats " + jobCounter.getThreadCounters(false));
} finally {
jobCounter.globalIncrement();
Expand Down
Loading