JWTConfig¶
JWT расшифровывается как JSON Web Token. Его можно использовать с любым middleware на ваш выбор, которое реализует BaseAuthMiddleware.
Tip
Больше информации о JWT тут.
Зависимости¶
Esmerald использует pyjwt
и passlib
для интеграции с JWT. Вы можете установить их, выполнив:
$ pip install esmerald[jwt]
JWTConfig и приложение¶
Для использования JWTConfig с middleware.
from myapp.models import User
from esmerald import Esmerald, settings
from esmerald.config.jwt import JWTConfig
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware
from lilya.middleware import DefineMiddleware as LilyaMiddleware
jwt_config = JWTConfig(
signing_key=settings.secret_key,
)
auth_middleware = LilyaMiddleware(JWTAuthMiddleware, config=jwt_config, user_model=User)
app = Esmerald(middleware=[auth_middleware])
Info
В примере используется JWTAuthMiddleware из Esmerald с Edgy ORM.
Параметры¶
Все параметры и значения по умолчанию доступны в справке по JWTConfig.
JWTConfig и настройки приложения¶
Конфигурацию JWTConfig можно выполнить напрямую через инициализацию приложения, а также с помощью настроек.
from typing import TYPE_CHECKING, List
from esmerald import EsmeraldAPISettings
from esmerald.config.jwt import JWTConfig
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware
from lilya._internal._module_loading import import_string
from lilya.middleware import DefineMiddleware as LilyaMiddleware
if TYPE_CHECKING:
from esmerald.types import Middleware
class CustomSettings(EsmeraldAPISettings):
@property
def jwt_config(self) -> JWTConfig:
"""
A JWT object configuration to be passed to the application middleware
"""
return JWTConfig(signing_key=self.secret_key, auth_header_types=["Bearer", "Token"])
@property
def middleware(self) -> List["Middleware"]:
"""
Initial middlewares to be loaded on startup of the application.
"""
return [
LilyaMiddleware(
JWTAuthMiddleware,
config=self.jwt_config,
user_model=import_string("myapp.models.User"),
)
]
Это поможет вам поддерживать настройки в чистоте, без перегруженного экземпляра Esmerald.
Модель токена¶
Esmerald предоставляет стандартный объект токена, который позволяет легко генерировать и декодировать токены.
from esmerald.security.jwt.token import Token
token = Token(exp=..., iat=..., sub=...)
Параметры являются стандартными для Python JOSE, так что вы можете чувствовать себя комфортно, используя их.
Генерация токена (кодирование)¶
Токен предоставляет стандартные операции для взаимодействия с pyjwt
.
from esmerald.security.jwt.token import Token
from esmerald.conf import settings
# Создание модели токена
token = Token(exp=..., iat=..., sub=...)
# Генерация JWT токена
jwt_token = Token.encode(key=settings.secret_key, algorithm="HS256", **claims)
Декодирование токена (decode)¶
Функция декодирования также предоставляется.
from esmerald.security.jwt.token import Token
from esmerald.conf import settings
# Декодирование JWT токена
jwt_token = Token.decode(token=..., key=settings.secret_key, algorithms=["HS256"])
Метод Token.decode
возвращает объект Token.
Note
Эта функциональность сильно зависит от библиотеки pyjwt
, но её использование не является обязательным.
Вы можете использовать любую библиотеку, которая соответствует вашим требованиям.
Esmerald просто предлагает примеры и альтернативы.
Поля claims¶
Параметр **claims
может быть очень полезен, особенно если вы хотите генерировать токены
access
и refresh
.
При использовании claims
вы можете передавать любые дополнительные параметры, которые после
декодирования будут доступны для манипуляций.
Интеграция с базой данных содержит пример выполнения таких операций, но давайте рассмотрим еще пример.
Мы будем использовать middleware и сгенерируем access_token
и refresh_token
для определённого API.
Предположим следующие вещи:
- Есть модель
User
в файлеaccounts/models.py
. - Контроллеры находятся в
accounts/controllers.py
. - Мы будем наследовать существующий middleware для упрощения.
middleware
находится в файлеaccounts/middleware.py
.- JWTConfig уже настроен в файле с настройками.
- Класс
Token
будет унаследован для добавления дополнительных параметров, таких какtoken_type
. - В файле
accounts/backends.py
находятся операции для аутентификации и обновления токена.
Класс Token¶
Вы должны создать подкласс для Token
, если хотите добавить дополнительные параметры для своих нужд. Например,
чтобы иметь дополнительный параметр token_type
, указывающий, является ли токен access
, refresh
или каким-либо другим типом, который вы хотите использовать для ваших claims.
Пример может выглядеть так:
from typing import Union
from esmerald.security.jwt.token import Token as EsmeraldToken
class Token(EsmeraldToken):
token_type: Union[str, None] = None
Это будет особенно полезно на следующих этапах, так как мы будем использовать token_type
,
чтобы различать access_token
и refresh_token
.
Middleware¶
Давайте воспользуемся существующим middleware из contrib, чтобы упростить задачу. Этот middleware будет служить только для доступа к API и не для обновления токена.
Tip
Не стесняйтесь создавать свой собственный middleware, это приведено для пояснения.
from jose import JWSError, JWTError
from esmerald.conf import settings
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware as EsmeraldMiddleware
from esmerald.exceptions import AuthenticationError, NotAuthorized
from esmerald.middleware.authentication import AuthResult
from esmerald.security.jwt.token import Token
from lilya._internal._connection import Connection
from lilya._internal._module_loading import import_string
from lilya.middleware import DefineMiddleware as LilyaMiddleware
class JWTAuthMiddleware(EsmeraldMiddleware):
def get_token(self, request: Connection) -> Token:
"""
Gets the token from the headers.
"""
token = request.headers.get(self.config.authorization_header, None)
if not token or token is None:
raise NotAuthorized(detail="Token not found in the request header")
token_partition = token.partition(" ")
token_type = token_partition[0]
auth_token = token_partition[-1]
if token_type not in self.config.auth_header_types:
raise NotAuthorized(detail=f"'{token_type}' is not an authorized header.")
try:
token = Token.decode(
token=auth_token,
key=self.config.signing_key,
algorithms=[self.config.algorithm],
)
except (JWSError, JWTError) as e:
raise AuthenticationError(str(e)) from e
return token
async def authenticate(self, request: Connection) -> AuthResult:
"""
Retrieves the header default of the config, validates
and returns the AuthResult.
Raises Authentication error if invalid.
"""
token: Token = self.get_token(request)
if token.token_type == settings.jwt_config.refresh_token_name:
raise NotAuthorized(detail="Refresh tokens cannot be used for operations.")
user = await self.retrieve_user(token.sub)
if not user:
raise AuthenticationError("User not found.")
return AuthResult(user=user)
# Middleware responsible from user accesses.
# This can be imported in any level of the application
AuthMiddleware = LilyaMiddleware(
JWTAuthMiddleware,
config=settings.jwt_config,
user_model=import_string("accounts.models.User"),
)
Здесь происходит много всего, но в основном мы делаем следующее:
- Проверяем наличие
token
в заголовке. - Проверяем, является ли
token_type
типомaccess_token
(имя по умолчанию из JWTConfig и может быть любым другим) и вызываем исключение, если это неaccess_token
. - Возвращаем объект
AuthResult
с данными пользователя из БД.
Middleware также содержит обертку под названием AuthMiddleware
. Она будет использоваться позже в
user controllers.
Backend¶
Здесь мы разместим логику, которая обрабатывает аутентификацию и обновление токена.
from datetime import datetime
from typing import Any, Dict
from accounts.models import User
from edgy.exceptions import ObjectNotFound
from jose import JWSError, JWTError
from pydantic import BaseModel
from esmerald.conf import settings
from esmerald.exceptions import AuthenticationError, NotAuthorized
from esmerald.security.jwt.token import Token
class AccessToken(BaseModel):
access_token: str
class RefreshToken(BaseModel):
"""
Model used only to refresh
"""
refresh_token: str
class TokenAccess(AccessToken, RefreshToken):
"""
Model representation of an access token.
"""
...
class LoginIn(BaseModel):
email: str
password: str
class BackendAuthentication(BaseModel):
"""
Utility class that helps with the authentication process.
"""
email: str
password: str
async def authenticate(self) -> Dict[str, str]:
"""Authenticates a user and returns
a dictionary containing the `access_token` and `refresh_token`
in the format of:
{
"access_token": ...,
"refresh_token": ...
}
"""
try:
user: User = await User.query.get(email=self.email)
except ObjectNotFound:
# Run the default password hasher once to reduce the timing
# difference between an existing and a nonexistent user.
await User().set_password(self.password)
else:
is_password_valid = await user.check_password(self.password)
if is_password_valid and self.is_user_able_to_authenticate(user):
# The lifetime of a token should be short, let us make 5 minutes.
# You can use also the access_token_lifetime from the JWT config directly
access_time = datetime.now() + settings.jwt_config.access_token_lifetime
refresh_time = datetime.now() + settings.jwt_config.refresh_token_lifetime
access_token = TokenAccess(
access_token=self.generate_user_token(
user,
time=access_time,
token_type=settings.jwt_config.access_token_name, # 'access_token'
),
refresh_token=self.generate_user_token(
user,
time=refresh_time,
token_type=settings.jwt_config.refresh_token_name, # 'refresh_token'
),
)
return access_token.model_dump()
else:
raise NotAuthorized(detail="Invalid credentials.")
def is_user_able_to_authenticate(self, user: Any):
"""
Reject users with is_active=False. Custom user models that don't have
that attribute are allowed.
"""
return getattr(user, "is_active", True)
def generate_user_token(self, user: User, token_type: str, time: datetime = None):
"""
Generates the JWT token for the authenticated user.
"""
if not time:
later = datetime.now() + settings.jwt_config.access_token_lifetime
else:
later = time
token = Token(sub=str(user.id), exp=later)
return token.encode(
key=settings.jwt_config.signing_key,
algorithm=settings.jwt_config.algorithm,
token_type=token_type,
)
class RefreshAuthentication(BaseModel):
"""
Refreshes the access token given a refresh token of a given user.
This object does not perform any DB action, instead, uses the existing refresh
token to generate a new access.
"""
token: RefreshToken
async def refresh(self) -> AccessToken:
"""
Retrieves the header default of the config and validates against the decoding.
Raises Authentication error if invalid.
"""
token = self.token.refresh_token
try:
token = Token.decode(
token=token,
key=settings.jwt_config.signing_key,
algorithms=[settings.jwt_config.algorithm],
)
except (JWSError, JWTError) as e:
raise AuthenticationError(str(e)) from e
if token.token_type != settings.jwt_config.refresh_token_name:
raise NotAuthorized(detail="Only refresh tokens are allowed.")
# Apply the maximum living time
expiry_date = datetime.now() + settings.jwt_config.access_token_lifetime
# New token object
new_token = Token(sub=token.sub, exp=expiry_date)
# Encode the token
access_token = new_token.encode(
key=settings.jwt_config.signing_key,
algorithm=settings.jwt_config.algorithm,
token_type=settings.jwt_config.access_token_name,
)
return AccessToken(access_token=access_token)
Довольно много кода, верно? Да, но в основном это логика, используемая для аутентификации и обновления существующего токена.
Вы видели BackendAuthentication
и RefreshAuthentication
? Теперь это будет очень полезно.
RefreshAuthentication
— это то место, где мы проверяем refresh_token
. Помните middleware,
который позволяет использовать только access_token
? Middleware будет использоваться только для API,
которые требуют аутентификации, а refresh_token
, как правило, должен только обновлять access токен и ничего больше.
Поскольку refresh токен уже содержит всю информацию, необходимую для генерации нового access токена,
нет необходимости снова запрашивать user
и проходить весь процесс.
Способ, которым был спроектирован и передан refresh токен в claims
, также позволяет нам напрямую
использовать его для генерации нового access_token
.
Помните Token? Вот где token_type
играет роль в определении,
какой тип токена проверяется и отправляется.
access_token
отправляется через headers
как и должно быть, а refresh_token
отправляется через POST
.
Контроллеры¶
Теперь пришло время собрать всё в контроллерах, где у нас будут:
/auth/create
- Конечная точка для создания пользователей./auth/signin
- Конечная точка для входа пользователя./auth/users
- Конечная точка, которая возвращает список всех пользователей./auth/refresh-access
- Конечная точка, ответственная только за обновление access_token.
В итоге, что-то вроде этого:
from typing import Any, Dict, List, Union
from accounts.backends import (
AccessToken,
BackendAuthentication,
RefreshAuthentication,
RefreshToken,
TokenAccess,
)
from accounts.middleware import AuthMiddleware
from accounts.models import User
from accounts.v1.schemas import LoginIn, UserIn, UserOut
from pydantic import BaseModel, EmailStr
from esmerald import APIView, JSONResponse, get, post, status
from esmerald.openapi.datastructures import OpenAPIResponse
from esmerald.openapi.security.http import Bearer
class UserIn(BaseModel):
"""
Model responsible for the creation of a User.
"""
first_name: str
last_name: str
email: str
password: str
username: str
class UserOut(BaseModel):
"""
Representation of the list of users.
"""
id: int
first_name: str
last_name: str
email: str
username: str
is_staff: bool
is_active: bool
is_superuser: bool
is_verified: bool
class LoginIn(BaseModel):
"""
Details needed for a login of a user in the system.
"""
email: EmailStr
password: str
class ErrorDetail(BaseModel):
"""
Used by the OpenAPI to describe the error
exposing the details.
"""
detail: str
class UserAPIView(APIView):
tags: List[str] = ["User and Access"]
security: List[Any] = [Bearer]
@get(
"/users",
summary="Gets all the users",
responses={201: OpenAPIResponse(model=[UserOut])},
middleware=[AuthMiddleware],
)
async def get_all(self) -> List[UserOut]:
return await User.query.all()
@post(
path="/create",
summary="Creates a user in the system",
responses={400: OpenAPIResponse(model=ErrorDetail)},
)
async def create_user(self, data: UserIn) -> None:
"""
Creates a user in the system and returns the default 201
status code.
"""
user_data = data.model_dump()
user_data.update({"is_verified": False})
await User.query.create(**user_data)
@post(
path="/signin",
summary="Login API and returns a JWT Token.",
status_code=status.HTTP_200_OK,
responses={
200: OpenAPIResponse(model=TokenAccess),
401: OpenAPIResponse(model=ErrorDetail),
},
)
async def signin(self, data: LoginIn) -> JSONResponse:
"""
Login a user and returns a JWT token, else raises ValueError
"""
auth = BackendAuthentication(email=data.email, password=data.password)
access_tokens: Dict[str, str] = await auth.authenticate()
return JSONResponse(access_tokens)
@post(
path="/refresh-access",
summary="Refreshes the access token",
description="When a token expires, a new access token must be generated from the refresh token previously provided. The refresh token must be just that, a refresh and it should only return a new access token and nothing else",
status_code=status.HTTP_200_OK,
responses={
200: OpenAPIResponse(model=AccessToken),
401: OpenAPIResponse(model=ErrorDetail),
},
)
async def refresh_token(self, payload: RefreshToken) -> AccessToken:
authentication = RefreshAuthentication(token=payload)
access_token: AccessToken = await authentication.refresh()
return access_token
Как видите, мы теперь собрали всё вместе. Путь /auth/users
требует аутентификации для доступа,
а /auth/refresh-access
гарантирует, что будет возвращён только новый access_token
.