Add dataclasses to perf alert tool and refactor code. (#28889)
* Refactor code with Dataclasses
Refactor

* Add pipe to add extra line for test_description

* Fix lint
AnandInguva authored Oct 9, 2023
1 parent 76fbb8e commit 6046297
Showing 6 changed files with 284 additions and 167 deletions.
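The refactor threads three container dataclasses (TestConfigContainer, MetricContainer, ChangePointConfig) through the analyzers in place of loose keyword arguments. Their definitions live in perf_analysis_utils and are not part of the hunks shown below; a minimal sketch consistent with how this commit uses them might look as follows. Field types, defaults, and the sort_by_timestamp body are assumptions, not the committed code.

from dataclasses import dataclass
from typing import List, Optional

import pandas as pd

from apache_beam.testing.analyzers import constants


@dataclass
class TestConfigContainer:
  # Metadata identifying one performance test and where its metrics live.
  project: str
  metrics_dataset: str
  metrics_table: str
  metric_name: str
  test_id: str
  test_description: Optional[str] = None
  test_name: Optional[str] = None
  labels: Optional[List[str]] = None


@dataclass
class MetricContainer:
  # Parallel lists of metric values and their publish timestamps.
  values: List[float]
  timestamps: List[pd.Timestamp]

  def sort_by_timestamp(self):
    # Keep values aligned with timestamps while sorting in ascending order.
    pairs = sorted(zip(self.timestamps, self.values), key=lambda p: p[0])
    self.timestamps = [t for t, _ in pairs]
    self.values = [v for _, v in pairs]


@dataclass
class ChangePointConfig:
  # Knobs for change point analysis; defaults come from the constants module.
  min_runs_between_change_points: int = (
      constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
  num_runs_in_change_point_window: int = (
      constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)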
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/testing/analyzers/__init__.py
@@ -14,3 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Performance alert tooling for Apache Beam. No backwards compatibility
guarantees.
"""
43 changes: 21 additions & 22 deletions sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
@@ -21,10 +21,11 @@
from typing import Optional
from typing import Tuple

import pandas as pd
import requests

from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers.perf_analysis_utils import MetricContainer
from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer

try:
_GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
@@ -140,25 +141,18 @@ def add_awaiting_triage_label(issue_number: int):


def get_issue_description(
test_id: str,
test_name: Optional[str],
metric_name: str,
timestamps: List[pd.Timestamp],
metric_values: List,
test_config_container: TestConfigContainer,
metric_container: MetricContainer,
change_point_index: int,
max_results_to_display: int = 5,
test_description: Optional[str] = None,
) -> str:
"""
Args:
metric_name: Metric name used for the Change Point Analysis.
timestamps: Timestamps of the metrics when they were published to the
Database. Timestamps are expected in ascending order.
metric_values: metric values for the previous runs.
change_point_index: Index for the change point. The element in the
index of the metric_values would be the change point.
max_results_to_display: Max number of results to display from the change
point index, in both directions of the change point index.
test_config_container: TestConfigContainer containing test metadata.
metric_container: MetricContainer containing metric data.
change_point_index: Index of the change point in the metric data.
max_results_to_display: Max number of results to display from the change
point index, in both directions of the change point index.
Returns:
str: Description used to fill the GitHub issues description.
@@ -168,25 +162,30 @@ def get_issue_description(

description = []

description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name))
description.append(
_ISSUE_DESCRIPTION_TEMPLATE.format(
test_config_container.test_id, test_config_container.metric_name))

if test_name:
description.append(("`test_name:` " + f'{test_name}'))
if test_config_container.test_name:
description.append(("`test_name:` " + f'{test_config_container.test_name}'))

if test_description:
description.append(("`Test description:` " + f'{test_description}'))
if test_config_container.test_description:
description.append(
("`Test description:` " + f'{test_config_container.test_description}'))

description.append('```')

runs_to_display = []
max_timestamp_index = min(
change_point_index + max_results_to_display, len(metric_values) - 1)
change_point_index + max_results_to_display,
len(metric_container.values) - 1)
min_timestamp_index = max(0, change_point_index - max_results_to_display)

# run in reverse to display the most recent runs first.
for i in reversed(range(min_timestamp_index, max_timestamp_index + 1)):
row_template = _METRIC_INFO_TEMPLATE.format(
timestamps[i].ctime(), format(metric_values[i], '.2f'))
metric_container.timestamps[i].ctime(),
format(metric_container.values[i], '.2f'))
if i == change_point_index:
row_template += constants._ANOMALY_MARKER
runs_to_display.append(row_template)
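To make the windowing in get_issue_description concrete, here is a small self-contained sketch of the same index arithmetic on hypothetical numbers; it does not call the Beam helpers, and all values are made up.

# 30 runs, latest change point detected at index 26, display up to 5 runs on
# each side of the change point, most recent run first.
num_runs = 30
change_point_index = 26
max_results_to_display = 5

max_timestamp_index = min(
    change_point_index + max_results_to_display, num_runs - 1)  # 29
min_timestamp_index = max(0, change_point_index - max_results_to_display)  # 21

rows_to_display = list(
    reversed(range(min_timestamp_index, max_timestamp_index + 1)))
# rows_to_display == [29, 28, 27, 26, 25, 24, 23, 22, 21]; the row for
# index 26 would additionally carry the anomaly marker.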
144 changes: 94 additions & 50 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis.py
@@ -34,66 +34,103 @@

from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import ChangePointConfig
from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData
from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer
from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert
from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import is_sibling_change_point
from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
from apache_beam.testing.analyzers.perf_analysis_utils import validate_config


def get_test_config_container(
params: Dict[str, Any],
test_id: str,
) -> TestConfigContainer:
"""
Args:
params: Dict containing parameters to run change point analysis.
Returns:
TestConfigContainer object containing test config parameters.
"""
return TestConfigContainer(
project=params['project'],
metrics_dataset=params['metrics_dataset'],
metrics_table=params['metrics_table'],
metric_name=params['metric_name'],
test_id=test_id,
test_description=params['test_description'],
test_name=params.get('test_name', None),
labels=params.get('labels', None),
)


def get_change_point_config(params: Dict[str, Any], ) -> ChangePointConfig:
"""
Args:
params: Dict containing parameters to run change point analysis.
Returns:
ChangePointConfig object containing change point analysis parameters.
"""
return ChangePointConfig(
min_runs_between_change_points=params.get(
'min_runs_between_change_points',
constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS),
num_runs_in_change_point_window=params.get(
'num_runs_in_change_point_window',
constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW))
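# Hedged usage sketch (illustrative only, not part of this commit): the two
# helpers above map one entry of the test configuration file onto the new
# dataclasses. Every literal value below is hypothetical.
example_params = {
    'project': 'example-gcp-project',
    'metrics_dataset': 'example_dataset',
    'metrics_table': 'example_metrics_table',
    'metric_name': 'mean_runtime_sec',
    'test_description': 'Example performance test.',
    # Optional keys such as 'test_name', 'labels',
    # 'min_runs_between_change_points' and 'num_runs_in_change_point_window'
    # fall back to defaults when omitted.
}
example_test_config = get_test_config_container(
    example_params, test_id='example.test.id')
example_change_point_config = get_change_point_config(example_params)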


def run_change_point_analysis(
params, test_id, big_query_metrics_fetcher: MetricsFetcher):
test_config_container: TestConfigContainer,
big_query_metrics_fetcher: MetricsFetcher,
change_point_config: ChangePointConfig = ChangePointConfig(),
):
"""
Args:
params: Dict containing parameters to run change point analysis.
test_id: Test id for the current test.
test_config_container: TestConfigContainer containing test metadata for
fetching data and running change point analysis.
big_query_metrics_fetcher: BigQuery metrics fetcher used to fetch data for
change point analysis.
change_point_config: ChangePointConfig containing parameters to run
change point analysis.
Returns:
bool indicating if a change point is observed and alerted on GitHub.
"""
logging.info("Running change point analysis for test ID %s" % test_id)
if not validate_config(params.keys()):
raise ValueError(
f"Please make sure all these keys {constants._PERF_TEST_KEYS} "
f"are specified for the {test_id}")

metric_name = params['metric_name']
logging.info(
"Running change point analysis for test ID %s" %
test_config_container.test_id)

# test_name will be used to query a single test from
# multiple tests in a single BQ table. Right now, the default
# assumption is that each test has an individual BQ table,
# but this might not be the case for other tests (such as IO tests,
# where a single BQ table stores all the data).
test_name = params.get('test_name', None)
test_name = test_config_container.test_name

min_runs_between_change_points = (
constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS)
if 'min_runs_between_change_points' in params:
min_runs_between_change_points = params['min_runs_between_change_points']
change_point_config.min_runs_between_change_points)

num_runs_in_change_point_window = (
constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)
if 'num_runs_in_change_point_window' in params:
num_runs_in_change_point_window = params['num_runs_in_change_point_window']

metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data(
project=params['project'],
metrics_dataset=params['metrics_dataset'],
metrics_table=params['metrics_table'],
metric_name=params['metric_name'],
test_name=test_name
)
change_point_config.num_runs_in_change_point_window)

metric_container = big_query_metrics_fetcher.fetch_metric_data(
test_config=test_config_container)
metric_container.sort_by_timestamp()

metric_values = metric_container.values
timestamps = metric_container.timestamps

change_point_index = find_latest_change_point_index(
metric_values=metric_values)
if not change_point_index:
logging.info("Change point is not detected for the test ID %s" % test_id)
logging.info(
"Change point is not detected for the test ID %s" %
test_config_container.test_id)
return False
# since timestamps are ordered in ascending order and
# num_runs_in_change_point_window refers to the latest runs,
@@ -107,15 +144,17 @@ def run_change_point_analysis(
'on metric %s. Since the change point run %s '
'lies outside the num_runs_in_change_point_window distance: %s, '
'alert is not raised.' % (
test_id,
metric_name,
test_config_container.test_id,
test_config_container.metric_name,
latest_change_point_run + 1,
num_runs_in_change_point_window))
return False

is_alert = True
is_valid_change_point = True
last_reported_issue_number = None
issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}'
issue_metadata_table_name = (
f'{test_config_container.metrics_table}_{test_config_container.metric_name}' # pylint: disable=line-too-long
)
existing_issue_data = get_existing_issues_data(
table_name=issue_metadata_table_name)

@@ -127,37 +166,39 @@ def run_change_point_analysis(
# convert numpy.int64 to int
last_reported_issue_number = last_reported_issue_number.item()

is_alert = is_perf_alert(
is_valid_change_point = is_sibling_change_point(
previous_change_point_timestamps=existing_issue_timestamps,
change_point_index=change_point_index,
timestamps=timestamps,
min_runs_between_change_points=min_runs_between_change_points)
if is_alert:
min_runs_between_change_points=min_runs_between_change_points,
test_id=test_config_container.test_id)
if is_valid_change_point:
issue_number, issue_url = create_performance_alert(
metric_name, test_id, timestamps,
metric_values, change_point_index,
params.get('labels', None),
last_reported_issue_number,
test_description = params.get('test_description', None),
test_name = test_name
test_config_container=test_config_container,
metric_container=metric_container,
change_point_index=change_point_index,
existing_issue_number=last_reported_issue_number,
)

issue_metadata = GitHubIssueMetaData(
issue_timestamp=pd.Timestamp(
datetime.now().replace(tzinfo=timezone.utc)),
# BQ doesn't allow '.' in table name
test_id=test_id.replace('.', '_'),
test_id=test_config_container.test_id.replace('.', '_'),
test_name=test_name or uuid.uuid4().hex,
metric_name=metric_name,
metric_name=test_config_container.metric_name,
change_point=metric_values[change_point_index],
issue_number=issue_number,
issue_url=issue_url,
change_point_timestamp=timestamps[change_point_index])
change_point_timestamp=timestamps[change_point_index],
)

publish_issue_metadata_to_big_query(
issue_metadata=issue_metadata, table_name=issue_metadata_table_name)

return is_alert
issue_metadata=issue_metadata,
table_name=issue_metadata_table_name,
project=test_config_container.project,
)
return is_valid_change_point


def run(
@@ -185,10 +226,13 @@ def run(
tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path)

for test_id, params in tests_config.items():
test_config_container = get_test_config_container(params, test_id=test_id)
change_point_config = get_change_point_config(params)
run_change_point_analysis(
params=params,
test_id=test_id,
big_query_metrics_fetcher=big_query_metrics_fetcher)
test_config_container=test_config_container,
big_query_metrics_fetcher=big_query_metrics_fetcher,
change_point_config=change_point_config,
)


if __name__ == '__main__':
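For orientation, a minimal end-to-end sketch of driving the refactored analysis outside of run(), using only names visible in this diff; the helper function, its arguments, and the logging line are assumptions, and the MetricsFetcher instance is assumed to be constructed elsewhere.

import logging

from apache_beam.testing.analyzers.perf_analysis import get_change_point_config
from apache_beam.testing.analyzers.perf_analysis import get_test_config_container
from apache_beam.testing.analyzers.perf_analysis import run_change_point_analysis
from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config


def analyze_all_tests(config_file_path: str, fetcher: MetricsFetcher) -> None:
  # Each top-level key of the config file is a test_id; its value is the
  # params dict consumed by the helper functions above.
  tests_config = read_test_config(config_file_path)
  for test_id, params in tests_config.items():
    test_config_container = get_test_config_container(params, test_id=test_id)
    change_point_config = get_change_point_config(params)
    alerted = run_change_point_analysis(
        test_config_container=test_config_container,
        big_query_metrics_fetcher=fetcher,
        change_point_config=change_point_config)
    logging.info('%s: alert raised = %s', test_id, alerted)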
(Diffs for the remaining changed files are not loaded in this view.)
