5 Commits

16 changed files with 475 additions and 94 deletions

View File

@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
from api.api_v2.modules.partidas.routers import router as partidas_router from api.api_v2.modules.partidas.routers import router as partidas_router
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
from api.api_v2.modules.remesas.routers import router as remesas_router from api.api_v2.modules.remesas.routers import router as remesas_router
from api.api_v2.modules.stream.routers import router as stream_router
api_router = APIRouter() api_router = APIRouter()
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
api_router.include_router(partidas_router, tags=["Partidas"]) api_router.include_router(partidas_router, tags=["Partidas"])
api_router.include_router(pedimentos_router, tags=["Pedimentos"]) api_router.include_router(pedimentos_router, tags=["Pedimentos"])
api_router.include_router(remesas_router, tags=["Remesas"]) api_router.include_router(remesas_router, tags=["Remesas"])
api_router.include_router(stream_router, tags=["Stream"])

View File

@@ -203,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
data = { data = {
"id": edoc.get("id"), "id": edoc.get("id"),
"acuse_descargado": status, "acuse_descargado": status,
"acuse_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"), "numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -210,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}")
return response
async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["acuse_estado"] = "error"
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API")
return response return response
def _decode_acuse_base64_content(base64_content): # Testeado def _decode_acuse_base64_content(base64_content): # Testeado

View File

