Skip to content

Commit

Permalink
v1.1.5 LDAP set_users and set_groups fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
guimorone committed Apr 11, 2023
1 parent e23cbde commit 18d6c8f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
uses: marvinpinto/[email protected]
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
title: 'v1.1.5 ...'
title: 'v1.1.5 LDAP set_users and set_groups fixed'
prerelease: false
automatic_release_tag: v1.1.5
files: |
Expand Down
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,6 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Local files
test.py
# Local test files
test.py
docker-compose.yml
4 changes: 2 additions & 2 deletions pyscora_wrangler/ldap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ Requires only a config dictionary. An example with types can be seen below:

```python
{
"root_dn": "CN=GS_1,OU=Grupos,DC=service,DC=local", # REQUIRED. Type = str.
"server": "ldap://localhost.389", # REQUIRED. Type = str.
"root_dn": "DC=service,DC=local", # REQUIRED. Type = str.
"server": "ldap://localhost:389", # REQUIRED. Type = str.
"port": 636, # OPTIONAL. Type = int. Default is 389.
"server_alias": ["service.com.br"] # OPTIONAL. Type = List[str]. Default is [].
}
Expand Down
74 changes: 37 additions & 37 deletions pyscora_wrangler/ldap/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def auth(self, username: str, password: str) -> bool:

raise ValueError('Invalid credentials.')

root_dn = self.ldap_config.get('root_dn')
port = int(self.ldap_config.get('port', 389))
server_alias = self.ldap_config.get('server_alias', [])

Expand All @@ -136,7 +137,7 @@ def auth(self, username: str, password: str) -> bool:

self.__ldap_connection = Connection(
server,
user=self.__ldap_username,
user=f'CN={self.__ldap_username},{root_dn}',
password=self.__ldap_password,
authentication='SIMPLE',
raise_exceptions=False,
Expand All @@ -162,61 +163,60 @@ def logout(self) -> None:
logger.error(f'[logout] {err}')

def get_ldap_groups(self) -> List[str]:
"""Returns A list containing the ldap groups."""
"""Returns A list containing the ldap groups"""

return self.__ldap_groups

def get_ldap_users(self) -> List[str]:
"""Returns A list containing the ldap users."""
"""Returns A list containing the ldap users"""

return self.__ldap_users

def set_ldap_users_and_groups(self) -> Tuple[List[str], List[str]] | None:
"""Set the ldap groups and users.
def __set_ldap_arrays(self, search_filter: str) -> List[str]:
root_dn = self.ldap_config.get('root_dn', '')

self.__ldap_connection.search(
search_base=root_dn, search_filter=search_filter, search_scope=SUBTREE, size_limit=0
)

arr = []
for entry in self.__ldap_connection.entries:
arr.append(entry.entry_dn)

return arr

def set_ldap_users(self) -> List[str] | None:
"""Set the ldap users
Returns:
Tuple[List[str], List[str]] | None: Returns None if an error occurs in the process. Otherwise, returns a Tuple the lists of the ldap groups and the ldap users.
List[str] | None: Returns None if an error occurs in the process. Otherwise, returns the list of the ldap users.
"""

if not self.is_user_authenticated():
logger.warning('[set_ldap_users_and_groups] User is not authenticated. Skipping...')
logger.warning('[set_ldap_users] User is not authenticated. Skipping...')
return None

groups = []
root_dn = self.ldap_config.get('root_dn', '')
users_search_filter = '(objectClass=person)'
users_dn = self.__set_ldap_arrays(search_filter=users_search_filter)

try:
self.__ldap_connection.search(
search_base=root_dn,
search_filter='(objectclass=*)',
search_scope=SUBTREE,
attributes=['member'],
size_limit=0,
)
self.__ldap_users = [user.split('=')[1].split(',')[0] for user in users_dn]

response = json.loads(self.__ldap_connection.response_to_json())
return self.get_ldap_users()

if type(response.get('entries')) == list and len(response.get('entries')) > 0:
for entry in response.get('entries'):
for member in entry.member.values:
self.__ldap_connection.search(
search_base=root_dn,
search_filter=f'(distinguishedName={member})',
attributes=['sAMAccountName'],
)
def set_ldap_groups(self) -> List[str] | None:
"""Set the ldap groups
user = self.__ldap_connection.entries[0].sAMAccountName.values

self.__ldap_users.append(user)
Returns:
List[str] | None: Returns None if an error occurs in the process. Otherwise, returns the list of the ldap groups.
"""

cn_groups = response.get('entries')[0].get('attributes').get('member')
if not self.is_user_authenticated():
logger.warning('[set_ldap_users] User is not authenticated. Skipping...')
return None

for cn_group in cn_groups:
groups.append(cn_group.split(',')[0].replace('CN=', ''))
groups_search_filter = '(objectClass=groupOfNames)'
groups_dn = self.__set_ldap_arrays(search_filter=groups_search_filter)

self.__ldap_groups = groups
except Exception as err:
logger.error(f'[set_ldap_users_and_groups] {err}')
return None
self.__ldap_groups = [group.split('=')[1].split(',')[0] for group in groups_dn]

return self.get_ldap_users(), self.get_ldap_groups()
return self.get_ldap_groups()

0 comments on commit 18d6c8f

Please sign in to comment.