Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avoid to call import and export Arrow array for native execution #1055

Closed

Conversation

kazuyukitanimura
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

Performance improvement

What changes are included in this PR?

This PR has changes to avoid to call import and export Arrow array for native execution

How are these changes tested?

Exisiting tests

@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 36.06557% with 39 lines in your changes missing coverage. Please review.

Project coverage is 34.32%. Comparing base (845b654) to head (ec72117).
Report is 16 commits behind head on main.

Files with missing lines Patch % Lines
...ava/org/apache/comet/vector/CometNativeVector.java 0.00% 15 Missing ⚠️
...in/java/org/apache/comet/parquet/ColumnReader.java 0.00% 11 Missing ⚠️
...ain/scala/org/apache/comet/vector/NativeUtil.scala 0.00% 8 Missing ⚠️
...ain/java/org/apache/comet/parquet/BatchReader.java 0.00% 3 Missing ⚠️
...ava/org/apache/comet/parquet/LazyColumnReader.java 0.00% 1 Missing ⚠️
.../src/main/java/org/apache/comet/parquet/Utils.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1055      +/-   ##
============================================
- Coverage     34.46%   34.32%   -0.15%     
- Complexity      888      893       +5     
============================================
  Files           113      114       +1     
  Lines         43580    42916     -664     
  Branches       9658     9339     -319     
============================================
- Hits          15021    14732     -289     
+ Misses        25507    25336     -171     
+ Partials       3052     2848     -204     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@andygrove
Copy link
Member

@kazuyukitanimura. I can run the benchmarks now, but I did not see any difference in performance compared to the main branch. Are you still seeing a performance benefit after the recent changes?

@kazuyukitanimura
Copy link
Contributor Author

kazuyukitanimura commented Nov 13, 2024

Thank you @andygrove
That's odd. My local queries are showing clear improvement. Just checking, are you using iceberg? I haven't done the DSv2 yet. Right now, pure parquet with DSv1 gets the benefit of this PR
I will try to run more queries as well.

@kazuyukitanimura
Copy link
Contributor Author

kazuyukitanimura commented Nov 13, 2024

5 iterations
before

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q27: Comet (Scan, Exec)                            4218           4320         116         68.8          14.5       1.0X

After

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q27: Comet (Scan, Exec)                            3534           3784         191         82.1          12.2       1.0X

@kazuyukitanimura
Copy link
Contributor Author

Before

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          14077          14413         252         28.4          35.2       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            2160           2235          73        133.4           7.5       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            5023           5091          61         58.0          17.3       1.0

After

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          12968          13770        1085         30.8          32.5       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            1968           2106         129        146.5           6.8       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            4232           4530         195         68.8          14.5       1.0

@andygrove
Copy link
Member

I see these results from 3 runs of q39 (both a+b) with 1TB input:

main: 59.3s/57.0s/57.1s
this PR: 58.2s/58.9s/57.9s

I am using these configs:

    --conf spark.executor.instances=4 \
    --conf spark.executor.cores=6 \
    --conf spark.executor.memory=8g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=32g \

@kazuyukitanimura
Copy link
Contributor Author

kazuyukitanimura commented Nov 13, 2024

Thanks @andygrove
I tried with offHeap but I still see the advantage. Wondering if any of the following configs make difference. I use 100G input

spark.comet.regexp.allowIncompatible=true
spark.sql.shuffle.partitions=4
spark.driver.memory=3g
spark.executor.memory=3g
spark.sql.autoBroadcastJoinThreshold=20971520 // (20 * 1024 * 1024)
spark.sql.crossJoin.enabled=true
parquet.enable.dictionary=true
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          13732          14177         756         29.1          34.4       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                            1942           2029          69        148.4           6.7       1.0X

OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Mac OS X 14.7
Apple M1 Max
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q68: Comet (Scan, Exec)                            4544           4823         211         64.1          15.6       1.0X

@viirya
Copy link
Member

viirya commented Nov 13, 2024

I guess the diff is only observable on very small scale (ms). Once the query time is second/minute/hour level, the diff is insignificant.

@kazuyukitanimura
Copy link
Contributor Author

Thank you @andygrove @viirya I addressed memory issues and added DSv2 support (except Iceberg). This is ready for review again.

I also run with 1TB by myself. I still see 10% ish speed up that is the order of seconds not ms. I think there are some environmental differences between @andygrove 's and mine.

Before (1TB)

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          27979          33349         580         28.0          35.7       1.0X

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                           24877          25216         271        115.8           8.6       1.0X

After (1TB)

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q39a: Comet (Scan, Exec)                          27281          28714         111         28.7          34.8       1.0X

TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
q44: Comet (Scan, Exec)                           23534          23880         388        122.4           8.2       1.0X

@viirya
Copy link
Member

viirya commented Nov 14, 2024

It looks like less than 5%?

27979 - 27281 = 698 ~= 2%
24877 - 23534 = 1343 ~= 5%

So I believe that the diff is insignificant for longer query time.

@kazuyukitanimura
Copy link
Contributor Author

Thanks @viirya

It looks like less than 5%?
27979 - 27281 = 698 ~= 2%
24877 - 23534 = 1343 ~= 5%

I used Ave Time (5 iterations) when I mentioned 10% ish
100G q39a (14413-13770)/14413 ~= 4.5%
1T q39a (33349-28714)/33349 ~= 14%
The performance gain can easily swing between 2%-15% due to the noise

So I believe that the diff is insignificant for longer query time.

The performance gain is proportional to the input size by design because it saves the overhead for all input arrow vectors.
If you mean longer query time by deeper query plan, then yes, the performance gain gets less effective as the benefit is one-time per data read and query procesing time takes more portions in the entire runtime.
But there are many use cases I think for shallow query plans as well and we can still save $$$?

@viirya
Copy link
Member

viirya commented Nov 14, 2024

1T q39a (33349-28714)/33349 ~= 14%

33349 obviously looks like it is affected by some noises.

q44 avg diff is also ~ 5%.

The performance gain is proportional to the input size by design because it saves the overhead for all input arrow vectors.

I mean it is less significant with longer query time. For queries running many minutes or hours, it has no difference.
If this is a trivial change, it will be good. But as I took a quick look, the change is not small and doesn't look like in good design to me.

I doubt that if it is worth.

@kazuyukitanimura
Copy link
Contributor Author

@viirya Let me try to convince one more time.

For queries running many minutes or hours, it has no difference.

If the query time is long because the data size, this PR still helps (5% at least?). If the query time is long because the query itself is complex, this PR has less value.

the change is not small

The latest change is pretty small after following your change on Arrow spec memory model. There are only 3 main changes

  1. avoid importing: common/src/main/java/org/apache/comet/parquet/ColumnReader.java
  2. avoid exporting: common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
  3. type handling fix: native/core/src/execution/utils.rs

The rest of changes are only for passing the information of the new mode is used.

doesn't look like in good design to me.

Do you have any recommendations here? What if I add a feature flag to enable/disable this new code flow. The latest change is fully backward compatible. We can easily enable/disable with a single flag to manipulate hasNativeOperations boolean.

@andygrove
Copy link
Member

@kazuyukitanimura I plan on reviewing this PR this week. I have found that the import/export cost during shuffle is excessive (see #1123) and would like to understand more about the approach in this PR.

@andygrove
Copy link
Member

I understand what this PR is doing now. By using CometNativeVector when importing from native, and then reexporting CometNativeVector we are just passing memory addresses and avoiding extra serde costs. I am going to spend some time debugging with my benchmarks to ensure this path is being tested.

@kazuyukitanimura
Copy link
Contributor Author

Currently this PR only handles scan + exec, but if we can cover shuffle similarly the design may become clearner

@andygrove
Copy link
Member

The approach in this PR still serializes the schema with each batch (as is necessary when using the Arrow C data interface). I am not sure that the fast path reduces much cost. I think it is worth exploring more but I am not sure we should merge this PR yet, especially as the comet-parquet-exec work may replace some of this code.

@kazuyukitanimura
Copy link
Contributor Author

kazuyukitanimura commented Dec 10, 2024

Thanks @andygrove

The approach in this PR still serializes the schema with each batch (as is necessary when using the Arrow C data interface)

This PR is for saving Java import/export to/from java value vector format

I am not sure that the fast path reduces much cost.

In the flamegraph, currentBatch() and exportBatch() blocks are significantly shortened/disappeared
#1055 (comment)

I think it is worth exploring more but I am not sure we should merge this PR yet,

I can close this for now

especially as the comet-parquet-exec work may replace some of this code.

I thought this PR part does not change as we still need to send back the results to JVM for pure scan case, but I could be wrong...

@parthchandra
Copy link
Contributor

especially as the comet-parquet-exec work may replace some of this code.

I thought this PR part does not change as we still need to send back the results to JVM for pure scan case, but I could be wrong...

[comet-parquet-exec ] POC2 certainly does some import/export to pass data back for each columnar batch, but I see no way to remove that. Maybe we can apply some learnings from this PR once [comet-parquet-exec] is more stable.

@kazuyukitanimura
Copy link
Contributor Author

Closing this for now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants