Перейти к содержанию

JWTConfig

JWT расшифровывается как JSON Web Token. Его можно использовать с любым middleware на ваш выбор, которое реализует BaseAuthMiddleware.

Tip

Больше информации о JWT тут.

Зависимости

Esmerald использует pyjwt и passlib для интеграции с JWT. Вы можете установить их, выполнив:

$ pip install esmerald[jwt]

JWTConfig и приложение

Для использования JWTConfig с middleware.

from myapp.models import User

from esmerald import Esmerald, settings
from esmerald.config.jwt import JWTConfig
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware
from lilya.middleware import DefineMiddleware as LilyaMiddleware

jwt_config = JWTConfig(
    signing_key=settings.secret_key,
)

auth_middleware = LilyaMiddleware(JWTAuthMiddleware, config=jwt_config, user_model=User)

app = Esmerald(middleware=[auth_middleware])

Info

В примере используется JWTAuthMiddleware из Esmerald с Edgy ORM.

Параметры

Все параметры и значения по умолчанию доступны в справке по JWTConfig.

JWTConfig и настройки приложения

Конфигурацию JWTConfig можно выполнить напрямую через инициализацию приложения, а также с помощью настроек.

from typing import TYPE_CHECKING, List

from esmerald import EsmeraldAPISettings
from esmerald.config.jwt import JWTConfig
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware
from lilya._internal._module_loading import import_string
from lilya.middleware import DefineMiddleware as LilyaMiddleware

if TYPE_CHECKING:
    from esmerald.types import Middleware


class CustomSettings(EsmeraldAPISettings):
    @property
    def jwt_config(self) -> JWTConfig:
        """
        A JWT object configuration to be passed to the application middleware
        """
        return JWTConfig(signing_key=self.secret_key, auth_header_types=["Bearer", "Token"])

    @property
    def middleware(self) -> List["Middleware"]:
        """
        Initial middlewares to be loaded on startup of the application.
        """
        return [
            LilyaMiddleware(
                JWTAuthMiddleware,
                config=self.jwt_config,
                user_model=import_string("myapp.models.User"),
            )
        ]

Это поможет вам поддерживать настройки в чистоте, без перегруженного экземпляра Esmerald.

Модель токена

Esmerald предоставляет стандартный объект токена, который позволяет легко генерировать и декодировать токены.

from esmerald.security.jwt.token import Token

token = Token(exp=..., iat=..., sub=...)

Параметры являются стандартными для Python JOSE, так что вы можете чувствовать себя комфортно, используя их.

Генерация токена (кодирование)

Токен предоставляет стандартные операции для взаимодействия с pyjwt.

from esmerald.security.jwt.token import Token
from esmerald.conf import settings

# Создание модели токена
token = Token(exp=..., iat=..., sub=...)

# Генерация JWT токена
jwt_token = Token.encode(key=settings.secret_key, algorithm="HS256", **claims)

Декодирование токена (decode)

Функция декодирования также предоставляется.

from esmerald.security.jwt.token import Token
from esmerald.conf import settings

# Декодирование JWT токена
jwt_token = Token.decode(token=..., key=settings.secret_key, algorithms=["HS256"])

Метод Token.decode возвращает объект Token.

Note

Эта функциональность сильно зависит от библиотеки pyjwt, но её использование не является обязательным. Вы можете использовать любую библиотеку, которая соответствует вашим требованиям. Esmerald просто предлагает примеры и альтернативы.

Поля claims

Параметр **claims может быть очень полезен, особенно если вы хотите генерировать токены access и refresh. При использовании claims вы можете передавать любые дополнительные параметры, которые после декодирования будут доступны для манипуляций.

Интеграция с базой данных содержит пример выполнения таких операций, но давайте рассмотрим еще пример.

Мы будем использовать middleware и сгенерируем access_token и refresh_token для определённого API.

Предположим следующие вещи:

  • Есть модель User в файле accounts/models.py.
  • Контроллеры находятся в accounts/controllers.py.
  • Мы будем наследовать существующий middleware для упрощения.
  • middleware находится в файле accounts/middleware.py.
  • JWTConfig уже настроен в файле с настройками.
  • Класс Token будет унаследован для добавления дополнительных параметров, таких как token_type.
  • В файле accounts/backends.py находятся операции для аутентификации и обновления токена.

