import asyncio import logging import time from celery_app import celery_app from typing import Dict from fastapi import HTTPException as _HTTPException from .services import obtener_edoc, marcar_error_edocument from api.api_v2.modules.tasks.services import register_task, update_task # Logger para el módulo logger = logging.getLogger(__name__) # Reintentos internos del worker: pertenecen al MISMO intento orquestado y no # incrementan el contador de intentos del backend (T2026-05-027) WORKER_MAX_RETRIES = 2 @celery_app.task(bind=True) def process_edoc_download_request(self, edoc_data: Dict) -> Dict: """ Tarea de Celery para procesar la descarga de un solo documento edoc. """ task_id = self.request.id pedimento_info = edoc_data.get('pedimento', {}) pedimento_id = pedimento_info.get('id') organizacion_id = pedimento_info.get('organizacion') pedimento_app = pedimento_info.get('pedimento_app', 'N/A') edoc_info = edoc_data.get('edoc', {}) # el mapeo de numero de documento no estaba correcto, se corrigio edoc_number = edoc_info.get('numero_edocument', 'N/A') loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) if self.request.retries == 0: logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") loop.run_until_complete( register_task( task_id=task_id, status="submitted", message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents ) ) # Esperar un momento breve para asegurar que el registro se complete time.sleep(1) # Actualizar estado: procesando logger.info(f"[EDOC] Actualizando estado a processing para tarea {task_id}") loop.run_until_complete( update_task( task_id=task_id, status="processing", message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents ) ) # Obtener el E-document result = loop.run_until_complete(obtener_edoc(**edoc_data)) # Actualizar estado: completado loop.run_until_complete( update_task( task_id=task_id, status="completed", message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents ) ) return {"status": "success", "result": result} except Exception as e: # En caso de error, actualizar estado error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message, exc_info=True) # Reintento interno del worker para fallos transitorios (red, timeout): # mismo intento orquestado, NO incrementa el contador del backend if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES: raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) # Fallo definitivo de este intento: registrar el detalle en el registro de # negocio. Permanece 'pendiente'; el tope de intentos automáticos del # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. try: loop.run_until_complete( marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False) ) except Exception as report_error: logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") try: loop.run_until_complete( update_task( task_id=task_id, status="failed", message=error_message, pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents ) ) except Exception as update_error: logger.error(f"Error actualizando estado de tarea: {update_error}") if isinstance(e, _HTTPException): raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None raise finally: # Cerrar el loop para liberar recursos loop.close()