[GLUTEN-7028][CH][Part-3] Refactor: Move mergetree related codes to backends-clickhouse (apache#7234)

This is the second refactor PR for moving mergetree-related codes to backends-clickhouse:
1. Move JniUtils from gluten-arrow to gluten-core and use config.proto, so we can use ConfigMap to pass configuration between Java and C++ (a sketch follows the commit details below).
2. Move ExtensionTableXXX from gluten-substrait to backends-clickhouse.
3. Move `genWriteParameters` to `TransformerApi`, so we can pass mergetree-related parameters.

(Fixes: apache#7028)
baibaichen authored and shamirchen committed Oct 14, 2024
1 parent f4c8074 commit f93d261
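
For item 1, a minimal sketch of what the config.proto-based passing might look like. The ConfigMap message layout and the generated org.apache.gluten.proto.ConfigMap class are assumptions (config.proto itself is not part of the hunks shown here); only `ConfigUtil.serialize` and its JNI call sites appear in the diff below.

```scala
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

// Assumed protoc-generated class; presumes config.proto declares
//   message ConfigMap { map<string, string> configs = 1; }
// The real package and message names come from config.proto, not this diff.
import org.apache.gluten.proto.ConfigMap

object ConfigMapSketch {

  // Pack plain key/value settings into the ConfigMap message and return its wire
  // bytes; roughly what ConfigUtil.serialize is used for in the hunks below.
  def serialize(confs: JMap[String, String]): Array[Byte] =
    ConfigMap.newBuilder().putAllConfigs(confs).build().toByteArray

  def main(args: Array[String]): Unit = {
    // Illustrative key only; any backend-scoped Spark conf entry travels the same way.
    val nativeConf = Map("spark.gluten.sql.columnar.backend.ch.example" -> "value")
    val bytes = serialize(nativeConf.asJava)
    // The C++ side parses the same ConfigMap back into a key/value map, replacing
    // the old approach of wrapping a StringMapNode in a Substrait AdvancedExtension plan.
    println(s"serialized ${bytes.length} bytes")
  }
}
```

With something like this in place, `CHNativeExpressionEvaluator` can hand `ConfigUtil.serialize(nativeConfMap)` directly to `nativeInitNative` and `createKernelWithBatchIterator`, as the hunks below show, instead of building a throwaway Substrait plan just to carry configuration.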
Showing 102 changed files with 549 additions and 947 deletions.
@@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -106,6 +106,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
context: TaskAttemptContext): OutputWriter = {
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
database,
@@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -109,6 +109,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
context: TaskAttemptContext): OutputWriter = {
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
database,
@@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -110,6 +110,7 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
context: TaskAttemptContext): OutputWriter = {
GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
database,
@@ -21,42 +21,28 @@
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.execution.ColumnarNativeIterator;
import org.apache.gluten.memory.CHThreadGroup;
import org.apache.gluten.substrait.expression.ExpressionBuilder;
import org.apache.gluten.substrait.expression.StringMapNode;
import org.apache.gluten.substrait.extensions.AdvancedExtensionNode;
import org.apache.gluten.substrait.extensions.ExtensionBuilder;
import org.apache.gluten.substrait.plan.PlanBuilder;
import org.apache.gluten.utils.ConfigUtil;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConverters;

public class CHNativeExpressionEvaluator extends ExpressionEvaluatorJniWrapper {

private CHNativeExpressionEvaluator() {}

// Used to initialize the native computing.
public static void initNative(SparkConf conf) {
Tuple2<String, String>[] all = conf.getAll();
Map<String, String> confMap =
Arrays.stream(all).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
public static void initNative(scala.collection.Map<String, String> conf) {
Map<String, String> nativeConfMap =
GlutenConfig.getNativeBackendConf(
Backend.get().name(), JavaConverters.mapAsScalaMap(confMap));
GlutenConfig.getNativeBackendConf(Backend.get().name(), conf);

// Get the customer config from SparkConf for each backend
BackendsApiManager.getTransformerApiInstance()
.postProcessNativeConfig(nativeConfMap, GlutenConfig.prefixOf(Backend.get().name()));

nativeInitNative(buildNativeConf(nativeConfMap));
nativeInitNative(ConfigUtil.serialize(nativeConfMap));
}

public static void finalizeNative() {
@@ -68,15 +54,6 @@ public static boolean doValidate(byte[] subPlan) {
throw new UnsupportedOperationException("doValidate is not supported in Clickhouse Backend");
}

private static byte[] buildNativeConf(Map<String, String> confs) {
StringMapNode stringMapNode = ExpressionBuilder.makeStringMap(confs);
AdvancedExtensionNode extensionNode =
ExtensionBuilder.makeAdvancedExtension(
BackendsApiManager.getTransformerApiInstance()
.packPBMessage(stringMapNode.toProtobuf()));
return PlanBuilder.makePlan(extensionNode).toProtobuf().toByteArray();
}

private static Map<String, String> getNativeBackendConf() {
return GlutenConfig.getNativeBackendConf(Backend.get().name(), SQLConf.get().getAllConfs());
}
@@ -99,7 +76,7 @@ public static BatchIterator createKernelWithBatchIterator(
wsPlan,
splitInfo,
iterList.toArray(new ColumnarNativeIterator[0]),
buildNativeConf(getNativeBackendConf()),
ConfigUtil.serialize(getNativeBackendConf()),
materializeInput);
return createBatchIterator(handle);
}
@@ -113,7 +90,7 @@ public static BatchIterator createKernelWithBatchIterator(
wsPlan,
splitInfo,
iterList.toArray(new ColumnarNativeIterator[0]),
buildNativeConf(getNativeBackendConf()),
ConfigUtil.serialize(getNativeBackendConf()),
false);
return createBatchIterator(handle);
}
@@ -18,13 +18,9 @@

import org.apache.gluten.execution.BroadCastHashJoinContext;
import org.apache.gluten.execution.JoinTypeTransform;
import org.apache.gluten.expression.ConverterUtils;
import org.apache.gluten.expression.ConverterUtils$;
import org.apache.gluten.substrait.type.TypeNode;
import org.apache.gluten.utils.SubstraitUtil;

import io.substrait.proto.NamedStruct;
import io.substrait.proto.Type;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;

@@ -97,24 +93,8 @@ public static long build(
joinType,
broadCastContext.hasMixedFiltCondition(),
broadCastContext.isExistenceJoin(),
toNameStruct(output).toByteArray(),
SubstraitUtil.toNameStruct(output).toByteArray(),
broadCastContext.isNullAwareAntiJoin(),
hasNullKeyValues);
}

/** create table named struct */
private static NamedStruct toNameStruct(List<Attribute> output) {
List<TypeNode> typeList = ConverterUtils.collectAttributeTypeNodes(output);
List<String> nameList = ConverterUtils.collectAttributeNamesWithExprId(output);
Type.Struct.Builder structBuilder = Type.Struct.newBuilder();
for (TypeNode typeNode : typeList) {
structBuilder.addTypes(typeNode.toProtobuf());
}
NamedStruct.Builder nStructBuilder = NamedStruct.newBuilder();
nStructBuilder.setStruct(structBuilder.build());
for (String name : nameList) {
nStructBuilder.addNames(name);
}
return nStructBuilder.build();
}
}
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.substrait.rel;
package org.apache.spark.sql.execution.datasources.clickhouse;

import org.apache.gluten.expression.ConverterUtils;

@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.substrait.rel;
package org.apache.spark.sql.execution.datasources.clickhouse;

import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.substrait.rel.SplitInfo;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.StringValue;
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.CHColumnarShuffleWriter
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -103,7 +103,7 @@ class CHListenerApi extends ListenerApi with Logging {
// Load supported hive/python/scala udfs
UDFMappings.loadFromSparkConf(conf)

CHNativeExpressionEvaluator.initNative(conf)
CHNativeExpressionEvaluator.initNative(conf.getAll.toMap)

// inject backend-specific implementations to override spark classes
// FIXME: The following set instances twice in local mode?
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.backendsapi.TransformerApi
import org.apache.gluten.execution.CHHashAggregateExecTransformer
import org.apache.gluten.execution.{CHHashAggregateExecTransformer, WriteFilesExecTransformer}
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
@@ -30,13 +30,14 @@ import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet

import com.google.common.collect.Lists
import com.google.protobuf.{Any, Message}
import com.google.protobuf.{Any, Message, StringValue}

import java.util

@@ -233,4 +234,25 @@ class CHTransformerApi extends TransformerApi with Logging {
override def invalidateSQLExecutionResource(executionId: String): Unit = {
GlutenDriverEndpoint.invalidateResourceRelation(executionId)
}

override def genWriteParameters(
fileFormat: FileFormat,
writeOptions: Map[String, String]): Any = {
val fileFormatStr = fileFormat match {
case register: DataSourceRegister =>
register.shortName
case _ => "UnknownFileFormat"
}
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(writeOptions).capitalize
val writeParametersStr = new StringBuffer("WriteParameters:")
writeParametersStr.append("is").append(compressionCodec).append("=1")
writeParametersStr.append(";format=").append(fileFormatStr).append("\n")

packPBMessage(
StringValue
.newBuilder()
.setValue(writeParametersStr.toString)
.build())
}
}
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.commands

import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.rel.ExtensionTableBuilder

import org.apache.spark.affinity.CHAffinity
import org.apache.spark.rpc.GlutenDriverEndpoint
@@ -28,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
@@ -23,7 +23,7 @@ import org.apache.gluten.softaffinity.SoftAffinityManager
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}
import org.apache.gluten.substrait.rel.RelBuilder

import org.apache.spark.affinity.CHAffinity
import org.apache.spark.internal.Logging
@@ -35,7 +35,7 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.clickhouse.MergeTreePartFilterReturnedRange
import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types.BooleanType
@@ -44,6 +44,7 @@ import org.apache.spark.util.collection.BitSet
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.protobuf.{Any, StringValue}
import io.substrait.proto.NamedStruct
import io.substrait.proto.Plan

import java.lang.{Long => JLong}
@@ -528,9 +529,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
val columnTypeNodes = output.map {
attr =>
if (table.partitionColumns.exists(_.equals(attr.name))) {
new ColumnTypeNode(1)
new ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL)
} else {
new ColumnTypeNode(0)
new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)
}
}.asJava
val substraitContext = new SubstraitContext
@@ -39,7 +39,7 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = {
val originPath = path
val datasourceJniWrapper = new CHDatasourceJniWrapper();
val datasourceJniWrapper = new CHDatasourceJniWrapper()
CHThreadGroup.registerNewThreadGroup()

val namedStructBuilder = NamedStruct.newBuilder
@@ -49,13 +49,10 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
structBuilder.addTypes(ConverterUtils.getTypeNode(field.dataType, field.nullable).toProtobuf)
}
namedStructBuilder.setStruct(structBuilder.build)
var namedStruct = namedStructBuilder.build
val namedStruct = namedStructBuilder.build

val instance =
datasourceJniWrapper.nativeInitFileWriterWrapper(
path,
namedStruct.toByteArray,
getFormatName());
datasourceJniWrapper.nativeInitFileWriterWrapper(path, namedStruct.toByteArray, formatName)

new OutputWriter {
override def write(row: InternalRow): Unit = {
@@ -83,8 +80,29 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
// TODO: parquet and mergetree
OrcUtils.inferSchema(sparkSession, files, options)
}

// scalastyle:off argcount
/** For CH MergeTree format */
def createOutputWriter(
path: String,
database: String,
tableName: String,
snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchema: StructType,
clickhouseTableConfigs: Map[String, String],
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = null
// scalastyle:on argcount
}

class CHRowSplitter extends GlutenRowSplitter {