Skip to content

Commit

Permalink
Rewrite for handling multiple tests, specified
Browse files Browse the repository at this point in the history
in YAML file
  • Loading branch information
Bernard Szabo committed Dec 10, 2024
1 parent 8d0f263 commit 1ffb368
Showing 1 changed file with 51 additions and 83 deletions.
134 changes: 51 additions & 83 deletions tubular/scripts/dd_synthetic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,13 @@
from dataclasses import dataclass

import click
import json
import logging
import os
import requests
import time
import sys

@dataclass
class SyntheticTestRequest:
'''
Data Transfer Object (DTO) for requesting that a predefined Datadog synthetic test be run
'''
test_name: str
test_id: str


class DatadogClient:
''' Invokes datadog API to run and monitor synthetic tests '''

Expand All @@ -26,22 +18,25 @@ class DatadogClient:
def __init__(self, api_key, app_key):
self.api_key = api_key
self.app_key = app_key
self.test_run_id = None
self.trigger_time = None

def trigger_synthetic_tests(self, test_requests: [SyntheticTestRequest]) -> str:
def trigger_synthetic_tests(self, test_requests: [dict]) -> str:
'''
Trigger running one or more synthetic tests.
Trigger running of one or more synthetic tests.
:param test_requests: List of tests to be run
:return: An opaque aggregate run request identifier
:return: None, but saves opaque test run ID as object attribute
'''
if self.test_run_id:
raise Exception("Datadog error: tests already triggered")

url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/trigger"
headers = {
"Content-Type": "application/json",
"DD-API-KEY": self.api_key,
"DD-APPLICATION-KEY": self.app_key
}
# TODO: For Hello, world exercise, just run one test; in general, need to run all of them
json_request_body = {"tests": [{"public_id": test_requests[0].test_id}]}

json_request_body = {"tests": [{"public_id": t.id} for t in test_requests]}
response = requests.post(url, headers=headers, json=json_request_body)

if response.status_code != 200:
Expand All @@ -51,48 +46,51 @@ def trigger_synthetic_tests(self, test_requests: [SyntheticTestRequest]) -> str:
response_body = response.json()
result = response_body['results'][0]
aggregate_test_run_id = result['result_id']
return aggregate_test_run_id
logging.info(f"Datadog test run launched: {aggregate_test_run_id}")
self.test_run_id = aggregate_test_run_id
self.trigger_time = time.time() # Key timeouts off of this

except Exception as e:
raise Exception("Unexpected Datadog results: " + str(e))
raise Exception("Datadog error on triggering tests: " + str(e))

def get_test_results(self, test_run_id, test_requests):
def get_failed_tests(self, test_requests):
'''
Poll for completion of all tests triggered as part of the "test_run_id" test run
:param test_run_id: Opaque identifier for the aggregate test run
:param test_requests: A list of the tests that were requested with this aggregate test run
:return: Currently, just a True/False bool for the last test in the aggregate test run; TODO - return
a dictionary of results with results for all the tests that ran
:return: A list of the test ids for the tests that failed; Empty list if all tests passed
'''
json_test_run_results = None
for request in test_requests:
json_test_run_results = self._get_result_via_polling(test_run_id, request.test_id)
failed_tests = []
for test in test_requests:
test_result = self._poll_for_test_result(test['id'])
if test_result == False:
failed_tests.add(test)

return failed_tests

return json_test_run_results # TODO: Create and return array of results; don't just return the last one
# ***************** Private methods **********************

def _get_result_via_polling(self, test_run_id, test_id):
def _poll_for_test_result(self, test_id):
"""
Poll for aggregate test run completion within the maximum allowable time.
Returns the JSON structure with all test run results.
Poll for test run completion within the maximum allowable time for the specified test.
Returns None if still running; otherwise, returns True on test success and False on test failure.
"""
start_time = time.time()
single_test_json_result = None
while single_test_json_result is None and (time.time() - start_time) < (self.MAX_ALLOWABLE_TIME_SECS):
test_result = None
while test_result is None and (time.time() - self.trigger_time) < (self.MAX_ALLOWABLE_TIME_SECS):
time.sleep(5) # Poll every 5 seconds
logging.info("*** Testing whether DD result is available ***")
single_test_json_result = self._get_single_test_json_result(test_run_id, test_id)
test_result = self._get_test_result(test_id)

