-
Notifications
You must be signed in to change notification settings - Fork 361
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CELEBORN-1720] Prevent stage re-run if another task attempt is running or successful #2921
base: main
Are you sure you want to change the base?
Conversation
ddc8ca4
to
8c43cc2
Compare
e1d465b
to
f6b9d24
Compare
Seems difficult to add UT, how do you think about? @FMX |
It is too difficult to add the UT. Gentle ping @mridulm |
Hi, I see this PR. IMO, you can add a test config to trigger task hang and fetch failure in certain map tasks. Maybe it won't be too difficult to add UTs. |
Thanks, added UT and tested locally. |
### What changes were proposed in this pull request? Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`. ### Why are the changes needed? I am testing #2921 in the celeborn cluster. And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null. <img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff"> https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672 ``` 24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550) java.lang.NullPointerException at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672) at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at java.io.DataInputStream.read(DataInputStream.java:149) at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899) at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) at scala.collection.Iterator$$anon$11.next(Iterator.scala:496) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225) at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2933 from turboFei/npe_reader. Authored-by: Wang, Fei <[email protected]> Signed-off-by: mingji <[email protected]>
### What changes were proposed in this pull request? Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`. ### Why are the changes needed? I am testing #2921 in the celeborn cluster. And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null. <img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff"> https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672 ``` 24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550) java.lang.NullPointerException at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672) at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at java.io.DataInputStream.read(DataInputStream.java:149) at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899) at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110) at scala.collection.Iterator$$anon$11.next(Iterator.scala:496) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29) at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225) at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2933 from turboFei/npe_reader. Authored-by: Wang, Fei <[email protected]> Signed-off-by: mingji <[email protected]> (cherry picked from commit 094fe28) Signed-off-by: mingji <[email protected]>
The UT is invalid, checking |
This PR is ready as it is impossible to add UT for speculation, see details in the PR description. And I have added UT for the SparkUtils. cc @FMX |
What changes were proposed in this pull request?
Prevent stage re-run if another task attempt is running.
If a shuffle read task can not read the shuffle data and the task another attempt is running or successful, just throw the CelebornIOException instead of FetchFailureException.
The app will not failure before reach the task maxFailures.
Why are the changes needed?
I met below issue because I set the wrong parameters, I should set
spark.celeborn.data.io.connectTime=30s
but set thespark.celeborn.data.io.connectionTime=30s
, and the Disk IO Utils was high at that time.Due the stage re-run is heavy, so I wonder that, we should ignore the shuffle fetch failure, if there is another task attempt running.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT for the SparkUtils method only, due it is impossible to add UT for speculation.
https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L236-L244
For local master, it would not start the speculationScheduler.
https://github.com/apache/spark/blob/d5da49d56d7dec5f8a96c5252384d865f7efd4d9/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L322-L346
and it is also not allowed to launch speculative task on the same host.