import asyncio import logging import time from celery import Celery from celery_app import celery_app from typing import Dict, Any from .services import consume_ws_get_partida from api.api_v2.modules.tasks.services import register_task, update_task # Logger para el módulo logger = logging.getLogger(__name__) @celery_app.task(bind=True) def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, Any]: """ Tarea de Celery para procesar la solicitud de partida. """ task_id = self.request.id pedimento_info = partida_request.get('pedimento', {}) pedimento_id = pedimento_info.get('id') organizacion_id = pedimento_info.get('organizacion') pedimento_app = pedimento_info.get('pedimento_app', 'N/A') partida_info = partida_request.get('partida', {}) partida_numero = partida_info.get('numero', 'N/A') try: # Registrar el inicio de la tarea loop = asyncio.get_event_loop() logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}") loop.run_until_complete( register_task( task_id=task_id, status="submitted", message=f"Iniciando proceso de partida {partida_numero} para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=4 # 4 corresponde a "Pedimento Partidas" ) ) # Esperar un momento breve para asegurar que el registro se complete time.sleep(1) # Actualizar estado: procesando logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}") loop.run_until_complete( update_task( task_id=task_id, status="processing", message=f"Procesando partida {partida_numero} para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=4 ) ) # Obtener la partida partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request)) # Actualizar estado: completado loop.run_until_complete( update_task( task_id=task_id, status="completed", message=f"Partida {partida_numero} procesada exitosamente para pedimento {pedimento_app}", pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=4 ) ) return {"status": "processed", "data": partida_response} except Exception as e: # En caso de error, actualizar estado error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) loop.run_until_complete( update_task( task_id=task_id, status="failed", message=error_message, pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=4 ) ) raise