11 Commits

Author SHA1 Message Date
b9c6ab89c3 feature/capturar errores, evitar duplicados, eliminar manejar nuevas flags para descargar datos de vucem 2026-06-15 11:20:58 -06:00
042d02e240 Merge pull request 'feature/pedimentos-corregidos' (#11) from feature/correccion-pedientos into main
Reviewed-on: #11
2026-05-28 13:20:34 +00:00
288a96bc59 feature/pedimentos-corregidos 2026-05-28 07:13:32 -06:00
9f59ac0d00 Merge pull request 'fix/forzar-procesamiento-acuses' (#10) from fix/forzar-procesamiento-acuses into main
Reviewed-on: #10
2026-05-25 20:55:27 +00:00
fe8e7dc10f fix/forzar-procesamiento-acuses 2026-05-25 14:54:46 -06:00
a6ce91d8af Merge pull request 'feature/rbac-implementation' (#9) from feature/rbac-implementation into main
Reviewed-on: #9
2026-05-21 13:58:59 +00:00
e174df0af3 feature/rbac-implementation 2026-05-21 07:56:20 -06:00
3c10653d6a Merge pull request 'feature/pedimento completo carga remesas, coves, edocs y acuses y aparte descarga sus documentos, se corrigieron las formulas de remesas, acuse y e documents para permitir la correcta descarga de susdocumentos y se aseguro que el status sea el correcto' (#8) from feature/T2026-05-016-y-T2026-05-030 into main
Reviewed-on: #8
2026-05-18 18:04:36 +00:00
Dulce
f96e5a227b feature/pedimento completo carga remesas, coves, edocs y acuses y aparte descarga sus documentos, se corrigieron las formulas de remesas, acuse y e documents para permitir la correcta descarga de susdocumentos y se aseguro que el status sea el correcto 2026-05-18 11:58:42 -06:00
ed00651a8b Merge pull request 'feature/lectura de credenciales de vucem desde archivos de minIO' (#7) from feature/minio-read-credentials into main
Reviewed-on: #7
2026-04-22 17:38:06 +00:00
Dulce
2e779e83f8 feature/lectura de credenciales de vucem desde archivos de minIO 2026-04-22 11:11:34 -06:00
25 changed files with 938 additions and 262 deletions

View File

@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
from api.api_v2.modules.partidas.routers import router as partidas_router from api.api_v2.modules.partidas.routers import router as partidas_router
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
from api.api_v2.modules.remesas.routers import router as remesas_router from api.api_v2.modules.remesas.routers import router as remesas_router
from api.api_v2.modules.stream.routers import router as stream_router
api_router = APIRouter() api_router = APIRouter()
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
api_router.include_router(partidas_router, tags=["Partidas"]) api_router.include_router(partidas_router, tags=["Partidas"])
api_router.include_router(pedimentos_router, tags=["Pedimentos"]) api_router.include_router(pedimentos_router, tags=["Pedimentos"])
api_router.include_router(remesas_router, tags=["Remesas"]) api_router.include_router(remesas_router, tags=["Remesas"])
api_router.include_router(stream_router, tags=["Stream"])

View File

@@ -33,12 +33,13 @@ async def obtener_acuse(**kwargs):
file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml" file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml"
document_response = await acuse_rest_controller.post_document( document_response = await acuse_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=organizacion_efc, organizacion=organizacion_efc,
pedimento=pedimento_id_efc, pedimento=pedimento_id_efc,
file_name=file_name_request, file_name=file_name_request,
document_type=25, # Tipo de documento para request de acuse VU document_type=25,
identifier=idEdocument_efc,
) )
except Exception as e: except Exception as e:
@@ -72,11 +73,32 @@ async def obtener_acuse(**kwargs):
if soap_error(response): if soap_error(response):
logger.error("Error SOAP detectado en la respuesta") logger.error("Error SOAP detectado en la respuesta")
pedimento_efc = kwargs.get('pedimento', {})
organizacion_efc = pedimento_efc.get("organizacion", None)
pedimento_id_efc = pedimento_efc.get("id", None)
pedimento_app = pedimento_efc.get('pedimento_app', 'N/A')
idEdocument_efc = kwargs['edoc'].get('numero_edocument', 'N/A')
file_name_response = f"vu_AC_{pedimento_app}_{idEdocument_efc}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR: file={file_name_response}, organizacion={organizacion_efc}, pedimento={pedimento_id_efc}")
doc_result = await acuse_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_response,
document_type=26,
identifier=idEdocument_efc,
)
if doc_result is None:
logger.error(f"post_document retornó None para RESPONSE_ERROR — archivo físico guardado sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={doc_result.get('id')}, document_type={doc_result.get('document_type')}")
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=create_error_response( detail=create_error_response(
message="Error en la respuesta del servicio SOAP", message="Error en la respuesta del servicio SOAP",
data={"soap_response": response.text[:500]} data={"soap_response": response.text[:3000]}
) )
) )
@@ -123,12 +145,13 @@ async def obtener_acuse(**kwargs):
organizacion = pedimento.get("organizacion", None) organizacion = pedimento.get("organizacion", None)
pedimento_id = pedimento.get("id", None) pedimento_id = pedimento.get("id", None)
rest_response = await acuse_rest_controller.post_document( rest_response = await acuse_rest_controller.post_or_update_document(
binary_content=pdf_bytes, binary_content=pdf_bytes,
organizacion=organizacion, organizacion=organizacion,
pedimento=pedimento_id, pedimento=pedimento_id,
file_name=_file_name, file_name=_file_name,
document_type=4 document_type=4,
identifier=idEdocument_efc,
) )
if rest_response is None: if rest_response is None:
@@ -180,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
data = { data = {
"id": edoc.get("id"), "id": edoc.get("id"),
"acuse_descargado": status, "acuse_descargado": status,
"acuse_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"), "numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -187,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}")
return response
async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["acuse_estado"] = "error"
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API")
return response return response
def _decode_acuse_base64_content(base64_content): # Testeado def _decode_acuse_base64_content(base64_content): # Testeado

View File

