98 lines
3.2 KiB
Python
98 lines
3.2 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
|
|
import redis.asyncio as aioredis
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import StreamingResponse
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router on which the SSE endpoint defined in this module is registered.
router = APIRouter()
|
|
|
|
# Redis namespaces; the task id is appended to each prefix.
STATE_PREFIX = "audit_task_state:"  # key that stores the latest task state
CHANNEL_PREFIX = "audit_task:"  # Pub/Sub channel that carries task events

# Streaming limits.
MAX_STREAM_SECONDS = 2 * 60 * 60  # 2 hours
HEARTBEAT_INTERVAL = 25  # seconds
|
|
|
|
|
|
def _redis_url() -> str:
    """Build the ``redis://`` URL of the Redis instance used for Pub/Sub.

    This is the Redis shared with the Django backend, whose tasks publish the
    events consumed here. Each component is read from its ``REDIS_PUBSUB_*``
    environment variable, falling back to the generic ``REDIS_*`` variable and
    finally to a built-in default (``localhost``, ``6379``, database ``0``).
    """
    # (pub/sub-specific variable, shared variable, default value)
    lookups = (
        ("REDIS_PUBSUB_HOST", "REDIS_HOST", "localhost"),
        ("REDIS_PUBSUB_PORT", "REDIS_PORT", "6379"),
        ("REDIS_PUBSUB_DB", "REDIS_DB", "0"),
    )
    host, port, db = (
        os.environ.get(specific, os.environ.get(shared, default))
        for specific, shared, default in lookups
    )
    return f"redis://{host}:{port}/{db}"
|
|
|
|
|
|
# Task states after which no further events are expected.
_TERMINAL_STATUSES = ("completed", "failed")

# Seconds to wait for a Pub/Sub message on each pass of the streaming loop.
_POLL_TIMEOUT = 1.0


def _sse_event(event: str, data: str) -> str:
    """Format one Server-Sent Events frame.

    Each line of *data* is emitted as its own ``data:`` field, so a payload
    containing line breaks (for example pretty-printed JSON) cannot end the
    frame early. A single-line payload yields one ``event:`` line, one
    ``data:`` line and the terminating blank line.
    """
    lines = data.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    fields = "\n".join(f"data: {line}" for line in lines)
    return f"event: {event}\n{fields}\n\n"


def _is_terminal(raw: str) -> bool:
    """Return True when the JSON payload *raw* reports a finished task.

    A payload that is valid JSON but not an object is treated as
    non-terminal. Invalid JSON raises ``json.JSONDecodeError``.
    """
    parsed = json.loads(raw)
    return isinstance(parsed, dict) and parsed.get("status") in _TERMINAL_STATUSES


@router.get("/stream/tasks/{task_id}")
async def stream_task_events(task_id: str, request: Request):
    """Stream the progress of one task to the client as Server-Sent Events.

    The stored state of the task (if any) is sent first, so a client that
    connects late still sees where the task stands. After that, every message
    published on the task's Redis Pub/Sub channel is forwarded in real time.

    Events emitted:
        task_update: raw JSON payload taken from Redis.
        heartbeat: empty JSON object, sent every ``HEARTBEAT_INTERVAL`` seconds.
        error: ``{"message": ...}``, sent once if streaming fails.

    The stream ends when the task reports ``completed`` or ``failed``, when
    the client disconnects, or after ``MAX_STREAM_SECONDS``.

    Required settings on the Nginx upstream:
        proxy_read_timeout 7200;
        proxy_buffering    off;

    Args:
        task_id: Identifier appended to the Redis key and channel prefixes.
        request: Incoming request, polled to detect client disconnects.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    channel = f"{CHANNEL_PREFIX}{task_id}"
    state_key = f"{STATE_PREFIX}{task_id}"

    async def event_generator():
        r = aioredis.from_url(_redis_url(), decode_responses=True)
        pubsub = r.pubsub()

        try:
            # Subscribe BEFORE reading the stored state. Reading first leaves a
            # window in which an event (possibly the terminal one) is published
            # after the read but before the subscription, and is lost. The
            # trade-off is that an update may occasionally be delivered twice.
            await pubsub.subscribe(channel)

            # Send the current state if it already exists (late client).
            current_raw = await r.get(state_key)
            if current_raw:
                finished = _is_terminal(current_raw)
                yield _sse_event("task_update", current_raw)
                if finished:
                    return

            # Use a monotonic clock for limits: loop passes are not seconds
            # (a pass is shorter whenever a message arrives early).
            loop = asyncio.get_running_loop()
            started = loop.time()
            deadline = started + MAX_STREAM_SECONDS
            next_heartbeat = started + HEARTBEAT_INTERVAL

            while loop.time() < deadline:
                if await request.is_disconnected():
                    break

                # Pass the timeout to get_message itself: its default of 0
                # returns immediately, which would turn this loop into a
                # busy-wait.
                msg = await pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=_POLL_TIMEOUT,
                )

                if msg and msg["type"] == "message":
                    data_str = msg["data"]
                    yield _sse_event("task_update", data_str)
                    if _is_terminal(data_str):
                        break

                now = loop.time()
                if now >= next_heartbeat:
                    yield _sse_event("heartbeat", "{}")
                    next_heartbeat = now + HEARTBEAT_INTERVAL

        except asyncio.CancelledError:
            # Cancellation must reach the caller; cleanup still runs below.
            raise
        except Exception as exc:
            logger.exception("[stream] Error en SSE para tarea %s: %s", task_id, exc)
            yield _sse_event("error", json.dumps({"message": str(exc)}))
        finally:
            try:
                await pubsub.unsubscribe(channel)
            except Exception:
                # Best effort: the connection is closed right below anyway.
                logger.debug(
                    "[stream] unsubscribe failed for task %s", task_id, exc_info=True
                )
            finally:
                # Nested finally so the client is closed even if unsubscribe is
                # interrupted by a cancellation.
                await r.aclose()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disables response buffering in Nginx
            "Connection": "keep-alive",
        },
    )
|