Skip to content

Commit

Permalink
linting
Browse files Browse the repository at this point in the history
  • Loading branch information
kubealex committed Jul 10, 2023
1 parent 8dcde5d commit dd42658
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 170 deletions.
141 changes: 77 additions & 64 deletions plugins/modules/eda_activations.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,116 +81,129 @@

def get_project_id(controller_url, controller_user, controller_password, project_name):
url = f"{controller_url}/api/eda/v1/projects/?name={project_name.replace(' ', '+')}"
response = requests.get(url, auth=(controller_user, controller_password), verify=False)
response = requests.get(
url, auth=(controller_user, controller_password), verify=False
)
if response.status_code in (200, 201):
projects = response.json().get('results', [])
projects = response.json().get("results", [])
if projects:
return int(projects[0].get('id'))
return int(projects[0].get("id"))
return None


def get_denv_id(controller_url, controller_user, controller_password, decision_env):
url = f"{controller_url}/api/eda/v1/decision-environments/?name={decision_env.replace(' ', '+')}"
response = requests.get(url, auth=(controller_user, controller_password), verify=False)
response = requests.get(
url, auth=(controller_user, controller_password), verify=False
)
if response.status_code in (200, 201):
denvs = response.json().get('results', [])
denvs = response.json().get("results", [])
if denvs:
return int(denvs[0].get('id'))
return int(denvs[0].get("id"))
return None


def create_activations(module):
# Extract input parameters from the module object
controller_url = module.params['controller_url']
controller_user = module.params['controller_user']
controller_password = module.params['controller_password']
activations = module.params['activations']
controller_url = module.params["controller_url"]
controller_user = module.params["controller_user"]
controller_password = module.params["controller_password"]
activations = module.params["activations"]

response_list = []

for activation in activations:
project_name = activation.get('project_name')
decision_env = activation.get('decision_env')
enabled = activation.get('enabled', True)
restart_policy = activation.get('restart_policy', 'always')
activation_name = activation['name']
rulebook_name = activation['rulebook']
project_name = activation.get("project_name")
decision_env = activation.get("decision_env")
enabled = activation.get("enabled", True)
restart_policy = activation.get("restart_policy", "always")
activation_name = activation["name"]
rulebook_name = activation["rulebook"]

if not project_name:
module.fail_json(msg="Project name is required for each activation.")
if not decision_env:
module.fail_json(msg="Decision environment is required for each activation.")
module.fail_json(
msg="Decision environment is required for each activation."
)

project_id = get_project_id(controller_url, controller_user, controller_password, project_name)
project_id = get_project_id(
controller_url, controller_user, controller_password, project_name
)
if project_id is None:
module.fail_json(msg=f"Project '{project_name}' not found.")

denv_id = get_denv_id(controller_url, controller_user, controller_password, decision_env)
denv_id = get_denv_id(
controller_url, controller_user, controller_password, decision_env
)
if denv_id is None:
module.fail_json(msg=f"Decision environment '{decision_env}' not found.")

rulebook_list = []
extra_vars_list = []

# Retrieve rulebooks
url = f"{controller_url}/api/eda/v1/rulebooks/?project_id={project_id}"
response = requests.get(url, auth=(controller_user, controller_password), verify=False)
rulebook_url = f"{controller_url}/api/eda/v1/rulebooks/?project_id={project_id}"
response = requests.get(
rulebook_url, auth=(controller_user, controller_password), verify=False
)
if response.status_code in (200, 201):
rulebooks = response.json().get('results', [])
rulebooks = response.json().get("results", [])
for rulebook in rulebooks:
if rulebook['name'] == rulebook_name:
rulebook_list.append({'name': activation_name, 'id': int(rulebook['id'])})
if rulebook["name"] == rulebook_name:
rulebook_list.append(int(rulebook["id"]))
break

# Create extra vars for activations
if 'extra_vars' in activation and activation['extra_vars']:
url = f"{controller_url}/api/eda/v1/extra-vars/"
body = {"extra_var": activation['extra_vars']}
response = requests.post(url, auth=(controller_user, controller_password),
json=body, verify=False)
if response.status_code in (200, 201):
extra_vars_list.append({'name': activation_name, 'var_id': int(response.json().get('id'))})

# Join rulebook_list and extra_vars_list
activations_list = []
for rulebook in rulebook_list:
activation = {
'name': rulebook['name'],
'project_id': project_id,
'decision_environment_id': denv_id,
'rulebook_id': rulebook['id'],
'restart_policy': restart_policy,
'is_enabled': enabled
}
for extra_vars in extra_vars_list:
if extra_vars['name'] == rulebook['name']:
activation['extra_var_id'] = extra_vars['var_id']
break
activations_list.append(activation)
if not rulebook_list:
module.fail_json(
msg=f"Rulebook '{rulebook_name}' not found in project '{project_name}'."
)