Класс Token

Вы должны создать подкласс для Token, если хотите добавить дополнительные параметры для своих нужд. Например, чтобы иметь дополнительный параметр token_type, указывающий, является ли токен access, refresh или каким-либо другим типом, который вы хотите использовать для ваших claims.

Пример может выглядеть так:

from typing import Union

from esmerald.security.jwt.token import Token as EsmeraldToken


class Token(EsmeraldToken):
    token_type: Union[str, None] = None

Это будет особенно полезно на следующих этапах, так как мы будем использовать token_type, чтобы различать access_token и refresh_token.

Middleware

Давайте воспользуемся существующим middleware из contrib, чтобы упростить задачу. Этот middleware будет служить только для доступа к API и не для обновления токена.

Tip

Не стесняйтесь создавать свой собственный middleware, это приведено для пояснения.

from jose import JWSError, JWTError

from esmerald.conf import settings
from esmerald.contrib.auth.edgy.middleware import JWTAuthMiddleware as EsmeraldMiddleware
from esmerald.exceptions import AuthenticationError, NotAuthorized
from esmerald.middleware.authentication import AuthResult
from esmerald.security.jwt.token import Token
from lilya._internal._connection import Connection
from lilya._internal._module_loading import import_string
from lilya.middleware import DefineMiddleware as LilyaMiddleware


class JWTAuthMiddleware(EsmeraldMiddleware):
    def get_token(self, request: Connection) -> Token:
        """
        Gets the token from the headers.
        """
        token = request.headers.get(self.config.authorization_header, None)

        if not token or token is None:
            raise NotAuthorized(detail="Token not found in the request header")

        token_partition = token.partition(" ")
        token_type = token_partition[0]
        auth_token = token_partition[-1]

        if token_type not in self.config.auth_header_types:
            raise NotAuthorized(detail=f"'{token_type}' is not an authorized header.")

        try:
            token = Token.decode(
                token=auth_token,
                key=self.config.signing_key,
                algorithms=[self.config.algorithm],
            )
        except (JWSError, JWTError) as e:
            raise AuthenticationError(str(e)) from e
        return token

    async def authenticate(self, request: Connection) -> AuthResult:
        """
        Retrieves the header default of the config, validates
        and returns the AuthResult.

        Raises Authentication error if invalid.
        """
        token: Token = self.get_token(request)

        if token.token_type == settings.jwt_config.refresh_token_name:
            raise NotAuthorized(detail="Refresh tokens cannot be used for operations.")

        user = await self.retrieve_user(token.sub)
        if not user:
            raise AuthenticationError("User not found.")
        return AuthResult(user=user)


# Middleware responsible from user accesses.
# This can be imported in any level of the application
AuthMiddleware = LilyaMiddleware(
    JWTAuthMiddleware,
    config=settings.jwt_config,
    user_model=import_string("accounts.models.User"),
)

Здесь происходит много всего, но в основном мы делаем следующее:

  • Проверяем наличие token в заголовке.
  • Проверяем, является ли token_type типом access_token (имя по умолчанию из JWTConfig и может быть любым другим) и вызываем исключение, если это не access_token.
  • Возвращаем объект AuthResult с данными пользователя из БД.

Middleware также содержит обертку под названием AuthMiddleware. Она будет использоваться позже в user controllers.

Backend

Здесь мы разместим логику, которая обрабатывает аутентификацию и обновление токена.

Warning

Пример ниже использует Edgy из contrib, чтобы упростить объяснение и запросы.

from datetime import datetime
from typing import Any, Dict

from accounts.models import User
from edgy.exceptions import ObjectNotFound
from jose import JWSError, JWTError
from pydantic import BaseModel

from esmerald.conf import settings
from esmerald.exceptions import AuthenticationError, NotAuthorized
from esmerald.security.jwt.token import Token


class AccessToken(BaseModel):
    access_token: str


class RefreshToken(BaseModel):
    """
    Model used only to refresh
    """

    refresh_token: str


class TokenAccess(AccessToken, RefreshToken):
    """
    Model representation of an access token.
    """

    ...


class LoginIn(BaseModel):
    email: str
    password: str


class BackendAuthentication(BaseModel):
    """
    Utility class that helps with the authentication process.
    """

    email: str
    password: str

    async def authenticate(self) -> Dict[str, str]:
        """Authenticates a user and returns
        a dictionary containing the `access_token` and `refresh_token`
        in the format of:

        {
            "access_token": ...,
            "refresh_token": ...
        }
        """
        try:
            user: User = await User.query.get(email=self.email)
        except ObjectNotFound:
            # Run the default password hasher once to reduce the timing
            # difference between an existing and a nonexistent user.
            await User().set_password(self.password)
        else:
            is_password_valid = await user.check_password(self.password)
            if is_password_valid and self.is_user_able_to_authenticate(user):
                # The lifetime of a token should be short, let us make 5 minutes.
                # You can use also the access_token_lifetime from the JWT config directly
                access_time = datetime.now() + settings.jwt_config.access_token_lifetime
                refresh_time = datetime.now() + settings.jwt_config.refresh_token_lifetime
                access_token = TokenAccess(
                    access_token=self.generate_user_token(
                        user,
                        time=access_time,
                        token_type=settings.jwt_config.access_token_name,  # 'access_token'
                    ),
                    refresh_token=self.generate_user_token(
                        user,
                        time=refresh_time,
                        token_type=settings.jwt_config.refresh_token_name,  # 'refresh_token'
                    ),
                )
                return access_token.model_dump()
            else:
                raise NotAuthorized(detail="Invalid credentials.")

    def is_user_able_to_authenticate(self, user: Any):
        """
        Reject users with is_active=False. Custom user models that don't have
        that attribute are allowed.
        """
        return getattr(user, "is_active", True)

    def generate_user_token(self, user: User, token_type: str, time: datetime = None):
        """
        Generates the JWT token for the authenticated user.
        """
        if not time:
            later = datetime.now() + settings.jwt_config.access_token_lifetime
        else:
            later = time

        token = Token(sub=str(user.id), exp=later)
        return token.encode(
            key=settings.jwt_config.signing_key,
            algorithm=settings.jwt_config.algorithm,
            token_type=token_type,
        )


class RefreshAuthentication(BaseModel):
    """
    Refreshes the access token given a refresh token of a given user.

    This object does not perform any DB action, instead, uses the existing refresh
    token to generate a new access.
    """

    token: RefreshToken

    async def refresh(self) -> AccessToken:
        """
        Retrieves the header default of the config and validates against the decoding.
        Raises Authentication error if invalid.
        """
        token = self.token.refresh_token

        try:
            token = Token.decode(
                token=token,
                key=settings.jwt_config.signing_key,
                algorithms=[settings.jwt_config.algorithm],
            )
        except (JWSError, JWTError) as e:
            raise AuthenticationError(str(e)) from e

        if token.token_type != settings.jwt_config.refresh_token_name:
            raise NotAuthorized(detail="Only refresh tokens are allowed.")

        # Apply the maximum living time
        expiry_date = datetime.now() + settings.jwt_config.access_token_lifetime

        # New token object
        new_token = Token(sub=token.sub, exp=expiry_date)

        # Encode the token
        access_token = new_token.encode(
            key=settings.jwt_config.signing_key,
            algorithm=settings.jwt_config.algorithm,
            token_type=settings.jwt_config.access_token_name,
        )

        return AccessToken(access_token=access_token)

Довольно много кода, верно? Да, но в основном это логика, используемая для аутентификации и обновления существующего токена.

Вы видели BackendAuthentication и RefreshAuthentication? Теперь это будет очень полезно.

RefreshAuthentication — это то место, где мы проверяем refresh_token. Помните middleware, который позволяет использовать только access_token? Middleware будет использоваться только для API, которые требуют аутентификации, а refresh_token, как правило, должен только обновлять access токен и ничего больше.

