forked from ansible/galaxy_ng
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add management command to dump auth config data to a file (ansible#2082)
* Add management command to dump auth config data to a file https://issues.redhat.com/browse/AAP-19981 No-Issue
- Loading branch information
1 parent
5d56bdd
commit 67eab54
Showing
2 changed files
with
226 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import json | ||
import os | ||
import sys | ||
from django.core.management.base import BaseCommand | ||
from django.conf import settings | ||
|
||
|
||
class Command(BaseCommand): | ||
KEYCLOAK_KEYS = [ | ||
"SOCIAL_AUTH_KEYCLOAK_ACCESS_TOKEN_URL", | ||
"SOCIAL_AUTH_KEYCLOAK_AUTHORIZATION_URL", | ||
"SOCIAL_AUTH_KEYCLOAK_KEY", | ||
"SOCIAL_AUTH_KEYCLOAK_PUBLIC_KEY", | ||
"SOCIAL_AUTH_KEYCLOAK_SECRET", | ||
] | ||
|
||
LDAP_KEYS = [ | ||
"AUTH_LDAP_SERVER_URI", | ||
"AUTH_LDAP_BIND_DN", | ||
"AUTH_LDAP_BIND_PASSWORD", | ||
"AUTH_LDAP_USER_SEARCH_BASE_DN", | ||
"AUTH_LDAP_USER_SEARCH_SCOPE", | ||
"AUTH_LDAP_USER_SEARCH_FILTER", | ||
"AUTH_LDAP_GROUP_SEARCH_BASE_DN", | ||
"AUTH_LDAP_GROUP_SEARCH_SCOPE", | ||
"AUTH_LDAP_GROUP_SEARCH_FILTER", | ||
] | ||
|
||
help = "Dump auth config data from database to a JSON file" | ||
|
||
def add_arguments(self, parser): | ||
parser.add_argument( | ||
"output_file", | ||
nargs="?", | ||
type=str, | ||
default=None, | ||
help="Output JSON file path", | ||
) | ||
|
||
def is_enabled(self, keys): | ||
values = [] | ||
for key in keys: | ||
values.append(settings.get(key, default=None)) | ||
return all(values) | ||
|
||
def post_config_ldap(self): | ||
post_config = {} | ||
# Other required platform params | ||
post_config["USER_ATTR_MAP"] = settings.get("AUTH_LDAP_USER_ATTR_MAP") | ||
post_config["USER_DN_TEMPLATE"] = settings.get("AUTH_LDAP_USER_DN_TEMPLATE") | ||
post_config["GROUP_TYPE_PARAMS"] = settings.get("AUTH_LDAP_GROUP_TYPE_PARAMS") | ||
post_config["CONNECTION_OPTIONS"] = settings.get("AUTH_LDAP_CONNECTION_OPTIONS") | ||
post_config["START_TLS"] = settings.get("AUTH_LDAP_START_TLS") | ||
|
||
# Configure USER_SEARCH and GROUP_SEARCH | ||
AUTH_LDAP_USER_SEARCH_BASE_DN = settings.get("AUTH_LDAP_USER_SEARCH_BASE_DN", default=None) | ||
AUTH_LDAP_USER_SEARCH_SCOPE = settings.get("AUTH_LDAP_USER_SEARCH_SCOPE", default=None) | ||
AUTH_LDAP_USER_SEARCH_FILTER = settings.get("AUTH_LDAP_USER_SEARCH_FILTER", default=None) | ||
AUTH_LDAP_GROUP_SEARCH_BASE_DN = settings.get( | ||
"AUTH_LDAP_GROUP_SEARCH_BASE_DN", | ||
default=None | ||
) | ||
AUTH_LDAP_GROUP_SEARCH_SCOPE = settings.get("AUTH_LDAP_GROUP_SEARCH_SCOPE", default=None) | ||
AUTH_LDAP_GROUP_SEARCH_FILTER = settings.get("AUTH_LDAP_GROUP_SEARCH_FILTER", default=None) | ||
|
||
post_config["USER_SEARCH"] = [ | ||
AUTH_LDAP_USER_SEARCH_BASE_DN, | ||
AUTH_LDAP_USER_SEARCH_SCOPE, | ||
AUTH_LDAP_USER_SEARCH_FILTER, | ||
] | ||
|
||
post_config["GROUP_SEARCH"] = [ | ||
AUTH_LDAP_GROUP_SEARCH_BASE_DN, | ||
AUTH_LDAP_GROUP_SEARCH_SCOPE, | ||
AUTH_LDAP_GROUP_SEARCH_FILTER, | ||
] | ||
|
||
# Configure GROUP_TYPE | ||
post_config["GROUP_TYPE"] = None | ||
AUTH_LDAP_GROUP_TYPE = settings.get("AUTH_LDAP_GROUP_TYPE") | ||
if AUTH_LDAP_GROUP_TYPE: | ||
post_config["GROUP_TYPE"] = type(AUTH_LDAP_GROUP_TYPE).__name__ | ||
|
||
return post_config | ||
|
||
def format_config_data(self, type, keys, prefix): | ||
config = { | ||
"type": f"galaxy.authentication.authenticator_plugins.{type}", | ||
"enabled": self.is_enabled(keys), | ||
"configuration": {}, | ||
} | ||
for key in keys: | ||
k = key | ||
if prefix in key: | ||
k = key[len(prefix):] | ||
v = settings.get(key, default=None) | ||
config["configuration"].update({k: v}) | ||
|
||
# handle post configuration for ldap: | ||
if type == "ldap": | ||
config["configuration"].update(self.post_config_ldap()) | ||
|
||
return config | ||
|
||
def handle(self, *args, **options): | ||
try: | ||
data = [] | ||
|
||
# Add Keycloak auth config | ||
data.append( | ||
self.format_config_data( | ||
"keycloak", | ||
self.KEYCLOAK_KEYS, | ||
"SOCIAL_AUTH_KEYCLOAK_"), | ||
) | ||
|
||
# Add LDAP auth config | ||
data.append(self.format_config_data("ldap", self.LDAP_KEYS, "AUTH_LDAP_")) | ||
|
||
# write to file if requested | ||
if options["output_file"]: | ||
# Define the path for the output JSON file | ||
output_file = options["output_file"] | ||
|
||
# Ensure the directory exists | ||
os.makedirs(os.path.dirname(output_file), exist_ok=True) | ||
|
||
# Write data to the JSON file | ||
with open(output_file, "w") as f: | ||
json.dump(data, f, indent=4) | ||
|
||
self.stdout.write( | ||
self.style.SUCCESS(f"Auth config data dumped to {output_file}") | ||
) | ||
else: | ||
self.stdout.write(json.dumps(data)) | ||
|
||
except Exception as e: | ||
self.stdout.write(self.style.ERROR(f"An error occurred: {str(e)}")) | ||
sys.exit(1) |
86 changes: 86 additions & 0 deletions
86
galaxy_ng/tests/unit/app/management/commands/test_dump_auth_config.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
from io import StringIO | ||
import json | ||
from django.core.management import call_command | ||
from django.test import TestCase, override_settings | ||
|
||
|
||
@override_settings(SOCIAL_AUTH_KEYCLOAK_ACCESS_TOKEN_URL="ACCESS_TOKEN_URL") | ||
@override_settings(SOCIAL_AUTH_KEYCLOAK_AUTHORIZATION_URL="AUTHORIZATION_URL") | ||
@override_settings(SOCIAL_AUTH_KEYCLOAK_KEY="KEY") | ||
@override_settings(SOCIAL_AUTH_KEYCLOAK_PUBLIC_KEY="PUBLIC_KEY") | ||
@override_settings(SOCIAL_AUTH_KEYCLOAK_SECRET="SECRET") | ||
@override_settings(AUTH_LDAP_SERVER_URI="SERVER_URI") | ||
@override_settings(AUTH_LDAP_BIND_DN="BIND_DN") | ||
@override_settings(AUTH_LDAP_BIND_PASSWORD="BIND_PASSWORD") | ||
@override_settings(AUTH_LDAP_USER_DN_TEMPLATE="USER_DN_TEMPLATE") | ||
@override_settings(AUTH_LDAP_USER_SEARCH_BASE_DN="USER_SEARCH_BASE_DN") | ||
@override_settings(AUTH_LDAP_USER_SEARCH_SCOPE="USER_SEARCH_SCOPE") | ||
@override_settings(AUTH_LDAP_USER_SEARCH_FILTER="USER_SEARCH_FILTER") | ||
@override_settings(AUTH_LDAP_GROUP_SEARCH_BASE_DN="GROUP_SEARCH_BASE_DN") | ||
@override_settings(AUTH_LDAP_GROUP_SEARCH_SCOPE="GROUP_SEARCH_SCOPE") | ||
@override_settings(AUTH_LDAP_GROUP_SEARCH_FILTER="GROUP_SEARCH_FILTER") | ||
@override_settings(AUTH_LDAP_GROUP_TYPE_PARAMS="GROUP_TYPE_PARAMS") | ||
@override_settings(AUTH_LDAP_USER_ATTR_MAP={ | ||
"email": "email", | ||
"last_name": "last_name", | ||
"first_name": "first_name", | ||
}) | ||
@override_settings(AUTH_LDAP_CONNECTION_OPTIONS={}) | ||
@override_settings(AUTH_LDAP_START_TLS=None) | ||
@override_settings(AUTH_LDAP_GROUP_TYPE="string object") | ||
class TestDumpAuthConfigCommand(TestCase): | ||
def setUp(self): | ||
super().setUp() | ||
self.expected_config = [ | ||
{ | ||
"type": "galaxy.authentication.authenticator_plugins.keycloak", | ||
"enabled": True, | ||
"configuration": { | ||
"ACCESS_TOKEN_URL": "ACCESS_TOKEN_URL", | ||
"AUTHORIZATION_URL": "AUTHORIZATION_URL", | ||
"KEY": "KEY", | ||
"PUBLIC_KEY": "PUBLIC_KEY", | ||
"SECRET": "SECRET" | ||
} | ||
}, | ||
{ | ||
"type": "galaxy.authentication.authenticator_plugins.ldap", | ||
"enabled": True, | ||
"configuration": { | ||
"SERVER_URI": "SERVER_URI", | ||
"BIND_DN": "BIND_DN", | ||
"BIND_PASSWORD": "BIND_PASSWORD", | ||
"USER_SEARCH_BASE_DN": "USER_SEARCH_BASE_DN", | ||
"USER_SEARCH_SCOPE": "USER_SEARCH_SCOPE", | ||
"USER_SEARCH_FILTER": "USER_SEARCH_FILTER", | ||
"GROUP_SEARCH_BASE_DN": "GROUP_SEARCH_BASE_DN", | ||
"GROUP_SEARCH_SCOPE": "GROUP_SEARCH_SCOPE", | ||
"GROUP_SEARCH_FILTER": "GROUP_SEARCH_FILTER", | ||
"USER_ATTR_MAP": { | ||
"email": "email", | ||
"last_name": "last_name", | ||
"first_name": "first_name" | ||
}, | ||
"USER_DN_TEMPLATE": "USER_DN_TEMPLATE", | ||
"GROUP_TYPE_PARAMS": "GROUP_TYPE_PARAMS", | ||
"CONNECTION_OPTIONS": {}, | ||
"START_TLS": None, | ||
"USER_SEARCH": [ | ||
"USER_SEARCH_BASE_DN", | ||
"USER_SEARCH_SCOPE", | ||
"USER_SEARCH_FILTER" | ||
], | ||
"GROUP_SEARCH": [ | ||
"GROUP_SEARCH_BASE_DN", | ||
"GROUP_SEARCH_SCOPE", | ||
"GROUP_SEARCH_FILTER" | ||
], | ||
"GROUP_TYPE": "str" | ||
} | ||
} | ||
] | ||
|
||
def test_json_returned_from_cmd(self): | ||
output = StringIO() | ||
call_command("dump-auth-config", stdout=output) | ||
assert output.getvalue().rstrip() == json.dumps(self.expected_config) |