Commit

update per #7265
 - Use CHConf
 - use CHConf.prefixOf() instead of "spark.gluten.sql.columnar.backend.ch."
 - settingsKey => runtimeSettings
 - configKey => runtimeConfig
 - CH => CONF_PREFIX
baibaichen committed Sep 19, 2024
1 parent d81d354 commit 1a43af5
Showing 37 changed files with 198 additions and 226 deletions.
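
For orientation, the helpers named in the commit message build keys under the ClickHouse backend namespace that the old code spelled out by hand. A rough sketch of the mapping, using keys that are touched in the changes below:

import org.apache.gluten.backendsapi.clickhouse.CHConf

CHConf.prefixOf("affinity.mode")          // "spark.gluten.sql.columnar.backend.ch.affinity.mode"
CHConf.runtimeSettings("max_block_size")  // "spark.gluten.sql.columnar.backend.ch.runtime_settings.max_block_size"
CHConf.runtimeConfig("timezone")          // "spark.gluten.sql.columnar.backend.ch.runtime_config.timezone"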

@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
-entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
+CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)

@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -128,7 +128,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
-entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
+CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)

@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta

-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.ColumnarToRowExecBase

import org.apache.spark.SparkException
@@ -140,7 +140,7 @@ class ClickhouseOptimisticTransaction(
spark.conf.getAll.foreach(
entry => {
if (
-entry._1.startsWith(s"${CHBackend.CONF_PREFIX}.runtime_settings")
+CHConf.startWithSettings(entry._1)
|| entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
) {
options += (entry._1 -> entry._2)

@@ -62,8 +62,7 @@ class CHBackend extends SubstraitBackend {
}

object CHBackend {
-val BACKEND_NAME = "ch"
-val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME)
+val BACKEND_NAME: String = CHConf.BACKEND_NAME
}

object CHBackendSettings extends BackendSettingsApi with Logging {
@@ -74,40 +73,35 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// experimental: when the files count per partition exceeds this threshold,
// it will put the files into one partition.
val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".files.per.partition.threshold"
+CHConf.prefixOf("files.per.partition.threshold")
val GLUTEN_CLICKHOUSE_FILES_PER_PARTITION_THRESHOLD_DEFAULT = "-1"

private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".customized.shuffle.codec.enable"
+CHConf.prefixOf("customized.shuffle.codec.enable")
private val GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE_DEFAULT = false
lazy val useCustomizedShuffleCodec: Boolean = SparkEnv.get.conf.getBoolean(
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE,
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_SHUFFLE_CODEC_ENABLE_DEFAULT
)

private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".customized.buffer.size"
+CHConf.prefixOf("customized.buffer.size")
private val GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE_DEFAULT = 4096
lazy val customizeBufferSize: Int = SparkEnv.get.conf.getInt(
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE,
CHBackendSettings.GLUTEN_CLICKHOUSE_CUSTOMIZED_BUFFER_SIZE_DEFAULT
)

val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".broadcast.cache.expired.time"
+CHConf.prefixOf("broadcast.cache.expired.time")
// unit: SECONDS, default 1 day
val GLUTEN_CLICKHOUSE_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400

private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy")

// The algorithm for hash partition of the shuffle
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".shuffle.hash.algorithm"
+CHConf.prefixOf("shuffle.hash.algorithm")
// valid values are: cityHash64 or sparkMurmurHash3_32
private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "sparkMurmurHash3_32"
def shuffleHashAlgorithm: String = {
@@ -122,25 +116,19 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

-private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + ".affinity.mode"
+private val GLUTEN_CLICKHOUSE_AFFINITY_MODE: String = CHConf.prefixOf("affinity.mode")
val SOFT: String = "soft"
val FORCE: String = "force"
private val GLUTEN_CLICKHOUSE_AFFINITY_MODE_DEFAULT = SOFT

-private val GLUTEN_MAX_BLOCK_SIZE: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".runtime_settings.max_block_size"
+private val GLUTEN_MAX_BLOCK_SIZE: String = CHConf.runtimeSettings("max_block_size")
// Same as default value in clickhouse
private val GLUTEN_MAX_BLOCK_SIZE_DEFAULT = 65409
private val GLUTEN_MAX_SHUFFLE_READ_BYTES: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".runtime_config.max_source_concatenate_bytes"
+CHConf.runtimeConfig("max_source_concatenate_bytes")
private val GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT = GLUTEN_MAX_BLOCK_SIZE_DEFAULT * 256

-val GLUTEN_AQE_PROPAGATEEMPTY: String =
-GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME +
-".aqe.propagate.empty.relation"
+val GLUTEN_AQE_PROPAGATEEMPTY: String = CHConf.prefixOf("aqe.propagate.empty.relation")

def affinityMode: String = {
SparkEnv.get.conf
@@ -368,15 +356,15 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// Need to enable AQE
def enableReorderHashJoinTables(): Boolean = {
SparkEnv.get.conf.getBoolean(
"spark.gluten.sql.columnar.backend.ch.enable_reorder_hash_join_tables",
CHConf.prefixOf("enable_reorder_hash_join_tables"),
defaultValue = true
)
}
// The threshold to reorder hash join tables, if The result of dividing two tables' size is
// large then this threshold, reorder the tables. e.g. a/b > threshold or b/a > threshold
def reorderHashJoinTablesThreshold(): Int = {
SparkEnv.get.conf.getInt(
"spark.gluten.sql.columnar.backend.ch.reorder_hash_join_tables_thresdhold",
CHConf.prefixOf("reorder_hash_join_tables_thresdhold"),
10
)
}
@@ -385,8 +373,8 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
// for example, select a, b, sum(c+d) from t group by a, b with cube
def enablePushdownPreProjectionAheadExpand(): Boolean = {
SparkEnv.get.conf.getBoolean(
"spark.gluten.sql.columnar.backend.ch.enable_pushdown_preprojection_ahead_expand",
true
CHConf.prefixOf("enable_pushdown_preprojection_ahead_expand"),
defaultValue = true
)
}


@@ -21,32 +21,35 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.SparkConf

object CHConf {

-private val CH = GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + "."
-private val CH_SETTINGS = CH + "runtime_settings."
-private val CH_CONFIG = CH + "runtime_config."
+private[clickhouse] val BACKEND_NAME: String = "ch"
+private[clickhouse] val CONF_PREFIX: String = GlutenConfig.prefixOf(BACKEND_NAME)
+private val RUNTIME_SETTINGS: String = s"$CONF_PREFIX.runtime_settings"
+private val RUNTIME_CONFIG = s"$CONF_PREFIX.runtime_config"
implicit class GlutenCHConf(conf: SparkConf) {
def setCHSettings(settings: (String, String)*): SparkConf = {
-settings.foreach { case (k, v) => conf.set(settingsKey(k), v) }
+settings.foreach { case (k, v) => conf.set(runtimeSettings(k), v) }
conf
}

def setCHSettings[T](k: String, v: T): SparkConf = {
-conf.set(settingsKey(k), v.toString)
+conf.set(runtimeSettings(k), v.toString)
conf
}

def setCHConfig(config: (String, String)*): SparkConf = {
-config.foreach { case (k, v) => conf.set(configKey(k), v) }
+config.foreach { case (k, v) => conf.set(runtimeConfig(k), v) }
conf
}

def setCHConfig[T](k: String, v: T): SparkConf = {
-conf.set(configKey(k), v.toString)
+conf.set(runtimeConfig(k), v.toString)
conf
}
}

-def configKey(key: String): String = CH_CONFIG + key
-def settingsKey(key: String): String = CH_SETTINGS + key
+def prefixOf(key: String): String = s"$CONF_PREFIX.$key"
+def runtimeConfig(key: String): String = s"$RUNTIME_CONFIG.$key"
+def runtimeSettings(key: String): String = s"$RUNTIME_SETTINGS.$key"

+def startWithSettings(key: String): Boolean = key.startsWith(RUNTIME_SETTINGS)
}
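
A hedged usage sketch of the CHConf helpers and the GlutenCHConf implicit class shown above; the setting names are keys used elsewhere in this commit, while the values are made-up examples:

import org.apache.spark.SparkConf
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.backendsapi.clickhouse.CHConf._ // brings the GlutenCHConf implicit class into scope

val conf = new SparkConf()
  // expands to spark.gluten.sql.columnar.backend.ch.runtime_settings.max_bytes_before_external_sort
  .setCHSettings("max_bytes_before_external_sort", 10737418240L) // example value: 10 GiB
  // expands to spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.input_read_timeout
  .setCHConfig("hdfs.input_read_timeout", "180000") // example value

// the key builders also work for direct lookups
assert(CHConf.startWithSettings(CHConf.runtimeSettings("max_block_size")))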

@@ -87,8 +87,7 @@ class CHListenerApi extends ListenerApi with Logging {
"local_engine.settings.log_processors_profiles" -> "true")

// add memory limit for external sort
-val externalSortKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" +
-s".max_bytes_before_external_sort"
+val externalSortKey = CHConf.runtimeSettings("max_bytes_before_external_sort")
if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", defaultValue = false)) {
val memSize = JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size"))

@@ -87,32 +87,33 @@ class CHTransformerApi extends TransformerApi with Logging {
override def postProcessNativeConfig(
nativeConfMap: util.Map[String, String],
backendPrefix: String): Unit = {
-val settingPrefix = backendPrefix + ".runtime_settings."
+
+require(backendPrefix == CHConf.CONF_PREFIX)
if (nativeConfMap.getOrDefault("spark.memory.offHeap.enabled", "false").toBoolean) {
val offHeapSize =
nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong
if (offHeapSize > 0) {

// Only set default max_bytes_before_external_group_by for CH when it is not set explicitly.
-val groupBySpillKey = settingPrefix + "max_bytes_before_external_group_by";
+val groupBySpillKey = CHConf.runtimeSettings("max_bytes_before_external_group_by")
if (!nativeConfMap.containsKey(groupBySpillKey)) {
val groupBySpillValue = offHeapSize * 0.5
nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString)
}

-val maxMemoryUsageKey = settingPrefix + "max_memory_usage";
+val maxMemoryUsageKey = CHConf.runtimeSettings("max_memory_usage")
if (!nativeConfMap.containsKey(maxMemoryUsageKey)) {
val maxMemoryUsageValue = offHeapSize
-nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString)
+nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toString)
}

// Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash
-val joinAlgorithmKey = settingPrefix + "join_algorithm";
+val joinAlgorithmKey = CHConf.runtimeSettings("join_algorithm")
if (
nativeConfMap.containsKey(joinAlgorithmKey) &&
nativeConfMap.get(joinAlgorithmKey) == "grace_hash"
) {
-val joinSpillKey = settingPrefix + "max_bytes_in_join";
+val joinSpillKey = CHConf.runtimeSettings("max_bytes_in_join")
if (!nativeConfMap.containsKey(joinSpillKey)) {
val joinSpillValue = offHeapSize * 0.7
nativeConfMap.put(joinSpillKey, joinSpillValue.toLong.toString)
@@ -127,24 +128,24 @@
}
}

-val hdfsConfigPrefix = backendPrefix + ".runtime_config.hdfs."
-injectConfig("spark.hadoop.input.connect.timeout", hdfsConfigPrefix + "input_connect_timeout")
-injectConfig("spark.hadoop.input.read.timeout", hdfsConfigPrefix + "input_read_timeout")
-injectConfig("spark.hadoop.input.write.timeout", hdfsConfigPrefix + "input_write_timeout")
+val hdfsConfigPrefix = CHConf.runtimeConfig("hdfs")
+injectConfig("spark.hadoop.input.connect.timeout", s"$hdfsConfigPrefix.input_connect_timeout")
+injectConfig("spark.hadoop.input.read.timeout", s"$hdfsConfigPrefix.input_read_timeout")
+injectConfig("spark.hadoop.input.write.timeout", s"$hdfsConfigPrefix.input_write_timeout")
injectConfig(
"spark.hadoop.dfs.client.log.severity",
-hdfsConfigPrefix + "dfs_client_log_severity")
+s"$hdfsConfigPrefix.dfs_client_log_severity")

// TODO: set default to true when metrics could be collected
// while ch query plan optimization is enabled.
-val planOptKey = settingPrefix + "query_plan_enable_optimizations"
+val planOptKey = CHConf.runtimeSettings("query_plan_enable_optimizations")
if (!nativeConfMap.containsKey(planOptKey)) {
nativeConfMap.put(planOptKey, "false")
}

// Respect spark config spark.sql.orc.compression.codec for CH backend
// TODO: consider compression or orc.compression in table options.
-val orcCompressionKey = settingPrefix + "output_format_orc_compression_method"
+val orcCompressionKey = CHConf.runtimeSettings("output_format_orc_compression_method")
if (!nativeConfMap.containsKey(orcCompressionKey)) {
if (nativeConfMap.containsKey("spark.sql.orc.compression.codec")) {
val compression = nativeConfMap.get("spark.sql.orc.compression.codec").toLowerCase()
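
To make the memory-related defaulting in postProcessNativeConfig concrete, a small worked example; the 8 GiB off-heap figure is an assumption, taken as the already-resolved value of spark.gluten.memory.offHeap.size.in.bytes, with none of the settings set explicitly and join_algorithm set to grace_hash:

// Assumed inputs, not values from this commit
val offHeapSize = 8589934592L // 8 GiB
val groupBySpill = (offHeapSize * 0.5).toLong // 4294967296 -> runtime_settings.max_bytes_before_external_group_by
val maxMemoryUsage = offHeapSize // 8589934592 -> runtime_settings.max_memory_usage
val joinSpill = (offHeapSize * 0.7).toLong // 6012954214 -> runtime_settings.max_bytes_in_join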

@@ -16,7 +16,7 @@
*/
package org.apache.gluten.expression

-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.ConverterUtils.FunctionConfig
import org.apache.gluten.substrait.expression._
@@ -70,7 +70,7 @@ case class CHTruncTimestampTransformer(
if (
timeZoneIgnore && timeZoneId.nonEmpty &&
!timeZoneId.get.equalsIgnoreCase(
-SQLConf.get.getConfString(s"${CHBackend.CONF_PREFIX}.runtime_config.timezone")
+SQLConf.get.getConfString(s"${CHConf.runtimeConfig("timezone")}")
)
) {
throw new GlutenNotSupportException(
@@ -157,23 +157,23 @@ case class CHPosExplodeTransformer(
// Output (pos, col) when input is array type
val structType = StructType(
Array(
StructField("pos", IntegerType, false),
StructField("pos", IntegerType, nullable = false),
StructField("col", a.elementType, a.containsNull)))
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
-ConverterUtils.getTypeNode(structType, false))
+ConverterUtils.getTypeNode(structType, nullable = false))
case m: MapType =>
// Output (pos, key, value) when input is map type
val structType = StructType(
Array(
StructField("pos", IntegerType, false),
StructField("key", m.keyType, false),
StructField("pos", IntegerType, nullable = false),
StructField("key", m.keyType, nullable = false),
StructField("value", m.valueType, m.valueContainsNull)))
ExpressionBuilder.makeScalarFunction(
funcId,
Lists.newArrayList(childNode),
-ConverterUtils.getTypeNode(structType, false))
+ConverterUtils.getTypeNode(structType, nullable = false))
case _ =>
throw new GlutenNotSupportException(s"posexplode($childType) not supported yet.")
}
@@ -225,7 +225,7 @@ case class GetArrayItemTransformer(
Seq(IntegerType, getArrayItem.right.dataType),
FunctionConfig.OPT)
val addFunctionId = ExpressionBuilder.newScalarFunction(functionMap, addFunctionName)
-val literalNode = ExpressionBuilder.makeLiteral(1.toInt, IntegerType, false)
+val literalNode = ExpressionBuilder.makeLiteral(1, IntegerType, false)
rightNode = ExpressionBuilder.makeScalarFunction(
addFunctionId,
Lists.newArrayList(literalNode, rightNode),

@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.utils

-import org.apache.gluten.backendsapi.clickhouse.CHBackend
+import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.execution.{GlutenMergeTreePartition, MergeTreePartRange, MergeTreePartSplit}
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
import org.apache.gluten.softaffinity.SoftAffinityManager
@@ -171,9 +171,10 @@ object MergeTreePartsPartitionsUtil extends Logging {
val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
if (ret == null) {
val keys = ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
-val keySample = keys.isEmpty match {
-case true => "<empty>"
-case false => keys.iterator().next()
+val keySample = if (keys.isEmpty) {
+"<empty>"
+} else {
+keys.iterator().next()
}
throw new IllegalStateException(
"Can't find AddMergeTreeParts from cache pathToAddMTPCache for key: " +
@@ -418,7 +419,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
bucketId =>
val currBucketParts: Seq[MergeTreePartRange] =
prunedFilesGroupedToBuckets.getOrElse(bucketId, Seq.empty)
-if (!currBucketParts.isEmpty) {
+if (currBucketParts.nonEmpty) {
val currentFiles = currBucketParts.map {
part =>
MergeTreePartSplit(
@@ -453,8 +454,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
}

private def useDriverFilter(filterExprs: Seq[Expression], sparkSession: SparkSession): Boolean = {
-val enableDriverFilterKey = s"${CHBackend.CONF_PREFIX}.runtime_settings" +
-s".enabled_driver_filter_mergetree_index"
+val enableDriverFilterKey = CHConf.runtimeSettings("enabled_driver_filter_mergetree_index")

// When using soft affinity, disable driver filter
filterExprs.nonEmpty && sparkSession.sessionState.conf.getConfString(