Files
microservice/api/api_v2/modules/tasks/routers.py

123 lines
4.1 KiB
Python

from fastapi import FastAPI, Depends
from fastapi.routing import APIRouter
from celery_app import celery_app
from fastapi import APIRouter, HTTPException, BackgroundTasks
from datetime import datetime
from fastapi.responses import JSONResponse
from typing import Dict, Any, List, Optional
from api.api_v2.modules.authentication.services import get_current_user
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/async/task-status/{task_id}", response_model=Dict[str, Any])
async def get_task_status(task_id: str, current_user: Dict = Depends(get_current_user)):
"""
Consulta el estado de una tarea agendada.
Args:
task_id: ID de la tarea a consultar
Returns:
JSONResponse con el estado actual de la tarea
Raises:
HTTPException: Si la tarea no existe o hay errores
"""
try:
# Obtener el resultado de la tarea desde Celery
task_result = celery_app.AsyncResult(task_id)
if not task_result:
raise HTTPException(status_code=404, detail="Tarea no encontrada")
# Preparar respuesta según el estado
response_data = {
"task_id": task_id,
"status": task_result.status,
"timestamp": datetime.utcnow().isoformat()
}
if task_result.status == 'PENDING':
response_data.update({
"message": "La tarea está pendiente de procesamiento",
"progress": 0
})
elif task_result.status == 'PROGRESS':
meta = task_result.info
response_data.update({
"message": f"Procesando: {meta.get('status', 'En progreso')}",
"progress": meta.get('progress', 50),
"current_step": meta.get('status')
})
elif task_result.status == 'SUCCESS':
response_data.update({
"message": "Tarea completada exitosamente",
"progress": 100,
"result": task_result.result
})
elif task_result.status == 'FAILURE':
response_data.update({
"message": f"Error en la tarea: {str(task_result.info)}",
"progress": 0,
"error": str(task_result.info)
})
else:
response_data.update({
"message": f"Estado desconocido: {task_result.status}",
"progress": 0
})
# Determinar código de estado HTTP
status_code = 200
if task_result.status == 'FAILURE':
status_code = 500
elif task_result.status in ['PENDING', 'PROGRESS']:
status_code = 202
return JSONResponse(content=response_data, status_code=status_code)
except Exception as e:
logger.error(f"Error al consultar estado de tarea {task_id}: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al consultar el estado de la tarea: {str(e)}"
)
@router.delete("/async/task/{task_id}", response_model=Dict[str, Any])
async def cancel_task(task_id: str, current_user: Dict = Depends(get_current_user)):
"""
Cancela una tarea agendada.
Args:
task_id: ID de la tarea a cancelar
Returns:
JSONResponse confirmando la cancelación
Raises:
HTTPException: Si hay errores al cancelar
"""
try:
# Revocar la tarea
celery_app.control.revoke(task_id, terminate=True)
response_data = {
"success": True,
"message": f"Tarea {task_id} cancelada exitosamente",
"task_id": task_id,
"timestamp": datetime.utcnow().isoformat()
}
logger.info(f"Tarea cancelada: {task_id}")
return JSONResponse(content=response_data, status_code=200)
except Exception as e:
logger.error(f"Error al cancelar tarea {task_id}: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al cancelar la tarea: {str(e)}"
)