eliminar fallos de asyncio

This commit is contained in:
Dulce
2026-03-26 11:41:52 -06:00
parent 5f41132f80
commit b0cc715eb3
8 changed files with 240 additions and 97 deletions

View File

@@ -1,6 +1,5 @@
import asyncio
import logging
import time
from celery import Celery
from celery_app import celery_app
from typing import Dict, Any
@@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
asyncio.run(
register_task(
task_id=task_id,
status="submitted",
@@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
)
)
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="processing",
@@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
)
)
# Obtener el COVE
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
# Actualizar estado: completado
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="completed",
@@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "processed", "data": cove_response}
except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
try:
asyncio.run(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
)
)
)
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise
@@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
loop.run_until_complete(
asyncio.run(
register_task(
task_id=task_id,
status="submitted",
@@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
)
)
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="processing",
@@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
)
)
# Obtener el acuse del COVE
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
# Actualizar estado: completado
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="completed",
@@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
return {"status": "processed", "data": acuse_response}
except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
try:
asyncio.run(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
)
)
)
raise
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise