Перейти к содержанию

SchedulerConfig

До версии 3.2.3 Esmerald поддерживал только Asyncz для своего внутреннего планировщика. Начиная с версии 3.2.3, поддержка Asyncz по-прежнему сохраняется, но теперь Esmerald делает её модульной, как и всё остальное в системе.

Что это значит?

Это значит, что если вы не хотите использовать Asyncz по каким-либо причинам, вы можете просто создать свою конфигурацию и подключить собственный планировщик к Esmerald.

Теперь это стало возможным благодаря реализации SchedulerConfig в Esmerald.

Как импортировать

Вы можете импортировать конфигурацию следующим образом:

from esmerald.contrib.schedulers import SchedulerConfig

Класс SchedulerConfig

При реализации конфигураций планировщика вы должны реализовать две функции.

  1. async def start()
  2. async def shutdown()

Это делает SchedulerConfig модульным, поскольку существует много планировщиков, каждый из которых предлагает множество различных опций и конфигураций. Однако у всех них есть одна общая черта: все они должны запускаться и завершаться в какой-то момент. Единственное, что важно для Esmerald, это возможность инкапсулировать эту функциональность в две простые функции.

Функция start

Функция start, как следует из названия, является функцией, которую Esmerald вызывает за вас для запуска планировщика. Это важно, потому что, когда установлен флаг enable_scheduler, Esmerald будет искать конфигурацию планировщика и вызывать функцию start при запуске.

Функция shutdown

Функция shutdown, как следует из названия, является функцией, которую Esmerald вызывает за вас для завершения работы планировщика. Это важно, потому что, когда установлен флаг enable_scheduler, Esmerald будет искать конфигурацию планировщика и вызывать функцию shutdown при остановке (обычно, когда приложение останавливается).

Как это использовать

Esmerald уже реализует этот интерфейс с помощью пользовательской AsynczConfig. Эта функциональность очень полезна, поскольку Asyncz имеет множество конфигураций, которые можно передавать и использовать в приложении Esmerald.

Давайте посмотрим, как выглядит реализация.

import warnings
from uuid import uuid4
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any, Callable, Dict, Union, cast, Type

from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.tasks.base import Task as AsynczTask
from asyncz.typing import undefined, UndefinedType
from esmerald.conf import settings
from esmerald.contrib.schedulers.base import SchedulerConfig
from esmerald.exceptions import ImproperlyConfigured
from esmerald.utils.module_loading import import_string


