test: Reduce end-to-end test time #109
Conversation
withSQLConf(
  CometConf.COMET_EXEC_ENABLED.key -> "true",
  CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
  CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
Hmm, these configs are used to produce a specific situation or corner case. If they are changed, the test won't fail, but it may end up testing nothing.
I don't think anything changed. I just moved the common configs to the beginning.
If you only moved the common configs, it should be fine. There are many tests, so we should be careful that none is missed or has its configs changed.
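As a rough model of why hoisting common configs is safe: suite-level defaults and per-test overrides compose like map merges, with the per-test value winning on conflict. A minimal sketch with plain Scala maps; the key names are made up for illustration and are not Comet's actual config keys:

```scala
// Hypothetical model of suite-level defaults vs. per-test overrides.
// Key names are illustrative only, not real Comet configs.
object ConfLayering {
  val suiteDefaults: Map[String, String] = Map(
    "comet.exec.enabled" -> "true",
    "comet.exec.shuffle.enabled" -> "true")

  // Per-test overrides take precedence, like withSQLConf inside a test body.
  def effective(overrides: Map[String, String]): Map[String, String] =
    suiteDefaults ++ overrides
}
```

A test that overrides `comet.exec.enabled` to `"false"` still sees the untouched suite default for the shuffle key, which is why moving the common pairs up does not change what each test runs with.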
protected val numElementsForceSpillThreshold: Int = 10

override protected def sparkConf: SparkConf = {
  val conf = super.sparkConf
  conf
    .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveExecutionEnabled.toString)
    .set(CometConf.COMET_EXEC_ENABLED.key, "false")
Moved the common configs for columnar shuffle to `sparkConf`.
  }
}

test("Comet shuffle: different data type") {
We have this test, which covers both columnar and native shuffle, and "Comet columnar shuffle shuffle: different data type" below, which only covers columnar shuffle. This PR moves the columnar shuffle out of this test and into CometNativeShuffleSuite, to reduce the test time.
I think these tests focus on different aspects by design. For example, the "different data type" tests specifically exercise shuffling on different data types. There may be some overlap when we look at them now; at the beginning, we wanted as much test coverage as possible to reduce the chance of missing corner cases.
> We have this which tests both columnar and native shuffle

I looked at this test now. I think it is not meant to test both columnar and native shuffle, but only native shuffle. You can see it never enables columnar shuffle, and the feature is disabled by default.

When `CometExec` is disabled in this test, I think it checks whether we can still run Spark shuffle without issue, i.e., whether we fall back to the original Spark shuffle correctly.
Oops, you are right. I got confused when looking at `execEnabled`. It's interesting that for this test case, the check `checkCometExchange(shuffled, 1, true)` passes regardless of whether `execEnabled` is true or false. It looks like native shuffle can be enabled even when native execution is not.
Because the shuffle is directly on top of the scan:

+- BosonExchange hashpartitioning(_4#82021, 10), REPARTITION_BY_NUM, BosonNativeShuffle, [plan_id=342026]
   +- BosonScan parquet [_1#82018,_4#82021] Batched: true, DataFilters: [], Format: BosonParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/liangchi/repos/boson/spark/target/tmp/spark-f2ec6474-6664-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:boolean,_4:int>

So even when Comet exec is disabled, the shuffle still works.
Oh I see. I'll update it and keep `execEnabled`.
@@ -90,13 +90,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
    Range(0, len).map(_ => chars.charAt(r.nextInt(chars.length))).mkString
  }

  private def fuzzCastFromString(chars: String, maxLen: Int, toType: DataType) {
This change was added by accident (from `make format`). I'll remove it later.
I did a quick pass; I think the test settings are unchanged before and after this PR. Let me do another detailed code review and comment back.
The time reduction looks fantastic, thanks for your effort.
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
The TPC-DS run is now the bottleneck; I think we can reduce it in a follow-up PR.
Left some minor comments
spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
}
checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2) FROM tbl GROUP BY _g$gCol")
Similar for `sum`, `count`, `min`, `avg` and `max`. `Count(distinct xx)` and `sum(distinct xx)` are different, though; they might have to be iterated over the 4 columns.
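A sketch of what that split could look like: plain aggregates combined into one query per grouping column, while the DISTINCT variants are generated per column. The object and query shape below are illustrative, not the suite's actual code:

```scala
// Hypothetical query generation: one combined query for plain aggregates,
// one query per column for the DISTINCT variants (different code path).
object AggQueries {
  def combined(gCol: Int): String =
    s"SELECT _g$gCol, SUM(_1), MIN(_2), MAX(_3), COUNT(_4) FROM tbl GROUP BY _g$gCol"

  def distinctPerColumn(gCol: Int): Seq[String] =
    (1 to 4).map(c => s"SELECT _g$gCol, SUM(DISTINCT _$c) FROM tbl GROUP BY _g$gCol")
}
```

Per grouping column this yields one combined query plus four DISTINCT queries, instead of one query per (column, aggregate) pair.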
- (1 to 4).foreach { col =>
-   (1 to 14).foreach { gCol =>
+ (1 to 14).foreach { gCol =>
+   (1 to 4).foreach { col =>
Another unrelated question: why `1 to 4`? It seems `_1` to `_4` are all integer types. We probably want to test other types like float/double, decimal, etc.? But this should be addressed in another PR.
Right, this only covers integer types. Other types like float/double and decimal are covered by other tests in the same suite. We could add them here, but that might cause an explosion in the total test time.
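For a sense of scale, the nested loops multiply out quickly; the arithmetic below just uses the loop bounds from the snippet above plus a hypothetical wider column range:

```scala
// Query count is the product of the loop bounds:
// grouping columns x value columns.
object LoopCost {
  def cases(gCols: Int, cols: Int): Int = gCols * cols
}
```

With 14 grouping columns and 4 value columns that is 56 runs per test body; doubling the value columns to cover, say, float/double/decimal would double it to 112, which is the explosion being avoided.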
  }
}

test("fix: comet native shuffle with binary data") {
Seems like this test case has already been covered by `test("native shuffle: different data type")`? Let me do a refactor in a follow-up PR?
Sure. It seems we don't have the binary type in the Parquet table, only FixedLengthByteArray. We can add it.
Thanks. Updated the PR.
Yes, it's easy to break it up.
LGTM, pending CI passes.
}
checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " +
Are columns `_1`, `_2`, ... different from each other? If so, it seems this removes `SUM(_3)` and `SUM(_4)` and also misses some `SUM(DISTINCT xx)`? Previously it tested all columns with all aggregate expressions.
Yes, but the code path is pretty much the same, so I thought it's fine to lose some precision here.
      }
    }
  }

class CometAsyncShuffleSuite extends CometShuffleSuiteBase {
Yeah, async is only meaningful for columnar shuffle. We don't need to run the native shuffle test cases with async.
CometConf.COMET_BATCH_SIZE.key -> "1",
CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") {
I remember I set a big number because the bug only happens with many rows. We can probably reduce the number; I'm just not sure what a proper value is.
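One hedged idea, given the suite already sets `numElementsForceSpillThreshold = 10`: with a low force-spill threshold, a much smaller input should exercise the spill path repeatedly. The toy model below is not Comet's shuffle writer, just an illustration of that proportionality:

```scala
// Toy model: a buffer that "spills" every time it reaches the threshold.
// Not Comet's actual spill logic; only illustrates row-count scaling.
object SpillModel {
  def spillsFor(rows: Int, threshold: Int): Int = {
    var buffered = 0
    var spills = 0
    (1 to rows).foreach { _ =>
      buffered += 1
      if (buffered >= threshold) { spills += 1; buffered = 0 }
    }
    spills
  }
}
```

Under this model, 1,000 rows with threshold 10 already trigger 100 spills, so a row count well below 1,000,000 may still reproduce spill-related behavior; whether it reproduces this particular bug would need to be verified.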
  }
}

test("columnar shuffle: single partition") {
Do we need to change the test name? This is the native shuffle suite.
Yeah, let me update this.
Looks good. My only concern is the "different data types" test moved to the native shuffle suite. Maybe we can restore the change there.
Merged, thanks!
Which issue does this PR close?
Closes #.
Rationale for this change
Currently the Java tests take over 2 hours to finish, which is very long. This increases the end-to-end time for pull requests to be processed and decreases developer efficiency.
What changes are included in this PR?
This PR makes a few changes to the shuffle & aggregation tests:
- Split the shuffle tests into CometColumnarShuffleSuite and CometNativeShuffleSuite, for columnar and native shuffle respectively. This is more of a refactoring than a change to the tests themselves.
- Removed the NonFastMerge related tests. In Spark, the fast merge feature has been on by default since 1.4, so it is not very useful to run all tests with the feature turned off. If needed, we can add a dedicated test for it.
- Reduced iterations in CometAggregateSuite.

The total time is now reduced to ~40min.
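The suite split described above can be sketched roughly as follows; the class and method names are simplified stand-ins for the real suites, not the actual code:

```scala
// Hypothetical shape of the refactoring: shared test logic in a base class,
// with subclasses differing only in which shuffle mode they enable.
abstract class ShuffleSuiteSketch {
  def shuffleMode: String // e.g. "native" or "columnar"
  def differentDataTypeTest(): String =
    s"ran 'different data type' with $shuffleMode shuffle"
}
class NativeShuffleSketch extends ShuffleSuiteSketch { val shuffleMode = "native" }
class ColumnarShuffleSketch extends ShuffleSuiteSketch { val shuffleMode = "columnar" }
```

Each shared test then runs once per suite instead of looping over both modes inside one test body, which keeps coverage while letting the two suites run (and fail) independently.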
How are these changes tested?