Eventos y Observers¶
Los eventos en tequio siguen el patrón Events / Listeners de Laravel: un hecho de dominio ("se exportaron las notas", "se creó una nota") se dispara explícitamente y uno o varios Observers reaccionan. Es notificación 1:N fire-and-forget: el código que dispara el evento no espera retorno ni sabe quién escucha.
from tequio.Core.Events import dispatch
from tequio.Modules.Demo.Events import NoteCreated
dispatch(NoteCreated(note_id=7, title="Mi nota"))
NO es un model-observer de SQLAlchemy
tequio no ata esto a la base de datos. El evento no se dispara por un commit;
lo disparas tú con dispatch(...) desde donde ocurra el hecho de negocio (un service,
un job, un command de consola). Así controlas exactamente cuándo y con qué datos se notifica.
El evento: un @dataclass de primitivos¶
Un evento es solo un @dataclass con campos primitivos planos (str, int, listas de
str, ids). Nada de instancias ORM ni sesiones de BD. La razón es el transporte: si hay
broker, el evento viaja como kwargs JSON y se reconstruye en el worker con
Evento(**kwargs) — y eso solo funciona con primitivos serializables (mismo contrato que
SerializesModels de Laravel).
# src/tequio/Modules/Demo/Events.py
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class NoteCreated:
"""Se creó una nota. Lo observa LogNoteCreated."""
note_id: int
title: str
Empaca en el evento lo que el observer necesitará lejos del caller
Un observer puede correr en el worker, en otro proceso: allá no tiene la sesión ni el
contexto del caller. Por eso el evento lleva dentro los datos que el observer usará
(note_id, title) en vez de un objeto ORM. Si necesitas datos de BD que sí puedes leer
por id, pasa el id y consúltalo en handle().
En milpa esto mandaba correo al dueño
En milpa, NoteCreated cargaba el dueño y su locale, y el observer enviaba un
Mailable i18n a ese usuario. tequio sí tiene correo (vuelve al worker; ver
Correo), pero no tiene Auth ni tabla de usuarios (eso vive en
milpa), así que el demo ya no tiene dueño: el
evento se quedó mínimo (note_id, title). El observer del demo (LogNoteCreated) solo
loguea —a propósito— para mostrar el otro transporte de un efecto secundario; el
correo del demo lo manda el cron del digest (ver Programación (cron)).
El Observer: subclase con observes + handle()¶
Un Observer hereda de la ABC Observer (tequio/Core/Events/Observer.py), fija el atributo
de clase observes = TipoDeEvento y sobreescribe handle(self, event):
# src/tequio/Modules/Demo/Observers/LogNoteCreated.py
from __future__ import annotations
from loguru import logger
from tequio.Core.Events import Observer
from tequio.Modules.Demo.Events import NoteCreated
class LogNoteCreated(Observer):
observes = NoteCreated
def handle(self, event: object) -> None:
assert isinstance(event, NoteCreated) # dispatch ya filtró por tipo; narrow para mypy
# en milpa esto era un Mailable i18n al dueño; aquí, sin dueño, el observer loguea
# (el correo del demo lo manda el cron del digest: muestra el otro transporte)
logger.info(
'demo.note_created | nota {id} "{t}" creada',
id=event.note_id, t=event.title,
)
| Atributo / método | Para qué | Laravel |
|---|---|---|
observes (ClassVar) |
Tipo de evento que escucha. Match por tipo exacto (sin herencia). None = escucha todos los eventos. |
$listen en EventServiceProvider |
handle(self, event) |
Reacciona al evento. Por defecto no hace nada. | handle(Event $event) |
Relación 1:N: varios Observers pueden declarar observes = NoteCreated y todos
corren. El event que llega a handle() ya está filtrado por tipo (de ahí el assert
isinstance para que mypy lo afine).
Un Observer SÍ puede leer la BD
Lo que evitamos es atarlo a la BD (no es un model-observer). Pero handle() es código
normal: puede consultar repositorios, escribir a un archivo, despachar otro job, etc. El
observer del demo no toca BD porque todo lo que necesita viaja en el evento.
Disparar el evento: dispatch(evento)¶
dispatch vive en tequio/Core/Events. Recibe la instancia del evento y la entrega a cada
Observer cuyo observes matchee (o sea None):
from tequio.Core.Events import dispatch
from tequio.Modules.Demo.Events import NoteCreated
dispatch(
NoteCreated(
note_id=7,
title="Mi nota",
)
)
En tequio el disparo nace donde ocurre el hecho de negocio: un service que acaba de crear la nota, un job o un command de consola. Por ejemplo, justo después de crear la nota en un service:
created = NoteService().create(title, body)
# Evento de dominio → el Observer LogNoteCreated loguea la creación (auto).
dispatch(
NoteCreated(
note_id=int(created["id"]),
title=str(created["title"]),
)
)
El demo trae el evento y el observer, no el call site
El módulo Demo define NoteCreated y LogNoteCreated para enseñar el patrón,
pero no incluye un caller que lo dispare (en milpa ese call site era el controller HTTP
POST /notes, que tequio no tiene). Tú decides desde dónde llamas dispatch(...): cualquier
service, job o command es válido.
Transporte adaptativo: broker si hay, síncrono si no¶
Aquí está la decisión clave de diseño (KISS, sin flags por-observer): si hay broker disponible, el observer corre en el worker (async); si no, corre síncrono inline. Tú no eliges; lo decide el framework por observer:
dispatch(NoteCreated(...))
│
▼
¿hay broker?
├── sí → encola task "events.handle" → el WORKER reconstruye observer + evento y corre handle()
└── no → observer().handle(event) (síncrono, en el acto)
El import de Celery es perezoso: un proyecto que nunca encola observers no jala redis al
arrancar. La rama encolada vive en tequio/Core/Events/Tasks.py y solo se importa cuando hace
falta (lo hace Dispatch._dispatch_one). Si el broker no responde, se cae a ejecución
síncrona automáticamente (QueueUnavailableError).
Best-effort por observer
Un observer que falla no tumba al caller ni a los demás observers: un efecto
secundario no debe romper la operación de negocio. El comportamiento ante un error lo
decide el flag EVENTS_STRICT (siguiente sección) — pero nunca falla en silencio.
Auto-registro y discovery¶
No hay que registrar nada a mano (adiós al EventServiceProvider). Dos mecanismos:
- Auto-registro por subclase: definir una clase que herede de
Observerla mete sola en el registro interno (__init_subclass__), mismo patrón que losSeeder. - Discovery por convención:
import_all_observers()(entequio.Core.Registry) importa todo el árbol de cada módulo (recursivo). Importar el módulo donde vive el observer es lo que dispara su auto-registro.
Por eso basta con que el Observer viva en algún lugar del árbol de tu módulo (el demo lo
pone en Observers/LogNoteCreated.py, pero el discovery no lo exige ahí). Si lo defines fuera del paquete de
módulos y nadie lo importa, dispatch no lo verá.
Procesos sueltos: corre el discovery a mano
Un proceso que dispare eventos fuera del arranque normal (un command de consola propio, un
script) debe importar los observers antes para que dispatch los encuentre:
El flag EVENTS_STRICT¶
Controla qué pasa cuando un observer lanza una excepción (setting events_strict, definido
en tequio/Core/Config, default False):
events_strict |
Comportamiento ante un observer que falla | Cuándo |
|---|---|---|
False (default) |
Loguea ruidoso (ERROR + traceback con logger.exception) y sigue. La operación de negocio no se rompe. |
Producción |
True |
Re-lanza la excepción, para que el bug del observer truene fuerte de inmediato. | Dev / tests |
En ambos casos nunca se traga el error en silencio. Pon EVENTS_STRICT=true en .env
mientras desarrollas para cazar bugs en tus observers al instante.
Forma tradicional vs. estilo milpa¶
Forma tradicional — el service orquesta los efectos secundarios inline. Sabe del log y del transporte; mezcla la regla de negocio con sus consecuencias:
def create(self, title, body):
note = ... # crear y persistir la nota
# El service orquesta TODO el efecto secundario a mano:
logger.info("nota {id} creada", id=note.id)
# y si mañana hay que auditar o avisar a otro sistema → se toca este método otra vez
return note
Agregar un segundo efecto (auditoría, webhook, export) significa tocar el service otra vez.
Estilo milpa — el service anuncia el hecho y se desentiende del resto. Quién reacciona y cómo viaja (worker o síncrono) es problema del framework y de los Observers:
def create(self, title, body):
note = ... # crear y persistir la nota
dispatch(NoteCreated(note_id=note.id, title=note.title))
return note
Para sumar un efecto, agregas un Observer (otra subclase de Observer con
observes = NoteCreated) — sin tocar el service. Eso es la inversión 1:N: el emisor
no conoce a sus consumidores.
Eventos vs. Mediator vs. Jobs¶
tequio ofrece varios mecanismos opt-in; elige por intención:
| Patrón | Cardinalidad | ¿Devuelve? | Cuándo |
|---|---|---|---|
Eventos / Observers (dispatch) |
1:N | No (fire-and-forget) | "Pasó X" — notificar a N reacciones desacopladas. |
Mediator (send) |
1:1 | Sí (resultado) | Una intención que resuelves y de la que esperas respuesta. |
Jobs (@job + .dispatch()) |
1:1 | No | Un trabajo de background concreto que siempre quieres encolar. |
El Mediator enruta UNA intención a UN handler y te devuelve el resultado; los Eventos son notificación 1:N donde no esperas retorno y el transporte lo decide el framework. Ver Jobs y Colas y tareas para el background.
Probar Observers sin BD ni broker¶
Como los observers se ejecutan síncronos cuando no hay broker, un test puede disparar el evento y verificar el efecto sin Celery. Para aislar el registro entre tests, tequio expone helpers (espejo de los seeders):
registered_observers(): la lista de subclases deObserverregistradas.reset_observers(): limpia el registro (solo para tests).
Con EVENTS_STRICT=true en el entorno de test, si un observer falla, el dispatch re-lanza
y el test truena (en vez de tragarse el error).