11 Commits

Author SHA1 Message Date
1e06d1a2bf Merge pull request 'feature/capturar errores, evitar duplicados, eliminar manejar nuevas flags para descargar datos de vucem' (#12) from feature/capturar-errores-evitar-duplicados into main
Reviewed-on: #12
2026-06-19 13:16:57 +00:00
b9c6ab89c3 feature/capturar errores, evitar duplicados, eliminar manejar nuevas flags para descargar datos de vucem 2026-06-15 11:20:58 -06:00
042d02e240 Merge pull request 'feature/pedimentos-corregidos' (#11) from feature/correccion-pedientos into main
Reviewed-on: #11
2026-05-28 13:20:34 +00:00
288a96bc59 feature/pedimentos-corregidos 2026-05-28 07:13:32 -06:00
9f59ac0d00 Merge pull request 'fix/forzar-procesamiento-acuses' (#10) from fix/forzar-procesamiento-acuses into main
Reviewed-on: #10
2026-05-25 20:55:27 +00:00
fe8e7dc10f fix/forzar-procesamiento-acuses 2026-05-25 14:54:46 -06:00
a6ce91d8af Merge pull request 'feature/rbac-implementation' (#9) from feature/rbac-implementation into main
Reviewed-on: #9
2026-05-21 13:58:59 +00:00
e174df0af3 feature/rbac-implementation 2026-05-21 07:56:20 -06:00
3c10653d6a Merge pull request 'feature/pedimento completo carga remesas, coves, edocs y acuses y aparte descarga sus documentos, se corrigieron las formulas de remesas, acuse y e documents para permitir la correcta descarga de susdocumentos y se aseguro que el status sea el correcto' (#8) from feature/T2026-05-016-y-T2026-05-030 into main
Reviewed-on: #8
2026-05-18 18:04:36 +00:00
Dulce
f96e5a227b feature/pedimento completo carga remesas, coves, edocs y acuses y aparte descarga sus documentos, se corrigieron las formulas de remesas, acuse y e documents para permitir la correcta descarga de susdocumentos y se aseguro que el status sea el correcto 2026-05-18 11:58:42 -06:00
ed00651a8b Merge pull request 'feature/lectura de credenciales de vucem desde archivos de minIO' (#7) from feature/minio-read-credentials into main
Reviewed-on: #7
2026-04-22 17:38:06 +00:00
24 changed files with 835 additions and 245 deletions

View File

@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
from api.api_v2.modules.partidas.routers import router as partidas_router
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
from api.api_v2.modules.remesas.routers import router as remesas_router
from api.api_v2.modules.stream.routers import router as stream_router
api_router = APIRouter()
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
api_router.include_router(partidas_router, tags=["Partidas"])
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
api_router.include_router(remesas_router, tags=["Remesas"])
api_router.include_router(stream_router, tags=["Stream"])

View File

@@ -33,12 +33,13 @@ async def obtener_acuse(**kwargs):
file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml"
document_response = await acuse_rest_controller.post_document(
document_response = await acuse_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_request,
document_type=25, # Tipo de documento para request de acuse VU
document_type=25,
identifier=idEdocument_efc,
)
except Exception as e:
@@ -72,11 +73,32 @@ async def obtener_acuse(**kwargs):
if soap_error(response):
logger.error("Error SOAP detectado en la respuesta")
pedimento_efc = kwargs.get('pedimento', {})
organizacion_efc = pedimento_efc.get("organizacion", None)
pedimento_id_efc = pedimento_efc.get("id", None)
pedimento_app = pedimento_efc.get('pedimento_app', 'N/A')
idEdocument_efc = kwargs['edoc'].get('numero_edocument', 'N/A')
file_name_response = f"vu_AC_{pedimento_app}_{idEdocument_efc}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR: file={file_name_response}, organizacion={organizacion_efc}, pedimento={pedimento_id_efc}")
doc_result = await acuse_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_response,
document_type=26,
identifier=idEdocument_efc,
)
if doc_result is None:
logger.error(f"post_document retornó None para RESPONSE_ERROR — archivo físico guardado sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={doc_result.get('id')}, document_type={doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error en la respuesta del servicio SOAP",
data={"soap_response": response.text[:500]}
data={"soap_response": response.text[:3000]}
)
)
@@ -123,12 +145,13 @@ async def obtener_acuse(**kwargs):
organizacion = pedimento.get("organizacion", None)
pedimento_id = pedimento.get("id", None)
rest_response = await acuse_rest_controller.post_document(
rest_response = await acuse_rest_controller.post_or_update_document(
binary_content=pdf_bytes,
organizacion=organizacion,
pedimento=pedimento_id,
file_name=_file_name,
document_type=4
document_type=4,
identifier=idEdocument_efc,
)
if rest_response is None:
@@ -180,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
data = {
"id": edoc.get("id"),
"acuse_descargado": status,
"acuse_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
@@ -187,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}")
return response
async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["acuse_estado"] = "error"
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API")
return response
def _decode_acuse_base64_content(base64_content): # Testeado

View File

@@ -4,10 +4,15 @@ import asyncio
import logging
from typing import Dict, Any
from contextlib import asynccontextmanager
from fastapi import HTTPException
from .services import obtener_acuse
from .services import obtener_acuse, marcar_error_acuse
from api.api_v2.modules.tasks.services import register_task, update_task
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True)
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -19,12 +24,14 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
pedimento_id = pedimento_info.get('id')
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = acuse_request.get('edoc', {})
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Registrar el inicio de la tarea
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
if self.request.retries == 0:
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
register_task(
@@ -75,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try:
loop.run_until_complete(
update_task(
@@ -88,6 +111,6 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
)
except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise
return {"status": "failed", "message": error_message}
finally:
loop.close()

View File

@@ -91,12 +91,13 @@ async def consume_ws_get_cove(**kwargs):
# Enviar documento de request a EFC
try:
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request,
document_type=19,
identifier=cove,
)
except Exception as e:
logger.error(f"Error al enviar documento request: {e}")
@@ -117,12 +118,13 @@ async def consume_ws_get_cove(**kwargs):
raise Exception("No se recibió respuesta del servicio SOAP")
if soap_error(soap_response):
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
document_type=20,
identifier=cove,
)
raise Exception("Error en la respuesta del servicio SOAP")
@@ -132,17 +134,25 @@ async def consume_ws_get_cove(**kwargs):
# Enviar documento
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
try:
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=_file_name,
document_type=8,
identifier=cove,
)
except Exception as e:
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
raise Exception(f"Error en la respuesta SOAP: {str(e)}")
# Validar que el documento quedó persistido en la API antes de marcar el
# COVE como descargado (T2026-05-027): post_or_update_document retorna
# None en error sin lanzar excepción
if not document_response or document_response.get("id") is None:
logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza")
raise Exception(f"No se pudo persistir el COVE {cove} en la API")
logger.info("Documento enviado, actualizando status de COVE")
# Actualizar status del COVE
@@ -205,12 +215,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
# Enviar documento de request a EFC
try:
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request,
document_type=23,
identifier=kwargs['cove'].get('cove', 'N/A'),
)
except Exception as e:
logger.error(f"Error al enviar documento request de acuse cove: {e}")
@@ -251,12 +262,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
if soap_error(response):
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
try:
rest_response = await coves_rest_controller.post_document(
rest_response = await coves_rest_controller.post_or_update_document(
soap_response=response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name,
document_type=24,
identifier=kwargs['cove'].get('cove', 'N/A'),
)
except Exception as e:
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
@@ -314,12 +326,23 @@ async def consume_ws_get_acuse_cove(**kwargs):
organizacion = pedimento.get("organizacion", None)
pedimento_id = pedimento.get("id", None)
rest_response = await coves_rest_controller.post_document(
cove_identifier = kwargs['cove'].get('cove', '')
rest_response = await coves_rest_controller.post_or_update_document(
binary_content=pdf_bytes,
organizacion=organizacion,
pedimento=pedimento_id,
file_name=_file_name,
document_type=7
document_type=7,
identifier=cove_identifier,
)
# Validar la subida antes de marcar el acuse como descargado (T2026-05-027):
# post_or_update_document retorna None en error sin lanzar excepción
if not rest_response or rest_response.get("id") is None:
logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza")
raise HTTPException(
status_code=500,
detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza"
)
acuse_status = await change_acuse_status(
@@ -500,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
data = {
"id": cove.get("id"),
"cove_descargado": status,
"cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
@@ -507,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}")
return response
@@ -514,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
data = {
"id": cove.get("id"),
"acuse_cove_descargado": status,
"acuse_cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
print(data)
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}")
return response
async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente: VUCEM responde que no existe, credencial
rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo
automático y solo el reproceso manual o el reset lo reincorporan.
"""
campo = "acuse_cove_estado" if acuse else "cove_estado"
data = {
"id": cove.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data[campo] = "error"
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API")
return response

View File

@@ -3,13 +3,18 @@ import logging
from celery import Celery
from celery_app import celery_app
from typing import Dict, Any
from fastapi import HTTPException as _HTTPException
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove
from api.api_v2.modules.tasks.services import update_task, register_task
# Logger para el módulo
logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True)
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -22,10 +27,12 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A')
# aqui se cambio de cove_number a 'cove' porque no esta asi en el mapeo
cove_number = cove_info.get('cove', 'N/A')
try:
# Registrar el inicio de la tarea
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
if self.request.retries == 0:
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
asyncio.run(
register_task(
@@ -69,6 +76,21 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try:
asyncio.run(
update_task(
@@ -83,6 +105,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise
@@ -97,10 +121,12 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A')
# se cambio el cove_number, ya que no existe directamente en el mapeo, se conoce solo como 'cove'
cove_number = cove_info.get('cove', 'N/A')
try:
# Registrar el inicio de la tarea
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
if self.request.retries == 0:
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
asyncio.run(
register_task(
@@ -144,6 +170,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try:
asyncio.run(
update_task(
@@ -158,4 +199,6 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise

View File

@@ -48,12 +48,13 @@ async def obtener_edoc(**kwargs):
file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml"
document_response = await edocs_rest_controller.post_document(
document_response = await edocs_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_request,
document_type=21 # Tipo de documento para request de e-document,
document_type=21,
identifier=numero_documento,
)
except Exception as e:
@@ -97,6 +98,21 @@ async def obtener_edoc(**kwargs):
if soap_error(response):
logger.error("Respuesta SOAP contiene error de VUCEM")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22: file={_file_name_error}, organizacion={_pedimento_efc.get('organizacion')}, pedimento={_pedimento_efc.get('id')}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_or_update_document retornó None para RESPONSE_ERROR doc_type=22 — archivo físico sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
@@ -109,8 +125,23 @@ async def obtener_edoc(**kwargs):
try:
edoc_base64 = extract_pdf_bytes_from_xml_content(response.text)
except ValueError as ve:
except Exception as ve:
logger.error(f"Error extrayendo contenido del XML: {ve}")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22 (parse error): file={_file_name_error}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_document retornó None para RESPONSE_ERROR doc_type=22 (parse error)")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
@@ -172,14 +203,17 @@ async def obtener_edoc(**kwargs):
# No guardaremos el archivo localmente por seguridad
logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}")
rest_response = await edocs_rest_controller.post_document(
rest_response = await edocs_rest_controller.post_or_update_document(
binary_content=pdf_bytes,
organizacion=organizacion,
pedimento=pedimento_id,
file_name=_file_name,
document_type=5
document_type=5,
identifier=numero_documento,
)
print(f"rest_response >>>> {rest_response}")
if rest_response is None:
logger.error("Error al enviar el documento a la API interna")
raise HTTPException(
@@ -239,9 +273,11 @@ async def obtener_edoc(**kwargs):
async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
# estaba acualizando mal el status de descarga, actualizaba otro campo que no le correspondia
data = {
"id": edoc.get("id"),
"acuse_descargado": status,
"edocument_descargado": status,
"edocument_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
@@ -249,45 +285,75 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}")
return response
async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["edocument_estado"] = "error"
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API")
return response
NS = {
's': 'http://schemas.xmlsoap.org/soap/envelope/',
't': 'http://tempuri.org/',
'i': 'http://www.w3.org/2001/XMLSchema-instance'
}
# mejorar la extraccion de seccion File
def extract_pdf_bytes_from_xml_content(xml_content: str):
"""
Extrae el PDF y metadatos desde un string XML.
"""
root = ET.fromstring(xml_content)
file_elem = root.find('.//File')
if file_elem is None:
for elem in root.iter():
if elem.tag.endswith('File') and elem.text:
file_elem = elem
break
if file_elem is not None and file_elem.text and file_elem.text.strip():
errores = root.find('.//t:Errores', NS)
tiene_error = root.find('.//t:TieneError', NS)
if tiene_error is not None and tiene_error.text == 'true':
err_msg = errores.text if errores is not None else 'Error desconocido'
raise Exception(f"VUCEM informa error: {err_msg}")
file_elem = root.find('.//t:File', NS)
if file_elem is None or file_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') == 'true' or not file_elem.text:
raise ValueError("No se encontró el tag <File> con contenido válido.")
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
pdf_bytes = base64.b64decode(base64_data)
# Extraer CadenaOriginal y SelloDigital con namespaces
cadena_original = None
sello_digital = None
cadena_elem = root.find('.//CadenaOriginal')
if cadena_elem is None:
for elem in root.iter():
if elem.tag.endswith('CadenaOriginal') and elem.text:
cadena_elem = elem
break
if cadena_elem is not None and cadena_elem.text:
cadena_original = cadena_elem.text.strip()
sello_elem = root.find('.//SelloDigital')
if sello_elem is None:
for elem in root.iter():
if elem.tag.endswith('SelloDigital') and elem.text:
sello_elem = elem
break
if sello_elem is not None and sello_elem.text:
sello_digital = sello_elem.text.strip()
cadena_elem = root.find('.//t:CadenaOriginal', NS)
if cadena_elem is not None and cadena_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
cadena_original = cadena_elem.text.strip() if cadena_elem.text else None
sello_elem = root.find('.//t:SelloDigital', NS)
if sello_elem is not None and sello_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
sello_digital = sello_elem.text.strip() if sello_elem.text else None
return {
"pdf_bytes": pdf_bytes,
"cadena_original": cadena_original,
"sello_digital": sello_digital
}
else:
raise ValueError("No se encontró el tag <File> con contenido válido. Verifique que el XML contiene el tag <File> con datos base64.")

View File

@@ -3,13 +3,18 @@ import logging
import time
from celery_app import celery_app
from typing import Dict
from fastapi import HTTPException as _HTTPException
from .services import obtener_edoc
from .services import obtener_edoc, marcar_error_edocument
from api.api_v2.modules.tasks.services import register_task, update_task
# Logger para el módulo
logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True)
def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
"""
@@ -21,13 +26,15 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A')
# el mapeo de numero de documento no estaba correcto, se corrigio
edoc_number = edoc_info.get('numero_edocument', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Registrar el inicio de la tarea
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
if self.request.retries == 0:
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
register_task(
@@ -36,7 +43,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3 # 3 corresponde a "E-document"
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -52,7 +59,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -67,7 +74,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -78,6 +85,21 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message, exc_info=True)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try:
loop.run_until_complete(
update_task(
@@ -86,12 +108,14 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise
finally:
# Cerrar el loop para liberar recursos

View File

@@ -59,12 +59,13 @@ async def consume_ws_get_partida(**kwargs):
file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml"
document_response = await partida_rest_controller.post_document(
document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request,
document_type=17, # Tipo de documento para petición de partidas
document_type=17,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
)
except Exception as e:
logger.error(f"Error al enviar documento request: {e}")
@@ -94,26 +95,37 @@ async def consume_ws_get_partida(**kwargs):
)
)
# Una partida solo cuenta como descargada si la respuesta es una
# consultarPartidaRespuesta real sin <tieneError>true</tieneError>.
# Cualquier otro contenido (eco de la petición, fault sin tieneError,
# HTML de proxy, etc.) se guarda como ERROR y no marca descargado.
motivo_error = None
if soap_error(soap_response):
motivo_error = "La respuesta contiene un error de VUCEM"
elif "consultarpartidarespuesta" not in (soap_response.text or "").lower():
motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta"
if motivo_error:
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
try:
document_response = await partida_rest_controller.post_document(
document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name,
document_type=18,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
)
except Exception as e:
logger.error(f"Error al guardar la respuesta de error: {e}")
# Continuamos con el error original
logger.error("Error en la respuesta del servicio SOAP")
logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error en la respuesta del servicio SOAP",
errors=["La respuesta contiene un error de VUCEM"],
errors=[motivo_error],
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
metadata={
"partida_numero": partida.get('numero'),
@@ -127,12 +139,13 @@ async def consume_ws_get_partida(**kwargs):
# Enviar documento
_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml"
try:
document_response = await partida_rest_controller.post_document(
document_response = await partida_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=_file_name,
document_type=1,
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
)
except Exception as e:
logger.error(f"Error al enviar documento: {e}")
@@ -148,6 +161,22 @@ async def consume_ws_get_partida(**kwargs):
)
)
# post_or_update_document retorna None en error (no lanza excepción);
# sin documento guardado la partida NO debe marcarse como descargada.
if not document_response:
logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al guardar el documento de la partida",
errors=["El API no confirmó el guardado del documento"],
metadata={
"file_name": _file_name,
"partida_numero": partida.get('numero')
}
)
)
logger.info("Documento enviado, actualizando status de Partida")
# Actualizar status de la partida

View File

@@ -4,6 +4,7 @@ import time
from celery import Celery
from celery_app import celery_app
from typing import Dict, Any
from fastapi import HTTPException as _HTTPException
from .services import consume_ws_get_partida
from api.api_v2.modules.tasks.services import register_task, update_task
@@ -89,6 +90,8 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
)
except Exception as update_error:
logger.error(f"Error al actualizar estado de tarea: {update_error}")
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise
finally:
# Limpiar el event loop

View File

@@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool:
"""
Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args:
root: Elemento raíz del XML.
Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario.
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
"""
namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas
if clave == 'RC':
# PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave in ('PC', 'RC'):
return True
except Exception as e:
@@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}")
continue
print("No se encontraron remesas (sin identificadores RC)")
print("No se encontraron remesas (sin identificadores PC/RC)")
return False
def _get_tipo_operacion(self, root: ET.Element) -> str:

