Compare commits
4 Commits
Fix--Audit
...
tareas-seg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0cc715eb3 | ||
| 5f41132f80 | |||
| d49747f288 | |||
| 47c8bf51c7 |
@@ -122,7 +122,7 @@ async def consume_ws_get_cove(**kwargs):
|
|||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
|
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
|
||||||
document_type=10,
|
document_type=20,
|
||||||
)
|
)
|
||||||
|
|
||||||
raise Exception("Error en la respuesta del servicio SOAP")
|
raise Exception("Error en la respuesta del servicio SOAP")
|
||||||
@@ -256,7 +256,7 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
|||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=error_file_name,
|
file_name=error_file_name,
|
||||||
document_type=10,
|
document_type=24,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
|
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
@@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="submitted",
|
status="submitted",
|
||||||
@@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
|
||||||
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="processing",
|
status="processing",
|
||||||
@@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Obtener el COVE
|
cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
|
||||||
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
|
|
||||||
|
|
||||||
# Actualizar estado: completado
|
asyncio.run(
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="completed",
|
status="completed",
|
||||||
@@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
return {"status": "processed", "data": cove_response}
|
return {"status": "processed", "data": cove_response}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
|
||||||
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
try:
|
||||||
task_id=task_id,
|
asyncio.run(
|
||||||
status="failed",
|
update_task(
|
||||||
message=error_message,
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
status="failed",
|
||||||
organizacion_id=organizacion_id,
|
message=error_message,
|
||||||
servicio=8
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=8
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="submitted",
|
status="submitted",
|
||||||
@@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
|
||||||
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="processing",
|
status="processing",
|
||||||
@@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Obtener el acuse del COVE
|
acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
|
||||||
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
|
|
||||||
|
|
||||||
# Actualizar estado: completado
|
asyncio.run(
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="completed",
|
status="completed",
|
||||||
@@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
return {"status": "processed", "data": acuse_response}
|
return {"status": "processed", "data": acuse_response}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
|
||||||
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
try:
|
||||||
task_id=task_id,
|
asyncio.run(
|
||||||
status="failed",
|
update_task(
|
||||||
message=error_message,
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
status="failed",
|
||||||
organizacion_id=organizacion_id,
|
message=error_message,
|
||||||
servicio=9
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=9
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
raise
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
|
raise
|
||||||
@@ -23,9 +23,11 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
edoc_info = edoc_data.get('edoc', {})
|
edoc_info = edoc_data.get('edoc', {})
|
||||||
edoc_number = edoc_info.get('numero_edoc', 'N/A')
|
edoc_number = edoc_info.get('numero_edoc', 'N/A')
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -74,16 +76,23 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message, exc_info=True)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
try:
|
||||||
task_id=task_id,
|
loop.run_until_complete(
|
||||||
status="failed",
|
update_task(
|
||||||
message=error_message,
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
status="failed",
|
||||||
organizacion_id=organizacion_id,
|
message=error_message,
|
||||||
servicio=3
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=3
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Cerrar el loop para liberar recursos
|
||||||
|
loop.close()
|
||||||
@@ -102,7 +102,7 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=error_file_name,
|
file_name=error_file_name,
|
||||||
document_type=10,
|
document_type=18,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al guardar la respuesta de error: {e}")
|
logger.error(f"Error al guardar la respuesta de error: {e}")
|
||||||
|
|||||||
@@ -24,9 +24,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
partida_info = partida_request.get('partida', {})
|
partida_info = partida_request.get('partida', {})
|
||||||
partida_numero = partida_info.get('numero', 'N/A')
|
partida_numero = partida_info.get('numero', 'N/A')
|
||||||
|
|
||||||
|
# Crear un nuevo event loop para esta tarea
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
|
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -38,9 +41,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
servicio=4 # 4 corresponde a "Pedimento Partidas"
|
servicio=4 # 4 corresponde a "Pedimento Partidas"
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
# Actualizar estado: procesando
|
||||||
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
|
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
|
||||||
@@ -76,15 +76,20 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
try:
|
||||||
update_task(
|
loop.run_until_complete(
|
||||||
task_id=task_id,
|
update_task(
|
||||||
status="failed",
|
task_id=task_id,
|
||||||
message=error_message,
|
status="failed",
|
||||||
pedimento_id=pedimento_id,
|
message=error_message,
|
||||||
organizacion_id=organizacion_id,
|
pedimento_id=pedimento_id,
|
||||||
servicio=4
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=4
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
|
logger.error(f"Error al actualizar estado de tarea: {update_error}")
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Limpiar el event loop
|
||||||
|
loop.close()
|
||||||
@@ -95,7 +95,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
|||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
|
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
|
||||||
document_type=10,
|
document_type=14,
|
||||||
)
|
)
|
||||||
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
|||||||
Returns:
|
Returns:
|
||||||
dict: Resultado del procesamiento con estado y detalles
|
dict: Resultado del procesamiento con estado y detalles
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
task_id = self.request.id
|
task_id = self.request.id
|
||||||
servicio = 3 # Código para Pedimento Completo
|
servicio = 3 # Código para Pedimento Completo
|
||||||
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
|
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
|
||||||
@@ -84,4 +86,6 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
|||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
@@ -88,15 +88,49 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
# Generar nombre de archivo
|
# Generar nombre de archivo
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
|
||||||
document_response = await remesa_rest_controller.post_document(
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
||||||
soap_response=soap_response,
|
document_response = await remesa_rest_controller.post_document(
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
soap_response=soap_response,
|
||||||
pedimento=pedimento_data.get('id'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
file_name=file_name,
|
pedimento=pedimento_data.get('id'),
|
||||||
document_type=10,
|
file_name=file_name,
|
||||||
)
|
document_type=16,
|
||||||
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
)
|
||||||
|
|
||||||
|
# Aquí necesitamos extraer el mensaje de error real
|
||||||
|
error_message = "Error en la respuesta del servicio SOAP"
|
||||||
|
|
||||||
|
# Intentar extraer mensaje de error del XML de respuesta
|
||||||
|
if hasattr(soap_response, 'text') and soap_response.text:
|
||||||
|
try:
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
root = ET.fromstring(soap_response.text)
|
||||||
|
|
||||||
|
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
|
||||||
|
# Esto puede variar según el servicio, pero comúnmente buscan:
|
||||||
|
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
|
||||||
|
faultcode = fault.find('.//faultcode')
|
||||||
|
faultstring = fault.find('.//faultstring')
|
||||||
|
if faultstring is not None and faultstring.text:
|
||||||
|
error_message = faultstring.text
|
||||||
|
break
|
||||||
|
|
||||||
|
# También podría estar en una estructura de error específica de VUCEM
|
||||||
|
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
|
||||||
|
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
|
||||||
|
if msg is not None and msg.text:
|
||||||
|
error_message = msg.text
|
||||||
|
break
|
||||||
|
|
||||||
|
except Exception as parse_error:
|
||||||
|
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
|
||||||
|
|
||||||
|
# Lanzar excepción con el mensaje de error real
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
|
||||||
|
)
|
||||||
# Enviar documento
|
# Enviar documento
|
||||||
try:
|
try:
|
||||||
|
|
||||||
|
|||||||
@@ -17,13 +17,16 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
Returns:
|
Returns:
|
||||||
dict: Resultado del procesamiento con estado y detalles
|
dict: Resultado del procesamiento con estado y detalles
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
task_id = self.request.id
|
task_id = self.request.id
|
||||||
servicio = 5 # Código para Pedimento Remesas
|
servicio = 5 # Código para Pedimento Remesas
|
||||||
pedimento_id = remesa_request.get('pedimento', {}).get('id')
|
pedimento_id = remesa_request.get('pedimento', {}).get('id')
|
||||||
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
|
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
|
||||||
remesa_num = remesa_request.get('remesa', 'N/A')
|
remesa_num = remesa_request.get('remesa', 'N/A')
|
||||||
|
|
||||||
|
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Actualizar estado a processing
|
# Actualizar estado a processing
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
@@ -49,7 +52,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
pedimento_id=pedimento_id,
|
pedimento_id=pedimento_id,
|
||||||
organizacion_id=organizacion_id,
|
organizacion_id=organizacion_id,
|
||||||
servicio=servicio,
|
servicio=servicio,
|
||||||
result=result
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -60,20 +62,43 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
|
|
||||||
# Actualizar estado a failed
|
# Actualizar estado a failed
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(
|
# Verificar si el loop aún está abierto
|
||||||
update_task(
|
if not loop.is_closed():
|
||||||
task_id=task_id,
|
loop.run_until_complete(
|
||||||
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
update_task(
|
||||||
status="failed",
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
||||||
organizacion_id=organizacion_id,
|
status="failed",
|
||||||
servicio=servicio,
|
pedimento_id=pedimento_id,
|
||||||
error=str(e)
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=servicio,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
else:
|
||||||
|
# Si el loop está cerrado, crear uno nuevo temporal
|
||||||
|
logger.warning(f"Loop cerrado, creando loop temporal para actualizar error")
|
||||||
|
temp_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(temp_loop)
|
||||||
|
try:
|
||||||
|
temp_loop.run_until_complete(
|
||||||
|
update_task(
|
||||||
|
task_id=task_id,
|
||||||
|
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
||||||
|
status="failed",
|
||||||
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=servicio
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
temp_loop.close()
|
||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Limpiar el event loop
|
||||||
|
if not loop.is_closed():
|
||||||
|
loop.close()
|
||||||
@@ -55,8 +55,16 @@ async def update_task(
|
|||||||
json=update_data,
|
json=update_data,
|
||||||
headers=headers
|
headers=headers
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if response.status_code == 404:
|
||||||
|
logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...")
|
||||||
|
return await _create_and_update_task(
|
||||||
|
task_id, message, status, pedimento_id, organizacion_id, servicio
|
||||||
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
except httpx.HTTPError as e:
|
except httpx.HTTPError as e:
|
||||||
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
|
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
@@ -81,6 +89,72 @@ async def update_task(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _create_and_update_task(
|
||||||
|
task_id: str,
|
||||||
|
message: str,
|
||||||
|
status: str,
|
||||||
|
pedimento_id: str,
|
||||||
|
organizacion_id: str,
|
||||||
|
servicio: int
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Función interna para crear una tarea y luego actualizarla.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Primero crear la tarea
|
||||||
|
logger.info(f"Creando tarea {task_id} antes de actualizar")
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Token {settings.API_TOKEN}"
|
||||||
|
}
|
||||||
|
|
||||||
|
create_data = {
|
||||||
|
"task_id": task_id,
|
||||||
|
"message": message,
|
||||||
|
"status": status,
|
||||||
|
"pedimento": pedimento_id,
|
||||||
|
"organizacion": organizacion_id,
|
||||||
|
"servicio": servicio
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
# Crear la tarea
|
||||||
|
create_response = await client.post(
|
||||||
|
f"{settings.API_URL}/tasks/tasks/",
|
||||||
|
json=create_data,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
create_response.raise_for_status()
|
||||||
|
logger.info(f"Tarea {task_id} creada exitosamente")
|
||||||
|
|
||||||
|
# Actualizar la tarea recién creada
|
||||||
|
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
|
||||||
|
update_response = await client.put(
|
||||||
|
url,
|
||||||
|
json=create_data,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
update_response.raise_for_status()
|
||||||
|
logger.info(f"Tarea {task_id} actualizada exitosamente después de crear")
|
||||||
|
|
||||||
|
return update_response.json()
|
||||||
|
|
||||||
|
except httpx.HTTPError as e:
|
||||||
|
logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=create_error_response(
|
||||||
|
message="Error al crear la tarea",
|
||||||
|
errors=[str(e)],
|
||||||
|
metadata={
|
||||||
|
"task_id": task_id,
|
||||||
|
"status": status
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def register_task(
|
async def register_task(
|
||||||
task_id: str,
|
task_id: str,
|
||||||
message: str,
|
message: str,
|
||||||
@@ -154,4 +228,4 @@ async def register_task(
|
|||||||
errors=[str(e)],
|
errors=[str(e)],
|
||||||
metadata={"task_id": task_id}
|
metadata={"task_id": task_id}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@@ -46,7 +46,7 @@ class APIRESTController:
|
|||||||
"""
|
"""
|
||||||
Método asíncrono para hacer peticiones a la API usando httpx.
|
Método asíncrono para hacer peticiones a la API usando httpx.
|
||||||
"""
|
"""
|
||||||
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }"
|
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
|
||||||
|
|
||||||
logger.warning(f"Realizando petición {method} a {url}")
|
logger.warning(f"Realizando petición {method} a {url}")
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user