From b9c6ab89c3d5fa9a7d4e482f8b559716d6b41b8e Mon Sep 17 00:00:00 2001 From: marcos Date: Mon, 15 Jun 2026 11:20:58 -0600 Subject: [PATCH] feature/capturar errores, evitar duplicados, eliminar manejar nuevas flags para descargar datos de vucem --- api/api_v2/modules/acuses/services.py | 31 +++++++ api/api_v2/modules/acuses/tasks.py | 50 ++++++++---- api/api_v2/modules/coves/services.py | 56 ++++++++++++- api/api_v2/modules/coves/tasks.py | 86 ++++++++++++++------ api/api_v2/modules/edocs/services.py | 31 +++++++ api/api_v2/modules/edocs/tasks.py | 46 ++++++++--- api/api_v2/modules/partidas/services.py | 34 +++++++- api/api_v2/modules/pedimentos/controllers.py | 15 ++-- controllers/RESTController.py | 52 +++++++----- controllers/XMLController.py | 15 ++-- 10 files changed, 326 insertions(+), 90 deletions(-) diff --git a/api/api_v2/modules/acuses/services.py b/api/api_v2/modules/acuses/services.py index 288e2f8..0ee9f59 100644 --- a/api/api_v2/modules/acuses/services.py +++ b/api/api_v2/modules/acuses/services.py @@ -203,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): data = { "id": edoc.get("id"), "acuse_descargado": status, + "acuse_estado": "descargado" if status else "pendiente", "numero_edocument": edoc.get("numero_edocument"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), @@ -210,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + # Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027) + if response is None: + logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API") + raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}") + + return response + + +async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False): + """ + Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027). + + - definitivo=False (fallo transitorio): solo registra ultimo_error; el registro + permanece 'pendiente' y el tope de intentos automáticos del backend gobierna + la transición a 'error'. + - definitivo=True (fallo permanente): transiciona de inmediato a 'error'; + queda fuera del ciclo automático, solo reproceso manual o reset. + """ + data = { + "id": edoc.get("id"), + "ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000], + "numero_edocument": edoc.get("numero_edocument"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + if definitivo: + data["acuse_estado"] = "error" + response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + if response is None: + logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API") return response def _decode_acuse_base64_content(base64_content): # Testeado diff --git a/api/api_v2/modules/acuses/tasks.py b/api/api_v2/modules/acuses/tasks.py index fbcdd4c..24a7403 100644 --- a/api/api_v2/modules/acuses/tasks.py +++ b/api/api_v2/modules/acuses/tasks.py @@ -6,9 +6,13 @@ from typing import Dict, Any from contextlib import asynccontextmanager from fastapi import HTTPException -from .services import obtener_acuse +from .services import obtener_acuse, marcar_error_acuse from api.api_v2.modules.tasks.services import register_task, update_task +# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no +# incrementan el contador de intentos del backend (T2026-05-027) +WORKER_MAX_RETRIES = 2 + @celery_app.task(bind=True) def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]: @@ -20,23 +24,25 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any] pedimento_id = pedimento_info.get('id') organizacion_id = pedimento_info.get('organizacion') pedimento_app = pedimento_info.get('pedimento_app', 'N/A') - + edoc_info = acuse_request.get('edoc', {}) + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + try: - # Registrar el inicio de la tarea - logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") - loop.run_until_complete( - register_task( - task_id=task_id, - status="submitted", - message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=6 # 6 corresponde a "Acuse" + # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) + if self.request.retries == 0: + logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=6 # 6 corresponde a "Acuse" + ) ) - ) # Esperar un momento breve para asegurar que el registro se complete import time @@ -76,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any] # En caso de error, actualizar estado error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" logging.error(error_message) + + # Reintento interno del worker para fallos transitorios (red, timeout): + # mismo intento orquestado, NO incrementa el contador del backend + if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES: + raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) + + # Fallo definitivo de este intento: registrar el detalle en el registro de + # negocio. Permanece 'pendiente'; el tope de intentos automáticos del + # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. + try: + loop.run_until_complete( + marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False) + ) + except Exception as report_error: + logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") + try: loop.run_until_complete( update_task( diff --git a/api/api_v2/modules/coves/services.py b/api/api_v2/modules/coves/services.py index 7b6a13e..cdf82f9 100644 --- a/api/api_v2/modules/coves/services.py +++ b/api/api_v2/modules/coves/services.py @@ -146,6 +146,13 @@ async def consume_ws_get_cove(**kwargs): logger.error(f"Error detectado en la respuesta SOAP: {str(e)}") raise Exception(f"Error en la respuesta SOAP: {str(e)}") + # Validar que el documento quedó persistido en la API antes de marcar el + # COVE como descargado (T2026-05-027): post_or_update_document retorna + # None en error sin lanzar excepción + if not document_response or document_response.get("id") is None: + logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza") + raise Exception(f"No se pudo persistir el COVE {cove} en la API") + logger.info("Documento enviado, actualizando status de COVE") # Actualizar status del COVE @@ -329,6 +336,15 @@ async def consume_ws_get_acuse_cove(**kwargs): identifier=cove_identifier, ) + # Validar la subida antes de marcar el acuse como descargado (T2026-05-027): + # post_or_update_document retorna None en error sin lanzar excepción + if not rest_response or rest_response.get("id") is None: + logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza") + raise HTTPException( + status_code=500, + detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza" + ) + acuse_status = await change_acuse_status( cove=kwargs.get('cove'), status=True, @@ -507,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict): data = { "id": cove.get("id"), "cove_descargado": status, + "cove_estado": "descargado" if status else "pendiente", "numero_cove": cove.get("cove"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), @@ -514,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict): response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) + # Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027) + if response is None: + logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API") + raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}") + return response @@ -521,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict): data = { "id": cove.get("id"), "acuse_cove_descargado": status, + "acuse_cove_estado": "descargado" if status else "pendiente", "numero_cove": cove.get("cove"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), } - print(data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) + # Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027) + if response is None: + logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API") + raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}") + + return response + + +async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False): + """ + Reporta un fallo de descarga al registro de negocio (T2026-05-027). + + - definitivo=False (fallo transitorio): solo registra ultimo_error; el registro + permanece 'pendiente' y el tope de intentos automáticos del backend gobierna + la transición a 'error'. + - definitivo=True (fallo permanente: VUCEM responde que no existe, credencial + rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo + automático y solo el reproceso manual o el reset lo reincorporan. + """ + campo = "acuse_cove_estado" if acuse else "cove_estado" + data = { + "id": cove.get("id"), + "ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000], + "numero_cove": cove.get("cove"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + if definitivo: + data[campo] = "error" + response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) + if response is None: + logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API") return response diff --git a/api/api_v2/modules/coves/tasks.py b/api/api_v2/modules/coves/tasks.py index 7f17492..9376ca8 100644 --- a/api/api_v2/modules/coves/tasks.py +++ b/api/api_v2/modules/coves/tasks.py @@ -5,12 +5,16 @@ from celery_app import celery_app from typing import Dict, Any from fastapi import HTTPException as _HTTPException -from .services import consume_ws_get_cove, consume_ws_get_acuse_cove +from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove from api.api_v2.modules.tasks.services import update_task, register_task # Logger para el módulo logger = logging.getLogger(__name__) +# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no +# incrementan el contador de intentos del backend (T2026-05-027) +WORKER_MAX_RETRIES = 2 + @celery_app.task(bind=True) def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: @@ -27,18 +31,19 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: cove_number = cove_info.get('cove', 'N/A') try: - # Registrar el inicio de la tarea - logger.info(f"[COVE] Registrando inicio de tarea {task_id}") - asyncio.run( - register_task( - task_id=task_id, - status="submitted", - message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}", - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=8 # 8 corresponde a "Cove" + # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) + if self.request.retries == 0: + logger.info(f"[COVE] Registrando inicio de tarea {task_id}") + asyncio.run( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 # 8 corresponde a "Cove" + ) ) - ) logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") asyncio.run( @@ -70,7 +75,22 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: except Exception as e: error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) - + + # Reintento interno del worker para fallos transitorios (red, timeout): + # mismo intento orquestado, NO incrementa el contador del backend + if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES: + raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) + + # Fallo definitivo de este intento: registrar el detalle en el registro de + # negocio. Permanece 'pendiente'; el tope de intentos automáticos del + # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. + try: + asyncio.run( + marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False) + ) + except Exception as report_error: + logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") + try: asyncio.run( update_task( @@ -105,18 +125,19 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, cove_number = cove_info.get('cove', 'N/A') try: - # Registrar el inicio de la tarea - logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") - asyncio.run( - register_task( - task_id=task_id, - status="submitted", - message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}", - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=9 # 9 corresponde a "Acuse Cove" + # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) + if self.request.retries == 0: + logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") + asyncio.run( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 # 9 corresponde a "Acuse Cove" + ) ) - ) logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") asyncio.run( @@ -148,7 +169,22 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, except Exception as e: error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message) - + + # Reintento interno del worker para fallos transitorios (red, timeout): + # mismo intento orquestado, NO incrementa el contador del backend + if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES: + raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) + + # Fallo definitivo de este intento: registrar el detalle en el registro de + # negocio. Permanece 'pendiente'; el tope de intentos automáticos del + # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. + try: + asyncio.run( + marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False) + ) + except Exception as report_error: + logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") + try: asyncio.run( update_task( diff --git a/api/api_v2/modules/edocs/services.py b/api/api_v2/modules/edocs/services.py index 148baec..55685c8 100644 --- a/api/api_v2/modules/edocs/services.py +++ b/api/api_v2/modules/edocs/services.py @@ -277,6 +277,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): data = { "id": edoc.get("id"), "edocument_descargado": status, + "edocument_estado": "descargado" if status else "pendiente", "numero_edocument": edoc.get("numero_edocument"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), @@ -284,6 +285,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + # Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027) + if response is None: + logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API") + raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}") + + return response + + +async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False): + """ + Reporta un fallo de descarga al registro de negocio (T2026-05-027). + + - definitivo=False (fallo transitorio): solo registra ultimo_error; el registro + permanece 'pendiente' y el tope de intentos automáticos del backend gobierna + la transición a 'error'. + - definitivo=True (fallo permanente): transiciona de inmediato a 'error'; + queda fuera del ciclo automático, solo reproceso manual o reset. + """ + data = { + "id": edoc.get("id"), + "ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000], + "numero_edocument": edoc.get("numero_edocument"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + if definitivo: + data["edocument_estado"] = "error" + response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + if response is None: + logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API") return response diff --git a/api/api_v2/modules/edocs/tasks.py b/api/api_v2/modules/edocs/tasks.py index 927b8b1..4d9b465 100644 --- a/api/api_v2/modules/edocs/tasks.py +++ b/api/api_v2/modules/edocs/tasks.py @@ -5,12 +5,16 @@ from celery_app import celery_app from typing import Dict from fastapi import HTTPException as _HTTPException -from .services import obtener_edoc +from .services import obtener_edoc, marcar_error_edocument from api.api_v2.modules.tasks.services import register_task, update_task # Logger para el módulo logger = logging.getLogger(__name__) +# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no +# incrementan el contador de intentos del backend (T2026-05-027) +WORKER_MAX_RETRIES = 2 + @celery_app.task(bind=True) def process_edoc_download_request(self, edoc_data: Dict) -> Dict: """ @@ -29,18 +33,19 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict: asyncio.set_event_loop(loop) try: - # Registrar el inicio de la tarea - logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") - loop.run_until_complete( - register_task( - task_id=task_id, - status="submitted", - message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", - pedimento_id=pedimento_id, - organizacion_id=organizacion_id, - servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents + # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos) + if self.request.retries == 0: + logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents + ) ) - ) # Esperar un momento breve para asegurar que el registro se complete time.sleep(1) @@ -79,7 +84,22 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict: # En caso de error, actualizar estado error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" logger.error(error_message, exc_info=True) - + + # Reintento interno del worker para fallos transitorios (red, timeout): + # mismo intento orquestado, NO incrementa el contador del backend + if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES: + raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) + + # Fallo definitivo de este intento: registrar el detalle en el registro de + # negocio. Permanece 'pendiente'; el tope de intentos automáticos del + # backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'. + try: + loop.run_until_complete( + marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False) + ) + except Exception as report_error: + logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}") + try: loop.run_until_complete( update_task( diff --git a/api/api_v2/modules/partidas/services.py b/api/api_v2/modules/partidas/services.py index 65f6736..4232fbb 100644 --- a/api/api_v2/modules/partidas/services.py +++ b/api/api_v2/modules/partidas/services.py @@ -95,7 +95,17 @@ async def consume_ws_get_partida(**kwargs): ) ) + # Una partida solo cuenta como descargada si la respuesta es una + # consultarPartidaRespuesta real sin true. + # Cualquier otro contenido (eco de la petición, fault sin tieneError, + # HTML de proxy, etc.) se guarda como ERROR y no marca descargado. + motivo_error = None if soap_error(soap_response): + motivo_error = "La respuesta contiene un error de VUCEM" + elif "consultarpartidarespuesta" not in (soap_response.text or "").lower(): + motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta" + + if motivo_error: error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml" try: document_response = await partida_rest_controller.post_or_update_document( @@ -109,13 +119,13 @@ async def consume_ws_get_partida(**kwargs): except Exception as e: logger.error(f"Error al guardar la respuesta de error: {e}") # Continuamos con el error original - - logger.error("Error en la respuesta del servicio SOAP") + + logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}") raise HTTPException( status_code=500, detail=create_error_response( message="Error en la respuesta del servicio SOAP", - errors=["La respuesta contiene un error de VUCEM"], + errors=[motivo_error], data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None}, metadata={ "partida_numero": partida.get('numero'), @@ -123,7 +133,7 @@ async def consume_ws_get_partida(**kwargs): } ) ) - + logger.info("Respuesta SOAP exitosa, enviando documento") # Enviar documento @@ -151,6 +161,22 @@ async def consume_ws_get_partida(**kwargs): ) ) + # post_or_update_document retorna None en error (no lanza excepción); + # sin documento guardado la partida NO debe marcarse como descargada. + if not document_response: + logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}") + raise HTTPException( + status_code=500, + detail=create_error_response( + message="Error al guardar el documento de la partida", + errors=["El API no confirmó el guardado del documento"], + metadata={ + "file_name": _file_name, + "partida_numero": partida.get('numero') + } + ) + ) + logger.info("Documento enviado, actualizando status de Partida") # Actualizar status de la partida diff --git a/api/api_v2/modules/pedimentos/controllers.py b/api/api_v2/modules/pedimentos/controllers.py index 172ae4b..4a3dcd6 100644 --- a/api/api_v2/modules/pedimentos/controllers.py +++ b/api/api_v2/modules/pedimentos/controllers.py @@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento def _remesas(self, root: ET.Element) -> bool: """ Método para verificar si el pedimento tiene remesas. - Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). - + Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO) + o 'RC' (REMESAS DE CONSOLIDADO). + Args: root: Elemento raíz del XML. - + Returns: - True si encuentra identificadores con clave 'RC', False en caso contrario. + True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario. """ namespaces = { 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', @@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave = clave_elem.text if clave_elem is not None else None - # Si encontramos una clave 'RC', el pedimento tiene remesas - if clave == 'RC': + # PC (consolidado) o RC (remesas de consolidado) indican remesas + if clave in ('PC', 'RC'): return True except Exception as e: @@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento print(f"Error procesando identificador para remesas: {e}") continue - print("No se encontraron remesas (sin identificadores RC)") + print("No se encontraron remesas (sin identificadores PC/RC)") return False def _get_tipo_operacion(self, root: ET.Element) -> str: diff --git a/controllers/RESTController.py b/controllers/RESTController.py index 1263165..bde5000 100644 --- a/controllers/RESTController.py +++ b/controllers/RESTController.py @@ -100,31 +100,18 @@ class APIRESTController: identifier: str = None, binary_content: bytes = None, ) -> Dict[str, Any]: """ - Guarda un documento VU (request o error). Si ya existe uno del mismo - tipo/pedimento/identificador, elimina el anterior y crea el nuevo, - evitando duplicados en ejecuciones recurrentes (tarea programada diaria). + Guarda un documento VU (request, error o respuesta). Crea PRIMERO el + documento nuevo y, solo si se guardó con éxito, elimina los previos del + mismo tipo/pedimento/identificador. Así se evitan duplicados en + ejecuciones recurrentes sin perder el documento anterior cuando el + guardado del nuevo falla (antes se borraba el previo antes del POST). identifier: cadena única del documento dentro del pedimento (ej: número de COVE, número de e-document). Se usa como filtro archivo__icontains para distinguir documentos del mismo tipo pertenecientes a distintos COVEs o edocs. """ - try: - query = f'record/documents/?pedimento={pedimento}&document_type={document_type}' - if identifier: - query += f'&archivo__icontains={identifier}' - existing = await self._make_request_async('GET', query) - if existing: - results = existing.get('results', existing) if isinstance(existing, dict) else existing - if isinstance(results, list) and results: - existing_id = results[0].get('id') - if existing_id: - await self._make_request_async('DELETE', f'record/documents/{existing_id}/') - logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}") - except Exception as e: - logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}") - - return await self.post_document( + new_document = await self.post_document( soap_response=soap_response, binary_content=binary_content, organizacion=organizacion, @@ -133,6 +120,33 @@ class APIRESTController: document_type=document_type, fuente=fuente, ) + if not new_document: + # POST falló: conservar los documentos previos intactos + logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos") + return new_document + + new_id = new_document.get('id') if isinstance(new_document, dict) else None + if not new_id: + logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})") + return new_document + + try: + query = f'record/documents/?pedimento={pedimento}&document_type={document_type}' + if identifier: + query += f'&archivo__icontains={identifier}' + existing = await self._make_request_async('GET', query) + if existing: + results = existing.get('results', existing) if isinstance(existing, dict) else existing + if isinstance(results, list): + for previous in results: + previous_id = previous.get('id') + if previous_id and previous_id != new_id: + await self._make_request_async('DELETE', f'record/documents/{previous_id}/') + logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}") + except Exception as e: + logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}") + + return new_document async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]: return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data) diff --git a/controllers/XMLController.py b/controllers/XMLController.py index 3a60f36..8523ad4 100644 --- a/controllers/XMLController.py +++ b/controllers/XMLController.py @@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento def _remesas(self, root: ET.Element) -> bool: """ Método para verificar si el pedimento tiene remesas. - Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). - + Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO) + o 'RC' (REMESAS DE CONSOLIDADO). + Args: root: Elemento raíz del XML. - + Returns: - True si encuentra identificadores con clave 'RC', False en caso contrario. + True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario. """ namespaces = { 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', @@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave = clave_elem.text if clave_elem is not None else None - # Si encontramos una clave 'RC', el pedimento tiene remesas - if clave == 'RC': + # PC (consolidado) o RC (remesas de consolidado) indican remesas + if clave in ('PC', 'RC'): return True except Exception as e: @@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento print(f"Error procesando identificador para remesas: {e}") continue - print("No se encontraron remesas (sin identificadores RC)") + print("No se encontraron remesas (sin identificadores PC/RC)") return False def _get_tipo_operacion(self, root: ET.Element) -> str: