diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 9575a654be92..773cd35e9367 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.delta -import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.ColumnarToRowExecBase import org.apache.spark.SparkException @@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction( spark.conf.getAll.foreach( entry => { if ( - entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings") + CHConf.startWithSettings(entry._1) || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key) ) { options += (entry._1 -> entry._2) diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 9575a654be92..773cd35e9367 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.delta -import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.ColumnarToRowExecBase import org.apache.spark.SparkException @@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction( spark.conf.getAll.foreach( entry => { if ( - entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings") + CHConf.startWithSettings(entry._1) || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key) ) { options += (entry._1 -> entry._2) diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala index 6db39b72781f..00940a4851e0 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.delta -import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.ColumnarToRowExecBase import org.apache.spark.SparkException @@ -140,7 +140,7 @@ class ClickhouseOptimisticTransaction( spark.conf.getAll.foreach( entry => { if ( - entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings") + CHConf.startWithSettings(entry._1) || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key) ) { options += (entry._1 -> entry._2) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index e63d871f9964..cecdd7477a3a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -62,8 +62,7 @@ class CHBackend extends SubstraitBackend { } object CHBackend { - val BACKEND_NAME = "ch" - val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME) + val BACKEND_NAME: String = CHConf.BACKEND_NAME } object CHBackendSettings extends BackendSettingsApi with Logging { @@ -74,13 +73,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging { // experimental: when the files count per partition exceeds this threshold, // it will put the files into one partition. val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".files.per.partition.threshold" + CHConf.prefixOf("files.per.partition.threshold") val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT = "-1" private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".customized.shuffle.codec.enable" + CHConf.prefixOf("customized.shuffle.codec.enable") private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE_DEFAULT = false lazy val useCustomizedShuffleCodec: Boolean = SparkEnv.get.conf.getBoolean( CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE, @@ -88,8 +85,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { ) private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".customized.buffer.size" + CHConf.prefixOf("customized.buffer.size") private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE_DEFAULT = 4096 lazy val customizeBufferSize: Int = SparkEnv.get.conf.getInt( CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE, @@ -97,8 +93,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { ) val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".broadcast.cache.expired.time" + CHConf.prefixOf("broadcast.cache.expired.time") // unit: SECONDS, default 1 day val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400 @@ -106,8 +101,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { // The algorithm for hash partition of the shuffle private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".shuffle.hash.algorithm" + CHConf.prefixOf("shuffle.hash.algorithm") // valid values are: cityHash64 or sparkMurmurHash3_32 private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "sparkMurmurHash3_32" def shuffleHashAlgorithm: String = { @@ -122,25 +116,19 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } } - private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode" + private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String = CHConf.prefixOf("affinity.mode") val SOFT: String = "soft" val FORCE: String = "force" private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT - private val GLUTEN_MAX_BLOCK_SIZE: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".runtime_settings.max_block_size" + private val GLUTEN_MAX_BLOCK_SIZE: String = CHConf.runtimeSettings("max_block_size") // Same as default value in clickhouse private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409 private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + 
CHBackend.BACKEND_NAME + - ".runtime_config.max_source_concatenate_bytes" + CHConf.runtimeConfig("max_source_concatenate_bytes") private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256 - val GLUTEN_AQE_PROPAGATEEMPTY: String = - GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + - ".aqe.propagate.empty.relation" + val GLUTEN_AQE_PROPAGATEEMPTY: String = CHConf.prefixOf("aqe.propagate.empty.relation") def affinityMode: String = { SparkEnv.get.conf @@ -368,7 +356,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { // Need to enable AQE def enableReorderHashJoinTables(): Boolean = { SparkEnv.get.conf.getBoolean( - "spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables", + CHConf.prefixOf("enable_reorder_hash_join_tables"), defaultValue = true ) } @@ -376,7 +364,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging { // large then this threshold, reorder the tables. e.g. a/b > threshold or b/a > threshold def reorderHashJoinTablesThreshold(): Int = { SparkEnv.get.conf.getInt( - "spark.gluten.sql.columnar.backend.ch.reorder_hash_join_tables_thresdhold", + CHConf.prefixOf("reorder_hash_join_tables_thresdhold"), 10 ) } @@ -385,8 +373,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging { // for example, select a, b, sum(c+d) from t group by a, b with cube def enablePushdownPreProjectionAheadExpand(): Boolean = { SparkEnv.get.conf.getBoolean( - "spark.gluten.sql.columnar.backend.ch.enable_pushdown_preprojection_ahead_expand", - true + CHConf.prefixOf("enable_pushdown_preprojection_ahead_expand"), + defaultValue = true ) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala index e015d7c55ce4..16201dbad120 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConf.scala @@ -21,32 +21,35 @@ import org.apache.gluten.GlutenConfig import org.apache.spark.SparkConf object CHConf { - - private val CH = GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + "." - private val CH_SETTINGS = CH + "runtime_settings." - private val CH_CONFIG = CH + "runtime_config." 
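A minimal standalone sketch of the key layout the new CHConf helpers are expected to produce, assuming GlutenConfig.prefixOf("ch") expands to "spark.gluten.sql.columnar.backend.ch" as the literal keys removed elsewhere in this patch suggest (the sketch object re-implements the helpers for illustration only):

object CHConfKeySketch {
  // Assumed literal value of CHConf.CONF_PREFIX, taken from the removed hard-coded keys.
  private val confPrefix = "spark.gluten.sql.columnar.backend.ch"
  private val runtimeSettingsPrefix = s"$confPrefix.runtime_settings"
  private val runtimeConfigPrefix = s"$confPrefix.runtime_config"

  def prefixOf(key: String): String = s"$confPrefix.$key"
  def runtimeSettings(key: String): String = s"$runtimeSettingsPrefix.$key"
  def runtimeConfig(key: String): String = s"$runtimeConfigPrefix.$key"
  def startWithSettings(key: String): Boolean = key.startsWith(runtimeSettingsPrefix)

  def main(args: Array[String]): Unit = {
    // Same literal keys the removed string concatenations built by hand.
    assert(prefixOf("worker.id") == "spark.gluten.sql.columnar.backend.ch.worker.id")
    assert(runtimeSettings("join_algorithm") ==
      "spark.gluten.sql.columnar.backend.ch.runtime_settings.join_algorithm")
    assert(runtimeConfig("logger.level") ==
      "spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level")
    assert(startWithSettings(runtimeSettings("max_bytes_before_external_sort")))
  }
}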
+ private[clickhouse] val BACKEND_NAME: String = "ch" + private[clickhouse] val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME) + private val RUNTIME_SETTINGS: String = s"$CONF_PREFIX.runtime_settings" + private val RUNTIME_CONFIG = s"$CONF_PREFIX.runtime_config" implicit class GlutenCHConf(conf: SparkConf) { def setCHSettings(settings: (String, String)*): SparkConf = { - settings.foreach { case (k, v) => conf.set(settingsKey(k), v) } + settings.foreach { case (k, v) => conf.set(runtimeSettings(k), v) } conf } def setCHSettings[T](k: String, v: T): SparkConf = { - conf.set(settingsKey(k), v.toString) + conf.set(runtimeSettings(k), v.toString) conf } def setCHConfig(config: (String, String)*): SparkConf = { - config.foreach { case (k, v) => conf.set(configKey(k), v) } + config.foreach { case (k, v) => conf.set(runtimeConfig(k), v) } conf } def setCHConfig[T](k: String, v: T): SparkConf = { - conf.set(configKey(k), v.toString) + conf.set(runtimeConfig(k), v.toString) conf } } - def configKey(key: String): String = CH_CONFIG + key - def settingsKey(key: String): String = CH_SETTINGS + key + def prefixOf(key: String): String = s"$CONF_PREFIX.$key" + def runtimeConfig(key: String): String = s"$RUNTIME_CONFIG.$key" + def runtimeSettings(key: String): String = s"$RUNTIME_SETTINGS.$key" + + def startWithSettings(key: String): Boolean = key.startsWith(RUNTIME_SETTINGS) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala index e0f266e904f6..58acda88fba5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala @@ -87,8 +87,7 @@ class CHListenerApi extends ListenerApi with Logging { "local_engine.settings.log_processors_profiles" -> "true") // add memory limit for external sort - val externalSortKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" + - s".max_bytes_before_external_sort" + val externalSortKey = CHConf.runtimeSettings("max_bytes_before_external_sort") if (conf.getLong(externalSortKey, -1) < 0) { if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) { val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala index 9653256256bd..62fe3d9fa97a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala @@ -87,32 +87,33 @@ class CHTransformerApi extends TransformerApi with Logging { override def postProcessNativeConfig( nativeConfMap: util.Map[String, String], backendPrefix: String): Unit = { - val settingPrefix = backendPrefix + ".runtime_settings." + + require(backendPrefix == CHConf.CONF_PREFIX) if (nativeConfMap.getOrDefault("spark.memory.offHeap.enabled", "false").toBoolean) { val offHeapSize = nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong if (offHeapSize > 0) { // Only set default max_bytes_before_external_group_by for CH when it is not set explicitly. 
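For context on the hunk below, a compressed sketch of the spill-related defaults postProcessNativeConfig derives from the off-heap size (ratios and key names are taken from this hunk; the wrapper object and the 8 GiB figure are illustrative only):

object SpillDefaultsSketch {
  import java.util.{HashMap => JHashMap, Map => JMap}
  import org.apache.gluten.backendsapi.clickhouse.CHConf

  def applyDefaults(nativeConfMap: JMap[String, String], offHeapSize: Long): Unit = {
    def putIfMissing(key: String, value: String): Unit =
      if (!nativeConfMap.containsKey(key)) nativeConfMap.put(key, value)

    if (offHeapSize > 0) {
      // External group-by spill threshold defaults to half of the off-heap size.
      putIfMissing(
        CHConf.runtimeSettings("max_bytes_before_external_group_by"),
        (offHeapSize * 0.5).toLong.toString)
      // max_memory_usage defaults to the whole off-heap size.
      putIfMissing(CHConf.runtimeSettings("max_memory_usage"), offHeapSize.toString)
      // Join spill threshold (70% of off-heap) only applies when join_algorithm is grace_hash.
      if ("grace_hash" == nativeConfMap.get(CHConf.runtimeSettings("join_algorithm"))) {
        putIfMissing(
          CHConf.runtimeSettings("max_bytes_in_join"),
          (offHeapSize * 0.7).toLong.toString)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new JHashMap[String, String]()
    conf.put(CHConf.runtimeSettings("join_algorithm"), "grace_hash")
    applyDefaults(conf, 8L * 1024 * 1024 * 1024) // assume 8 GiB off-heap for the example
    println(conf)
  }
}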
- val groupBySpillKey = settingPrefix + "max_bytes_before_external_group_by"; + val groupBySpillKey = CHConf.runtimeSettings("max_bytes_before_external_group_by") if (!nativeConfMap.containsKey(groupBySpillKey)) { val groupBySpillValue = offHeapSize * 0.5 nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString) } - val maxMemoryUsageKey = settingPrefix + "max_memory_usage"; + val maxMemoryUsageKey = CHConf.runtimeSettings("max_memory_usage") if (!nativeConfMap.containsKey(maxMemoryUsageKey)) { val maxMemoryUsageValue = offHeapSize - nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString) + nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toString) } // Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash - val joinAlgorithmKey = settingPrefix + "join_algorithm"; + val joinAlgorithmKey = CHConf.runtimeSettings("join_algorithm") if ( nativeConfMap.containsKey(joinAlgorithmKey) && nativeConfMap.get(joinAlgorithmKey) == "grace_hash" ) { - val joinSpillKey = settingPrefix + "max_bytes_in_join"; + val joinSpillKey = CHConf.runtimeSettings("max_bytes_in_join") if (!nativeConfMap.containsKey(joinSpillKey)) { val joinSpillValue = offHeapSize * 0.7 nativeConfMap.put(joinSpillKey, joinSpillValue.toLong.toString) @@ -127,24 +128,24 @@ class CHTransformerApi extends TransformerApi with Logging { } } - val hdfsConfigPrefix = backendPrefix + ".runtime_config.hdfs." - injectConfig("spark.hadoop.input.connect.timeout", hdfsConfigPrefix + "input_connect_timeout") - injectConfig("spark.hadoop.input.read.timeout", hdfsConfigPrefix + "input_read_timeout") - injectConfig("spark.hadoop.input.write.timeout", hdfsConfigPrefix + "input_write_timeout") + val hdfsConfigPrefix = CHConf.runtimeConfig("hdfs") + injectConfig("spark.hadoop.input.connect.timeout", s"$hdfsConfigPrefix.input_connect_timeout") + injectConfig("spark.hadoop.input.read.timeout", s"$hdfsConfigPrefix.input_read_timeout") + injectConfig("spark.hadoop.input.write.timeout", s"$hdfsConfigPrefix.input_write_timeout") injectConfig( "spark.hadoop.dfs.client.log.severity", - hdfsConfigPrefix + "dfs_client_log_severity") + s"$hdfsConfigPrefix.dfs_client_log_severity") // TODO: set default to true when metrics could be collected // while ch query plan optimization is enabled. - val planOptKey = settingPrefix + "query_plan_enable_optimizations" + val planOptKey = CHConf.runtimeSettings("query_plan_enable_optimizations") if (!nativeConfMap.containsKey(planOptKey)) { nativeConfMap.put(planOptKey, "false") } // Respect spark config spark.sql.orc.compression.codec for CH backend // TODO: consider compression or orc.compression in table options. 
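One quick equivalence note on the hdfs prefix rewrite above: the old code appended ".runtime_config.hdfs." to backendPrefix by hand, while the new code interpolates a dot after CHConf.runtimeConfig("hdfs"); a tiny sketch of the expected result (the literal key is assumed from the prefixes removed elsewhere in this patch):

import org.apache.gluten.backendsapi.clickhouse.CHConf

object HdfsKeySketch extends App {
  val hdfsConfigPrefix = CHConf.runtimeConfig("hdfs")
  // Old: backendPrefix + ".runtime_config.hdfs." + "input_connect_timeout"
  // New: s"$hdfsConfigPrefix.input_connect_timeout" -- both should resolve to:
  assert(
    s"$hdfsConfigPrefix.input_connect_timeout" ==
      "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.input_connect_timeout")
}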
- val orcCompressionKey = settingPrefix + "output_format_orc_compression_method" + val orcCompressionKey = CHConf.runtimeSettings("output_format_orc_compression_method") if (!nativeConfMap.containsKey(orcCompressionKey)) { if (nativeConfMap.containsKey("spark.sql.orc.compression.codec")) { val compression = nativeConfMap.get("spark.sql.orc.compression.codec").toLowerCase() diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/expression/CHExpressionTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/expression/CHExpressionTransformer.scala index f430b1141cf9..fe8d23f9a958 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/expression/CHExpressionTransformer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/expression/CHExpressionTransformer.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.expression -import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.expression.ConverterUtils.FunctionConfig import org.apache.gluten.substrait.expression._ @@ -70,7 +70,7 @@ case class CHTruncTimestampTransformer( if ( timeZoneIgnore && timeZoneId.nonEmpty && !timeZoneId.get.equalsIgnoreCase( - SQLConf.get.getConfString(s"${CHBackend.CONF_PREFIX}.runtime_config.timezone") + SQLConf.get.getConfString(s"${CHConf.runtimeConfig("timezone")}") ) ) { throw new GlutenNotSupportException( @@ -157,23 +157,23 @@ case class CHPosExplodeTransformer( // Output (pos, col) when input is array type val structType = StructType( Array( - StructField("pos", IntegerType, false), + StructField("pos", IntegerType, nullable = false), StructField("col", a.elementType, a.containsNull))) ExpressionBuilder.makeScalarFunction( funcId, Lists.newArrayList(childNode), - ConverterUtils.getTypeNode(structType, false)) + ConverterUtils.getTypeNode(structType, nullable = false)) case m: MapType => // Output (pos, key, value) when input is map type val structType = StructType( Array( - StructField("pos", IntegerType, false), - StructField("key", m.keyType, false), + StructField("pos", IntegerType, nullable = false), + StructField("key", m.keyType, nullable = false), StructField("value", m.valueType, m.valueContainsNull))) ExpressionBuilder.makeScalarFunction( funcId, Lists.newArrayList(childNode), - ConverterUtils.getTypeNode(structType, false)) + ConverterUtils.getTypeNode(structType, nullable = false)) case _ => throw new GlutenNotSupportException(s"posexplode($childType) not supported yet.") } @@ -225,7 +225,7 @@ case class GetArrayItemTransformer( Seq(IntegerType, getArrayItem.right.dataType), FunctionConfig.OPT) val addFunctionId = ExpressionBuilder.newScalarFunction(functionMap, addFunctionName) - val literalNode = ExpressionBuilder.makeLiteral(1.toInt, IntegerType, false) + val literalNode = ExpressionBuilder.makeLiteral(1, IntegerType, false) rightNode = ExpressionBuilder.makeScalarFunction( addFunctionId, Lists.newArrayList(literalNode, rightNode), diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala index ec7c686707aa..e300533bdb22 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala +++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.datasources.utils -import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit} import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter} import org.apache.gluten.softaffinity.SoftAffinityManager @@ -171,9 +171,10 @@ object MergeTreePartsPartitionsUtil extends Logging { val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path) if (ret == null) { val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet() - val keySample = keys.isEmpty match { - case true => "" - case false => keys.iterator().next() + val keySample = if (keys.isEmpty) { + "" + } else { + keys.iterator().next() } throw new IllegalStateException( "Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " + @@ -418,7 +419,7 @@ object MergeTreePartsPartitionsUtil extends Logging { bucketId => val currBucketParts: Seq[MergeTreePartRange] = prunedFilesGroupedToBuckets.getOrElse(bucketId, Seq.empty) - if (!currBucketParts.isEmpty) { + if (currBucketParts.nonEmpty) { val currentFiles = currBucketParts.map { part => MergeTreePartSplit( @@ -453,8 +454,7 @@ object MergeTreePartsPartitionsUtil extends Logging { } private def useDriverFilter(filterExprs: Seq[Expression], sparkSession: SparkSession): Boolean = { - val enableDriverFilterKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" + - s".enabled_driver_filter_mergetree_index" + val enableDriverFilterKey = CHConf.runtimeSettings("enabled_driver_filter_mergetree_index") // When using soft affinity, disable driver filter filterExprs.nonEmpty && sparkSession.sessionState.conf.getConfString( diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala index 69fc3f3bcd6e..53ccb8d1c046 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.clickhouse +import org.apache.gluten.backendsapi.clickhouse.CHConf + import org.apache.spark.sql.catalyst.catalog.BucketSpec import java.util @@ -34,12 +36,10 @@ object ClickHouseConfig { @deprecated // Whether to use MergeTree DataSource V2 API, default is false, fall back to V1. - val USE_DATASOURCE_V2 = "spark.gluten.sql.columnar.backend.ch.use.v2" + val USE_DATASOURCE_V2: String = CHConf.prefixOf("use.v2") val DEFAULT_USE_DATASOURCE_V2 = "false" - val CLICKHOUSE_WORKER_ID = "spark.gluten.sql.columnar.backend.ch.worker.id" - - val CLICKHOUSE_WAREHOUSE_DIR = "spark.gluten.sql.columnar.backend.ch.warehouse.dir" + val CLICKHOUSE_WORKER_ID: String = CHConf.prefixOf("worker.id") /** Create a mergetree configurations and returns the normalized key -> value map. 
*/ def createMergeTreeConfigurations( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala index 6dfab5bf5a91..4c774c9e037b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/RunTPCHTest.scala @@ -16,9 +16,11 @@ */ package org.apache.gluten +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.benchmarks.GenTPCHTableScripts import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.commons.io.FileUtils @@ -34,10 +36,10 @@ object RunTPCHTest { // parquet or mergetree val fileFormat = "parquet" val libPath = "/usr/local/clickhouse/lib/libch.so" - if (!(new File(libPath)).exists()) System.exit(1) + if (!new File(libPath).exists()) System.exit(1) // TPCH data files path val dataFilesPath = "/data/tpch-data/" + fileFormat - if (!(new File(dataFilesPath)).exists()) System.exit(1) + if (!new File(dataFilesPath).exists()) System.exit(1) // the time of execution val executedCnt = 5 // local thread count @@ -91,7 +93,7 @@ object RunTPCHTest { .config("spark.databricks.delta.properties.defaults.checkpointInterval", 5) .config("spark.databricks.delta.stalenessLimit", 3600 * 1000) .config("spark.gluten.sql.columnar.columnarToRow", columnarColumnToRow) - .config("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .config(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .config(GlutenConfig.GLUTEN_LIB_PATH, libPath) .config("spark.gluten.sql.columnar.iterator", "true") .config("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -99,7 +101,7 @@ object RunTPCHTest { .config("spark.sql.columnVector.offheap.enabled", "true") .config("spark.memory.offHeap.enabled", "true") .config("spark.memory.offHeap.size", offHeapSize) - .config("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + .config(CHConf.runtimeConfig("logger.level"), "error") .config("spark.sql.warehouse.dir", warehouse) .config( "javax.jdo.option.ConnectionURL", diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala index d025bd85a0cf..055b5c807d37 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarMemorySortShuffleSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.backendsapi.clickhouse.CHConf + import org.apache.spark.SparkConf import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -35,7 +37,7 @@ class GlutenClickHouseColumnarMemorySortShuffleSuite .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .set("spark.gluten.sql.columnar.backend.ch.forceMemorySortShuffle", "true") + .set(CHConf.prefixOf("forceMemorySortShuffle"), "true") // TODO: forceMemorySortShuffle } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala index 
0ac6284991ae..b43fc2625f0b 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseColumnarShuffleAQESuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.backendsapi.clickhouse.CHConf + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.optimizer._ @@ -32,7 +34,6 @@ class GlutenClickHouseColumnarShuffleAQESuite override protected val tablesPath: String = basePath + "/tpch-data-ch" override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch" override protected val queriesResults: String = rootPath + "mergetree-queries-output" - private val backendConfigPrefix = "spark.gluten.sql.columnar.backend.ch." /** Run Gluten + ClickHouse Backend with ColumnarShuffleManager */ override protected def sparkConf: SparkConf = { @@ -53,13 +54,11 @@ class GlutenClickHouseColumnarShuffleAQESuite case csr: AQEShuffleReadExec => csr } assert(colCustomShuffleReaderExecs.size == 2) - val coalescedPartitionSpec0 = colCustomShuffleReaderExecs(0) - .partitionSpecs(0) + val coalescedPartitionSpec0 = colCustomShuffleReaderExecs.head.partitionSpecs.head .asInstanceOf[CoalescedPartitionSpec] assert(coalescedPartitionSpec0.startReducerIndex == 0) assert(coalescedPartitionSpec0.endReducerIndex == 5) - val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1) - .partitionSpecs(0) + val coalescedPartitionSpec1 = colCustomShuffleReaderExecs(1).partitionSpecs.head .asInstanceOf[CoalescedPartitionSpec] assert(coalescedPartitionSpec1.startReducerIndex == 0) assert(coalescedPartitionSpec1.endReducerIndex == 5) @@ -180,7 +179,7 @@ class GlutenClickHouseColumnarShuffleAQESuite test("GLUTEN-6768 rerorder hash join") { withSQLConf( - ("spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables", "true"), + (CHConf.prefixOf("enable_reorder_hash_join_tables"), "true"), ("spark.sql.adaptive.enabled", "true")) { spark.sql("create table t1(a int, b int) using parquet") spark.sql("create table t2(a int, b int) using parquet") @@ -266,8 +265,8 @@ class GlutenClickHouseColumnarShuffleAQESuite test("GLUTEN-6768 change mixed join condition into multi join on clauses") { withSQLConf( - (backendConfigPrefix + "runtime_config.prefer_multi_join_on_clauses", "true"), - (backendConfigPrefix + "runtime_config.multi_join_on_clauses_build_side_row_limit", "1000000") + (CHConf.runtimeConfig("prefer_multi_join_on_clauses"), "true"), + (CHConf.runtimeConfig("multi_join_on_clauses_build_side_row_limit"), "1000000") ) { spark.sql("create table t1(a int, b int, c int, d int) using parquet") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala index 5b499026f81c..bb99c6bd1ef0 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.backendsapi.clickhouse.CHConf + import org.apache.spark.SparkConf import org.apache.spark.sql.{functions, DataFrame, Row} import org.apache.spark.sql.execution.LocalTableScanExec @@ -64,12 +66,12 @@ class GlutenClickHouseFileFormatSuite 
override protected def createTPCHNotNullTables(): Unit = {} override protected def sparkConf: SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConf._ + super.sparkConf .set("spark.sql.adaptive.enabled", "true") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.date_time_input_format", - "best_effort_us") - .set("spark.gluten.sql.columnar.backend.ch.runtime_settings.use_excel_serialization", "true") + .setCHSettings("date_time_input_format", "best_effort_us") + .setCHSettings("use_excel_serialization", true) } // in this case, FakeRowAdaptor does R2C @@ -877,10 +879,7 @@ class GlutenClickHouseFileFormatSuite .toDF() .createTempView("no_quote_table") - withSQLConf(( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.use_excel_serialization.quote_strict", - "true" - )) { + withSQLConf((CHConf.runtimeSettings("use_excel_serialization.quote_strict"), "true")) { compareResultsAgainstVanillaSpark( "select * from no_quote_table", compareResult = true, @@ -1186,11 +1185,7 @@ class GlutenClickHouseFileFormatSuite } test("issue-2881 null string test") { - withSQLConf( - ( - "spark.gluten.sql.columnar.backend.ch.runtime_settings." + - "use_excel_serialization.empty_as_null", - "true")) { + withSQLConf((CHConf.runtimeSettings("use_excel_serialization.empty_as_null"), "true")) { val file_path = csvDataPath + "/null_string.csv" val schema = StructType.apply( Seq( @@ -1223,11 +1218,7 @@ class GlutenClickHouseFileFormatSuite } test("issue-3542 null string test") { - withSQLConf( - ( - "spark.gluten.sql.columnar.backend.ch.runtime_settings." + - "use_excel_serialization.empty_as_null", - "false")) { + withSQLConf((CHConf.runtimeSettings("use_excel_serialization.empty_as_null"), "false")) { val file_path = csvDataPath + "/null_string.csv" val schema = StructType.apply( Seq( @@ -1312,11 +1303,7 @@ class GlutenClickHouseFileFormatSuite } test("issues-3609 int read test") { - withSQLConf( - ( - "spark.gluten.sql.columnar.backend.ch.runtime_settings." + - "use_excel_serialization.number_force", - "false")) { + withSQLConf((CHConf.runtimeSettings("use_excel_serialization.number_force"), "false")) { val csv_path = csvDataPath + "/int_special.csv" val options = new util.HashMap[String, String]() options.put("delimiter", ",") @@ -1345,11 +1332,7 @@ class GlutenClickHouseFileFormatSuite checkAnswer(df, expectedAnswer) } - withSQLConf( - ( - "spark.gluten.sql.columnar.backend.ch.runtime_settings." 
+ - "use_excel_serialization.number_force", - "true")) { + withSQLConf((CHConf.runtimeSettings("use_excel_serialization.number_force"), "true")) { val csv_path = csvDataPath + "/int_special.csv" val options = new util.HashMap[String, String]() options.put("delimiter", ",") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala index 75c4372a04d9..97ba1fe4214c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseJoinSuite.scala @@ -17,9 +17,11 @@ package org.apache.gluten.execution import org.apache.gluten.GlutenConfig +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.utils.UTSystemParameters import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSuite { @@ -28,7 +30,7 @@ class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSui rootPath + "../../../../gluten-core/src/test/resources/tpch-queries" protected val queriesResults: String = rootPath + "queries-output" - private val joinAlgorithm = "spark.gluten.sql.columnar.backend.ch.runtime_settings.join_algorithm" + private val joinAlgorithm = CHConf.runtimeSettings("join_algorithm") override protected def sparkConf: SparkConf = { super.sparkConf @@ -38,7 +40,7 @@ class GlutenClickHouseJoinSuite extends GlutenClickHouseWholeStageTransformerSui .set("spark.sql.adaptive.enabled", "false") .set("spark.sql.files.minPartitionNum", "1") .set("spark.gluten.sql.columnar.columnartorow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseS3SourceSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseS3SourceSuite.scala index eafc0ba0abbb..aa5123440d3f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseS3SourceSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseS3SourceSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.execution +import org.apache.gluten.backendsapi.clickhouse.CHConf + import org.apache.spark.SparkConf // Some sqls' line length exceeds 100 @@ -93,14 +95,13 @@ class GlutenClickHouseS3SourceSuite extends GlutenClickHouseTPCHAbstractSuite { println(s"currTime=$currTime") // scalastyle:on println spark.sparkContext.setLocalProperty( - "spark.gluten.sql.columnar.backend.ch." + - "runtime_settings.spark.kylin.local-cache.accept-cache-time", + CHConf.runtimeSettings("spark.kylin.local-cache.accept-cache-time"), currTime.toString) spark .sql(""" |select * from supplier_s3 |""".stripMargin) - .show(10, false) + .show(10, truncate = false) Thread.sleep(5000) @@ -108,14 +109,13 @@ class GlutenClickHouseS3SourceSuite extends GlutenClickHouseTPCHAbstractSuite { println(s"currTime=$currTime") // scalastyle:on println spark.sparkContext.setLocalProperty( - "spark.gluten.sql.columnar.backend.ch." 
+ - "runtime_settings.spark.kylin.local-cache.accept-cache-time", + CHConf.runtimeSettings("spark.kylin.local-cache.accept-cache-time"), currTime.toString) spark .sql(""" |select * from supplier_s3 |""".stripMargin) - .show(10, false) + .show(10, truncate = false) Thread.sleep(5000) currTime = System.currentTimeMillis() @@ -123,14 +123,13 @@ class GlutenClickHouseS3SourceSuite extends GlutenClickHouseTPCHAbstractSuite { println(s"currTime=$currTime") // scalastyle:on println spark.sparkContext.setLocalProperty( - "spark.gluten.sql.columnar.backend.ch." + - "runtime_settings.spark.kylin.local-cache.accept-cache-time", + CHConf.runtimeSettings("spark.kylin.local-cache.accept-cache-time"), currTime.toString) spark .sql(""" |select * from supplier_s3 |""".stripMargin) - .show(10, false) + .show(10, truncate = false) } } // scalastyle:on line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseSyntheticDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseSyntheticDataSuite.scala index 8db4e3b10fe2..bd739a291e51 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseSyntheticDataSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseSyntheticDataSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import java.time.LocalDate @@ -51,7 +52,7 @@ class GlutenClickHouseSyntheticDataSuite .set("spark.databricks.delta.properties.defaults.checkpointInterval", "5") .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnarToRow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -136,7 +137,7 @@ class GlutenClickHouseSyntheticDataSuite prepareTables() var sqlStr: String = null - var expected: Seq[Row] = null; + var expected: Seq[Row] = null withSQLConf(vanillaSparkConfs(): _*) { val supportedAggs = "count" :: "avg" :: "sum" :: "min" :: "max" :: Nil val selected = supportedAggs @@ -173,7 +174,7 @@ class GlutenClickHouseSyntheticDataSuite test("test data function in https://github.com/Kyligence/ClickHouse/issues/88") { var sqlStr: String = null - var expected: Seq[Row] = null; + var expected: Seq[Row] = null val x = spark import x.implicits._ @@ -205,7 +206,7 @@ class GlutenClickHouseSyntheticDataSuite test("sql on Seq based(row based) DataFrame") { var sqlStr: String = null - var expected: Seq[Row] = null; + var expected: Seq[Row] = null val x = spark import x.implicits._ diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala index abb7d27ffe92..f1d790322fe6 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging 
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog} +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.spark.sql.types.{StructField, StructType} import org.apache.commons.io.FileUtils @@ -131,7 +132,7 @@ abstract class GlutenClickHouseTPCDSAbstractSuite .set("spark.databricks.delta.properties.defaults.checkpointInterval", "5") .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnarToRow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala index 8d671e29f18b..7f15edf73b85 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala @@ -23,6 +23,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.DataFrame import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog} +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.commons.io.FileUtils import org.scalatest.time.SpanSugar.convertIntToGrainOfTime @@ -567,7 +568,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .set("spark.gluten.sql.columnar.columnarToRow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -588,7 +589,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite assert(CHBroadcastBuildSideCache.size() <= 10) } - ClickhouseSnapshot.clearAllFileStatusCache + ClickhouseSnapshot.clearAllFileStatusCache() DeltaLog.clearCache() super.afterAll() // init GlutenConfig in the next beforeAll diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala index 52cdaf0592ad..594d88fb6402 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala @@ -32,9 +32,6 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst .set("spark.io.compression.codec", "SNAPPY") .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") -// .set("spark.ui.enabled", "true") -// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", "true") -// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug") } test("test simple minmax 
index") { @@ -81,7 +78,7 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 1) val marks = mergetreeScan.metrics("selectedMarks").value @@ -139,7 +136,7 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 2) val marks = mergetreeScan.metrics("selectedMarks").value @@ -197,7 +194,7 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 2) val marks = mergetreeScan.metrics("selectedMarks").value @@ -255,7 +252,7 @@ class GlutenClickHouseTPCHNotNullSkipIndexSuite extends GlutenClickHouseTPCHAbst case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 1) val marks = mergetreeScan.metrics("selectedMarks").value diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala index 3d44f500a879..632cffff7cf6 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala @@ -33,9 +33,6 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs .set("spark.io.compression.codec", "SNAPPY") .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") -// .set("spark.ui.enabled", "true") -// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline", "true") -// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug") } test("test simple minmax index") { @@ -82,7 +79,7 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 1) val marks = mergetreeScan.metrics("selectedMarks").value @@ -140,7 +137,7 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 2) val marks = mergetreeScan.metrics("selectedMarks").value @@ -198,7 +195,7 @@ class GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 2) val marks = mergetreeScan.metrics("selectedMarks").value @@ -255,7 +252,7 @@ class 
GlutenClickHouseTPCHNullableSkipIndexSuite extends GlutenClickHouseTPCHAbs case f: FileSourceScanExecTransformer => f } assert(scanExec.size == 1) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head val ret = df.collect() assert(ret.apply(0).get(0) == 1) val marks = mergetreeScan.metrics("selectedMarks").value diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala index f5e020af4363..b268eb7192d3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala @@ -22,6 +22,7 @@ import org.apache.gluten.utils.UTSystemParameters import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, TestUtils} import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, NullPropagation} +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -54,7 +55,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS .set("spark.databricks.delta.properties.defaults.checkpointInterval", "5") .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnartorow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -75,9 +76,9 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS val schema = StructType( Array( - StructField("double_field1", DoubleType, true), - StructField("int_field1", IntegerType, true), - StructField("string_field1", StringType, true) + StructField("double_field1", DoubleType, nullable = true), + StructField("int_field1", IntegerType, nullable = true), + StructField("string_field1", StringType, nullable = true) )) val data = sparkContext.parallelize( Seq( @@ -103,9 +104,9 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS val dateSchema = StructType( Array( - StructField("ts", IntegerType, true), - StructField("day", DateType, true), - StructField("weekday_abbr", StringType, true) + StructField("ts", IntegerType, nullable = true), + StructField("day", DateType, nullable = true), + StructField("weekday_abbr", StringType, nullable = true) ) ) val dateRows = sparkContext.parallelize( @@ -142,7 +143,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS val str2MapFilePath = str2Mapfile.getAbsolutePath val str2MapSchema = StructType( Array( - StructField("str", StringType, true) + StructField("str", StringType, nullable = true) )) val str2MapData = sparkContext.parallelize( Seq( @@ -165,12 +166,12 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS .parquet(str2MapFilePath) spark.catalog.createTable("str2map_table", str2MapFilePath, fileFormat) - val urlFile = Files.createTempFile("", ".parquet").toFile() + val urlFile = Files.createTempFile("", ".parquet").toFile urlFile.deleteOnExit() val urlFilePath = urlFile.getAbsolutePath val urlTalbeSchema = StructType( Array( - StructField("url", StringType, true) + 
StructField("url", StringType, nullable = true) ) ) val urlTableData = sparkContext.parallelize( @@ -504,7 +505,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS def checkResult(df: DataFrame, exceptedResult: Seq[Row]): Unit = { // check the result val result = df.collect() - assert(result.size == exceptedResult.size) + assert(result.length === exceptedResult.size) TestUtils.compareAnswers(result, exceptedResult) } @@ -723,10 +724,10 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS val transform_sql = "select transform(ids, x -> x + 1) from tb_array" runQueryAndCompare(transform_sql)(checkGlutenOperatorMatch[ProjectExecTransformer]) - val filter_sql = "select filter(ids, x -> x % 2 == 1) from tb_array"; + val filter_sql = "select filter(ids, x -> x % 2 == 1) from tb_array" runQueryAndCompare(filter_sql)(checkGlutenOperatorMatch[ProjectExecTransformer]) - val aggregate_sql = "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array"; + val aggregate_sql = "select ids, aggregate(ids, 3, (acc, x) -> acc + x) from tb_array" runQueryAndCompare(aggregate_sql)(checkGlutenOperatorMatch[ProjectExecTransformer]) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala index 9c3dbcac3245..f16c897671b1 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala @@ -21,6 +21,7 @@ import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite import org.apache.gluten.utils.UTSystemParameters import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { override protected val needCopyParquetToTablePath = true @@ -50,7 +51,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite { .set("spark.databricks.delta.properties.defaults.checkpointInterval", "5") .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnartorow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala index 86a01ce99718..4e2b5ad63e0a 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SaveMode} import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.spark.sql.hive.HiveTableScanExecTransformer import 
org.apache.spark.sql.internal.SQLConf @@ -54,7 +55,7 @@ class GlutenClickHouseHiveTableSuite .set("spark.sql.adaptive.enabled", "false") .set("spark.sql.files.minPartitionNum", "1") .set("spark.gluten.sql.columnar.columnartorow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -1352,7 +1353,7 @@ class GlutenClickHouseHiveTableSuite sql(insertSql) val selectSql = s"SELECT * FROM $tableName" - compareResultsAgainstVanillaSpark(selectSql, true, _ => {}) + compareResultsAgainstVanillaSpark(selectSql, compareResult = true, _ => {}) sql(s"drop table if exists $tableName") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala index 9e3fa00787de..53aef16d143e 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkConf import org.apache.spark.gluten.NativeWriteChecker import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig import org.apache.spark.sql.types._ import scala.reflect.runtime.universe.TypeTag @@ -36,6 +37,8 @@ class GlutenClickHouseNativeWriteTableSuite with NativeWriteChecker { override protected def sparkConf: SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConf._ + var sessionTimeZone = "GMT" if (isSparkVersionGE("3.5")) { sessionTimeZone = java.util.TimeZone.getDefault.getID @@ -55,7 +58,7 @@ class GlutenClickHouseNativeWriteTableSuite .set("spark.databricks.delta.properties.defaults.checkpointInterval", "5") .set("spark.databricks.delta.stalenessLimit", "3600000") .set("spark.gluten.sql.columnar.columnartorow", "true") - .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1") + .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1") .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) .set("spark.gluten.sql.columnar.iterator", "true") .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true") @@ -64,7 +67,7 @@ class GlutenClickHouseNativeWriteTableSuite .set("spark.sql.storeAssignmentPolicy", "legacy") .set("spark.sql.warehouse.dir", getWarehouseDir) .set("spark.sql.session.timeZone", sessionTimeZone) - .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + .setCHConfig("logger.level", "error") .setMaster("local[1]") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala index d359428d03ca..503c9bd886ed 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseTableAfterRestart.scala @@ -46,27 +46,21 @@ class GlutenClickHouseTableAfterRestart /** Run Gluten + ClickHouse Backend with 
SortShuffleManager */ override protected def sparkConf: SparkConf = { + import org.apache.gluten.backendsapi.clickhouse.CHConf._ + super.sparkConf .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.io.compression.codec", "LZ4") .set("spark.sql.shuffle.partitions", "5") .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") - .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path", - "/tmp/user_defined") + .setCHConfig("logger.level", "error") + .setCHConfig("user_defined_path", "/tmp/user_defined") .set("spark.sql.files.maxPartitionBytes", "20000000") .set("spark.ui.enabled", "true") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows", - "100000") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", - "false") - .set( - "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size", - "8192") + .setCHSettings("min_insert_block_size_rows", 100000) + .setCHSettings("mergetree.merge_after_insert", false) + .setCHSettings("input_format_parquet_max_block_size", 8192) .setMaster("local[2]") } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala index 717454a23f37..9effd64a277f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeOptimizeSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.execution.mergetree +import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite} import org.apache.spark.SparkConf @@ -472,7 +473,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite test("test mergetree insert with optimize basic") { withSQLConf( "spark.databricks.delta.optimize.minFileSize" -> "200000000", - "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true" + CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true" ) { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic; diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index 689d7ccad0cb..83990d68ad47 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -623,9 +623,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite withSQLConf( "spark.databricks.delta.optimize.minFileSize" -> "200000000", - CHConf.settingsKey("mergetree.merge_after_insert") -> "true", - CHConf.settingsKey("mergetree.insert_without_local_storage") -> "true", - CHConf.settingsKey("min_insert_block_size_rows") -> "10000" + CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true", + 
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 689d7ccad0cb..83990d68ad47 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -623,9 +623,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      CHConf.settingsKey("mergetree.merge_after_insert") -> "true",
-      CHConf.settingsKey("mergetree.insert_without_local_storage") -> "true",
-      CHConf.settingsKey("min_insert_block_size_rows") -> "10000"
+      CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true",
+      CHConf.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+      CHConf.runtimeSettings("min_insert_block_size_rows") -> "10000"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS $tableName;
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
index 6a2a3055767e..3830f8775aaa 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -623,9 +623,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      CHConf.settingsKey("mergetree.merge_after_insert") -> "true",
-      CHConf.settingsKey("mergetree.insert_without_local_storage") -> "true",
-      CHConf.settingsKey("min_insert_block_size_rows") -> "10000"
+      CHConf.runtimeSettings("mergetree.merge_after_insert") -> "true",
+      CHConf.runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+      CHConf.runtimeSettings("min_insert_block_size_rows") -> "10000"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS $tableName;
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 9784b528c989..13d3a53d3031 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -684,8 +684,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
-      settingsKey("mergetree.insert_without_local_storage") -> "true",
-      settingsKey("mergetree.merge_after_insert") -> "true"
+      runtimeSettings("mergetree.insert_without_local_storage") -> "true",
+      runtimeSettings("mergetree.merge_after_insert") -> "true"
     ) {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS $tableName;
@@ -757,7 +757,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
                  | AND l_quantity < 24
                  |""".stripMargin
 
-    withSQLConf(settingsKey("enabled_driver_filter_mergetree_index") -> "true") {
+    withSQLConf(runtimeSettings("enabled_driver_filter_mergetree_index") -> "true") {
       runTPCHQueryBySQL(6, sqlStr) {
         df =>
           val scanExec = collect(df.queryExecution.executedPlan) {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index ed7f9b0c87d6..648198590b2c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1783,7 +1783,7 @@ class GlutenClickHouseMergeTreeWriteSuite
 
     Seq(("true", 2), ("false", 3)).foreach(
       conf => {
-        withSQLConf(CHConf.settingsKey("enabled_driver_filter_mergetree_index") -> conf._1) {
+        withSQLConf(CHConf.runtimeSettings("enabled_driver_filter_mergetree_index") -> conf._1) {
          runTPCHQueryBySQL(6, sqlStr) {
            df =>
              val scanExec = collect(df.queryExecution.executedPlan) {
@@ -1900,7 +1900,7 @@ class GlutenClickHouseMergeTreeWriteSuite
 
     Seq(("true", 2), ("false", 2)).foreach(
       conf => {
-        withSQLConf(CHConf.settingsKey("enabled_driver_filter_mergetree_index") -> conf._1) {
+        withSQLConf(CHConf.runtimeSettings("enabled_driver_filter_mergetree_index") -> conf._1) {
          runTPCHQueryBySQL(12, sqlStr) {
            df =>
              val scanExec = collect(df.queryExecution.executedPlan) {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
index 25b4987bd7ab..5f03a0d39f50 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.mergetree
 
-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
 import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
 
 import org.apache.spark.SparkConf
@@ -50,8 +50,7 @@ class GlutenClickHouseMergeTreeWriteTaskNotSerializableSuite
 
   test("GLUTEN-6470: Fix Task not serializable error when inserting mergetree data") {
-    val externalSortKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" +
-      s".max_bytes_before_external_sort"
+    val externalSortKey = CHConf.runtimeSettings("max_bytes_before_external_sort")
     assertResult(3435973836L)(spark.conf.get(externalSortKey).toLong)
 
     spark.sql(s"""
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index a7ec27d82f08..24b50dfebe7c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -41,18 +41,16 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
   // scalastyle:off line.size.limit
   /** Run Gluten + ClickHouse Backend with SortShuffleManager */
   override protected def sparkConf: SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
+
     super.sparkConf
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.io.compression.codec", "LZ4")
       .set("spark.sql.shuffle.partitions", "1")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "DEBUG")
-      .set(
-        "spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
-        s"$parquetMaxBlockSize")
-      .set(
-        "spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating",
-        "true")
+      .setCHConfig("logger.level", "error")
+      .setCHSettings("input_format_parquet_max_block_size", parquetMaxBlockSize)
+      .setCHConfig("enable_streaming_aggregating", true)
   }
   // scalastyle:on line.size.limit
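One hedged sanity check, not part of the patch: the GLUTEN-6470 test above still reads the value back with `spark.conf.get(externalSortKey)`, so the rewrite is only behavior-preserving if `CHConf.runtimeSettings` expands to exactly the key the removed string concatenation used to build. A throwaway assertion along these lines, dropped into any of the ClickHouse-backend suites, would confirm that assumption.

// Hypothetical test body (assumes the suite already imports
// org.apache.gluten.backendsapi.clickhouse.CHConf); verifies the assumed key expansion.
test("runtimeSettings matches the legacy literal key") {
  val legacyKey =
    "spark.gluten.sql.columnar.backend.ch.runtime_settings.max_bytes_before_external_sort"
  assertResult(legacyKey)(CHConf.runtimeSettings("max_bytes_before_external_sort"))
}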
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index f41f330b2154..614780dbbded 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -45,7 +45,7 @@ class GlutenClickHouseHDFSSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
       .setCHConfig("use_local_format", true)
-      .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
+      .set(prefixOf("shuffle.hash.algorithm"), "sparkMurmurHash3_32")
       .setCHConfig("gluten_cache.local.enabled", "true")
       .setCHConfig("gluten_cache.local.name", cache_name)
       .setCHConfig("gluten_cache.local.path", hdfsCachePath)
@@ -53,7 +53,8 @@ class GlutenClickHouseHDFSSuite
       .setCHConfig("reuse_disk_cache", "false")
       .set("spark.sql.adaptive.enabled", "false")
 
-    // TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm
+    // TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm =>
+    //  CHConf.prefixOf("shuffle.hash.algorithm")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
@@ -126,7 +127,7 @@ class GlutenClickHouseHDFSSuite
 
   ignore("test no cache by query") {
     withSQLConf(
-      settingsKey("read_from_filesystem_cache_if_exists_otherwise_bypass_cache") -> "true") {
+      runtimeSettings("read_from_filesystem_cache_if_exists_otherwise_bypass_cache") -> "true") {
       runWithoutCache()
     }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 28b874e21bac..1ca9ab7f45b2 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -38,6 +38,8 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
 
   /** Run Gluten + ClickHouse Backend with SortShuffleManager */
   override protected def sparkConf: SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
+
     super.sparkConf
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.io.compression.codec", "LZ4")
@@ -45,9 +47,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
-      .set(
-        "spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating",
-        "true")
+      .setCHConfig("enable_streaming_aggregating", true)
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 0536d9136ec8..679d2ddd85c4 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.execution.tpch
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.clickhouse.CHConf
 import org.apache.gluten.execution._
 import org.apache.gluten.extension.GlutenPlan
 
@@ -41,9 +42,6 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
     rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
   override protected val queriesResults: String = rootPath + "queries-output"
 
-  protected val BACKEND_CONF_KEY = "spark.gluten.sql.columnar.backend.ch."
-  protected val BACKEND_RUNTIME_CINF_KEY: String = BACKEND_CONF_KEY + "runtime_config."
-
   override protected def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.shuffle.manager", "sort")
@@ -1419,8 +1417,8 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       queriesResults: String = queriesResults,
       compareResult: Boolean = true,
       noFallBack: Boolean = true)(customCheck: DataFrame => Unit): Unit = {
-    val confName = "spark.gluten.sql.columnar.backend.ch." +
-      "runtime_settings.query_plan_enable_optimizations"
+    val confName = CHConf.runtimeSettings("query_plan_enable_optimizations")
+
     withSQLConf((confName, "true")) {
       compareTPCHQueryAgainstVanillaSpark(queryNum, tpchQueries, customCheck, noFallBack)
     }
@@ -2549,9 +2547,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
 
   test("GLUTEN-4521: Invalid result from grace mergeing aggregation with spill") {
     withSQLConf(
-      (
-        BACKEND_RUNTIME_CINF_KEY + "max_allowed_memory_usage_ratio_for_aggregate_merging",
-        "0.0001")) {
+      (CHConf.runtimeConfig("max_allowed_memory_usage_ratio_for_aggregate_merging"), "0.0001")) {
       val sql =
         """
          |select count(l_orderkey, l_partkey) from (
@@ -2840,7 +2836,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2879,7 +2875,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql1,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2892,7 +2888,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql2,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2905,7 +2901,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql3,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2918,7 +2914,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql4,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2931,7 +2927,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql5,
-      true,
+      compareResult = true,
       df => {
         checkBHJWithIsNullAwareAntiJoin(df)
       })
@@ -2952,7 +2948,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql6,
-      true,
+      compareResult = true,
       df => {
         checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
       })
@@ -2965,7 +2961,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql7,
-      true,
+      compareResult = true,
       df => {
         checkAQEBHJWithIsNullAwareAntiJoin(df, 0)
       })
@@ -2978,7 +2974,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
       |""".stripMargin
     compareResultsAgainstVanillaSpark(
       sql8,
-      true,
+      compareResult = true,
       df => {
         checkAQEBHJWithIsNullAwareAntiJoin(df)
       })
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
index 8d6d749fd650..a6bf4a9c994a 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHOptimizeRuleBenchmark.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.benchmarks
 
+import org.apache.gluten.backendsapi.clickhouse.CHConf
+
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
@@ -60,7 +62,7 @@ object CHOptimizeRuleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmar
   }
 
   def testToDateOptimize(parquetDir: String, enable: String): Unit = {
-    withSQLConf(("spark.gluten.sql.columnar.backend.ch.rewrite.dateConversion", enable)) {
+    withSQLConf((CHConf.prefixOf("rewrite.dateConversion"), enable)) {
       spark
         .sql(s"""
                 |select