
feat: Support BloomFilterMightContain expr #179

Merged — 7 commits from advancedxy:SupportBFMightContain into apache:main on Mar 14, 2024

Conversation

@advancedxy (Contributor, author):

Which issue does this PR close?

Closes #145

Rationale for this change

More expr coverage

What changes are included in this PR?

  1. Define the new expression in proto.
  2. Add SparkBitArray, SparkBloomFilter and BloomFilterMightContain support on the Rust side.
  3. Add glue code to transform the Spark plan into a proto message on the JVM side, and the proto message into a physical expression on the native side.
  4. Add tests on both the JVM and native sides.

Note: BloomFilterMightContain is only available in Spark 3.3+, so a separate test source directory is added to cover it.

How are these changes tested?

Added new tests.
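At its core, the native expression is a per-batch membership test with null-propagating semantics. A minimal sketch, where might_contain stands in for the PR's SparkBloomFilter::might_contain_long (names and signatures here are illustrative, not the PR's exact code):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, BooleanArray, Int64Array};

    // `might_contain` stands in for SparkBloomFilter::might_contain_long.
    fn might_contain_batch(
        might_contain: impl Fn(i64) -> bool,
        values: &Int64Array,
    ) -> ArrayRef {
        // Null inputs yield null outputs, matching Spark's behavior.
        let result: BooleanArray = values
            .iter()
            .map(|v| v.map(&might_contain))
            .collect();
        Arc::new(result)
    }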

@advancedxy (Contributor, author):

cc @viirya and @sunchao.

Given my limited experience with Rust so far, there may be some non-idiomatic code in this PR. I'd appreciate your comments.

@codecov-commenter commented Mar 9, 2024:

Codecov Report

Attention: Patch coverage is 80.00000%, with 3 lines in your changes missing coverage. Please review.

Project coverage is 33.45%. Comparing base (d069713) to head (3851042).
Report is 1 commit behind head on main.

Files Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 85.71% 0 Missing and 2 partials ⚠️
...la/org/apache/comet/shims/ShimQueryPlanSerde.scala 0.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #179      +/-   ##
============================================
+ Coverage     33.31%   33.45%   +0.14%     
- Complexity      767      770       +3     
============================================
  Files           107      107              
  Lines         35375    35432      +57     
  Branches       7658     7696      +38     
============================================
+ Hits          11784    11855      +71     
+ Misses        21144    21108      -36     
- Partials       2447     2469      +22     


(3 resolved review threads on core/src/execution/datafusion/util/spark_bloom_filter.rs)

fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
// lazily get the spark bloom filter
if self.bloom_filter.get().is_none() {
Member: Just curious — is there any clear advantage to initializing this lazily?

@advancedxy (author): The bloom filter's binary comes from either a literal or a ScalarSubquery, so I took the approach of initializing it lazily.

But maybe we can simply evaluate it before constructing the expression, just as in_list is constructed; let me address that in a new commit.

@advancedxy (author): The static bloom filter is now evaluated eagerly. Please let me know what you think of the latest change.

(resolved review thread on core/src/execution/datafusion/util/spark_bit_array.rs)

pub fn set(&mut self, index: usize) -> bool {
if !self.get(index) {
self.data[index >> 6] |= 1u64 << (index & 0x3f);
Member: Why index & 0x3f? Spark's BitArray doesn't do this.

@advancedxy (author): Java and Rust have different semantics for shift-left.

In Java, the shift-left operator will rotate bits when the shift distance is larger than 64:

jshell> 1 << 65
$1 ==> 2

jshell> 1 << 129
$5 ==> 2

Rust doesn't support this semantic; it will panic on overflow:

1u64 << 65 // panics: attempt to shift left with overflow (in debug builds)

Member: Oh, it is not rotated. The Java shift operators are defined as follows:

If the promoted type of the left-hand operand is long, then only the six lowest-order bits of the right-hand operand are used as the shift distance. It is as if the right-hand operand were subjected to a bitwise logical AND operator & with the mask value 0x3f (0b111111).[11] The shift distance actually used is therefore always in the range 0 to 63, inclusive.

https://en.wikipedia.org/wiki/Bitwise_operation

Member: Maybe we should add a comment on this?

@advancedxy (author):

> Oh, it is not rotated.

Hmm, thanks for the correction and the info. I couldn't find an authoritative description of how Java defines its shift operators and thought it was a rotating shift.

> Maybe we should add a comment on this?

Of course.
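For reference, the masked-shift equivalence the spec describes is easy to check in Rust (an illustrative snippet, not code from this PR):

    fn main() {
        let index: usize = 129;
        // Java's `<<` on a long uses only the six lowest-order bits of the
        // shift distance (an implicit `& 0x3f`), so shifting by 129 behaves
        // like shifting by 1. Rust's `<<` panics on such an overflow in
        // debug builds, so the mask is written out explicitly.
        assert_eq!(1u64 << (index & 0x3f), 2);
        // wrapping_shl applies the same masking and never panics.
        assert_eq!(1u64.wrapping_shl(index as u32), 2);
    }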

@advancedxy (author): I think this PR is ready for another round of review. I will address this comment together with any other issues.

pom.xml (outdated diff):
@@ -494,6 +494,7 @@ under the License.
<spark.version>3.2.2</spark.version>
<spark.version.short>3.2</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<additional.test.source>spark-3.2</additional.test.source>
@viirya (Member), Mar 13, 2024: Do you add a spark-3.2 test source?

@advancedxy (author): spark-3.2 is just a placeholder; there's no test code to be added for spark-3.2 yet.

I can add the empty spark-3.2 dir, though.

Member: I mean, must we add additional.test.source for 3.2?

@advancedxy (author): It's used in the plugin to configure the additional test source:

        <executions>
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/${additional.test.source}</source>
              </sources>
            </configuration>
          </execution>
        </executions>

If we don't add a placeholder here, the configuration will be wrong or will have to be configured conditionally.

@advancedxy force-pushed the SupportBFMightContain branch from de678d3 to 23bec73 on March 13, 2024 at 11:51
@advancedxy force-pushed the SupportBFMightContain branch from 23bec73 to e13c168 on March 13, 2024 at 11:55
@@ -494,6 +495,8 @@ under the License.
<spark.version>3.2.2</spark.version>
<spark.version.short>3.2</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<!-- we don't add special test suits for spark-3.2, so a not existed dir is specified-->
@advancedxy (author): @viirya I added a comment here. Hopefully this addresses your concern.

The <additional.test.source> property is now added as a global property, which will make IDEs happy. It also simply works out of the box for ./mvnw commands without additional profiles.

@sunchao (Member): Looks mostly good, just a few more nits.

) -> Self {
// early evaluate the bloom_filter_expr to get the actual bloom filter
let bloom_filter = evaluate_bloom_filter(&bloom_filter_expr)
.expect("bloom_filter_expr could be evaluated statically");
Member: "could be" -> "could not be"? Also, we could consider returning Result and changing this to try_new, but it's not a big deal.

@advancedxy (author): Hmmm, I thought the expect message was a precondition message.

try_new seems better; let me change to that.

Member: In that case it's better to say "bloom_filter_expr should be evaluated successfully"?

@advancedxy (author), Mar 14, 2024: Yeah, "should" should be used.

Anyway, I changed it to try_new returning Result.
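Roughly, the shape under discussion looks like this (a sketch only: it assumes the struct stores bloom_filter_expr, value_expr and the evaluated bloom_filter, and that evaluate_bloom_filter returns a DataFusionResult; the real PR code may differ):

    impl BloomFilterMightContain {
        pub fn try_new(
            bloom_filter_expr: Arc<dyn PhysicalExpr>,
            value_expr: Arc<dyn PhysicalExpr>,
        ) -> DataFusionResult<Self> {
            // Evaluate the bloom filter expression eagerly, surfacing any
            // failure as an Err instead of panicking via expect().
            let bloom_filter = evaluate_bloom_filter(&bloom_filter_expr)?;
            Ok(Self {
                bloom_filter_expr,
                value_expr,
                bloom_filter,
            })
        }
    }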

#[derive(Debug, Hash)]
pub struct SparkBloomFilter {
bits: SparkBitArray,
num_hashes: u32,
Member: nit: better to add a comment on this, since it is actually the number of hash functions.

@advancedxy (author): I changed the variable to num_hash_functions, which should be clearer.
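After the rename, the struct reads roughly like this (a sketch of the updated shape, with the comment the review asked for):

    #[derive(Debug, Hash)]
    pub struct SparkBloomFilter {
        bits: SparkBitArray,
        // The number of hash functions applied per item, mirroring
        // Spark's BloomFilterImpl.numHashFunctions.
        num_hash_functions: u32,
    }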

(resolved review thread on core/src/execution/datafusion/util/spark_bloom_filter.rs)
}
}

pub fn put_long(&mut self, item: i64) -> bool {
Member: I think this is not used right now, but perhaps it will be in the future?

@advancedxy (author): Yeah, it's not used right now; it's added for symmetry and will most likely be used in the future.
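For a sense of what put_long does, here is a sketch following Spark's BloomFilterImpl.putLong double-hashing scheme; murmur3_hash_long (a seeded 32-bit Murmur3 over the i64) and bit_size are hypothetical helpers standing in for whatever this crate actually provides:

    pub fn put_long(&mut self, item: i64) -> bool {
        // Two seeded Murmur3 hashes drive the double hashing.
        let h1 = murmur3_hash_long(item, 0);
        let h2 = murmur3_hash_long(item, h1);
        let bit_size = self.bits.bit_size() as i32;
        let mut bit_changed = false;
        for i in 1..=(self.num_hash_functions as i32) {
            let mut combined_hash = h1.wrapping_add(i.wrapping_mul(h2));
            // Flip all bits if negative to guarantee a non-negative
            // index, as Spark does.
            if combined_hash < 0 {
                combined_hash = !combined_hash;
            }
            bit_changed |= self.bits.set((combined_hash % bit_size) as usize);
        }
        bit_changed
    }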

})
.unwrap_or_else(|| {
// when the bloom filter is null, we should return null for all the input
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)))
@advancedxy (author): Rather than using ScalarValue::Null, I think ScalarValue::Boolean(None) is more appropriate, since it carries the data type info.

@viirya (Member), Mar 14, 2024: ScalarValue::Boolean(None) is correct. ScalarValue::Null is the null type.
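A quick check of the distinction (assuming current DataFusion naming, where ScalarValue exposes data_type(); import paths may vary by version):

    use arrow::datatypes::DataType;
    use datafusion::scalar::ScalarValue;

    fn main() {
        // Boolean(None) is a NULL that still carries the Boolean type,
        // while ScalarValue::Null has the dedicated Null data type.
        assert_eq!(ScalarValue::Boolean(None).data_type(), DataType::Boolean);
        assert_eq!(ScalarValue::Null.data_type(), DataType::Null);
    }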

@sunchao (Member): LGTM

@sunchao merged commit 969f683 into apache:main on Mar 14, 2024. 26 checks passed.
@sunchao (Member) commented Mar 14, 2024: Merged, thanks!


Linked issue: Support Spark bloom filter expression BloomFilterMightContain (#145)

5 participants