Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement token revocation endpoint #84

Merged
merged 1 commit into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions aioauth/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,13 @@ class InvalidRedirectURIError(OAuth2Error[TRequest]):
"""

error: ErrorType = "invalid_request"


class UnsupportedTokenTypeError(OAuth2Error[TRequest]):
"""
The authorization server does not support the revocation of the presented
token type. That is, the client tried to revoke an access token on a server
not supporting this feature.
"""

error: ErrorType = "unsupported_token_type"
75 changes: 75 additions & 0 deletions aioauth/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
TemporarilyUnavailableError,
UnsupportedGrantTypeError,
UnsupportedResponseTypeError,
UnsupportedTokenTypeError,
)
from .grant_type import (
AuthorizationCodeGrantType,
Expand Down Expand Up @@ -447,3 +448,77 @@ async def authorize(request: fastapi.Request) -> fastapi.Response:
headers=HTTPHeaderDict({"location": location}),
content=content,
)

@catch_errors_and_unavailability()
async def revoke_token(self, request: TRequest) -> Response:
"""Endpoint to revoke an access token or refresh token.
For more information see
`RFC7009 <https://tools.ietf.org/html/rfc7009>`_.

Note:
The API endpoint that leverages this function is usually
``/revoke``.
Example:
Below is an example utilizing FastAPI as the server framework.
.. code-block:: python

from aioauth_fastapi.utils import to_oauth2_request, to_fastapi_response

@app.post("/revoke")
async def revoke(request: fastapi.Request) -> fastapi.Response:
# Converts a fastapi.Request to an aioauth.Request.
oauth2_request: aioauth.Request = await to_oauth2_request(request)
# Creates the response via this function call.
oauth2_response: aioauth.Response = await server.revoke_token(oauth2_request)
# Converts an aioauth.Response to a fastapi.Response.
response: fastapi.Response = await to_fastapi_response(oauth2_response)
return response

Args:
request: An :py:class:`aioauth.requests.Request` object.

Returns:
response: An :py:class:`aioauth.responses.Response` object.
"""
self.validate_request(request, ["POST"])
client_id, _ = self.get_client_credentials(request)

if not request.post.token:
raise InvalidRequestError[TRequest](
request=request, description="Request is missing token."
)

if request.post.token_type_hint and request.post.token_type_hint not in {
"refresh_token",
"access_token",
}:
raise UnsupportedTokenTypeError[TRequest](request=request)

access_token = (
request.post.token
if request.post.token_type_hint != "refresh_token"
else None
)
refresh_token = (
request.post.token
if request.post.token_type_hint != "access_token"
else None
)

token = await self.storage.get_token(
request=request,
client_id=client_id,
access_token=access_token,
refresh_token=refresh_token,
token_type=request.post.token_type_hint,
)

if token:
await self.storage.revoke_token(
request=request,
access_token=access_token,
refresh_token=refresh_token,
token_type=request.post.token_type_hint,
)

return Response(status_code=HTTPStatus.NO_CONTENT)
8 changes: 7 additions & 1 deletion aioauth/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,13 @@ async def delete_authorization_code(
"Method delete_authorization_code must be implemented for AuthorizationCodeGrantType"
)

async def revoke_token(self, request: TRequest, refresh_token: str) -> None:
async def revoke_token(
self,
request: TRequest,
token_type: Optional[TokenType] = "refresh_token",
access_token: Optional[str] = None,
refresh_token: Optional[str] = None,
) -> None:
"""Revokes a token's from the database.

Note:
Expand Down
1 change: 1 addition & 0 deletions aioauth/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"unauthorized_client",
"unsupported_grant_type",
"unsupported_response_type",
"unsupported_token_type",
"insecure_transport",
"mismatching_state",
"method_is_not_allowed",
Expand Down
10 changes: 9 additions & 1 deletion tests/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,19 @@ async def create_token(
self.tokens.append(token)
return token

async def revoke_token(self, request: Request, refresh_token: str) -> None:
async def revoke_token(
self,
request: Request,
token_type: Optional[TokenType] = "refresh_token",
access_token: Optional[str] = None,
refresh_token: Optional[str] = None,
) -> None:
tokens = self.tokens
for key, token_ in enumerate(tokens):
if token_.refresh_token == refresh_token:
tokens[key] = replace(token_, revoked=True)
elif token_.access_token == access_token:
tokens[key] = replace(token_, revoked=True)

async def get_token(
self,
Expand Down
64 changes: 64 additions & 0 deletions tests/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,67 @@ async def test_endpoint_availability(context_factory):
response = await server.create_token_introspection_response(request)
assert response.status_code == HTTPStatus.BAD_REQUEST
assert response.content["error"] == "temporarily_unavailable"


@pytest.mark.asyncio
async def test_revoke_refresh_token(context: AuthorizationContext):
client = context.clients[0]
client_id = client.client_id
client_secret = client.client_secret

settings = context.settings
token = context.initial_tokens[0]
server = context.server

post = Post(token=token.refresh_token, token_type_hint="refresh_token")
request = Request(
post=post,
method="POST",
headers=encode_auth_headers(client_id, client_secret),
settings=settings,
)

response = await server.revoke_token(request)
assert response.status_code == HTTPStatus.NO_CONTENT

# Check that the token was revoked
request = Request(
settings=settings,
post=post,
method="POST",
headers=encode_auth_headers(client_id, client_secret),
)
response = await server.create_token_introspection_response(request)
assert not response.content["active"], "The refresh_token must be revoked"


@pytest.mark.asyncio
async def test_revoke_access_token(context: AuthorizationContext):
client = context.clients[0]
client_id = client.client_id
client_secret = client.client_secret

settings = context.settings
token = context.initial_tokens[0]
server = context.server

post = Post(token=token.access_token, token_type_hint="access_token")
request = Request(
post=post,
method="POST",
headers=encode_auth_headers(client_id, client_secret),
settings=settings,
)

response = await server.revoke_token(request)
assert response.status_code == HTTPStatus.NO_CONTENT

# Check that the token was revoked
request = Request(
settings=settings,
post=post,
method="POST",
headers=encode_auth_headers(client_id, client_secret),
)
response = await server.create_token_introspection_response(request)
assert not response.content["active"], "The access_token must be revoked"