Ciclo de vida¶
milpa narra el ciclo de una petición HTTP. tequio no tiene HTTP: su ciclo de vida es
el del worker. Hay tres procesos distintos (CLI, worker, beat) y un cuarto camino, el
de un @job despachado. Esta página los recorre, cada paso anclado al código real.
Sin create_app() ni request
Aquí no hay una app de FastAPI ni un _lifespan: el equivalente al "bootstrap" es el
arranque del CLI y la configuración de Celery. La capa HTTP vive en
milpa.
1. Arranque del CLI (jornal)¶
El launcher jornal (en la raíz del proyecto) solo hace from tequio.Core.Console.Cli
import run y lo llama. El kernel de consola vive en tequio/Core/Console/Cli.py. Qué
pasa, en orden:
run()configura el logging:setup_logging()arma los sinks de Loguru (stderr conciso + archivo rotativo;diagnosesolo enlocal). Ver Logging.- Discovery de comandos (a nivel de import del módulo
Cli):import_submodules("tequio.Core.Console.Commands")importa los commands del framework (queue work,schedule work,schedule run,db,make,migrate). Al importarse, sus@console_commandse registran.import_submodules(settings.app_commands_package)hace lo mismo con los commands generales de tu proyecto.for group, sub_app in iter_cli_apps()recorre los módulos presentes (app/Modules/*), importa cadaConsole/Commands/(más@console_commandque se registran) y arma untyper.Typer()por grupo, montándolo como sub-app.
app()despacha el subcomando que pediste. Si algo truena, el borde de error del CLI rinde unDomainErrorcomo mensaje limpio (sin traceback) y un error inesperado como "Error interno" + traceback completo al log.
Discovery dinámico, no imports estáticos
Cli vive en Core pero no importa Modules de forma estática: el discovery es
por rutas en string (import_submodules + iter_cli_apps). Por eso no rompe el
contrato "Core ↛ Modules" de import-linter.
2. El worker de Celery (queue work)¶
El worker es quien ejecuta las tasks en background. Por sí solo no agenda nada: solo procesa lo que se le despacha.
python jornal queue work # arranca el worker (bloquea hasta Ctrl-C)
python jornal queue work --queue exports # consume solo esa(s) cola(s)
Qué pasa al arrancar (tequio/Core/CeleryApp/CeleryApp.py):
- Se crea la
Celery(...)conbroker=settings.effective_broker_urlybackend=settings.effective_result_backend(None por default; los crons son fire-and-forget).setup_logging()también corre aquí, para unificar el log del worker. visibility_timeoutse fija solo si el broker es redis/SQS (broker_uses_visibility_timeout).- Al terminar Celery su configuración, el hook
on_after_configuredispara el discovery diferido:import_all_tasks()importaJobs/,Crons/yConsole/Commands/de cada módulo presente → registra sus@job/@cron_taskcomo tasks de Celery.import_submodules("tequio.Core.Events")registra la task que corre los observers encolados.sender.conf.beat_schedule = collect_beat_schedule()arma el calendario (lo usa el beat, abajo).
Registrar ≠ disparar
Que un módulo esté presente no dispara nada: registrar tasks solo las vuelve ejecutables bajo demanda. El único disparo automático es el beat.
3. El beat / scheduler (schedule work y schedule run)¶
El beat es el despertador: cada cierto tiempo despacha los crons al worker. Debe correr una sola instancia.
El beat_schedule lo armó el discovery diferido (paso 2) con collect_beat_schedule(), que
fusiona dos fuentes: (1) los @cron_task(schedule=…) auto-descubiertos —su expresión cron
convertida a un crontab de Celery— y (2) los beat_schedule declarados en el
Console/Kernel.py de cada módulo (la vía declarativa, con precedencia en colisiones de
nombre). Así el beat agenda los @cron_task sin que escribas un Kernel.py.
Como alternativa al beat de larga duración, está schedule run —el php artisan
schedule:run— que el crontab del SO dispara cada minuto (lee los mismos @cron_task, pero
no los Console/Kernel.py):
En cada corrida (ScheduleRunCommand.py):
setup_logging(); tomaSystemClock().now()al minuto exacto (en la zona de la app).- Recorre
registered_crons(). Para cada uno:- Gate de entorno: si
cron.environmentsno incluyeAPP_ENV, lo salta (mismo gate que@cron_task). - Si
croniter.match(cron.schedule, now)(toca este minuto), lo despacha dentro debroker_guard()(error claro si redis no está) a su cola o a la default.
- Gate de entorno: si
- Loguea cuántos despachó.
Por qué el despertador va aparte del worker
queue work no embebe el scheduler (-B) a propósito: así una laptop de
desarrollo nunca dispara crons sola. En prod, beat corre como su propio servicio.
4. El ciclo de un @job despachado¶
Este es el camino que recorre un trabajo on-demand, desde tu código hasta el worker.
from app.Modules.Demo.Jobs.ExportNotesJob import export_user_notes
export_user_notes.dispatch() # encola; NO bloquea al llamador
Paso a paso:
.dispatch(...)(tequio/Core/Jobs/Jobs.py) envuelve el encolado enbroker_guard(): si el broker no responde, traduce el error crudo de kombu/redis en unQueueUnavailableError(503) con mensaje accionable, en vez de un stacktrace mudo. Luego hacetask.apply_async(...)a la cola del job (aquíexports).- El worker (
queue work) toma la task de la cola y la ejecuta. - Si el
@jobdeclaróretry_for=(...)(excepciones transitorias: timeouts, desconexiones), aplica laretry_policy: reintenta con backoff exponencial + jitter, con los topes deTASK_MAX_RETRIES/TASK_RETRY_BACKOFF/TASK_RETRY_BACKOFF_MAX(o lo que se pase a mano). Sinretry_for, es fire-and-forget.
job ≠ cron
@job(Core/Jobs): lo disparas tú (.dispatch()); reintentos opt-in; sin lock ni env-gating. Ver Jobs.@cron_task(Core/Cron): lo agenda el scheduler; trae lock anti-overlapping y env-gating; nunca reintenta (se re-agenda solo). Ver Programación (cron).
Flujo completo (de un vistazo)¶
CLI: jornal <cmd> → setup_logging → import_submodules + iter_cli_apps → app()
worker: queue work → Celery config → on_after_configure: import_all_tasks + beat_schedule
beat: schedule work → beat_schedule = @cron_task + Console/Kernel.py → despacha al worker
cron: schedule run → croniter.match + gate environments → broker_guard → cola → worker
job: .dispatch() → broker_guard → apply_async → worker → (retry_policy si retry_for)