diff --git a/sdks/python/apache_beam/testing/analyzers/__init__.py b/sdks/python/apache_beam/testing/analyzers/__init__.py
index cce3acad34a4..136d9f5f5d8a 100644
--- a/sdks/python/apache_beam/testing/analyzers/__init__.py
+++ b/sdks/python/apache_beam/testing/analyzers/__init__.py
@@ -14,3 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+"""
+Performance alert tooling for Apache Beam. No backwards compatibility
+guarantees.
+"""
diff --git a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
index 82758be8f180..bbcd2a8b11b5 100644
--- a/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
+++ b/sdks/python/apache_beam/testing/analyzers/github_issues_utils.py
@@ -21,10 +21,11 @@
 from typing import Optional
 from typing import Tuple
 
-import pandas as pd
 import requests
 
 from apache_beam.testing.analyzers import constants
+from apache_beam.testing.analyzers.perf_analysis_utils import MetricContainer
+from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer
 
 try:
   _GITHUB_TOKEN: Optional[str] = os.environ['GITHUB_TOKEN']
@@ -140,25 +141,18 @@ def add_awaiting_triage_label(issue_number: int):
 
 
 def get_issue_description(
-    test_id: str,
-    test_name: Optional[str],
-    metric_name: str,
-    timestamps: List[pd.Timestamp],
-    metric_values: List,
+    test_config_container: TestConfigContainer,
+    metric_container: MetricContainer,
     change_point_index: int,
     max_results_to_display: int = 5,
-    test_description: Optional[str] = None,
 ) -> str:
   """
   Args:
-    metric_name: Metric name used for the Change Point Analysis.
-    timestamps: Timestamps of the metrics when they were published to the
-      Database. Timestamps are expected in ascending order.
-    metric_values: metric values for the previous runs.
-    change_point_index: Index for the change point. The element in the
-      index of the metric_values would be the change point.
-    max_results_to_display: Max number of results to display from the change
-      point index, in both directions of the change point index.
+    test_config_container: TestConfigContainer containing test metadata.
+    metric_container: MetricContainer containing metric data.
+    change_point_index: Index of the change point in the metric data.
+    max_results_to_display: Max number of results to display from the change
+      point index, in both directions of the change point index.
   Returns:
     str: Description used to fill the GitHub issues description.
 
@@ -168,25 +162,30 @@ def get_issue_description( description = [] - description.append(_ISSUE_DESCRIPTION_TEMPLATE.format(test_id, metric_name)) + description.append( + _ISSUE_DESCRIPTION_TEMPLATE.format( + test_config_container.test_id, test_config_container.metric_name)) - if test_name: - description.append(("`test_name:` " + f'{test_name}')) + if test_config_container.test_name: + description.append(("`test_name:` " + f'{test_config_container.test_name}')) - if test_description: - description.append(("`Test description:` " + f'{test_description}')) + if test_config_container.test_description: + description.append( + ("`Test description:` " + f'{test_config_container.test_description}')) description.append('```') runs_to_display = [] max_timestamp_index = min( - change_point_index + max_results_to_display, len(metric_values) - 1) + change_point_index + max_results_to_display, + len(metric_container.values) - 1) min_timestamp_index = max(0, change_point_index - max_results_to_display) # run in reverse to display the most recent runs first. for i in reversed(range(min_timestamp_index, max_timestamp_index + 1)): row_template = _METRIC_INFO_TEMPLATE.format( - timestamps[i].ctime(), format(metric_values[i], '.2f')) + metric_container.timestamps[i].ctime(), + format(metric_container.values[i], '.2f')) if i == change_point_index: row_template += constants._ANOMALY_MARKER runs_to_display.append(row_template) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py index 109e5bfcc286..27f8398a0fb3 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis.py @@ -34,66 +34,103 @@ from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher +from apache_beam.testing.analyzers.perf_analysis_utils import ChangePointConfig from apache_beam.testing.analyzers.perf_analysis_utils import GitHubIssueMetaData from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher +from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer from apache_beam.testing.analyzers.perf_analysis_utils import create_performance_alert from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window -from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert +from apache_beam.testing.analyzers.perf_analysis_utils import is_sibling_change_point from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config -from apache_beam.testing.analyzers.perf_analysis_utils import validate_config + + +def get_test_config_container( + params: Dict[str, Any], + test_id: str, +) -> TestConfigContainer: + """ + Args: + params: Dict containing parameters to run change point analysis. + Returns: + TestConfigContainer object containing test config parameters. 
+ """ + return TestConfigContainer( + project=params['project'], + metrics_dataset=params['metrics_dataset'], + metrics_table=params['metrics_table'], + metric_name=params['metric_name'], + test_id=test_id, + test_description=params['test_description'], + test_name=params.get('test_name', None), + labels=params.get('labels', None), + ) + + +def get_change_point_config(params: Dict[str, Any], ) -> ChangePointConfig: + """ + Args: + params: Dict containing parameters to run change point analysis. + Returns: + ChangePointConfig object containing change point analysis parameters. + """ + return ChangePointConfig( + min_runs_between_change_points=params.get( + 'min_runs_between_change_points', + constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS), + num_runs_in_change_point_window=params.get( + 'num_runs_in_change_point_window', + constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW)) def run_change_point_analysis( - params, test_id, big_query_metrics_fetcher: MetricsFetcher): + test_config_container: TestConfigContainer, + big_query_metrics_fetcher: MetricsFetcher, + change_point_config: ChangePointConfig = ChangePointConfig(), +): """ Args: - params: Dict containing parameters to run change point analysis. - test_id: Test id for the current test. + test_config_container: TestConfigContainer containing test metadata for + fetching data and running change point analysis. big_query_metrics_fetcher: BigQuery metrics fetcher used to fetch data for change point analysis. + change_point_config: ChangePointConfig containing parameters to run + change point analysis. Returns: bool indicating if a change point is observed and alerted on GitHub. """ - logging.info("Running change point analysis for test ID %s" % test_id) - if not validate_config(params.keys()): - raise ValueError( - f"Please make sure all these keys {constants._PERF_TEST_KEYS} " - f"are specified for the {test_id}") - - metric_name = params['metric_name'] + logging.info( + "Running change point analysis for test ID %s" % + test_config_container.test_id) # test_name will be used to query a single test from # multiple tests in a single BQ table. 
Right now, the default # assumption is that all the test have an individual BQ table # but this might not be case for other tests(such as IO tests where # a single BQ tables stores all the data) - test_name = params.get('test_name', None) + test_name = test_config_container.test_name min_runs_between_change_points = ( - constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS) - if 'min_runs_between_change_points' in params: - min_runs_between_change_points = params['min_runs_between_change_points'] + change_point_config.min_runs_between_change_points) num_runs_in_change_point_window = ( - constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW) - if 'num_runs_in_change_point_window' in params: - num_runs_in_change_point_window = params['num_runs_in_change_point_window'] - - metric_values, timestamps = big_query_metrics_fetcher.fetch_metric_data( - project=params['project'], - metrics_dataset=params['metrics_dataset'], - metrics_table=params['metrics_table'], - metric_name=params['metric_name'], - test_name=test_name - ) + change_point_config.num_runs_in_change_point_window) + + metric_container = big_query_metrics_fetcher.fetch_metric_data( + test_config=test_config_container) + metric_container.sort_by_timestamp() + + metric_values = metric_container.values + timestamps = metric_container.timestamps change_point_index = find_latest_change_point_index( metric_values=metric_values) if not change_point_index: - logging.info("Change point is not detected for the test ID %s" % test_id) + logging.info( + "Change point is not detected for the test ID %s" % + test_config_container.test_id) return False # since timestamps are ordered in ascending order and # num_runs_in_change_point_window refers to the latest runs, @@ -107,15 +144,17 @@ def run_change_point_analysis( 'on metric %s. Since the change point run %s ' 'lies outside the num_runs_in_change_point_window distance: %s, ' 'alert is not raised.' 
% ( - test_id, - metric_name, + test_config_container.test_id, + test_config_container.metric_name, latest_change_point_run + 1, num_runs_in_change_point_window)) return False - is_alert = True + is_valid_change_point = True last_reported_issue_number = None - issue_metadata_table_name = f'{params.get("metrics_table")}_{metric_name}' + issue_metadata_table_name = ( + f'{test_config_container.metrics_table}_{test_config_container.metric_name}' # pylint: disable=line-too-long + ) existing_issue_data = get_existing_issues_data( table_name=issue_metadata_table_name) @@ -127,37 +166,39 @@ def run_change_point_analysis( # convert numpy.int64 to int last_reported_issue_number = last_reported_issue_number.item() - is_alert = is_perf_alert( + is_valid_change_point = is_sibling_change_point( previous_change_point_timestamps=existing_issue_timestamps, change_point_index=change_point_index, timestamps=timestamps, - min_runs_between_change_points=min_runs_between_change_points) - if is_alert: + min_runs_between_change_points=min_runs_between_change_points, + test_id=test_config_container.test_id) + if is_valid_change_point: issue_number, issue_url = create_performance_alert( - metric_name, test_id, timestamps, - metric_values, change_point_index, - params.get('labels', None), - last_reported_issue_number, - test_description = params.get('test_description', None), - test_name = test_name + test_config_container=test_config_container, + metric_container=metric_container, + change_point_index=change_point_index, + existing_issue_number=last_reported_issue_number, ) issue_metadata = GitHubIssueMetaData( issue_timestamp=pd.Timestamp( datetime.now().replace(tzinfo=timezone.utc)), # BQ doesn't allow '.' in table name - test_id=test_id.replace('.', '_'), + test_id=test_config_container.test_id.replace('.', '_'), test_name=test_name or uuid.uuid4().hex, - metric_name=metric_name, + metric_name=test_config_container.metric_name, change_point=metric_values[change_point_index], issue_number=issue_number, issue_url=issue_url, - change_point_timestamp=timestamps[change_point_index]) + change_point_timestamp=timestamps[change_point_index], + ) publish_issue_metadata_to_big_query( - issue_metadata=issue_metadata, table_name=issue_metadata_table_name) - - return is_alert + issue_metadata=issue_metadata, + table_name=issue_metadata_table_name, + project=test_config_container.project, + ) + return is_valid_change_point def run( @@ -185,10 +226,13 @@ def run( tests_config: Dict[str, Dict[str, Any]] = read_test_config(config_file_path) for test_id, params in tests_config.items(): + test_config_container = get_test_config_container(params, test_id=test_id) + change_point_config = get_change_point_config(params) run_change_point_analysis( - params=params, - test_id=test_id, - big_query_metrics_fetcher=big_query_metrics_fetcher) + test_config_container=test_config_container, + big_query_metrics_fetcher=big_query_metrics_fetcher, + change_point_config=change_point_config, + ) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 15344ab13b3a..5164c8d8fd36 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -16,7 +16,6 @@ # # pytype: skip-file -import datetime import logging import os import re @@ -33,15 +32,18 @@ from apache_beam.testing.analyzers import constants from apache_beam.testing.analyzers import 
github_issues_utils from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher + from apache_beam.testing.analyzers.perf_analysis_utils import MetricContainer + from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point - from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert + from apache_beam.testing.analyzers.perf_analysis_utils import is_sibling_change_point from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive from apache_beam.testing.analyzers.perf_analysis_utils import filter_change_points_by_median_threshold from apache_beam.testing.analyzers.perf_analysis_utils import find_change_points from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index from apache_beam.testing.analyzers.perf_analysis_utils import validate_config from apache_beam.testing.load_tests import load_test_metrics_utils + except ImportError as e: raise unittest.SkipTest('Missing dependencies to run perf analysis tests.') @@ -50,23 +52,23 @@ def get_fake_data_with_no_change_point(*args, **kwargs): num_samples = 20 metric_values = [1] * num_samples - timestamps = list(range(num_samples)) - return metric_values, timestamps + timestamps = [pd.Timestamp(i) for i in range(num_samples)] + return MetricContainer(metric_values, timestamps) def get_fake_data_with_change_point(*args, **kwargs): # change point will be at index 13. num_samples = 20 metric_values = [0] * 12 + [3] + [4] * 7 - timestamps = [i for i in range(num_samples)] - return metric_values, timestamps + timestamps = [pd.Timestamp(i) for i in range(num_samples)] + return MetricContainer(metric_values, timestamps) def get_existing_issue_data(**kwargs): # change point found at index 13. So passing 13 in the # existing issue data in mock method. 
return pd.DataFrame([{ - constants._CHANGE_POINT_TIMESTAMP_LABEL: 13, + constants._CHANGE_POINT_TIMESTAMP_LABEL: pd.Timestamp(13), constants._ISSUE_NUMBER: np.array([0]) }]) @@ -77,7 +79,7 @@ def setUp(self) -> None: self.multiple_change_point_series = self.single_change_point_series + [ 2 ] * 20 - self.timestamps = list(range(5)) + self.timestamps = [pd.Timestamp(i) for i in range(5)] self.params = { 'test_description': 'fake_description', 'metrics_dataset': 'fake_dataset', @@ -123,30 +125,33 @@ def test_validate_config(self): def test_duplicate_change_point(self): change_point_index = 2 min_runs_between_change_points = 1 - is_alert = is_perf_alert( + is_alert = is_sibling_change_point( previous_change_point_timestamps=[self.timestamps[0]], timestamps=self.timestamps, change_point_index=change_point_index, - min_runs_between_change_points=min_runs_between_change_points) + min_runs_between_change_points=min_runs_between_change_points, + test_id=self.test_id) self.assertTrue(is_alert) def test_duplicate_change_points_are_not_valid_alerts(self): change_point_index = 2 min_runs_between_change_points = 1 - is_alert = is_perf_alert( + is_alert = is_sibling_change_point( previous_change_point_timestamps=[self.timestamps[3]], timestamps=self.timestamps, change_point_index=change_point_index, - min_runs_between_change_points=min_runs_between_change_points) + min_runs_between_change_points=min_runs_between_change_points, + test_id=self.test_id) self.assertFalse(is_alert) - is_alert = is_perf_alert( + is_alert = is_sibling_change_point( previous_change_point_timestamps=[ self.timestamps[0], self.timestamps[3] ], timestamps=self.timestamps, change_point_index=change_point_index, - min_runs_between_change_points=min_runs_between_change_points) + min_runs_between_change_points=min_runs_between_change_points, + test_id=self.test_id) self.assertFalse(is_alert) @mock.patch.object( @@ -154,9 +159,10 @@ def test_duplicate_change_points_are_not_valid_alerts(self): 'fetch_metric_data', get_fake_data_with_no_change_point) def test_no_alerts_when_no_change_points(self): + test_config_container = analysis.get_test_config_container( + params=self.params, test_id=self.test_id) is_alert = analysis.run_change_point_analysis( - params=self.params, - test_id=self.test_id, + test_config_container=test_config_container, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) @@ -176,9 +182,10 @@ def test_no_alerts_when_no_change_points(self): '.create_performance_alert', return_value=(0, '')) def test_alert_on_data_with_change_point(self, *args): + test_config_container = analysis.get_test_config_container( + params=self.params, test_id=self.test_id) is_alert = analysis.run_change_point_analysis( - params=self.params, - test_id=self.test_id, + test_config_container=test_config_container, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertTrue(is_alert) @@ -197,24 +204,32 @@ def test_alert_on_data_with_change_point(self, *args): 'apache_beam.testing.analyzers.perf_analysis.create_performance_alert', return_value=(0, '')) def test_alert_on_data_with_reported_change_point(self, *args): + test_config_container = analysis.get_test_config_container( + params=self.params, test_id=self.test_id) is_alert = analysis.run_change_point_analysis( - params=self.params, - test_id=self.test_id, + test_config_container=test_config_container, big_query_metrics_fetcher=BigQueryMetricsFetcher()) self.assertFalse(is_alert) def test_change_point_has_anomaly_marker_in_gh_description(self): - metric_values, 
timestamps = get_fake_data_with_change_point() - timestamps = [datetime.datetime.fromtimestamp(ts) for ts in timestamps] + metric_container = get_fake_data_with_change_point() + metric_values = metric_container.values change_point_index = find_latest_change_point_index(metric_values) - description = github_issues_utils.get_issue_description( + test_config_container = TestConfigContainer( + project=self.params['project'], + metrics_dataset=self.params['metrics_dataset'], + metrics_table=self.params['metrics_table'], + metric_name=self.params['metric_name'], test_id=self.test_id, - test_name=self.params.get('test_name', None), test_description=self.params['test_description'], - metric_name=self.params['metric_name'], - metric_values=metric_values, - timestamps=timestamps, + test_name=self.params.get('test_name', None), + labels=self.params.get('labels', None), + ) + + description = github_issues_utils.get_issue_description( + test_config_container=test_config_container, + metric_container=metric_container, change_point_index=change_point_index, max_results_to_display=( constants._NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION)) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index 91c339a766d2..2b89ac9fdba9 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -31,7 +31,6 @@ from google.cloud import bigquery from apache_beam.testing.analyzers import constants -from apache_beam.testing.analyzers import github_issues_utils from apache_beam.testing.load_tests import load_test_metrics_utils from apache_beam.testing.load_tests.load_test_metrics_utils import BigQueryMetricsPublisher from signal_processing_algorithms.energy_statistics.energy_statistics import e_divisive @@ -54,6 +53,54 @@ class GitHubIssueMetaData: change_point: float +@dataclass +class ChangePointConfig: + """ + This class holds the change point configuration parameters. + """ + min_runs_between_change_points: int = ( + constants._DEFAULT_MIN_RUNS_BETWEEN_CHANGE_POINTS) + num_runs_in_change_point_window: int = ( + constants._DEFAULT_NUM_RUMS_IN_CHANGE_POINT_WINDOW) + + +@dataclass +class TestConfigContainer: + metric_name: str # make this list instead. + project: str + metrics_dataset: str + metrics_table: str + test_id: str # unique id for each test config. + test_description: str + test_name: Optional[str] = None + labels: Optional[List[str]] = None + + +@dataclass +class MetricContainer: + """ + This class holds the metric values and timestamps for a given metric. + Args: + metric_values: List of metric values. + timestamps: List of pandas timestamps corresponding to the metric values. + """ + + values: List[Union[int, float]] + timestamps: List[pd.Timestamp] + + def sort_by_timestamp(self, in_place=True): + """ + Sorts the metric values and timestamps in ascending order wrt timestamps. + Args: + in_place: If True, sort the metric values and timestamps in place. 
+ """ + timestamps, values = zip(*sorted(zip(self.timestamps, self.values))) + if not in_place: + return MetricContainer(values=values, timestamps=timestamps) + self.timestamps, self.values = zip(*sorted( + zip(self.timestamps, self.values))) + + def is_change_point_in_valid_window( num_runs_in_change_point_window: int, latest_change_point_run: int) -> bool: return num_runs_in_change_point_window > latest_change_point_run @@ -81,12 +128,16 @@ def get_existing_issues_data(table_name: str) -> Optional[pd.DataFrame]: return existing_issue_data -def is_perf_alert( +def is_sibling_change_point( previous_change_point_timestamps: List[pd.Timestamp], change_point_index: int, timestamps: List[pd.Timestamp], - min_runs_between_change_points: int) -> bool: + min_runs_between_change_points: int, + test_id: str, +) -> bool: """ + Sibling change points are the change points that are close to each other. + Search the previous_change_point_timestamps with current observed change point sibling window and determine if it is a duplicate change point or not. @@ -105,6 +156,18 @@ def is_perf_alert( for previous_change_point_timestamp in previous_change_point_timestamps: if (sibling_change_point_min_timestamp <= previous_change_point_timestamp <= sibling_change_point_max_timestamp): + logging.info( + 'Performance regression/improvement found for the test ID: %s. ' + 'Since the change point timestamp %s ' + 'lies within the sibling change point window: %s, ' + 'alert is not raised.' % ( + test_id, + previous_change_point_timestamp.strftime('%Y-%m-%d %H:%M:%S'), + ( + sibling_change_point_min_timestamp.strftime( + '%Y-%m-%d %H:%M:%S'), + sibling_change_point_max_timestamp.strftime( + '%Y-%m-%d %H:%M:%S')))) return False return True @@ -161,12 +224,16 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]): return change_point_index -def publish_issue_metadata_to_big_query(issue_metadata, table_name): +def publish_issue_metadata_to_big_query( + issue_metadata, + table_name, + project=constants._BQ_PROJECT_NAME, +): """ Published issue_metadata to BigQuery with table name. """ bq_metrics_publisher = BigQueryMetricsPublisher( - project_name=constants._BQ_PROJECT_NAME, + project_name=project, dataset=constants._BQ_DATASET, table=table_name, bq_schema=constants._SCHEMA) @@ -177,37 +244,32 @@ def publish_issue_metadata_to_big_query(issue_metadata, table_name): def create_performance_alert( - metric_name: str, - test_id: str, - timestamps: List[pd.Timestamp], - metric_values: List[Union[int, float]], + test_config_container: TestConfigContainer, + metric_container: MetricContainer, change_point_index: int, - labels: List[str], existing_issue_number: Optional[int], - test_description: Optional[str] = None, - test_name: Optional[str] = None, ) -> Tuple[int, str]: """ Creates performance alert on GitHub issues and returns GitHub issue number and issue URL. 
""" + # avoid circular imports + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.testing.analyzers import github_issues_utils + description = github_issues_utils.get_issue_description( - test_id=test_id, - test_name=test_name, - test_description=test_description, - metric_name=metric_name, - timestamps=timestamps, - metric_values=metric_values, + test_config_container=test_config_container, + metric_container=metric_container, change_point_index=change_point_index, max_results_to_display=( constants._NUM_RESULTS_TO_DISPLAY_ON_ISSUE_DESCRIPTION)) issue_number, issue_url = github_issues_utils.report_change_point_on_issues( title=github_issues_utils._ISSUE_TITLE_TEMPLATE.format( - test_id, metric_name + test_config_container.test_id, test_config_container.metric_name ), description=description, - labels=labels, + labels=test_config_container.labels, existing_issue_number=existing_issue_number) logging.info( @@ -263,13 +325,7 @@ def is_edge_change_point( class MetricsFetcher(metaclass=abc.ABCMeta): @abc.abstractmethod def fetch_metric_data( - self, - *, - project, - metrics_dataset, - metrics_table, - metric_name, - test_name=None) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + self, *, test_config: TestConfigContainer) -> MetricContainer: """ Define SQL query and fetch the timestamp values and metric values from BigQuery tables. @@ -279,22 +335,18 @@ def fetch_metric_data( class BigQueryMetricsFetcher(MetricsFetcher): def fetch_metric_data( - self, - *, - project, - metrics_dataset, - metrics_table, - metric_name, - test_name=None, - ) -> Tuple[List[Union[int, float]], List[pd.Timestamp]]: + self, *, test_config: TestConfigContainer) -> MetricContainer: """ Args: - params: Dict containing keys required to fetch data from a data source. + test_config: TestConfigContainer containing metadata required to fetch + metric data from BigQuery. Returns: - Tuple[List[Union[int, float]], List[pd.Timestamp]]: Tuple containing list - of metric_values and list of timestamps. Both are sorted in ascending - order wrt timestamps. + MetricContainer containing metric values and timestamps. 
""" + project = test_config.project + metrics_dataset = test_config.metrics_dataset + metrics_table = test_config.metrics_table + metric_name = test_config.metric_name query = f""" SELECT * FROM {project}.{metrics_dataset}.{metrics_table} @@ -305,8 +357,9 @@ def fetch_metric_data( client = bigquery.Client() query_job = client.query(query=query) metric_data = query_job.result().to_dataframe() - metric_data.sort_values( - by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) - return ( - metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), - metric_data[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) + # metric_data.sort_values( + # by=[load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL], inplace=True) + return MetricContainer( + values=metric_data[load_test_metrics_utils.VALUE_LABEL].tolist(), + timestamps=metric_data[ + load_test_metrics_utils.SUBMIT_TIMESTAMP_LABEL].tolist()) diff --git a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml index ec9cfe6f1ac0..2e72cd5cc301 100644 --- a/sdks/python/apache_beam/testing/analyzers/tests_config.yaml +++ b/sdks/python/apache_beam/testing/analyzers/tests_config.yaml @@ -19,7 +19,7 @@ # {test_id}-{metric_name} pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_micro_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 152. Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L63 Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=2 @@ -30,7 +30,7 @@ pytorch_image_classification_benchmarks-resnet152-mean_inference_batch_latency_m metric_name: mean_inference_batch_latency_micro_secs pytorch_image_classification_benchmarks-resnet101-mean_load_model_latency_milli_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 101. Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L34 Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=7 @@ -41,7 +41,7 @@ pytorch_image_classification_benchmarks-resnet101-mean_load_model_latency_milli_ metric_name: mean_load_model_latency_milli_secs pytorch_image_classification_benchmarks-resnet101-mean_inference_batch_latency_micro_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 101. Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L34 Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=2 @@ -52,7 +52,7 @@ pytorch_image_classification_benchmarks-resnet101-mean_inference_batch_latency_m metric_name: mean_inference_batch_latency_micro_secs pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 152 with Tesla T4 GPU. 
Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L151 Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=7 @@ -63,7 +63,7 @@ pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_laten metric_name: mean_inference_batch_latency_micro_secs pytorch_image_classification_benchmarks-resnet152-GPU-mean_load_model_latency_milli_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 152 with Tesla T4 GPU. Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L151 Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?orgId=1&viewPanel=7 @@ -74,7 +74,7 @@ pytorch_image_classification_benchmarks-resnet152-GPU-mean_load_model_latency_mi metric_name: mean_load_model_latency_milli_secs pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_latency_micro_secs: - test_description: + test_description: | Pytorch image classification on 50k images of size 224 x 224 with resnet 152 with Tesla T4 GPU. Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/.test-infra/jenkins/job_InferenceBenchmarkTests_Python.groovy#L151). Test dashboard - http://metrics.beam.apache.org/d/ZpS8Uf44z/python-ml-runinference-benchmarks?from=now-90d&to=now&viewPanel=2 @@ -85,7 +85,7 @@ pytorch_image_classification_benchmarks-resnet152-GPU-mean_inference_batch_laten metric_name: mean_inference_batch_latency_micro_secs test_cloudml_benchmark_cirteo_no_shuffle_10GB-runtime_sec: - test_description: + test_description: | TFT Criteo test on 10 GB data with no Reshuffle. Test link - [Test link](https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_test.py#L82) metrics_dataset: beam_cloudml @@ -94,7 +94,7 @@ test_cloudml_benchmark_cirteo_no_shuffle_10GB-runtime_sec: metric_name: runtime_sec test_cloudml_benchmark_criteo_10GB-runtime_sec: - test_description: + test_description: | TFT Criteo test on 10 GB data. 
Test link - https://github.com/apache/beam/blob/42d0a6e3564d8b9c5d912428a6de18fb22a13ac1/sdks/python/apache_beam/testing/benchmarks/cloudml/cloudml_benchmark_test.py#LL104C7-L104C41 metrics_dataset: beam_cloudml @@ -104,7 +104,7 @@ test_cloudml_benchmark_criteo_10GB-runtime_sec: # Python Combine load tests at http://metrics.beam.apache.org/d/WNzYt13Zk/combine-load-tests?orgId=1 combine_python_batch_2gb_10_byte_records: - test_description: + test_description: | Combine Python Load Test 2 GB 10 byte records Test link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_Combine_Python.groovy#L76C24-L76C65 Test dashboard - http://metrics.beam.apache.org/d/WNzYt13Zk/combine-load-tests?orgId=1&from=now-90d&to=now&var-processingType=batch&var-sdk=python&viewPanel=2 @@ -115,7 +115,7 @@ combine_python_batch_2gb_10_byte_records: project: apache-beam-testing combine_python_batch_2gb_fanout_4: - test_description: + test_description: | Combine Python Load test - 2GB Fanout 4 Test link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_Combine_Python.groovy#L52 Test Dashboard - http://metrics.beam.apache.org/d/WNzYt13Zk/combine-load-tests?orgId=1&from=now-90d&to=now&var-processingType=batch&var-sdk=python&viewPanel=4 @@ -126,7 +126,8 @@ combine_python_batch_2gb_fanout_4: project: apache-beam-testing combine_python_batch_2gb_fanout_8: - test_description: Combine Python Load test - 2GB Fanout 8 + test_description: | + Combine Python Load test - 2GB Fanout 8 test_target: apache_beam.testing.load_tests.combine_test metrics_dataset: load_test metrics_table: python_dataflow_batch_combine_5 @@ -135,7 +136,7 @@ combine_python_batch_2gb_fanout_8: # Python Batch GBK load tests at http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-90d&to=now gbk_python_batch_load_test_2gb_of_10B_records: - test_description: + test_description: | GroupByKey Python Load test - 2GB of 10B records python | GBK | Small records (10B) Test Dashboard - http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-90d&to=now&viewPanel=2 @@ -147,7 +148,7 @@ gbk_python_batch_load_test_2gb_of_10B_records: project: apache-beam-testing gbk_python_batch_load_test_2gb_of_100B_records: - test_description: + test_description: | GroupByKey Python Load test - 2GB of 100B records python | GBK | Medium records (100B) Test Dashboard - http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-90d&to=now&viewPanel=3 @@ -159,7 +160,7 @@ gbk_python_batch_load_test_2gb_of_100B_records: project: apache-beam-testing gbk_python_batch_load_test_2gb_of_100KB_records: - test_description: + test_description: | GroupByKey Python Load test - 2GB of 100kB records python | GBK | Large records (100kB) Test Dashboard - http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-6M&to=now&viewPanel=4&inspect=4 @@ -173,7 +174,7 @@ gbk_python_batch_load_test_2gb_of_100KB_records: gbk_python_batch_load_test_fanout_4_times_with_2GB_10byte_records_total: # this test looks little noisy. Remove this if it causes too many false # positives. 
- test_description: + test_description: | GroupByKey Python Load test - fanout 4 times with 2GB 10-byte records total python | GBK | Fanout 4 Test Dashboard - http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-90d&to=now&viewPanel=5 @@ -188,7 +189,7 @@ gbk_python_batch_load_test_fanout_4_times_with_2GB_10byte_records_total: gbk_python_batch_load_test_fanout_8_times_with_2GB_10byte_records_total: # this test looks little noisy. Remove this if it causes too many false # positives. - test_description: + test_description: | GroupByKey Python Load test - fanout 8 times with 2GB 10-byte records total python | GBK | Fanout 8 Test Dashboard - http://metrics.beam.apache.org/d/UYZ-oJ3Zk/gbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python&from=now-90d&to=now&viewPanel=6 @@ -201,7 +202,7 @@ gbk_python_batch_load_test_fanout_8_times_with_2GB_10byte_records_total: # Python SideInput load tests at http://metrics.beam.apache.org/d/-E9aGlFGk/side-input-load-tests?orgId=1&from=now-90d&to=now sideinpts_python_batch_1gb_1kb_10workers_1000window_1key_percent_dict: - test_description: + test_description: | python | Side Input | 1 GB dictionary, 1% of keys, 1000 fixed windows Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy#L120 Test Dashboard - http://metrics.beam.apache.org/d/-E9aGlFGk/side-input-load-tests?orgId=1&from=now-90d&to=now&viewPanel=8 @@ -212,7 +213,7 @@ sideinpts_python_batch_1gb_1kb_10workers_1000window_1key_percent_dict: sideinpts_python_batch_1gb_1kb_10workers_1000window_99key_percent_dict: - test_description: + test_description: | python | Side Input | 1 GB dictionary, 99% of keys, 1000 fixed windows Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy#L133 Test Dashboard - http://metrics.beam.apache.org/d/-E9aGlFGk/side-input-load-tests?orgId=1&from=now-90d&to=now&viewPanel=9 @@ -222,7 +223,7 @@ sideinpts_python_batch_1gb_1kb_10workers_1000window_99key_percent_dict: project: apache-beam-testing sideinpts_python_batch_10gb_1kb_10workers_1000window_first_iterable: - test_description: + test_description: | python | Side Input | 10 GB iterable, 1% of elements, 1000 fixed windows Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy#L146 Test Dashboard - http://metrics.beam.apache.org/d/-E9aGlFGk/side-input-load-tests?orgId=1&from=now-90d&to=now&viewPanel=10 @@ -233,7 +234,7 @@ sideinpts_python_batch_10gb_1kb_10workers_1000window_first_iterable: sideinpts_python_batch_10gb_1kb_10workers_1000window_first_iterable: - test_description: + test_description: | python | Side Input | 10 GB iterable, all elements, 1000 fixed windows Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy#L159 Test Dashboard - http://metrics.beam.apache.org/d/-E9aGlFGk/side-input-load-tests?orgId=1&from=now-90d&to=now&viewPanel=11 @@ -245,7 +246,7 @@ sideinpts_python_batch_10gb_1kb_10workers_1000window_first_iterable: # Python CoGBK load tests at http://metrics.beam.apache.org/d/fK0U4JqWz/cogbk-load-tests?orgId=1&var-processingType=batch&var-sdk=python cogbk_python_batch_load_test_2GB_of_100B_records_with_a_single_key: - test_description: + test_description: | CoGroupByKey Python 
Load test - 2GB of 100B records with a single key python | coGBK | 100B records with a single key Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy#L32C25-L32C76 @@ -257,7 +258,7 @@ cogbk_python_batch_load_test_2GB_of_100B_records_with_a_single_key: project: apache-beam-testing cogbk_python_batch_load_test_2GB_of_100B_records_with_a_multiple_key: - test_description: + test_description: | CoGroupByKey Python Load test - 2GB of 100B records with multiple keys python | coGBK | 100B records with multiple keys @@ -270,7 +271,7 @@ cogbk_python_batch_load_test_2GB_of_100B_records_with_a_multiple_key: project: apache-beam-testing cogbk_python_batch_load_test_reiterate_4times_10KB_values: - test_description: + test_description: | CoGroupByKey Python Load test - reiterate 4 times 10kB values python | coGBK | reiteration 10kB value Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy#L96 @@ -281,7 +282,7 @@ cogbk_python_batch_load_test_reiterate_4times_10KB_values: project: apache-beam-testing cogbk_python_batch_load_test_reiterate_4times_2MB_values: - test_description: + test_description: | CoGroupByKey Python Load test - reiterate 4 times 2 MB values python | coGBK | reiteration 2MB value Test Link - https://github.com/apache/beam/blob/5e38decf9e723a385057131b01bbd33d8c60bda3/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy#L128
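Reviewer note (illustrative only, not part of this patch): a minimal sketch of how the refactored driver pieces fit together after this change, mirroring the new run() loop in perf_analysis.py. The config_file_path value is an assumed placeholder; the function and class names are taken from the diff above.

from apache_beam.testing.analyzers import perf_analysis as analysis
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config

# Hypothetical path used only for this sketch.
config_file_path = 'apache_beam/testing/analyzers/tests_config.yaml'
tests_config = read_test_config(config_file_path)
fetcher = BigQueryMetricsFetcher()

for test_id, params in tests_config.items():
  # Split each per-test YAML params dict into the two new containers.
  test_config_container = analysis.get_test_config_container(
      params, test_id=test_id)
  change_point_config = analysis.get_change_point_config(params)
  # Returns True when a valid change point is found and alerted on GitHub.
  alerted = analysis.run_change_point_analysis(
      test_config_container=test_config_container,
      big_query_metrics_fetcher=fetcher,
      change_point_config=change_point_config,
  )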
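A second sketch, also illustrative and not part of this patch: because the abstract MetricsFetcher.fetch_metric_data signature now takes a TestConfigContainer and returns a MetricContainer, a non-BigQuery fetcher (for example, a canned-data fetcher in unit tests) would look roughly like this. StaticMetricsFetcher and its stored values are hypothetical names invented for the example.

from typing import List
from typing import Union

import pandas as pd

from apache_beam.testing.analyzers.perf_analysis_utils import MetricContainer
from apache_beam.testing.analyzers.perf_analysis_utils import MetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import TestConfigContainer


class StaticMetricsFetcher(MetricsFetcher):
  """Serves canned metric data instead of querying BigQuery (sketch only)."""
  def __init__(
      self,
      values: List[Union[int, float]],
      timestamps: List[pd.Timestamp]):
    self._values = values
    self._timestamps = timestamps

  def fetch_metric_data(
      self, *, test_config: TestConfigContainer) -> MetricContainer:
    # run_change_point_analysis calls sort_by_timestamp() on the result,
    # so no ordering guarantee is required here.
    return MetricContainer(values=self._values, timestamps=self._timestamps)

Such a fetcher could be passed as big_query_metrics_fetcher in the previous sketch in place of BigQueryMetricsFetcher().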