Handler¶
Esmerald uses Asyncz to manage the scheduler internally, so the Asyncz documentation also applies here.
The handler is the scheduler decorator, which you apply to any function that should be processed as a task.
Requirements¶
Please check the minimum requirements to use this feature within your project.
The scheduler¶
This decorator holds all the information about a task and how it should be run internally.
Parameters:
- name - The name given to the task, not the name of the function. Default: None
- trigger - An instance of a trigger. Default: None
- id - Explicit identifier (id) for the task. Default: None
- misfire_grace_time - The number of seconds after the designated run time during which the task is still allowed to run. Default: undefined
- coalesce - Run once instead of many times if the scheduler determines that the task should be run more than once in succession. Default: undefined
- max_instances - The maximum number of concurrently running instances allowed for this task. Default: undefined
- next_run_time - When to first run the task, regardless of the trigger. Default: undefined
- store - The alias of the store to store the task in. Default: None
- executor - The alias of the executor to run the task with. Default: None
- replace_existing - True to replace an existing task with the same id. Default: None
- is_enabled - Whether the task should run, or be disabled and not triggered by the task scheduler. Default: True
- args - The list of positional arguments to call func with. Default: None
- kwargs - The dict of keyword arguments to call func with. Default: None
To obtain the undefined type:
from asyncz.typing import undefined
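The interaction between misfire_grace_time and coalesce is easier to see with concrete values. The sketch below is a simplified, standard-library model of the behaviour described in the parameter list, not the Asyncz implementation; the function name runs_to_execute is hypothetical, and None is used here to mean "no grace limit".

```python
from datetime import datetime, timedelta
from typing import List, Optional


def runs_to_execute(
    due_times: List[datetime],
    now: datetime,
    misfire_grace_time: Optional[int],
    coalesce: bool,
) -> List[datetime]:
    """Return the due run times that would still be executed at `now`.

    Hypothetical helper: a simplified model, not Asyncz's own logic.
    """
    if misfire_grace_time is None:
        # No limit: every run that is already due is still allowed.
        allowed = [t for t in due_times if t <= now]
    else:
        grace = timedelta(seconds=misfire_grace_time)
        # A run is allowed only while `now` is within its grace window.
        allowed = [t for t in due_times if t <= now <= t + grace]
    if coalesce and allowed:
        # Several pending runs collapse into a single one (the latest).
        return [allowed[-1]]
    return allowed


now = datetime(2022, 1, 1, 12, 0, 0)
# Three runs that were due 90, 60 and 30 seconds ago.
due = [now - timedelta(seconds=s) for s in (90, 60, 30)]

print(runs_to_execute(due, now, misfire_grace_time=60, coalesce=False))
print(runs_to_execute(due, now, misfire_grace_time=60, coalesce=True))
```

With a 60 second grace time the oldest run is dropped as a misfire, and with coalesce enabled the remaining runs are merged into one.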
Triggers¶
Esmerald comes with some pre-defined triggers ready to be used by the application.
The built-in triggers cover the needs of most users. If they do not, you can always create a custom trigger.
- BaseTrigger - The base of all triggers; extend it to create a custom trigger.
- CronTrigger
- IntervalTrigger
- DateTrigger
- OrTrigger
- AndTrigger
Importing the triggers:
from asyncz.triggers import (
AndTrigger,
BaseTrigger,
CronTrigger,
DateTrigger,
IntervalTrigger,
OrTrigger,
)
Or you can simply import directly from the asyncz
library as it is fully compatible.
CronTrigger¶
Triggers when the current time matches all specified time constraints. Very similar to the way UNIX cron works.
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of the month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
│ │ │ │ │ 7 is also Sunday on some systems)
│ │ │ │ │
│ │ │ │ │
* * * * * <command to execute>
Parameters:
- year (int|str) – 4-digit year
- month (int|str) – Month (1-12)
- day (int|str) – Day of month (1-31)
- week (int|str) – ISO week (1-53)
- day_of_week (int|str) – Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
- hour (int|str) – Hour (0-23)
- minute (int|str) – Minute (0-59)
- second (int|str) – Second (0-59)
- start_at (datetime|str) – Earliest possible date/time to trigger on (inclusive)
- end_at (datetime|str) – Latest possible date/time to trigger on (inclusive)
- timezone (datetime.tzinfo|str) – Time zone to use for the date/time calculations (defaults to scheduler timezone)
- jitter (int|None) – Delay the task execution by jitter seconds at most
from asyncz.triggers import CronTrigger
from esmerald.contrib.schedulers.asyncz.decorator import scheduler


@scheduler(trigger=CronTrigger(month="4-9,12", day="3rd friday", hour="0-4"))
def print_message():
    print("Hello, world!")


@scheduler(trigger=CronTrigger(day_of_week="mon-fri", hour=5, minute=30, end_at="2022-12-30"))
def print_another_message():
    print("Hello, another world!")


# From a crontab
@scheduler(trigger=CronTrigger.from_crontab("0 0 1-15 may-oct *"))
def from_crontab():
    print("Hello, from crontab!")
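To relate the crontab string used with from_crontab to the diagram and the named parameters, the following standard-library sketch splits a five-field expression into its fields. It is only an illustration: split_crontab and CRON_FIELDS are hypothetical names, and this is not how Asyncz parses the expression internally.

```python
from typing import Dict

# Field order of a classic five-field crontab expression,
# named after the matching CronTrigger parameters.
CRON_FIELDS = ("minute", "hour", "day", "month", "day_of_week")


def split_crontab(expression: str) -> Dict[str, str]:
    """Map each whitespace-separated crontab field to its name (hypothetical helper)."""
    values = expression.split()
    if len(values) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(values)}")
    return dict(zip(CRON_FIELDS, values))


fields = split_crontab("0 0 1-15 may-oct *")
for name, value in fields.items():
    print(f"{name}: {value}")
```

Read this way, the expression from the example fires at midnight on days 1 to 15 of the months May through October, on any day of the week.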
IntervalTrigger¶
Triggers on specified intervals, starting on start_at if specified, or datetime.now() + interval otherwise.
Parameters:
- weeks (int) – Number of weeks to wait.
- days (int) – Number of days to wait.
- hours (int) – Number of hours to wait.
- minutes (int) – Number of minutes to wait.
- seconds (int) – Number of seconds to wait.
- start_at (datetime|str) – Starting point for the interval calculation.
- end_at (datetime|str) – Latest possible date/time to trigger on.
- timezone (datetime.tzinfo|str) – Time zone to use for the date/time calculations.
- jitter (int|None) – Delay the task execution by jitter seconds at most.
from asyncz.triggers import IntervalTrigger
from esmerald.contrib.schedulers.asyncz.decorator import scheduler


@scheduler(trigger=IntervalTrigger(minutes=5))
def print_message():
    print("Hello, world!")


@scheduler(trigger=IntervalTrigger(hours=2))
def print_another_message():
    print("Hello, another world!")


@scheduler(
    trigger=IntervalTrigger(hours=2, start_at="2022-01-01 09:30:00", end_at="2023-01-01 11:00:00")
)
def from_interval():
    print("Hello, from interval!")
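The interval calculation can be pictured as stepping forward from the starting point in whole intervals until the current time is reached. The sketch below uses only the standard library and assumes that behaviour; next_fire_time is a hypothetical helper, not part of Asyncz.

```python
from datetime import datetime, timedelta
from math import ceil
from typing import Optional


def next_fire_time(
    start: datetime,
    interval: timedelta,
    now: datetime,
    end: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return the first interval boundary at or after `now` (hypothetical helper)."""
    if now <= start:
        candidate = start
    else:
        # Number of whole intervals needed to reach or pass `now`.
        steps = ceil((now - start) / interval)
        candidate = start + steps * interval
    # Past the end boundary the schedule is finished.
    if end is not None and candidate > end:
        return None
    return candidate


start = datetime(2022, 1, 1, 9, 30)
now = datetime(2022, 1, 1, 12, 0)

print(next_fire_time(start, timedelta(hours=2), now))
print(next_fire_time(start, timedelta(hours=2), now, end=datetime(2022, 1, 1, 13, 0)))
```

The first call lands on the next two-hour boundary after the start; the second returns None because that boundary falls after the end.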
DateTrigger¶
Triggers once on the given datetime. If run_at is left empty, the current time is used.
Parameters:
- run_at (datetime|str) – The date/time to run the task at.
- timezone (datetime.tzinfo|str) – Time zone for run_at if it doesn’t have one already.
from datetime import date

from asyncz.triggers import DateTrigger
from esmerald.contrib.schedulers.asyncz.decorator import scheduler


def print_text(text):
    print(text)


@scheduler(trigger=DateTrigger(run_at=date(2022, 11, 6), args=["text"]))
def print_message():
    print_text("Hello, world!")


@scheduler(trigger=DateTrigger(run_at="2022-11-06 14:30:00", args=["text"]))
def print_another_message():
    print_text("Hello, another world!")
OrTrigger¶
Always returns the earliest next fire time produced by any of the given triggers. The trigger is considered finished when all the given triggers have finished their schedules.
Parameters:
- triggers (list) – Triggers to combine.
- jitter (int|None) – Delay the task execution by jitter seconds at most.
from asyncz.triggers import CronTrigger, OrTrigger
from esmerald.contrib.schedulers.asyncz.decorator import scheduler


@scheduler(
    trigger=OrTrigger(
        [
            CronTrigger(day_of_week="mon", hour=2),
            CronTrigger(day_of_week="wed", hour=16),
        ]
    )
)
def print_message():
    print("Hello, world!")
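The "earliest of any trigger" rule can be sketched in a few lines of standard-library Python. This is a simplified model of the description above, not the Asyncz code; or_next_fire_time is a hypothetical name, and None stands for a trigger that has finished its schedule.

```python
from datetime import datetime
from typing import List, Optional


def or_next_fire_time(candidates: List[Optional[datetime]]) -> Optional[datetime]:
    """Pick the earliest next fire time among the combined triggers (hypothetical helper)."""
    # Triggers that have finished report None and are ignored.
    pending = [c for c in candidates if c is not None]
    # The combined trigger is finished only when every trigger is finished.
    return min(pending) if pending else None


monday = datetime(2022, 11, 7, 2, 0)       # next "mon at 02:00"
wednesday = datetime(2022, 11, 9, 16, 0)   # next "wed at 16:00"

print(or_next_fire_time([monday, wednesday]))
print(or_next_fire_time([None, wednesday]))
print(or_next_fire_time([None, None]))
```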
AndTrigger¶
Always returns the earliest next fire time that all the given triggers can agree on. The trigger is considered to be finished when any of the given triggers has finished its schedule.
Parameters:
- triggers (list) – Triggers to combine.
- jitter (int|None) – Delay the task execution by jitter seconds at most.
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger
from esmerald.contrib.schedulers.asyncz.decorator import scheduler


@scheduler(trigger=AndTrigger([IntervalTrigger(hours=2), CronTrigger(day_of_week="sat,sun")]))
def print_message():
    print("Hello, world!")
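Finding a fire time that all triggers agree on can be modelled as a loop: ask every trigger for its next fire time, and if they differ, move forward to the latest answer and ask again. The sketch below assumes that approach and uses only the standard library; and_next_fire_time and every_hours are hypothetical helpers, and each trigger is represented as a plain function rather than an Asyncz trigger object.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional

# A trigger is modelled as: given `now`, return the next fire time at or after it.
NextFire = Callable[[datetime], Optional[datetime]]


def and_next_fire_time(
    triggers: List[NextFire], now: datetime, max_rounds: int = 1000
) -> Optional[datetime]:
    """Earliest time at which all triggers fire together (hypothetical helper)."""
    if not triggers:
        return None
    for _ in range(max_rounds):
        fire_times = [trigger(now) for trigger in triggers]
        # As soon as one trigger is finished, the combination is finished.
        if any(t is None for t in fire_times):
            return None
        if min(fire_times) == max(fire_times):
            return fire_times[0]
        # No agreement yet: retry from the latest proposed time.
        now = max(fire_times)
    return None


def every_hours(hours: int, anchor: datetime) -> NextFire:
    """Build a toy interval trigger that fires every `hours` hours from `anchor`."""
    step = timedelta(hours=hours)

    def trigger(now: datetime) -> Optional[datetime]:
        if now <= anchor:
            return anchor
        steps = -((anchor - now) // step)  # ceiling division on timedeltas
        return anchor + steps * step

    return trigger


anchor = datetime(2022, 1, 1, 0, 0)
agreed = and_next_fire_time(
    [every_hours(2, anchor), every_hours(3, anchor)],
    now=datetime(2022, 1, 1, 1, 0),
)
print(agreed)
```

In this toy run a two-hour and a three-hour schedule starting at midnight first coincide six hours after the anchor.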
Note
These triggers are the same ones Asyncz provides, kept unchanged so that existing functionality does not break.
For more examples and alternative approaches, check the Asyncz documentation.
Stores, executors and other configurations¶
Using the scheduler also gives access to extra configuration options, such as stores, executors and any other settings the scheduler needs.
Esmerald allows you to pass those configurations via application instantiation or via settings.
Via application instantiation¶
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig

scheduler_config = AsynczConfig(
    configurations={
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.threadpool": {
            "max_workers": "20",
            "class": "asyncz.executors.threadpool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

app = Esmerald(
    enable_scheduler=True,
    scheduler_config=scheduler_config,
)
Via application settings¶
from esmerald import Esmerald, EsmeraldAPISettings
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


class AppSettings(EsmeraldAPISettings):
    enable_scheduler: bool = True

    @property
    def scheduler_config(self) -> AsynczConfig:
        return AsynczConfig(
            tasks=...,
            configurations={
                "asyncz.stores.mongo": {"type": "mongodb"},
                "asyncz.stores.default": {"type": "redis", "database": "0"},
                "asyncz.executors.threadpool": {
                    "max_workers": "20",
                    "class": "asyncz.executors.threadpool:ThreadPoolExecutor",
                },
                "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
                "asyncz.task_defaults.coalesce": "false",
                "asyncz.task_defaults.max_instances": "3",
                "asyncz.task_defaults.timezone": "UTC",
            },
        )


app = Esmerald()
Start the application with the new settings.
ESMERALD_SETTINGS_MODULE=AppSettings uvicorn src:app --reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [28720]
INFO: Started server process [28722]
INFO: Waiting for application startup.
INFO: Application startup complete.
$env:ESMERALD_SETTINGS_MODULE="AppSettings"; uvicorn src:app --reload
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [28720]
INFO: Started server process [28722]
INFO: Waiting for application startup.
INFO: Application startup complete.
Configurations and the handler¶
When creating a task with the scheduler, one of the parameters is the store.
The configuration example above defines new task stores and executors, and their aliases can be passed to the handler:
import logging

from asyncz.triggers import IntervalTrigger
from loguru import logger

from esmerald.contrib.schedulers.asyncz.decorator import scheduler

logging.basicConfig()
logging.getLogger("esmerald").setLevel(logging.DEBUG)


# Uses the "mongo" store and the "default" executor from the configuration.
@scheduler(
    name="collect_data",
    trigger=IntervalTrigger(hours=12),
    max_instances=3,
    store="mongo",
    executor="default",
)
def collect_market_data():
    logger.error("Collecting market data")
    ...


# Uses the "default" store and the "threadpool" executor from the configuration.
@scheduler(
    name="collect_data",
    trigger=IntervalTrigger(hours=12),
    max_instances=3,
    store="default",
    executor="threadpool",
)
def another_example():
    logger.error("Collecting market data")
    ...
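The aliases passed as store and executor correspond to the last segment of the dotted configuration keys. The following standard-library sketch lists the aliases that such a configuration dictionary defines; the aliases function is a hypothetical helper for illustration and makes no claim about how Asyncz itself reads the configuration.

```python
from typing import Dict, Set

# Same keys as in the configuration example; values shortened for brevity.
configurations: Dict[str, object] = {
    "asyncz.stores.mongo": {"type": "mongodb"},
    "asyncz.stores.default": {"type": "redis", "database": "0"},
    "asyncz.executors.threadpool": {"max_workers": "20"},
    "asyncz.executors.default": {},
    "asyncz.task_defaults.coalesce": "false",
}


def aliases(config: Dict[str, object], section: str) -> Set[str]:
    """Collect the aliases defined under `asyncz.<section>.` (hypothetical helper)."""
    prefix = f"asyncz.{section}."
    return {key[len(prefix):] for key in config if key.startswith(prefix)}


print(sorted(aliases(configurations, "stores")))
print(sorted(aliases(configurations, "executors")))
```

A check like this can help confirm that the store and executor names given to a task match an alias that is actually configured.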
Tip
Have a look at the Asyncz documentation to learn more about what can be done and how. All the parameters available in the Asyncz add_task are also available in the @scheduler handler in the same way.