Skip to content

Handler

Esmerald uses Asyncz to manage the scheduler internally and therefore their documentation is also up to date.

The handler is the scheduler used to decorate a function that you want to process as a task.

Requirements

Please check the minimum requirements to use this feature within your project.

The scheduler

This decorator does a lot of the magic containing all the information needed about a task and how it should be performed internally.

Parameters:

  • name - The name given to the task. Not the name of the function.

    Default: None

  • trigger - An instance of a trigger.

    Default: None

  • id - Explicit identifier (id) for the task.

    Default: None

  • misfire_grace_time - The seconds after the designated runtime that the task is still allowed to be run.

    Default: undefined

  • coalesce - Run once instead of many times if the scheduler determines that the task should be run more than once in succession.

    Default: undefined

  • max_intances - The maximum number of concurrently running instances allowed for this task.

    Default: undefined

  • next_run_time - When to first run the task, regardless of the trigger.

    Default: undefined

  • store - The alias of the store to store the task in.

    Default: None

  • executor - The alias of the executor to run the task with.

    Default: None

  • replace_existing - True to replace an existing task with the same id.

    Default: None

  • is_enabled - If a task should run or be disabled and not being triggered by the task scheduler.

    Default: True

  • args - The list of positional arguments to call func with.

    Default: None

  • kwargs - The dict of keyword arguments to call func with.

    Default: None

To obtain the undefined type:

from asyncz.typing import undefined

Triggers

Esmerald comes with some pre-defined triggers ready to be used by the application.

The built-in trigger cover the majority of the needs of all users. However if that is not the case, there is always the option to create a custom.

  • BaseTrigger - The base of all triggers and it can be extended to create a custom.
  • CronTrigger
  • IntervalTrigger
  • DateTrigger
  • OrTrigger
  • AndTrigger

Importing the triggers:

from asyncz.triggers import (
    AndTrigger,
    BaseTrigger,
    CronTrigger,
    DateTrigger,
    IntervalTrigger,
    OrTrigger,
)

Or you can simply import directly from the asyncz library as it is fully compatible.

CronTrigger

Triggers when current time matches all specified time constraits. Very similar to the way the UNIX cron works.

┌───────────── minute (0 - 59) ┌───────────── hour (0 - 23)  ┌───────────── day of the month (1 - 31)   ┌───────────── month (1 - 12)    ┌───────────── day of the week (0 - 6) (Sunday to Saturday;                                       7 is also Sunday on some systems)    │
│    │
* * * * * <command to execute>

Parameters:

  • year (int|str) – 4-digit year
  • month (int|str) – Month (1-12)
  • day (int|str) – Day of month (1-31)
  • week (int|str) – ISO week (1-53)
  • day_of_week (int|str) – Number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – Hour (0-23)
  • minute (int|str) – Minute (0-59)
  • second (int|str) – Second (0-59)
  • start_date (datetime|str) – Earliest possible date/time to trigger on (inclusive)
  • end_date (datetime|str) – Latest possible date/time to trigger on (inclusive)
  • timezone (datetime.tzinfo|str) – Time zone to use for the date/time calculations (defaults to scheduler timezone)
  • jitter (int|None) – Delay the task execution by jitter seconds at most
from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import CronTrigger


@scheduler(trigger=CronTrigger(month="4-9,12", day="3rd friday", hour="0-4"))
def print_message():
    print("Hello, world!")


@scheduler(trigger=CronTrigger(day_of_week="mon-fri", hour=5, minute=30, end_at="2022-12-30"))
def print_another_message():
    print("Hello, another world!")


# From a crontab
@scheduler(trigger=CronTrigger.from_crontab("0 0 1-15 may-oct *"))
def from_crontab():
    print("Hello, from crontab!")

IntervalTrigger

Triggers on specified intervals, starting on start_date if specified or datetime.now() + interval otherwise.

Parameters:

  • weeks (int) - Number of weeks to wait.
  • days (int) - Number of days to wait.
  • hours (int) - Number of hours to wait.
  • minutes (int) - Number of minutes to wait.
  • seconds (int) - Number of seconds to wait.
  • start_date (datetime|str) - Starting point for the interval calculation.
  • end_date (datetime|str) – Latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – Time zone to use for the date/time calculations
  • jitter (int|None) – Delay the task execution by jitter seconds at most
from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import IntervalTrigger


@scheduler(trigger=IntervalTrigger(minutes=5))
def print_message():
    print("Hello, world!")


@scheduler(trigger=IntervalTrigger(hours=2))
def print_another_message():
    print("Hello, another world!")


