534 lines
19 KiB
Python
534 lines
19 KiB
Python
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
|
from schemas.pedimentoSchema import PedimentoRequest
|
|
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
|
|
from fastapi.responses import JSONResponse
|
|
from celery_app import celery_app
|
|
from tasks import pedimento_completo_task
|
|
import logging
|
|
from typing import Dict, Any, List
|
|
import uuid
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
@router.post("/async/services/pedimento_completo")
|
|
async def async_get_pedimento_completo(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda la tarea de obtener pedimento completo de forma asíncrona.
|
|
|
|
Este endpoint:
|
|
1. Valida los datos de entrada
|
|
2. Agenda la tarea en Celery
|
|
3. Retorna inmediatamente el ID de la tarea para seguimiento
|
|
|
|
Args:
|
|
request: ServiceRemesaSchema con pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con task_id para consultar el estado de la tarea
|
|
|
|
Raises:
|
|
HTTPException: En caso de errores de validación
|
|
"""
|
|
try:
|
|
# Convertir request a diccionario
|
|
request_data = request.model_dump()
|
|
|
|
logger.info(f"Agendando tarea de pedimento completo - Pedimento: {request_data['pedimento']}")
|
|
|
|
# Agendar la tarea en Celery
|
|
task = pedimento_completo_task.delay(request_data)
|
|
|
|
# Crear respuesta inmediata
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta del pedimento completo se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "2-5 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de pedimento completo: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/pedimento_completo/multiple")
|
|
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
|
|
"""
|
|
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
|
|
|
|
Args:
|
|
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
|
|
|
|
Returns:
|
|
JSONResponse con task_id para consultar el estado de la tarea
|
|
|
|
Raises:
|
|
HTTPException: En caso de errores de validación
|
|
"""
|
|
try:
|
|
pedimentos_ids = request.pedimentos
|
|
organizacion = request.organizacion
|
|
|
|
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import multi_pedimento_completo_task
|
|
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
|
|
|
|
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
|
|
response_data = {
|
|
"success": True,
|
|
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"total_pedimentos": len(pedimentos_ids),
|
|
"pedimentos": pedimentos_ids,
|
|
"organizacion": organizacion,
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.get("/async/task-status/{task_id}")
|
|
async def get_task_status(task_id: str):
|
|
"""
|
|
Consulta el estado de una tarea agendada.
|
|
|
|
Args:
|
|
task_id: ID de la tarea a consultar
|
|
|
|
Returns:
|
|
JSONResponse con el estado actual de la tarea
|
|
|
|
Raises:
|
|
HTTPException: Si la tarea no existe o hay errores
|
|
"""
|
|
try:
|
|
# Obtener el resultado de la tarea desde Celery
|
|
task_result = celery_app.AsyncResult(task_id)
|
|
|
|
if not task_result:
|
|
raise HTTPException(status_code=404, detail="Tarea no encontrada")
|
|
|
|
# Preparar respuesta según el estado
|
|
response_data = {
|
|
"task_id": task_id,
|
|
"status": task_result.status,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
if task_result.status == 'PENDING':
|
|
response_data.update({
|
|
"message": "La tarea está pendiente de procesamiento",
|
|
"progress": 0
|
|
})
|
|
elif task_result.status == 'PROGRESS':
|
|
meta = task_result.info
|
|
response_data.update({
|
|
"message": f"Procesando: {meta.get('status', 'En progreso')}",
|
|
"progress": meta.get('progress', 50),
|
|
"current_step": meta.get('status')
|
|
})
|
|
elif task_result.status == 'SUCCESS':
|
|
response_data.update({
|
|
"message": "Tarea completada exitosamente",
|
|
"progress": 100,
|
|
"result": task_result.result
|
|
})
|
|
elif task_result.status == 'FAILURE':
|
|
response_data.update({
|
|
"message": f"Error en la tarea: {str(task_result.info)}",
|
|
"progress": 0,
|
|
"error": str(task_result.info)
|
|
})
|
|
else:
|
|
response_data.update({
|
|
"message": f"Estado desconocido: {task_result.status}",
|
|
"progress": 0
|
|
})
|
|
|
|
# Determinar código de estado HTTP
|
|
status_code = 200
|
|
if task_result.status == 'FAILURE':
|
|
status_code = 500
|
|
elif task_result.status in ['PENDING', 'PROGRESS']:
|
|
status_code = 202
|
|
|
|
return JSONResponse(content=response_data, status_code=status_code)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al consultar estado de tarea {task_id}: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al consultar el estado de la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.get("/async/tasks/active")
|
|
async def get_active_tasks():
|
|
"""
|
|
Lista todas las tareas activas en el sistema.
|
|
|
|
Returns:
|
|
JSONResponse con la lista de tareas activas
|
|
"""
|
|
try:
|
|
# Obtener tareas activas desde Celery
|
|
inspect = celery_app.control.inspect()
|
|
active_tasks = inspect.active()
|
|
scheduled_tasks = inspect.scheduled()
|
|
|
|
response_data = {
|
|
"active_tasks": active_tasks or {},
|
|
"scheduled_tasks": scheduled_tasks or {},
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
return JSONResponse(content=response_data, status_code=200)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al obtener tareas activas: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al obtener tareas activas: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/async/task/{task_id}")
|
|
async def cancel_task(task_id: str):
|
|
"""
|
|
Cancela una tarea agendada.
|
|
|
|
Args:
|
|
task_id: ID de la tarea a cancelar
|
|
|
|
Returns:
|
|
JSONResponse confirmando la cancelación
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores al cancelar
|
|
"""
|
|
try:
|
|
# Revocar la tarea
|
|
celery_app.control.revoke(task_id, terminate=True)
|
|
|
|
response_data = {
|
|
"success": True,
|
|
"message": f"Tarea {task_id} cancelada exitosamente",
|
|
"task_id": task_id,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
|
|
logger.info(f"Tarea cancelada: {task_id}")
|
|
return JSONResponse(content=response_data, status_code=200)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al cancelar tarea {task_id}: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al cancelar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/partidas")
|
|
async def async_get_partidas(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener las partidas de un pedimento.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de partidas - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import partidas_task
|
|
task = partidas_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de partidas se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "1-3 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de partidas agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de partidas: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/remesas")
|
|
async def async_get_remesas(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener las remesas de un pedimento.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de remesas - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import remesas_task
|
|
task = remesas_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de remesas se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "1-3 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de remesas agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de remesas: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/acuse")
|
|
async def async_get_acuse(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener el acuse de un edocument.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de acuse - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import acuse_task
|
|
task = acuse_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de acuse se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "1-3 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de acuse agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de acuse: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/edocument")
|
|
async def async_get_edocument(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener edocuments de un pedimento.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de edocuments - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import edocument_task
|
|
task = edocument_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de edocuments se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "2-4 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de edocuments agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de edocuments: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/coves")
|
|
async def async_get_coves(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener coves de un pedimento.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de coves - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import coves_task
|
|
task = coves_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de coves se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "1-2 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de coves agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de coves: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|
|
|
|
@router.post("/async/services/acuse-cove")
|
|
async def async_get_acuse_cove(request: ServiceRemesaSchema):
|
|
"""
|
|
Agenda una tarea asíncrona para obtener el acuse de un COVE.
|
|
|
|
Args:
|
|
request: Datos del pedimento y organización
|
|
|
|
Returns:
|
|
JSONResponse con el task_id y información de la tarea agendada
|
|
|
|
Raises:
|
|
HTTPException: Si hay errores en la validación o al agendar la tarea
|
|
"""
|
|
try:
|
|
logger.info(f"Solicitando consulta asíncrona de acuse COVE - Pedimento: {request.pedimento}")
|
|
|
|
# Preparar datos para la tarea
|
|
request_data = request.model_dump()
|
|
|
|
# Agendar la tarea en Celery
|
|
from tasks import acuse_cove_task
|
|
task = acuse_cove_task.delay(**request_data)
|
|
|
|
# Preparar respuesta
|
|
response_data = {
|
|
"success": True,
|
|
"message": "Tarea agendada exitosamente. La consulta de acuse COVE se está procesando en segundo plano.",
|
|
"task_id": task.id,
|
|
"pedimento": request_data['pedimento'],
|
|
"organizacion": request_data.get('organizacion'),
|
|
"status": "PENDING",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"estimated_completion": "1-3 minutos",
|
|
"check_status_url": f"/async/task-status/{task.id}"
|
|
}
|
|
|
|
logger.info(f"Tarea de acuse COVE agendada exitosamente - Task ID: {task.id}")
|
|
return JSONResponse(content=response_data, status_code=202)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al agendar tarea de acuse COVE: {e}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Error al agendar la tarea: {str(e)}"
|
|
)
|