From 288a96bc59fe5735a8c6f08829235efd6394524f Mon Sep 17 00:00:00 2001 From: marcos Date: Thu, 28 May 2026 07:13:32 -0600 Subject: [PATCH] feature/pedimentos-corregidos --- api/api_v2/api.py | 2 + api/api_v2/modules/stream/__init__.py | 0 api/api_v2/modules/stream/routers.py | 97 +++++++++++++++++++++++++++ api/api_v2/modules/tasks/services.py | 5 +- core/redis_events.py | 44 ++++++++++++ utils/helpers.py | 5 +- 6 files changed, 149 insertions(+), 4 deletions(-) create mode 100644 api/api_v2/modules/stream/__init__.py create mode 100644 api/api_v2/modules/stream/routers.py create mode 100644 core/redis_events.py diff --git a/api/api_v2/api.py b/api/api_v2/api.py index 0d9f0c6..6dd0b8d 100644 --- a/api/api_v2/api.py +++ b/api/api_v2/api.py @@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router from api.api_v2.modules.partidas.routers import router as partidas_router from api.api_v2.modules.pedimentos.routers import router as pedimentos_router from api.api_v2.modules.remesas.routers import router as remesas_router +from api.api_v2.modules.stream.routers import router as stream_router api_router = APIRouter() @@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"]) api_router.include_router(partidas_router, tags=["Partidas"]) api_router.include_router(pedimentos_router, tags=["Pedimentos"]) api_router.include_router(remesas_router, tags=["Remesas"]) +api_router.include_router(stream_router, tags=["Stream"]) diff --git a/api/api_v2/modules/stream/__init__.py b/api/api_v2/modules/stream/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/api_v2/modules/stream/routers.py b/api/api_v2/modules/stream/routers.py new file mode 100644 index 0000000..e5741cd --- /dev/null +++ b/api/api_v2/modules/stream/routers.py @@ -0,0 +1,97 @@ +import asyncio +import json +import logging +import os + +import redis.asyncio as aioredis +from fastapi import APIRouter, Request +from fastapi.responses import StreamingResponse + +router = APIRouter() +logger = logging.getLogger(__name__) + +CHANNEL_PREFIX = "audit_task:" +STATE_PREFIX = "audit_task_state:" +HEARTBEAT_INTERVAL = 25 # segundos +MAX_STREAM_SECONDS = 7200 # 2 horas + + +def _redis_url() -> str: + # Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks. + host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")) + port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379")) + db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0")) + return f"redis://{host}:{port}/{db}" + + +@router.get("/stream/tasks/{task_id}") +async def stream_task_events(task_id: str, request: Request): + """ + SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea + en tiempo real vía Redis Pub/Sub. + + Cabeceras requeridas en Nginx upstream: + proxy_read_timeout 7200; + proxy_buffering off; + """ + + async def event_generator(): + r = aioredis.from_url(_redis_url(), decode_responses=True) + pubsub = r.pubsub() + + try: + # Enviar estado actual si ya existe (cliente que llega tarde) + current_raw = await r.get(f"{STATE_PREFIX}{task_id}") + if current_raw: + current = json.loads(current_raw) + yield f"event: task_update\ndata: {current_raw}\n\n" + if current.get("status") in ("completed", "failed"): + return + + await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}") + + tick = 0 + while tick < MAX_STREAM_SECONDS: + if await request.is_disconnected(): + break + + try: + msg = await asyncio.wait_for( + pubsub.get_message(ignore_subscribe_messages=True), + timeout=1.0, + ) + except asyncio.TimeoutError: + msg = None + + if msg and msg["type"] == "message": + data_str = msg["data"] + yield f"event: task_update\ndata: {data_str}\n\n" + parsed = json.loads(data_str) + if parsed.get("status") in ("completed", "failed"): + break + + tick += 1 + if tick % HEARTBEAT_INTERVAL == 0: + yield f"event: heartbeat\ndata: {{}}\n\n" + + except asyncio.CancelledError: + pass + except Exception as exc: + logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}") + yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n" + finally: + try: + await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}") + except Exception: + pass + await r.aclose() + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", # deshabilita buffering en Nginx + "Connection": "keep-alive", + }, + ) diff --git a/api/api_v2/modules/tasks/services.py b/api/api_v2/modules/tasks/services.py index ec447df..3be76b5 100644 --- a/api/api_v2/modules/tasks/services.py +++ b/api/api_v2/modules/tasks/services.py @@ -4,6 +4,8 @@ from typing import Dict, Any from fastapi import HTTPException from ..common import create_error_response from core.config import settings +from core.redis_events import publish_task_event + logger = logging.getLogger(__name__) async def update_task( @@ -61,8 +63,9 @@ async def update_task( return await _create_and_update_task( task_id, message, status, pedimento_id, organizacion_id, servicio ) - + response.raise_for_status() + publish_task_event(task_id, status, message) return response.json() except httpx.HTTPError as e: diff --git a/core/redis_events.py b/core/redis_events.py new file mode 100644 index 0000000..64b8667 --- /dev/null +++ b/core/redis_events.py @@ -0,0 +1,44 @@ +import json +import os +import redis +import logging + +logger = logging.getLogger(__name__) + +CHANNEL_PREFIX = "audit_task:" +STATE_PREFIX = "audit_task_state:" +STATE_TTL = 7200 # 2 horas + + +def _get_sync_redis(): + # REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django. + # En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia). + return redis.Redis( + host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")), + port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))), + db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))), + decode_responses=True, + socket_connect_timeout=2, + socket_timeout=2, + ) + + +def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None): + """ + Publica un evento de progreso de tarea en Redis Pub/Sub. + Guarda el último estado en una key con TTL para clientes que se conectan tarde. + """ + payload: dict = {"task_id": task_id, "status": status, "message": message} + if resultado is not None: + payload["resultado"] = resultado + if progress is not None: + payload["progress"] = progress + + try: + client = _get_sync_redis() + serialized = json.dumps(payload) + client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized) + client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized) + client.close() + except Exception as exc: + logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}") diff --git a/utils/helpers.py b/utils/helpers.py index 79fd019..b53e60d 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado Returns: bool: True si no hay errores, False en caso contrario """ - if 'true' in soap_response.text: - return True - if 'true' in soap_response.text: + # Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.) + if 'tieneerror>true<' in soap_response.text.lower(): return True if "El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo" in soap_response.text: return True