View File

@@ -64,7 +64,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
document_response = await pedimento_rest_controller.post_document(
document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
@@ -90,8 +90,8 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
if soap_error(soap_response):
logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}")
document_response = await pedimento_rest_controller.post_document(
soap_response=None,
document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
@@ -111,7 +111,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
# Enviar documento
try:
document_response = await pedimento_rest_controller.post_document(
document_response = await pedimento_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),

View File

@@ -2,6 +2,7 @@ from celery_app import celery_app
from .services import put_pedimento_data
import asyncio
import logging
from fastapi import HTTPException as _HTTPException
from ..tasks.services import register_task, update_task
logger = logging.getLogger(__name__)
@@ -86,6 +87,8 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise
finally:
loop.close()

View File

@@ -64,7 +64,7 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
try:
file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
@@ -87,10 +87,35 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
)
# Generar nombre de archivo
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
# "No hay información" NO es un error — VUCEM confirma que el pedimento no tiene remesas.
# No se crea documento de error; se corrige el flag en el pedimento.
_SIN_REMESAS = '<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>'
if hasattr(soap_response, 'text') and _SIN_REMESAS in soap_response.text:
pedimento_id = pedimento_data.get('id')
logger.info(
f"Pedimento {pedimento_data.get('pedimento_app')} no tiene remesas "
f"(confirmado por VUCEM). Corrigiendo flag remesas=False."
)
try:
await remesa_rest_controller._make_request_async(
'PATCH',
f'customs/pedimentos/{pedimento_id}/',
data={'remesas': False},
)
except Exception as patch_err:
logger.warning(f"No se pudo actualizar remesas=False para {pedimento_id}: {patch_err}")
return create_service_response(
message="Pedimento sin remesas confirmado por VUCEM",
data={"remesas": False},
metadata={"pedimento_app": pedimento_data.get('pedimento_app')},
)
if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
@@ -98,25 +123,19 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
document_type=16,
)
# Aquí necesitamos extraer el mensaje de error real
error_message = "Error en la respuesta del servicio SOAP"
# Intentar extraer mensaje de error del XML de respuesta
if hasattr(soap_response, 'text') and soap_response.text:
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(soap_response.text)
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
# Esto puede variar según el servicio, pero comúnmente buscan:
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
faultcode = fault.find('.//faultcode')
faultstring = fault.find('.//faultstring')
if faultstring is not None and faultstring.text:
error_message = faultstring.text
break
# También podría estar en una estructura de error específica de VUCEM
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
if msg is not None and msg.text:
@@ -126,7 +145,6 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
except Exception as parse_error:
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
# Lanzar excepción con el mensaje de error real
raise HTTPException(
status_code=500,
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
@@ -134,12 +152,12 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Enviar documento
try:
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name,
document_type=5,
document_type=3,
)
except Exception as e:
@@ -149,12 +167,14 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Extraer datos del XML
try:
remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text)
logger.info(
f"Remesas extraídas para pedimento {pedimento_data.get('pedimento_app')}: "
f"{len(remesas_data)} COVEs encontrados → {remesas_data}"
)
except Exception as e:
logger.error(f"Error al extraer datos XML: {e}")
raise HTTPException(status_code=500, detail="Error al procesar respuesta XML")
logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}")
return create_service_response(
@@ -202,12 +222,14 @@ async def post_remesa_data(**kwargs) -> Dict[str, Any]:
# Obtener datos del servicio web
try:
ws_data = await obtener_remesa(**kwargs)
result["documento"] = ws_data.get("documento", None)
xml_content = ws_data.get('xml_content', {})
# obtener_remesa devuelve create_service_response → los datos están bajo 'data'
ws_data_content = ws_data.get('data', {})
result["documento"] = ws_data_content.get("documento", None)
xml_content = ws_data_content.get('xml_content', [])
result["xml_content"] = xml_content
if not xml_content:
logger.warning("No se obtuvo contenido XML del servicio web")
logger.warning("No se obtuvo contenido XML del servicio web (lista de remesas vacía)")
return result
except HTTPException:
@@ -259,17 +281,8 @@ async def _process_coves_safely(kwargs: Dict[str, Any], xml_content) -> Optional
async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""
Envía COVEs al sistema REST.
Args:
pedimento_data: Datos del pedimento
coves: Lista de diccionarios con datos de COVE (comprobanteVE, remesaAgente, remesaSA)
Returns:
Lista de respuestas exitosas
Raises:
HTTPException: Si no se pudo procesar ningún COVE
Crea COVEs en el sistema REST usando patrón get-or-create para evitar duplicados.
Si el COVE ya existe (creado por pedimento completo u otra remesa), lo reutiliza.
"""
if not coves:
return []
@@ -278,23 +291,38 @@ async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]
errors = []
for cove in coves:
# Extraer el número de COVE del diccionario
numero_cove = cove.get('comprobanteVE')
if not numero_cove:
logger.warning(f"COVE sin comprobanteVE encontrado: {cove}")
continue
try:
# Verificar si el COVE ya existe antes de intentar crearlo
existing = await remesa_rest_controller._make_request_async(
'GET', f'customs/coves/?numero_cove={numero_cove}'
)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list) and results:
logger.info(f"COVE {numero_cove} ya existe (id={results[0].get('id')}), omitiendo creación")
responses.append(results[0])
continue
# No existe → crear
document_data = {
'numero_cove': numero_cove,
'organizacion': pedimento_data.get('organizacion'),
'pedimento': pedimento_data.get('id')
'pedimento': pedimento_data.get('id'),
}
try:
response = await remesa_rest_controller.post_cove(document_data)
if response:
responses.append(response)
logger.debug(f"COVE {numero_cove} procesado exitosamente")
logger.info(f"COVE {numero_cove} creado exitosamente")
else:
error_msg = f"POST de COVE {numero_cove} devolvió respuesta vacía"
logger.warning(error_msg)
errors.append(error_msg)
except Exception as e:
error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}"
logger.warning(error_msg)

View File

@@ -2,6 +2,7 @@ from celery_app import celery_app
from .services import post_remesa_data
import asyncio
import logging
from fastapi import HTTPException as _HTTPException
from ..tasks.services import register_task, update_task
logger = logging.getLogger(__name__)
@@ -19,9 +20,11 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
"""
task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas
print(f"remesa_request >>>> {remesa_request}")
logger.info(f"remesa_request >>>> {remesa_request}")
pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A')
remesa_num = remesa_request.get('pedimento', {}).get('pedimento_app', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop()
@@ -96,6 +99,8 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida
if isinstance(e, _HTTPException):
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
raise
finally:

View File

View File

@@ -0,0 +1,97 @@
import asyncio
import json
import logging
import os
import redis.asyncio as aioredis
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
router = APIRouter()
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
HEARTBEAT_INTERVAL = 25 # segundos
MAX_STREAM_SECONDS = 7200 # 2 horas
def _redis_url() -> str:
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
return f"redis://{host}:{port}/{db}"
@router.get("/stream/tasks/{task_id}")
async def stream_task_events(task_id: str, request: Request):
"""
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
en tiempo real vía Redis Pub/Sub.
Cabeceras requeridas en Nginx upstream:
proxy_read_timeout 7200;
proxy_buffering off;
"""
async def event_generator():
r = aioredis.from_url(_redis_url(), decode_responses=True)
pubsub = r.pubsub()
try:
# Enviar estado actual si ya existe (cliente que llega tarde)
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
if current_raw:
current = json.loads(current_raw)
yield f"event: task_update\ndata: {current_raw}\n\n"
if current.get("status") in ("completed", "failed"):
return
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
tick = 0
while tick < MAX_STREAM_SECONDS:
if await request.is_disconnected():
break
try:
msg = await asyncio.wait_for(
pubsub.get_message(ignore_subscribe_messages=True),
timeout=1.0,
)
except asyncio.TimeoutError:
msg = None
if msg and msg["type"] == "message":
data_str = msg["data"]
yield f"event: task_update\ndata: {data_str}\n\n"
parsed = json.loads(data_str)
if parsed.get("status") in ("completed", "failed"):
break
tick += 1
if tick % HEARTBEAT_INTERVAL == 0:
yield f"event: heartbeat\ndata: {{}}\n\n"
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
finally:
try:
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
except Exception:
pass
await r.aclose()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
"Connection": "keep-alive",
},
)

View File

@@ -4,6 +4,8 @@ from typing import Dict, Any
from fastapi import HTTPException
from ..common import create_error_response
from core.config import settings
from core.redis_events import publish_task_event
logger = logging.getLogger(__name__)
async def update_task(
@@ -63,6 +65,7 @@ async def update_task(
)
response.raise_for_status()
publish_task_event(task_id, status, message)
return response.json()
except httpx.HTTPError as e:

View File

@@ -1,6 +1,8 @@
from celery import Celery
from core.config import settings
import os
from celery.schedules import crontab
from datetime import timedelta
# Configuración de Celery
celery_app = Celery(
@@ -32,6 +34,15 @@ celery_app.conf.update(
task_save_success_on_complete=True # Guarda los resultados exitosos
)
# desde aqui se pueden programar tareas pero al estar desde microservicios no tengo acceso a la informacion de organizacion
# por este motivo es mejor programar las tareas desde la aplicacion de django
celery_app.conf.beat_schedule = {
# 'prueba-4': {
# 'task': 'tasks.prueba_hola',
# 'schedule': timedelta(seconds=60), # 12:00 = 6:03 AM
# },
}
# Autodiscovery of tasks
celery_app.autodiscover_tasks()

View File

@@ -58,6 +58,8 @@ class APIRESTController:
response = await client.post(url, json=data, headers=self.headers)
elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'PATCH':
response = await client.patch(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE':
response = await client.delete(url, headers=self.headers)
else:
@@ -91,6 +93,61 @@ class APIRESTController:
logger.error(traceback.format_exc())
return None
async def post_or_update_document(
self, soap_response=None, organizacion: str = None,
pedimento: str = None, file_name: str = None,
document_type: int = None, fuente: int = 2,
identifier: str = None, binary_content: bytes = None,
) -> Dict[str, Any]:
"""
Guarda un documento VU (request, error o respuesta). Crea PRIMERO el
documento nuevo y, solo si se guardó con éxito, elimina los previos del
mismo tipo/pedimento/identificador. Así se evitan duplicados en
ejecuciones recurrentes sin perder el documento anterior cuando el
guardado del nuevo falla (antes se borraba el previo antes del POST).
identifier: cadena única del documento dentro del pedimento
(ej: número de COVE, número de e-document). Se usa como
filtro archivo__icontains para distinguir documentos del
mismo tipo pertenecientes a distintos COVEs o edocs.
"""
new_document = await self.post_document(
soap_response=soap_response,
binary_content=binary_content,
organizacion=organizacion,
pedimento=pedimento,
file_name=file_name,
document_type=document_type,
fuente=fuente,
)
if not new_document:
# POST falló: conservar los documentos previos intactos
logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos")
return new_document
new_id = new_document.get('id') if isinstance(new_document, dict) else None
if not new_id:
logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})")
return new_document
try:
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
if identifier:
query += f'&archivo__icontains={identifier}'
existing = await self._make_request_async('GET', query)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list):
for previous in results:
previous_id = previous.get('id')
if previous_id and previous_id != new_id:
await self._make_request_async('DELETE', f'record/documents/{previous_id}/')
logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}")
except Exception as e:
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
return new_document
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
@@ -464,6 +521,9 @@ class APIController:
"""
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
async def put_cove(self, cove_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/coves/{cove_id}/', data=data)
async def get_key(self, id: str) -> bytes:
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
@@ -510,6 +570,8 @@ class APIController:
print(f"response >>>> {response}")
elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'PATCH':
response = await client.patch(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE':
response = await client.delete(url, headers=self.headers)
else:

View File

@@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool:
"""
Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args:
root: Elemento raíz del XML.
Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario.
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
"""
namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas
if clave == 'RC':
# PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave in ('PC', 'RC'):
return True
except Exception as e:
@@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}")
continue
print("No se encontraron remesas (sin identificadores RC)")
print("No se encontraron remesas (sin identificadores PC/RC)")
return False
def _get_tipo_operacion(self, root: ET.Element) -> str:

44
core/redis_events.py Normal file
View File

@@ -0,0 +1,44 @@
import json
import os
import redis
import logging
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
STATE_TTL = 7200 # 2 horas
def _get_sync_redis():
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
return redis.Redis(
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
"""
Publica un evento de progreso de tarea en Redis Pub/Sub.
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
"""
payload: dict = {"task_id": task_id, "status": status, "message": message}
if resultado is not None:
payload["resultado"] = resultado
if progress is not None:
payload["progress"] = progress
try:
client = _get_sync_redis()
serialized = json.dumps(payload)
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
client.close()
except Exception as exc:
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")

View File

@@ -125,9 +125,12 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
# Subir documento de pedimento completo si la petición fue exitosa
try:
# solucion al error de descarga de un e-document, el mapeo de identificador no llegaba y ni siquiera insertaba registros en
# la tabla
identificadores_ed_response = xml_content.get('identificadores_ed', []) if 'xml_content' in locals() else []
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
identificadores_ed=identificadores_ed_response
)
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
except Exception as upload_err:
@@ -626,6 +629,19 @@ def acuse_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_edocument(
edocument_id=edoc['id'],
data={
"id": edoc['id'],
"acuse_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando acuse_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
@@ -832,28 +848,19 @@ def edocument_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"E-document {idx + 1} procesado exitosamente")
# Subir el documento si la petición fue exitosa
try:
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[edoc.get('numero_edocument')]
await rest_controller.put_edocument(
edocument_id=edoc['id'],
data={
"id": edoc['id'],
"edocument_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
# Subir el documento si la petición fue exitosa
try:
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[edoc.get('numero_edocument')]
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
except Exception as status_err:
logger.warning(f"Error actualizando edocument_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
@@ -974,6 +981,19 @@ def coves_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"cove del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_cove(
cove_id=cove['id'],
data={
"id": cove['id'],
"cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
@@ -1115,17 +1135,19 @@ def acuse_cove_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
# Subir el documento de COVE si la petición fue exitosa
try:
upload_result = await _post_coves(
response_service=service_data,
identificadores_cove=[cove.get('numero_cove')]
await rest_controller.put_cove(
cove_id=cove['id'],
data={
"id": cove['id'],
"acuse_cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
except Exception as status_err:
logger.warning(f"Error actualizando acuse_cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
@@ -1166,3 +1188,12 @@ def acuse_cove_task(self, **kwargs):
raise e
return run_async_task(_execute_acuse_cove)
# @celery_app.task(name='tasks.prueba_hola')
# def prueba_hola():
# """
# Tarea de prueba: solo imprime un saludo para verificar la programación.
# """
# logger.info("¡Hola! Tarea programada ejecutada correctamente.")
# print("¡Hola desde la tarea programada!")
# return "OK"

View File

@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
Returns:
bool: True si no hay errores, False en caso contrario
"""
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text:
return True
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
# Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
if 'tieneerror>true<' in soap_response.text.lower():
return True
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
return True

View File

@@ -603,7 +603,7 @@ async def get_soap_acuse(credenciales, response_service, soap_controller, edocum
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -710,7 +710,7 @@ async def get_soap_acuseCOVE(credenciales, response_service, soap_controller, co
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -884,7 +884,7 @@ async def get_soap_edocument(credenciales, response_service, soap_controller, ed
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")