diff --git a/dev/scripts.community/check_namespaces_with_validated_data.py b/dev/scripts.community/check_namespaces_with_validated_data.py index 241b7cf241..578dde9245 100644 --- a/dev/scripts.community/check_namespaces_with_validated_data.py +++ b/dev/scripts.community/check_namespaces_with_validated_data.py @@ -19,104 +19,115 @@ CHECKMODE = False -def match_old_user_to_local_user(udata): - old_username = udata['galaxy_username'] - found_user = User.objects.filter(username=old_username).first() - if found_user: - return found_user - - if udata.get('github_login_new'): - found_user = User.objects.filter(username=udata['github_login_new']).first() - if found_user: - return found_user - - if udata.get('github_login'): - found_user = User.objects.filter(username=udata['github_login']).first() - if found_user: - return found_user - - github_login = udata['github_login'] - if udata.get('github_login_new'): - github_login = udata['github_login_new'] - if github_login is None: - github_login = udata['galaxy_username'] - - #if github_login is None: - # import epdb; epdb.st() - - print(f'FIX - create user {github_login}') - if not CHECKMODE: - user,_ = User.objects.get_or_create(username=github_login) - return user - - #import epdb; epdb.st() - return None - +class Fixer: -def do_check(): + def __init__(self): + known_usernames = list(User.objects.values_list('username', flat=True)) + self.known_usernames = dict((x, None) for x in known_usernames) - known_usernames = list(User.objects.values_list('username', flat=True)) - known_usernames = dict((x, None) for x in known_usernames) + known_v1_names = list(LegacyNamespace.objects.values_list('name', flat=True)) + self.known_v1_names = dict((x, None) for x in known_v1_names) - known_v1_names = list(LegacyNamespace.objects.values_list('name', flat=True)) - known_v1_names = dict((x, None) for x in known_v1_names) + known_v3_names = list(Namespace.objects.values_list('name', flat=True)) + self.known_v3_names = dict((x, None) for x in known_v3_names) - known_v3_names = list(Namespace.objects.values_list('name', flat=True)) - known_v3_names = dict((x, None) for x in known_v3_names) + collection_namespaces = list(Collection.objects.values_list('namespace', flat=True)) + self.collection_namespaces = sorted(set(collection_namespaces)) + role_namespaces = list(LegacyRole.objects.values_list('namespace__name', flat=True)) + self.role_namespaces = sorted(set(role_namespaces)) - collection_namespaces = list(Collection.objects.values_list('namespace', flat=True)) - collection_namespaces = sorted(set(collection_namespaces)) - role_namespaces = list(LegacyRole.objects.values_list('namespace__name', flat=True)) - role_namespaces = sorted(set(role_namespaces)) + # compressed for size ... + fn = 'user_namespace_map_validated.json.gz' + with gzip.open(fn, 'rb') as gz_file: + raw = gz_file.read() + self.umap = json.loads(raw) - # compressed for size ... - fn = 'user_namespace_map_validated.json.gz' - with gzip.open(fn, 'rb') as gz_file: - raw = gz_file.read() - umap = json.loads(raw) + self.old_ns_owners = {} + for k, v in self.umap.items(): + for ns_name in v.get('owned_namespaces', []): + if ns_name not in self.old_ns_owners: + self.old_ns_owners[ns_name] = [] + self.old_ns_owners[ns_name].append(k) - old_ns_owners = {} - for k, v in umap.items(): - for ns_name in v.get('owned_namespaces', []): - if ns_name not in old_ns_owners: - old_ns_owners[ns_name] = [] - old_ns_owners[ns_name].append(k) + def write_log_change(self, msg): + with open('changelog.txt', 'a') as f: + f.write(msg + '\n') - for ns_name in sorted(list(old_ns_owners.keys())): - print('-' * 50) - print(ns_name) - print('-' * 50) - - # optimize ... - if ns_name not in collection_namespaces and ns_name not in role_namespaces: - continue - - # figure this out later ... - if ns_name not in known_v1_names and ns_name not in known_v3_names: - continue - - # figure this out later ... - is_valid_v3_name = ns_name in known_v3_names or ns_name == generate_v3_namespace_from_attributes(username=ns_name) - if not is_valid_v3_name: - continue - - v3_namespace = Namespace.objects.filter(name=ns_name).first() - if not v3_namespace: - continue - - current_owners = rbac.get_v3_namespace_owners(v3_namespace) - old_owners = [match_old_user_to_local_user(umap[x]) for x in old_ns_owners[ns_name] if umap[x].get('github_login_verified')] - missing_owners = [x for x in old_owners if x and x not in current_owners] - - # looks good? - if not missing_owners: - continue - - #print(ns_name) - for missing_owner in missing_owners: - print(f'FIX - add {missing_owner} to v3:{v3_namespace} owners') - if not CHECKMODE: - rbac.add_user_to_v3_namespace(missing_owner, v3_namespace) + def match_old_user_to_local_user(self, udata): + old_username = udata['galaxy_username'] + found_user = User.objects.filter(username=old_username).first() + if found_user: + return found_user -do_check() + if udata.get('github_login_new'): + found_user = User.objects.filter(username=udata['github_login_new']).first() + if found_user: + return found_user + + if udata.get('github_login'): + found_user = User.objects.filter(username=udata['github_login']).first() + if found_user: + return found_user + + github_login = udata['github_login'] + if udata.get('github_login_new'): + github_login = udata['github_login_new'] + if github_login is None: + github_login = udata['galaxy_username'] + + print(f'FIX - create user {github_login}') + self.write_log_change(f'FIX - create user {github_login}') + if not CHECKMODE: + user,_ = User.objects.get_or_create(username=github_login) + return user + + #import epdb; epdb.st() + return None + + def do_check(self): + + for ns_name in sorted(list(self.old_ns_owners.keys())): + + print('-' * 50) + print(ns_name) + print('-' * 50) + + # optimize ... + if ns_name not in self.collection_namespaces and ns_name not in self.role_namespaces: + continue + + # figure this out later ... + if ns_name not in self.known_v1_names and ns_name not in self.known_v3_names: + continue + + # figure this out later ... + is_valid_v3_name = ns_name in self.known_v3_names or ns_name == generate_v3_namespace_from_attributes(username=ns_name) + if not is_valid_v3_name: + continue + + v3_namespace = Namespace.objects.filter(name=ns_name).first() + if not v3_namespace: + continue + + current_owners = rbac.get_v3_namespace_owners(v3_namespace) + old_owners = [ + self.match_old_user_to_local_user(self.umap[x]) + for x in self.old_ns_owners[ns_name] if self.umap[x].get('github_login_verified') + ] + missing_owners = [x for x in old_owners if x and x not in current_owners] + + # looks good? + if not missing_owners: + continue + + #print(ns_name) + for missing_owner in missing_owners: + print(f'FIX - add {missing_owner} to v3:{v3_namespace} owners') + self.write_log_change(f'FIX - add {missing_owner} to v3:{v3_namespace} owners') + if not CHECKMODE: + rbac.add_user_to_v3_namespace(missing_owner, v3_namespace) + + +fixer = Fixer() +fixer.do_check()