-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
143 lines (106 loc) · 4.56 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from urllib.parse import urlparse, urlunparse
import copy
import requests
import json
def create_flow(service_specs: list, pod_specs: list, flow_uuid, NEON_PROJECT_ID, NEON_FORK_FROM_BRANCH_ID, NEON_API_KEY):
    """Create a Neon branch for a flow and point the pod specs at it.

    A Neon branch is requested only when at least one pod spec has a
    container. Every input pod spec is deep-copied into the result; for specs
    that have containers, the first container's ``POSTGRES`` environment
    variable is rewritten to use the new branch hostname.

    Args:
        service_specs: Not used by this function.
        pod_specs: List of pod-spec dicts; each may carry a ``containers`` list.
        flow_uuid: Not used by this function.
        NEON_PROJECT_ID: Neon project in which the branch is created.
        NEON_FORK_FROM_BRANCH_ID: Parent branch to fork from.
        NEON_API_KEY: Bearer token for the Neon API.

    Returns:
        A dict with ``pod_specs`` (one modified copy per input spec, in input
        order) and ``config_map`` (the values ``delete_flow`` reads to remove
        the branch). ``NEON_BRANCH_ID`` is an empty string when no branch was
        requested.
    """
    dev_branch_hostname = ""
    dev_branch_id = ""

    # The external resource is only worth creating if some pod will use it.
    if any(pod_spec.get('containers') for pod_spec in pod_specs):
        dev_branch_hostname, dev_branch_id, error = create_neon_branch(
            NEON_API_KEY, NEON_PROJECT_ID, NEON_FORK_FROM_BRANCH_ID
        )
        if error:
            # Best effort: the failure is reported but the flow is still returned.
            print(f"Error: {error}")

    modified_pod_specs = []
    for pod_spec in pod_specs:
        # Work on a copy so the caller's specs are never mutated.
        modified_pod_spec = copy.deepcopy(pod_spec)
        containers = modified_pod_spec.get('containers', [])
        if containers:
            container = containers[0]
            postgres_url = ""
            # No early exit: if POSTGRES is listed more than once the last one wins.
            for env in container.get('env') or []:
                if env.get("name") == "POSTGRES":
                    postgres_url = env.get("value")

            new_postgres_url = update_postgres_url(postgres_url, dev_branch_hostname)

            # NOTE(review): only the first container is kept and its env is
            # reduced to POSTGRES -- confirm that dropping the rest is intended.
            container['env'] = [
                {'name': 'POSTGRES', 'value': new_postgres_url},
            ]
            modified_pod_spec['containers'] = [container]

        # Every spec is returned, including those without containers.
        modified_pod_specs.append(modified_pod_spec)

    return {
        "pod_specs": modified_pod_specs,
        "config_map": {
            "NEON_API_KEY": NEON_API_KEY,
            "NEON_PROJECT_ID": NEON_PROJECT_ID,
            "NEON_BRANCH_ID": dev_branch_id,
        },
    }
def delete_flow(config_map, flow_uuid):
    """Delete the Neon branch described by ``config_map`` and print the outcome.

    Args:
        config_map: Mapping holding ``NEON_API_KEY``, ``NEON_PROJECT_ID`` and
            ``NEON_BRANCH_ID``.
        flow_uuid: Not used by this function.
    """
    api_key = config_map["NEON_API_KEY"]
    project_id = config_map["NEON_PROJECT_ID"]
    branch_id = config_map["NEON_BRANCH_ID"]
    print(delete_neon_branch(api_key, project_id, branch_id))
def create_neon_branch(neon_api_key, project_id, parent_branch_id, timeout=30):
    """Create a Neon branch with a read-write endpoint.

    Args:
        neon_api_key: Bearer token for the Neon API.
        project_id: Neon project in which the branch is created.
        parent_branch_id: Branch the new branch is forked from.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(host, branch_id, error)`` tuple. On success ``error`` is ``None``;
        on any failure ``host`` and ``branch_id`` are empty strings and
        ``error`` is a message describing what went wrong.
    """
    url = f"https://console.neon.tech/api/v2/projects/{project_id}/branches"
    json_payload = {
        "endpoints": [{"type": "read_write"}],
        "branch": {"parent_id": parent_branch_id},
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {neon_api_key}",
    }

    # Every failure path returns the same 3-tuple shape as the success path,
    # so callers can always unpack three values.
    try:
        response = requests.post(url, json=json_payload, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        return "", "", f"Error sending request: {e}"

    if response.status_code != 201:
        return "", "", f"Unexpected status code: {response.status_code}, body: {response.text}"

    try:
        result = response.json()
    except ValueError as e:
        # ValueError is the common base of the JSON decode errors that
        # different requests versions may raise here.
        return "", "", f"Error decoding response: {e}"

    endpoints = result.get("endpoints") or []
    if not endpoints:
        return "", "", "Endpoints not found in response"

    host = endpoints[0].get("host")
    if not host:
        return "", "", "Host not found in response"

    branch_id = (result.get("branch") or {}).get("id")
    if not branch_id:
        return "", "", "Branch ID not found in response"

    return host, branch_id, None
def delete_neon_branch(neon_api_key, project_id, branch_id, timeout=30):
    """Delete a Neon branch and describe the outcome.

    Args:
        neon_api_key: Bearer token for the Neon API.
        project_id: Neon project that owns the branch.
        branch_id: Branch to delete.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A human-readable status string: a success message, or a description
        of the request error or unexpected status code.
    """
    url = f"https://console.neon.tech/api/v2/projects/{project_id}/branches/{branch_id}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {neon_api_key}",
    }

    try:
        response = requests.delete(url, headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        return f"Error sending request: {e}"

    # NOTE(review): per the Neon API reference, 200 means the branch was
    # deleted and 204 means it was already gone -- confirm against the docs.
    # Either way the branch no longer exists, so both count as success.
    if response.status_code not in (200, 204):
        return f"Unexpected status code: {response.status_code}, body: {response.text}"

    return "Branch deleted successfully"
def update_postgres_url(postgres_url, new_hostname):
    """Return ``postgres_url`` with its host swapped for ``new_hostname``.

    The scheme, credentials, port, path and query of the original URL are
    carried over; only the host part of the network location changes.
    """
    parts = urlparse(postgres_url)
    user = parts.username
    secret = parts.password

    # Rebuild the "user[:password]@" prefix from whatever the URL carried.
    if not user:
        credentials = ""
    elif secret:
        credentials = f"{user}:{secret}@"
    else:
        credentials = f"{user}@"

    netloc = f"{credentials}{new_hostname}"
    if parts.port:
        netloc = f"{netloc}:{parts.port}"

    return urlunparse(parts._replace(netloc=netloc))