from celery import Celery, current_task from celery_app import celery_app import asyncio import logging from typing import Dict, Any from contextlib import asynccontextmanager from fastapi import HTTPException from .services import obtener_acuse, marcar_error_acuse from api.api_v2.modules.tasks.services import register_task, update_task # Reintentos internos del worker: pertenecen al MISMO intento orquestado y no # incrementan el contador de intentos del backend (T2026-05-027) WORKER_MAX_RETRIES = 2 @celery_app.task(bind=True) def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]: """ Tarea para procesar solicitudes de acuse. """ task_id = self.request.id pedimento_info = acuse_request.get('pedimento', {}) pedimento_id = pedimento_info.get('id') organizacion_id = pedimento_info.get('organizacion') pedimento_app = pedimento_info.get('pedimento_app', 'N/A') edoc_info = acuse_request.get('edoc', {}) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) if self.request.retries == 0: logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") loop.run_until_complete( register_task( task_id=task_id, status="submitted", message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=6 # 6 corresponde a "Acuse" ) ) # Esperar un momento breve para asegurar que el registro se complete import time time.sleep(1) # Actualizar estado: procesando logging.info(f"[ACUSE] Actualizando estado a processing para tarea {task_id}") loop.run_until_complete( update_task( task_id=task_id, status="processing", message=f"Procesando acuse para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=6 ) ) # Obtener el acuse acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request)) # Actualizar estado: completado loop.run_until_complete( update_task( task_id=task_id, status="completed", message=f"Acuse obtenido exitosamente para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=6 ) ) return {"status": "processed", "data": acuse_response} except Exception as e: # En caso de error, actualizar estado error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" logging.error(error_message) # Reintento interno del worker para fallos transitorios (red, timeout): # mismo intento orquestado, NO incrementa el contador del backend if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES: raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) # Fallo definitivo de este intento: registrar el detalle en el registro de # negocio. Permanece 'pendiente'; el tope de intentos automáticos del # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. try: loop.run_until_complete( marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False) ) except Exception as report_error: logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") try: loop.run_until_complete( update_task( task_id=task_id, status="failed", message=error_message, pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=6 ) ) except Exception as update_error: logging.error(f"Error al actualizar estado de tarea: {update_error}") return {"status": "failed", "message": error_message} finally: loop.close()