forked from opensearch-project/opensearch-build
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_cluster.py
148 lines (124 loc) · 4.56 KB
/
test_cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import abc
import os
from contextlib import contextmanager
from typing import Any, Generator, List, Tuple
from test_workflow.integ_test.service import ClusterCreationException, Service
from test_workflow.integ_test.service_termination_result import ServiceTerminationResult
from test_workflow.test_recorder.log_recorder import LogRecorder
from test_workflow.test_recorder.test_result_data import TestResultData
class TestCluster(abc.ABC):
    """
    Abstract base class for all types of test clusters.

    A cluster consists of one main ``service`` plus a list of ``dependencies``; concrete
    subclasses provide both, together with the ``port`` the cluster listens on.
    """

    work_dir: str
    component_name: str
    component_test_config: str
    security_enabled: bool
    additional_cluster_config: dict
    save_logs: LogRecorder
    all_services: List[Service]
    termination_result: ServiceTerminationResult
    cluster_port: int

    def __init__(
        self,
        work_dir: str,
        component_name: str,
        component_test_config: str,
        security_enabled: bool,
        additional_cluster_config: dict,
        save_logs: LogRecorder,
        cluster_port: int = 9200
    ) -> None:
        """
        Record the cluster configuration. No service is started here; see ``start``.

        work_dir: parent directory; the cluster uses its "local-test-cluster" subdirectory.
        component_name: reported with every saved test result.
        component_test_config: reported with every saved test result.
        security_enabled: stored for use by subclasses.
        additional_cluster_config: stored for use by subclasses.
        save_logs: recorder that receives the result data of every terminated service.
        cluster_port: stored as ``self.cluster_port``; defaults to 9200.
        """
        self.work_dir = os.path.join(work_dir, "local-test-cluster")
        self.component_name = component_name
        self.component_test_config = component_test_config
        self.security_enabled = security_enabled
        self.additional_cluster_config = additional_cluster_config
        self.save_logs = save_logs
        self.all_services = []
        self.termination_result = None
        # The argument used to be accepted and then dropped; keep it on the instance so it is not lost.
        self.cluster_port = cluster_port

    @classmethod
    @contextmanager
    def create(cls, *args: Any) -> Generator[Tuple[str, int], None, None]:
        """
        Context manager that builds a cluster from ``args``, starts it and yields ``(endpoint, port)``.
        The cluster is terminated when the ``with`` block exits, so the caller does not call ``terminate``.
        Raises ClusterCreationException if a service does not become available.
        """
        cluster = cls(*args)
        try:
            cluster.start()
            yield cluster.endpoint, cluster.port
        finally:
            # NOTE(review): start() already calls terminate() when a service fails its wait, so on that
            # path terminate() runs a second time here - confirm the services tolerate a repeated
            # terminate()/uninstall().
            cluster.terminate()

    @classmethod
    def create_cluster(cls, *args: Any) -> Any:
        """
        Build a cluster from ``args``, start it and return the instance.
        Unlike ``create``, nothing terminates the cluster here; that is left to the caller.
        """
        cluster = cls(*args)
        cluster.start()
        return cluster

    def start(self) -> None:
        """
        Create the work directory, start the main service and its dependencies, then wait for each one.
        If any service reports it is not available, the cluster is terminated and
        ClusterCreationException is raised.
        """
        os.makedirs(self.work_dir, exist_ok=True)
        self.all_services = [self.service] + self.dependencies
        # Start everything first, then wait, so the services come up side by side.
        for service in self.all_services:
            service.start()
        for service in self.all_services:
            if not service.wait_for_service():
                self.terminate()
                # NOTE(review): "10 attempts" presumably mirrors the retry count inside wait_for_service - confirm.
                raise ClusterCreationException("Cluster is not available after 10 attempts")

    def terminate(self) -> None:
        """
        Terminate and uninstall the main service and every dependency, saving each termination result.
        Raises ClusterServiceNotInitializedException if no termination result is recorded for the main service.
        """
        if self.service:
            self.termination_result = self.service.terminate()
            self.__save_test_result_data(self.termination_result)
            self.service.uninstall()
        for service in self.dependencies:
            termination_result = service.terminate()
            self.__save_test_result_data(termination_result)
            service.uninstall()
        if not self.termination_result:
            raise ClusterServiceNotInitializedException()

    def __save_test_result_data(self, termination_result: ServiceTerminationResult) -> None:
        """
        Wrap a termination result in TestResultData and hand it to ``save_logs``.
        A ``None`` result is skipped: there is nothing to record, and reading its attributes would
        raise AttributeError before ``terminate`` reaches its own "not initialized" check.
        """
        if termination_result is None:
            return
        test_result_data = TestResultData(
            self.component_name,
            self.component_test_config,
            termination_result.return_code,
            termination_result.stdout_data,
            termination_result.stderr_data,
            termination_result.log_files
        )
        self.save_logs.save_test_result_data(test_result_data)

    @property
    def endpoint(self) -> str:
        """
        Get the host that this cluster is listening on.
        """
        return "localhost"

    @property
    @abc.abstractmethod
    def port(self) -> int:
        """
        Get the port that this cluster is listening on.
        """
        pass

    # abc.abstractproperty is deprecated; property + abstractmethod is the supported spelling
    # and matches how ``port`` is declared.
    @property
    @abc.abstractmethod
    def service(self) -> Service:
        """
        The main service running in this cluster.
        """
        pass

    @property
    @abc.abstractmethod
    def dependencies(self) -> List[Service]:
        """
        The dependencies running in this cluster.
        """
        pass
class ClusterServiceNotInitializedException(Exception):
    """
    Signals that the service running in the cluster is not initialized.
    """

    def __init__(self) -> None:
        # The message is fixed; the exception takes no arguments.
        message = "Service is not initialized"
        super().__init__(message)