Skip to content

Commit

Permalink
Add ForbiddenError
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertoPrevato authored Jun 16, 2023
1 parent ab605fe commit 8fdf2e6
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 27 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.0.2] - 2023-06-16 :corn:
- Raises a more specific exception `ForbiddenError` when the user of an
operation is authenticated properly, but authorization fails.
This enables better handling of authorization error, differentiating when the
user context is missing or invalid, and when the context is valid but the
user has no rights to do a certain operation. See [#371](https://github.com/Neoteroi/BlackSheep/issues/371).

## [1.0.1] - 2023-03-20 :sun_with_face:
- Improves the automatic rotation of `JWKS`: when validating `JWTs`, `JWKS` are
refreshed automatically if an unknown `kid` is encountered, and `JWKS` were
Expand Down
2 changes: 1 addition & 1 deletion guardpost/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.0.1"
__version__ = "1.0.2"
17 changes: 17 additions & 0 deletions guardpost/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ def _is_async_handler(handler_type: Type[Requirement]) -> bool:


class UnauthorizedError(AuthorizationError):
"""
Error class used for all situations in which a user initiating an operation is not
authorized to complete the operation.
"""

def __init__(
self,
forced_failure: Optional[str],
Expand Down Expand Up @@ -85,6 +90,14 @@ def _get_message(forced_failure, failed_requirements):
return "Unauthorized"


class ForbiddenError(UnauthorizedError):
"""
A specific kind of authorization error, used to indicate that the application
understands a request but refuses to authorize it. In other words, the user context
is valid but the user is not authorized to perform a certain operation.
"""


class AuthorizationContext:
__slots__ = ("identity", "requirements", "_succeeded", "_failed_forced")

Expand Down Expand Up @@ -228,6 +241,10 @@ async def _handle_with_policy(self, policy: Policy, identity: Identity, scope: A
requirement.handle(context) # type: ignore

if not context.has_succeeded:
if identity and identity.is_authenticated():
raise ForbiddenError(
context.forced_failure, context.pending_requirements
)
raise UnauthorizedError(
context.forced_failure, context.pending_requirements
)
Expand Down
74 changes: 48 additions & 26 deletions tests/test_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from guardpost.authorization import (
AuthorizationContext,
AuthorizationStrategy,
ForbiddenError,
Policy,
PolicyNotFoundError,
Requirement,
Expand Down Expand Up @@ -164,32 +165,6 @@ def request_identity_getter(request):
return request.user


@pytest.mark.asyncio
async def test_authorization_identity_getter():
class UserNameRequirement(Requirement):
def __init__(self, expected_name: str):
self.expected_name = expected_name

async def handle(self, context: AuthorizationContext):
assert context.identity is not None

if context.identity.has_claim_value("name", self.expected_name):
context.succeed(self)

auth = get_strategy(
[Policy("user", UserNameRequirement("Tybek"))], request_identity_getter
)

@auth(policy="user")
async def some_method(request: Request):
assert request is not None
return True

value = await some_method(Request(User({"name": "Tybek"})))

assert value is True


@pytest.mark.asyncio
async def test_claims_requirement():
auth = get_strategy(
Expand Down Expand Up @@ -422,3 +397,50 @@ async def some_method():

with raises(TypeError, match="Missing identity getter function."):
await some_method()


class UserNameRequirement(Requirement):
def __init__(self, expected_name: str):
self.expected_name = expected_name

async def handle(self, context: AuthorizationContext):
assert context.identity is not None

if context.identity.has_claim_value("name", self.expected_name):
context.succeed(self)


@pytest.mark.asyncio
async def test_authorization_identity_getter():
auth = get_strategy(
[Policy("user", UserNameRequirement("Tybek"))], request_identity_getter
)

@auth(policy="user")
async def some_method(request: Request):
assert request is not None
return True

value = await some_method(Request(User({"name": "Tybek"})))

assert value is True


@pytest.mark.asyncio
async def test_authorization_identity_getter_forbidden():
auth = get_strategy(
[Policy("user", UserNameRequirement("Tybek"))], request_identity_getter
)

@auth(policy="user")
async def some_method(request: Request):
assert request is not None
return True

with pytest.raises(UnauthorizedError):
await some_method(
Request(User({"some_prop": "Example"}, authentication_mode=None))
)

with pytest.raises(ForbiddenError):
await some_method(Request(User({"name": "Foo"}, authentication_mode="cookie")))

0 comments on commit 8fdf2e6

Please sign in to comment.