diff --git a/Dockerfile b/Dockerfile index ff04b49..2074094 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,18 +17,23 @@ RUN pip install --user --no-cache-dir --verbose -r requirements.txt # Imagen final FROM python:3.11-slim -# Establecer variables de entorno para FastAPI +# Establecer variables de entorno para FastAPI y Celery ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 ENV PYTHONPATH=/app ENV PATH=/home/fastapi/.local/bin:$PATH +ENV C_FORCE_ROOT=1 +ENV REDIS_HOST=redis +ENV REDIS_PORT=6379 +ENV REDIS_DB=0 # Crear usuario no-root para seguridad RUN groupadd -r fastapi && useradd -r -g fastapi fastapi -# Instalar curl para healthcheck +# Instalar dependencias del sistema para Celery y Redis (sin supervisor) RUN apt-get update && apt-get install -y \ curl \ + redis-tools \ && rm -rf /var/lib/apt/lists/* # Establecer directorio de trabajo @@ -37,23 +42,37 @@ WORKDIR /app # Copiar dependencias instaladas desde el builder COPY --from=builder /root/.local /home/fastapi/.local +# Cambiar ownership de los archivos de Python packages al usuario fastapi +RUN chown -R fastapi:fastapi /home/fastapi/.local + +# Instalar supervisor para manejar múltiples procesos +RUN pip install supervisor + # Copiar el código de la aplicación COPY . . # Crear directorios necesarios y establecer permisos -RUN mkdir -p /app/logs /app/uploads /app/temp && \ +RUN mkdir -p /app/logs /app/uploads /app/temp /var/log/supervisor /etc/supervisor/conf.d && \ chown -R fastapi:fastapi /app && \ - chmod -R 755 /app + chmod -R 755 /app && \ + chown -R fastapi:fastapi /var/log/supervisor /etc/supervisor + +# Copiar configuraciones de Supervisor (como root antes de cambiar usuario) +COPY supervisord.conf /etc/supervisor/supervisord.conf +COPY supervisor_celery.conf /etc/supervisor/conf.d/efc_celery.conf # Cambiar al usuario no-root USER fastapi -# Exponer puerto +# Exponer puertos EXPOSE 8001 # Healthcheck para verificar que el servicio está funcionando HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8001/api/v1/health || exit 1 -# Comando por defecto con configuración optimizada -CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8001 --workers 32 --reload"] \ No newline at end of file +# Cambiar temporalmente a root para iniciar supervisor +USER root + +# Comando por defecto: usar Supervisor para gestionar FastAPI y Celery +CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"] \ No newline at end of file diff --git a/Dockerfile.prod b/Dockerfile.prod index 082f994..9751747 100644 --- a/Dockerfile.prod +++ b/Dockerfile.prod @@ -1,6 +1,7 @@ # Multi-stage build para optimizar el tamaño de la imagen FROM python:3.11-slim AS builder +# Instalar dependencias de compilación RUN apt-get update && apt-get install -y \ gcc \ g++ \ @@ -11,23 +12,53 @@ WORKDIR /app COPY requirements.txt . RUN pip install --user --no-cache-dir --verbose -r requirements.txt +# Imagen final de producción FROM python:3.11-slim +# Variables de entorno para producción ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 ENV PYTHONPATH=/app ENV PATH=/home/fastapi/.local/bin:$PATH +ENV C_FORCE_ROOT=1 +ENV REDIS_HOST=redis +ENV REDIS_PORT=6379 +ENV REDIS_DB=0 +# Crear usuario no-root para seguridad RUN groupadd -r fastapi && useradd -r -g fastapi fastapi -RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* + +# Instalar dependencias del sistema para Celery y Redis (sin supervisor) +RUN apt-get update && apt-get install -y \ + curl \ + redis-tools \ + && rm -rf /var/lib/apt/lists/* WORKDIR /app +# Copiar dependencias instaladas desde el builder COPY --from=builder /root/.local /home/fastapi/.local + +# Instalar supervisor para manejar múltiples procesos +RUN pip install supervisor + +# Copiar el código de la aplicación COPY . . -USER fastapi +# Crear directorios necesarios y establecer permisos +RUN mkdir -p /app/logs /app/uploads /app/temp /var/log/supervisor /etc/supervisor/conf.d && \ + chown -R fastapi:fastapi /app && \ + chmod -R 755 /app && \ + chown -R fastapi:fastapi /var/log/supervisor /etc/supervisor + +# Copiar configuraciones de Supervisor para producción (como root antes de cambiar usuario) +COPY supervisord.conf /etc/supervisor/supervisord.conf +COPY supervisor_celery.conf /etc/supervisor/conf.d/efc_celery.conf + +# Cambiar temporalmente a root para Supervisor +USER root EXPOSE 8001 -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "12"] +# Comando de producción: iniciar Supervisor para gestionar todos los procesos +CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"] diff --git a/api/api_v1/api.py b/api/api_v1/api.py index 30a1f6b..7d27651 100644 --- a/api/api_v1/api.py +++ b/api/api_v1/api.py @@ -3,7 +3,8 @@ from fastapi import APIRouter # Debes usar paréntesis () para hacer importaciones multilínea. from api.api_v1.endpoints import ( health, - pedimentos + pedimentos, + async_pedimentos ) api_router = APIRouter() @@ -11,4 +12,5 @@ api_router = APIRouter() # Incluir routers de endpoints api_router.include_router(health.router, tags=["health"]) api_router.include_router(pedimentos.router, tags=["pedimentos"]) +api_router.include_router(async_pedimentos.router, tags=["async-pedimentos"]) diff --git a/api/api_v1/endpoints/async_pedimentos.py b/api/api_v1/endpoints/async_pedimentos.py new file mode 100644 index 0000000..41e2803 --- /dev/null +++ b/api/api_v1/endpoints/async_pedimentos.py @@ -0,0 +1,493 @@ +from fastapi import APIRouter, HTTPException, BackgroundTasks +from schemas.pedimentoSchema import PedimentoRequest +from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema +from fastapi.responses import JSONResponse +from celery_app import celery_app +from tasks import pedimento_completo_task +import logging +from typing import Dict, Any +import uuid +from datetime import datetime + +logger = logging.getLogger(__name__) + +router = APIRouter() + + + +@router.post("/async/services/pedimento_completo") +async def async_get_pedimento_completo(request: ServiceRemesaSchema): + """ + Agenda la tarea de obtener pedimento completo de forma asíncrona. + + Este endpoint: + 1. Valida los datos de entrada + 2. Agenda la tarea en Celery + 3. Retorna inmediatamente el ID de la tarea para seguimiento + + Args: + request: ServiceRemesaSchema con pedimento y organización + + Returns: + JSONResponse con task_id para consultar el estado de la tarea + + Raises: + HTTPException: En caso de errores de validación + """ + try: + # Convertir request a diccionario + request_data = request.model_dump() + + logger.info(f"Agendando tarea de pedimento completo - Pedimento: {request_data['pedimento']}") + + # Agendar la tarea en Celery + task = pedimento_completo_task.delay(request_data) + + # Crear respuesta inmediata + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta del pedimento completo se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "2-5 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de pedimento completo: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + +@router.get("/async/task-status/{task_id}") +async def get_task_status(task_id: str): + """ + Consulta el estado de una tarea agendada. + + Args: + task_id: ID de la tarea a consultar + + Returns: + JSONResponse con el estado actual de la tarea + + Raises: + HTTPException: Si la tarea no existe o hay errores + """ + try: + # Obtener el resultado de la tarea desde Celery + task_result = celery_app.AsyncResult(task_id) + + if not task_result: + raise HTTPException(status_code=404, detail="Tarea no encontrada") + + # Preparar respuesta según el estado + response_data = { + "task_id": task_id, + "status": task_result.status, + "timestamp": datetime.utcnow().isoformat() + } + + if task_result.status == 'PENDING': + response_data.update({ + "message": "La tarea está pendiente de procesamiento", + "progress": 0 + }) + elif task_result.status == 'PROGRESS': + meta = task_result.info + response_data.update({ + "message": f"Procesando: {meta.get('status', 'En progreso')}", + "progress": meta.get('progress', 50), + "current_step": meta.get('status') + }) + elif task_result.status == 'SUCCESS': + response_data.update({ + "message": "Tarea completada exitosamente", + "progress": 100, + "result": task_result.result + }) + elif task_result.status == 'FAILURE': + response_data.update({ + "message": f"Error en la tarea: {str(task_result.info)}", + "progress": 0, + "error": str(task_result.info) + }) + else: + response_data.update({ + "message": f"Estado desconocido: {task_result.status}", + "progress": 0 + }) + + # Determinar código de estado HTTP + status_code = 200 + if task_result.status == 'FAILURE': + status_code = 500 + elif task_result.status in ['PENDING', 'PROGRESS']: + status_code = 202 + + return JSONResponse(content=response_data, status_code=status_code) + + except Exception as e: + logger.error(f"Error al consultar estado de tarea {task_id}: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al consultar el estado de la tarea: {str(e)}" + ) + +@router.get("/async/tasks/active") +async def get_active_tasks(): + """ + Lista todas las tareas activas en el sistema. + + Returns: + JSONResponse con la lista de tareas activas + """ + try: + # Obtener tareas activas desde Celery + inspect = celery_app.control.inspect() + active_tasks = inspect.active() + scheduled_tasks = inspect.scheduled() + + response_data = { + "active_tasks": active_tasks or {}, + "scheduled_tasks": scheduled_tasks or {}, + "timestamp": datetime.utcnow().isoformat() + } + + return JSONResponse(content=response_data, status_code=200) + + except Exception as e: + logger.error(f"Error al obtener tareas activas: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al obtener tareas activas: {str(e)}" + ) + +@router.delete("/async/task/{task_id}") +async def cancel_task(task_id: str): + """ + Cancela una tarea agendada. + + Args: + task_id: ID de la tarea a cancelar + + Returns: + JSONResponse confirmando la cancelación + + Raises: + HTTPException: Si hay errores al cancelar + """ + try: + # Revocar la tarea + celery_app.control.revoke(task_id, terminate=True) + + response_data = { + "success": True, + "message": f"Tarea {task_id} cancelada exitosamente", + "task_id": task_id, + "timestamp": datetime.utcnow().isoformat() + } + + logger.info(f"Tarea cancelada: {task_id}") + return JSONResponse(content=response_data, status_code=200) + + except Exception as e: + logger.error(f"Error al cancelar tarea {task_id}: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al cancelar la tarea: {str(e)}" + ) + + +@router.post("/async/services/partidas") +async def async_get_partidas(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener las partidas de un pedimento. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de partidas - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import partidas_task + task = partidas_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de partidas se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "1-3 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de partidas agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de partidas: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + + +@router.post("/async/services/remesas") +async def async_get_remesas(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener las remesas de un pedimento. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de remesas - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import remesas_task + task = remesas_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de remesas se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "1-3 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de remesas agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de remesas: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + + +@router.post("/async/services/acuse") +async def async_get_acuse(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener el acuse de un edocument. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de acuse - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import acuse_task + task = acuse_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de acuse se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "1-3 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de acuse agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de acuse: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + + +@router.post("/async/services/edocument") +async def async_get_edocument(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener edocuments de un pedimento. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de edocuments - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import edocument_task + task = edocument_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de edocuments se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "2-4 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de edocuments agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de edocuments: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + + +@router.post("/async/services/coves") +async def async_get_coves(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener coves de un pedimento. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de coves - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import coves_task + task = coves_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de coves se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "1-2 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de coves agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de coves: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) + + +@router.post("/async/services/acuse-cove") +async def async_get_acuse_cove(request: ServiceRemesaSchema): + """ + Agenda una tarea asíncrona para obtener el acuse de un COVE. + + Args: + request: Datos del pedimento y organización + + Returns: + JSONResponse con el task_id y información de la tarea agendada + + Raises: + HTTPException: Si hay errores en la validación o al agendar la tarea + """ + try: + logger.info(f"Solicitando consulta asíncrona de acuse COVE - Pedimento: {request.pedimento}") + + # Preparar datos para la tarea + request_data = request.model_dump() + + # Agendar la tarea en Celery + from tasks import acuse_cove_task + task = acuse_cove_task.delay(**request_data) + + # Preparar respuesta + response_data = { + "success": True, + "message": "Tarea agendada exitosamente. La consulta de acuse COVE se está procesando en segundo plano.", + "task_id": task.id, + "pedimento": request_data['pedimento'], + "organizacion": request_data.get('organizacion'), + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": "1-3 minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea de acuse COVE agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea de acuse COVE: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) diff --git a/api/api_v1/endpoints/pedimentos.py b/api/api_v1/endpoints/pedimentos.py index a549d61..8cba332 100644 --- a/api/api_v1/endpoints/pedimentos.py +++ b/api/api_v1/endpoints/pedimentos.py @@ -361,6 +361,7 @@ async def get_pedimento_completo(request: ServiceRemesaSchema): update_success = await _update_service_status( service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name ) + if not update_success: raise HTTPException(status_code=500, detail="Error al actualizar estado del servicio") diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 0000000..3e04c76 --- /dev/null +++ b/celery_app.py @@ -0,0 +1,27 @@ +from celery import Celery +from core.config import settings +import os + +# Configuración de Celery +celery_app = Celery( + "microservice", + broker=f"redis://{os.getenv('REDIS_HOST', 'localhost')}:{os.getenv('REDIS_PORT', '6379')}/{os.getenv('REDIS_DB', '0')}", + backend=f"redis://{os.getenv('REDIS_HOST', 'localhost')}:{os.getenv('REDIS_PORT', '6379')}/{os.getenv('REDIS_DB', '0')}", + include=['tasks'] +) + +# Configuración adicional +celery_app.conf.update( + task_serializer='json', + accept_content=['json'], + result_serializer='json', + timezone='UTC', + enable_utc=True, + task_track_started=True, + task_time_limit=3600, # 1 hour timeout + worker_prefetch_multiplier=1, + worker_max_tasks_per_child=1000, +) + +# Autodiscovery of tasks +celery_app.autodiscover_tasks() \ No newline at end of file diff --git a/main.py b/main.py index 79af994..ebccc99 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,8 @@ import logging from fastapi import FastAPI from core.config import settings from api.api_v1.api import api_router - from fastapi.middleware.cors import CORSMiddleware + # Configuración inicial del logging (debe estar al inicio del archivo) # logging.config.dictConfig({ # "version": 1, @@ -44,7 +44,7 @@ def create_application() -> FastAPI: """Función factory para crear la aplicación FastAPI""" application = FastAPI( title=settings.APP_NAME, - description="EFC Microservice - Un microservicio profesional por AduanaSoft", + description="EFC Microservice - Un microservicio profesional por AduanaSoft con soporte para tareas asíncronas", version=settings.APP_VERSION, debug=settings.DEBUG, ) @@ -66,6 +66,21 @@ app.add_middleware( allow_methods=["*"], allow_headers=["*"], ) + +# Event handlers para Celery +@app.on_event("startup") +async def startup_event(): + """Inicialización al arrancar la aplicación""" + logger = logging.getLogger(__name__) + logger.info("Iniciando EFC Microservice con soporte asíncrono") + logger.info("Celery configurado para tareas en segundo plano") + +@app.on_event("shutdown") +async def shutdown_event(): + """Limpieza al cerrar la aplicación""" + logger = logging.getLogger(__name__) + logger.info("Cerrando EFC Microservice") + if __name__ == "__main__": import uvicorn uvicorn.run( diff --git a/output.pdf b/output.pdf deleted file mode 100644 index b05fd20..0000000 Binary files a/output.pdf and /dev/null differ diff --git a/requirements.txt b/requirements.txt index d5658dd..cd56ab5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,3 +21,6 @@ urllib3==2.5.0 uvicorn==0.35.0 python-dotenv cryptography +celery==5.3.4 +redis==5.0.1 +supervisor==4.2.5 diff --git a/supervisor_celery.conf b/supervisor_celery.conf new file mode 100644 index 0000000..371ac2d --- /dev/null +++ b/supervisor_celery.conf @@ -0,0 +1,54 @@ +[program:efc_celery_worker] +command=/usr/local/bin/python -m celery -A celery_app worker --loglevel=info --pool=solo +directory=/app +user=fastapi +numprocs=1 +stdout_logfile=/app/logs/celery_worker.log +stderr_logfile=/app/logs/celery_worker_error.log +stdout_logfile_maxbytes=50MB +stderr_logfile_maxbytes=50MB +autostart=true +autorestart=true +startsecs=10 +stopwaitsecs=600 +killasgroup=true +priority=998 +environment=REDIS_HOST="redis_microservice_dev",REDIS_PORT="6379",REDIS_DB="0",PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s" + +[program:efc_celery_beat] +command=/usr/local/bin/python -m celery -A celery_app beat --loglevel=info +directory=/app +user=fastapi +numprocs=1 +stdout_logfile=/app/logs/celery_beat.log +stderr_logfile=/app/logs/celery_beat_error.log +stdout_logfile_maxbytes=50MB +stderr_logfile_maxbytes=50MB +autostart=false +autorestart=true +startsecs=10 +stopwaitsecs=600 +killasgroup=true +priority=999 +environment=REDIS_HOST="redis_microservice_dev",REDIS_PORT="6379",REDIS_DB="0",PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s" + +[program:efc_fastapi] +command=/usr/local/bin/python -m uvicorn main:app --host 0.0.0.0 --port 8001 --workers 1 +directory=/app +user=fastapi +numprocs=1 +stdout_logfile=/app/logs/fastapi.log +stderr_logfile=/app/logs/fastapi_error.log +stdout_logfile_maxbytes=50MB +stderr_logfile_maxbytes=50MB +autostart=true +autorestart=true +startsecs=10 +stopwaitsecs=600 +killasgroup=true +priority=997 +environment=PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s" + +[group:efc_services] +programs=efc_celery_worker,efc_fastapi +priority=999 diff --git a/supervisord.conf b/supervisord.conf new file mode 100644 index 0000000..e96967a --- /dev/null +++ b/supervisord.conf @@ -0,0 +1,18 @@ +[unix_http_server] +file=/tmp/supervisor.sock + +[supervisord] +logfile=/var/log/supervisor/supervisord.log +pidfile=/tmp/supervisord.pid +childlogdir=/var/log/supervisor +nodaemon=true +user=root + +[rpcinterface:supervisor] +supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface + +[supervisorctl] +serverurl=unix:///tmp/supervisor.sock + +[include] +files = /etc/supervisor/conf.d/*.conf diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..a2aa9e5 --- /dev/null +++ b/tasks.py @@ -0,0 +1,893 @@ +from celery import Celery +from celery_app import celery_app +import asyncio +import logging +from typing import Dict, Any +from contextlib import asynccontextmanager +from controllers.RESTController import rest_controller +from controllers.SOAPController import soap_controller +from utils.peticiones import ( + get_soap_acuseCOVE, get_soap_cove, get_soap_pedimento_completo, + get_soap_remesas, get_soap_partidas, get_soap_acuse, get_soap_edocument +) +from utils.servicios import ( + _validate_request_data, + _get_pedimento_service, + _update_service_status, + _get_vucem_credentials, + _create_response, + _post_edocuments, + _schedule_follow_up_services, + _post_coves +) + +logger = logging.getLogger(__name__) + +# Estados del servicio +ESTADO_CREADO = 1 +ESTADO_EN_PROCESO = 2 +ESTADO_FINALIZADO = 3 +ESTADO_ERROR = 4 + +def run_async_task(async_func, *args, **kwargs): + """Helper function to run async functions in Celery tasks""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(async_func(*args, **kwargs)) + finally: + loop.close() + + +@celery_app.task(bind=True, name='tasks.pedimento_completo_task') +def pedimento_completo_task(self, request_data: Dict[str, Any]): + """ + Tarea asíncrona para obtener pedimento completo + """ + async def _execute_pedimento_completo(): + operation_name = "pedimento_completo" + service_data = None + + try: + logger.info(f"[TASK] Iniciando consulta de pedimento completo - Pedimento: {request_data['pedimento']}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=3, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio") + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo credenciales VUCEM'}) + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'}) + logger.info("[TASK] Realizando petición SOAP para pedimento completo...") + + soap_response = await get_soap_pedimento_completo( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller + ) + + if not soap_response: + raise Exception("Error en la petición SOAP para pedimento completo") + + logger.info("[TASK] Petición SOAP para pedimento completo completada exitosamente") + # Subir documento de pedimento completo si la petición fue exitosa + try: + upload_result = await _post_edocuments( + response_service=service_data, + identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')] + ) + logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}") + except Exception as upload_err: + logger.error(f"Error al subir documento de pedimento completo: {upload_err}") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "pedimento_completo": soap_response, + "documento": soap_response.get('documento', {}), + "xml_content": soap_response.get('xml_content', {}) + }, + success_message="Pedimento completo obtenido exitosamente" + ) + + logger.info(f"[TASK] Consulta de pedimento completo completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en pedimento_completo: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_pedimento_completo) + + +@celery_app.task(bind=True) +def partidas_task(self, **kwargs): + """Tarea asíncrona para obtener partidas""" + async def _execute_partidas(): + operation_name = "PARTIDAS_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de partidas - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=4, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'}) + logger.info("[TASK] Realizando petición SOAP para partidas...") + + from utils.peticiones import get_soap_partidas + + soap_response = await get_soap_partidas( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller, + partida=request_data.get('partida', '') + ) + + if not soap_response: + raise Exception("Error en la petición SOAP para partidas") + + logger.info("[TASK] Petición SOAP para partidas completada exitosamente") + # Subir documento de partidas si la petición fue exitosa + try: + upload_result = await _post_edocuments( + response_service=service_data, + identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')] + ) + logger.info(f"Documento de partidas subido exitosamente: {upload_result}") + except Exception as upload_err: + logger.error(f"Error al subir documento de partidas: {upload_err}") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "partidas": soap_response, + "documento": soap_response.get('documento', {}), + "xml_content": soap_response.get('xml_content', {}) + }, + success_message="Partidas obtenidas exitosamente" + ) + + logger.info(f"[TASK] Consulta de partidas completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en partidas: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_partidas) + + +@celery_app.task(bind=True) +def remesas_task(self, **kwargs): + """Tarea asíncrona para obtener remesas""" + async def _execute_remesas(): + operation_name = "REMESAS_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de remesas - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=5, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'}) + logger.info("[TASK] Realizando petición SOAP para remesas...") + + from utils.peticiones import get_soap_remesas + + soap_response = await get_soap_remesas( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller + ) + + if not soap_response: + raise Exception("Error en la petición SOAP para remesas") + + logger.info("[TASK] Petición SOAP para remesas completada exitosamente") + # Subir documento de remesas si la petición fue exitosa + try: + upload_result = await _post_edocuments( + response_service=service_data, + identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')] + ) + logger.info(f"Documento de remesas subido exitosamente: {upload_result}") + except Exception as upload_err: + logger.error(f"Error al subir documento de remesas: {upload_err}") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "remesas": soap_response, + "documento": soap_response.get('documento', {}), + "xml_content": soap_response.get('xml_content', {}) + }, + success_message="Remesas obtenidas exitosamente" + ) + + logger.info(f"[TASK] Consulta de remesas completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en remesas: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_remesas) + + +@celery_app.task(bind=True) +def acuse_task(self, **kwargs): + """Tarea asíncrona para obtener acuse""" + async def _execute_acuse(): + operation_name = "ACUSE_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de acuse - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=6, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + # Obtener documentos digitalizados (e-documents) + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'}) + logger.info("[TASK] Obteniendo documentos digitalizados...") + + try: + edocs = await rest_controller.get_edocs(service_data['pedimento']['id']) + + if not edocs: + logger.warning("No se encontraron documentos digitalizados para el pedimento") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se encontraron documentos digitalizados para el pedimento") + + logger.info(f"Se encontraron {len(edocs)} documentos digitalizados") + + except Exception as e: + logger.error(f"Error al obtener documentos digitalizados: {e}") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + # Procesar acuses de documentos digitalizados + self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de documentos'}) + documentos_procesados = [] + documentos_exitosos = 0 + + logger.info(f"[TASK] Procesando acuses para {len(edocs)} documentos...") + + from utils.peticiones import get_soap_acuse + + for idx, edoc in enumerate(edocs): + documento_info = { + "clave": edoc.get('clave', 'N/A'), + "descripcion": edoc.get('descripcion', 'N/A'), + "numero_edocument": edoc.get('numero_edocument', 'N/A'), + "procesado": False, + "error": None + } + + # Verificar que el documento tenga número de e-document + if not edoc.get('numero_edocument'): + logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...") + documento_info["error"] = "Sin número de e-document" + documentos_procesados.append(documento_info) + continue + + try: + logger.info(f"Procesando acuse para documento {idx + 1}: {edoc['numero_edocument']}") + + soap_response = await get_soap_acuse( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller, + edocument=edoc, + idx=idx + 1 + ) + + if soap_response: + documento_info["procesado"] = True + documento_info["documento"] = soap_response.get('documento', {}) + documentos_exitosos += 1 + logger.info(f"Acuse del documento {idx + 1} procesado exitosamente") + else: + documento_info["error"] = "Error en petición SOAP" + logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}") + + except Exception as e: + logger.error(f"Error al procesar acuse del documento {idx + 1}: {e}") + documento_info["error"] = str(e) + # Continuar con los siguientes documentos + + documentos_procesados.append(documento_info) + + # Verificar si se procesó al menos un documento + if documentos_exitosos == 0: + logger.error("No se pudo procesar ningún acuse de documento digitalizado") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se pudo procesar ningún acuse de documento digitalizado") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "acuses": documentos_procesados, + "documentos_procesados": len(documentos_procesados), + "documentos_exitosos": documentos_exitosos + }, + success_message=f"Acuses procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos" + ) + + logger.info(f"[TASK] Consulta de acuse completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en acuse: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_acuse) + + +# Tarea asíncrona para consultar el estado de un pedimento +@celery_app.task(bind=True, name='tasks.estado_pedimento_task') +def estado_pedimento_task(self, request_data: Dict[str, Any]): + """ + Tarea asíncrona para consultar el estado de un pedimento + """ + async def _execute_estado_pedimento(): + operation_name = "estado_pedimento" + service_data = None + try: + logger.info(f"[TASK] Iniciando consulta de estado de pedimento - Pedimento: {request_data['pedimento']}") + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=1, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + if not update_success: + raise Exception("Error al actualizar estado del servicio") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'}) + logger.info("[TASK] Realizando petición SOAP para estado de pedimento...") + from utils.peticiones import get_soap_pedimento_estado + soap_response = await get_soap_pedimento_estado( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller + ) + if not soap_response: + raise Exception("Error en la petición SOAP para estado de pedimento") + + logger.info("[TASK] Petición SOAP para estado de pedimento completada exitosamente") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "estado_pedimento": soap_response + }, + success_message="Estado de pedimento obtenido exitosamente" + ) + logger.info(f"[TASK] Consulta de estado de pedimento completada exitosamente - Servicio: {service_data['id']}") + return response_data + except Exception as e: + logger.error(f"[TASK] Error en estado de pedimento: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + return run_async_task(_execute_estado_pedimento) + + +@celery_app.task(bind=True) +def edocument_task(self, **kwargs): + """Tarea asíncrona para obtener edocument""" + async def _execute_edocument(): + operation_name = "EDOCUMENT_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de edocument - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=7, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + # Obtener documentos digitalizados (e-documents) + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'}) + logger.info("[TASK] Obteniendo documentos digitalizados...") + + try: + edocs = await rest_controller.get_edocs(service_data['pedimento']['id']) + + if not edocs: + logger.warning("No se encontraron documentos digitalizados para el pedimento") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se encontraron documentos digitalizados para el pedimento") + + logger.info(f"Se encontraron {len(edocs)} documentos digitalizados") + + except Exception as e: + logger.error(f"Error al obtener documentos digitalizados: {e}") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + # Procesar edocuments + self.update_state(state='PROGRESS', meta={'status': 'Procesando documentos electrónicos'}) + documentos_procesados = [] + documentos_exitosos = 0 + + logger.info(f"[TASK] Procesando edocuments para {len(edocs)} documentos...") + + from utils.peticiones import get_soap_edocument + + for idx, edoc in enumerate(edocs): + documento_info = { + "clave": edoc.get('clave', 'N/A'), + "descripcion": edoc.get('descripcion', 'N/A'), + "numero_edocument": edoc.get('numero_edocument', 'N/A'), + "procesado": False, + "error": None + } + + # Verificar que el documento tenga número de e-document + if not edoc.get('numero_edocument'): + logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...") + documento_info["error"] = "Sin número de e-document" + documentos_procesados.append(documento_info) + continue + + try: + logger.info(f"Procesando e-document {idx + 1}: {edoc['numero_edocument']}") + + soap_response = await get_soap_edocument( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller, + edocument=edoc, + idx=idx + 1 + ) + + if soap_response: + documento_info["procesado"] = True + documento_info["documento"] = soap_response.get('documento', {}) + documentos_exitosos += 1 + logger.info(f"E-document {idx + 1} procesado exitosamente") + # Subir el documento si la petición fue exitosa + try: + upload_result = await _post_edocuments( + response_service=service_data, + identificadores_ed=[edoc.get('numero_edocument')] + ) + documento_info["upload_result"] = upload_result + logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente") + except Exception as upload_err: + documento_info["upload_error"] = str(upload_err) + logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}") + # Subir el documento si la petición fue exitosa + try: + upload_result = await _post_edocuments( + response_service=service_data, + identificadores_ed=[edoc.get('numero_edocument')] + ) + documento_info["upload_result"] = upload_result + logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente") + except Exception as upload_err: + documento_info["upload_error"] = str(upload_err) + logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}") + else: + documento_info["error"] = "Error en petición SOAP" + logger.warning(f"No se pudo procesar el e-document {idx + 1}") + + except Exception as e: + logger.error(f"Error al procesar e-document {idx + 1}: {e}") + documento_info["error"] = str(e) + # Continuar con los siguientes documentos + + documentos_procesados.append(documento_info) + + # Verificar si se procesó al menos un documento + if documentos_exitosos == 0: + logger.error("No se pudo procesar ningún e-document") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se pudo procesar ningún e-document") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "edocuments": documentos_procesados, + "documentos_procesados": len(documentos_procesados), + "documentos_exitosos": documentos_exitosos + }, + success_message=f"E-documents procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos" + ) + + logger.info(f"[TASK] Consulta de edocument completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en edocument: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_edocument) + + +@celery_app.task(bind=True) +def coves_task(self, **kwargs): + """Tarea asíncrona para obtener coves""" + async def _execute_coves(): + operation_name = "COVES_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de coves - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=8, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + # Para COVES, se usa el RESTController en lugar de SOAP + self.update_state(state='PROGRESS', meta={'status': 'Realizando petición REST'}) + logger.info("[TASK] Realizando petición REST para coves...") + + coves_response = await rest_controller.get_coves(request_data['pedimento']) + + if not coves_response: + raise Exception("Error en la petición REST para coves") + + logger.info("[TASK] Petición REST para coves completada exitosamente") + # Subir documento de coves si la petición fue exitosa + try: + upload_result = await _post_coves( + response_service=service_data, + identificadores_cove=[coves_response[0].get('numero_cove')] if isinstance(coves_response, list) and coves_response else [] + ) + logger.info(f"Documento de coves subido exitosamente: {upload_result}") + except Exception as upload_err: + logger.error(f"Error al subir documento de coves: {upload_err}") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "coves": coves_response + }, + success_message="Coves obtenidos exitosamente" + ) + + logger.info(f"[TASK] Consulta de coves completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en coves: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_coves) + + +@celery_app.task(bind=True) +def acuse_cove_task(self, **kwargs): + """Tarea asíncrona para obtener acuse de COVE""" + async def _execute_acuse_cove(): + operation_name = "ACUSE_COVE_ASYNC" + service_data = None + + try: + request_data = kwargs + logger.info(f"[TASK] Iniciando consulta de acuse COVE - Pedimento: {request_data.get('pedimento')}") + + self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'}) + await _validate_request_data(request_data) + + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'}) + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=9, + operation_name=operation_name + ) + + self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'}) + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio a 'En proceso'") + + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + logger.error("No se encontró ID de contribuyente en los datos del servicio") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + # Obtener COVES + self.update_state(state='PROGRESS', meta={'status': 'Obteniendo COVES'}) + logger.info("[TASK] Obteniendo COVES...") + + try: + coves = await rest_controller.get_coves(service_data['pedimento']['id']) + + if not coves: + logger.warning("No se encontraron COVES para el pedimento") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se encontraron COVES para el pedimento") + + logger.info(f"Se encontraron {len(coves)} COVES") + + except Exception as e: + logger.error(f"Error al obtener COVES: {e}") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + # Procesar acuses de COVES + self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de COVES'}) + documentos_procesados = [] + documentos_exitosos = 0 + + logger.info(f"[TASK] Procesando acuses para {len(coves)} COVES...") + + from utils.peticiones import get_soap_acuseCOVE + + for idx, cove in enumerate(coves): + documento_info = { + "clave": cove.get('clave', 'N/A'), + "descripcion": cove.get('descripcion', 'N/A'), + "numero_cove": cove.get('numero_cove', 'N/A'), + "procesado": False, + "error": None + } + + # Verificar que el documento tenga número de COVE + if not cove.get('numero_cove'): + logger.warning(f"COVE {idx + 1} no tiene numero_cove, saltando...") + documento_info["error"] = "Sin número de COVE" + documentos_procesados.append(documento_info) + continue + + try: + logger.info(f"Procesando acuse para COVE {idx + 1}: {cove['numero_cove']}") + + soap_response = await get_soap_acuseCOVE( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller, + cove=cove, + idx=idx + 1 + ) + + if soap_response: + documento_info["procesado"] = True + documento_info["documento"] = soap_response.get('documento', {}) + documentos_exitosos += 1 + logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente") + # Subir el documento de COVE si la petición fue exitosa + try: + upload_result = await _post_coves( + response_service=service_data, + identificadores_cove=[cove.get('numero_cove')] + ) + documento_info["upload_result"] = upload_result + logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente") + except Exception as upload_err: + documento_info["upload_error"] = str(upload_err) + logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}") + else: + documento_info["error"] = "Error en petición SOAP" + logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}") + + except Exception as e: + logger.error(f"Error al procesar acuse de COVE {idx + 1}: {e}") + documento_info["error"] = str(e) + # Continuar con los siguientes documentos + + documentos_procesados.append(documento_info) + + # Verificar si se procesó al menos un documento + if documentos_exitosos == 0: + logger.error("No se pudo procesar ningún acuse de COVE") + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("No se pudo procesar ningún acuse de COVE") + + self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'}) + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + response_data = await _create_response( + service_data=service_data, + additional_data={ + "acuses_cove": documentos_procesados, + "documentos_procesados": len(documentos_procesados), + "documentos_exitosos": documentos_exitosos + }, + success_message=f"Acuses de COVE procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos" + ) + + logger.info(f"[TASK] Consulta de acuse COVE completada exitosamente - Servicio: {service_data['id']}") + return response_data + + except Exception as e: + logger.error(f"[TASK] Error en acuse COVE: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise e + + return run_async_task(_execute_acuse_cove) \ No newline at end of file