Skip to content

Commit

Permalink
Support for custom MetricsFetcher in Perf tooling. (#28671)
Browse files Browse the repository at this point in the history
* Support for custom BigQueryMetricsFetcher

* Read GITHUB repo and owner name from environment variables

* Add test_name, test_id

* Move client to the fetch method

* Update skip condition

* Run on self hosted runner

* Update readme

* Update README

* Pass test_name to the metrics_fetcher

* Fix linting issues

* Fix lint

* Fix formatting and lint issues

* fix lint
  • Loading branch information
AnandInguva authored Oct 5, 2023
1 parent af491eb commit 5446776
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_perf_alert_tool.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ on:
jobs:
python_run_change_point_analysis:
name: Run Change Point Analysis.
runs-on: ubuntu-latest
runs-on: [self-hosted, ubuntu-20.04, main]
permissions:
issues: write
steps:
Expand Down
54 changes: 42 additions & 12 deletions sdks/python/apache_beam/testing/analyzers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,13 @@ update already created GitHub issue or ignore performance alert by not creating

## Config file structure

The config file defines the structure to run change point analysis on a given test. To add a test to the config file,
The yaml defines the structure to run change point analysis on a given test. To add a test config to the yaml file,
please follow the below structure.

**NOTE**: The Change point analysis only supports reading the metric data from Big Query for now.
**NOTE**: The Change point analysis only supports reading the metric data from `BigQuery` only.

```
# the test_1 must be a unique id.
test_1:
test_description: Pytorch image classification on 50k images of size 224 x 224 with resnet 152
test_target: apache_beam.testing.benchmarks.inference.pytorch_image_classification_benchmarks
test_1: # a unique id for each test config.
metrics_dataset: beam_run_inference
metrics_table: torch_inference_imagenet_results_resnet152
project: apache-beam-testing
Expand All @@ -55,11 +52,17 @@ test_1:
num_runs_in_change_point_window: 30 # optional parameter
```

**NOTE**: `test_target` is optional. It is used for identifying the test that was causing the regression.
#### Optional Parameters:

**Note**: By default, the tool fetches metrics from BigQuery tables. `metrics_dataset`, `metrics_table`, `project` and `metric_name` should match with the values defined for performance/load tests.
The above example uses this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30)
to fill up the values required to fetch the data from source.
These are the optional parameters that can be added to the test config in addition to the parameters mentioned above.

- `test_target`: Identifies the test responsible for the regression.

- `test_description`: Provides a brief overview of the test's function.

- `test_name`: Denotes the name of the test as stored in the BigQuery table.

**Note**: The tool, by default, pulls metrics from BigQuery tables. Ensure that the values for `metrics_dataset`, `metrics_table`, `project`, and `metric_name` align with those defined for performance/load tests. The provided example utilizes this [test configuration](https://github.com/apache/beam/blob/0a91d139dea4276dc46176c4cdcdfce210fc50c4/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L30) to populate the necessary values for data retrieval.

### Different ways to avoid false positive change points

Expand All @@ -76,8 +79,35 @@ setting `num_runs_in_change_point_window=7` will achieve it.

## Register a test for performance alerts

If a new test needs to be registered for the performance alerting tool, please add the required test parameters to the
config file.
If a new test needs to be registered for the performance alerting tool,

- You can either add it to the config file that is already present.
- You can define your own yaml file and call the [perf_analysis.run()](https://github.com/apache/beam/blob/a46bc12a256dcaa3ae2cc9e5d6fdcaa82b59738b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py#L152) method.


## Integrating the Perf Alert Tool with a Custom BigQuery Schema

By default, the Perf Alert Tool retrieves metrics from the `apache-beam-testing` BigQuery projects. All performance and load tests within Beam utilize a standard [schema](https://github.com/apache/beam/blob/a7e12db9b5977c4a7b13554605c0300389a3d6da/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py#L70) for metrics publication. The tool inherently recognizes and operates with this schema when extracting metrics from BigQuery tables.

To fetch the data from a BigQuery dataset that is not a default setting of the Apache Beam's setting, One can inherit the `MetricsFetcher` class and implement the abstract method `fetch_metric_data`. This method should return a tuple of desired metric values and timestamps of the metric values of when it was published.

```
from apache_beam.testing.analyzers import perf_analysis
config_file_path = <path_to_config_file>
my_metric_fetcher = MyMetricsFetcher() # inherited from MetricsFetcher
perf_analysis.run(config_file_path, my_metrics_fetcher)
```

``Note``: The metrics and timestamps should be sorted based on the timestamps values in ascending order.

### Configuring GitHub Parameters

Out of the box, the performance alert tool targets the `apache/beam` repository when raising issues. If you wish to utilize this tool for another repository, you'll need to pre-set a couple of environment variables:

- `REPO_OWNER`: Represents the owner of the repository. (e.g., `apache`)
- `REPO_NAME`: Specifies the repository name itself. (e.g., `beam`)

Before initiating the tool, also ensure that the `GITHUB_TOKEN` is set to an authenticated GitHub token. This permits the tool to generate GitHub issues whenever performance alerts arise.

## Triage performance alert issues

Expand Down
34 changes: 19 additions & 15 deletions sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
'A Github Personal Access token is required '
'to create Github Issues.')

_BEAM_GITHUB_REPO_OWNER = 'apache'
_BEAM_GITHUB_REPO_NAME = 'beam'
_GITHUB_REPO_OWNER = os.environ.get('REPO_OWNER', 'apache')
_GITHUB_REPO_NAME = os.environ.get('REPO_NAME', 'beam')
# Adding GitHub Rest API version to the header to maintain version stability.
# For more information, please look at
# https://github.blog/2022-11-28-to-infinity-and-beyond-enabling-the-future-of-githubs-rest-api-with-api-versioning/ # pylint: disable=line-too-long
Expand Down Expand Up @@ -77,10 +77,10 @@ def create_issue(
Tuple containing GitHub issue number and issue URL.
"""
url = "https://api.github.com/repos/{}/{}/issues".format(
_BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME)
_GITHUB_REPO_OWNER, _GITHUB_REPO_NAME)
data = {
'owner': _BEAM_GITHUB_REPO_OWNER,
'repo': _BEAM_GITHUB_REPO_NAME,
'owner': _GITHUB_REPO_OWNER,
'repo': _GITHUB_REPO_NAME,
'title': title,
'body': description,
'labels': [_AWAITING_TRIAGE_LABEL, _PERF_ALERT_LABEL]
Expand Down Expand Up @@ -108,20 +108,20 @@ def comment_on_issue(issue_number: int,
issue, and the comment URL.
"""
url = 'https://api.github.com/repos/{}/{}/issues/{}'.format(
_BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
_GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
open_issue_response = requests.get(
url,
json.dumps({
'owner': _BEAM_GITHUB_REPO_OWNER,
'repo': _BEAM_GITHUB_REPO_NAME,
'owner': _GITHUB_REPO_OWNER,
'repo': _GITHUB_REPO_NAME,
'issue_number': issue_number
},
default=str),
headers=_HEADERS).json()
if open_issue_response['state'] == 'open':
data = {
'owner': _BEAM_GITHUB_REPO_OWNER,
'repo': _BEAM_GITHUB_REPO_NAME,
'owner': _GITHUB_REPO_OWNER,
'repo': _GITHUB_REPO_NAME,
'body': comment_description,
issue_number: issue_number,
}
Expand All @@ -134,13 +134,14 @@ def comment_on_issue(issue_number: int,

def add_awaiting_triage_label(issue_number: int):
url = 'https://api.github.com/repos/{}/{}/issues/{}/labels'.format(
_BEAM_GITHUB_REPO_OWNER, _BEAM_GITHUB_REPO_NAME, issue_number)
_GITHUB_REPO_OWNER, _GITHUB_REPO_NAME, issue_number)
requests.post(
url, json.dumps({'labels': [_AWAITING_TRIAGE_LABEL]}), headers=_HEADERS)


def get_issue_description(
test_name: str,
test_id: str,
test_name: Optional[str],
metric_name: str,
timestamps: List[pd.Timestamp],
metric_values: List,
Expand All @@ -167,10 +168,13 @@ def get_issue_description(

description = []

description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_name, metric_name))
description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))

description.append(("`Test description:` " +
f'{test_description}') if test_description else '')
if test_name:
description.append(("`test_name:` " + f'{test_name}'))

if test_description:
description.append(("`Test description:` " + f'{test_description}'))

description.append('```')

Expand Down
55 changes: 33 additions & 22 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import argparse
import logging
import os
import uuid
from datetime import datetime
from datetime import timezone
from typing import Any
Expand All @@ -33,20 +32,21 @@
import pandas as pd

from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
from apache_beam.testing.analyzers.perf_analysis_utils import fetch_metric_data
from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
from apache_beam.testing.analyzers.perf_analysis_utils import validate_config
from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsFetcher


def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
def run_change_point_analysis(
params, test_id, big_query_metrics_fetcher: MetricsFetcher):
"""
Args:
params: Dict containing parameters to run change point analysis.
Expand All @@ -56,14 +56,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
Returns:
bool indicating if a change point is observed and alerted on GitHub.
"""
logging.info("Running change point analysis for test %s" % test_name)
logging.info("Running change point analysis for test ID %s" % test_id)
if not validate_config(params.keys()):
raise ValueError(
f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
f"are specified for the {test_name}")
f"are specified for the {test_id}")

metric_name = params['metric_name']

# test_name will be used to query a single test from
# multiple tests in a single BQ table. Right now, the default
# assumption is that all the test have an individual BQ table
# but this might not be case for other tests(such as IO tests where
# a single BQ tables stores all the data)
test_name = params.get('test_name', None)

min_runs_between_change_points = (
constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
if 'min_runs_between_change_points' in params:
Expand All @@ -74,15 +81,18 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if 'num_runs_in_change_point_window' in params:
num_runs_in_change_point_window = params['num_runs_in_change_point_window']

metric_values, timestamps = fetch_metric_data(
params=params,
big_query_metrics_fetcher=big_query_metrics_fetcher
metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
project=params['project'],
metrics_dataset=params['metrics_dataset'],
metrics_table=params['metrics_table'],
metric_name=params['metric_name'],
test_name=test_name
)

change_point_index = find_latest_change_point_index(
metric_values=metric_values)
if not change_point_index:
logging.info("Change point is not detected for the test %s" % test_name)
logging.info("Change point is not detected for the test ID %s" % test_id)
return False
# since timestamps are ordered in ascending order and
# num_runs_in_change_point_window refers to the latest runs,
Expand All @@ -92,11 +102,11 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if not is_change_point_in_valid_window(num_runs_in_change_point_window,
latest_change_point_run):
logging.info(
'Performance regression/improvement found for the test: %s. '
'Performance regression/improvement found for the test ID: %s. '
'on metric %s. Since the change point run %s '
'lies outside the num_runs_in_change_point_window distance: %s, '
'alert is not raised.' % (
test_name,
test_id,
metric_name,
latest_change_point_run + 1,
num_runs_in_change_point_window))
Expand All @@ -106,8 +116,7 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
last_reported_issue_number = None
issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}'
existing_issue_data = get_existing_issues_data(
table_name=issue_metadata_table_name,
big_query_metrics_fetcher=big_query_metrics_fetcher)
table_name=issue_metadata_table_name)

if existing_issue_data is not None:
existing_issue_timestamps = existing_issue_data[
Expand All @@ -124,20 +133,21 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
min_runs_between_change_points=min_runs_between_change_points)
if is_alert:
issue_number, issue_url = create_performance_alert(
metric_name, test_name, timestamps,
metric_name, test_id, timestamps,
metric_values, change_point_index,
params.get('labels', None),
last_reported_issue_number,
test_description = params.get('test_description', None),
test_name = test_name
)

issue_metadata = GitHubIssueMetaData(
issue_timestamp=pd.Timestamp(
datetime.now().replace(tzinfo=timezone.utc)),
# BQ doesn't allow '.' in table name
test_name=test_name.replace('.', '_'),
test_id=test_id.replace('.', '_'),
test_name=test_name,
metric_name=metric_name,
test_id=uuid.uuid4().hex,
change_point=metric_values[change_point_index],
issue_number=issue_number,
issue_url=issue_url,
Expand All @@ -149,7 +159,10 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
return is_alert


def run(config_file_path: Optional[str] = None) -> None:
def run(
big_query_metrics_fetcher: MetricsFetcher = BigQueryMetricsFetcher(),
config_file_path: Optional[str] = None,
) -> None:
"""
run is the entry point to run change point analysis on test metric
data, which is read from config file, and if there is a performance
Expand All @@ -169,12 +182,10 @@ def run(config_file_path: Optional[str] = None) -> None:

tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)

big_query_metrics_fetcher = BigQueryMetricsFetcher()

for test_name, params in tests_config.items():
for test_id, params in tests_config.items():
run_change_point_analysis(
params=params,
test_name=test_name,
test_id=test_id,
big_query_metrics_fetcher=big_query_metrics_fetcher)


Expand Down
Loading

0 comments on commit 5446776

Please sign in to comment.