from fastapi import APIRouter, HTTPException, BackgroundTasks from schemas.pedimentoSchema import PedimentoRequest from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from fastapi.responses import JSONResponse from celery_app import celery_app from tasks import pedimento_completo_task import logging from typing import Dict, Any import uuid from datetime import datetime logger = logging.getLogger(__name__) router = APIRouter() @router.post("/async/services/pedimento_completo") async def async_get_pedimento_completo(request: ServiceRemesaSchema): """ Agenda la tarea de obtener pedimento completo de forma asíncrona. Este endpoint: 1. Valida los datos de entrada 2. Agenda la tarea en Celery 3. Retorna inmediatamente el ID de la tarea para seguimiento Args: request: ServiceRemesaSchema con pedimento y organización Returns: JSONResponse con task_id para consultar el estado de la tarea Raises: HTTPException: En caso de errores de validación """ try: # Convertir request a diccionario request_data = request.model_dump() logger.info(f"Agendando tarea de pedimento completo - Pedimento: {request_data['pedimento']}") # Agendar la tarea en Celery task = pedimento_completo_task.delay(request_data) # Crear respuesta inmediata response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta del pedimento completo se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "2-5 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de pedimento completo: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.get("/async/task-status/{task_id}") async def get_task_status(task_id: str): """ Consulta el estado de una tarea agendada. Args: task_id: ID de la tarea a consultar Returns: JSONResponse con el estado actual de la tarea Raises: HTTPException: Si la tarea no existe o hay errores """ try: # Obtener el resultado de la tarea desde Celery task_result = celery_app.AsyncResult(task_id) if not task_result: raise HTTPException(status_code=404, detail="Tarea no encontrada") # Preparar respuesta según el estado response_data = { "task_id": task_id, "status": task_result.status, "timestamp": datetime.utcnow().isoformat() } if task_result.status == 'PENDING': response_data.update({ "message": "La tarea está pendiente de procesamiento", "progress": 0 }) elif task_result.status == 'PROGRESS': meta = task_result.info response_data.update({ "message": f"Procesando: {meta.get('status', 'En progreso')}", "progress": meta.get('progress', 50), "current_step": meta.get('status') }) elif task_result.status == 'SUCCESS': response_data.update({ "message": "Tarea completada exitosamente", "progress": 100, "result": task_result.result }) elif task_result.status == 'FAILURE': response_data.update({ "message": f"Error en la tarea: {str(task_result.info)}", "progress": 0, "error": str(task_result.info) }) else: response_data.update({ "message": f"Estado desconocido: {task_result.status}", "progress": 0 }) # Determinar código de estado HTTP status_code = 200 if task_result.status == 'FAILURE': status_code = 500 elif task_result.status in ['PENDING', 'PROGRESS']: status_code = 202 return JSONResponse(content=response_data, status_code=status_code) except Exception as e: logger.error(f"Error al consultar estado de tarea {task_id}: {e}") raise HTTPException( status_code=500, detail=f"Error al consultar el estado de la tarea: {str(e)}" ) @router.get("/async/tasks/active") async def get_active_tasks(): """ Lista todas las tareas activas en el sistema. Returns: JSONResponse con la lista de tareas activas """ try: # Obtener tareas activas desde Celery inspect = celery_app.control.inspect() active_tasks = inspect.active() scheduled_tasks = inspect.scheduled() response_data = { "active_tasks": active_tasks or {}, "scheduled_tasks": scheduled_tasks or {}, "timestamp": datetime.utcnow().isoformat() } return JSONResponse(content=response_data, status_code=200) except Exception as e: logger.error(f"Error al obtener tareas activas: {e}") raise HTTPException( status_code=500, detail=f"Error al obtener tareas activas: {str(e)}" ) @router.delete("/async/task/{task_id}") async def cancel_task(task_id: str): """ Cancela una tarea agendada. Args: task_id: ID de la tarea a cancelar Returns: JSONResponse confirmando la cancelación Raises: HTTPException: Si hay errores al cancelar """ try: # Revocar la tarea celery_app.control.revoke(task_id, terminate=True) response_data = { "success": True, "message": f"Tarea {task_id} cancelada exitosamente", "task_id": task_id, "timestamp": datetime.utcnow().isoformat() } logger.info(f"Tarea cancelada: {task_id}") return JSONResponse(content=response_data, status_code=200) except Exception as e: logger.error(f"Error al cancelar tarea {task_id}: {e}") raise HTTPException( status_code=500, detail=f"Error al cancelar la tarea: {str(e)}" ) @router.post("/async/services/partidas") async def async_get_partidas(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener las partidas de un pedimento. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de partidas - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import partidas_task task = partidas_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de partidas se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "1-3 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de partidas agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de partidas: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.post("/async/services/remesas") async def async_get_remesas(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener las remesas de un pedimento. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de remesas - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import remesas_task task = remesas_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de remesas se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "1-3 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de remesas agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de remesas: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.post("/async/services/acuse") async def async_get_acuse(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener el acuse de un edocument. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de acuse - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import acuse_task task = acuse_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de acuse se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "1-3 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de acuse agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de acuse: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.post("/async/services/edocument") async def async_get_edocument(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener edocuments de un pedimento. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de edocuments - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import edocument_task task = edocument_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de edocuments se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "2-4 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de edocuments agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de edocuments: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.post("/async/services/coves") async def async_get_coves(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener coves de un pedimento. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de coves - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import coves_task task = coves_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de coves se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "1-2 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de coves agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de coves: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) @router.post("/async/services/acuse-cove") async def async_get_acuse_cove(request: ServiceRemesaSchema): """ Agenda una tarea asíncrona para obtener el acuse de un COVE. Args: request: Datos del pedimento y organización Returns: JSONResponse con el task_id y información de la tarea agendada Raises: HTTPException: Si hay errores en la validación o al agendar la tarea """ try: logger.info(f"Solicitando consulta asíncrona de acuse COVE - Pedimento: {request.pedimento}") # Preparar datos para la tarea request_data = request.model_dump() # Agendar la tarea en Celery from tasks import acuse_cove_task task = acuse_cove_task.delay(**request_data) # Preparar respuesta response_data = { "success": True, "message": "Tarea agendada exitosamente. La consulta de acuse COVE se está procesando en segundo plano.", "task_id": task.id, "pedimento": request_data['pedimento'], "organizacion": request_data.get('organizacion'), "status": "PENDING", "timestamp": datetime.utcnow().isoformat(), "estimated_completion": "1-3 minutos", "check_status_url": f"/async/task-status/{task.id}" } logger.info(f"Tarea de acuse COVE agendada exitosamente - Task ID: {task.id}") return JSONResponse(content=response_data, status_code=202) except Exception as e: logger.error(f"Error al agendar tarea de acuse COVE: {e}") raise HTTPException( status_code=500, detail=f"Error al agendar la tarea: {str(e)}" )