@@ -6,9 +6,13 @@ from typing import Dict, Any
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import HTTPException from fastapi import HTTPException
from .services import obtener_acuse from .services import obtener_acuse, marcar_error_acuse
from api.api_v2.modules.tasks.services import register_task, update_task from api.api_v2.modules.tasks.services import register_task, update_task
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]: def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -20,23 +24,25 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
pedimento_id = pedimento_info.get('id') pedimento_id = pedimento_info.get('id')
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = acuse_request.get('edoc', {})
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
loop.run_until_complete( logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
register_task( loop.run_until_complete(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de acuse para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=6 # 6 corresponde a "Acuse" organizacion_id=organizacion_id,
servicio=6 # 6 corresponde a "Acuse"
)
) )
)
# Esperar un momento breve para asegurar que el registro se complete # Esperar un momento breve para asegurar que el registro se complete
import time import time
@@ -76,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message) logging.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
loop.run_until_complete( loop.run_until_complete(
update_task( update_task(

View File

@@ -146,6 +146,13 @@ async def consume_ws_get_cove(**kwargs):
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}") logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
raise Exception(f"Error en la respuesta SOAP: {str(e)}") raise Exception(f"Error en la respuesta SOAP: {str(e)}")
# Validar que el documento quedó persistido en la API antes de marcar el
# COVE como descargado (T2026-05-027): post_or_update_document retorna
# None en error sin lanzar excepción
if not document_response or document_response.get("id") is None:
logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza")
raise Exception(f"No se pudo persistir el COVE {cove} en la API")
logger.info("Documento enviado, actualizando status de COVE") logger.info("Documento enviado, actualizando status de COVE")
# Actualizar status del COVE # Actualizar status del COVE
@@ -329,6 +336,15 @@ async def consume_ws_get_acuse_cove(**kwargs):
identifier=cove_identifier, identifier=cove_identifier,
) )
# Validar la subida antes de marcar el acuse como descargado (T2026-05-027):
# post_or_update_document retorna None en error sin lanzar excepción
if not rest_response or rest_response.get("id") is None:
logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza")
raise HTTPException(
status_code=500,
detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza"
)
acuse_status = await change_acuse_status( acuse_status = await change_acuse_status(
cove=kwargs.get('cove'), cove=kwargs.get('cove'),
status=True, status=True,
@@ -507,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
"cove_descargado": status, "cove_descargado": status,
"cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"), "numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -514,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}")
return response return response
@@ -521,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
"acuse_cove_descargado": status, "acuse_cove_descargado": status,
"acuse_cove_estado": "descargado" if status else "pendiente",
"numero_cove": cove.get("cove"), "numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
} }
print(data)
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API")
raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}")
return response
async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente: VUCEM responde que no existe, credencial
rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo
automático y solo el reproceso manual o el reset lo reincorporan.
"""
campo = "acuse_cove_estado" if acuse else "cove_estado"
data = {
"id": cove.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_cove": cove.get("cove"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data[campo] = "error"
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API")
return response return response

View File

@@ -5,12 +5,16 @@ from celery_app import celery_app
from typing import Dict, Any from typing import Dict, Any
from fastapi import HTTPException as _HTTPException from fastapi import HTTPException as _HTTPException
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove
from api.api_v2.modules.tasks.services import update_task, register_task from api.api_v2.modules.tasks.services import update_task, register_task
# Logger para el módulo # Logger para el módulo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
@@ -27,18 +31,19 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
cove_number = cove_info.get('cove', 'N/A') cove_number = cove_info.get('cove', 'N/A')
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[COVE] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
asyncio.run( logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
register_task( asyncio.run(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=8 # 8 corresponde a "Cove" organizacion_id=organizacion_id,
servicio=8 # 8 corresponde a "Cove"
)
) )
)
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
asyncio.run( asyncio.run(
@@ -71,6 +76,21 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
asyncio.run( asyncio.run(
update_task( update_task(
@@ -105,18 +125,19 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
cove_number = cove_info.get('cove', 'N/A') cove_number = cove_info.get('cove', 'N/A')
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") if self.request.retries == 0:
asyncio.run( logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
register_task( asyncio.run(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=9 # 9 corresponde a "Acuse Cove" organizacion_id=organizacion_id,
servicio=9 # 9 corresponde a "Acuse Cove"
)
) )
)
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
asyncio.run( asyncio.run(
@@ -149,6 +170,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
asyncio.run(
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
asyncio.run( asyncio.run(
update_task( update_task(

View File

@@ -277,6 +277,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
data = { data = {
"id": edoc.get("id"), "id": edoc.get("id"),
"edocument_descargado": status, "edocument_descargado": status,
"edocument_estado": "descargado" if status else "pendiente",
"numero_edocument": edoc.get("numero_edocument"), "numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"), "pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"), "organizacion": pedimento.get("organizacion"),
@@ -284,6 +285,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
if response is None:
logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API")
raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}")
return response
async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
"""
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
la transición a 'error'.
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
queda fuera del ciclo automático, solo reproceso manual o reset.
"""
data = {
"id": edoc.get("id"),
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
}
if definitivo:
data["edocument_estado"] = "error"
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
if response is None:
logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API")
return response return response

View File

@@ -5,12 +5,16 @@ from celery_app import celery_app
from typing import Dict from typing import Dict
from fastapi import HTTPException as _HTTPException from fastapi import HTTPException as _HTTPException
from .services import obtener_edoc from .services import obtener_edoc, marcar_error_edocument
from api.api_v2.modules.tasks.services import register_task, update_task from api.api_v2.modules.tasks.services import register_task, update_task
# Logger para el módulo # Logger para el módulo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
# incrementan el contador de intentos del backend (T2026-05-027)
WORKER_MAX_RETRIES = 2
@celery_app.task(bind=True) @celery_app.task(bind=True)
def process_edoc_download_request(self, edoc_data: Dict) -> Dict: def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
""" """
@@ -29,18 +33,19 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") if self.request.retries == 0:
loop.run_until_complete( logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
register_task( loop.run_until_complete(
task_id=task_id, register_task(
status="submitted", task_id=task_id,
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", status="submitted",
pedimento_id=pedimento_id, message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents organizacion_id=organizacion_id,
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
) )
)
# Esperar un momento breve para asegurar que el registro se complete # Esperar un momento breve para asegurar que el registro se complete
time.sleep(1) time.sleep(1)
@@ -80,6 +85,21 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message, exc_info=True) logger.error(error_message, exc_info=True)
# Reintento interno del worker para fallos transitorios (red, timeout):
# mismo intento orquestado, NO incrementa el contador del backend
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Fallo definitivo de este intento: registrar el detalle en el registro de
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
try:
loop.run_until_complete(
marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False)
)
except Exception as report_error:
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
try: try:
loop.run_until_complete( loop.run_until_complete(
update_task( update_task(

View File

@@ -95,7 +95,17 @@ async def consume_ws_get_partida(**kwargs):
) )
) )
# Una partida solo cuenta como descargada si la respuesta es una
# consultarPartidaRespuesta real sin <tieneError>true</tieneError>.
# Cualquier otro contenido (eco de la petición, fault sin tieneError,
# HTML de proxy, etc.) se guarda como ERROR y no marca descargado.
motivo_error = None
if soap_error(soap_response): if soap_error(soap_response):
motivo_error = "La respuesta contiene un error de VUCEM"
elif "consultarpartidarespuesta" not in (soap_response.text or "").lower():
motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta"
if motivo_error:
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml" error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
try: try:
document_response = await partida_rest_controller.post_or_update_document( document_response = await partida_rest_controller.post_or_update_document(
@@ -110,12 +120,12 @@ async def consume_ws_get_partida(**kwargs):
logger.error(f"Error al guardar la respuesta de error: {e}") logger.error(f"Error al guardar la respuesta de error: {e}")
# Continuamos con el error original # Continuamos con el error original
logger.error("Error en la respuesta del servicio SOAP") logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}")
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=create_error_response( detail=create_error_response(
message="Error en la respuesta del servicio SOAP", message="Error en la respuesta del servicio SOAP",
errors=["La respuesta contiene un error de VUCEM"], errors=[motivo_error],
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None}, data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
metadata={ metadata={
"partida_numero": partida.get('numero'), "partida_numero": partida.get('numero'),
@@ -151,6 +161,22 @@ async def consume_ws_get_partida(**kwargs):
) )
) )
# post_or_update_document retorna None en error (no lanza excepción);
# sin documento guardado la partida NO debe marcarse como descargada.
if not document_response:
logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al guardar el documento de la partida",
errors=["El API no confirmó el guardado del documento"],
metadata={
"file_name": _file_name,
"partida_numero": partida.get('numero')
}
)
)
logger.info("Documento enviado, actualizando status de Partida") logger.info("Documento enviado, actualizando status de Partida")
# Actualizar status de la partida # Actualizar status de la partida

View File

@@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool: def _remesas(self, root: ET.Element) -> bool:
""" """
Método para verificar si el pedimento tiene remesas. Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args: Args:
root: Elemento raíz del XML. root: Elemento raíz del XML.
Returns: Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario. True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
""" """
namespaces = { namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas # PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave == 'RC': if clave in ('PC', 'RC'):
return True return True
except Exception as e: except Exception as e:
@@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}") print(f"Error procesando identificador para remesas: {e}")
continue continue
print("No se encontraron remesas (sin identificadores RC)") print("No se encontraron remesas (sin identificadores PC/RC)")
return False return False
def _get_tipo_operacion(self, root: ET.Element) -> str: def _get_tipo_operacion(self, root: ET.Element) -> str:

View File

View File

@@ -0,0 +1,97 @@
import asyncio
import json
import logging
import os
import redis.asyncio as aioredis
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
router = APIRouter()
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
HEARTBEAT_INTERVAL = 25 # segundos
MAX_STREAM_SECONDS = 7200 # 2 horas
def _redis_url() -> str:
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
return f"redis://{host}:{port}/{db}"
@router.get("/stream/tasks/{task_id}")
async def stream_task_events(task_id: str, request: Request):
"""
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
en tiempo real vía Redis Pub/Sub.
Cabeceras requeridas en Nginx upstream:
proxy_read_timeout 7200;
proxy_buffering off;
"""
async def event_generator():
r = aioredis.from_url(_redis_url(), decode_responses=True)
pubsub = r.pubsub()
try:
# Enviar estado actual si ya existe (cliente que llega tarde)
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
if current_raw:
current = json.loads(current_raw)
yield f"event: task_update\ndata: {current_raw}\n\n"
if current.get("status") in ("completed", "failed"):
return
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
tick = 0
while tick < MAX_STREAM_SECONDS:
if await request.is_disconnected():
break
try:
msg = await asyncio.wait_for(
pubsub.get_message(ignore_subscribe_messages=True),
timeout=1.0,
)
except asyncio.TimeoutError:
msg = None
if msg and msg["type"] == "message":
data_str = msg["data"]
yield f"event: task_update\ndata: {data_str}\n\n"
parsed = json.loads(data_str)
if parsed.get("status") in ("completed", "failed"):
break
tick += 1
if tick % HEARTBEAT_INTERVAL == 0:
yield f"event: heartbeat\ndata: {{}}\n\n"
except asyncio.CancelledError:
pass
except Exception as exc:
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
finally:
try:
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
except Exception:
pass
await r.aclose()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
"Connection": "keep-alive",
},
)

View File

@@ -4,6 +4,8 @@ from typing import Dict, Any
from fastapi import HTTPException from fastapi import HTTPException
from ..common import create_error_response from ..common import create_error_response
from core.config import settings from core.config import settings
from core.redis_events import publish_task_event
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def update_task( async def update_task(
@@ -63,6 +65,7 @@ async def update_task(
) )
response.raise_for_status() response.raise_for_status()
publish_task_event(task_id, status, message)
return response.json() return response.json()
except httpx.HTTPError as e: except httpx.HTTPError as e:

View File

@@ -100,31 +100,18 @@ class APIRESTController:
identifier: str = None, binary_content: bytes = None, identifier: str = None, binary_content: bytes = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Guarda un documento VU (request o error). Si ya existe uno del mismo Guarda un documento VU (request, error o respuesta). Crea PRIMERO el
tipo/pedimento/identificador, elimina el anterior y crea el nuevo, documento nuevo y, solo si se guardó con éxito, elimina los previos del
evitando duplicados en ejecuciones recurrentes (tarea programada diaria). mismo tipo/pedimento/identificador. Así se evitan duplicados en
ejecuciones recurrentes sin perder el documento anterior cuando el
guardado del nuevo falla (antes se borraba el previo antes del POST).
identifier: cadena única del documento dentro del pedimento identifier: cadena única del documento dentro del pedimento
(ej: número de COVE, número de e-document). Se usa como (ej: número de COVE, número de e-document). Se usa como
filtro archivo__icontains para distinguir documentos del filtro archivo__icontains para distinguir documentos del
mismo tipo pertenecientes a distintos COVEs o edocs. mismo tipo pertenecientes a distintos COVEs o edocs.
""" """
try: new_document = await self.post_document(
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
if identifier:
query += f'&archivo__icontains={identifier}'
existing = await self._make_request_async('GET', query)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list) and results:
existing_id = results[0].get('id')
if existing_id:
await self._make_request_async('DELETE', f'record/documents/{existing_id}/')
logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}")
except Exception as e:
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
return await self.post_document(
soap_response=soap_response, soap_response=soap_response,
binary_content=binary_content, binary_content=binary_content,
organizacion=organizacion, organizacion=organizacion,
@@ -133,6 +120,33 @@ class APIRESTController:
document_type=document_type, document_type=document_type,
fuente=fuente, fuente=fuente,
) )
if not new_document:
# POST falló: conservar los documentos previos intactos
logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos")
return new_document
new_id = new_document.get('id') if isinstance(new_document, dict) else None
if not new_id:
logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})")
return new_document
try:
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
if identifier:
query += f'&archivo__icontains={identifier}'
existing = await self._make_request_async('GET', query)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list):
for previous in results:
previous_id = previous.get('id')
if previous_id and previous_id != new_id:
await self._make_request_async('DELETE', f'record/documents/{previous_id}/')
logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}")
except Exception as e:
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
return new_document
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]: async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data) return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)

View File

@@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento
def _remesas(self, root: ET.Element) -> bool: def _remesas(self, root: ET.Element) -> bool:
""" """
Método para verificar si el pedimento tiene remesas. Método para verificar si el pedimento tiene remesas.
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
o 'RC' (REMESAS DE CONSOLIDADO).
Args: Args:
root: Elemento raíz del XML. root: Elemento raíz del XML.
Returns: Returns:
True si encuentra identificadores con clave 'RC', False en caso contrario. True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
""" """
namespaces = { namespaces = {
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
@@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
clave = clave_elem.text if clave_elem is not None else None clave = clave_elem.text if clave_elem is not None else None
# Si encontramos una clave 'RC', el pedimento tiene remesas # PC (consolidado) o RC (remesas de consolidado) indican remesas
if clave == 'RC': if clave in ('PC', 'RC'):
return True return True
except Exception as e: except Exception as e:
@@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento
print(f"Error procesando identificador para remesas: {e}") print(f"Error procesando identificador para remesas: {e}")
continue continue
print("No se encontraron remesas (sin identificadores RC)") print("No se encontraron remesas (sin identificadores PC/RC)")
return False return False
def _get_tipo_operacion(self, root: ET.Element) -> str: def _get_tipo_operacion(self, root: ET.Element) -> str:

44
core/redis_events.py Normal file
View File

@@ -0,0 +1,44 @@
import json
import os
import redis
import logging
logger = logging.getLogger(__name__)
CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
STATE_TTL = 7200 # 2 horas
def _get_sync_redis():
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
return redis.Redis(
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
"""
Publica un evento de progreso de tarea en Redis Pub/Sub.
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
"""
payload: dict = {"task_id": task_id, "status": status, "message": message}
if resultado is not None:
payload["resultado"] = resultado
if progress is not None:
payload["progress"] = progress
try:
client = _get_sync_redis()
serialized = json.dumps(payload)
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
client.close()
except Exception as exc:
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")

View File

@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
Returns: Returns:
bool: True si no hay errores, False en caso contrario bool: True si no hay errores, False en caso contrario
""" """
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text: # Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
return True if 'tieneerror>true<' in soap_response.text.lower():
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
return True return True
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text: if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
return True return True