Add plugin to use Pyroscope agents for monitoring Spark workloads
LucaCanali committed Sep 26, 2023
1 parent 53ac3b5 commit 32ce826
Showing 3 changed files with 136 additions and 13 deletions.
63 changes: 54 additions & 9 deletions README.md
@@ -2,17 +2,16 @@
![SparkPlugins CI](https://github.com/cerndb/SparkPlugins/workflows/SparkPlugins%20CI/badge.svg?branch=master&event=push)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-plugins_2.12/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ch.cern.sparkmeasure/spark-plugins_2.12)

This repository contains code and examples of how to use Apache Spark Plugins.
Spark plugins provide an interface,
and related configuration, for injecting custom code on executors as they are initialized.
Spark plugins can also be used to implement custom extensions to the Spark metrics system.
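For illustration, here is a minimal sketch of the plugin interface (the class name `HelloWorldPlugin` is made up for this example; the interfaces are `org.apache.spark.api.plugin.SparkPlugin`, `DriverPlugin`, and `ExecutorPlugin` from Spark 3.x):
```
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// A minimal plugin: runs custom code as the driver and each executor are initialized
class HelloWorldPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin() {
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
      println("driver started")
      Map.empty[String, String].asJava  // extra conf passed to the executor plugins
    }
  }
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin() {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      println(s"executor ${ctx.executorID} started")
    }
  }
}
```
Enable it with `--conf spark.plugins=HelloWorldPlugin`, after distributing the JAR with the compiled class to the executors.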

### Motivations
- One important use case for deploying Spark Plugins is extending Spark instrumentation with custom metrics.
- Other use cases include running custom actions when the executors start up, typically useful for integrating with
-  external systems.
-- This repo provides code and examples of plugins applied to measuring Spark on K8S,
-Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
+  external systems, such as monitoring systems.
+- This repo provides code and examples of plugins applied to measuring Spark on cluster resources (YARN, K8S, Standalone),
+including measuring Spark I/O from cloud Filesystems, OS metrics, custom application metrics, and integrations with external systems like Pyroscope.
- Note: The code in this repo is for Spark 3.x.
For Spark 2.x, see instead [Executor Plugins for Spark 2.4](https://github.com/cerndb/SparkExecutorPlugins2.4)

@@ -21,9 +20,9 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
and can be used to run custom code at the startup of Spark executors and driver.
- Plugins basic configuration: `--conf spark.plugins=<list of plugin classes>`
- Plugin JARs need to be made available to Spark executors
-  - you can distribute the plugin code to the executors using `--jars` and `--packages`.
-  - for K8S you can also consider making the jars available directly in the container image.
-- Most of the Plugins described in this repo are intended to extend the Spark Metrics System.
+  - you can distribute the plugin code to the executors using `--jars` and `--packages`
+  - for K8S you can also consider making the jars available directly in the container image
+- Most of the Plugins described in this repo are intended to extend the Spark Metrics System
- See the details on the Spark metrics system at [Spark Monitoring documentation](https://spark.apache.org/docs/latest/monitoring.html#metrics).
- You can find the metrics generated by the plugins in the Spark metrics system stream under the
namespace `namespace=plugin.<Plugin Class Name>` (see the sketch below)
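As a sketch of how this works, the example below registers a custom gauge via the plugin context (`DemoMetricsPlugin` and the `usedHeapBytes` gauge are made up for this example; the value is then reported under the plugin's namespace as described above):
```
import java.util.{Map => JMap}

import com.codahale.metrics.Gauge
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Registers a custom metric with the Spark metrics system from an executor plugin
class DemoMetricsPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null  // no driver-side component

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin() {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // gauge tracking the JVM heap in use, read each time the metrics sink polls it
      ctx.metricRegistry.register("usedHeapBytes", new Gauge[Long] {
        override def getValue: Long =
          Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
      })
    }
  }
}
```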
@@ -32,7 +31,7 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
### Related Work and Spark Performance Dashboard

- Spark Performance Dashboard - a solution to ingest and visualize Spark metrics
-  - Link to the repo on [how to deploy a Spark Performance Dashboard using Spark metrics](https://github.com/cerndb/spark-dashboard)
+  - link to the repo on [how to deploy a Spark Performance Dashboard using Spark metrics](https://github.com/cerndb/spark-dashboard)
- DATA+AI summit 2020 talk [What is New with Apache Spark Performance Monitoring in Spark 3.0](https://databricks.com/session_eu20/what-is-new-with-apache-spark-performance-monitoring-in-spark-3-0)
- DATA+AI summit 2021 talk [Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins](https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins)

@@ -71,6 +70,52 @@ Spark I/O from cloud Filesystems, OS metrics, and custom application metrics.
---
## Plugins in this Repository
### Plugin for integrating with Pyroscope
[Grafana Pyroscope](https://grafana.com/oss/pyroscope/) is a tool for continuous profiling and Flame Graph visualization. This plugin integrates Apache Spark with Pyroscope.
For details see:
[How to profile Spark with Pyroscope](https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Tools_Spark_Pyroscope_FlameGraph.md)
The following example shows how to put the configuration together and start Spark on a cluster with Pyroscope Flame Graph continuous monitoring:
1. Start Pyroscope
- Download from https://github.com/grafana/pyroscope/releases
- CLI start: `./pyroscope -server.http-listen-port 5040`
- Or use docker: `docker run -it -p 5040:4040 grafana/pyroscope`
2. Start Spark (spark-shell, PySpark, or spark-submit), for example:
```
# note: update the packages below to the latest versions, and set
# spark.pyroscope.server to match the host and port used when starting Pyroscope
bin/spark-shell --master yarn \
  --packages ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.12.0 \
  --conf spark.plugins=ch.cern.PyroscopePlugin \
  --conf spark.pyroscope.server="http://<myhostname>:5040"
```
**Spark configurations:**
This plugin adds the following configurations:
```
--conf spark.pyroscope.server -> default "http://localhost:4040", update to match the server name and port used by Pyroscope
--conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
--conf spark.pyroscope.eventType -> default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
```
**Example:**
How to set the configuration programmatically, using PySpark:
```
from pyspark.sql import SparkSession

# Get the Spark session
spark = (SparkSession.builder
         .appName("Instrumented app")
         .master("yarn")
         .config("spark.executor.memory", "16g")
         .config("spark.executor.cores", "4")
         .config("spark.executor.instances", 2)
         .config("spark.jars.packages", "ch.cern.sparkmeasure:spark-plugins_2.12:0.3,io.pyroscope:agent:0.12.0")
         .config("spark.plugins", "ch.cern.PyroscopePlugin")
         .config("spark.pyroscope.server", "http://<myhostname>:5040")
         .getOrCreate()
        )
```
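Once the application is running, the executors' Flame Graphs can be browsed in the Pyroscope UI: the profiles are reported under the configured application name (by default the Spark application ID) and carry `executorId` and `hostname` labels, which can be used to filter the view down to a single executor.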
### OS metrics instrumentation with cgroups, for Spark on Kubernetes
- [CgroupMetrics](src/main/scala/ch/cern/CgroupMetrics.scala)
- Configure with: `--conf spark.plugins=ch.cern.CgroupMetrics`
9 changes: 5 additions & 4 deletions build.sbt
@@ -3,14 +3,15 @@ name := "spark-plugins"
version := "0.3-SNAPSHOT"
isSnapshot := true

scalaVersion := "2.12.17"
crossScalaVersions := Seq("2.12.17", "2.13.8")
scalaVersion := "2.12.18"
crossScalaVersions := Seq("2.12.18", "2.13.8")

licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.2.15"
libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.2.19"
libraryDependencies += "org.apache.hadoop" % "hadoop-client-api" % "3.3.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1"
libraryDependencies += "io.pyroscope" % "agent" % "0.12.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0"

// publishing to Sonatype Nexus repository and Maven
publishMavenStyle := true
77 changes: 77 additions & 0 deletions src/main/scala/ch/cern/PyroscopePlugin.scala
@@ -0,0 +1,77 @@
package ch.cern

import io.pyroscope.javaagent.PyroscopeAgent
import io.pyroscope.javaagent.EventType
import io.pyroscope.javaagent.config.Config
import io.pyroscope.http.Format

import java.net.InetAddress
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.SparkContext

/**
* Use this Plugin to configure Pyroscope java agent data collection on Spark executors
* See https://grafana.com/docs/pyroscope/latest/configure-client/language-sdks/java/
*
* This plugin adds the following configurations:
 * --conf spark.pyroscope.server -> default "http://localhost:4040", update to match the server name and port used by Pyroscope
* --conf spark.pyroscope.applicationName -> default spark.conf.get("spark.app.id")
* --conf spark.pyroscope.eventType -> default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
*/
class PyroscopePlugin extends SparkPlugin {

// Return the plugin's driver-side component.
override def driverPlugin(): DriverPlugin = {
new DriverPlugin() {
override def init(sc: SparkContext, myContext: PluginContext): JMap[String, String] = {
Map.empty[String, String].asJava
}
}
}

// Return the plugin's executor-side component.
// This implements an executor plugin to set up the configuration for Pyroscope
override def executorPlugin(): ExecutorPlugin = {
new ExecutorPlugin() {
override def init(myContext: PluginContext, extraConf: JMap[String, String]): Unit = {

// Pyroscope server URL, match the URL with your Pyroscope runtime
val pyroscopeServerUrl = myContext.conf.get("spark.pyroscope.server", "http://localhost:4040")

// this will be used for the application name
// note: in local mode spark.app.id is null, so we use "local" to handle that case
val pyroscopeApplicationName = myContext.conf.get("spark.pyroscope.applicationName",
myContext.conf.get("spark.app.id", "local"))

val executorId = myContext.executorID
val localHostname = InetAddress.getLocalHost.getHostName

// this sets the event type to profile, default ITIMER, possible values ITIMER, CPU, WALL, ALLOC, LOCK
val pyroscopeEventType = myContext.conf.get("spark.pyroscope.eventType", "ITIMER")

val eventType = pyroscopeEventType.toUpperCase match {
case "ITIMER" => EventType.ITIMER
case "CPU" => EventType.CPU
case "WALL" => EventType.WALL
case "ALLOC" => EventType.ALLOC
case "LOCK" => EventType.LOCK
case _ => throw new IllegalArgumentException(s"Invalid event type: $pyroscopeEventType")
}

// Start the Pyroscope java agent on this executor; profiles are labeled
// with the executor ID and hostname, for filtering in the Pyroscope UI
PyroscopeAgent.start(
new Config.Builder()
.setApplicationName(pyroscopeApplicationName)
.setProfilingEvent(eventType)
.setFormat(Format.JFR)
.setServerAddress(pyroscopeServerUrl)
.setLabels(Map("executorId" -> executorId.toString, "hostname" -> localHostname).asJava)
.build()
)
}
}
}

}
