Observables
1. What Are Observables?
Observables are a programming concept used to implement the Observer Pattern, where a function (or object) reacts to events dynamically. This means that:
- An event is emitted when something significant happens (e.g., a user registers, a payment is processed, etc.).
- Subscribers (listeners) react to that event without direct coupling to the emitter.
- This allows for a highly flexible, decoupled, and scalable event-driven architecture.
Observables can be thought of as a stream of events. Where a promise delivers a single future result, an observable keeps delivering events over time. This enables reactive programming, where parts of the system automatically respond to changes.
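The emitter/subscriber relationship described above can be sketched in plain Python. This is a minimal illustration of the Observer Pattern only: the `EventBus` class, its `subscribe` and `emit` methods, and the example email address are hypothetical names made up for this sketch and are not part of Esmerald.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class EventBus:
    """Hypothetical minimal event bus (illustration only, not Esmerald's API)."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[..., None]]] = defaultdict(list)

    def subscribe(self, event: str, listener: Callable[..., None]) -> None:
        # The emitter never needs to know who subscribed.
        self._listeners[event].append(listener)

    def emit(self, event: str, **payload) -> int:
        """Call every listener registered for `event`; return how many ran."""
        listeners = self._listeners.get(event, [])
        for listener in listeners:
            listener(**payload)
        return len(listeners)


bus = EventBus()
log: List[str] = []

# Two independent subscribers react to the same event.
bus.subscribe("user_registered", lambda email: log.append(f"welcome email to {email}"))
bus.subscribe("user_registered", lambda email: log.append(f"default roles for {email}"))

handled = bus.emit("user_registered", email="ana@example.com")
```

Adding a third reaction would only take another `subscribe` call; the code that emits the event stays unchanged.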
2. Benefits of Using Observables
Using observables in an application, especially within Esmerald, comes with several advantages:
Decoupling & Maintainability
Observables help separate event producers (emitters) from event consumers (listeners). This reduces dependencies and makes the system easier to maintain.
Scalability & Extensibility
By using observables, new features can be added without modifying existing code. Developers can subscribe to events dynamically instead of changing core logic.
Concurrency & Efficiency
Using async-based event dispatching, Esmerald handles multiple listeners without blocking execution. This improves performance in real-time applications.
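To illustrate what non-blocking dispatch means in practice, here is a small sketch using only the standard library's `asyncio` (not Esmerald's dispatcher). The listener names and delays are assumptions chosen for the demonstration. The slower listener is started first, yet the faster one finishes first, which can only happen if both run concurrently.

```python
import asyncio
from typing import List


async def slow_listener(name: str, delay: float, done: List[str]) -> None:
    # asyncio.sleep stands in for I/O such as sending an email.
    await asyncio.sleep(delay)
    done.append(name)


async def dispatch_concurrently() -> List[str]:
    done: List[str] = []
    # Both listeners are scheduled together instead of one after the other.
    await asyncio.gather(
        slow_listener("email", 0.2, done),
        slow_listener("roles", 0.05, done),
    )
    return done


finished = asyncio.run(dispatch_concurrently())
```

If the listeners ran sequentially, `"email"` would complete before `"roles"` even started.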
Code Reusability
Once an observable event is defined, it can be reused across multiple parts of the application, reducing redundant logic.
Better Error Handling
Observables allow for centralized error handling on emitted events, making debugging more manageable.
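One way to picture centralized error handling is a dispatch function that runs every listener and collects failures in a single place. The sketch below uses plain `asyncio`. The helper `emit_with_error_handling` is a hypothetical name and does not describe how Esmerald handles listener errors internally.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Listener = Callable[[], Awaitable[None]]


async def emit_with_error_handling(
    event: str, listeners: List[Listener]
) -> List[Tuple[str, str, str]]:
    """Run all listeners; collect failures instead of letting one abort the rest."""
    results = await asyncio.gather(
        *(listener() for listener in listeners), return_exceptions=True
    )
    errors = []
    for listener, result in zip(listeners, results):
        if isinstance(result, Exception):
            # Single place to log, report, or retry failed listeners.
            errors.append((event, listener.__name__, f"{type(result).__name__}: {result}"))
    return errors


async def send_welcome_email() -> None:
    raise ConnectionError("mail server unreachable")


async def assign_default_roles() -> None:
    return None


errors = asyncio.run(
    emit_with_error_handling("user_registered", [send_welcome_email, assign_default_roles])
)
```

Here the failing email listener is reported, while the role assignment still completes.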
3. Why Should You Use Observables?
Observables are useful in scenarios where something happens and multiple parts of the system need to react independently.
For example:
- User Registration Events → Send a welcome email, log activity, and assign default roles.
- Payment Processing → Notify the user, update the database, and trigger order fulfillment.
- Live Data Streaming → Real-time notifications, stock updates, or WebSocket messages.
- Background Tasks → Perform long-running operations (e.g., data processing, cleanup).
- Logging & Monitoring → Collect application metrics without affecting request performance.
In Esmerald, observables allow for an efficient and scalable event-driven approach, making it ideal for high-performance applications.
4. How Observables Are Applied in Esmerald
Esmerald provides built-in support for observables through the @observable decorator and EventDispatcher.
Key Components
- @observable decorator: defines functions that can emit and/or listen for events.
- EventDispatcher: manages event subscriptions and emissions asynchronously.
- Async and sync support: supports both asynchronous and synchronous event handlers.
- Concurrency handling: uses anyio.create_task_group() to run multiple listeners concurrently.
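To make the roles of these components more concrete, the following toy sketch mimics the general idea with the standard library. It is not Esmerald's implementation: `toy_observable`, `_listeners`, and `_call` are hypothetical names, and `asyncio.gather` is used here in place of `anyio.create_task_group()` so the snippet runs without extra dependencies.

```python
import asyncio
import inspect
from collections import defaultdict
from functools import wraps

# Hypothetical registry mapping event names to listener functions.
_listeners = defaultdict(list)


async def _call(listener):
    """Await async listeners directly; run sync ones in a worker thread."""
    if inspect.iscoroutinefunction(listener):
        await listener()
    else:
        await asyncio.get_running_loop().run_in_executor(None, listener)


def toy_observable(send=None, listen=None):
    """Toy stand-in for an observable-style decorator (illustration only)."""

    def decorator(func):
        for event in listen or []:
            _listeners[event].append(func)
        if not send:
            return func

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Assumes the emitting function is async.
            result = await func(*args, **kwargs)
            for event in send:
                # Run all listeners for this event concurrently.
                await asyncio.gather(*(_call(fn) for fn in _listeners[event]))
            return result

        return wrapper

    return decorator


calls = []


@toy_observable(listen=["user_registered"])
async def send_welcome_email():
    calls.append("email")


@toy_observable(listen=["user_registered"])
def assign_default_roles():  # a plain synchronous listener
    calls.append("roles")


@toy_observable(send=["user_registered"])
async def register_user():
    return {"message": "User registered successfully!"}


response = asyncio.run(register_user())
```

The emitter returns its normal result, and both the async and the sync listener run as a side effect of the emitted event.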
5. Real-World Examples Using Esmerald
Example 1: User Registration with Multiple Side Effects
A user registers, and multiple actions occur without coupling the logic together.
from esmerald import post
from esmerald.utils.decorators import observable

# User registration endpoint
@post("/register")
@observable(send=["user_registered"])
async def register_user(data: dict):
    return {"message": "User registered successfully!"}

# Listeners for the event
@observable(listen=["user_registered"])
async def send_welcome_email():
    print("Sending welcome email...")

@observable(listen=["user_registered"])
async def assign_default_roles():
    print("Assigning default roles to the user...")

- The "user_registered" event is emitted.
- The system automatically triggers listeners → Email is sent, roles are assigned.
- No direct dependency → Can easily add more listeners in the future.
Example 2: Payment Processing
When a payment is made, multiple systems react to the event.
from esmerald import post
from esmerald.utils.decorators import observable

# Payment endpoint that emits the event
@post("/pay")
@observable(send=["payment_success"])
async def process_payment():
    return {"message": "Payment processed!"}

# Independent listeners for the same event
@observable(listen=["payment_success"])
async def notify_user():
    print("Notifying user about payment confirmation...")

@observable(listen=["payment_success"])
async def update_database():
    print("Updating payment database records...")

@observable(listen=["payment_success"])
async def generate_invoice():
    print("Generating invoice for the payment...")

- One event triggers multiple independent processes.
- Fully decoupled logic for better maintainability.
Example 3: Logging User Activity
from esmerald import post
from esmerald.utils.decorators import observable

@post("/login")
@observable(send=["user_logged_in"])
async def login():
    return {"message": "User logged in!"}

@observable(listen=["user_logged_in"])
async def log_login_activity():
    print("Logging user login activity...")

Login activity is logged without modifying the authentication logic.
Example 4: Real-Time Notifications
from esmerald import post
from esmerald.utils.decorators import observable

@post("/comment")
@observable(send=["new_comment"])
async def add_comment():
    return {"message": "Comment added!"}

@observable(listen=["new_comment"])
async def send_notification():
    print("Sending notification about the new comment...")

Users get notified immediately after a comment is posted.
Example 5: Background Data Processing
from esmerald import post
from esmerald.utils.decorators import observable

@post("/upload")
@observable(send=["file_uploaded"])
async def upload_file():
    return {"message": "File uploaded successfully!"}

@observable(listen=["file_uploaded"])
async def process_file():
    print("Processing file in the background...")

Heavy file processing runs asynchronously, without blocking the request.
Example 6: Scheduled Tasks & Cleanup Jobs
from esmerald.utils.decorators import observable

# Emits the event; intended to be called by a scheduler
@observable(send=["daily_cleanup"])
async def trigger_cleanup():
    print("Daily cleanup event triggered!")

@observable(listen=["daily_cleanup"])
async def delete_old_records():
    print("Deleting old database records...")

@observable(listen=["daily_cleanup"])
async def clear_cache():
    print("Clearing application cache...")

When the scheduled task runs, it triggers multiple cleanup tasks.
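The example does not show what calls trigger_cleanup on a schedule. As a rough sketch of one possibility, the plain `asyncio` loop below awaits a task repeatedly at a fixed interval. The helper `run_periodically` and its `interval` and `times` parameters are hypothetical, and the stand-in `trigger_cleanup` here only records the call instead of emitting an event. A real application would more likely rely on a proper scheduler.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_periodically(
    task: Callable[[], Awaitable[None]], interval: float, times: int
) -> None:
    """Await `task` `times` times, sleeping `interval` seconds between runs."""
    for run in range(times):
        await task()
        if run < times - 1:
            await asyncio.sleep(interval)


runs: List[str] = []


async def trigger_cleanup() -> None:
    # Stand-in for the decorated function from the example;
    # here it only records that it was called.
    runs.append("daily_cleanup")


# A short interval keeps the demonstration fast; a daily job would use 86400.
asyncio.run(run_periodically(trigger_cleanup, interval=0.01, times=3))
```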
Conclusion
Observables in Esmerald allow developers to build efficient, scalable, and maintainable event-driven applications. By leveraging @observable and EventDispatcher:
- Events are decoupled from logic, improving maintainability.
- Asynchronous execution improves performance.
- Functionality is easy to extend without modifying existing code.
- The architecture stays clean and modular.
Whether you're handling user events, background jobs, notifications, or real-time updates, observables empower you to build dynamic and reactive applications.