class AsynczConfig(SchedulerConfig):
    """
    Implements an integration with Asyncz, allowing to
    customise the scheduler with the provided configurations.
    """

    def __init__(
        self,
        scheduler_class: Type[SchedulerType] = AsyncIOScheduler,
        tasks: Union[Dict[str, str]] = None,
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Dict[str, str]], None] = None,
        **kwargs: Dict[str, Any],
    ):
        """
        Initializes the AsynczConfig object.

        Args:
            scheduler_class: The class of the scheduler to be used.
            tasks: A dictionary of tasks to be registered in the scheduler.
            timezone: The timezone to be used by the scheduler.
            configurations: Extra configurations to be passed to the scheduler.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.scheduler_class = scheduler_class
        self.tasks = tasks
        self.timezone = timezone
        self.configurations = configurations
        self.options = kwargs

        for task, module in self.tasks.items():
            if not isinstance(task, str) or not isinstance(module, str):
                raise ImproperlyConfigured("The dict of tasks must be Dict[str, str].")

        if not self.tasks:
            warnings.warn(
                "Esmerald is starting the scheduler, yet there are no tasks declared.",
                UserWarning,
                stacklevel=2,
            )

        # Load the scheduler object
        self.handler = self.get_scheduler(
            scheduler=self.scheduler_class,
            timezone=self.timezone,
            configurations=self.configurations,
            **self.options,
        )

        self.register_tasks(tasks=self.tasks)

    def register_tasks(self, tasks: Dict[str, str]) -> None:
        """
        Registers the tasks in the Scheduler.

        Args:
            tasks: A dictionary of tasks to be registered in the scheduler.
        """
        for task, _module in tasks.items():
            imported_task = f"{_module}.{task}"
            scheduled_task: "Task" = import_string(imported_task)

            if not scheduled_task.is_enabled:
                continue

            try:
                scheduled_task.add_task(self.handler)
            except Exception as e:
                raise ImproperlyConfigured(str(e)) from e

    def get_scheduler(
        self,
        scheduler: Type[SchedulerType],
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Any], None] = None,
        **options: Dict[str, Any],
    ) -> SchedulerType:
        """
        Initiates the scheduler from the given time.
        If no value is provided, it will default to AsyncIOScheduler.

        The value of `scheduler_class` can be overwritten by any esmerald custom settings.

        Args:
            scheduler: The class of the scheduler to be used.
            timezone: The timezone instance.
            configurations: A dictionary with extra configurations to be passed to the scheduler.
            **options: Additional options.

        Returns:
            SchedulerType: An instance of a Scheduler.
        """
        if not timezone:
            timezone = settings.timezone

        if not configurations:
            return scheduler(timezone=timezone, **options)

        return scheduler(global_config=configurations, timezone=timezone, **options)

    async def start(self, **kwargs: Dict[str, Any]) -> None:
        """
        Starts the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.start(**kwargs)

    async def shutdown(self, **kwargs: Dict[str, Any]) -> None:
        """
        Shuts down the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.shutdown(**kwargs)


class Task:
    """
    Base for the scheduler decorator that will auto discover the
    tasks in the application and add them to the internal scheduler.
    """

    def __init__(
        self,
        *,
        name: Union[str, None] = None,
        trigger: Union[TriggerType, None] = None,
        id: Union[str, None] = None,
        mistrigger_grace_time: Union[int, UndefinedType, None] = undefined,
        coalesce: Union[bool, UndefinedType] = undefined,
        max_instances: Union[int, UndefinedType, None] = undefined,
        next_run_time: Union[datetime, str, UndefinedType, None] = undefined,
        store: str = "default",
        executor: str = "default",
        replace_existing: bool = False,
        args: Union[Any, None] = None,
        kwargs: Union[Dict[str, Any], None] = None,
        is_enabled: bool = True,
    ) -> None:
        """
        Initializes a new instance of the `Task` class for the  Scheduler.

        Args:
            name (str, optional): Textual description of the task.
            trigger (TriggerType, optional): An instance of a trigger class.
            id (str, optional): Explicit identifier for the task.
            mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
                (or None to allow the task to run no matter how late it is).
            coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
            max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
            next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
            store (str, optional): Alias of the task store to store the task in.
            executor (str, optional): Alias of the executor to run the task with.
            replace_existing (bool, optional): True to replace an existing task with the same id
                (but retain the number of runs from the existing one).
            args (Any, optional): List of positional arguments to call func with.
            kwargs (Dict[str, Any], optional): Dict of keyword arguments to call func with.
            is_enabled (bool, optional): True if the task is to be added to the scheduler.
        """
        self.name = name
        self.trigger = trigger
        self.id = id
        self.mistrigger_grace_time = mistrigger_grace_time
        self.coalesce = coalesce
        self.max_instances = max_instances
        self.next_run_time = next_run_time
        self.store = store
        self.executor = executor
        self.replace_existing = replace_existing
        self.args = args
        self.kwargs = kwargs
        self.is_enabled = is_enabled
        self.fn = None

    def add_task(self, scheduler: SchedulerType) -> None:
        try:
            scheduler.add_task(
                self.fn,
                trigger=self.trigger,
                args=self.args,
                kwargs=self.kwargs,
                id=self.id,
                name=self.name,
                mistrigger_grace_time=self.mistrigger_grace_time,
                coalesce=self.coalesce,
                max_instances=self.max_instances,
                next_run_time=self.next_run_time,
                store=self.store,
                executor=self.executor,
                replace_existing=self.replace_existing,
            )
        except Exception as e:
            raise ImproperlyConfigured(str(e)) from e

Мы не будем углубляться в технические детали этой конфигурации, так как она уникальна для Asyncz, предоставленного Esmerald, но необязательно использовать именно его, так как вы можете создать свою собственную конфигурацию и передать её в параметр scheduler_config Esmerald.

SchedulerConfig и приложение

Чтобы использовать SchedulerConfig в приложении, как показано выше с asyncz, вы можете сделать следующее:

Note

Мы используем существующий AsynczConfig в качестве примера, но не стесняйтесь использовать свою собственную конфигурацию, если вам нужно что-то другое.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore to become RedisStore where the db is 0
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default ot be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(routes=[...], scheduler_config=get_scheduler_config())

Если вы хотите узнать больше о том, как использовать AsynczConfig, ознакомьтесь с соответствующим разделом.

Жизненный цикл приложения

Планировщик Esmerald связан с жизненным циклом приложения, что означает, что он использует события on_startup/on_shutdown и lifespan. Вы можете узнать больше об этом в соответствующем разделе документации.

По умолчанию планировщик связан с событиями on_startup/on_shutdown, и они автоматически управляются для вас, но если вам требуется использовать lifespan, то вы должны внести соответствующие изменения.

Следующий пример служит предложением, но не стесняйтесь использовать свой собственный дизайн. Давайте посмотрим, как мы могли бы управлять этим с использованием lifespan вместо.

from contextlib import asynccontextmanager
from functools import lru_cache

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


@asynccontextmanager
async def lifespan(app: Esmerald):
    # What happens on startup
    await get_scheduler_config().start()
    yield
    # What happens on shutdown
    await get_scheduler_config().shutdown()


@lru_cache
def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore to become RedisStore where the db is 0
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default ot be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(
    routes=[...],
    lifespan=lifespan,
    scheduler_config=get_scheduler_config(),
)

Довольно просто, верно? Esmerald понимает, что нужно сделать как обычно.

SchedulerConfig и настройки

Как и всё в Esmerald, SchedulerConfig также может быть доступен через настройки.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import EsmeraldAPISettings
from esmerald.contrib.schedulers import SchedulerConfig
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


class CustomSettings(EsmeraldAPISettings):
    @property
    def scheduler_config(self) -> SchedulerConfig:
        stores = {"default": MongoDBStore()}

        # Define the executors
        # Override the default ot be the AsyncIOExecutor
        executors = {
            "default": AsyncIOExecutor(),
            "threadpool": ThreadPoolExecutor(max_workers=20),
        }

        # Set the defaults
        task_defaults = {"coalesce": False, "max_instances": 4}

        return AsynczConfig(
            tasks=...,
            timezone="UTC",
            stores=stores,
            executors=executors,
            task_defaults=task_defaults,
        )

Важные заметки

  • Вы можете создать свой собственный настраиваемый конфиг для планировщика.
  • Вы должны реализовать функции start/shutdown в любой конфигурации планировщика.
  • Вы можете использовать события on_startup/shutdown или lifespan. Первые автоматически управляются за вас.