Merge pull request 'feature/capturar errores, evitar duplicados, eliminar manejar nuevas flags para descargar datos de vucem' (#12) from feature/capturar-errores-evitar-duplicados into main
Reviewed-on: #12
This commit is contained in:
@@ -203,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
||||
data = {
|
||||
"id": edoc.get("id"),
|
||||
"acuse_descargado": status,
|
||||
"acuse_estado": "descargado" if status else "pendiente",
|
||||
"numero_edocument": edoc.get("numero_edocument"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
@@ -210,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
||||
|
||||
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||
|
||||
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API")
|
||||
raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
|
||||
"""
|
||||
Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027).
|
||||
|
||||
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||
la transición a 'error'.
|
||||
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
|
||||
queda fuera del ciclo automático, solo reproceso manual o reset.
|
||||
"""
|
||||
data = {
|
||||
"id": edoc.get("id"),
|
||||
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||
"numero_edocument": edoc.get("numero_edocument"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
}
|
||||
if definitivo:
|
||||
data["acuse_estado"] = "error"
|
||||
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API")
|
||||
return response
|
||||
|
||||
def _decode_acuse_base64_content(base64_content): # Testeado
|
||||
|
||||
@@ -6,9 +6,13 @@ from typing import Dict, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import HTTPException
|
||||
|
||||
from .services import obtener_acuse
|
||||
from .services import obtener_acuse, marcar_error_acuse
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||
WORKER_MAX_RETRIES = 2
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
@@ -20,23 +24,25 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
|
||||
edoc_info = acuse_request.get('edoc', {})
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6 # 6 corresponde a "Acuse"
|
||||
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||
if self.request.retries == 0:
|
||||
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6 # 6 corresponde a "Acuse"
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
import time
|
||||
@@ -76,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
||||
logging.error(error_message)
|
||||
|
||||
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||
# mismo intento orquestado, NO incrementa el contador del backend
|
||||
if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||
|
||||
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False)
|
||||
)
|
||||
except Exception as report_error:
|
||||
logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
|
||||
@@ -146,6 +146,13 @@ async def consume_ws_get_cove(**kwargs):
|
||||
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
||||
raise Exception(f"Error en la respuesta SOAP: {str(e)}")
|
||||
|
||||
# Validar que el documento quedó persistido en la API antes de marcar el
|
||||
# COVE como descargado (T2026-05-027): post_or_update_document retorna
|
||||
# None en error sin lanzar excepción
|
||||
if not document_response or document_response.get("id") is None:
|
||||
logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza")
|
||||
raise Exception(f"No se pudo persistir el COVE {cove} en la API")
|
||||
|
||||
logger.info("Documento enviado, actualizando status de COVE")
|
||||
|
||||
# Actualizar status del COVE
|
||||
@@ -329,6 +336,15 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
||||
identifier=cove_identifier,
|
||||
)
|
||||
|
||||
# Validar la subida antes de marcar el acuse como descargado (T2026-05-027):
|
||||
# post_or_update_document retorna None en error sin lanzar excepción
|
||||
if not rest_response or rest_response.get("id") is None:
|
||||
logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza"
|
||||
)
|
||||
|
||||
acuse_status = await change_acuse_status(
|
||||
cove=kwargs.get('cove'),
|
||||
status=True,
|
||||
@@ -507,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
|
||||
data = {
|
||||
"id": cove.get("id"),
|
||||
"cove_descargado": status,
|
||||
"cove_estado": "descargado" if status else "pendiente",
|
||||
"numero_cove": cove.get("cove"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
@@ -514,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
|
||||
|
||||
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||
|
||||
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API")
|
||||
raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
@@ -521,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
|
||||
data = {
|
||||
"id": cove.get("id"),
|
||||
"acuse_cove_descargado": status,
|
||||
"acuse_cove_estado": "descargado" if status else "pendiente",
|
||||
"numero_cove": cove.get("cove"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
}
|
||||
|
||||
print(data)
|
||||
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||
|
||||
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API")
|
||||
raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False):
|
||||
"""
|
||||
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
|
||||
|
||||
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||
la transición a 'error'.
|
||||
- definitivo=True (fallo permanente: VUCEM responde que no existe, credencial
|
||||
rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo
|
||||
automático y solo el reproceso manual o el reset lo reincorporan.
|
||||
"""
|
||||
campo = "acuse_cove_estado" if acuse else "cove_estado"
|
||||
data = {
|
||||
"id": cove.get("id"),
|
||||
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||
"numero_cove": cove.get("cove"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
}
|
||||
if definitivo:
|
||||
data[campo] = "error"
|
||||
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API")
|
||||
return response
|
||||
|
||||
|
||||
|
||||
@@ -5,12 +5,16 @@ from celery_app import celery_app
|
||||
from typing import Dict, Any
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
|
||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove
|
||||
from api.api_v2.modules.tasks.services import update_task, register_task
|
||||
|
||||
# Logger para el módulo
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||
WORKER_MAX_RETRIES = 2
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
@@ -27,18 +31,19 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
cove_number = cove_info.get('cove', 'N/A')
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||
asyncio.run(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8 # 8 corresponde a "Cove"
|
||||
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||
if self.request.retries == 0:
|
||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||
asyncio.run(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8 # 8 corresponde a "Cove"
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
||||
asyncio.run(
|
||||
@@ -70,7 +75,22 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
except Exception as e:
|
||||
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
|
||||
|
||||
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||
# mismo intento orquestado, NO incrementa el contador del backend
|
||||
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||
|
||||
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||
try:
|
||||
asyncio.run(
|
||||
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False)
|
||||
)
|
||||
except Exception as report_error:
|
||||
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||
|
||||
try:
|
||||
asyncio.run(
|
||||
update_task(
|
||||
@@ -105,18 +125,19 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
||||
cove_number = cove_info.get('cove', 'N/A')
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||
asyncio.run(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9 # 9 corresponde a "Acuse Cove"
|
||||
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||
if self.request.retries == 0:
|
||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||
asyncio.run(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9 # 9 corresponde a "Acuse Cove"
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
||||
asyncio.run(
|
||||
@@ -148,7 +169,22 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
||||
except Exception as e:
|
||||
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
|
||||
|
||||
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||
# mismo intento orquestado, NO incrementa el contador del backend
|
||||
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||
|
||||
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||
try:
|
||||
asyncio.run(
|
||||
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False)
|
||||
)
|
||||
except Exception as report_error:
|
||||
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||
|
||||
try:
|
||||
asyncio.run(
|
||||
update_task(
|
||||
|
||||
@@ -277,6 +277,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
||||
data = {
|
||||
"id": edoc.get("id"),
|
||||
"edocument_descargado": status,
|
||||
"edocument_estado": "descargado" if status else "pendiente",
|
||||
"numero_edocument": edoc.get("numero_edocument"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
@@ -284,6 +285,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
||||
|
||||
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||
|
||||
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API")
|
||||
raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
|
||||
"""
|
||||
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
|
||||
|
||||
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||
la transición a 'error'.
|
||||
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
|
||||
queda fuera del ciclo automático, solo reproceso manual o reset.
|
||||
"""
|
||||
data = {
|
||||
"id": edoc.get("id"),
|
||||
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||
"numero_edocument": edoc.get("numero_edocument"),
|
||||
"pedimento": pedimento.get("id"),
|
||||
"organizacion": pedimento.get("organizacion"),
|
||||
}
|
||||
if definitivo:
|
||||
data["edocument_estado"] = "error"
|
||||
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||
if response is None:
|
||||
logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API")
|
||||
return response
|
||||
|
||||
|
||||
|
||||
@@ -5,12 +5,16 @@ from celery_app import celery_app
|
||||
from typing import Dict
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
|
||||
from .services import obtener_edoc
|
||||
from .services import obtener_edoc, marcar_error_edocument
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
# Logger para el módulo
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||
WORKER_MAX_RETRIES = 2
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||
"""
|
||||
@@ -29,18 +33,19 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||
if self.request.retries == 0:
|
||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
time.sleep(1)
|
||||
@@ -79,7 +84,22 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message, exc_info=True)
|
||||
|
||||
|
||||
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||
# mismo intento orquestado, NO incrementa el contador del backend
|
||||
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||
|
||||
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False)
|
||||
)
|
||||
except Exception as report_error:
|
||||
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
|
||||
@@ -95,7 +95,17 @@ async def consume_ws_get_partida(**kwargs):
|
||||
)
|
||||
)
|
||||
|
||||
# Una partida solo cuenta como descargada si la respuesta es una
|
||||
# consultarPartidaRespuesta real sin <tieneError>true</tieneError>.
|
||||
# Cualquier otro contenido (eco de la petición, fault sin tieneError,
|
||||
# HTML de proxy, etc.) se guarda como ERROR y no marca descargado.
|
||||
motivo_error = None
|
||||
if soap_error(soap_response):
|
||||
motivo_error = "La respuesta contiene un error de VUCEM"
|
||||
elif "consultarpartidarespuesta" not in (soap_response.text or "").lower():
|
||||
motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta"
|
||||
|
||||
if motivo_error:
|
||||
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
||||
try:
|
||||
document_response = await partida_rest_controller.post_or_update_document(
|
||||
@@ -109,13 +119,13 @@ async def consume_ws_get_partida(**kwargs):
|
||||
except Exception as e:
|
||||
logger.error(f"Error al guardar la respuesta de error: {e}")
|
||||
# Continuamos con el error original
|
||||
|
||||
logger.error("Error en la respuesta del servicio SOAP")
|
||||
|
||||
logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=create_error_response(
|
||||
message="Error en la respuesta del servicio SOAP",
|
||||
errors=["La respuesta contiene un error de VUCEM"],
|
||||
errors=[motivo_error],
|
||||
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
|
||||
metadata={
|
||||
"partida_numero": partida.get('numero'),
|
||||
@@ -123,7 +133,7 @@ async def consume_ws_get_partida(**kwargs):
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
logger.info("Respuesta SOAP exitosa, enviando documento")
|
||||
|
||||
# Enviar documento
|
||||
@@ -151,6 +161,22 @@ async def consume_ws_get_partida(**kwargs):
|
||||
)
|
||||
)
|
||||
|
||||
# post_or_update_document retorna None en error (no lanza excepción);
|
||||
# sin documento guardado la partida NO debe marcarse como descargada.
|
||||
if not document_response:
|
||||
logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=create_error_response(
|
||||
message="Error al guardar el documento de la partida",
|
||||
errors=["El API no confirmó el guardado del documento"],
|
||||
metadata={
|
||||
"file_name": _file_name,
|
||||
"partida_numero": partida.get('numero')
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
logger.info("Documento enviado, actualizando status de Partida")
|
||||
|
||||
# Actualizar status de la partida
|
||||
|
||||
@@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
||||
def _remesas(self, root: ET.Element) -> bool:
|
||||
"""
|
||||
Método para verificar si el pedimento tiene remesas.
|
||||
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
|
||||
|
||||
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
|
||||
o 'RC' (REMESAS DE CONSOLIDADO).
|
||||
|
||||
Args:
|
||||
root: Elemento raíz del XML.
|
||||
|
||||
|
||||
Returns:
|
||||
True si encuentra identificadores con clave 'RC', False en caso contrario.
|
||||
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
|
||||
"""
|
||||
namespaces = {
|
||||
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
||||
@@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
||||
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
||||
clave = clave_elem.text if clave_elem is not None else None
|
||||
|
||||
# Si encontramos una clave 'RC', el pedimento tiene remesas
|
||||
if clave == 'RC':
|
||||
# PC (consolidado) o RC (remesas de consolidado) indican remesas
|
||||
if clave in ('PC', 'RC'):
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
@@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
||||
print(f"Error procesando identificador para remesas: {e}")
|
||||
continue
|
||||
|
||||
print("No se encontraron remesas (sin identificadores RC)")
|
||||
print("No se encontraron remesas (sin identificadores PC/RC)")
|
||||
return False
|
||||
|
||||
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
||||
|
||||
@@ -100,31 +100,18 @@ class APIRESTController:
|
||||
identifier: str = None, binary_content: bytes = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Guarda un documento VU (request o error). Si ya existe uno del mismo
|
||||
tipo/pedimento/identificador, elimina el anterior y crea el nuevo,
|
||||
evitando duplicados en ejecuciones recurrentes (tarea programada diaria).
|
||||
Guarda un documento VU (request, error o respuesta). Crea PRIMERO el
|
||||
documento nuevo y, solo si se guardó con éxito, elimina los previos del
|
||||
mismo tipo/pedimento/identificador. Así se evitan duplicados en
|
||||
ejecuciones recurrentes sin perder el documento anterior cuando el
|
||||
guardado del nuevo falla (antes se borraba el previo antes del POST).
|
||||
|
||||
identifier: cadena única del documento dentro del pedimento
|
||||
(ej: número de COVE, número de e-document). Se usa como
|
||||
filtro archivo__icontains para distinguir documentos del
|
||||
mismo tipo pertenecientes a distintos COVEs o edocs.
|
||||
"""
|
||||
try:
|
||||
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
|
||||
if identifier:
|
||||
query += f'&archivo__icontains={identifier}'
|
||||
existing = await self._make_request_async('GET', query)
|
||||
if existing:
|
||||
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
||||
if isinstance(results, list) and results:
|
||||
existing_id = results[0].get('id')
|
||||
if existing_id:
|
||||
await self._make_request_async('DELETE', f'record/documents/{existing_id}/')
|
||||
logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}")
|
||||
except Exception as e:
|
||||
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
|
||||
|
||||
return await self.post_document(
|
||||
new_document = await self.post_document(
|
||||
soap_response=soap_response,
|
||||
binary_content=binary_content,
|
||||
organizacion=organizacion,
|
||||
@@ -133,6 +120,33 @@ class APIRESTController:
|
||||
document_type=document_type,
|
||||
fuente=fuente,
|
||||
)
|
||||
if not new_document:
|
||||
# POST falló: conservar los documentos previos intactos
|
||||
logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos")
|
||||
return new_document
|
||||
|
||||
new_id = new_document.get('id') if isinstance(new_document, dict) else None
|
||||
if not new_id:
|
||||
logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})")
|
||||
return new_document
|
||||
|
||||
try:
|
||||
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
|
||||
if identifier:
|
||||
query += f'&archivo__icontains={identifier}'
|
||||
existing = await self._make_request_async('GET', query)
|
||||
if existing:
|
||||
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
||||
if isinstance(results, list):
|
||||
for previous in results:
|
||||
previous_id = previous.get('id')
|
||||
if previous_id and previous_id != new_id:
|
||||
await self._make_request_async('DELETE', f'record/documents/{previous_id}/')
|
||||
logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}")
|
||||
except Exception as e:
|
||||
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
|
||||
|
||||
return new_document
|
||||
|
||||
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
|
||||
|
||||
@@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
||||
def _remesas(self, root: ET.Element) -> bool:
|
||||
"""
|
||||
Método para verificar si el pedimento tiene remesas.
|
||||
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
|
||||
|
||||
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
|
||||
o 'RC' (REMESAS DE CONSOLIDADO).
|
||||
|
||||
Args:
|
||||
root: Elemento raíz del XML.
|
||||
|
||||
|
||||
Returns:
|
||||
True si encuentra identificadores con clave 'RC', False en caso contrario.
|
||||
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
|
||||
"""
|
||||
namespaces = {
|
||||
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
||||
@@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
||||
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
||||
clave = clave_elem.text if clave_elem is not None else None
|
||||
|
||||
# Si encontramos una clave 'RC', el pedimento tiene remesas
|
||||
if clave == 'RC':
|
||||
# PC (consolidado) o RC (remesas de consolidado) indican remesas
|
||||
if clave in ('PC', 'RC'):
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
@@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
||||
print(f"Error procesando identificador para remesas: {e}")
|
||||
continue
|
||||
|
||||
print("No se encontraron remesas (sin identificadores RC)")
|
||||
print("No se encontraron remesas (sin identificadores PC/RC)")
|
||||
return False
|
||||
|
||||
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
||||
|
||||
Reference in New Issue
Block a user