# Create activations for given project
url = f"{controller_url}/api/eda/v1/activations/"
for activation in activations_list:
activations_url = f"{controller_url}/api/eda/v1/activations/"
headers = {"Content-Type": "application/json"}
for rulebook_id in rulebook_list:
body = {
'restart_policy': activation['restart_policy'],
'is_enabled': activation['is_enabled'],
'name': activation['name'],
'project_id': activation['project_id'],
'decision_environment_id': activation['decision_environment_id'],
'rulebook_id': activation['rulebook_id'],
"name": activation_name,
"project_id": project_id,
"decision_environment_id": denv_id,
"rulebook_id": rulebook_id,
"restart_policy": restart_policy,
"is_enabled": enabled,
}
if 'extra_var_id' in activation:
body['extra_var_id'] = activation['extra_var_id']
headers = {'Content-Type': 'application/json'}
response = requests.post(url, auth=(controller_user, controller_password),
json=body, headers=headers, verify=False)
if "extra_vars" in activation and activation["extra_vars"]:
vars_url = f"{controller_url}/api/eda/v1/extra-vars/"
extra_vars_body = {"extra_var": activation["extra_vars"]}
response = requests.post(
vars_url,
auth=(controller_user, controller_password),
json=extra_vars_body,
headers=headers,
verify=False,
timeout=15,
)
if response.status_code in (200, 201):
body["extra_var_id"] = response.json().get("id")

response = requests.post(
activations_url,
auth=(controller_user, controller_password),
json=body,
headers=headers,
verify=False,
timeout=15,
)
if response.status_code in (200, 201):
response_list.append(response.json())
elif response.status_code == 400 and "already exists" in ' '.join(response.json().get("name")):
response_list.append(response.json())
else:
module.fail_json(
msg=f"Failed to create activation '{activation_name}' for project '{project_name}'. RESPONSE: {response.json()}"
)

module.exit_json(changed=True, activations=response_list)



def main():
module_args = dict(
controller_url=dict(type='str', required=True),
Expand Down
89 changes: 50 additions & 39 deletions plugins/modules/eda_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,51 +86,65 @@

def check_credential_exists(controller_url, controller_user, controller_password, name):
url = f"{controller_url}/api/eda/v1/credentials/?name={name}"
response = requests.get(url, auth=(controller_user, controller_password), verify=False)
response = requests.get(
url, auth=(controller_user, controller_password), verify=False
)
if response.status_code in (200, 201):
count = response.json().get('count', 0)
count = response.json().get("count", 0)
if count > 0:
credential_id = response.json().get('results', [{}])[0].get('id')
credential_id = response.json().get("results", [{}])[0].get("id")
return int(credential_id) if credential_id else None
return None


def create_or_update_credentials(module):
# Extract input parameters from the module object
controller_url = module.params['controller_url']
controller_user = module.params['controller_user']
controller_password = module.params['controller_password']
credentials = module.params['credentials']
controller_url = module.params["controller_url"]
controller_user = module.params["controller_user"]
controller_password = module.params["controller_password"]
credentials = module.params["credentials"]

response_list = []

for credential in credentials:
name = credential['name']
description = credential.get('description')
username = credential['username']
secret = credential['secret']
credential_type = credential['credential_type']
name = credential["name"]
description = credential.get("description")
username = credential["username"]
secret = credential["secret"]
credential_type = credential["credential_type"]

# Check if credential exists
credential_id = check_credential_exists(controller_url, controller_user, controller_password, name)
credential_id = check_credential_exists(
controller_url, controller_user, controller_password, name
)

# Prepare request body
body = {
'name': name,
'credential_type': credential_type,
'username': username,
'secret': secret
"name": name,
"credential_type": credential_type,
"username": username,
"secret": secret,
}
if description:
body['description'] = description
body["description"] = description

# Create or update credential
url = f"{controller_url}/api/eda/v1/credentials/"
if credential_id:
url += f"{credential_id}/"
response = requests.patch(url, auth=(controller_user, controller_password), json=body, verify=False)
response = requests.patch(
url,
auth=(controller_user, controller_password),
json=body,
verify=False,
)
else:
response = requests.post(url, auth=(controller_user, controller_password), json=body, verify=False)
response = requests.post(
url,
auth=(controller_user, controller_password),
json=body,
verify=False,
)

if response.status_code in (200, 201):
response_list.append(response.json())
Expand All @@ -145,41 +159,38 @@ def create_or_update_credentials(module):

def main():
module_args = dict(
controller_url=dict(type='str', required=True),
controller_user=dict(type='str', required=True),
controller_password=dict(type='str', required=True, no_log=True),
controller_url=dict(type="str", required=True),
controller_user=dict(type="str", required=True),
controller_password=dict(type="str", required=True, no_log=True),
credentials=dict(
type='list',
type="list",
required=True,
elements='dict',
elements="dict",
options=dict(
description=dict(type='str', required=False),
name=dict(type='str', required=True),
username=dict(type='str', required=True),
secret=dict(type='str', required=True, no_log=True),
description=dict(type="str", required=False),
name=dict(type="str", required=True),
username=dict(type="str", required=True),
secret=dict(type="str", required=True, no_log=True),
credential_type=dict(
type='str',
type="str",
required=True,
choices=[
'GitHub Personal Access Token',
'GitLab Personal Access Token',
'Container Registry'
]
"GitHub Personal Access Token",
"GitLab Personal Access Token",
"Container Registry",
],
),
),
),
)

module = AnsibleModule(
argument_spec=module_args,
supports_check_mode=False
)
module = AnsibleModule(argument_spec=module_args, supports_check_mode=False)

try:
create_or_update_credentials(module)
except Exception as e:
module.fail_json(msg=str(e))


if __name__ == '__main__':
if __name__ == "__main__":
main()
Loading

0 comments on commit dd42658

Please sign in to comment.