From b0cc715eb3401704adfe2d8723f5c0effcf9129f Mon Sep 17 00:00:00 2001 From: Dulce Date: Thu, 26 Mar 2026 11:41:52 -0600 Subject: [PATCH] eliminar fallos de asyncio --- api/api_v2/modules/coves/tasks.py | 82 ++++++++++++-------------- api/api_v2/modules/edocs/tasks.py | 33 +++++++---- api/api_v2/modules/partidas/tasks.py | 33 ++++++----- api/api_v2/modules/pedimentos/tasks.py | 8 ++- api/api_v2/modules/remesas/services.py | 52 +++++++++++++--- api/api_v2/modules/remesas/tasks.py | 51 ++++++++++++---- api/api_v2/modules/tasks/services.py | 76 +++++++++++++++++++++++- controllers/RESTController.py | 2 +- 8 files changed, 240 insertions(+), 97 deletions(-) diff --git a/api/api_v2/modules/coves/tasks.py b/api/api_v2/modules/coves/tasks.py index f1d8201..cb9adcf 100644 --- a/api/api_v2/modules/coves/tasks.py +++ b/api/api_v2/modules/coves/tasks.py @@ -1,6 +1,5 @@ import asyncio import logging -import time from celery import Celery from celery_app import celery_app from typing import Dict, Any @@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: try: # Registrar el inicio de la tarea - loop = asyncio.get_event_loop() logger.info(f"[COVE] Registrando inicio de tarea {task_id}") - loop.run_until_complete( + asyncio.run( register_task( task_id=task_id, status="submitted", @@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: ) ) - # Esperar un momento breve para asegurar que el registro se complete - time.sleep(1) - - # Actualizar estado: procesando logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") - loop.run_until_complete( + asyncio.run( update_task( task_id=task_id, status="processing", @@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: ) ) - # Obtener el COVE - cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request)) + cove_response = asyncio.run(consume_ws_get_cove(**cove_request)) - # Actualizar estado: completado - loop.run_until_complete( + asyncio.run( update_task( task_id=task_id, status="completed", @@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: return {"status": "processed", "data": cove_response} except Exception as e: - # En caso de error, actualizar estado error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) - loop.run_until_complete( - update_task( - task_id=task_id, - status="failed", - message=error_message, - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=8 + + try: + asyncio.run( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 + ) ) - ) + except Exception as update_error: + logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}") + raise @@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, try: # Registrar el inicio de la tarea - loop = asyncio.get_event_loop() logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") - loop.run_until_complete( + asyncio.run( register_task( task_id=task_id, status="submitted", @@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, ) ) - # Esperar un momento breve para asegurar que el registro se complete - time.sleep(1) - - # Actualizar estado: procesando logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") - loop.run_until_complete( + asyncio.run( update_task( task_id=task_id, status="processing", @@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, ) ) - # Obtener el acuse del COVE - acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request)) + acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request)) - # Actualizar estado: completado - loop.run_until_complete( + asyncio.run( update_task( task_id=task_id, status="completed", @@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, return {"status": "processed", "data": acuse_response} except Exception as e: - # En caso de error, actualizar estado error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) - loop.run_until_complete( - update_task( - task_id=task_id, - status="failed", - message=error_message, - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=9 + + try: + asyncio.run( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 + ) ) - ) - raise - + except Exception as update_error: + logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}") + + raise \ No newline at end of file diff --git a/api/api_v2/modules/edocs/tasks.py b/api/api_v2/modules/edocs/tasks.py index d6fad28..0bff9cb 100644 --- a/api/api_v2/modules/edocs/tasks.py +++ b/api/api_v2/modules/edocs/tasks.py @@ -23,9 +23,11 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict: edoc_info = edoc_data.get('edoc', {}) edoc_number = edoc_info.get('numero_edoc', 'N/A') + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: # Registrar el inicio de la tarea - loop = asyncio.get_event_loop() logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") loop.run_until_complete( register_task( @@ -74,16 +76,23 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict: except Exception as e: # En caso de error, actualizar estado error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" - logger.error(error_message) - loop.run_until_complete( - update_task( - task_id=task_id, - status="failed", - message=error_message, - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=3 + logger.error(error_message, exc_info=True) + + try: + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=3 + ) ) - ) + except Exception as update_error: + logger.error(f"Error actualizando estado de tarea: {update_error}") + raise - + finally: + # Cerrar el loop para liberar recursos + loop.close() \ No newline at end of file diff --git a/api/api_v2/modules/partidas/tasks.py b/api/api_v2/modules/partidas/tasks.py index 9c53513..91d58ab 100644 --- a/api/api_v2/modules/partidas/tasks.py +++ b/api/api_v2/modules/partidas/tasks.py @@ -24,9 +24,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, partida_info = partida_request.get('partida', {}) partida_numero = partida_info.get('numero', 'N/A') + # Crear un nuevo event loop para esta tarea + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: # Registrar el inicio de la tarea - loop = asyncio.get_event_loop() logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}") loop.run_until_complete( register_task( @@ -38,9 +41,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, servicio=4 # 4 corresponde a "Pedimento Partidas" ) ) - - # Esperar un momento breve para asegurar que el registro se complete - time.sleep(1) # Actualizar estado: procesando logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}") @@ -76,15 +76,20 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, # En caso de error, actualizar estado error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) - loop.run_until_complete( - update_task( - task_id=task_id, - status="failed", - message=error_message, - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=4 + try: + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=4 + ) ) - ) + except Exception as update_error: + logger.error(f"Error al actualizar estado de tarea: {update_error}") raise - + finally: + # Limpiar el event loop + loop.close() \ No newline at end of file diff --git a/api/api_v2/modules/pedimentos/tasks.py b/api/api_v2/modules/pedimentos/tasks.py index 27b0c46..9169829 100644 --- a/api/api_v2/modules/pedimentos/tasks.py +++ b/api/api_v2/modules/pedimentos/tasks.py @@ -17,7 +17,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict): Returns: dict: Resultado del procesamiento con estado y detalles """ - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + task_id = self.request.id servicio = 3 # Código para Pedimento Completo pedimento_id = pedimento_data.get('pedimento', {}).get('id') @@ -84,4 +86,6 @@ def process_pedimento_completo_request(self, pedimento_data: dict): logger.error(f"Error actualizando estado de tarea: {update_error}") # Re-lanzar la excepción para que Celery la marque como fallida - raise \ No newline at end of file + raise + finally: + loop.close() \ No newline at end of file diff --git a/api/api_v2/modules/remesas/services.py b/api/api_v2/modules/remesas/services.py index 3f47b0b..49e7c47 100644 --- a/api/api_v2/modules/remesas/services.py +++ b/api/api_v2/modules/remesas/services.py @@ -88,15 +88,49 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]: # Generar nombre de archivo file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml" if soap_error(soap_response): - file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml" - document_response = await remesa_rest_controller.post_document( - soap_response=soap_response, - organizacion=pedimento_data.get('organizacion'), - pedimento=pedimento_data.get('id'), - file_name=file_name, - document_type=16, - ) - raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") + + file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml" + document_response = await remesa_rest_controller.post_document( + soap_response=soap_response, + organizacion=pedimento_data.get('organizacion'), + pedimento=pedimento_data.get('id'), + file_name=file_name, + document_type=16, + ) + + # Aquí necesitamos extraer el mensaje de error real + error_message = "Error en la respuesta del servicio SOAP" + + # Intentar extraer mensaje de error del XML de respuesta + if hasattr(soap_response, 'text') and soap_response.text: + try: + import xml.etree.ElementTree as ET + root = ET.fromstring(soap_response.text) + + # Buscar mensajes de error comunes en respuestas SOAP de VUCEM + # Esto puede variar según el servicio, pero comúnmente buscan: + for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'): + faultcode = fault.find('.//faultcode') + faultstring = fault.find('.//faultstring') + if faultstring is not None and faultstring.text: + error_message = faultstring.text + break + + # También podría estar en una estructura de error específica de VUCEM + for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'): + msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message') + if msg is not None and msg.text: + error_message = msg.text + break + + except Exception as parse_error: + logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}") + + # Lanzar excepción con el mensaje de error real + raise HTTPException( + status_code=500, + detail=f"Error en la respuesta del servicio SOAP: {error_message}" + ) # Enviar documento try: diff --git a/api/api_v2/modules/remesas/tasks.py b/api/api_v2/modules/remesas/tasks.py index 1c1717e..367d359 100644 --- a/api/api_v2/modules/remesas/tasks.py +++ b/api/api_v2/modules/remesas/tasks.py @@ -17,13 +17,16 @@ def process_remesa_request(self, remesa_request: dict) -> dict: Returns: dict: Resultado del procesamiento con estado y detalles """ - loop = asyncio.get_event_loop() task_id = self.request.id servicio = 5 # Código para Pedimento Remesas pedimento_id = remesa_request.get('pedimento', {}).get('id') organizacion_id = remesa_request.get('pedimento', {}).get('organizacion') remesa_num = remesa_request.get('remesa', 'N/A') + # Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: # Actualizar estado a processing loop.run_until_complete( @@ -49,7 +52,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict: pedimento_id=pedimento_id, organizacion_id=organizacion_id, servicio=servicio, - result=result ) ) @@ -60,20 +62,43 @@ def process_remesa_request(self, remesa_request: dict) -> dict: # Actualizar estado a failed try: - loop.run_until_complete( - update_task( - task_id=task_id, - message=f"Error al procesar remesa {remesa_num}: {str(e)}", - status="failed", - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=servicio, - error=str(e) + # Verificar si el loop aún está abierto + if not loop.is_closed(): + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Error al procesar remesa {remesa_num}: {str(e)}", + status="failed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio, + ) ) - ) + else: + # Si el loop está cerrado, crear uno nuevo temporal + logger.warning(f"Loop cerrado, creando loop temporal para actualizar error") + temp_loop = asyncio.new_event_loop() + asyncio.set_event_loop(temp_loop) + try: + temp_loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Error al procesar remesa {remesa_num}: {str(e)}", + status="failed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) + ) + finally: + temp_loop.close() except Exception as update_error: logger.error(f"Error actualizando estado de tarea: {update_error}") # Re-lanzar la excepción para que Celery la marque como fallida raise - + + finally: + # Limpiar el event loop + if not loop.is_closed(): + loop.close() \ No newline at end of file diff --git a/api/api_v2/modules/tasks/services.py b/api/api_v2/modules/tasks/services.py index 24a079b..ec447df 100644 --- a/api/api_v2/modules/tasks/services.py +++ b/api/api_v2/modules/tasks/services.py @@ -55,8 +55,16 @@ async def update_task( json=update_data, headers=headers ) + + if response.status_code == 404: + logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...") + return await _create_and_update_task( + task_id, message, status, pedimento_id, organizacion_id, servicio + ) + response.raise_for_status() return response.json() + except httpx.HTTPError as e: logger.error(f"Error al actualizar tarea {task_id}: {str(e)}") raise HTTPException( @@ -81,6 +89,72 @@ async def update_task( ) ) + +async def _create_and_update_task( + task_id: str, + message: str, + status: str, + pedimento_id: str, + organizacion_id: str, + servicio: int +) -> Dict[str, Any]: + """ + Función interna para crear una tarea y luego actualizarla. + """ + try: + # Primero crear la tarea + logger.info(f"Creando tarea {task_id} antes de actualizar") + + headers = { + "Authorization": f"Token {settings.API_TOKEN}" + } + + create_data = { + "task_id": task_id, + "message": message, + "status": status, + "pedimento": pedimento_id, + "organizacion": organizacion_id, + "servicio": servicio + } + + async with httpx.AsyncClient() as client: + # Crear la tarea + create_response = await client.post( + f"{settings.API_URL}/tasks/tasks/", + json=create_data, + headers=headers + ) + create_response.raise_for_status() + logger.info(f"Tarea {task_id} creada exitosamente") + + # Actualizar la tarea recién creada + url = f"{settings.API_URL}/tasks/tasks/{task_id}/" + update_response = await client.put( + url, + json=create_data, + headers=headers + ) + update_response.raise_for_status() + logger.info(f"Tarea {task_id} actualizada exitosamente después de crear") + + return update_response.json() + + except httpx.HTTPError as e: + logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}") + raise HTTPException( + status_code=500, + detail=create_error_response( + message="Error al crear la tarea", + errors=[str(e)], + metadata={ + "task_id": task_id, + "status": status + } + ) + ) + + async def register_task( task_id: str, message: str, @@ -154,4 +228,4 @@ async def register_task( errors=[str(e)], metadata={"task_id": task_id} ) - ) + ) \ No newline at end of file diff --git a/controllers/RESTController.py b/controllers/RESTController.py index 0f67a6b..991c5c5 100644 --- a/controllers/RESTController.py +++ b/controllers/RESTController.py @@ -46,7 +46,7 @@ class APIRESTController: """ Método asíncrono para hacer peticiones a la API usando httpx. """ - url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }" + url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" logger.warning(f"Realizando petición {method} a {url}") try: