Se agrego celery y respuestas instantaneas
This commit is contained in:
33
Dockerfile
33
Dockerfile
@@ -17,18 +17,23 @@ RUN pip install --user --no-cache-dir --verbose -r requirements.txt
|
||||
# Imagen final
|
||||
FROM python:3.11-slim
|
||||
|
||||
# Establecer variables de entorno para FastAPI
|
||||
# Establecer variables de entorno para FastAPI y Celery
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV PYTHONPATH=/app
|
||||
ENV PATH=/home/fastapi/.local/bin:$PATH
|
||||
ENV C_FORCE_ROOT=1
|
||||
ENV REDIS_HOST=redis
|
||||
ENV REDIS_PORT=6379
|
||||
ENV REDIS_DB=0
|
||||
|
||||
# Crear usuario no-root para seguridad
|
||||
RUN groupadd -r fastapi && useradd -r -g fastapi fastapi
|
||||
|
||||
# Instalar curl para healthcheck
|
||||
# Instalar dependencias del sistema para Celery y Redis (sin supervisor)
|
||||
RUN apt-get update && apt-get install -y \
|
||||
curl \
|
||||
redis-tools \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Establecer directorio de trabajo
|
||||
@@ -37,23 +42,37 @@ WORKDIR /app
|
||||
# Copiar dependencias instaladas desde el builder
|
||||
COPY --from=builder /root/.local /home/fastapi/.local
|
||||
|
||||
# Cambiar ownership de los archivos de Python packages al usuario fastapi
|
||||
RUN chown -R fastapi:fastapi /home/fastapi/.local
|
||||
|
||||
# Instalar supervisor para manejar múltiples procesos
|
||||
RUN pip install supervisor
|
||||
|
||||
# Copiar el código de la aplicación
|
||||
COPY . .
|
||||
|
||||
# Crear directorios necesarios y establecer permisos
|
||||
RUN mkdir -p /app/logs /app/uploads /app/temp && \
|
||||
RUN mkdir -p /app/logs /app/uploads /app/temp /var/log/supervisor /etc/supervisor/conf.d && \
|
||||
chown -R fastapi:fastapi /app && \
|
||||
chmod -R 755 /app
|
||||
chmod -R 755 /app && \
|
||||
chown -R fastapi:fastapi /var/log/supervisor /etc/supervisor
|
||||
|
||||
# Copiar configuraciones de Supervisor (como root antes de cambiar usuario)
|
||||
COPY supervisord.conf /etc/supervisor/supervisord.conf
|
||||
COPY supervisor_celery.conf /etc/supervisor/conf.d/efc_celery.conf
|
||||
|
||||
# Cambiar al usuario no-root
|
||||
USER fastapi
|
||||
|
||||
# Exponer puerto
|
||||
# Exponer puertos
|
||||
EXPOSE 8001
|
||||
|
||||
# Healthcheck para verificar que el servicio está funcionando
|
||||
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
|
||||
CMD curl -f http://localhost:8001/api/v1/health || exit 1
|
||||
|
||||
# Comando por defecto con configuración optimizada
|
||||
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8001 --workers 32 --reload"]
|
||||
# Cambiar temporalmente a root para iniciar supervisor
|
||||
USER root
|
||||
|
||||
# Comando por defecto: usar Supervisor para gestionar FastAPI y Celery
|
||||
CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"]
|
||||
@@ -1,6 +1,7 @@
|
||||
# Multi-stage build para optimizar el tamaño de la imagen
|
||||
FROM python:3.11-slim AS builder
|
||||
|
||||
# Instalar dependencias de compilación
|
||||
RUN apt-get update && apt-get install -y \
|
||||
gcc \
|
||||
g++ \
|
||||
@@ -11,23 +12,53 @@ WORKDIR /app
|
||||
COPY requirements.txt .
|
||||
RUN pip install --user --no-cache-dir --verbose -r requirements.txt
|
||||
|
||||
# Imagen final de producción
|
||||
FROM python:3.11-slim
|
||||
|
||||
# Variables de entorno para producción
|
||||
ENV PYTHONDONTWRITEBYTECODE=1
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
ENV PYTHONPATH=/app
|
||||
ENV PATH=/home/fastapi/.local/bin:$PATH
|
||||
ENV C_FORCE_ROOT=1
|
||||
ENV REDIS_HOST=redis
|
||||
ENV REDIS_PORT=6379
|
||||
ENV REDIS_DB=0
|
||||
|
||||
# Crear usuario no-root para seguridad
|
||||
RUN groupadd -r fastapi && useradd -r -g fastapi fastapi
|
||||
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Instalar dependencias del sistema para Celery y Redis (sin supervisor)
|
||||
RUN apt-get update && apt-get install -y \
|
||||
curl \
|
||||
redis-tools \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copiar dependencias instaladas desde el builder
|
||||
COPY --from=builder /root/.local /home/fastapi/.local
|
||||
|
||||
# Instalar supervisor para manejar múltiples procesos
|
||||
RUN pip install supervisor
|
||||
|
||||
# Copiar el código de la aplicación
|
||||
COPY . .
|
||||
|
||||
USER fastapi
|
||||
# Crear directorios necesarios y establecer permisos
|
||||
RUN mkdir -p /app/logs /app/uploads /app/temp /var/log/supervisor /etc/supervisor/conf.d && \
|
||||
chown -R fastapi:fastapi /app && \
|
||||
chmod -R 755 /app && \
|
||||
chown -R fastapi:fastapi /var/log/supervisor /etc/supervisor
|
||||
|
||||
# Copiar configuraciones de Supervisor para producción (como root antes de cambiar usuario)
|
||||
COPY supervisord.conf /etc/supervisor/supervisord.conf
|
||||
COPY supervisor_celery.conf /etc/supervisor/conf.d/efc_celery.conf
|
||||
|
||||
# Cambiar temporalmente a root para Supervisor
|
||||
USER root
|
||||
|
||||
EXPOSE 8001
|
||||
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "12"]
|
||||
# Comando de producción: iniciar Supervisor para gestionar todos los procesos
|
||||
CMD ["supervisord", "-n", "-c", "/etc/supervisor/supervisord.conf"]
|
||||
|
||||
@@ -3,7 +3,8 @@ from fastapi import APIRouter
|
||||
# Debes usar paréntesis () para hacer importaciones multilínea.
|
||||
from api.api_v1.endpoints import (
|
||||
health,
|
||||
pedimentos
|
||||
pedimentos,
|
||||
async_pedimentos
|
||||
)
|
||||
|
||||
api_router = APIRouter()
|
||||
@@ -11,4 +12,5 @@ api_router = APIRouter()
|
||||
# Incluir routers de endpoints
|
||||
api_router.include_router(health.router, tags=["health"])
|
||||
api_router.include_router(pedimentos.router, tags=["pedimentos"])
|
||||
api_router.include_router(async_pedimentos.router, tags=["async-pedimentos"])
|
||||
|
||||
|
||||
493
api/api_v1/endpoints/async_pedimentos.py
Normal file
493
api/api_v1/endpoints/async_pedimentos.py
Normal file
@@ -0,0 +1,493 @@
|
||||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||||
from schemas.pedimentoSchema import PedimentoRequest
|
||||
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
|
||||
from fastapi.responses import JSONResponse
|
||||
from celery_app import celery_app
|
||||
from tasks import pedimento_completo_task
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
|
||||
@router.post("/async/services/pedimento_completo")
|
||||
async def async_get_pedimento_completo(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda la tarea de obtener pedimento completo de forma asíncrona.
|
||||
|
||||
Este endpoint:
|
||||
1. Valida los datos de entrada
|
||||
2. Agenda la tarea en Celery
|
||||
3. Retorna inmediatamente el ID de la tarea para seguimiento
|
||||
|
||||
Args:
|
||||
request: ServiceRemesaSchema con pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con task_id para consultar el estado de la tarea
|
||||
|
||||
Raises:
|
||||
HTTPException: En caso de errores de validación
|
||||
"""
|
||||
try:
|
||||
# Convertir request a diccionario
|
||||
request_data = request.model_dump()
|
||||
|
||||
logger.info(f"Agendando tarea de pedimento completo - Pedimento: {request_data['pedimento']}")
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
task = pedimento_completo_task.delay(request_data)
|
||||
|
||||
# Crear respuesta inmediata
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta del pedimento completo se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "2-5 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de pedimento completo: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
@router.get("/async/task-status/{task_id}")
|
||||
async def get_task_status(task_id: str):
|
||||
"""
|
||||
Consulta el estado de una tarea agendada.
|
||||
|
||||
Args:
|
||||
task_id: ID de la tarea a consultar
|
||||
|
||||
Returns:
|
||||
JSONResponse con el estado actual de la tarea
|
||||
|
||||
Raises:
|
||||
HTTPException: Si la tarea no existe o hay errores
|
||||
"""
|
||||
try:
|
||||
# Obtener el resultado de la tarea desde Celery
|
||||
task_result = celery_app.AsyncResult(task_id)
|
||||
|
||||
if not task_result:
|
||||
raise HTTPException(status_code=404, detail="Tarea no encontrada")
|
||||
|
||||
# Preparar respuesta según el estado
|
||||
response_data = {
|
||||
"task_id": task_id,
|
||||
"status": task_result.status,
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
if task_result.status == 'PENDING':
|
||||
response_data.update({
|
||||
"message": "La tarea está pendiente de procesamiento",
|
||||
"progress": 0
|
||||
})
|
||||
elif task_result.status == 'PROGRESS':
|
||||
meta = task_result.info
|
||||
response_data.update({
|
||||
"message": f"Procesando: {meta.get('status', 'En progreso')}",
|
||||
"progress": meta.get('progress', 50),
|
||||
"current_step": meta.get('status')
|
||||
})
|
||||
elif task_result.status == 'SUCCESS':
|
||||
response_data.update({
|
||||
"message": "Tarea completada exitosamente",
|
||||
"progress": 100,
|
||||
"result": task_result.result
|
||||
})
|
||||
elif task_result.status == 'FAILURE':
|
||||
response_data.update({
|
||||
"message": f"Error en la tarea: {str(task_result.info)}",
|
||||
"progress": 0,
|
||||
"error": str(task_result.info)
|
||||
})
|
||||
else:
|
||||
response_data.update({
|
||||
"message": f"Estado desconocido: {task_result.status}",
|
||||
"progress": 0
|
||||
})
|
||||
|
||||
# Determinar código de estado HTTP
|
||||
status_code = 200
|
||||
if task_result.status == 'FAILURE':
|
||||
status_code = 500
|
||||
elif task_result.status in ['PENDING', 'PROGRESS']:
|
||||
status_code = 202
|
||||
|
||||
return JSONResponse(content=response_data, status_code=status_code)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al consultar estado de tarea {task_id}: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al consultar el estado de la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
@router.get("/async/tasks/active")
|
||||
async def get_active_tasks():
|
||||
"""
|
||||
Lista todas las tareas activas en el sistema.
|
||||
|
||||
Returns:
|
||||
JSONResponse con la lista de tareas activas
|
||||
"""
|
||||
try:
|
||||
# Obtener tareas activas desde Celery
|
||||
inspect = celery_app.control.inspect()
|
||||
active_tasks = inspect.active()
|
||||
scheduled_tasks = inspect.scheduled()
|
||||
|
||||
response_data = {
|
||||
"active_tasks": active_tasks or {},
|
||||
"scheduled_tasks": scheduled_tasks or {},
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
return JSONResponse(content=response_data, status_code=200)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener tareas activas: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al obtener tareas activas: {str(e)}"
|
||||
)
|
||||
|
||||
@router.delete("/async/task/{task_id}")
|
||||
async def cancel_task(task_id: str):
|
||||
"""
|
||||
Cancela una tarea agendada.
|
||||
|
||||
Args:
|
||||
task_id: ID de la tarea a cancelar
|
||||
|
||||
Returns:
|
||||
JSONResponse confirmando la cancelación
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores al cancelar
|
||||
"""
|
||||
try:
|
||||
# Revocar la tarea
|
||||
celery_app.control.revoke(task_id, terminate=True)
|
||||
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": f"Tarea {task_id} cancelada exitosamente",
|
||||
"task_id": task_id,
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
logger.info(f"Tarea cancelada: {task_id}")
|
||||
return JSONResponse(content=response_data, status_code=200)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al cancelar tarea {task_id}: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al cancelar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/partidas")
|
||||
async def async_get_partidas(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener las partidas de un pedimento.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de partidas - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import partidas_task
|
||||
task = partidas_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de partidas se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "1-3 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de partidas agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de partidas: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/remesas")
|
||||
async def async_get_remesas(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener las remesas de un pedimento.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de remesas - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import remesas_task
|
||||
task = remesas_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de remesas se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "1-3 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de remesas agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de remesas: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/acuse")
|
||||
async def async_get_acuse(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener el acuse de un edocument.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de acuse - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import acuse_task
|
||||
task = acuse_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de acuse se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "1-3 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de acuse agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de acuse: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/edocument")
|
||||
async def async_get_edocument(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener edocuments de un pedimento.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de edocuments - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import edocument_task
|
||||
task = edocument_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de edocuments se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "2-4 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de edocuments agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de edocuments: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/coves")
|
||||
async def async_get_coves(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener coves de un pedimento.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de coves - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import coves_task
|
||||
task = coves_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de coves se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "1-2 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de coves agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de coves: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/async/services/acuse-cove")
|
||||
async def async_get_acuse_cove(request: ServiceRemesaSchema):
|
||||
"""
|
||||
Agenda una tarea asíncrona para obtener el acuse de un COVE.
|
||||
|
||||
Args:
|
||||
request: Datos del pedimento y organización
|
||||
|
||||
Returns:
|
||||
JSONResponse con el task_id y información de la tarea agendada
|
||||
|
||||
Raises:
|
||||
HTTPException: Si hay errores en la validación o al agendar la tarea
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Solicitando consulta asíncrona de acuse COVE - Pedimento: {request.pedimento}")
|
||||
|
||||
# Preparar datos para la tarea
|
||||
request_data = request.model_dump()
|
||||
|
||||
# Agendar la tarea en Celery
|
||||
from tasks import acuse_cove_task
|
||||
task = acuse_cove_task.delay(**request_data)
|
||||
|
||||
# Preparar respuesta
|
||||
response_data = {
|
||||
"success": True,
|
||||
"message": "Tarea agendada exitosamente. La consulta de acuse COVE se está procesando en segundo plano.",
|
||||
"task_id": task.id,
|
||||
"pedimento": request_data['pedimento'],
|
||||
"organizacion": request_data.get('organizacion'),
|
||||
"status": "PENDING",
|
||||
"timestamp": datetime.utcnow().isoformat(),
|
||||
"estimated_completion": "1-3 minutos",
|
||||
"check_status_url": f"/async/task-status/{task.id}"
|
||||
}
|
||||
|
||||
logger.info(f"Tarea de acuse COVE agendada exitosamente - Task ID: {task.id}")
|
||||
return JSONResponse(content=response_data, status_code=202)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al agendar tarea de acuse COVE: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al agendar la tarea: {str(e)}"
|
||||
)
|
||||
@@ -361,6 +361,7 @@ async def get_pedimento_completo(request: ServiceRemesaSchema):
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise HTTPException(status_code=500, detail="Error al actualizar estado del servicio")
|
||||
|
||||
|
||||
27
celery_app.py
Normal file
27
celery_app.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from celery import Celery
|
||||
from core.config import settings
|
||||
import os
|
||||
|
||||
# Configuración de Celery
|
||||
celery_app = Celery(
|
||||
"microservice",
|
||||
broker=f"redis://{os.getenv('REDIS_HOST', 'localhost')}:{os.getenv('REDIS_PORT', '6379')}/{os.getenv('REDIS_DB', '0')}",
|
||||
backend=f"redis://{os.getenv('REDIS_HOST', 'localhost')}:{os.getenv('REDIS_PORT', '6379')}/{os.getenv('REDIS_DB', '0')}",
|
||||
include=['tasks']
|
||||
)
|
||||
|
||||
# Configuración adicional
|
||||
celery_app.conf.update(
|
||||
task_serializer='json',
|
||||
accept_content=['json'],
|
||||
result_serializer='json',
|
||||
timezone='UTC',
|
||||
enable_utc=True,
|
||||
task_track_started=True,
|
||||
task_time_limit=3600, # 1 hour timeout
|
||||
worker_prefetch_multiplier=1,
|
||||
worker_max_tasks_per_child=1000,
|
||||
)
|
||||
|
||||
# Autodiscovery of tasks
|
||||
celery_app.autodiscover_tasks()
|
||||
19
main.py
19
main.py
@@ -2,8 +2,8 @@ import logging
|
||||
from fastapi import FastAPI
|
||||
from core.config import settings
|
||||
from api.api_v1.api import api_router
|
||||
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
# Configuración inicial del logging (debe estar al inicio del archivo)
|
||||
# logging.config.dictConfig({
|
||||
# "version": 1,
|
||||
@@ -44,7 +44,7 @@ def create_application() -> FastAPI:
|
||||
"""Función factory para crear la aplicación FastAPI"""
|
||||
application = FastAPI(
|
||||
title=settings.APP_NAME,
|
||||
description="EFC Microservice - Un microservicio profesional por AduanaSoft",
|
||||
description="EFC Microservice - Un microservicio profesional por AduanaSoft con soporte para tareas asíncronas",
|
||||
version=settings.APP_VERSION,
|
||||
debug=settings.DEBUG,
|
||||
)
|
||||
@@ -66,6 +66,21 @@ app.add_middleware(
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Event handlers para Celery
|
||||
@app.on_event("startup")
|
||||
async def startup_event():
|
||||
"""Inicialización al arrancar la aplicación"""
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info("Iniciando EFC Microservice con soporte asíncrono")
|
||||
logger.info("Celery configurado para tareas en segundo plano")
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_event():
|
||||
"""Limpieza al cerrar la aplicación"""
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info("Cerrando EFC Microservice")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(
|
||||
|
||||
BIN
output.pdf
BIN
output.pdf
Binary file not shown.
@@ -21,3 +21,6 @@ urllib3==2.5.0
|
||||
uvicorn==0.35.0
|
||||
python-dotenv
|
||||
cryptography
|
||||
celery==5.3.4
|
||||
redis==5.0.1
|
||||
supervisor==4.2.5
|
||||
|
||||
54
supervisor_celery.conf
Normal file
54
supervisor_celery.conf
Normal file
@@ -0,0 +1,54 @@
|
||||
[program:efc_celery_worker]
|
||||
command=/usr/local/bin/python -m celery -A celery_app worker --loglevel=info --pool=solo
|
||||
directory=/app
|
||||
user=fastapi
|
||||
numprocs=1
|
||||
stdout_logfile=/app/logs/celery_worker.log
|
||||
stderr_logfile=/app/logs/celery_worker_error.log
|
||||
stdout_logfile_maxbytes=50MB
|
||||
stderr_logfile_maxbytes=50MB
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs=600
|
||||
killasgroup=true
|
||||
priority=998
|
||||
environment=REDIS_HOST="redis_microservice_dev",REDIS_PORT="6379",REDIS_DB="0",PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s"
|
||||
|
||||
[program:efc_celery_beat]
|
||||
command=/usr/local/bin/python -m celery -A celery_app beat --loglevel=info
|
||||
directory=/app
|
||||
user=fastapi
|
||||
numprocs=1
|
||||
stdout_logfile=/app/logs/celery_beat.log
|
||||
stderr_logfile=/app/logs/celery_beat_error.log
|
||||
stdout_logfile_maxbytes=50MB
|
||||
stderr_logfile_maxbytes=50MB
|
||||
autostart=false
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs=600
|
||||
killasgroup=true
|
||||
priority=999
|
||||
environment=REDIS_HOST="redis_microservice_dev",REDIS_PORT="6379",REDIS_DB="0",PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s"
|
||||
|
||||
[program:efc_fastapi]
|
||||
command=/usr/local/bin/python -m uvicorn main:app --host 0.0.0.0 --port 8001 --workers 1
|
||||
directory=/app
|
||||
user=fastapi
|
||||
numprocs=1
|
||||
stdout_logfile=/app/logs/fastapi.log
|
||||
stderr_logfile=/app/logs/fastapi_error.log
|
||||
stdout_logfile_maxbytes=50MB
|
||||
stderr_logfile_maxbytes=50MB
|
||||
autostart=true
|
||||
autorestart=true
|
||||
startsecs=10
|
||||
stopwaitsecs=600
|
||||
killasgroup=true
|
||||
priority=997
|
||||
environment=PYTHONPATH="/app:/home/fastapi/.local/lib/python3.11/site-packages",PATH="/home/fastapi/.local/bin:/usr/local/bin:%(ENV_PATH)s"
|
||||
|
||||
[group:efc_services]
|
||||
programs=efc_celery_worker,efc_fastapi
|
||||
priority=999
|
||||
18
supervisord.conf
Normal file
18
supervisord.conf
Normal file
@@ -0,0 +1,18 @@
|
||||
[unix_http_server]
|
||||
file=/tmp/supervisor.sock
|
||||
|
||||
[supervisord]
|
||||
logfile=/var/log/supervisor/supervisord.log
|
||||
pidfile=/tmp/supervisord.pid
|
||||
childlogdir=/var/log/supervisor
|
||||
nodaemon=true
|
||||
user=root
|
||||
|
||||
[rpcinterface:supervisor]
|
||||
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
|
||||
|
||||
[supervisorctl]
|
||||
serverurl=unix:///tmp/supervisor.sock
|
||||
|
||||
[include]
|
||||
files = /etc/supervisor/conf.d/*.conf
|
||||
893
tasks.py
Normal file
893
tasks.py
Normal file
@@ -0,0 +1,893 @@
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from controllers.RESTController import rest_controller
|
||||
from controllers.SOAPController import soap_controller
|
||||
from utils.peticiones import (
|
||||
get_soap_acuseCOVE, get_soap_cove, get_soap_pedimento_completo,
|
||||
get_soap_remesas, get_soap_partidas, get_soap_acuse, get_soap_edocument
|
||||
)
|
||||
from utils.servicios import (
|
||||
_validate_request_data,
|
||||
_get_pedimento_service,
|
||||
_update_service_status,
|
||||
_get_vucem_credentials,
|
||||
_create_response,
|
||||
_post_edocuments,
|
||||
_schedule_follow_up_services,
|
||||
_post_coves
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Estados del servicio
|
||||
ESTADO_CREADO = 1
|
||||
ESTADO_EN_PROCESO = 2
|
||||
ESTADO_FINALIZADO = 3
|
||||
ESTADO_ERROR = 4
|
||||
|
||||
def run_async_task(async_func, *args, **kwargs):
|
||||
"""Helper function to run async functions in Celery tasks"""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(async_func(*args, **kwargs))
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
@celery_app.task(bind=True, name='tasks.pedimento_completo_task')
|
||||
def pedimento_completo_task(self, request_data: Dict[str, Any]):
|
||||
"""
|
||||
Tarea asíncrona para obtener pedimento completo
|
||||
"""
|
||||
async def _execute_pedimento_completo():
|
||||
operation_name = "pedimento_completo"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
logger.info(f"[TASK] Iniciando consulta de pedimento completo - Pedimento: {request_data['pedimento']}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=3,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo credenciales VUCEM'})
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
||||
logger.info("[TASK] Realizando petición SOAP para pedimento completo...")
|
||||
|
||||
soap_response = await get_soap_pedimento_completo(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller
|
||||
)
|
||||
|
||||
if not soap_response:
|
||||
raise Exception("Error en la petición SOAP para pedimento completo")
|
||||
|
||||
logger.info("[TASK] Petición SOAP para pedimento completo completada exitosamente")
|
||||
# Subir documento de pedimento completo si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_edocuments(
|
||||
response_service=service_data,
|
||||
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
||||
)
|
||||
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
|
||||
except Exception as upload_err:
|
||||
logger.error(f"Error al subir documento de pedimento completo: {upload_err}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"pedimento_completo": soap_response,
|
||||
"documento": soap_response.get('documento', {}),
|
||||
"xml_content": soap_response.get('xml_content', {})
|
||||
},
|
||||
success_message="Pedimento completo obtenido exitosamente"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de pedimento completo completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en pedimento_completo: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_pedimento_completo)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def partidas_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener partidas"""
|
||||
async def _execute_partidas():
|
||||
operation_name = "PARTIDAS_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de partidas - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=4,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
||||
logger.info("[TASK] Realizando petición SOAP para partidas...")
|
||||
|
||||
from utils.peticiones import get_soap_partidas
|
||||
|
||||
soap_response = await get_soap_partidas(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller,
|
||||
partida=request_data.get('partida', '')
|
||||
)
|
||||
|
||||
if not soap_response:
|
||||
raise Exception("Error en la petición SOAP para partidas")
|
||||
|
||||
logger.info("[TASK] Petición SOAP para partidas completada exitosamente")
|
||||
# Subir documento de partidas si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_edocuments(
|
||||
response_service=service_data,
|
||||
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
||||
)
|
||||
logger.info(f"Documento de partidas subido exitosamente: {upload_result}")
|
||||
except Exception as upload_err:
|
||||
logger.error(f"Error al subir documento de partidas: {upload_err}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"partidas": soap_response,
|
||||
"documento": soap_response.get('documento', {}),
|
||||
"xml_content": soap_response.get('xml_content', {})
|
||||
},
|
||||
success_message="Partidas obtenidas exitosamente"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de partidas completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en partidas: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_partidas)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def remesas_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener remesas"""
|
||||
async def _execute_remesas():
|
||||
operation_name = "REMESAS_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de remesas - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=5,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
||||
logger.info("[TASK] Realizando petición SOAP para remesas...")
|
||||
|
||||
from utils.peticiones import get_soap_remesas
|
||||
|
||||
soap_response = await get_soap_remesas(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller
|
||||
)
|
||||
|
||||
if not soap_response:
|
||||
raise Exception("Error en la petición SOAP para remesas")
|
||||
|
||||
logger.info("[TASK] Petición SOAP para remesas completada exitosamente")
|
||||
# Subir documento de remesas si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_edocuments(
|
||||
response_service=service_data,
|
||||
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
||||
)
|
||||
logger.info(f"Documento de remesas subido exitosamente: {upload_result}")
|
||||
except Exception as upload_err:
|
||||
logger.error(f"Error al subir documento de remesas: {upload_err}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"remesas": soap_response,
|
||||
"documento": soap_response.get('documento', {}),
|
||||
"xml_content": soap_response.get('xml_content', {})
|
||||
},
|
||||
success_message="Remesas obtenidas exitosamente"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de remesas completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en remesas: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_remesas)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def acuse_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener acuse"""
|
||||
async def _execute_acuse():
|
||||
operation_name = "ACUSE_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de acuse - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=6,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
# Obtener documentos digitalizados (e-documents)
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'})
|
||||
logger.info("[TASK] Obteniendo documentos digitalizados...")
|
||||
|
||||
try:
|
||||
edocs = await rest_controller.get_edocs(service_data['pedimento']['id'])
|
||||
|
||||
if not edocs:
|
||||
logger.warning("No se encontraron documentos digitalizados para el pedimento")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se encontraron documentos digitalizados para el pedimento")
|
||||
|
||||
logger.info(f"Se encontraron {len(edocs)} documentos digitalizados")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener documentos digitalizados: {e}")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
# Procesar acuses de documentos digitalizados
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de documentos'})
|
||||
documentos_procesados = []
|
||||
documentos_exitosos = 0
|
||||
|
||||
logger.info(f"[TASK] Procesando acuses para {len(edocs)} documentos...")
|
||||
|
||||
from utils.peticiones import get_soap_acuse
|
||||
|
||||
for idx, edoc in enumerate(edocs):
|
||||
documento_info = {
|
||||
"clave": edoc.get('clave', 'N/A'),
|
||||
"descripcion": edoc.get('descripcion', 'N/A'),
|
||||
"numero_edocument": edoc.get('numero_edocument', 'N/A'),
|
||||
"procesado": False,
|
||||
"error": None
|
||||
}
|
||||
|
||||
# Verificar que el documento tenga número de e-document
|
||||
if not edoc.get('numero_edocument'):
|
||||
logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...")
|
||||
documento_info["error"] = "Sin número de e-document"
|
||||
documentos_procesados.append(documento_info)
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.info(f"Procesando acuse para documento {idx + 1}: {edoc['numero_edocument']}")
|
||||
|
||||
soap_response = await get_soap_acuse(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller,
|
||||
edocument=edoc,
|
||||
idx=idx + 1
|
||||
)
|
||||
|
||||
if soap_response:
|
||||
documento_info["procesado"] = True
|
||||
documento_info["documento"] = soap_response.get('documento', {})
|
||||
documentos_exitosos += 1
|
||||
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
|
||||
else:
|
||||
documento_info["error"] = "Error en petición SOAP"
|
||||
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al procesar acuse del documento {idx + 1}: {e}")
|
||||
documento_info["error"] = str(e)
|
||||
# Continuar con los siguientes documentos
|
||||
|
||||
documentos_procesados.append(documento_info)
|
||||
|
||||
# Verificar si se procesó al menos un documento
|
||||
if documentos_exitosos == 0:
|
||||
logger.error("No se pudo procesar ningún acuse de documento digitalizado")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se pudo procesar ningún acuse de documento digitalizado")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"acuses": documentos_procesados,
|
||||
"documentos_procesados": len(documentos_procesados),
|
||||
"documentos_exitosos": documentos_exitosos
|
||||
},
|
||||
success_message=f"Acuses procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de acuse completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en acuse: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_acuse)
|
||||
|
||||
|
||||
# Tarea asíncrona para consultar el estado de un pedimento
|
||||
@celery_app.task(bind=True, name='tasks.estado_pedimento_task')
|
||||
def estado_pedimento_task(self, request_data: Dict[str, Any]):
|
||||
"""
|
||||
Tarea asíncrona para consultar el estado de un pedimento
|
||||
"""
|
||||
async def _execute_estado_pedimento():
|
||||
operation_name = "estado_pedimento"
|
||||
service_data = None
|
||||
try:
|
||||
logger.info(f"[TASK] Iniciando consulta de estado de pedimento - Pedimento: {request_data['pedimento']}")
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=1,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
||||
logger.info("[TASK] Realizando petición SOAP para estado de pedimento...")
|
||||
from utils.peticiones import get_soap_pedimento_estado
|
||||
soap_response = await get_soap_pedimento_estado(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller
|
||||
)
|
||||
if not soap_response:
|
||||
raise Exception("Error en la petición SOAP para estado de pedimento")
|
||||
|
||||
logger.info("[TASK] Petición SOAP para estado de pedimento completada exitosamente")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"estado_pedimento": soap_response
|
||||
},
|
||||
success_message="Estado de pedimento obtenido exitosamente"
|
||||
)
|
||||
logger.info(f"[TASK] Consulta de estado de pedimento completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en estado de pedimento: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
return run_async_task(_execute_estado_pedimento)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def edocument_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener edocument"""
|
||||
async def _execute_edocument():
|
||||
operation_name = "EDOCUMENT_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de edocument - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=7,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
# Obtener documentos digitalizados (e-documents)
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'})
|
||||
logger.info("[TASK] Obteniendo documentos digitalizados...")
|
||||
|
||||
try:
|
||||
edocs = await rest_controller.get_edocs(service_data['pedimento']['id'])
|
||||
|
||||
if not edocs:
|
||||
logger.warning("No se encontraron documentos digitalizados para el pedimento")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se encontraron documentos digitalizados para el pedimento")
|
||||
|
||||
logger.info(f"Se encontraron {len(edocs)} documentos digitalizados")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener documentos digitalizados: {e}")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
# Procesar edocuments
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Procesando documentos electrónicos'})
|
||||
documentos_procesados = []
|
||||
documentos_exitosos = 0
|
||||
|
||||
logger.info(f"[TASK] Procesando edocuments para {len(edocs)} documentos...")
|
||||
|
||||
from utils.peticiones import get_soap_edocument
|
||||
|
||||
for idx, edoc in enumerate(edocs):
|
||||
documento_info = {
|
||||
"clave": edoc.get('clave', 'N/A'),
|
||||
"descripcion": edoc.get('descripcion', 'N/A'),
|
||||
"numero_edocument": edoc.get('numero_edocument', 'N/A'),
|
||||
"procesado": False,
|
||||
"error": None
|
||||
}
|
||||
|
||||
# Verificar que el documento tenga número de e-document
|
||||
if not edoc.get('numero_edocument'):
|
||||
logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...")
|
||||
documento_info["error"] = "Sin número de e-document"
|
||||
documentos_procesados.append(documento_info)
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.info(f"Procesando e-document {idx + 1}: {edoc['numero_edocument']}")
|
||||
|
||||
soap_response = await get_soap_edocument(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller,
|
||||
edocument=edoc,
|
||||
idx=idx + 1
|
||||
)
|
||||
|
||||
if soap_response:
|
||||
documento_info["procesado"] = True
|
||||
documento_info["documento"] = soap_response.get('documento', {})
|
||||
documentos_exitosos += 1
|
||||
logger.info(f"E-document {idx + 1} procesado exitosamente")
|
||||
# Subir el documento si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_edocuments(
|
||||
response_service=service_data,
|
||||
identificadores_ed=[edoc.get('numero_edocument')]
|
||||
)
|
||||
documento_info["upload_result"] = upload_result
|
||||
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
||||
except Exception as upload_err:
|
||||
documento_info["upload_error"] = str(upload_err)
|
||||
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
||||
# Subir el documento si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_edocuments(
|
||||
response_service=service_data,
|
||||
identificadores_ed=[edoc.get('numero_edocument')]
|
||||
)
|
||||
documento_info["upload_result"] = upload_result
|
||||
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
||||
except Exception as upload_err:
|
||||
documento_info["upload_error"] = str(upload_err)
|
||||
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
||||
else:
|
||||
documento_info["error"] = "Error en petición SOAP"
|
||||
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al procesar e-document {idx + 1}: {e}")
|
||||
documento_info["error"] = str(e)
|
||||
# Continuar con los siguientes documentos
|
||||
|
||||
documentos_procesados.append(documento_info)
|
||||
|
||||
# Verificar si se procesó al menos un documento
|
||||
if documentos_exitosos == 0:
|
||||
logger.error("No se pudo procesar ningún e-document")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se pudo procesar ningún e-document")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"edocuments": documentos_procesados,
|
||||
"documentos_procesados": len(documentos_procesados),
|
||||
"documentos_exitosos": documentos_exitosos
|
||||
},
|
||||
success_message=f"E-documents procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de edocument completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en edocument: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_edocument)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def coves_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener coves"""
|
||||
async def _execute_coves():
|
||||
operation_name = "COVES_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de coves - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=8,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
# Para COVES, se usa el RESTController en lugar de SOAP
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición REST'})
|
||||
logger.info("[TASK] Realizando petición REST para coves...")
|
||||
|
||||
coves_response = await rest_controller.get_coves(request_data['pedimento'])
|
||||
|
||||
if not coves_response:
|
||||
raise Exception("Error en la petición REST para coves")
|
||||
|
||||
logger.info("[TASK] Petición REST para coves completada exitosamente")
|
||||
# Subir documento de coves si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_coves(
|
||||
response_service=service_data,
|
||||
identificadores_cove=[coves_response[0].get('numero_cove')] if isinstance(coves_response, list) and coves_response else []
|
||||
)
|
||||
logger.info(f"Documento de coves subido exitosamente: {upload_result}")
|
||||
except Exception as upload_err:
|
||||
logger.error(f"Error al subir documento de coves: {upload_err}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"coves": coves_response
|
||||
},
|
||||
success_message="Coves obtenidos exitosamente"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de coves completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en coves: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_coves)
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def acuse_cove_task(self, **kwargs):
|
||||
"""Tarea asíncrona para obtener acuse de COVE"""
|
||||
async def _execute_acuse_cove():
|
||||
operation_name = "ACUSE_COVE_ASYNC"
|
||||
service_data = None
|
||||
|
||||
try:
|
||||
request_data = kwargs
|
||||
logger.info(f"[TASK] Iniciando consulta de acuse COVE - Pedimento: {request_data.get('pedimento')}")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
||||
await _validate_request_data(request_data)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
||||
service_data = await _get_pedimento_service(
|
||||
pedimento_id=request_data['pedimento'],
|
||||
service_type=9,
|
||||
operation_name=operation_name
|
||||
)
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
||||
update_success = await _update_service_status(
|
||||
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||
)
|
||||
|
||||
if not update_success:
|
||||
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
||||
|
||||
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||
if not contribuyente_id:
|
||||
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("ID de contribuyente no encontrado")
|
||||
|
||||
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||
|
||||
# Obtener COVES
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo COVES'})
|
||||
logger.info("[TASK] Obteniendo COVES...")
|
||||
|
||||
try:
|
||||
coves = await rest_controller.get_coves(service_data['pedimento']['id'])
|
||||
|
||||
if not coves:
|
||||
logger.warning("No se encontraron COVES para el pedimento")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se encontraron COVES para el pedimento")
|
||||
|
||||
logger.info(f"Se encontraron {len(coves)} COVES")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener COVES: {e}")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
# Procesar acuses de COVES
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de COVES'})
|
||||
documentos_procesados = []
|
||||
documentos_exitosos = 0
|
||||
|
||||
logger.info(f"[TASK] Procesando acuses para {len(coves)} COVES...")
|
||||
|
||||
from utils.peticiones import get_soap_acuseCOVE
|
||||
|
||||
for idx, cove in enumerate(coves):
|
||||
documento_info = {
|
||||
"clave": cove.get('clave', 'N/A'),
|
||||
"descripcion": cove.get('descripcion', 'N/A'),
|
||||
"numero_cove": cove.get('numero_cove', 'N/A'),
|
||||
"procesado": False,
|
||||
"error": None
|
||||
}
|
||||
|
||||
# Verificar que el documento tenga número de COVE
|
||||
if not cove.get('numero_cove'):
|
||||
logger.warning(f"COVE {idx + 1} no tiene numero_cove, saltando...")
|
||||
documento_info["error"] = "Sin número de COVE"
|
||||
documentos_procesados.append(documento_info)
|
||||
continue
|
||||
|
||||
try:
|
||||
logger.info(f"Procesando acuse para COVE {idx + 1}: {cove['numero_cove']}")
|
||||
|
||||
soap_response = await get_soap_acuseCOVE(
|
||||
credenciales=credentials,
|
||||
response_service=service_data,
|
||||
soap_controller=soap_controller,
|
||||
cove=cove,
|
||||
idx=idx + 1
|
||||
)
|
||||
|
||||
if soap_response:
|
||||
documento_info["procesado"] = True
|
||||
documento_info["documento"] = soap_response.get('documento', {})
|
||||
documentos_exitosos += 1
|
||||
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
|
||||
# Subir el documento de COVE si la petición fue exitosa
|
||||
try:
|
||||
upload_result = await _post_coves(
|
||||
response_service=service_data,
|
||||
identificadores_cove=[cove.get('numero_cove')]
|
||||
)
|
||||
documento_info["upload_result"] = upload_result
|
||||
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente")
|
||||
except Exception as upload_err:
|
||||
documento_info["upload_error"] = str(upload_err)
|
||||
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
|
||||
else:
|
||||
documento_info["error"] = "Error en petición SOAP"
|
||||
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al procesar acuse de COVE {idx + 1}: {e}")
|
||||
documento_info["error"] = str(e)
|
||||
# Continuar con los siguientes documentos
|
||||
|
||||
documentos_procesados.append(documento_info)
|
||||
|
||||
# Verificar si se procesó al menos un documento
|
||||
if documentos_exitosos == 0:
|
||||
logger.error("No se pudo procesar ningún acuse de COVE")
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise Exception("No se pudo procesar ningún acuse de COVE")
|
||||
|
||||
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
||||
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||
|
||||
response_data = await _create_response(
|
||||
service_data=service_data,
|
||||
additional_data={
|
||||
"acuses_cove": documentos_procesados,
|
||||
"documentos_procesados": len(documentos_procesados),
|
||||
"documentos_exitosos": documentos_exitosos
|
||||
},
|
||||
success_message=f"Acuses de COVE procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
||||
)
|
||||
|
||||
logger.info(f"[TASK] Consulta de acuse COVE completada exitosamente - Servicio: {service_data['id']}")
|
||||
return response_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[TASK] Error en acuse COVE: {e}")
|
||||
if service_data:
|
||||
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||
raise e
|
||||
|
||||
return run_async_task(_execute_acuse_cove)
|
||||
Reference in New Issue
Block a user