Skip to content

Latest commit

 

History

History
217 lines (170 loc) · 20.9 KB

ReadMe.md

File metadata and controls

217 lines (170 loc) · 20.9 KB

Phil's Stopwatch for profiling Spark

A tech blog showing some of the features of this library can be found here

The Scala sources for the riddles in can be found in this companion project

The project can be locally installed as a module with the following command executed in its base directory:

export PYSPARK_PYTHON=python3
pip3 install -e .

The input to all scripts in the analytics folder are output records from a profiler or Spark logs. Two parser classes are defined in parsers.py that get initialized with the path to an individual profile file (ProfileParser) or to an individual Spark log file (SparkLogParser). When creating an object of the AppParser class, the constructor expects a path to an application directory under which many log files are located. Concrete examples are shown below.

The design is compositional: Since records from different profilers could have been written to a single file along with logging messages, a SparkLogParser object contains a ProfileParser object in a member field that might never be needed or get initialized.

Since an application folder typically contains many log files and only one of those, the master log file created by the driver (see below), contains all information about task/stage/job boundaries, an AppParser object contains a list of ProfileParser objects in a member field and most of its methods delegate to them.

The script plot_application gives a good overview, it extracts and plots metrics as well as scheduling info from all logs belonging to a Spark application. Further explanations and examples are included below.

When visualizing metrics, normalization logic is applied by default so different metric types can be conveniently displayed in the same plot. This can be prevented by using a normalize=False parameter when constructing a SparkLogParser or ProfileParser object like so:

log_parser = SparkLogParser('./data/ProfileFatso/JobFatso.log.gz', normalize=False)

The normalization logic is configured in this dictionary (second element in the lists) which also defines all known metrics and can be extended for new profilers.

Uber's JVM profiler

Uber's JVM profiler has several advantages so this project assumes that it will be used as the JVM profiler (the ProfileParser code could be easily modified to use outputs from other profilers though). The profiler JAR can be built with the following commands:

$ git clone https://github.com/uber-common/jvm-profiler.git
$ cd jvm-profiler/
$ mvn clean package
[...]
Replacing /users/phil/jvm-profiler/target/jvm-profiler-1.0.0.jar with /users/phil/jvm-profiler/target/jvm-profiler-1.0.0-shaded.jar

$ ls
-rw-r--r--   1 a  staff  7097056  9 Feb 10:07 jvm-profiler-1.0.0.jar
drwxr-xr-x   3 a  staff       96  9 Feb 10:07 maven-archiver
drwxr-xr-x   3 a  staff       96  9 Feb 10:07 maven-status
-rw-r--r--   1 a  staff    92420  9 Feb 10:07 original-jvm-profiler-1.0.0.jar
jvm-profiler-1.0.0.jar

... or you can use the JAR that I built from here

Profiling a single JVM / Spark in local mode

The following command was used to generate the output uploaded here from the JobStraggler class:

~/spark-2.4.0-bin-hadoop2.7/bin/spark-submit \
--conf spark.driver.extraJavaOptions=-javaagent:/users/phil/jvm-profiler/target/jvm-profiler-1.0.0.jar=sampleInterval=1000,metricInterval=100,reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=./ProfileStraggler \
--class profile.sparkjobs.JobStraggler \
target/scala-2.11/philstopwatch-assembly-0.1.jar > JobStraggler.log 

If the JVM profiler is used in this fashion, three different kinds of records are generated by its FileOutputReporter which are written to three separate JSON files: ProcessInfo.json, CpuAndMemory.json, and Stacktrace.json.

Profiling executor JVMs

When Spark is launched in distributed mode, most of the actual work is done by Spark executors that run on remote cluster nodes. In order to profile their VMs, the following conf setting would need to be added to the launch command above:

--conf spark.executor.extraJavaOptions=[...]

This is redundant in "local" mode though since the driver and executor run in the same JVM -- the profile records are already created by including --conf spark.driver.extraJavaOptions. An actual example of the command used for profiling a legit distributed Spark job is included further below.

Profiling PySpark

The output of executing the PySpark edition of Straggler is included in this repo folder, here is the command that was used:

~/spark-2.4.0-bin-hadoop2.7/bin/spark-submit \
--conf spark.driver.extraJavaOptions=-javaagent:/users/phil/jvm-profiler/target/jvm-profiler-1.0.0.jar=sampleInterval=1000,metricInterval=100,reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=./profile_straggler \
--conf spark.python.profile=true \
./spark_jobs/job_straggler.py  cpumemstack /users/phil/phil_stopwatch/analytics/data/profile_straggler > Straggler_PySpark.log

As in the run of the "Scala" edition above, the second line activates the JVM profiling, only the output directory name has changed (profile_straggler instead of ProfileStraggler). This line is optional here since a PySpark app is launched but it might make sense to include this JVM profiling it as a majority of the work is performed outside of Python. The third line and the two input arguments to the script in the last line (cpumemstack and /users/phil/phil_stopwatch/analytics/data/profile_straggler) are required for the actual PySpark profiler: The config parameter (--conf spark.python.profile=true) tells Spark that a custom profiler will be used, the first script argument cpumemstack specifies the profiler class (a profiler that tracks CPU, memory and the stack) and the second argument specifies the directory to where the profiler records will be saved. In that case, my PySpark profilers will create one or two different types of output records that are stored in at least two JSON files with the pattern sstack.json or scpumem.json.

Three PySpark profiler classes are included in pyspark_profilers.py: StackProfiler can be used to catch stack traces in order to create flame graphs, CpuMemProfiler captures CPU and memory usage, and CpuMemStackProfiler is a combination of these two. In order to use them, the profiler_cls field needs to be set with the name of the profiler as in this example when constructing a SparkContext so there are three possible settings:

  • profiler_cls=StackProfiler
  • profiler_cls=CpuMemProfiler
  • profiler_cls=CpuMemStackProfiler

I added a dictionary profiler_map in helper.py that links these class names with abbreviations that can be used as the actual script arguments when launching a PySpark app:

  • spark-submit [...] your_script.py stack sets profiler_cls=StackProfiler
  • spark-submit [...] your_script.py cpumem sets profiler_cls=CpuMemProfiler
  • spark-submit [...] your_script.py cpumemstack sets profiler_cls=CpuMemStackProfiler

The PySpark profiler code in pyspark_profilers.py needs a few auxiliary methods defined in helper.py. In case of a distributed application, Spark executors running on cluster nodes also need to access these two files, the easiest (but probably not most elegant) way of doing this is via SparkContext's adFile method, this solution is used here.

If such a distributed application is launched, it might not be possible to create output files in this fashion on storage systems like HDFS or S3 so profiler records might need to be written to the standard output instead. This can easily be accomplished by using the appropriate function in the appliction source code: The line ...

session.sparkContext.dump_profiles(dump_path)

... would change into ...

session.sparkContext.show_profiles()

An example occurrence is here

The script plot_slacker.py demonstrates how to create a combined JVM/PySpark metrics plot and flame graph.

Distributed Profiling

The last sentences already described some of the changes required for distributed PySpark profiling. For distributed JVM profiling, the worker nodes need to access the jvm-profiler-1.0.0.jar file so this JAR should be uploaded to the storage layer, in the case of S3 the copy command would be

aws s3 cp ./jvm-profiler-1.0.0.jar s3://your/bucket/jvm-profiler-1.0.0.jar

In a cloud environment, it is likely that the FileOutputReporter used above (set via reporter=com.uber.profiling.reporters.FileOutputReporter) will not work since its source code does not seem to include functionality for interacting with storage layers like HDFS or S3. In these cases, the profiler output records can be written to standard out along with other messages. This happens by default when no explicit reporter= flag is set as in the following command:

spark-submit --deploy-mode cluster \ 
--class your.Class \
--conf spark.jars=s3://your/bucket/jvm-profiler-1.0.0.jar \
--conf spark.driver.extraJavaOptions=-javaagent:jvm-profiler-1.0.0.jar=sampleInterval=2000,metricInterval=1000 \
--conf spark.executor.extraJavaOptions=-javaagent:jvm-profiler-1.0.0.jar=sampleInterval=2000,metricInterval=1000 \
s3://path/to/your/project.jar 

The actual source code of the Spark application is located in the class Class inside package your and all source code is packaged inside the JAR file project.jar. The third line specifies the location of the profiler JAR that all Spark executors and the driver need to be able to access in case they are profiled. They are indeed, the fourth and fifth line activate CpuAndMemory profiling and Stacktrace sampling.

Analyzing and Visualizing

As already mentioned, the code in this repo operates on two types of input, on the output records of a profiler or on Spark log files. Since the design is compositional, the records can be mixed in one or split across several files.

To see which metrics were extracted, the method .get_available_metrics() returns a list of metric strings and is available on a ProfileParser or SparkLogParser object; the SparkLogParser object would simply call the get_available_metrics() function of its enclosed ProfileParser. The same logic applies to make_graph() which constructs a graph from all metric values and to the more selective get_metrics(['metric_name']) which builds a graph only for the metrics included in its the list argument.

To handle mixed JVM and PySpark profiles in the same file or to selectively build graphs for records from a specific profile subset, check this script.

Calling get_executor_logparsers on an AppParser object returns a list of all encapsulated ProfileParsers objects. Most class methods of AppParser delegate to or accumulate values of its ProfileParsers objects, a good demonstration is plot_application.py which contains many examples.

Analyzing and visualizing a local job / 4 riddles

The source code of the four riddles inside the spark_jobs folder was executed in "local mode". Several scripts that produce visualizations and reports of the output of these riddles are included in the analytics folder.

An example of a script that extracts "everything" -- metric graphs, Spark tasks/stage/job boundaries -- and visualizes that is pasted below:

from plotly.offline import plot
from plotly.graph_objs import Figure, Scatter
from typing import List
from parsers import ProfileParser, SparkLogParser
from helper import get_max_y


# Create a ProfileParser object to extract metrics graph:
profile_file = './data/ProfileStraggler/CpuAndMemory.json.gz'  # Output from JVM profiler
profile_parser = ProfileParser(profile_file, normalize=True)  # normalize various metrics
data_points: List[Scatter] = profile_parser.make_graph()  # create graph lines of various metrics

# Create a SparkLogParser object to extract task/stage/job boundaries:
log_file = './data/ProfileStraggler/JobStraggler.log.gz'  # standard Spark log
log_parser = SparkLogParser(log_file)

max: int = get_max_y(data_points)  # maximum y-value used to scale task lines extracted below:s
task_data: List[Scatter] = log_parser.graph_tasks(max)  # create graph lines of all Spark tasks
data_points.extend(task_data)

stage_interval_markers: Scatter = log_parser.extract_stage_markers()  # extract stage boundaries and will show on x-axis
data_points.append(stage_interval_markers)
layout = log_parser.extract_job_markers(max)  # extracts job boundaries and will show as vertical dotted lines

# Plot the actual gaph and save it in 'everything.html'
fig = Figure(data=data_points, layout=layout)
plot(fig, filename='everything.html')

Analyzing and visualizing a distributed application

When launching a distributed application, Spark executors run on multiple nodes in a cluster and produce several log files, one per executor/container. In a cloud environment like AWS, these log files will be organized in the following structure:

s3://aws-logs/elasticmapreduce/clusterid-1/containers/application_1_0001/container_1_001/
                                                                                         stderr.gz
                                                                                         stdout.gz
s3://aws-logs/elasticmapreduce/clusterid-1/containers/application_1_0001/container_1_002/
                                                                                         stderr.gz
                                                                                         stdout.gz
[...]
s3://aws-logs/elasticmapreduce/clusterid-1/containers/application_1_0001/container_1_N/
                                                                                         stderr.gz
                                                                                         stdout.gz

[...]

s3://aws-logs/elasticmapreduce/clusterid-M/containers/application_K_0001/container_K_L/
                                                                                         stderr.gz
                                                                                         stdout.gz

An EMR cluster like clusterid-1 might run several Spark applications consecutively, each one as its own step. Each application launched a number of containers, application_1_0001 for example launched executors container_1_001, container_1_002, ..., container_1_N. Each of these container created a standard error and a standard out file on S3. In order to analyze a particular application like application_1_0001 above, all of its associated log files like .../application_1_0001/container_1_001/stderr.gz and .../application_1_0001/container_1_001/stdout.gz are needed. The easiest way is to collect all files under the application folder using a command like ...

aws s3 cp --recursive s3://aws-logs/elasticmapreduce/clusterid-1/containers/application_1_0001/ ./application_1_0001/

... and then to create an AppParser object object like

from parsers import AppParser
app_path = './application_1_0001/'  # path to the application directory downloaded from s3 above
app_parser = AppParser(app_path)

This object creates a number of SparkLogParser objects internally (one for each container) and automatically identifies the "master" log file created by the Spark driver (likely located under application_1_0001/container_1_001/). Several useful functions can now be called on app_parser, example scripts are located in the analytics folder and more detailled explanations can be found in the readme file.

Finding Error messages and top log chunks

The script (extract_heckler)[https://github.com/g1thubhub/phil_stopwatch/blob/master/analytics/extract_heckler.py] shows how to extract top log chunks and the most recent error messages from an individual log file or from a collection of log files that form an application:

In the case of "top log chunks", the function SparkLogParser.get_top_log_chunks applies a pattern matching and collapsing algorithm across multiple consecutive log lines and creates a ranked list of these top log chunks as output.

The function AppParser.extract_errors() tries to deduplicate potential exceptions and error messages and will print them out in reverse chronological order. An exception or error message might occur several times during a run with slight variations (e.g., different timestamps or code line numbers) but the last occurrence is the most important one for debugging purposes since it might be the direct cause for the failure.

Creating Flame Graphs

The profilers described above might produce stacktraces -- Stacktrace.json files in the case of the JVM profiler and s_N_stack.json files in the case of the PySpark profilers. These outputs can be folded and transformed into flame graphs with the help of my fold_stacks.py script and this external script: flamegraph.pl

For JVM stack traces like ./analytics/data/ProfileFatso/StacktraceFatso.json.gz, use

Phils-MacBook-Pro:analytics a$ python3 fold_stacks.py ./analytics/data/ProfileFatso/StacktraceFatso.json.gz  > Fatso.folded
Phils-MacBook-Pro:analytics a$ perl flamegraph.pl Fatso.folded > FatsoFlame.svg

The final output file FatsoFlame.svg can be opened in a browser. The procedure is identical for PySpark stacktraces like ./analytics/data/profile_fatso/s_8_stack.json:

Phils-MacBook-Pro:analytics a$ python3 fold_stacks.py ./analytics/data/profile_fatso/s_8_stack.json  > FatsoPyspark.folded
Phils-MacBook-Pro:analytics a$ perl flamegraph.pl  FatsoPyspark.folded  > FatsoPySparkFlame.svg

The script plot_slacker.py mentions the steps needed to create a combined JVM/PySpark flame graph.

Made at https://github.com/g1thubhub/phil_stopwatch by [email protected]