@@ -4,10 +4,15 @@ import asyncio
import logging import logging
from typing import Dict, Any from typing import Dict, Any
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import HTTPException
from .services import obtener_acuse from .services import obtener_acuse, marcar_error_acuse
from api.api_v2.modules.tasks.services import register_task, update_task from api.api_v2.modules.tasks.services import register_task, update_task
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]: def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -19,23 +24,25 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
pedimento_id = pedimento_info.get('id') pedimento_id = pedimento_info.get('id')
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = acuse_request.get('edoc', {})
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
loop.run_until_complete( logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
register_task( loop.run_until_complete(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de acuse para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=6 # 6 corresponde a "Acuse" organizacion_id=organizacion_id,
servicio=6 # 6 corresponde a "Acuse"
)
) )
)
# Esperar un momento breve para asegurar que el registro se complete # Esperar un momento breve para asegurar que el registro se complete
import time import time
@@ -75,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message) logging.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
loop.run_until_complete( loop.run_until_complete(
update_task( update_task(
@@ -88,6 +111,6 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
) )
except Exception as update_error: except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}") logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise return {"status": "failed", "message": error_message}
finally: finally:
loop.close() loop.close()

View File

@@ -91,12 +91,13 @@ async def consume_ws_get_cove(**kwargs):
# Enviar documento de request a EFC # Enviar documento de request a EFC
try: try:
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml" file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
document_response = await coves_rest_controller.post_document( document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request, file_name=file_name_request,
document_type=19, document_type=19,
identifier=cove,
) )
except Exception as e: except Exception as e:
logger.error(f"Error al enviar documento request: {e}") logger.error(f"Error al enviar documento request: {e}")
@@ -117,12 +118,13 @@ async def consume_ws_get_cove(**kwargs):
raise Exception("No se recibió respuesta del servicio SOAP") raise Exception("No se recibió respuesta del servicio SOAP")
if soap_error(soap_response): if soap_error(soap_response):
document_response = await coves_rest_controller.post_document( document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml", file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
document_type=20, document_type=20,
identifier=cove,
) )
raise Exception("Error en la respuesta del servicio SOAP") raise Exception("Error en la respuesta del servicio SOAP")
@@ -132,17 +134,25 @@ async def consume_ws_get_cove(**kwargs):
# Enviar documento # Enviar documento
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml" _file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
try: try:
document_response = await coves_rest_controller.post_document( document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=_file_name, file_name=_file_name,
document_type=8, document_type=8,
identifier=cove,
) )
except Exception as e: except Exception as e:
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}") logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
raise Exception(f"Error en la respuesta SOAP: {str(e)}") raise Exception(f"Error en la respuesta SOAP: {str(e)}")
# Validar que el documento quedó persistido en la API antes de marcar el
# COVE como descargado (T2026-05-027): post_or_update_document retorna
# None en error sin lanzar excepción
if not document_response or document_response.get("id") is None:
logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza")
raise Exception(f"No se pudo persistir el COVE {cove} en la API")
logger.info("Documento enviado, actualizando status de COVE") logger.info("Documento enviado, actualizando status de COVE")
# Actualizar status del COVE # Actualizar status del COVE
@@ -205,12 +215,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
# Enviar documento de request a EFC # Enviar documento de request a EFC
try: try:
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml" file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
document_response = await coves_rest_controller.post_document( document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request, file_name=file_name_request,
document_type=23, document_type=23,
identifier=kwargs['cove'].get('cove', 'N/A'),
) )
except Exception as e: except Exception as e:
logger.error(f"Error al enviar documento request de acuse cove: {e}") logger.error(f"Error al enviar documento request de acuse cove: {e}")
@@ -251,12 +262,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
if soap_error(response): if soap_error(response):
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml" error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
try: try:
rest_response = await coves_rest_controller.post_document( rest_response = await coves_rest_controller.post_or_update_document(
soap_response=response, soap_response=response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name, file_name=error_file_name,
document_type=24, document_type=24,
identifier=kwargs['cove'].get('cove', 'N/A'),
) )
except Exception as e: except Exception as e:
logger.error(f"Error al guardar respuesta SOAP errónea: {e}") logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
@@ -314,14 +326,25 @@ async def consume_ws_get_acuse_cove(**kwargs):
organizacion = pedimento.get("organizacion", None) organizacion = pedimento.get("organizacion", None)
pedimento_id = pedimento.get("id", None) pedimento_id = pedimento.get("id", None)
rest_response = await coves_rest_controller.post_document( cove_identifier = kwargs['cove'].get('cove', '')
rest_response = await coves_rest_controller.post_or_update_document(
binary_content=pdf_bytes, binary_content=pdf_bytes,
organizacion=organizacion, organizacion=organizacion,
pedimento=pedimento_id, pedimento=pedimento_id,
file_name=_file_name, file_name=_file_name,
document_type=7 document_type=7,
identifier=cove_identifier,
) )
# Validar la subida antes de marcar el acuse como descargado (T2026-05-027):
# post_or_update_document retorna None en error sin lanzar excepción
if not rest_response or rest_response.get("id") is None:
logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza")
raise HTTPException(
status_code=500,
detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza"
)
acuse_status = await change_acuse_status( acuse_status = await change_acuse_status(
cove=kwargs.get('cove'), cove=kwargs.get('cove'),
status=True, status=True,
@@ -500,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
"cove_descargado": status, "cove_descargado": status,
"cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"), "numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -507,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}")
return response return response
@@ -514,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
"acuse_cove_descargado": status, "acuse_cove_descargado": status,
"acuse_cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"), "numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
} }
print(data)
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}")
return response
async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente: VUCEM responde que no existe, credencial
rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo
automático y solo el reproceso manual o el reset lo reincorporan.
"""
campo = "acuse_cove_estado" if acuse else "cove_estado"
data = {
"id": cove.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data[campo] = "error"
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API")
return response return response

View File

@@ -3,13 +3,18 @@ import logging
from celery import Celery from celery import Celery
from celery_app import celery_app from celery_app import celery_app
from typing import Dict, Any from typing import Dict, Any
from fastapi import HTTPException as _HTTPException
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove
from api.api_v2.modules.tasks.services import update_task, register_task from api.api_v2.modules.tasks.services import update_task, register_task
# Logger para el módulo # Logger para el módulo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -22,21 +27,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {}) cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A') # aqui se cambio de cove_number a 'cove' porque no esta asi en el mapeo
cove_number = cove_info.get('cove', 'N/A')
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[COVE] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
asyncio.run( logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
register_task( asyncio.run(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=8 # 8 corresponde a "Cove" organizacion_id=organizacion_id,
servicio=8 # 8 corresponde a "Cove"
)
) )
)
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
asyncio.run( asyncio.run(
@@ -69,6 +76,21 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
asyncio.run( asyncio.run(
update_task( update_task(
@@ -83,6 +105,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
except Exception as update_error: except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}") logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise
@@ -97,21 +121,23 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {}) cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A') # se cambio el cove_number, ya que no existe directamente en el mapeo, se conoce solo como 'cove'
cove_number = cove_info.get('cove', 'N/A')
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") if self.request.retries == 0:
asyncio.run( logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
register_task( asyncio.run(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=9 # 9 corresponde a "Acuse Cove" organizacion_id=organizacion_id,
servicio=9 # 9 corresponde a "Acuse Cove"
)
) )
)
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
asyncio.run( asyncio.run(
@@ -144,6 +170,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
asyncio.run( asyncio.run(
update_task( update_task(
@@ -158,4 +199,6 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
except Exception as update_error: except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}") logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise

View File

@@ -48,12 +48,13 @@ async def obtener_edoc(**kwargs):
file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml" file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml"
document_response = await edocs_rest_controller.post_document( document_response = await edocs_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=organizacion_efc, organizacion=organizacion_efc,
pedimento=pedimento_id_efc, pedimento=pedimento_id_efc,
file_name=file_name_request, file_name=file_name_request,
document_type=21 # Tipo de documento para request de e-document, document_type=21,
identifier=numero_documento,
) )
except Exception as e: except Exception as e:
@@ -97,6 +98,21 @@ async def obtener_edoc(**kwargs):
if soap_error(response): if soap_error(response):
logger.error("Respuesta SOAP contiene error de VUCEM") logger.error("Respuesta SOAP contiene error de VUCEM")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22: file={_file_name_error}, organizacion={_pedimento_efc.get('organizacion')}, pedimento={_pedimento_efc.get('id')}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_or_update_document retornó None para RESPONSE_ERROR doc_type=22 — archivo físico sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=create_error_response( detail=create_error_response(
@@ -109,8 +125,23 @@ async def obtener_edoc(**kwargs):
try: try:
edoc_base64 = extract_pdf_bytes_from_xml_content(response.text) edoc_base64 = extract_pdf_bytes_from_xml_content(response.text)
except ValueError as ve: except Exception as ve:
logger.error(f"Error extrayendo contenido del XML: {ve}") logger.error(f"Error extrayendo contenido del XML: {ve}")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22 (parse error): file={_file_name_error}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_document retornó None para RESPONSE_ERROR doc_type=22 (parse error)")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=create_error_response( detail=create_error_response(
@@ -172,14 +203,17 @@ async def obtener_edoc(**kwargs):
# No guardaremos el archivo localmente por seguridad # No guardaremos el archivo localmente por seguridad
logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}") logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}")
rest_response = await edocs_rest_controller.post_document( rest_response = await edocs_rest_controller.post_or_update_document(
binary_content=pdf_bytes, binary_content=pdf_bytes,
organizacion=organizacion, organizacion=organizacion,
pedimento=pedimento_id, pedimento=pedimento_id,
file_name=_file_name, file_name=_file_name,
document_type=5 document_type=5,
identifier=numero_documento,
) )
print(f"rest_response >>>> {rest_response}")
if rest_response is None: if rest_response is None:
logger.error("Error al enviar el documento a la API interna") logger.error("Error al enviar el documento a la API interna")
raise HTTPException( raise HTTPException(
@@ -239,9 +273,11 @@ async def obtener_edoc(**kwargs):
async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
# estaba acualizando mal el status de descarga, actualizaba otro campo que no le correspondia
data = { data = {
"id": edoc.get("id"), "id": edoc.get("id"),
"acuse_descargado": status, "edocument_descargado": status,
"edocument_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"), "numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -249,45 +285,75 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}")
return response return response
async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["edocument_estado"] = "error"
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API")
return response
NS = {
's': 'http://schemas.xmlsoap.org/soap/envelope/',
't': 'http://tempuri.org/',
'i': 'http://www.w3.org/2001/XMLSchema-instance'
}
# mejorar la extraccion de seccion File
def extract_pdf_bytes_from_xml_content(xml_content: str): def extract_pdf_bytes_from_xml_content(xml_content: str):
"""
Extrae el PDF y metadatos desde un string XML.
"""
root = ET.fromstring(xml_content) root = ET.fromstring(xml_content)
file_elem = root.find('.//File')
if file_elem is None: errores = root.find('.//t:Errores', NS)
for elem in root.iter(): tiene_error = root.find('.//t:TieneError', NS)
if elem.tag.endswith('File') and elem.text:
file_elem = elem if tiene_error is not None and tiene_error.text == 'true':
break err_msg = errores.text if errores is not None else 'Error desconocido'
if file_elem is not None and file_elem.text and file_elem.text.strip(): raise Exception(f"VUCEM informa error: {err_msg}")
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
pdf_bytes = base64.b64decode(base64_data) file_elem = root.find('.//t:File', NS)
cadena_original = None if file_elem is None or file_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') == 'true' or not file_elem.text:
sello_digital = None raise ValueError("No se encontró el tag <File> con contenido válido.")
cadena_elem = root.find('.//CadenaOriginal')
if cadena_elem is None: base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
for elem in root.iter(): pdf_bytes = base64.b64decode(base64_data)
if elem.tag.endswith('CadenaOriginal') and elem.text:
cadena_elem = elem # Extraer CadenaOriginal y SelloDigital con namespaces
break cadena_original = None
if cadena_elem is not None and cadena_elem.text: sello_digital = None
cadena_original = cadena_elem.text.strip() cadena_elem = root.find('.//t:CadenaOriginal', NS)
sello_elem = root.find('.//SelloDigital') if cadena_elem is not None and cadena_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
if sello_elem is None: cadena_original = cadena_elem.text.strip() if cadena_elem.text else None
for elem in root.iter(): sello_elem = root.find('.//t:SelloDigital', NS)
if elem.tag.endswith('SelloDigital') and elem.text: if sello_elem is not None and sello_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
sello_elem = elem sello_digital = sello_elem.text.strip() if sello_elem.text else None
break
if sello_elem is not None and sello_elem.text: return {
sello_digital = sello_elem.text.strip() "pdf_bytes": pdf_bytes,
return { "cadena_original": cadena_original,
"pdf_bytes": pdf_bytes, "sello_digital": sello_digital
"cadena_original": cadena_original, }
"sello_digital": sello_digital
}
else:
raise ValueError("No se encontró el tag <File> con contenido válido. Verifique que el XML contiene el tag <File> con datos base64.")

View File

@@ -3,13 +3,18 @@ import logging
import time import time
from celery_app import celery_app from celery_app import celery_app
from typing import Dict from typing import Dict
from fastapi import HTTPException as _HTTPException
from .services import obtener_edoc from .services import obtener_edoc, marcar_error_edocument
from api.api_v2.modules.tasks.services import register_task, update_task from api.api_v2.modules.tasks.services import register_task, update_task
# Logger para el módulo # Logger para el módulo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_edoc_download_request(self, edoc_data: Dict) -> Dict: def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
""" """
@@ -21,24 +26,26 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = edoc_data.get('edoc', {}) edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A') # el mapeo de numero de documento no estaba correcto, se corrigio
edoc_number = edoc_info.get('numero_edocument', 'N/A')
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
loop.run_until_complete( logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
register_task( loop.run_until_complete(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=3 # 3 corresponde a "E-document" organizacion_id=organizacion_id,
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
) )
)
# Esperar un momento breve para asegurar que el registro se complete # Esperar un momento breve para asegurar que el registro se complete
time.sleep(1) time.sleep(1)
@@ -52,7 +59,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}", message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
pedimento_id=pedimento_id, pedimento_id=pedimento_id,
organizacion_id=organizacion_id, organizacion_id=organizacion_id,
servicio=3 servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
) )
) )
@@ -67,7 +74,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}", message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
pedimento_id=pedimento_id, pedimento_id=pedimento_id,
organizacion_id=organizacion_id, organizacion_id=organizacion_id,
servicio=3 servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
) )
) )
@@ -78,6 +85,21 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message, exc_info=True) logger.error(error_message, exc_info=True)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
loop.run_until_complete( loop.run_until_complete(
update_task( update_task(
@@ -86,12 +108,14 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=error_message, message=error_message,
pedimento_id=pedimento_id, pedimento_id=pedimento_id,
organizacion_id=organizacion_id, organizacion_id=organizacion_id,
servicio=3 servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
) )
) )
except Exception as update_error: except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise
finally: finally:
# Cerrar el loop para liberar recursos # Cerrar el loop para liberar recursos

View File

@@ -59,12 +59,13 @@ async def consume_ws_get_partida(**kwargs):
file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml" file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml"
document_response = await partida_rest_controller.post_document( document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request, file_name=file_name_request,
document_type=17, # Tipo de documento para petición de partidas document_type=17,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
) )
except Exception as e: except Exception as e:
logger.error(f"Error al enviar documento request: {e}") logger.error(f"Error al enviar documento request: {e}")
@@ -94,26 +95,37 @@ async def consume_ws_get_partida(**kwargs):
) )
) )
# Una partida solo cuenta como descargada si la respuesta es una
# consultarPartidaRespuesta real sin <tieneError>true</tieneError>.
# Cualquier otro contenido (eco de la petición, fault sin tieneError,
# HTML de proxy, etc.) se guarda como ERROR y no marca descargado.
motivo_error = None
if soap_error(soap_response): if soap_error(soap_response):
motivo_error = "La respuesta contiene un error de VUCEM"
elif "consultarpartidarespuesta" not in (soap_response.text or "").lower():
motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta"
if motivo_error:
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml" error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
try: try:
document_response = await partida_rest_controller.post_document( document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name, file_name=error_file_name,
document_type=18, document_type=18,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
) )
except Exception as e: except Exception as e:
logger.error(f"Error al guardar la respuesta de error: {e}") logger.error(f"Error al guardar la respuesta de error: {e}")
# Continuamos con el error original # Continuamos con el error original
logger.error("Error en la respuesta del servicio SOAP") logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}")
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=create_error_response( detail=create_error_response(
message="Error en la respuesta del servicio SOAP", message="Error en la respuesta del servicio SOAP",
errors=["La respuesta contiene un error de VUCEM"], errors=[motivo_error],
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None}, data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
metadata={ metadata={
"partida_numero": partida.get('numero'), "partida_numero": partida.get('numero'),
@@ -127,12 +139,13 @@ async def consume_ws_get_partida(**kwargs):
# Enviar documento # Enviar documento
_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml" _file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml"
try: try:
document_response = await partida_rest_controller.post_document( document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=_file_name, file_name=_file_name,
document_type=1, document_type=1,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
) )
except Exception as e: except Exception as e:
logger.error(f"Error al enviar documento: {e}") logger.error(f"Error al enviar documento: {e}")
@@ -148,6 +161,22 @@ async def consume_ws_get_partida(**kwargs):
) )
) )
# post_or_update_document retorna None en error (no lanza excepción);
# sin documento guardado la partida NO debe marcarse como descargada.
if not document_response:
logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al guardar el documento de la partida",
errors=["El API no confirmó el guardado del documento"],
metadata={
"file_name": _file_name,
"partida_numero": partida.get('numero')
}
)
)
logger.info("Documento enviado, actualizando status de Partida") logger.info("Documento enviado, actualizando status de Partida")
# Actualizar status de la partida # Actualizar status de la partida

View File

@@ -4,6 +4,7 @@ import time
from celery import Celery from celery import Celery
from celery_app import celery_app from celery_app import celery_app
from typing import Dict, Any from typing import Dict, Any
from fastapi import HTTPException as _HTTPException
from .services import consume_ws_get_partida from .services import consume_ws_get_partida
from api.api_v2.modules.tasks.services import register_task, update_task from api.api_v2.modules.tasks.services import register_task, update_task
@@ -89,6 +90,8 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
) )
except Exception as update_error: except Exception as update_error:
logger.error(f"Error al actualizar estado de tarea: {update_error}") logger.error(f"Error al actualizar estado de tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise
finally: finally:
# Limpiar el event loop # Limpiar el event loop

View File

@@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool: def _remesas(self, root: ET.Element) -> bool:
""" """
Método para verificar si el pedimento tiene remesas. Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args: Args:
root: Elemento raíz del XML. root: Elemento raíz del XML.
Returns: Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario. True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
""" """
namespaces = { namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas # PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave == 'RC': if clave in ('PC', 'RC'):
return True return True
except Exception as e: except Exception as e:
@@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}") print(f"Error procesando identificador para remesas: {e}")
continue continue
print("No se encontraron remesas (sin identificadores RC)") print("No se encontraron remesas (sin identificadores PC/RC)")
return False return False
def _get_tipo_operacion(self, root: ET.Element) -> str: def _get_tipo_operacion(self, root: ET.Element) -> str:

View File

@@ -64,7 +64,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml" file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
document_response = await pedimento_rest_controller.post_document( document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
@@ -90,8 +90,8 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
if soap_error(soap_response): if soap_error(soap_response):
logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}") logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}")
document_response = await pedimento_rest_controller.post_document( document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=None, soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml", file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
@@ -111,7 +111,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
# Enviar documento # Enviar documento
try: try:
document_response = await pedimento_rest_controller.post_document( document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),

View File

@@ -2,6 +2,7 @@ from celery_app import celery_app
from .services import put_pedimento_data from .services import put_pedimento_data
import asyncio import asyncio
import logging import logging
from fastapi import HTTPException as _HTTPException
from ..tasks.services import register_task, update_task from ..tasks.services import register_task, update_task
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -86,6 +87,8 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise
finally: finally:
loop.close() loop.close()

View File

@@ -64,7 +64,7 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
try: try:
file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml" file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
document_response = await remesa_rest_controller.post_document( document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_xml, soap_response=soap_xml,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
@@ -87,10 +87,35 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
) )
# Generar nombre de archivo # Generar nombre de archivo
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml" file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
# "No hay información" NO es un error — VUCEM confirma que el pedimento no tiene remesas.
# No se crea documento de error; se corrige el flag en el pedimento.
_SIN_REMESAS = '<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>'
if hasattr(soap_response, 'text') and _SIN_REMESAS in soap_response.text:
pedimento_id = pedimento_data.get('id')
logger.info(
f"Pedimento {pedimento_data.get('pedimento_app')} no tiene remesas "
f"(confirmado por VUCEM). Corrigiendo flag remesas=False."
)
try:
await remesa_rest_controller._make_request_async(
'PATCH',
f'customs/pedimentos/{pedimento_id}/',
data={'remesas': False},
)
except Exception as patch_err:
logger.warning(f"No se pudo actualizar remesas=False para {pedimento_id}: {patch_err}")
return create_service_response(
message="Pedimento sin remesas confirmado por VUCEM",
data={"remesas": False},
metadata={"pedimento_app": pedimento_data.get('pedimento_app')},
)
if soap_error(soap_response): if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml" file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document( document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
@@ -98,25 +123,19 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
document_type=16, document_type=16,
) )
# Aquí necesitamos extraer el mensaje de error real
error_message = "Error en la respuesta del servicio SOAP" error_message = "Error en la respuesta del servicio SOAP"
# Intentar extraer mensaje de error del XML de respuesta
if hasattr(soap_response, 'text') and soap_response.text: if hasattr(soap_response, 'text') and soap_response.text:
try: try:
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
root = ET.fromstring(soap_response.text) root = ET.fromstring(soap_response.text)
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
# Esto puede variar según el servicio, pero comúnmente buscan:
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'): for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
faultcode = fault.find('.//faultcode')
faultstring = fault.find('.//faultstring') faultstring = fault.find('.//faultstring')
if faultstring is not None and faultstring.text: if faultstring is not None and faultstring.text:
error_message = faultstring.text error_message = faultstring.text
break break
# También podría estar en una estructura de error específica de VUCEM
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'): for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message') msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
if msg is not None and msg.text: if msg is not None and msg.text:
@@ -126,7 +145,6 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
except Exception as parse_error: except Exception as parse_error:
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}") logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
# Lanzar excepción con el mensaje de error real
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=f"Error en la respuesta del servicio SOAP: {error_message}" detail=f"Error en la respuesta del servicio SOAP: {error_message}"
@@ -134,12 +152,12 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Enviar documento # Enviar documento
try: try:
document_response = await remesa_rest_controller.post_document( document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response, soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
file_name=file_name, file_name=file_name,
document_type=5, document_type=3,
) )
except Exception as e: except Exception as e:
@@ -149,12 +167,14 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Extraer datos del XML # Extraer datos del XML
try: try:
remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text) remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text)
logger.info(
f"Remesas extraídas para pedimento {pedimento_data.get('pedimento_app')}: "
f"{len(remesas_data)} COVEs encontrados → {remesas_data}"
)
except Exception as e: except Exception as e:
logger.error(f"Error al extraer datos XML: {e}") logger.error(f"Error al extraer datos XML: {e}")
raise HTTPException(status_code=500, detail="Error al procesar respuesta XML") raise HTTPException(status_code=500, detail="Error al procesar respuesta XML")
logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}") logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}")
return create_service_response( return create_service_response(
@@ -202,12 +222,14 @@ async def post_remesa_data(**kwargs) -> Dict[str, Any]:
# Obtener datos del servicio web # Obtener datos del servicio web
try: try:
ws_data = await obtener_remesa(**kwargs) ws_data = await obtener_remesa(**kwargs)
result["documento"] = ws_data.get("documento", None) # obtener_remesa devuelve create_service_response → los datos están bajo 'data'
xml_content = ws_data.get('xml_content', {}) ws_data_content = ws_data.get('data', {})
result["documento"] = ws_data_content.get("documento", None)
xml_content = ws_data_content.get('xml_content', [])
result["xml_content"] = xml_content result["xml_content"] = xml_content
if not xml_content: if not xml_content:
logger.warning("No se obtuvo contenido XML del servicio web") logger.warning("No se obtuvo contenido XML del servicio web (lista de remesas vacía)")
return result return result
except HTTPException: except HTTPException:
@@ -259,17 +281,8 @@ async def _process_coves_safely(kwargs: Dict[str, Any], xml_content) -> Optional
async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]: async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]:
""" """
Envía COVEs al sistema REST. Crea COVEs en el sistema REST usando patrón get-or-create para evitar duplicados.
Si el COVE ya existe (creado por pedimento completo u otra remesa), lo reutiliza.
Args:
pedimento_data: Datos del pedimento
coves: Lista de diccionarios con datos de COVE (comprobanteVE, remesaAgente, remesaSA)
Returns:
Lista de respuestas exitosas
Raises:
HTTPException: Si no se pudo procesar ningún COVE
""" """
if not coves: if not coves:
return [] return []
@@ -278,23 +291,38 @@ async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]
errors = [] errors = []
for cove in coves: for cove in coves:
# Extraer el número de COVE del diccionario
numero_cove = cove.get('comprobanteVE') numero_cove = cove.get('comprobanteVE')
if not numero_cove: if not numero_cove:
logger.warning(f"COVE sin comprobanteVE encontrado: {cove}") logger.warning(f"COVE sin comprobanteVE encontrado: {cove}")
continue continue
document_data = {
'numero_cove': numero_cove,
'organizacion': pedimento_data.get('organizacion'),
'pedimento': pedimento_data.get('id')
}
try: try:
# Verificar si el COVE ya existe antes de intentar crearlo
existing = await remesa_rest_controller._make_request_async(
'GET', f'customs/coves/?numero_cove={numero_cove}'
)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list) and results:
logger.info(f"COVE {numero_cove} ya existe (id={results[0].get('id')}), omitiendo creación")
responses.append(results[0])
continue
# No existe → crear
document_data = {
'numero_cove': numero_cove,
'organizacion': pedimento_data.get('organizacion'),
'pedimento': pedimento_data.get('id'),
}
response = await remesa_rest_controller.post_cove(document_data) response = await remesa_rest_controller.post_cove(document_data)
if response: if response:
responses.append(response) responses.append(response)
logger.debug(f"COVE {numero_cove} procesado exitosamente") logger.info(f"COVE {numero_cove} creado exitosamente")
else:
error_msg = f"POST de COVE {numero_cove} devolvió respuesta vacía"
logger.warning(error_msg)
errors.append(error_msg)
except Exception as e: except Exception as e:
error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}" error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}"
logger.warning(error_msg) logger.warning(error_msg)

View File

@@ -2,6 +2,7 @@ from celery_app import celery_app
from .services import post_remesa_data from .services import post_remesa_data
import asyncio import asyncio
import logging import logging
from fastapi import HTTPException as _HTTPException
from ..tasks.services import register_task, update_task from ..tasks.services import register_task, update_task
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -19,9 +20,11 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
""" """
task_id = self.request.id task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas servicio = 5 # Código para Pedimento Remesas
print(f"remesa_request >>>> {remesa_request}")
logger.info(f"remesa_request >>>> {remesa_request}")
pedimento_id = remesa_request.get('pedimento', {}).get('id') pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion') organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A') remesa_num = remesa_request.get('pedimento', {}).get('pedimento_app', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado) # Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
@@ -96,6 +99,8 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise raise
finally: finally:

View File

View File

@@ -0,0 +1,97 @@
import asyncio
import json
import logging
import os
import redis.asyncio as aioredis
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
router = APIRouter()
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
HEARTBEAT_INTERVAL = 25 # segundos
MAX_STREAM_SECONDS = 7200 # 2 horas
def _redis_url() -> str:
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
return f"redis://{host}:{port}/{db}"
@router.get("/stream/tasks/{task_id}")
async def stream_task_events(task_id: str, request: Request):
"""
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
en tiempo real vía Redis Pub/Sub.
Cabeceras requeridas en Nginx upstream:
proxy_read_timeout 7200;
proxy_buffering off;
"""
async def event_generator():
r = aioredis.from_url(_redis_url(), decode_responses=True)
pubsub = r.pubsub()
try:
# Enviar estado actual si ya existe (cliente que llega tarde)
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
if current_raw:
current = json.loads(current_raw)
yield f"event: task_update\ndata: {current_raw}\n\n"
if current.get("status") in ("completed", "failed"):
return
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
tick = 0
while tick < MAX_STREAM_SECONDS:
if await request.is_disconnected():
break
try:
msg = await asyncio.wait_for(
pubsub.get_message(ignore_subscribe_messages=True),
timeout=1.0,
)
except asyncio.TimeoutError:
msg = None
if msg and msg["type"] == "message":
data_str = msg["data"]
yield f"event: task_update\ndata: {data_str}\n\n"
parsed = json.loads(data_str)
if parsed.get("status") in ("completed", "failed"):
break
tick += 1
if tick % HEARTBEAT_INTERVAL == 0:
yield f"event: heartbeat\ndata: {{}}\n\n"
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
finally:
try:
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
except Exception:
pass
await r.aclose()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
"Connection": "keep-alive",
},
)

View File

@@ -4,6 +4,8 @@ from typing import Dict, Any
from fastapi import HTTPException from fastapi import HTTPException
from ..common import create_error_response from ..common import create_error_response
from core.config import settings from core.config import settings
from core.redis_events import publish_task_event
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def update_task( async def update_task(
@@ -63,6 +65,7 @@ async def update_task(
) )
response.raise_for_status() response.raise_for_status()
publish_task_event(task_id, status, message)
return response.json() return response.json()
except httpx.HTTPError as e: except httpx.HTTPError as e:

View File

@@ -1,6 +1,8 @@
from celery import Celery from celery import Celery
from core.config import settings from core.config import settings
import os import os
from celery.schedules import crontab
from datetime import timedelta
# Configuración de Celery # Configuración de Celery
celery_app = Celery( celery_app = Celery(
@@ -32,6 +34,15 @@ celery_app.conf.update(
task_save_success_on_complete=True # Guarda los resultados exitosos task_save_success_on_complete=True # Guarda los resultados exitosos
) )
# desde aqui se pueden programar tareas pero al estar desde microservicios no tengo acceso a la informacion de organizacion
# por este motivo es mejor programar las tareas desde la aplicacion de django
celery_app.conf.beat_schedule = {
# 'prueba-4': {
# 'task': 'tasks.prueba_hola',
# 'schedule': timedelta(seconds=60), # 12:00 = 6:03 AM
# },
}
# Autodiscovery of tasks # Autodiscovery of tasks
celery_app.autodiscover_tasks() celery_app.autodiscover_tasks()

View File

@@ -58,6 +58,8 @@ class APIRESTController:
response = await client.post(url, json=data, headers=self.headers) response = await client.post(url, json=data, headers=self.headers)
elif method.upper() == 'PUT': elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers) response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'PATCH':
response = await client.patch(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE': elif method.upper() == 'DELETE':
response = await client.delete(url, headers=self.headers) response = await client.delete(url, headers=self.headers)
else: else:
@@ -91,6 +93,61 @@ class APIRESTController:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
return None return None
async def post_or_update_document(
self, soap_response=None, organizacion: str = None,
pedimento: str = None, file_name: str = None,
document_type: int = None, fuente: int = 2,
identifier: str = None, binary_content: bytes = None,
) -> Dict[str, Any]:
"""
Guarda un documento VU (request, error o respuesta). Crea PRIMERO el
documento nuevo y, solo si se guardó con éxito, elimina los previos del
mismo tipo/pedimento/identificador. Así se evitan duplicados en
ejecuciones recurrentes sin perder el documento anterior cuando el
guardado del nuevo falla (antes se borraba el previo antes del POST).
identifier: cadena única del documento dentro del pedimento
(ej: número de COVE, número de e-document). Se usa como
filtro archivo__icontains para distinguir documentos del
mismo tipo pertenecientes a distintos COVEs o edocs.
"""
new_document = await self.post_document(
soap_response=soap_response,
binary_content=binary_content,
organizacion=organizacion,
pedimento=pedimento,
file_name=file_name,
document_type=document_type,
fuente=fuente,
)
if not new_document:
# POST falló: conservar los documentos previos intactos
logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos")
return new_document
new_id = new_document.get('id') if isinstance(new_document, dict) else None
if not new_id:
logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})")
return new_document
try:
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
if identifier:
query += f'&archivo__icontains={identifier}'
existing = await self._make_request_async('GET', query)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list):
for previous in results:
previous_id = previous.get('id')
if previous_id and previous_id != new_id:
await self._make_request_async('DELETE', f'record/documents/{previous_id}/')
logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}")
except Exception as e:
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
return new_document
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]: async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data) return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
@@ -464,25 +521,29 @@ class APIController:
""" """
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data) return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
async def get_cer(self, id: str) -> bytes: async def put_cove(self, cove_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
""" return await self._make_request_async('PUT', f'customs/coves/{cove_id}/', data=data)
Método para obtener un certificado específico desde la API (como binario).
Args:
id: UUID del certificado a consultar
Returns:
bytes: Contenido binario del certificado
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
async def get_key(self, id: str) -> bytes: async def get_key(self, id: str) -> bytes:
""" result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
Método para obtener una llave específica desde la API (como binario).
Args: if result is None:
id: UUID de la llave a consultar logger.info(f"get_key retornó None")
Returns: else:
bytes: Contenido binario de la llave logger.info(f"get_key retornó {len(result)} bytes")
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True) return result
async def get_cer(self, id: str) -> bytes:
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
if result is None:
logger.info(f"get_cer retornó None")
else:
logger.info(f"get_cer retornó {len(result)} bytes")
return result
async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False): async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False):
""" """
@@ -509,6 +570,8 @@ class APIController:
print(f"response >>>> {response}") print(f"response >>>> {response}")
elif method.upper() == 'PUT': elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers) response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'PATCH':
response = await client.patch(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE': elif method.upper() == 'DELETE':
response = await client.delete(url, headers=self.headers) response = await client.delete(url, headers=self.headers)
else: else:

View File

@@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool: def _remesas(self, root: ET.Element) -> bool:
""" """
Método para verificar si el pedimento tiene remesas. Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args: Args:
root: Elemento raíz del XML. root: Elemento raíz del XML.
Returns: Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario. True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
""" """
namespaces = { namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas # PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave == 'RC': if clave in ('PC', 'RC'):
return True return True
except Exception as e: except Exception as e:
@@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}") print(f"Error procesando identificador para remesas: {e}")
continue continue
print("No se encontraron remesas (sin identificadores RC)") print("No se encontraron remesas (sin identificadores PC/RC)")
return False return False
def _get_tipo_operacion(self, root: ET.Element) -> str: def _get_tipo_operacion(self, root: ET.Element) -> str:

44
core/redis_events.py Normal file
View File

@@ -0,0 +1,44 @@
import json
import os
import redis
import logging
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
STATE_TTL = 7200 # 2 horas
def _get_sync_redis():
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
return redis.Redis(
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
"""
Publica un evento de progreso de tarea en Redis Pub/Sub.
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
"""
payload: dict = {"task_id": task_id, "status": status, "message": message}
if resultado is not None:
payload["resultado"] = resultado
if progress is not None:
payload["progress"] = progress
try:
client = _get_sync_redis()
serialized = json.dumps(payload)
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
client.close()
except Exception as exc:
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")

View File

@@ -125,9 +125,12 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
# Subir documento de pedimento completo si la petición fue exitosa # Subir documento de pedimento completo si la petición fue exitosa
try: try:
# solucion al error de descarga de un e-document, el mapeo de identificador no llegaba y ni siquiera insertaba registros en
# la tabla
identificadores_ed_response = xml_content.get('identificadores_ed', []) if 'xml_content' in locals() else []
upload_result = await _post_edocuments( upload_result = await _post_edocuments(
response_service=service_data, response_service=service_data,
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')] identificadores_ed=identificadores_ed_response
) )
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}") logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
except Exception as upload_err: except Exception as upload_err:
@@ -626,6 +629,19 @@ def acuse_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {}) documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1 documentos_exitosos += 1
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente") logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_edocument(
edocument_id=edoc['id'],
data={
"id": edoc['id'],
"acuse_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando acuse_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
else: else:
documento_info["error"] = "Error en petición SOAP" documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}") logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
@@ -832,28 +848,19 @@ def edocument_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {}) documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1 documentos_exitosos += 1
logger.info(f"E-document {idx + 1} procesado exitosamente") logger.info(f"E-document {idx + 1} procesado exitosamente")
# Subir el documento si la petición fue exitosa
try: try:
upload_result = await _post_edocuments( await rest_controller.put_edocument(
response_service=service_data, edocument_id=edoc['id'],
identificadores_ed=[edoc.get('numero_edocument')] data={
"id": edoc['id'],
"edocument_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
) )
documento_info["upload_result"] = upload_result except Exception as status_err:
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente") logger.warning(f"Error actualizando edocument_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
# Subir el documento si la petición fue exitosa
try:
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[edoc.get('numero_edocument')]
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
else: else:
documento_info["error"] = "Error en petición SOAP" documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el e-document {idx + 1}") logger.warning(f"No se pudo procesar el e-document {idx + 1}")
@@ -974,6 +981,19 @@ def coves_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {}) documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1 documentos_exitosos += 1
logger.info(f"cove del documento {idx + 1} procesado exitosamente") logger.info(f"cove del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_cove(
cove_id=cove['id'],
data={
"id": cove['id'],
"cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
else: else:
documento_info["error"] = "Error en petición SOAP" documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}") logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
@@ -1115,17 +1135,19 @@ def acuse_cove_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {}) documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1 documentos_exitosos += 1
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente") logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
# Subir el documento de COVE si la petición fue exitosa
try: try:
upload_result = await _post_coves( await rest_controller.put_cove(
response_service=service_data, cove_id=cove['id'],
identificadores_cove=[cove.get('numero_cove')] data={
"id": cove['id'],
"acuse_cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
) )
documento_info["upload_result"] = upload_result except Exception as status_err:
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente") logger.warning(f"Error actualizando acuse_cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
else: else:
documento_info["error"] = "Error en petición SOAP" documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}") logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
@@ -1166,3 +1188,12 @@ def acuse_cove_task(self, **kwargs):
raise e raise e
return run_async_task(_execute_acuse_cove) return run_async_task(_execute_acuse_cove)
# @celery_app.task(name='tasks.prueba_hola')
# def prueba_hola():
# """
# Tarea de prueba: solo imprime un saludo para verificar la programación.
# """
# logger.info("¡Hola! Tarea programada ejecutada correctamente.")
# print("¡Hola desde la tarea programada!")
# return "OK"

