diff --git a/.classpath b/.classpath
index 39b3b9b8..3540e69f 100644
--- a/.classpath
+++ b/.classpath
diff --git a/RELEASE.md b/RELEASE.md
index 473175d3..cf4fcf02 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,9 @@
 # Release Notes
+## [4.1.16] - 2024-05-31
+- Added property to manage null values in Map fields
+- Allow separate input and output partition CSV files
+- Updated README
+
 ## [4.1.15] - 2024-03-05
 - Internal CI/CD release fix
 
diff --git a/pom.xml b/pom.xml
index be04f3a1..e50d41ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,330 +1,340 @@ whitespace re-indentation across the whole file; substantive change below
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.main.version}</artifactId>
             <version>${spark.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>log4j</groupId>
                     <artifactId>apache-log4j-extras</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
             </exclusions>
             <scope>provided</scope>
         </dependency>
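The new `commons-lang` exclusion above pairs with the source changes further down in this diff, which move `StringUtils` from the retired commons-lang 2.x package to commons-lang3. A minimal sketch of the equivalence (the class below is illustrative, not part of this PR):

```java
// Old coordinate, now excluded from spark-hive's transitive tree:
//   import org.apache.commons.lang.StringUtils;
// New coordinate, used by the source files changed in this PR:
import org.apache.commons.lang3.StringUtils;

public class StringUtilsMigrationSketch {
    public static void main(String[] args) {
        // The static helpers keep the same shape in 3.x, so call sites
        // typically only need the import changed.
        System.out.println(StringUtils.isBlank("   ")); // true
        System.out.println(StringUtils.isEmpty(null));  // true
    }
}
```

Excluding the 2.x jar also keeps both versions from landing on the shaded classpath at once.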
diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
index 44799398..f8b0efa4 100644
--- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
+++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -147,11 +147,12 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
             isValid = false;
         }
 
-        if (this.writetimeIncrement == 0L && (null!=writetimeNames && !writetimeNames.isEmpty()) && originTable.hasUnfrozenList()) {
-            logger.warn("Writetime is configured, but the origin table at least one unfrozen List, and there is a zero-value increment configured at "+
-                    KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT+"; this may result in duplicate list entries when "+
-                    KnownProperties.AUTOCORRECT_MISMATCH+" is enabled.");
-        }
+        if (this.writetimeIncrement == 0L && (null != writetimeNames && !writetimeNames.isEmpty())
+                && originTable.hasUnfrozenList()) {
+            logger.warn("Origin table has at least one unfrozen List, and "
+                    + KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT
+                    + " is set to zero; this may result in duplicate list entries on reruns or validation with autocorrect.");
+        }
 
         if (!isValid) isEnabled = false;
         logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled?"enabled":"disabled");
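Background for the warning above: Cassandra stores each element of a non-frozen `List` as its own timestamped cell, so re-writing a row with exactly the same writetime (a zero increment) can leave both the old and new element cells live, i.e. duplicates. A conceptual sketch of why a non-zero increment avoids the collision; this illustrates only the timestamp arithmetic and is not CDM's actual implementation:

```java
public class WritetimeIncrementSketch {
    // Hypothetical model: each rewrite attempt (e.g. an autocorrect pass)
    // bumps the writetime by the configured increment.
    static long nextWritetime(long originWritetime, long increment, int attempt) {
        return originWritetime + (long) attempt * increment;
    }

    public static void main(String[] args) {
        long origin = 1_700_000_000_000_000L; // microseconds, as CQL writetimes are
        // increment = 0: the rewrite collides with the first copy -> possible list duplicates
        System.out.println(nextWritetime(origin, 0L, 1) == origin); // true
        // increment = 1: the rewrite is strictly newer and wins cleanly
        System.out.println(nextWritetime(origin, 1L, 1) > origin);  // true
    }
}
```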
diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
index fb0ef45a..2eb2e74f 100644
--- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java
+++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java
@@ -15,7 +15,13 @@
  */
 package com.datastax.cdm.properties;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class KnownProperties {
 
@@ -172,24 +178,27 @@ public enum PropertyType {
     public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs";
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
     public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
+    public static final String TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE = "spark.cdm.transform.map.remove.null.value";
 
     // TODO: 3.3.0 refactored how guardrails are handled, this needs to be merged forward
     // public static final String GUARDRAIL_FIELD_LIMIT_MB = "spark.guardrail.colSizeInKB"; //10
 
-    static {
-        types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
-        types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
-        defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
-        types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
-        defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
-        types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
-        types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, PropertyType.STRING);
-        defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, "yyyyMMddHHmmss");
-        types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, PropertyType.STRING);
-        defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");
-    }
+    static {
+        types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
+        types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
+        defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
+        types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
+        defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
+        types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
+        types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, PropertyType.STRING);
+        defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT, "yyyyMMddHHmmss");
+        types.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, PropertyType.STRING);
+        defaults.put(TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE, "UTC");
+
+        types.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, PropertyType.BOOLEAN);
+        defaults.put(TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE, "false");
+    }
 
 //==========================================================================
 // Cassandra-side Filters
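The block above registers the new key with a `BOOLEAN` type and a default of `"false"`, so existing jobs keep their current behavior unless they opt in. A standalone sketch of that registration pattern and of how a consumer falls back to the default; the `System.getProperty` stand-in is illustrative only, since CDM actually resolves settings through `PropertyHelper`/`SparkConf`:

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyRegistrySketch {
    enum PropertyType { STRING, NUMBER, BOOLEAN, STRING_LIST }

    static final Map<String, PropertyType> types = new HashMap<>();
    static final Map<String, String> defaults = new HashMap<>();

    public static void main(String[] args) {
        String key = "spark.cdm.transform.map.remove.null.value";
        types.put(key, PropertyType.BOOLEAN);
        defaults.put(key, "false"); // opt-in: off unless explicitly enabled

        // Consumers fall back to the registered default when the key is unset:
        String raw = System.getProperty(key, defaults.get(key));
        boolean removeNullMapValues = Boolean.parseBoolean(raw);
        System.out.println(removeNullMapValues); // false
    }
}
```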
diff --git a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
index 6daf2f69..e7bf2fca 100644
--- a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
+++ b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
@@ -15,7 +15,7 @@
  */
 package com.datastax.cdm.properties;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.spark.SparkConf;
diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java
index 3b12cfe7..f1dc2fdf 100644
--- a/src/main/java/com/datastax/cdm/schema/CqlTable.java
+++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java
@@ -15,9 +15,29 @@
  */
 package com.datastax.cdm.schema;
 
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.cdm.data.CqlConversion;
+import com.datastax.cdm.data.CqlData;
+import com.datastax.cdm.data.DataUtility;
 import com.datastax.cdm.feature.Feature;
 import com.datastax.cdm.feature.Featureset;
 import com.datastax.cdm.feature.WritetimeTTL;
+import com.datastax.cdm.properties.KnownProperties;
+import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.CqlIdentifier;
 import com.datastax.oss.driver.api.core.CqlSession;
@@ -26,22 +46,12 @@
 import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
 import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
 import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
-import com.datastax.oss.driver.api.core.type.*;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.ListType;
+import com.datastax.oss.driver.api.core.type.TupleType;
 import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
 import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
-import com.datastax.cdm.data.CqlData;
-import com.datastax.cdm.data.CqlConversion;
-import com.datastax.cdm.data.DataUtility;
-import com.datastax.cdm.properties.KnownProperties;
-import com.datastax.cdm.properties.PropertyHelper;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Instant;
-import java.util.*;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class CqlTable extends BaseTable {
     public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -163,6 +173,9 @@ public Integer getBatchSize() {
         return prop;
     }
 
+    private boolean removeMapWithNoValues = propertyHelper
+            .getBoolean(KnownProperties.TRANSFORM_MAP_REMOVE_KEY_WITH_NO_VALUE);
+
     // Adds to the current column list based on the name and type of columns already existing in the table
     // This is useful where a feature is adding a column by name of an existing column.
     // If the column is already present, the bind class is added to the return list.
@@ -277,6 +290,11 @@ public Object getAndConvertData(int index, Row row) {
         if (null==thisObject) {
             return convertNull(index);
         }
+
+        if (removeMapWithNoValues && thisObject instanceof Map) {
+            return removeNullValuesFromMap(thisObject);
+        }
+
         CqlConversion cqlConversion = this.cqlConversions.get(index);
         if (null==cqlConversion) {
             if (logTrace) logger.trace("{} Index:{} not converting:{}",isOrigin?"origin":"target",index,thisObject);
@@ -288,6 +306,12 @@
         }
     }
 
+    private Object removeNullValuesFromMap(Object thisObject) {
+        Set<Map.Entry<Object, Object>> ms = (((Map<Object, Object>) thisObject).entrySet());
+        return ms.stream().filter(e -> (e.getValue() != null))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     public Object convertNull(int thisIndex) {
         // We do not need to convert nulls for non-PK columns
         int otherIndex = this.getCorrespondingIndex(thisIndex);
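For reference, the Map handling added above in isolation: `Collectors.toMap` throws a `NullPointerException` when any value is null, which is why the filter must run before collecting. A self-contained sketch of the same stream pipeline (not the CDM class itself):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MapNullValueFilterSketch {
    static <K, V> Map<K, V> removeNullValues(Map<K, V> in) {
        // Drop null-valued entries first; Collectors.toMap would NPE on them.
        return in.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> m = new HashMap<>();
        m.put("kept", "value");
        m.put("dropped", null); // e.g. a null Timestamp that would fail downstream
        System.out.println(removeNullValues(m)); // {kept=value}
    }
}
```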
diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties
index f237bd86..fa7b8bbb 100644
--- a/src/resources/cdm-detailed.properties
+++ b/src/resources/cdm-detailed.properties
@@ -183,13 +183,16 @@ spark.cdm.autocorrect.mismatch false
 # spark.cdm.perfops
 #  .numParts     : Defaults is 10000. In standard operation, the full token range (-2^63..2^63-1)
 #                  is divided into a number of parts which will be parallel-processed. You should
-#                  aim for each part to comprise a total of ≈1-10GB of data to migrate. During
-#                  initial testing, you may want this to be a small number (even 1).
+#                  aim for each part to be ~10MB of data to migrate. The default assumes a table
+#                  size of about 100GB (10MB * 10K parts); if the table is significantly smaller
+#                  or larger than that, adjust this value accordingly.
 #  .batchSize    : Defaults is 5. When writing to Target, this comprises the number of records that
 #                  will be put into an UNLOGGED batch. CDM will tend to work on the same partition
 #                  at a time so if your partition sizes are larger, this number may be increased.
 #                  If .batchSize would mean that more than 1 partition is often contained in a batch,
 #                  the figure should be reduced. Ideally < 1% of batches have more than 1 partition.
+#                  For tables where the primary key equals the partition key, or where the average
+#                  row size is larger than 20 KB, always set this value to 1.
 #  .ratelimit
 #    .origin     : Defaults to 20000. Concurrent number of operations across all parallel threads
 #                  from Origin. This may be adjusted up (or down), depending on the amount of data
@@ -266,6 +269,11 @@ spark.cdm.perfops.ratelimit.target 40000
 #  .codecs.timestamp      Configuration for CQL_TIMESTAMP_TO_STRING_FORMAT codec.
 #     .string.format      Default is yyyyMMddHHmmss ; DateTimeFormatter.ofPattern(formatString)
 #     .string.zone        Default is UTC ; Must be in ZoneRulesProvider.getAvailableZoneIds()
+#
+#  .map.remove.null.value Default is false. Setting this to true removes entries with null or empty
+#                         values from Map fields during migration. Such values can cause a
+#                         NullPointerException when the value type does not support them (e.g.
+#                         Timestamp); set this to true to avoid those exceptions.
 #-----------------------------------------------------------------------------------------------------------
 #spark.cdm.transform.missing.key.ts.replace.value 1685577600000
 #spark.cdm.transform.custom.writetime 0
@@ -273,6 +281,7 @@ spark.cdm.perfops.ratelimit.target 40000
 #spark.cdm.transform.codecs
 #spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
 #spark.cdm.transform.codecs.timestamp.string.zone UTC
+#spark.cdm.transform.map.remove.null.value false
 
 #===========================================================================================================
 # Cassandra Filters are applied on the coordinator node. Note that, depending on the filter, the coordinator