
feat: Add additional metrics for shuffle write #1173

Closed
wants to merge 12 commits

Conversation

@andygrove andygrove commented Dec 16, 2024

Which issue does this PR close?

N/A

Rationale for this change

I would like to understand how much time is spent on shuffle writing.

What changes are included in this PR?

  • Introduce a new write_time native metric to record write time instead of using elapsed_time
  • Use elapsed_time to measure total native time (excluding executing the child plan and fetching data)
  • Add new input_time native metric for measuring the time for ShuffleWriterExec to execute the child plan and fetch its input data
  • Add new shuffleWallTime JVM metric to measure total time of shuffle (see the sketch after this list)
  • Update docs
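
As a rough illustration of the shuffleWallTime JVM metric described above, here is a hedged sketch (not the code in this PR; the metric key and description are assumptions based on the UI labels below) using Spark's SQLMetrics API, which accumulates nanoseconds and only converts for display:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

    // Hypothetical registration of a nanosecond-based wall-time metric for shuffle.
    def shuffleMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sc, "shuffle wall time")
    )

    // At runtime, the metric would be accumulated around the shuffle write, e.g.:
    //   val start = System.nanoTime()
    //   ... perform the shuffle write ...
    //   metrics("shuffleWallTime").add(System.nanoTime() - start)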

Spark UI

Note the new metrics:

  • "native shuffle time"
  • "native shuffle input time"
  • "shuffle wall time"

[Screenshot from 2024-12-16 15-21-24: Spark UI showing the new shuffle metrics]

Native plan

ShuffleWriterExec: ..., metrics=[elapsed_compute=42.425493ms, ..., input_time=255.873165ms, write_time=462.517µs]
  ScanExec: source=[ShuffleWriterInput], metrics=[elapsed_compute=254.994282ms, ...]

How are these changes tested?

@andygrove andygrove marked this pull request as ready for review December 16, 2024 22:29

### CometScanExec

`CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to milliseconds.
Contributor

This sounds like a problem statement. Did you mean that spark.comet.metrics.detailed=true will not lose the precision?

Contributor

afaik, the conversion happens only when the data is to be displayed in the UI. (https://github.com/apache/spark/blob/576caec1da85c4451fe63e2a5923f2dbf136e278/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala#L248)
But this is what Spark does with all its nanosecond timing metrics, so we aren't doing anything different here.

Member Author

Thanks, I have updated this.

Member Author
@andygrove andygrove Dec 17, 2024

Spark converts nanos to millis on each batch:

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
          res
        }

We just use the nano time:

        override def hasNext: Boolean = {
          // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
          val startNs = System.nanoTime()
          val res = batches.hasNext
          scanTime += System.nanoTime() - startNs
          res
        }

It actually makes a large difference to the time reported in some cases.
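
To make the size of that difference concrete, here is a small hypothetical calculation (the batch count and per-batch time are made up for illustration):

    import java.util.concurrent.TimeUnit.NANOSECONDS

    // 10,000 batches, each spending 90µs in hasNext.
    val perBatchNs = 90000L
    val batches = 10000

    // Spark-style accumulation: truncate to millis on every batch.
    val sparkStyle = (1 to batches).map(_ => NANOSECONDS.toMillis(perBatchNs)).sum // 0 ms

    // Comet-style accumulation: keep nanos, convert once for display.
    val cometStyle = NANOSECONDS.toMillis(perBatchNs * batches) // 900 ms

With per-batch truncation the reported scan time collapses to zero, while nanosecond accumulation reports the full 900 ms.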

Member Author

I have updated the description in the metrics guide to explain this in more detail.

Contributor

I see. That could be a big difference in small datasets (I wonder if the same occurs when we have large files). Either way, better not to lose precision. We are not likely to run into overflow issues, are we?
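
On the overflow question, a back-of-the-envelope check (assuming the metric is a signed 64-bit counter, as Spark's SQLMetric value is):

    import java.util.concurrent.TimeUnit.NANOSECONDS

    // Long.MaxValue nanoseconds is roughly 106,751 days (~292 years),
    // so accumulating scan time in nanoseconds cannot realistically overflow a Long.
    val maxDays = NANOSECONDS.toDays(Long.MaxValue) // 106751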

@andygrove
Member Author

Also added ipc_time.

[Screenshot from 2024-12-16 16-58-39: Spark UI showing the ipc_time metric]

@andygrove
Member Author

Seeing some segmentation faults in CI:

# Problematic frame:
# C  [libcomet-9228012970092653184.so+0x274908f]  core::sync::atomic::AtomicUsize::fetch_add::h50346ebf04f5a2ef+0x4f

@andygrove
Member Author

I may have found a bug in DataFusion:

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007fb91a124f46, pid=762, tid=833
#
# JRE version: OpenJDK Runtime Environment Zulu11.76+21-CA (11.0.25+9) (build 11.0.25+9-LTS)
# Java VM: OpenJDK 64-Bit Server VM Zulu11.76+21-CA (11.0.25+9-LTS, mixed mode, tiered, compressed oops, g1 gc, linux-amd64)
# Problematic frame:
# C  [libcomet-1968252056067325991.so+0x1524f46]  datafusion_physical_plan::metrics::value::ScopedTimerGuard::stop::ha579d5e7b5f1a919+0x46

@andygrove andygrove marked this pull request as draft December 17, 2024 16:18
@andygrove
Member Author

I created a new simpler PR to replace this one: #1175