View File

@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
Returns: Returns:
bool: True si no hay errores, False en caso contrario bool: True si no hay errores, False en caso contrario
""" """
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text: # Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
return True if 'tieneerror>true<' in soap_response.text.lower():
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
return True return True
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text: if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
return True return True

85
utils/minio_client.py Normal file
View File

@@ -0,0 +1,85 @@
# microservice/utils/minio_client.py
import os
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from typing import Optional, BinaryIO
import logging
logger = logging.getLogger(__name__)
class MinIOClient:
"""Cliente MinIO para FastAPI"""
def __init__(self):
self.client = None
self.bucket_name = None
self._initialize()
def _initialize(self):
"""Inicializa el cliente MinIO"""
endpoint = os.environ.get('MINIO_ENDPOINT', 'minio:9000')
access_key = os.environ.get('MINIO_ACCESS_KEY')
secret_key = os.environ.get('MINIO_SECRET_KEY')
secure = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
self.bucket_name = os.environ.get('MINIO_BUCKET_NAME', 'efc-microservice-dev')
self.client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
# Asegurar bucket
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
logger.info(f"Bucket '{self.bucket_name}' creado")
def upload_file(
self,
object_name: str,
file_path: str = None,
file_data: bytes = None,
content_type: str = None
) -> bool:
"""Sube archivo a MinIO (síncrono para Celery)"""
try:
if file_path:
self.client.fput_object(
self.bucket_name,
object_name,
file_path,
content_type=content_type
)
elif file_data:
import io
data_stream = io.BytesIO(file_data)
self.client.put_object(
self.bucket_name,
object_name,
data_stream,
len(file_data),
content_type=content_type
)
return True
except Exception as e:
logger.error(f"Error subiendo archivo: {e}")
return False
def get_presigned_url(self, object_name: str, expires: int = 3600) -> Optional[str]:
"""Genera URL firmada"""
try:
return self.client.presigned_get_object(
self.bucket_name,
object_name,
expires=timedelta(seconds=expires)
)
except Exception as e:
logger.error(f"Error generando URL: {e}")
return None
# Singleton
minio_client = MinIOClient()

View File

@@ -603,7 +603,7 @@ async def get_soap_acuse(credenciales, response_service, soap_controller, edocum
no_partidas = response_service['pedimento'].get('numero_partidas', 0) no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A') tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A') pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf" _file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content # Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)") logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -710,7 +710,7 @@ async def get_soap_acuseCOVE(credenciales, response_service, soap_controller, co
no_partidas = response_service['pedimento'].get('numero_partidas', 0) no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A') tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A') pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf" _file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.pdf"
# Enviar el documento PDF usando binary_content # Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)") logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -884,7 +884,7 @@ async def get_soap_edocument(credenciales, response_service, soap_controller, ed
no_partidas = response_service['pedimento'].get('numero_partidas', 0) no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A') tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A') pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf" _file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content # Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)") logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")