diff --git a/tubular/scripts/dd_synthetic_tests.py b/tubular/scripts/dd_synthetic_tests.py index de0af83f..dbab25ea 100644 --- a/tubular/scripts/dd_synthetic_tests.py +++ b/tubular/scripts/dd_synthetic_tests.py @@ -2,21 +2,13 @@ from dataclasses import dataclass import click +import json import logging import os import requests import time import sys -@dataclass -class SyntheticTestRequest: - ''' - Data Transfer Object (DTO) for requesting that a predefined Datadog synthetic test be run - ''' - test_name: str - test_id: str - - class DatadogClient: ''' Invokes datadog API to run and monitor synthetic tests ''' @@ -26,22 +18,25 @@ class DatadogClient: def __init__(self, api_key, app_key): self.api_key = api_key self.app_key = app_key + self.test_run_id = None + self.trigger_time = None - def trigger_synthetic_tests(self, test_requests: [SyntheticTestRequest]) -> str: + def trigger_synthetic_tests(self, test_requests: [dict]) -> None: ''' - Trigger running one or more synthetic tests. + Trigger running of one or more synthetic tests. 
:param test_requests: List of tests to be run - :return: An opaque aggregate run request identifier + :return: None, but saves opaque test run ID as object attribute ''' + if self.test_run_id: + raise Exception("Datadog error: tests already triggered") + url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/trigger" headers = { "Content-Type": "application/json", "DD-API-KEY": self.api_key, "DD-APPLICATION-KEY": self.app_key } - # TODO: For Hello, world exercise, just run one test; in general, need to run all of them - json_request_body = {"tests": [{"public_id": test_requests[0].test_id}]} - + json_request_body = {"tests": [{"public_id": t['id']} for t in test_requests]} response = requests.post(url, headers=headers, json=json_request_body) if response.status_code != 200: @@ -51,48 +46,51 @@ def trigger_synthetic_tests(self, test_requests: [SyntheticTestRequest]) -> str: response_body = response.json() result = response_body['results'][0] aggregate_test_run_id = result['result_id'] - return aggregate_test_run_id + logging.info(f"Datadog test run launched: {aggregate_test_run_id}") + self.test_run_id = aggregate_test_run_id + self.trigger_time = time.time() # Key timeouts off of this except Exception as e: - raise Exception("Unexpected Datadog results: " + str(e)) + raise Exception("Datadog error on triggering tests: " + str(e)) - def get_test_results(self, test_run_id, test_requests): + def get_failed_tests(self, test_requests): ''' Poll for completion of all tests triggered as part of the "test_run_id" test run :param test_run_id: Opaque identifier for the aggregate test run - :param test_requests: A list of the tests that were requested with this aggregate test run - :return: Currently, just a True/False bool for the last test in the aggregate test run; TODO - return - a dictionary of results with results for all the tests that ran + :return: A list of the tests that failed; Empty list if all tests passed ''' - json_test_run_results = None - for request 
in test_requests: - json_test_run_results = self._get_result_via_polling(test_run_id, request.test_id) + failed_tests = [] + for test in test_requests: + test_result = self._poll_for_test_result(test['id']) + if test_result is False: + failed_tests.append(test) + + return failed_tests - return json_test_run_results # TODO: Create and return array of results; don't just return the last one + # ***************** Private methods ********************** - def _get_result_via_polling(self, test_run_id, test_id): + def _poll_for_test_result(self, test_id): """ - Poll for aggregate test run completion within the maximum allowable time. - Returns the JSON structure with all test run results. + Poll for test run completion within the maximum allowable time for the specified test. + Returns None if still running; otherwise, returns True on test success and False on test failure. """ start_time = time.time() - single_test_json_result = None - while single_test_json_result is None and (time.time() - start_time) < (self.MAX_ALLOWABLE_TIME_SECS): + test_result = None + while test_result is None and (time.time() - self.trigger_time) < (self.MAX_ALLOWABLE_TIME_SECS): time.sleep(5) # Poll every 5 seconds - logging.info("*** Testing whether DD result is available ***") - single_test_json_result = self._get_single_test_json_result(test_run_id, test_id) + test_result = self._get_test_result(test_id) - if single_test_json_result is None: + if test_result is None: raise Exception("The test run timed out.") - logging.info(f"*** Test result found: {single_test_json_result=} ***") - return single_test_json_result + return test_result - def _get_single_test_json_result(self, test_run_id, test_id): + def _get_test_result(self, test_id): """ - returns JSON structure if test run has completed; returns None otherwise + returns JSON structure with test results for all tests in the test run + if the test run has completed; returns None otherwise """ - url = 
f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results" + url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results/{self.test_run_id}" headers = { "DD-API-KEY": self.api_key, "DD-APPLICATION-KEY": self.app_key @@ -104,36 +102,8 @@ def _get_single_test_json_result(self, test_run_id, test_id): return None response_json = response.json() - logging.info(f"*** {test_run_id=} ***") - # return response_json - return self._get_pass_fail_result(response_json, test_run_id) - - @staticmethod - def _json_get_test_run_id(json_response): - """ - Extract test run ID from trigger test run result. - """ - key = "result_id" - if key in json_response: - return json_response[key] - raise Exception("Trigger request failed") - - @staticmethod - def _get_pass_fail_result(input_json, test_run_id): - """ - Extracts pass/fail result from the specific test run in the aggregate test run result set - """ - aggregate_results = input_json['results'] - result = [r for r in aggregate_results if r['result_id'] == test_run_id] - logging.info(f"*** Parsing pass/fail: {result[0]=} ***") - if result is not None: - test_run_data = result[0]['result'] - logging.info(f"*** Found result for {test_run_id=}: {test_run_data} ***") - pass_fail = test_run_data['passed'] - return pass_fail - - raise Exception("Failed to find test result in test run") - + logging.info(f'Response for test {test_id} = {response_json}') + return response_json['passed'] """ Command-line script to run Datadog synthetic tests in the production enviornment and then slack notify and/or roll back @@ -155,15 +125,10 @@ def _get_pass_fail_result(input_json, test_run_id): def run_synthetic_tests(enable_automated_rollbacks, slack_notification_channel): ''' - :param enable_automated_rollbacks: :param slack_notification_channel: :return: ''' - PUBLIC_TEST_ID = "sad-hqu-h33" - - logging.info("****** In run_synthetic_tests") - if enable_automated_rollbacks: logging.Error("Automated rollbacks are not yet supported") sys.exit(1) @@ 
-175,20 +140,23 @@ def run_synthetic_tests(enable_automated_rollbacks, slack_notification_channel): dd_client = DatadogClient(api_key, app_key) # Prepare and trigger the synthetic test request - test_requests = [SyntheticTestRequest("Hello, world test", PUBLIC_TEST_ID)] - logging.info("*** Triggering synthetic tests ***") - test_run_id = dd_client.trigger_synthetic_tests(test_requests) + # PUBLIC_TEST_ID = "sad-hqu-h33" + tests_to_run = json.loads(os.getenv("TESTS_TO_RUN")) + logging.info(f"Running the following tests: {str(tests_to_run)}") + + dd_client.trigger_synthetic_tests(tests_to_run) + + failed_tests = dd_client.get_failed_tests(tests_to_run) + for failed_test in failed_tests: + logging.warning(f'Test {failed_test["name"]}({failed_test["id"]}) failed') - # Fetch and print the test results - logging.info("*** Getting test results ***") - test_results = dd_client.get_test_results(test_run_id, test_requests) - logging.info(f"****** Test results: {test_results}") + task_failed_code = 1 if failed_tests else 0 except Exception as e: - logging.error("GoCD/Datadog integration error: ", str(e)) - exit() + logging.error("GoCD/Datadog integration error: %s", str(e)) + task_failed_code = 1 - sys.exit(0) + sys.exit(task_failed_code) if __name__ == "__main__": run_synthetic_tests(False, None) # pylint: disable=no-value-for-parameter