@scheduler(
    trigger=IntervalTrigger(hours=2, start_at="2022-01-01 09:30:00", end_at="2023-01-01 11:00:00")
)
def from_interval():
    print("Hello, from crontab!")

DateTrigger

Triggers once on the given datetime. If run_date is left empty, current time is used.

Parameters:

  • run_date (datetime|str) – The date/time to run the task at.
  • timezone (datetime.tzinfo|str) – Time zone for run_date if it doesn’t have one already.
from datetime import date

from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import DateTrigger


def print_text(text):
    print(text)


@scheduler(trigger=DateTrigger(run_at=date(2022, 11, 6), args=["text"]))
def print_message():
    print_text("Hello, world!")


@scheduler(trigger=DateTrigger(run_at="2022-11-06 14:30:00", args=["text"]))
def print_another_message():
    print_text("Hello, another world!")

OrTrigger

Always returns the earliest next fire time produced by any of the given triggers. The trigger is considered finished when all the given triggers have finished their schedules.

Parameters:

  • triggers (list) – Triggers to combine.
  • jitter (int|None) – Delay the task execution by jitter seconds at most.
from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import CronTrigger, OrTrigger


@scheduler(
    trigger=OrTrigger(
        [
            CronTrigger(day_of_week="mon", hour=2),
            CronTrigger(day_of_week="wed", hour=16),
        ]
    )
)
def print_message():
    print("Hello, world!")

AndTrigger

Always returns the earliest next fire time that all the given triggers can agree on. The trigger is considered to be finished when any of the given triggers has finished its schedule.

Parameters:

  • triggers (list) – Triggers to combine.
  • jitter (int|None) – Delay the task execution by jitter seconds at most.
from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import AndTrigger, CronTrigger, IntervalTrigger


@scheduler(trigger=AndTrigger([IntervalTrigger(hours=2), CronTrigger(day_of_week="sat,sun")]))
def print_message():
    print("Hello, world!")

Note

These triggers are the same as the Asyncz and we didn't want to break existing functionality. For more examples how to use even different approaches, check their great documentation.

Stores, executors and other configurations

Using the scheduler also means access to a lot of extra possible configurations that can be added such as stores, executors and any other extra configuration needed.

Esmerald allows to pass those configurations via application instantiation or via settings.

Via application instantiation

from esmerald import Esmerald

scheduler_configurations = (
    {
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.threadpool": {
            "max_workers": "20",
            "class": "asyncz.executors.threadpool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio::AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

app = Esmerald(
    enable_scheduler=True,
    scheduler_tasks=...,
    scheduler_configurations=scheduler_configurations,
)

Via application settings

from typing import Dict, Union

from esmerald import Esmerald, EsmeraldAPISettings


class AppSettings(EsmeraldAPISettings):
    enable_scheduler: bool = True

    @property
    def scheduler_tasks(self) -> Dict[str, str]: ...

    @property
    def scheduler_configurations(self) -> Dict[str, Union[str, Dict[str, str]]]:
        return (
            {
                "asyncz.stores.mongo": {"type": "mongodb"},
                "asyncz.stores.default": {"type": "redis", "database": "0"},
                "asyncz.executors.threadpool": {
                    "max_workers": "20",
                    "class": "asyncz.executors.threadpool:ThreadPoolExecutor",
                },
                "asyncz.executors.default": {"class": "asyncz.executors.asyncio::AsyncIOExecutor"},
                "asyncz.task_defaults.coalesce": "false",
                "asyncz.task_defaults.max_instances": "3",
                "asyncz.task_defaults.timezone": "UTC",
            },
        )


app = Esmerald()

Start the application with the new settings.

ESMERALD_SETTINGS_MODULE=AppSettings uvicorn src:app --reload

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [28720]
INFO:     Started server process [28722]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Configurations and the handler

When creating a task and using the scheduler one of the parameters is the store.

From the example you have new task stores and executors and those can be passed:

import logging

from asyncz.contrib.esmerald.decorator import scheduler
from asyncz.triggers import IntervalTrigger
from loguru import logger

logging.basicConfig()
logging.getLogger("esmerald").setLevel(logging.DEBUG)


@scheduler(
    name="collect_data",
    trigger=IntervalTrigger(hours=12),
    max_intances=3,
    store="mongo",
    executor="default",
)
def collect_market_data():
    logger.error("Collecting market data")
    ...


@scheduler(
    name="collect_data",
    trigger=IntervalTrigger(hours=12),
    max_intances=3,
    store="default",
    executor="processpoll",
)
def another_example():
    logger.error("Collecting market data")
    ...

Tip

Have a look at the documentation from Asyncz and learn more about what can be done and how can be done. All the parameters available in the Asyncz add_task are also available in the @scheduler handler in the same way.