if single_test_json_result is None:
if test_result is None:
raise Exception("The test run timed out.")

logging.info(f"*** Test result found: {single_test_json_result=} ***")
return single_test_json_result
return test_result

def _get_single_test_json_result(self, test_run_id, test_id):
def _get_test_result(self, test_id):
"""
returns JSON structure if test run has completed; returns None otherwise
returns JSON structure with test results for all tests in the test run
if the test run has completed; returns None otherwise
"""
url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results"
url = f"{self.DATADOG_SYNTHETIC_TESTS_API_URL}/{test_id}/results/{self.test_run_id}"
headers = {
"DD-API-KEY": self.api_key,
"DD-APPLICATION-KEY": self.app_key
Expand All @@ -104,36 +102,8 @@ def _get_single_test_json_result(self, test_run_id, test_id):
return None

response_json = response.json()
logging.info(f"*** {test_run_id=} ***")
# return response_json
return self._get_pass_fail_result(response_json, test_run_id)

@staticmethod
def _json_get_test_run_id(json_response):
"""
Extract test run ID from trigger test run result.
"""
key = "result_id"
if key in json_response:
return json_response[key]
raise Exception("Trigger request failed")

@staticmethod
def _get_pass_fail_result(input_json, test_run_id):
"""
Extracts pass/fail result from the specific test run in the aggregate test run result set
"""
aggregate_results = input_json['results']
result = [r for r in aggregate_results if r['result_id'] == test_run_id]
logging.info(f"*** Parsing pass/fail: {result[0]=} ***")
if result is not None:
test_run_data = result[0]['result']
logging.info(f"*** Found result for {test_run_id=}: {test_run_data} ***")
pass_fail = test_run_data['passed']
return pass_fail

raise Exception("Failed to find test result in test run")

logging.info(f'Response for test {test_id} = {response_json}')
return response_json['passed']

"""
Command-line script to run Datadog synthetic tests in the production enviornment and then slack notify and/or roll back
Expand All @@ -155,15 +125,10 @@ def _get_pass_fail_result(input_json, test_run_id):

def run_synthetic_tests(enable_automated_rollbacks, slack_notification_channel):
'''
:param enable_automated_rollbacks:
:param slack_notification_channel:
:return:
'''
PUBLIC_TEST_ID = "sad-hqu-h33"

logging.info("****** In run_synthetic_tests")

if enable_automated_rollbacks:
logging.Error("Automated rollbacks are not yet supported")
sys.exit(1)
Expand All @@ -175,20 +140,23 @@ def run_synthetic_tests(enable_automated_rollbacks, slack_notification_channel):
dd_client = DatadogClient(api_key, app_key)

# Prepare and trigger the synthetic test request
test_requests = [SyntheticTestRequest("Hello, world test", PUBLIC_TEST_ID)]
logging.info("*** Triggering synthetic tests ***")
test_run_id = dd_client.trigger_synthetic_tests(test_requests)
# PUBLIC_TEST_ID = "sad-hqu-h33"
tests_to_run = json.loads(os.getenv("TESTS_TO_RUN"))
logging.info(f"Running the following tests: {str(tests_to_run)}")

dd_client.trigger_synthetic_tests(tests_to_run)

failed_tests = dd_client.get_failed_tests()
for failed_test in failed_tests:
logging.warning(f'Test {failed_test["name"]}({failed_test["id"]}) failed')

# Fetch and print the test results
logging.info("*** Getting test results ***")
test_results = dd_client.get_test_results(test_run_id, test_requests)
logging.info(f"****** Test results: {test_results}")
task_failed_code = 1 if failed_tests else 0

except Exception as e:
print("An error occurred:", str(e))
exit()
logging.error("GoCD/Datadog integration error: ", str(e))
task_failed_code = 1

sys.exit(0)
sys.exit(task_failed_code)

if __name__ == "__main__":
run_synthetic_tests(False, None) # pylint: disable=no-value-for-parameter

0 comments on commit 1ffb368

Please sign in to comment.