SchedulerConfig¶
До версии 3.2.3 Esmerald поддерживал только Asyncz для своего внутреннего планировщика. Начиная с версии 3.2.3, поддержка Asyncz по-прежнему сохраняется, но теперь Esmerald делает её модульной, как и всё остальное в системе.
Что это значит?
Это значит, что если вы не хотите использовать Asyncz по каким-либо причинам, вы можете просто создать свою конфигурацию и подключить собственный планировщик к Esmerald.
Теперь это стало возможным благодаря реализации SchedulerConfig в Esmerald.
Как импортировать¶
Вы можете импортировать конфигурацию следующим образом:
from esmerald.contrib.schedulers import SchedulerConfig
Класс SchedulerConfig¶
При реализации конфигураций планировщика вы должны реализовать две функции.
Это делает SchedulerConfig
модульным, поскольку существует много планировщиков, каждый из которых
предлагает множество различных опций и конфигураций. Однако у всех них есть одна общая черта:
все они должны запускаться и завершаться в какой-то момент. Единственное, что важно для Esmerald,
это возможность инкапсулировать эту функциональность в две простые функции.
Функция start¶
Функция start
, как следует из названия, является функцией, которую Esmerald вызывает за вас для
запуска планировщика. Это важно, потому что, когда установлен флаг enable_scheduler
,
Esmerald будет искать конфигурацию планировщика и вызывать функцию start
при запуске.
Функция shutdown¶
Функция shutdown
, как следует из названия, является функцией, которую Esmerald вызывает за вас
для завершения работы планировщика. Это важно, потому что, когда установлен флаг enable_scheduler
,
Esmerald будет искать конфигурацию планировщика и вызывать функцию shutdown
при остановке
(обычно, когда приложение останавливается).
Как это использовать¶
Esmerald уже реализует этот интерфейс с помощью пользовательской AsynczConfig
. Эта функциональность
очень полезна, поскольку Asyncz имеет множество конфигураций, которые можно передавать и использовать в
приложении Esmerald.
Давайте посмотрим, как выглядит реализация.
import warnings
from uuid import uuid4
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any, Callable, Dict, Union, cast, Type
from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.tasks.base import Task as AsynczTask
from asyncz.typing import undefined, UndefinedType
from esmerald.conf import settings
from esmerald.contrib.schedulers.base import SchedulerConfig
from esmerald.exceptions import ImproperlyConfigured
from esmerald.utils.module_loading import import_string
class AsynczConfig(SchedulerConfig):
"""
Implements an integration with Asyncz, allowing to
customise the scheduler with the provided configurations.
"""
def __init__(
self,
scheduler_class: Type[SchedulerType] = AsyncIOScheduler,
tasks: Union[Dict[str, str]] = None,
timezone: Union[dtimezone, str, None] = None,
configurations: Union[Dict[str, Dict[str, str]], None] = None,
**kwargs: Dict[str, Any],
):
"""
Initializes the AsynczConfig object.
Args:
scheduler_class: The class of the scheduler to be used.
tasks: A dictionary of tasks to be registered in the scheduler.
timezone: The timezone to be used by the scheduler.
configurations: Extra configurations to be passed to the scheduler.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self.scheduler_class = scheduler_class
self.tasks = tasks
self.timezone = timezone
self.configurations = configurations
self.options = kwargs
for task, module in self.tasks.items():
if not isinstance(task, str) or not isinstance(module, str):
raise ImproperlyConfigured("The dict of tasks must be Dict[str, str].")
if not self.tasks:
warnings.warn(
"Esmerald is starting the scheduler, yet there are no tasks declared.",
UserWarning,
stacklevel=2,
)
# Load the scheduler object
self.handler = self.get_scheduler(
scheduler=self.scheduler_class,
timezone=self.timezone,
configurations=self.configurations,
**self.options,
)
self.register_tasks(tasks=self.tasks)
def register_tasks(self, tasks: Dict[str, str]) -> None:
"""
Registers the tasks in the Scheduler.
Args:
tasks: A dictionary of tasks to be registered in the scheduler.
"""
for task, _module in tasks.items():
imported_task = f"{_module}.{task}"
scheduled_task: "Task" = import_string(imported_task)
if not scheduled_task.is_enabled:
continue
try:
scheduled_task.add_task(self.handler)
except Exception as e:
raise ImproperlyConfigured(str(e)) from e
def get_scheduler(
self,
scheduler: Type[SchedulerType],
timezone: Union[dtimezone, str, None] = None,
configurations: Union[Dict[str, Any], None] = None,
**options: Dict[str, Any],
) -> SchedulerType:
"""
Initiates the scheduler from the given time.
If no value is provided, it will default to AsyncIOScheduler.
The value of `scheduler_class` can be overwritten by any esmerald custom settings.
Args:
scheduler: The class of the scheduler to be used.
timezone: The timezone instance.
configurations: A dictionary with extra configurations to be passed to the scheduler.
**options: Additional options.
Returns:
SchedulerType: An instance of a Scheduler.
"""
if not timezone:
timezone = settings.timezone
if not configurations:
return scheduler(timezone=timezone, **options)
return scheduler(global_config=configurations, timezone=timezone, **options)
async def start(self, **kwargs: Dict[str, Any]) -> None:
"""
Starts the scheduler.
Args:
**kwargs: Additional keyword arguments.
"""
self.handler.start(**kwargs)
async def shutdown(self, **kwargs: Dict[str, Any]) -> None:
"""
Shuts down the scheduler.
Args:
**kwargs: Additional keyword arguments.
"""
self.handler.shutdown(**kwargs)
class Task:
"""
Base for the scheduler decorator that will auto discover the
tasks in the application and add them to the internal scheduler.
"""
def __init__(
self,
*,
name: Union[str, None] = None,
trigger: Union[TriggerType, None] = None,
id: Union[str, None] = None,
mistrigger_grace_time: Union[int, UndefinedType, None] = undefined,
coalesce: Union[bool, UndefinedType] = undefined,
max_instances: Union[int, UndefinedType, None] = undefined,
next_run_time: Union[datetime, str, UndefinedType, None] = undefined,
store: str = "default",
executor: str = "default",
replace_existing: bool = False,
args: Union[Any, None] = None,
kwargs: Union[Dict[str, Any], None] = None,
is_enabled: bool = True,
) -> None:
"""
Initializes a new instance of the `Task` class for the Scheduler.
Args:
name (str, optional): Textual description of the task.
trigger (TriggerType, optional): An instance of a trigger class.
id (str, optional): Explicit identifier for the task.
mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
(or None to allow the task to run no matter how late it is).
coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
store (str, optional): Alias of the task store to store the task in.
executor (str, optional): Alias of the executor to run the task with.
replace_existing (bool, optional): True to replace an existing task with the same id
(but retain the number of runs from the existing one).
args (Any, optional): List of positional arguments to call func with.
kwargs (Dict[str, Any], optional): Dict of keyword arguments to call func with.
is_enabled (bool, optional): True if the task is to be added to the scheduler.
"""
self.name = name
self.trigger = trigger
self.id = id
self.mistrigger_grace_time = mistrigger_grace_time
self.coalesce = coalesce
self.max_instances = max_instances
self.next_run_time = next_run_time
self.store = store
self.executor = executor
self.replace_existing = replace_existing
self.args = args
self.kwargs = kwargs
self.is_enabled = is_enabled
self.fn = None
def add_task(self, scheduler: SchedulerType) -> None:
try:
scheduler.add_task(
self.fn,
trigger=self.trigger,
args=self.args,
kwargs=self.kwargs,
id=self.id,
name=self.name,
mistrigger_grace_time=self.mistrigger_grace_time,
coalesce=self.coalesce,
max_instances=self.max_instances,
next_run_time=self.next_run_time,
store=self.store,
executor=self.executor,
replace_existing=self.replace_existing,
)
except Exception as e:
raise ImproperlyConfigured(str(e)) from e
Мы не будем углубляться в технические детали этой конфигурации, так как она уникальна для Asyncz,
предоставленного Esmerald, но необязательно использовать именно его, так как вы можете создать
свою собственную конфигурацию и передать её в параметр scheduler_config
Esmerald.
SchedulerConfig и приложение¶
Чтобы использовать SchedulerConfig
в приложении, как показано выше с asyncz, вы можете сделать следующее:
Note
Мы используем существующий AsynczConfig
в качестве примера, но не стесняйтесь использовать свою
собственную конфигурацию, если вам нужно что-то другое.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig
def get_scheduler_config() -> AsynczConfig:
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"default": MongoDBStore()}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
return AsynczConfig(
tasks=...,
timezone="UTC",
stores=stores,
executors=executors,
task_defaults=task_defaults,
)
app = Esmerald(routes=[...], scheduler_config=get_scheduler_config())
Если вы хотите узнать больше о том, как использовать AsynczConfig, ознакомьтесь с соответствующим разделом.
Жизненный цикл приложения¶
Планировщик Esmerald связан с жизненным циклом приложения, что означает, что он использует
события on_startup/on_shutdown
и lifespan
. Вы можете узнать больше об этом
в соответствующем разделе документации.
По умолчанию планировщик связан с событиями on_startup/on_shutdown
, и они автоматически управляются
для вас, но если вам требуется использовать lifespan, то вы должны внести соответствующие изменения.
Следующий пример служит предложением, но не стесняйтесь использовать свой собственный дизайн.
Давайте посмотрим, как мы могли бы управлять этим с использованием lifespan
вместо.
from contextlib import asynccontextmanager
from functools import lru_cache
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig
@asynccontextmanager
async def lifespan(app: Esmerald):
# What happens on startup
await get_scheduler_config().start()
yield
# What happens on shutdown
await get_scheduler_config().shutdown()
@lru_cache
def get_scheduler_config() -> AsynczConfig:
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"default": MongoDBStore()}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
return AsynczConfig(
tasks=...,
timezone="UTC",
stores=stores,
executors=executors,
task_defaults=task_defaults,
)
app = Esmerald(
routes=[...],
lifespan=lifespan,
scheduler_config=get_scheduler_config(),
)
Довольно просто, верно? Esmerald понимает, что нужно сделать как обычно.
SchedulerConfig и настройки¶
Как и всё в Esmerald, SchedulerConfig также может быть доступен через настройки.
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import EsmeraldAPISettings
from esmerald.contrib.schedulers import SchedulerConfig
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig
class CustomSettings(EsmeraldAPISettings):
@property
def scheduler_config(self) -> SchedulerConfig:
stores = {"default": MongoDBStore()}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
return AsynczConfig(
tasks=...,
timezone="UTC",
stores=stores,
executors=executors,
task_defaults=task_defaults,
)
Важные заметки¶
- Вы можете создать свой собственный настраиваемый конфиг для планировщика.
- Вы должны реализовать функции
start/shutdown
в любой конфигурации планировщика. - Вы можете использовать события
on_startup/shutdown
илиlifespan
. Первые автоматически управляются за вас.