Programación de tareas (cron)¶
milpa reproduce el scheduler de Laravel sobre Celery: declaras la cadencia pegada al
job con @cron_task, y un disparador (schedule run, llamado por el crontab del SO)
despacha lo que toca.
Declarar un cron¶
Pon el job bajo Modules/<X>/Jobs/ y decóralo:
# app/Modules/Example/Jobs/SendReminders.py
from loguru import logger
from milpa.Core.Cron import cron_task, every_five_minutes
@cron_task(
name="send_reminders",
schedule=every_five_minutes(),
environments=["qa", "production"],
without_overlapping=True,
output="reminders",
queue="emails",
)
def send_reminders() -> None:
logger.info("Enviando recordatorios...")
# ...
Se registra al importarse. (El Registry importa Jobs/ de cada módulo.)
El decorador @cron_task¶
def cron_task(
*,
name: str,
schedule: str | None = None,
queue: str | None = None,
environments: Sequence[str] | None = None,
without_overlapping: bool = False,
output: str | None = None,
lock_timeout: int | None = None,
**celery_options: Any,
) -> Callable[[DecoratedTask], Any]
| Parámetro | Default | Semántica | Laravel |
|---|---|---|---|
name |
(obligatorio) | Identificador único de la task. | — |
schedule |
None |
Expresión cron (5 campos). Si es None, la task existe pero no se agenda. |
->cron() |
queue |
None |
Cola de Celery; None = cola por defecto. |
->onQueue() |
environments |
None → todos |
Lista de APP_ENV donde corre; si app_env no está, se omite. |
->environments() |
without_overlapping |
False |
Lock en Redis; si la corrida previa sigue, se omite esta. | ->withoutOverlapping() |
output |
None |
Rutea los logs de la corrida a logs/cron_<output>.log (rotación diaria, 14 días). |
->appendOutputTo() |
lock_timeout |
derivado | Timeout del lock. Por defecto visibility_timeout + 300s. |
— |
**celery_options |
— | Cualquier opción extra de Celery (rate_limit, etc.). |
— |
A diferencia de
@console_command,@cron_tasksí envuelve la función: la wrapper ejecuta los guards (entorno, lock, logs) antes de tu código, y devuelve una task de Celery. Puedes llamarla con.delay()o directotask().
Cadencia: helpers de Schedule¶
En vez de escribir cron raw, usa los helpers (milpa/Core/Cron):
| Helper | Cron | Laravel |
|---|---|---|
every_minute() |
* * * * * |
everyMinute() |
every_minutes(n) |
*/n * * * * |
everyNMinutes() |
every_five_minutes() |
*/5 * * * * |
everyFiveMinutes() |
every_ten_minutes() |
*/10 * * * * |
everyTenMinutes() |
every_fifteen_minutes() |
*/15 * * * * |
everyFifteenMinutes() |
every_thirty_minutes() |
*/30 * * * * |
everyThirtyMinutes() |
hourly() |
0 * * * * |
hourly() |
hourly_at(min) |
<min> * * * * |
hourlyAt() |
daily() |
0 0 * * * |
daily() |
daily_at("HH:MM") |
<m> <h> * * * |
dailyAt() |
weekly() |
0 0 * * 0 |
weekly() |
monthly() |
0 0 1 * * |
monthly() |
cron("expr") |
escape hatch (raw) | cron() |
from milpa.Core.Cron import cron_task, daily_at, hourly_at, cron
@cron_task(name="backup", schedule=daily_at("02:30"), environments=["production"])
def backup() -> None: ...
@cron_task(name="reporte", schedule=cron("15 9 * * 1-5")) # 9:15 lun-vie
def reporte() -> None: ...
Cómo se disparan: schedule run vs schedule work¶
Hay dos modos. Elige uno:
A) schedule run desde el crontab del SO (recomendado)¶
jornal schedule run evalúa qué crons tocan este minuto y los despacha; arranca,
despacha en milisegundos y sale (stateless). Lo llamas cada minuto desde el crontab:
B) schedule work (beat de Celery)¶
jornal schedule work arranca el beat (un proceso de larga duración que dispara los
crons). Corre una sola instancia (varios beats = crons duplicados):
Arrancar el beat sí dispara crons según el
environmentsde cada uno. En dev normalmente no lo corres: pruebas un job a mano (mi_job.delay()).
En ambos casos, el worker (jornal queue work) es quien ejecuta el job despachado.
Los guards (en orden)¶
Cuando un cron se ejecuta, la wrapper aplica:
- Entorno — si
environmentsno está vacío yAPP_ENVno está en la lista, se omite (loguea y retorna sin ejecutar). - Logs — si hay
output, los logs de la corrida van alogs/cron_<output>.log. - Lock — si
without_overlapping, toma un lock Rediscron-lock:<name>; si ya está tomado (la corrida anterior sigue), se omite.
El lock store debe estar disponible (error accionable, sin fallback mágico)¶
without_overlapping exige un lock store (redis). El default es un redis LOCAL
(LOCK_URL vacío). La conexión es perezosa: ocurre al adquirir el lock, dentro del
worker (no en el borde CLI). Si en ese momento el store no responde, la corrida
no se ejecuta a ciegas: truena con un error que te dice exactamente qué configurar
—sin caer al broker por su cuenta (nada de fallback mágico):
cron 'send_reminders': without_overlapping necesita el LOCK store (redis) y no se pudo
conectar a redis://127.0.0.1:6379/0. El default es un redis LOCAL; en docker configúralo
con LOCK_URL=redis://<host>:6379/0 (apuntando al servicio redis de tu compose), o quita
without_overlapping de este cron.
El caso típico es docker: dentro del contenedor no hay un redis local, así que hay que
apuntar LOCK_URL al servicio del compose (p. ej. LOCK_URL=redis://redis:6379/0). El
store de locks es independiente del BROKER_URL: puedes tener el broker en RabbitMQ y el
lock en redis.
El invariante del lock¶
lock_timeout debe ser mayor que redis_visibility_timeout. Si fueran iguales,
expirarían juntos: Redis re-entregaría la task y un segundo worker tomaría el lock recién
liberado → doble ejecución. Por eso el default es visibility_timeout + 300s, y si
pasas un lock_timeout menor o igual, falla al decorar (no en runtime).
Flujo completo¶
1. @cron_task registra el cron (cadencia + guards).
2. crontab del SO: cada minuto → jornal schedule run
3. schedule run: ¿toca este minuto (croniter)? ¿aplica el entorno? → despacha a la cola
4. worker (queue work): ejecuta la wrapper (guards) → tu función