Repositorios y transacciones¶
milpa adopta un modelo de persistencia estilo Spring Data / JPA:
- Repositorios tipados (
Repository[Model, Id]) con CRUD heredado — solo queries. - Escrituras en servicios
@transactional(commit/rollback automático). - Lecturas con
@auto_session(funcionan con o sin scope abierto). - La sesión es ambiente (un contextvar), no se inyecta por constructor.
La división de responsabilidades: el Repository consulta, el Service orquesta y transacciona.
La sesión ambiente¶
La sesión vive en un contextvar scoped por request/task (como el EntityManager
thread-bound de Spring), no es global de proceso. Tres primitivos la gobiernan
(milpa/Core/Database/Transactional.py):
| Primitivo | Qué hace | Cuándo usarlo |
|---|---|---|
current_session() |
Devuelve la sesión del scope; error claro si no hay. | dentro de un repo/servicio |
session_scope() |
Context manager: abre/cierra la sesión; commits manuales. | flujos con varios commits (lotes) |
@transactional |
Decorador: abre + commit on success / rollback on exception. | servicios de una transacción |
@auto_session |
Decorador: usa la sesión si hay; si no, abre una efímera (no commitea). | lecturas / queries de repo |
Todos son join-or-create (propagación REQUIRED): si ya hay sesión en el contextvar
(llamada anidada), la reutilizan y no la cierran/commitean — eso lo hace quien la
abrió. Esto hace que anidar servicios @transactional produzca una sola transacción.
Definir un repositorio¶
# app/Models/Repositories/InvoiceRepository.py
from sqlalchemy import select
from milpa.Core.Database import Repository
from app.Models.Invoice import Invoice
class InvoiceRepository(Repository[Invoice, int]):
model = Invoice
def find_by_numero(self, numero: str) -> Invoice | None:
return self.session.execute(
select(Invoice).where(Invoice.numero == numero)
).scalars().first()
model = Invoicedeclara qué entidad gestiona.self.sessionencapsulacurrent_session()— úsalo en tus queries custom; no llamescurrent_session()a mano.- Las queries custom (métodos públicos) se envuelven automáticamente con
@auto_session(vía__init_subclass__): funcionan con o sin scope abierto. No pones el decorador a mano.
CRUD heredado¶
| Método | Firma | Decorador |
|---|---|---|
get |
get(entity_id: IdT) -> ModelT \| None |
@auto_session |
find_or_fail |
find_or_fail(entity_id: IdT) -> ModelT (lanza ResourceNotFoundError si no existe) |
@auto_session |
all |
all() -> Sequence[ModelT] |
@auto_session |
add |
add(entity: ModelT) -> ModelT (hace flush() para asignar PK) |
@transactional |
first_or_create |
first_or_create(where: dict, values: dict \| None = None) -> ModelT |
@transactional |
delete |
delete(entity: ModelT) -> None (lógico si hereda SoftDeleteMixin) |
@transactional |
repo = InvoiceRepository()
inv = repo.get(7) # abre sesión efímera si no hay scope; None si no existe
inv = repo.find_or_fail(7) # = findOrFail de Eloquent: 404 (ResourceNotFoundError) si falta
todas = repo.all() # filtra borradas lógicas (SoftDeleteMixin)
inv2 = repo.find_by_numero("INV-001")
# firstOrCreate: busca por `where`; si no hay, crea con where + values (extras solo-al-crear)
cliente = ClienteRepository().first_or_create({"rfc": "XAXX010101000"}, {"nombre": "Público"})
find_or_failevita elif x is None: raiserepetido en cada service: el handler global convierteResourceNotFoundErroren un404 {error_code, message, details}.first_or_createes idempotente porwhere: devuelve el existente o crea uno nuevo (con su PK ya asignada víaflush). Como es@transactional, persiste o se une a la tx externa.
Limitación honesta: no derivamos queries del nombre del método (el
findByXde Spring). En Python sería frágil. Las queries custom llevan cuerpo explícito.
Escribir: servicios @transactional¶
# app/Modules/Billing/Services/InvoiceService.py
from decimal import Decimal
from milpa.Core.Database import transactional
from app.Models.Invoice import Invoice
from app.Models.Repositories.InvoiceRepository import InvoiceRepository
class InvoiceService:
def __init__(self) -> None:
self._invoices = InvoiceRepository()
@transactional
def crear(self, numero: str, monto: Decimal) -> Invoice:
return self._invoices.add(Invoice(numero=numero, monto=monto))
@transactional
def marcar_pagada(self, invoice_id: int) -> None:
inv = self._invoices.get(invoice_id) # se une a esta transacción
if inv is None:
raise ValueError("no existe")
inv.pagada = True # cambio tracked; flush+commit al salir
- Cada método
@transactionalabre sesión, commitea al terminar, o hace rollback si lanza. - Las llamadas a repos dentro (
add,get, queries) se unen a esa transacción. - No necesitas
session.add()para objetos ya cargados: SQLAlchemy trackea los cambios.
Transacciones compuestas (anidadas)¶
@transactional
def crear_pedido_con_factura(self, ...) -> None:
self._invoices.crear(...) # @transactional → se une, no commitea aparte
self._stock.descontar(...) # @transactional → se une
# UN solo commit al final; si cualquiera lanza → rollback de TODO
Leer fuera de una transacción¶
Un endpoint que lee y devuelve JSON no necesita @transactional: repo.get() abre una
sesión efímera (gracias a @auto_session). Pero convierte a DTO dentro del scope:
@router.get("/invoices/{invoice_id}", response_model=InvoiceDTO)
def get_invoice(invoice_id: int) -> InvoiceDTO:
inv = InvoiceRepository().get(invoice_id)
if inv is None:
raise HTTPException(status_code=404)
return InvoiceDTO.model_validate(inv) # serializa AQUÍ, con la sesión viva
Control manual: session_scope¶
Para flujos con varios checkpoints de commit (procesos por lotes):
from milpa.Core.Database import session_scope
def procesar_lote(ids: list[int]) -> None:
repo = InvoiceRepository()
with session_scope() as session:
for i, inv_id in enumerate(ids):
inv = repo.get(inv_id) # se une al scope
if inv:
inv.procesada = True
if (i + 1) % 100 == 0:
session.commit() # checkpoint cada 100
session.commit() # final
Aquí los commits son tuyos (a diferencia de @transactional). Es el patrón para
preservar invariantes "persiste el paso N antes de empezar el N+1".
N+1 y DetachedInstanceError¶
El error clásico: leer una entidad, cerrar la sesión, y luego acceder a una relación
lazy → DetachedInstanceError. Dos defensas:
- Eager load dentro del scope con
selectinload:
def get_con_items(self, invoice_id: int) -> Invoice | None:
return self.session.execute(
select(Invoice).where(Invoice.id == invoice_id)
.options(selectinload(Invoice.items))
).scalars().first()
- Devolver un DTO (no la entidad): serializa todo lo que necesitas mientras la sesión está abierta, y deja que la entidad muera con el scope.
Encapsula las lecturas que navegan un grafo en un método (auto_session) que carga con
selectinload y devuelve el DTO — así la entidad nunca "escapa" del scope.
Resumen del flujo¶
Controller
→ Service (@transactional: abre sesión, commitea/rollback)
→ Repository (get/add/query: se une a la transacción)
→ SQLAlchemy (SessionLocal sobre el engine; zona fijada por conexión)
→ Controller convierte a DTO (dentro del scope) → JSON
Volver al índice.