From 823b8d7b5c5e8951d4e6d592fb0b11d5c17d1b41 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 10 Apr 2024 10:24:07 -0700 Subject: [PATCH] change count type to float --- .../datafusion/expressions/covariance.rs | 31 +++++----- .../comet/exec/CometAggregateSuite.scala | 56 +++++++++++-------- 2 files changed, 47 insertions(+), 40 deletions(-) diff --git a/core/src/execution/datafusion/expressions/covariance.rs b/core/src/execution/datafusion/expressions/covariance.rs index 9b24555b1b..dd3f81beb6 100644 --- a/core/src/execution/datafusion/expressions/covariance.rs +++ b/core/src/execution/datafusion/expressions/covariance.rs @@ -20,7 +20,7 @@ use std::{any::Any, sync::Arc}; use arrow::{ - array::{ArrayRef, Float64Array, Int64Array}, + array::{ArrayRef, Float64Array}, compute::cast, datatypes::{DataType, Field}, }; @@ -86,7 +86,7 @@ impl AggregateExpr for Covariance { Ok(vec![ Field::new( format_state_name(&self.name, "count"), - DataType::Int64, + DataType::Float64, true, ), Field::new( @@ -163,7 +163,7 @@ impl AggregateExpr for CovariancePop { Ok(vec![ Field::new( format_state_name(&self.name, "count"), - DataType::Int64, + DataType::Float64, true, ), Field::new( @@ -208,7 +208,7 @@ pub struct CovarianceAccumulator { algo_const: f64, mean1: f64, mean2: f64, - count: i64, + count: f64, stats_type: StatsType, } @@ -219,12 +219,12 @@ impl CovarianceAccumulator { algo_const: 0_f64, mean1: 0_f64, mean2: 0_f64, - count: 0_i64, + count: 0_f64, stats_type: s_type, }) } - pub fn get_count(&self) -> i64 { + pub fn get_count(&self) -> f64 { self.count } @@ -276,14 +276,14 @@ impl Accumulator for CovarianceAccumulator { let value1 = unwrap_or_internal_err!(value1); let value2 = unwrap_or_internal_err!(value2); - let new_count = self.count + 1; + let new_count = self.count + 1.0; let delta1 = value1 - self.mean1; let new_mean1 = delta1 / new_count as f64 + self.mean1; let delta2 = value2 - self.mean2; let new_mean2 = delta2 / new_count as f64 + self.mean2; let new_c = delta1 * (value2 - new_mean2) + self.algo_const; - self.count += 1; + self.count += 1.0; self.mean1 = new_mean1; self.mean2 = new_mean2; self.algo_const = new_c; @@ -317,14 +317,14 @@ impl Accumulator for CovarianceAccumulator { let value1 = unwrap_or_internal_err!(value1); let value2 = unwrap_or_internal_err!(value2); - let new_count = self.count - 1; + let new_count = self.count - 1.0; let delta1 = self.mean1 - value1; let new_mean1 = delta1 / new_count as f64 + self.mean1; let delta2 = self.mean2 - value2; let new_mean2 = delta2 / new_count as f64 + self.mean2; let new_c = self.algo_const - delta1 * (new_mean2 - value2); - self.count -= 1; + self.count -= 1.0; self.mean1 = new_mean1; self.mean2 = new_mean2; self.algo_const = new_c; @@ -334,14 +334,14 @@ impl Accumulator for CovarianceAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let counts = downcast_value!(states[0], Int64Array); + let counts = downcast_value!(states[0], Float64Array); let means1 = downcast_value!(states[1], Float64Array); let means2 = downcast_value!(states[2], Float64Array); let cs = downcast_value!(states[3], Float64Array); for i in 0..counts.len() { let c = counts.value(i); - if c == 0 { + if c == 0.0 { continue; } let new_count = self.count + c; @@ -366,19 +366,18 @@ impl Accumulator for CovarianceAccumulator { } fn evaluate(&mut self) -> Result { - println!("evaluate evaluate evaluate"); let count = match self.stats_type { datafusion_physical_expr::expressions::StatsType::Population => self.count, StatsType::Sample => { - if self.count > 0 { - self.count - 1 + if self.count > 0.0 { + self.count - 1.0 } else { self.count } } }; - if count == 0 { + if count == 0.0 { Ok(ScalarValue::Float64(None)) } else { Ok(ScalarValue::Float64(Some(self.algo_const / count as f64))) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala index 68a1117469..65afe76e30 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala @@ -998,30 +998,38 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("covar_pop and covar_samp") { - withSQLConf( - CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", - CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") { - Seq(false).foreach { dictionary => - withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { - val table = "test" - withTable(table) { - sql( - s"create table $table(col1 int, col2 int, col3 int, col4 float, col5 double, col6 double, col7 int) using parquet") - sql( - s"insert into $table values(1, 4, null, 1.1, 2.2, null, 1), (2, 5, 6, 3.4, 5.6, null, 1), (3, 6, null, 7.9, 2.4, null, 2)") - val expectedNumOfCometAggregates = 2 - checkSparkAnswerAndNumOfAggregates( - "SELECT covar_samp(col1, col2), covar_samp(col1, col3), covar_samp(col4, col5), covar_samp(col4, col6) FROM test", - expectedNumOfCometAggregates) - checkSparkAnswerAndNumOfAggregates( - "SELECT covar_pop(col1, col2), covar_pop(col1, col3), covar_pop(col4, col5), covar_pop(col4, col6) FROM test", - expectedNumOfCometAggregates) - checkSparkAnswerAndNumOfAggregates( - "SELECT covar_samp(col1, col2), covar_samp(col1, col3), covar_samp(col4, col5), covar_samp(col4, col6) FROM test GROUP BY col7", - expectedNumOfCometAggregates) - checkSparkAnswerAndNumOfAggregates( - "SELECT covar_pop(col1, col2), covar_pop(col1, col3), covar_pop(col4, col5), covar_pop(col4, col6) FROM test GROUP BY col7", - expectedNumOfCometAggregates) + withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") { + Seq(true, false).foreach { cometColumnShuffleEnabled => + withSQLConf( + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> cometColumnShuffleEnabled.toString) { + Seq(true, false).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + val table = "test" + withTable(table) { + sql( + s"create table $table(col1 int, col2 int, col3 int, col4 float, col5 double," + + s" col6 double, col7 int) using parquet") + sql(s"insert into $table values(1, 4, null, 1.1, 2.2, null, 1)," + + s" (2, 5, 6, 3.4, 5.6, null, 1), (3, 6, null, 7.9, 2.4, null, 2)") + val expectedNumOfCometAggregates = 2 + checkSparkAnswerAndNumOfAggregates( + "SELECT covar_samp(col1, col2), covar_samp(col1, col3), covar_samp(col4, col5)," + + " covar_samp(col4, col6) FROM test", + expectedNumOfCometAggregates) + checkSparkAnswerAndNumOfAggregates( + "SELECT covar_pop(col1, col2), covar_pop(col1, col3), covar_pop(col4, col5)," + + " covar_pop(col4, col6) FROM test", + expectedNumOfCometAggregates) + checkSparkAnswerAndNumOfAggregates( + "SELECT covar_samp(col1, col2), covar_samp(col1, col3), covar_samp(col4, col5)," + + " covar_samp(col4, col6) FROM test GROUP BY col7", + expectedNumOfCometAggregates) + checkSparkAnswerAndNumOfAggregates( + "SELECT covar_pop(col1, col2), covar_pop(col1, col3), covar_pop(col4, col5)," + + " covar_pop(col4, col6) FROM test GROUP BY col7", + expectedNumOfCometAggregates) + } + } } } }