Поскольку refresh токен уже содержит всю информацию, необходимую для генерации нового access токена, нет необходимости снова запрашивать user и проходить весь процесс.

Способ, которым был спроектирован и передан refresh токен в claims, также позволяет нам напрямую использовать его для генерации нового access_token.

Помните Token? Вот где token_type играет роль в определении, какой тип токена проверяется и отправляется.

access_token отправляется через headers как и должно быть, а refresh_token отправляется через POST.

Контроллеры

Теперь пришло время собрать всё в контроллерах, где у нас будут:

  • /auth/create - Конечная точка для создания пользователей.
  • /auth/signin - Конечная точка для входа пользователя.
  • /auth/users - Конечная точка, которая возвращает список всех пользователей.
  • /auth/refresh-access - Конечная точка, ответственная только за обновление access_token.

В итоге, что-то вроде этого:

from typing import Any, Dict, List, Union

from accounts.backends import (
    AccessToken,
    BackendAuthentication,
    RefreshAuthentication,
    RefreshToken,
    TokenAccess,
)
from accounts.middleware import AuthMiddleware
from accounts.models import User
from accounts.v1.schemas import LoginIn, UserIn, UserOut
from pydantic import BaseModel, EmailStr

from esmerald import APIView, JSONResponse, get, post, status
from esmerald.openapi.datastructures import OpenAPIResponse
from esmerald.openapi.security.http import Bearer


class UserIn(BaseModel):
    """
    Model responsible for the creation of a User.
    """

    first_name: str
    last_name: str
    email: str
    password: str
    username: str


class UserOut(BaseModel):
    """
    Representation of the list of users.
    """

    id: int
    first_name: str
    last_name: str
    email: str
    username: str
    is_staff: bool
    is_active: bool
    is_superuser: bool
    is_verified: bool


class LoginIn(BaseModel):
    """
    Details needed for a login of a user in the system.
    """

    email: EmailStr
    password: str


class ErrorDetail(BaseModel):
    """
    Used by the OpenAPI to describe the error
    exposing the details.
    """

    detail: str


class UserAPIView(APIView):
    tags: List[str] = ["User and Access"]
    security: List[Any] = [Bearer]

    @get(
        "/users",
        summary="Gets all the users",
        responses={201: OpenAPIResponse(model=[UserOut])},
        middleware=[AuthMiddleware],
    )
    async def get_all(self) -> List[UserOut]:
        return await User.query.all()

    @post(
        path="/create",
        summary="Creates a user in the system",
        responses={400: OpenAPIResponse(model=ErrorDetail)},
    )
    async def create_user(self, data: UserIn) -> None:
        """
        Creates a user in the system and returns the default 201
        status code.
        """
        user_data = data.model_dump()
        user_data.update({"is_verified": False})
        await User.query.create(**user_data)

    @post(
        path="/signin",
        summary="Login API and returns a JWT Token.",
        status_code=status.HTTP_200_OK,
        responses={
            200: OpenAPIResponse(model=TokenAccess),
            401: OpenAPIResponse(model=ErrorDetail),
        },
    )
    async def signin(self, data: LoginIn) -> JSONResponse:
        """
        Login a user and returns a JWT token, else raises ValueError
        """
        auth = BackendAuthentication(email=data.email, password=data.password)
        access_tokens: Dict[str, str] = await auth.authenticate()
        return JSONResponse(access_tokens)

    @post(
        path="/refresh-access",
        summary="Refreshes the access token",
        description="When a token expires, a new access token must be generated from the refresh token previously provided. The refresh token must be just that, a refresh and it should only return a new access token and nothing else",
        status_code=status.HTTP_200_OK,
        responses={
            200: OpenAPIResponse(model=AccessToken),
            401: OpenAPIResponse(model=ErrorDetail),
        },
    )
    async def refresh_token(self, payload: RefreshToken) -> AccessToken:
        authentication = RefreshAuthentication(token=payload)
        access_token: AccessToken = await authentication.refresh()
        return access_token

Как видите, мы теперь собрали всё вместе. Путь /auth/users требует аутентификации для доступа, а /auth/refresh-access гарантирует, что будет возвращён только новый access_token.