Programación (cron)¶
tequio reproduce el scheduler estilo milpa (a su vez inspirado en el de Laravel) sobre
Celery: declaras la cadencia pegada al job con @cron_task, y un disparador despacha lo que
toca. Hay dos disparadores que leen esos mismos crons: el beat de Celery
(schedule work, que los auto-agenda) o el crontab del SO llamando schedule run
cada minuto (estilo php artisan schedule:run).
Declarar un cron¶
Declara tus crons con @cron_task dentro de tu módulo (el discovery importa todo el árbol;
el demo los pone, por la convención de los generadores make:*, en
Crons/DailyDigestCron.py). El módulo Demo trae uno en
tequio/Modules/Demo/Crons/DailyDigestCron.py — un resumen diario de notas:
# Modules/Demo/Crons/DailyDigestCron.py
from tequio.Core.CeleryApp import QueueUnavailableError
from tequio.Core.Cron import cron_task, daily_at
from tequio.Core.Mail import Mail
from tequio.Modules.Demo.Mail.DailyDigestMailable import DailyDigestMailable
from tequio.Modules.Demo.Repositories.NoteRepository import NoteRepository
@cron_task(name="demo.daily_digest", schedule=daily_at("08:00"), output="demo_digest")
def daily_digest() -> None:
"""Corre en el WORKER cada día a las 8:00 (lo despacha el beat de `schedule work`,
o el crontab del SO vía `schedule run`)."""
total = len(NoteRepository().all())
# El cron MANDA correo. Worker-side lo idiomático es ENCOLAR a la cola `emails`
# (= ->onQueue('emails') de Laravel; la consume `queue work --queue emails`); si el
# broker no está, caemos al envío SÍNCRONO. Con MAIL_DRIVER=log (default dev) el MIME
# se vuelca al log —a logs/cron_demo_digest.log por su `output`— sin SMTP; con Mailpit, a la UI.
mailable = DailyDigestMailable(total=total)
try:
Mail.queue(mailable, to=["admin@example.com"], queue="emails")
except QueueUnavailableError:
Mail.send(mailable, to=["admin@example.com"])
Se registra al importarse (el Registry importa todo el árbol de cada módulo).
El cron que motivó traer el correo al worker
Este es justo el caso por el que tequio sí extrajo el correo de milpa: en milpa el
DailyDigestCron manda el resumen por correo al admin, y muchísimos crons reales
terminan haciendo lo mismo. Así que aquí el digest manda correo de verdad (un
Mailable encolado a la cola emails, con fallback síncrono si el broker no está);
con el driver log (default dev) se vuelca al log sin SMTP, con
Mailpit lo ves en la UI. Lo que tequio no trae es la capa web que lo disparaba;
el patrón del cron (cadencia + guards + despacho al worker) es idéntico al de milpa. Ver
Correo.
El decorador @cron_task¶
def cron_task(
*,
name: str,
schedule: str | None = None,
queue: str | None = None,
environments: Sequence[str] | None = None,
without_overlapping: bool = False,
output: str | None = None,
lock_timeout: int | None = None,
**celery_options: Any,
) -> Callable[[DecoratedTask], Any]
| Parámetro | Default | Semántica |
|---|---|---|
name |
(obligatorio) | Identificador único de la task. |
schedule |
None |
Expresión cron (5 campos). Si es None, la task existe pero no se agenda. |
queue |
None |
Cola de Celery a la que se despacha; None = cola por defecto. |
environments |
None → todos |
Lista de APP_ENV donde corre; si app_env no está, se omite. |
without_overlapping |
False |
Lock en Redis; si la corrida previa sigue, se omite esta. |
output |
None |
Rutea los logs de la corrida a logs/cron_<output>.log (rotación diaria, 14 días). |
lock_timeout |
derivado | Timeout del lock. Por defecto visibility_timeout + 300s. |
**celery_options |
— | Cualquier opción extra de Celery (rate_limit, etc.). |
@cron_tasksí envuelve la función: la wrapper ejecuta los guards (entorno, lock, logs) antes de tu código, y devuelve una task de Celery. Puedes llamarla con.delay()o directotask().
@cron_task(
name="reminders",
schedule=every_five_minutes(),
environments=["qa", "production"],
without_overlapping=True,
output="reminders",
queue="reports",
)
def send_reminders() -> None:
logger.info("Procesando recordatorios...")
# ...
No hay generador
make cron: los crons se escriben a mano bajoModules/<X>/Crons/. (El generadormake jobsí existe, para jobs on-demand; ver Jobs (@job).)
Cadencia: helpers de Schedule¶
En vez de escribir cron raw, usa los helpers (tequio/Core/Cron). Cada uno devuelve una
expresión cron (string de 5 campos) que pasas a @cron_task(schedule=...):
| Helper | Cron |
|---|---|
every_minute() |
* * * * * |
every_minutes(n) |
*/n * * * * (n entre 1 y 59) |
every_five_minutes() |
*/5 * * * * |
every_ten_minutes() |
*/10 * * * * |
every_fifteen_minutes() |
*/15 * * * * |
every_thirty_minutes() |
*/30 * * * * |
hourly() |
0 * * * * |
hourly_at(min) |
<min> * * * * |
daily() |
0 0 * * * |
daily_at("HH:MM") |
<m> <h> * * * |
weekly() |
0 0 * * 0 |
monthly() |
0 0 1 * * |
cron("expr") |
escape hatch (raw) |
from tequio.Core.Cron import cron_task, daily_at, cron
@cron_task(name="backup", schedule=daily_at("02:30"), environments=["production"])
def backup() -> None: ...
@cron_task(name="reporte", schedule=cron("15 9 * * 1-5")) # 9:15 lun-vie
def reporte() -> None: ...
Cómo se disparan: schedule work vs schedule run¶
Hay dos modos, y ambos disparan los @cron_task que descubre el framework. Elige uno
(no corras los dos a la vez, o cada cron se despacharía doble):
A) schedule work (beat de Celery)¶
jornal schedule work arranca el beat (un proceso de larga duración que despierta y
despacha los crons a la cola). Corre una sola instancia (varios beats = crons duplicados):
El beat lee su beat_schedule, que arma el Registry al configurarse Celery
(collect_beat_schedule()). Ese calendario es la fusión de dos fuentes:
- Los
@cron_task(schedule=…)auto-descubiertos — el discovery importa todo el árbol de cada módulo, registra los crons (registered_crons()), y el Registry convierte la expresión cron de cada uno a uncelery.schedules.crontaby lo agenda. Sin escribir un soloKernel.py: defines el cron donde te quede y el beat lo agenda solo. - Los
beat_scheduledeclarados enConsole/Kernel.pyde cada módulo — la vía declarativa (estilo elKernelde Laravel), para quien prefiere ver el calendario centralizado en un archivo. Tiene precedencia: si unKernel.pydeclara una entrada con el mismo nombre que un@cron_taskdescubierto, gana la delKernel.py.
Arrancar el beat sí dispara crons según el
environmentsde cada uno: el beat solo agenda y despacha; el gate deenvironmentsy el lock anti-overlapping siguen viviendo en@cron_tasky se aplican al ejecutar la task en el worker. En dev normalmente no corres el beat: pruebas un cron a mano (mi_cron.delay()omi_cron()directo).
El conversor exige 5 campos (no agenda mal en silencio)
Al agendar un @cron_task, el Registry convierte su schedule a un crontab de Celery
mapeando los 5 campos posicionalmente (minute hour día-del-mes mes día-de-semana). Los
helpers de Schedule.py siempre devuelven 5 campos; el escape hatch cron("…") pasa el
string crudo. Si esa expresión no tiene exactamente 5 campos, el conversor falla con
un error claro en vez de agendar algo mal (faro: un cron mal agendado que nunca dispara
sería un fallo invisible).
B) schedule run desde el crontab del SO¶
jornal schedule run es el php artisan schedule:run: evalúa qué crons tocan este minuto
(con croniter) y los despacha; arranca, despacha en milisegundos y sale (stateless). En vez de
un beat de larga duración, lo llama el crontab del SO cada minuto:
En cada corrida, schedule run recorre los crons registrados (registered_crons()), aplica el
mismo gate de environments que el decorador, y para los que croniter.match(...) confirma que
tocan, los despacha a la cola (a su queue con apply_async, o a la cola por defecto con
.delay()), todo envuelto en broker_guard (error claro si redis no está).
Este modo lee los mismos
@cron_taskque el beat (víaregistered_crons()), pero no mira losConsole/Kernel.py: elKernel.pydeclarativo solo lo honra el beat. Si declaras crons enKernel.pyy disparas conschedule run, esos no se despachan; usa el beat.
En ambos modos, el worker (jornal queue work) es quien ejecuta la task despachada. Ver
Colas y tareas.
Los guards (en orden)¶
Cuando un cron se ejecuta, la wrapper aplica:
- Entorno — si
environmentsno está vacío yAPP_ENVno está en la lista, se omite (loguea y retorna sin ejecutar). - Logs — si hay
output, los logs de la corrida van alogs/cron_<output>.log. - Lock — si
without_overlapping, toma un lock Rediscron-lock:<name>; si ya está tomado (la corrida anterior sigue), se omite.
El invariante del lock¶
lock_timeout debe ser mayor que redis_visibility_timeout. Si fueran iguales,
expirarían juntos: Redis re-entregaría la task y un segundo worker tomaría el lock recién
liberado → doble ejecución. Por eso el default es visibility_timeout + 300s, y si pasas
un lock_timeout menor o igual, falla al decorar (no en runtime).
El lock vive en el store de
LOCK_URL(redis), independiente del broker: el broker puede ser RabbitMQ/SQS (que no tienen primitiva de lock), pero el lock siempre sale de un redis.
Flujo completo¶
1. @cron_task registra el cron (cadencia + guards) en registered_crons().
2a. beat (schedule work): el Registry fusiona los @cron_task + los Console/Kernel.py
en el beat_schedule; el beat despacha a la cola cuando toca el crontab celery.
2b. crontab del SO (schedule run): cada minuto → ¿toca (croniter)? ¿aplica el entorno?
→ despacha a la cola.
3. worker (queue work): ejecuta la wrapper (guards: entorno, lock, logs) → tu función.
Dos disparadores, una misma fuente: ambos parten de los
@cron_taskdescubiertos. El beat agrega la vía declarativa (Console/Kernel.py, con precedencia);schedule runno la mira.
El reloj (Clock)¶
Para los cálculos de fechas de negocio, tequio trae un reloj inyectable
(tequio/Core/Clock/Clock.py), el equivalente de java.time.Clock de Spring o de
Carbon::setTestNow() de Laravel. La idea: no llamar datetime.now() suelto en el dominio
(eso acopla al reloj de pared y no se puede congelar en un test), sino recibir un Clock y
pedirle la hora.
Es un Protocol con dos implementaciones:
| Implementación | Qué hace |
|---|---|
SystemClock |
Hora real en la zona de la app (TIMEZONE del .env), naive local (como guarda Eloquent/Carbon). |
FixedClock(moment) |
Reloj congelado: siempre devuelve moment. Para tests (= Carbon::setTestNow). |
Cómo se inyecta: a mano, instanciándolo donde se necesite (tequio no tiene un Unit of Work que lo cablee por ti). El único consumidor del core es
schedule run, que haceSystemClock().now()para saber qué minuto es. En tu dominio, recibe unClockpor parámetro/constructor y pásale unSystemClock()en producción.Para los timestamps de BD no uses esto: los pone la BD con
func.now()y la conexión ya corre en la zona de la app (verDatabase/Session.pyyTimestamp.py).
Congelar el tiempo en un test de cron¶
Como tu cron recibe el reloj (en vez de llamar datetime.now() adentro), un test lo congela
pasándole un FixedClock y verifica el comportamiento de un instante exacto, sin esperar ni
depender de la hora real:
from datetime import datetime
from tequio.Core.Clock import Clock, FixedClock
def expira_membresias(clock: Clock) -> int:
"""Marca como vencidas las membresías cuya fecha de corte ya pasó."""
hoy = clock.now()
# ... usa `hoy` para filtrar/decidir ...
return 0
def test_no_expira_nada_si_el_corte_es_manana() -> None:
# Congela el "ahora" en un instante exacto (= Carbon::setTestNow).
clock = FixedClock(datetime(2026, 6, 6, 8, 0, 0))
assert expira_membresias(clock) == 0
Y si lo que quieres es probar que un cron toca este minuto, croniter.match(schedule, now)
con un now que sacas del FixedClock te deja afirmar el agendado sin reloj de pared.