diff --git a/api/api_v2/api.py b/api/api_v2/api.py
index 0d9f0c6..6dd0b8d 100644
--- a/api/api_v2/api.py
+++ b/api/api_v2/api.py
@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
from api.api_v2.modules.partidas.routers import router as partidas_router
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
from api.api_v2.modules.remesas.routers import router as remesas_router
+from api.api_v2.modules.stream.routers import router as stream_router
api_router = APIRouter()
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
api_router.include_router(partidas_router, tags=["Partidas"])
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
api_router.include_router(remesas_router, tags=["Remesas"])
+api_router.include_router(stream_router, tags=["Stream"])
diff --git a/api/api_v2/modules/stream/__init__.py b/api/api_v2/modules/stream/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/api/api_v2/modules/stream/routers.py b/api/api_v2/modules/stream/routers.py
new file mode 100644
index 0000000..e5741cd
--- /dev/null
+++ b/api/api_v2/modules/stream/routers.py
@@ -0,0 +1,97 @@
+import asyncio
+import json
+import logging
+import os
+
+import redis.asyncio as aioredis
+from fastapi import APIRouter, Request
+from fastapi.responses import StreamingResponse
+
+router = APIRouter()
+logger = logging.getLogger(__name__)
+
+CHANNEL_PREFIX = "audit_task:"
+STATE_PREFIX = "audit_task_state:"
+HEARTBEAT_INTERVAL = 25 # segundos
+MAX_STREAM_SECONDS = 7200 # 2 horas
+
+
+def _redis_url() -> str:
+ # Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
+ host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
+ port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
+ db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
+ return f"redis://{host}:{port}/{db}"
+
+
+@router.get("/stream/tasks/{task_id}")
+async def stream_task_events(task_id: str, request: Request):
+ """
+ SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
+ en tiempo real vía Redis Pub/Sub.
+
+ Cabeceras requeridas en Nginx upstream:
+ proxy_read_timeout 7200;
+ proxy_buffering off;
+ """
+
+ async def event_generator():
+ r = aioredis.from_url(_redis_url(), decode_responses=True)
+ pubsub = r.pubsub()
+
+ try:
+ # Enviar estado actual si ya existe (cliente que llega tarde)
+ current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
+ if current_raw:
+ current = json.loads(current_raw)
+ yield f"event: task_update\ndata: {current_raw}\n\n"
+ if current.get("status") in ("completed", "failed"):
+ return
+
+ await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
+
+ tick = 0
+ while tick < MAX_STREAM_SECONDS:
+ if await request.is_disconnected():
+ break
+
+ try:
+ msg = await asyncio.wait_for(
+ pubsub.get_message(ignore_subscribe_messages=True),
+ timeout=1.0,
+ )
+ except asyncio.TimeoutError:
+ msg = None
+
+ if msg and msg["type"] == "message":
+ data_str = msg["data"]
+ yield f"event: task_update\ndata: {data_str}\n\n"
+ parsed = json.loads(data_str)
+ if parsed.get("status") in ("completed", "failed"):
+ break
+
+ tick += 1
+ if tick % HEARTBEAT_INTERVAL == 0:
+ yield f"event: heartbeat\ndata: {{}}\n\n"
+
+ except asyncio.CancelledError:
+ pass
+ except Exception as exc:
+ logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
+ yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
+ finally:
+ try:
+ await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
+ except Exception:
+ pass
+ await r.aclose()
+
+ return StreamingResponse(
+ event_generator(),
+ media_type="text/event-stream",
+ headers={
+ "Cache-Control": "no-cache",
+ "X-Accel-Buffering": "no", # deshabilita buffering en Nginx
+ "Connection": "keep-alive",
+ },
+ )
diff --git a/api/api_v2/modules/tasks/services.py b/api/api_v2/modules/tasks/services.py
index ec447df..3be76b5 100644
--- a/api/api_v2/modules/tasks/services.py
+++ b/api/api_v2/modules/tasks/services.py
@@ -4,6 +4,8 @@ from typing import Dict, Any
from fastapi import HTTPException
from ..common import create_error_response
from core.config import settings
+from core.redis_events import publish_task_event
+
logger = logging.getLogger(__name__)
async def update_task(
@@ -61,8 +63,9 @@ async def update_task(
return await _create_and_update_task(
task_id, message, status, pedimento_id, organizacion_id, servicio
)
-
+
response.raise_for_status()
+ publish_task_event(task_id, status, message)
return response.json()
except httpx.HTTPError as e:
diff --git a/core/redis_events.py b/core/redis_events.py
new file mode 100644
index 0000000..64b8667
--- /dev/null
+++ b/core/redis_events.py
@@ -0,0 +1,44 @@
+import json
+import os
+import redis
+import logging
+
+logger = logging.getLogger(__name__)
+
+CHANNEL_PREFIX = "audit_task:"
+STATE_PREFIX = "audit_task_state:"
+STATE_TTL = 7200 # 2 horas
+
+
+def _get_sync_redis():
+ # REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
+ # En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
+ return redis.Redis(
+ host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
+ port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
+ db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
+ decode_responses=True,
+ socket_connect_timeout=2,
+ socket_timeout=2,
+ )
+
+
+def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
+ """
+ Publica un evento de progreso de tarea en Redis Pub/Sub.
+ Guarda el último estado en una key con TTL para clientes que se conectan tarde.
+ """
+ payload: dict = {"task_id": task_id, "status": status, "message": message}
+ if resultado is not None:
+ payload["resultado"] = resultado
+ if progress is not None:
+ payload["progress"] = progress
+
+ try:
+ client = _get_sync_redis()
+ serialized = json.dumps(payload)
+ client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
+ client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
+ client.close()
+ except Exception as exc:
+ logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")
diff --git a/utils/helpers.py b/utils/helpers.py
index 79fd019..b53e60d 100644
--- a/utils/helpers.py
+++ b/utils/helpers.py
@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
Returns:
bool: True si no hay errores, False en caso contrario
"""
- if 'true' in soap_response.text:
- return True
- if 'true' in soap_response.text:
+ # Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
+ if 'tieneerror>true<' in soap_response.text.lower():
return True
if "El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo" in soap_response.text:
return True