Skip to content

Commit

Permalink
TMP
Browse files Browse the repository at this point in the history
  • Loading branch information
jemrobinson committed Feb 29, 2024
1 parent 626674e commit 994573d
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions apricot/ldap/read_only_ldap_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
LDAPSearchResultEntry,
)
from twisted.internet import defer
from twisted.python import log


class ReadOnlyLDAPServer(LDAPServer):
def __init__(self):
super().__init__()
self.debug = True

def getRootDSE( # noqa: N802
self,
request: LDAPBindRequest,
Expand All @@ -21,111 +26,131 @@ def getRootDSE( # noqa: N802
"""
Handle an LDAP Root RSE request
"""
log.msg(f"Types: request '{type(request)}'; reply '{type(reply)}'")
log.msg(f"Calling getRootDSE with '{request.toWire() if request else 'NoRequest'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
return super().getRootDSE(request, reply)

def handle_LDAPAddRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Refuse to handle an LDAP add request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP add with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
id((request, controls, reply)) # ignore unused arguments
msg = "ReadOnlyLDAPServer will not handle LDAP add requests"
raise LDAPProtocolError(msg)

def handle_LDAPBindRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Handle an LDAP bind request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP bind with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
return super().handle_LDAPBindRequest(request, controls, reply)

def handle_LDAPCompareRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Handle an LDAP compare request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP compare with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
return super().handle_LDAPCompareRequest(request, controls, reply)

def handle_LDAPDelRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Refuse to handle an LDAP delete request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP del with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
id((request, controls, reply)) # ignore unused arguments
msg = "ReadOnlyLDAPServer will not handle LDAP delete requests"
raise LDAPProtocolError(msg)

def handle_LDAPExtendedRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Handle an LDAP extended request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP extended with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
return super().handle_LDAPExtendedRequest(request, controls, reply)

def handle_LDAPModifyDNRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Refuse to handle an LDAP modify DN request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP modify DN with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
id((request, controls, reply)) # ignore unused arguments
msg = "ReadOnlyLDAPServer will not handle LDAP modify DN requests"
raise LDAPProtocolError(msg)

def handle_LDAPModifyRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Refuse to handle an LDAP modify request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP modify with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
id((request, controls, reply)) # ignore unused arguments
msg = "ReadOnlyLDAPServer will not handle LDAP modify requests"
raise LDAPProtocolError(msg)

def handle_LDAPUnbindRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[..., None] | None,
) -> None:
"""
Handle an LDAP unbind request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP unbind with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
super().handle_LDAPUnbindRequest(request, controls, reply)

def handle_LDAPSearchRequest( # noqa: N802
self,
request: LDAPBindRequest,
controls: LDAPControl | None,
controls: list[LDAPControl] | None,
reply: Callable[[LDAPSearchResultEntry], None] | None,
) -> defer.Deferred[ILDAPEntry]:
"""
Handle an LDAP search request
"""
log.msg(f"Types: request '{type(request)}'; controls '{type(controls)}'; reply '{type(reply)}'")
log.msg(f"Calling LDAP search with '{request.toWire() if request else 'NoRequest'}' '{[c.toWire() for c in controls] if controls else 'NoControls'}' (DN='{request.dn if hasattr(request, 'dn') else 'NoDN'}')")
return super().handle_LDAPSearchRequest(request, controls, reply)

0 comments on commit 994573d

Please sign in to comment.