Merge remote-tracking branch 'upstream/main' into optimize-rpad
kazuyukitanimura committed Aug 7, 2024
2 parents f5d128c + c4bd3db commit d4b0c66
Showing 50 changed files with 1,780 additions and 669 deletions.
@@ -107,6 +107,11 @@ public long getLong(int rowId) {
return delegate.getLong(rowId);
}

@Override
public long getLongDecimal(int rowId) {
return delegate.getLongDecimal(rowId);
}

@Override
public float getFloat(int rowId) {
return delegate.getFloat(rowId);
@@ -68,6 +68,10 @@ public long decodeToLong(int index) {
return values.getLong(index);
}

public long decodeToLongDecimal(int index) {
return values.getLongDecimal(index);
}

public float decodeToFloat(int index) {
return values.getFloat(index);
}
@@ -97,6 +97,11 @@ public long getLong(int i) {
return values.decodeToLong(indices.getInt(i));
}

@Override
public long getLongDecimal(int i) {
return values.decodeToLongDecimal(indices.getInt(i));
}

@Override
public float getFloat(int i) {
return values.decodeToFloat(indices.getInt(i));
@@ -90,6 +90,11 @@ public long getLong(int rowId) {
return Platform.getLong(null, valueBufferAddress + rowId * 8L);
}

@Override
public long getLongDecimal(int rowId) {
return Platform.getLong(null, valueBufferAddress + rowId * 16L);
}

@Override
public float getFloat(int rowId) {
return Platform.getFloat(null, valueBufferAddress + rowId * 4L);
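The 16-byte stride in `getLongDecimal` above matches the Arrow decimal128 layout: each value is 16 little-endian bytes, so when the precision fits in 64 bits the low 8 bytes already hold the full unscaled value. A minimal sketch of that assumption (the buffer and row id here are hypothetical, not part of this change):

```scala
import java.nio.{ByteBuffer, ByteOrder}

object Decimal128Sketch {
  // Illustrative only: read the low 64 bits of the 16-byte little-endian
  // decimal128 value stored at position `rowId` in a flat value buffer.
  def lowBits(valueBuffer: ByteBuffer, rowId: Int): Long =
    valueBuffer.order(ByteOrder.LITTLE_ENDIAN).getLong(rowId * 16)
}
```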
8 changes: 6 additions & 2 deletions common/src/main/java/org/apache/comet/vector/CometVector.java
@@ -87,8 +87,8 @@ public boolean isFixedLength() {
public Decimal getDecimal(int i, int precision, int scale) {
if (!useDecimal128 && precision <= Decimal.MAX_INT_DIGITS() && type instanceof IntegerType) {
return createDecimal(getInt(i), precision, scale);
} else if (!useDecimal128 && precision <= Decimal.MAX_LONG_DIGITS()) {
return createDecimal(getLong(i), precision, scale);
} else if (precision <= Decimal.MAX_LONG_DIGITS()) {
return createDecimal(useDecimal128 ? getLongDecimal(i) : getLong(i), precision, scale);
} else {
byte[] bytes = getBinaryDecimal(i);
BigInteger bigInteger = new BigInteger(bytes);
@@ -166,6 +166,10 @@ public long getLong(int rowId) {
throw new UnsupportedOperationException("Not yet supported");
}

public long getLongDecimal(int rowId) {
throw new UnsupportedOperationException("Not yet supported");
}

@Override
public float getFloat(int rowId) {
throw new UnsupportedOperationException("Not yet supported");
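For context, the precision cut-offs in `getDecimal` above come from Spark's `Decimal` object: unscaled values with up to 9 digits fit in an `Int` and up to 18 in a `Long`, which is why `getLongDecimal` only ever needs the low 64 bits of a decimal128 value. A quick Scala check of those constants (assumes spark-catalyst on the classpath):

```scala
import org.apache.spark.sql.types.Decimal

// Spark's digit limits for int- and long-backed decimal representations.
assert(Decimal.MAX_INT_DIGITS == 9)
assert(Decimal.MAX_LONG_DIGITS == 18)
```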
15 changes: 15 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT)

val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.caseConversion.enabled")
.doc(
"Java uses locale-specific rules when converting strings to upper or lower case and " +
"Rust does not, so we disable upper and lower by default.")
.booleanConf
.createWithDefault(false)

val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.cast.allowIncompatible")
.doc(
@@ -410,6 +418,13 @@
.booleanConf
.createWithDefault(false)

val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
"Set this config to true to allow them anyway using Rust's regular expression engine. " +
"See compatibility guide for more information.")
.booleanConf
.createWithDefault(false)
}

object ConfigHelpers {
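The locale problem behind `spark.comet.caseConversion.enabled` is the usual JVM one: `toUpperCase`/`toLowerCase` follow locale rules, while Rust's conversion does not, so results can diverge. A small JVM-only illustration of the kind of divergence the flag guards against (not Comet code):

```scala
import java.util.Locale

object CaseConversionDemo extends App {
  // Under a Turkish locale, "i" upper-cases to the dotted capital "İ",
  // not the ASCII "I" that a locale-independent conversion produces.
  val turkish = new Locale("tr", "TR")
  println("i".toUpperCase(Locale.ROOT)) // I
  println("i".toUpperCase(turkish))     // İ
}
```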
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
import org.apache.arrow.c.CDataDictionaryProvider
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.complex.StructVector
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.types._
@@ -258,7 +259,7 @@ object Utils {
case v @ (_: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector |
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector) =>
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector) =>
v.asInstanceOf[FieldVector]
case _ =>
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
6 changes: 6 additions & 0 deletions docs/source/user-guide/compatibility.md
@@ -32,6 +32,12 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Cast

Cast operations in Comet fall into three levels of support:
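As a usage sketch for the flag documented above, opting in at session build time might look like the following (the application name is made up, and the flag still defaults to `false`):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical opt-in: let Comet evaluate regular expressions natively even
// for patterns whose results may differ from Spark's.
val spark = SparkSession.builder()
  .appName("comet-regexp-demo")
  .config("spark.comet.regexp.allowIncompatible", "true")
  .getOrCreate()
```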
2 changes: 2 additions & 0 deletions docs/source/user-guide/configs.md
@@ -24,6 +24,7 @@ Comet provides the following configuration settings.
| Config | Description | Default Value |
|--------|-------------|---------------|
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
| spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task (`spark.comet.columnar.shuffle.async.thread.num`) is larger than this config, Comet will use this config as the number of shuffle threads per executor instead. | 100 |
@@ -44,6 +45,7 @@ Comet provides the following configuration settings.
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
6 changes: 6 additions & 0 deletions docs/templates/compatibility-template.md
@@ -32,6 +32,12 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Cast

Cast operations in Comet fall into three levels of support: