Performance Analyzer is an agent and REST API that allows you to query numerous performance metrics for your cluster, including aggregations of those metrics, independent of the Java Virtual Machine (JVM). PerfTop is the default command line interface (CLI) for displaying those metrics. The two main components of performance analyzer are -
- The performance analyzer plugin, a.k.a. the writer
- The performance analyzer application, a.k.a. the reader
The performance analyzer plugin captures important events in OpenSearch and writes them to shared memory. These events are then analyzed separately by the reader. Having separate processes gives us better isolation between OpenSearch and metrics collection/aggregation.
The performance analyzer application is written in Java, which allows us to share code with the writer easily. Java libraries like JDBC and jOOQ make it easy to generate SQL programmatically.
- MetricsProcessor - Periodically (every 5 seconds) processes all the events and statistics generated by the writer and generates metrics out of them. These metrics are then stored in metricsDB.
- MetricsDB - An SQLite database that contains all the metrics that can be queried by the client. We create a new database every 5 seconds.
- Webservice - An HTTP server that serves client API requests. It answers each request by querying metricsDB for the requested metrics.
The writer has two different policies for lifecycle management of data in shared memory -
- A new file is created in shared memory for every event. These files are then deleted by the writer once their age crosses a threshold. This strategy is used for http, request and cluster_manager events.
- Samples of statistics are written to a single file, which is overwritten every 5 seconds. This strategy is used for both operating system statistics (CPU, RSS) and node-level OpenSearch statistics (circuit breaker).
The reader scans the shared memory directory for updates every 2.5 seconds to make sure that no update from the writer is missed. This scheme avoids explicit synchronization between the reader and writer processes.
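A minimal sketch of such a scan, assuming events live as plain files under a shared-memory directory (the path, class and method names here are illustrative, not the actual implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative reader-side poll: every 2.5 seconds, pick up any event files
// written or overwritten since the previous scan. The real reader tracks
// exactly which events it has already processed; this only shows the polling idea.
public class SharedMemoryScanner {
    private final Path dir = Paths.get("/dev/shm/performanceanalyzer"); // hypothetical location
    private long lastScanMillis = 0L;

    public List<Path> updatedSinceLastScan() throws IOException {
        long scanStart = System.currentTimeMillis();
        try (Stream<Path> files = Files.list(dir)) {
            List<Path> updated = files
                    .filter(Files::isRegularFile)
                    .filter(this::modifiedSinceLastScan)
                    .collect(Collectors.toList());
            lastScanMillis = scanStart;
            return updated;
        }
    }

    private boolean modifiedSinceLastScan(Path p) {
        try {
            return Files.getLastModifiedTime(p).toMillis() >= lastScanMillis;
        } catch (IOException e) {
            return false; // the writer may have purged the file between listing and statting it
        }
    }
}
```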
The metrics processor orchestrates work between sub-components. It schedules periodic jobs to process events generated by the writer. Events are then processed into five-second snapshots. Metrics are generated from these snapshots every five seconds and written to a new metricsDB file. Old snapshot and database objects are deleted when their age crosses a certain threshold.
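Both the snapshots and the metricsDB files are keyed to five-second windows, so timestamps are aligned to the start of their window before data is bucketed. A minimal sketch of that alignment (the helper name is ours, not taken from the codebase):

```java
// Hypothetical helper: align an epoch-millisecond timestamp to the start of
// its five-second window, e.g. 1700000003250 -> 1700000000000. Snapshots and
// metricsDB files for the same window share this aligned timestamp.
public final class WindowAlignment {
    private static final long WINDOW_MS = 5_000L;

    public static long windowStartMillis(long epochMillis) {
        return (epochMillis / WINDOW_MS) * WINDOW_MS;
    }
}
```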
There are two main sub-components in the metrics processor -
- MetricsParser - This parses event files from shared memory and populates an in-memory database with the necessary data. The order in which the events are parsed is not important, as the next stage (emitter) starts only after the parsing for the current snapshot is complete.
- MetricsEmitter - This stage queries the in-memory database for all the data and then populates the on-disk metricsDB. The emitter stage is triggered after all the metrics for a given time period are processed by the metrics parser. For example - the CPUUtilization metric for a five-second snapshot can only be computed after all the shard events as well as OS statistics are parsed and updated. It is important that the MetricsEmitter is invoked only after all the necessary events are completely parsed by the MetricsParser, as we may not be able to generate all the metrics if some events are missing.
The metrics processor does not support concurrent execution; it processes all the events generated by the writer one by one in the MetricsParser stage before proceeding to the MetricsEmitter stage. Metrics are visible to the client only after the MetricsEmitter stage completes for all metrics. If metrics processing is not completed before the next run (2.5 seconds), then the next run is scheduled only after the current run completes. Hence, if it takes longer than 5 seconds to process metrics, we will start missing samples.
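A sketch of this run-to-completion scheduling, assuming a single-threaded scheduled executor (the interfaces and method names below are placeholders for the actual parser and emitter stages):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative orchestration: a single-threaded scheduler fires every 2.5 seconds.
// scheduleAtFixedRate never overlaps executions of the same task, so a run that
// takes longer than the period simply delays the next run - which is why slow
// processing drops samples instead of running concurrently.
public class MetricsProcessorLoop {
    interface MetricsParser { void parseAllEvents(); }   // placeholder for the parser stage
    interface MetricsEmitter { void emitAllMetrics(); }  // placeholder for the emitter stage

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start(MetricsParser parser, MetricsEmitter emitter) {
        scheduler.scheduleAtFixedRate(() -> {
            parser.parseAllEvents();   // populate the in-memory snapshot first
            emitter.emitAllMetrics();  // emit only after parsing is complete
        }, 0L, 2500L, TimeUnit.MILLISECONDS);
        // Error handling is omitted in this sketch; see the note on exceptions below.
    }
}
```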
Each sub-component handles all known exceptions, logs them and continues processing. Unknown errors, on the other hand, bubble up and cause the reader metrics processor to restart.
The metrics parser parses files in shared memory and writes the data into in-memory snapshots in an SQLite database. We have multiple parsers and utility functions that make it easy to parse files and import data into a snapshot.
After parsing, we batch all updates and apply them via a single update operation to an in-memory SQLite database. Using batch updates gave us a 10x performance boost when compared to single-record updates.
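A minimal sketch of the batch-update pattern against an in-memory SQLite database, assuming the sqlite-jdbc driver is on the classpath (the table and column names are made up for illustration):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

// Illustrative batch update: accumulate all parsed rows for the current snapshot
// and apply them with a single executeBatch() call instead of one INSERT per record.
public class SnapshotBatchWriter {
    public void write(List<double[]> samples) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:sqlite::memory:")) {
            try (Statement ddl = conn.createStatement()) {
                ddl.execute("CREATE TABLE os_snapshot (thread_id INTEGER, cpu REAL, rss REAL)");
            }
            conn.setAutoCommit(false);
            String sql = "INSERT INTO os_snapshot (thread_id, cpu, rss) VALUES (?, ?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                for (double[] s : samples) {        // s = {threadId, cpu, rss}
                    ps.setLong(1, (long) s[0]);
                    ps.setDouble(2, s[1]);
                    ps.setDouble(3, s[2]);
                    ps.addBatch();
                }
                ps.executeBatch();                  // one round trip for the whole snapshot
            }
            conn.commit();
        }
    }
}
```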
Snapshots are in-memory representations of data in SQLite tables. They make it easy for us to correlate data across different types of events. Snapshots can be deleted after all the corresponding metrics from the snapshot are written to the metrics database.
SQLite is a very popular database that can be embedded into our process with minimal overhead. The memory footprint with SQLite and Java is around 400 MB for heavy workloads. The reader uses the JDBC and jOOQ libraries to programmatically construct SQL statements and submit them to SQLite. These libraries help us move fast and make the code more readable.
In our tests, SQLite was able to handle a peak throughput of 100k writes per second and run complex queries in milliseconds. The CPU utilization was negligible compared to OpenSearch, while the memory footprint of the reader process after a few days of constant metrics processing was around 400 MB.
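A sketch of what this looks like in practice, assuming the sqlite-jdbc driver and jOOQ are on the classpath (the table and column names are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

// Illustrative use of jOOQ over an embedded, in-memory SQLite database:
// the query is constructed programmatically instead of by string concatenation.
public class JooqSqliteExample {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:sqlite::memory:")) {
            DSLContext ctx = DSL.using(conn, SQLDialect.SQLITE);
            ctx.execute("CREATE TABLE os_snapshot (thread_id INTEGER, cpu REAL)");
            ctx.execute("INSERT INTO os_snapshot VALUES (1, 0.42), (2, 0.13)");

            Result<Record> busyThreads = ctx
                    .select()
                    .from(DSL.table("os_snapshot"))
                    .where(DSL.field("cpu", Double.class).gt(0.2))
                    .fetch();
            System.out.println(busyThreads);
        }
    }
}
```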
The Performance Analyzer plugin currently tracks two different types of events.
- HTTP requests - Events emitted when we receive and respond to a customer request.
- Shard requests - Events emitted when processing internal requests that are generated from a single customer request. For example - a single HTTP search request from the customer on an index with multiple shards can result in multiple shardQuery and shardFetch events.
In some edge cases, the writer plugin might emit a start event but not an end event, or vice versa. For example, when OpenSearch crashes and restarts while a request is being processed, the writer will not be able to write an end event. In order to handle such cases, we take the following steps (sketched in code after the list) -
- In unlikely cases where there was an end event but no corresponding start event, we simply ignore the end event.
- In case of missing end events, we ignore start events older than a threshold (10 minutes).
- Certain operations like shardBulk run from start to finish on a single thread. For such operations, we assume that the operation has ended if we see a new operation being executed on the same thread.
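The sketch below illustrates these rules with a hypothetical tracker keyed by thread id; the class, fields and threshold constant are ours, not the actual reader code:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative pairing of start/end events.
public class RequestEventTracker {
    private static final long MAX_START_AGE_MS = 10 * 60 * 1000; // 10 minute threshold

    // start timestamp of the in-flight operation, keyed by thread id
    private final Map<Long, Long> inFlightByThread = new HashMap<>();

    public void onStart(long threadId, long timestampMs) {
        // A new operation on a thread implies that the previous single-threaded
        // operation (e.g. shardBulk) has ended, even if its end event is missing.
        inFlightByThread.put(threadId, timestampMs);
    }

    public void onEnd(long threadId) {
        // An end event with no corresponding start event is simply a no-op here.
        inFlightByThread.remove(threadId);
    }

    public void expireStaleStarts(long nowMs) {
        // Drop start events older than the threshold; their end event never arrived.
        Iterator<Map.Entry<Long, Long>> it = inFlightByThread.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > MAX_START_AGE_MS) {
                it.remove();
            }
        }
    }
}
```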
OS and node statistics samples are collected and updated by the writer every 5 seconds. The reader process checks shared memory for changes every 2.5 seconds to make sure we don't miss any updates.
The reader and writer are separate processes and there is no explicit coordination between them. The timestamps at which the reader generates metrics do not coincide with the timestamps at which the writer emitted the samples. For example, consider the scenario where the writer emits CPU Utilization statistics at 7, 12, 18 and 23 seconds of a minute, while the reader metrics are generated at 5, 10, 15 and 20 seconds of the same minute. Given that the writer is emitting a large number of events and is also subject to unpredictable events like GC pauses, it's very likely that there will be a skew in the time at which the statistics are emitted. In such cases the reader infers the value of a statistic at a given time with a time-weighted average.
All snapshots expose an alignment function which returns the time-weighted average for metrics in the requested time range. There is a corner case where we can have multiple samples for the time period that needs to be aligned. The current implementation only considers the latest sample in that time period during alignment and could instead be enhanced to calculate the weighted average of all samples.
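A minimal sketch of that inference for a single statistic, assuming each writer sample reports the value for the interval ending at its timestamp and that exactly one sample boundary falls inside the reader window (the method is ours, not the actual alignment function):

```java
public final class TimeWeightedAlignment {
    /**
     * Infer the value of a statistic for the reader window [windowStartMs, windowEndMs)
     * from two writer samples. Example from the text: for the window [5s, 10s) with
     * writer samples at 7s and 12s, the result is (valueAt7 * 2s + valueAt12 * 3s) / 5s.
     */
    public static double align(long windowStartMs, long windowEndMs,
                               long firstSampleEndMs, double firstValue, double secondValue) {
        long firstWeight = firstSampleEndMs - windowStartMs;   // covered by the first sample
        long secondWeight = windowEndMs - firstSampleEndMs;    // covered by the second sample
        return (firstValue * firstWeight + secondValue * secondWeight)
                / (windowEndMs - windowStartMs);
    }
}
```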
We can correlate data between different events to generate fine-grained metrics. For example, let's consider OS statistics, which are emitted by the writer every 5 seconds. These provide us with information such as CPU and disk utilization on a per-thread basis. Additionally, OpenSearch events like shardBulk and shardSearch give us information about the tasks that were being carried out on those threads. By correlating these two, we can calculate the resources used at task-level granularity.
On a large OpenSearch cluster with a heavy workload, we often see hundreds of thousands of events coupled with hundreds of threads. Filters/aggregations in SQL make it easy to work with such large amounts of data declaratively. SQL is also more concise compared to iterative for loops and thus easier to read and maintain.
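As an illustration, the correlation could be expressed as a join between two hypothetical snapshot tables, one with per-thread CPU samples and one with per-thread shard operations (table and column names are placeholders, using jOOQ as above):

```java
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.impl.DSL;

// Illustrative correlation: join per-thread OS statistics with per-thread shard
// events to attribute CPU usage to each (operation, shard) pair.
public class CpuByOperation {
    public Result<? extends Record> cpuPerOperation(DSLContext ctx) {
        Field<Long> osThread = DSL.field("os_snapshot.thread_id", Long.class);
        Field<Long> eventThread = DSL.field("shard_events.thread_id", Long.class);
        Field<Double> cpu = DSL.field("os_snapshot.cpu", Double.class);
        Field<String> operation = DSL.field("shard_events.operation", String.class);
        Field<String> shardId = DSL.field("shard_events.shard_id", String.class);

        return ctx.select(operation, shardId, DSL.sum(cpu))
                  .from(DSL.table("os_snapshot"))
                  .join(DSL.table("shard_events")).on(osThread.eq(eventThread))
                  .groupBy(operation, shardId)
                  .fetch();
    }
}
```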
The metrics emitter queries in-memory snapshots of data and then bulk loads them into metricsDB. This helps us process more than 100k updates per second on a single thread. A single emitter can emit multiple metrics. We currently have four emitters - request, http, node and cluster_manager. Every emitter queries an in-memory snapshot and then populates the results into the corresponding metricsDB tables. The ability to add new metrics and dimensions through configuration instead of code would be a nice enhancement.
The final metric data is stored in a metrics database. We create a new database file every 5 seconds. This makes it easy to discard old data by simply archiving the database file, without the additional overhead of dropping tables and running optimize. MetricsDB also supports the ability to query and aggregate metrics across different dimensions. The query format is the same as the one exposed by the webservice.
Each metric is stored in its own table, and the final aggregation is done across metric tables and returned to the client. The query API is not as feature rich as a full-fledged query language like SQL, but it is powerful enough for metric aggregations. It currently does not support features like metricMath and filtering, but the design makes it easy to add this support in the future.
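A sketch of how a client request could map onto a per-metric aggregation query; the table layout here (one table per metric, named after the metric, with one column per dimension) is our simplification of the real schema:

```java
import java.util.List;

// Illustrative mapping from a /metrics request onto a per-metric SQL query.
// For multiple requested metrics, each per-metric result would then be joined
// on the shared dimension columns before being returned to the client.
public class MetricQueryBuilder {
    public String build(String metric, List<String> dimensions, String agg) {
        String dims = String.join(", ", dimensions);
        return "SELECT " + dims + ", " + agg + "(" + metric + ") AS " + metric
                + " FROM " + metric
                + " GROUP BY " + dims;
    }
}

// build("CPU_Utilization", List.of("ShardID", "Operation"), "sum") produces:
// SELECT ShardID, Operation, sum(CPU_Utilization) AS CPU_Utilization
//   FROM CPU_Utilization GROUP BY ShardID, Operation
```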
We support over 70 metrics with multiple dimensions. Storing all these metrics gets expensive very quickly. Instead of storing all metrics, we leave it to clients to frequently poll for metrics and aggregate/store them in a format of their choice.
We chose to create a metricsDB layer instead of directly querying the in-memory database for the following reasons -
- The metrics aggregation and processing logic is completely separate from the format of the inmemory snapshots. We don't expect any major changes to this code unless we add new features to the API.
- MetricsDB files are written to disk and archived for future reference. Hence, we can support a playback feature to understand cluster behavior across time by fetching archived database files.
- Snapshots have additional information like threadID and threadName which are not exposed as metrics, but only used for correlation. Ignoring these unnecessary fields makes metricsDB more compact.
The performance analyzer application exposes an HTTP webservice on port 9600. The webservice layer parses the HTTP request, queries metricsDB and sends the JSON response back to the client. Additionally, if the client requests metrics from all nodes in the cluster, the webservice makes requests to all nodes, concatenates the results and sends them back to the client.
There are two endpoints supported by the webservice -
- /metrics - The primary URL for fetching metrics from performance analyzer (an example request follows this list). It supports three parameters -
- metrics - Array of metric names.
- dimensions - Array of dimensions on which to aggregate metrics.
- agg - Array of aggregations ( sum, min, max, avg ) to use.
- /metrics/units - Returns the units associated with every metric supported by Performance Analyzer.
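As an example of using the API, a sketch of a client call follows; the metric and dimension names are examples, and the query-parameter spellings assume the names listed above (the deployed API may abbreviate them, e.g. dim instead of dimensions):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client call against the /metrics endpoint on port 9600.
public class MetricsClient {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9600/metrics"
                        + "?metrics=CPU_Utilization&dimensions=ShardID&agg=avg"))
                .GET()
                .build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // JSON aggregation per node
    }
}
```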
The reader holds very few long-lived objects in the JVM heap and can run with a very small heap. We currently allocate 64MB of heap by default, but we can go lower on smaller instance types. We change some default JVM parameters, like disabling parallel GC, to further reduce the memory footprint.
With the default settings the reader can process up to 100k events per second on a single thread. In our testing this was enough to handle heavy indexing and search traffic on an i3.16xlarge instance type in EC2. The single thread also guarantees that we are not stealing more than one core from the OS and OpenSearch for metric processing. The memory footprint stayed consistently around 400 MB.
The reader is composed of multiple components and the following sections will cover each of these in more detail.
We use log4j for logging and the log4j configuration can be found in the config directory. By default only error logs are emitted, but the log level should be changed to “info” for production deployments.
Both the reader and writer classes are currently bundled into a single jar via the gradle build script.