import asyncio import logging from celery import Celery from celery_app import celery_app from typing import Dict, Any from .services import consume_ws_get_cove, consume_ws_get_acuse_cove from api.api_v2.modules.tasks.tasks import run_async_task # Logger para el módulo logger = logging.getLogger(__name__) @celery_app.task(bind=True) def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: """ Tarea de Celery para procesar la solicitud de descarga de COVE. Args: cove_request: Diccionario con los datos de la solicitud de COVE. Returns: Diccionario con la respuesta del COVE procesado. """ try: logger.info(f"Iniciando procesamiento de COVE - Task ID: {self.request.id}") # Actualizar progreso self.update_state(state='PROGRESS', meta={'current': 10, 'total': 100, 'status': 'Iniciando procesamiento de COVE'}) # Usar run_async_task para ejecutar la función asíncrona result = run_async_task(consume_ws_get_cove, **cove_request) # Actualizar progreso self.update_state(state='SUCCESS', meta={'current': 100, 'total': 100, 'status': 'COVE procesado exitosamente'}) logger.info(f"COVE procesado exitosamente - Task ID: {self.request.id}") # Asegurar que la respuesta sea serializable para Celery return { "status": "processed", "data": result, "task_id": self.request.id } except Exception as e: error_msg = f"Error procesando COVE: {str(e)}" logger.error(error_msg, exc_info=True) # Actualizar estado con error self.update_state( state='FAILURE', meta={ 'current': 0, 'total': 100, 'status': error_msg, 'error': str(e) } ) # Re-lanzar excepción para que Celery la registre raise e @celery_app.task(bind=True) def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: """ Tarea de Celery para procesar la solicitud de acuse de COVE. Args: cove_request: Diccionario con los datos de la solicitud de acuse. Returns: Diccionario con la respuesta del acuse procesado. """ try: logger.info(f"Iniciando procesamiento de acuse de COVE - Task ID: {self.request.id}") # Actualizar progreso self.update_state(state='PROGRESS', meta={'current': 10, 'total': 100, 'status': 'Iniciando procesamiento de acuse de COVE'}) # Usar run_async_task para ejecutar la función asíncrona result = run_async_task(consume_ws_get_acuse_cove, **cove_request) # Actualizar progreso self.update_state(state='SUCCESS', meta={'current': 100, 'total': 100, 'status': 'Acuse de COVE procesado exitosamente'}) logger.info(f"Acuse de COVE procesado exitosamente - Task ID: {self.request.id}") # Asegurar que la respuesta sea serializable para Celery return { "status": "processed", "data": result, "task_id": self.request.id } except Exception as e: error_msg = f"Error procesando acuse de COVE: {str(e)}" logger.error(error_msg, exc_info=True) # Actualizar estado con error self.update_state( state='FAILURE', meta={ 'current': 0, 'total': 100, 'status': error_msg, 'error': str(e) } ) # Re-lanzar excepción para que Celery la registre raise e