import asyncio
import json
import logging
import os

import redis.asyncio as aioredis
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

router = APIRouter()
logger = logging.getLogger(__name__)

CHANNEL_PREFIX = "audit_task:"
STATE_PREFIX = "audit_task_state:"
HEARTBEAT_INTERVAL = 25  # seconds between heartbeat events
MAX_STREAM_SECONDS = 7200  # 2 hours: hard cap on a single stream
POLL_TIMEOUT = 1.0  # seconds to wait for a Pub/Sub message per loop iteration
TERMINAL_STATUSES = ("completed", "failed")


def _redis_url() -> str:
    """Build the Redis URL from environment variables.

    The ``REDIS_PUBSUB_*`` variables take precedence; each one falls back to
    the matching ``REDIS_*`` variable and finally to ``localhost`` / ``6379``
    / ``0``.
    """
    # Uses the Redis instance shared with the Django backend so that the
    # events published by its tasks can be read here.
    host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
    port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
    db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
    return f"redis://{host}:{port}/{db}"


def _sse_event(event: str, data: str) -> str:
    """Format one Server-Sent Events frame.

    Every line of *data* gets its own ``data:`` prefix, so a payload that
    contains a line break cannot terminate the frame early.
    """
    lines = data.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    body = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event}\n{body}\n"


def _is_terminal(raw: str) -> bool:
    """Return True when *raw* is a JSON object whose status ends the task.

    Malformed or non-object payloads are treated as non-terminal so that a
    single bad message does not abort the stream.
    """
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        logger.warning("[stream] Non-JSON task payload; treating as non-terminal")
        return False
    return isinstance(payload, dict) and payload.get("status") in TERMINAL_STATUSES


@router.get("/stream/tasks/{task_id}")
async def stream_task_events(task_id: str, request: Request):
    """
    SSE endpoint: the client connects and receives the task's progress
    events in real time via Redis Pub/Sub.

    The stream ends when a payload with status "completed" or "failed" is
    seen, when the client disconnects, or after MAX_STREAM_SECONDS. A
    heartbeat event is emitted every HEARTBEAT_INTERVAL seconds.

    Required settings on the Nginx upstream:
        proxy_read_timeout 7200;
        proxy_buffering    off;
    """
    channel = f"{CHANNEL_PREFIX}{task_id}"

    async def event_generator():
        r = aioredis.from_url(_redis_url(), decode_responses=True)
        pubsub = r.pubsub()
        try:
            # Subscribe BEFORE reading the stored state: a task that finishes
            # between the two steps would otherwise publish its final event
            # to nobody and the client would wait until the stream cap.
            await pubsub.subscribe(channel)

            # Send the current state if it already exists (late-joining client).
            current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
            if current_raw:
                yield _sse_event("task_update", current_raw)
                if _is_terminal(current_raw):
                    return

            # Measure real elapsed time on the loop's monotonic clock rather
            # than counting iterations, whose duration is not constant.
            clock = asyncio.get_running_loop().time
            deadline = clock() + MAX_STREAM_SECONDS
            next_heartbeat = clock() + HEARTBEAT_INTERVAL

            while clock() < deadline:
                if await request.is_disconnected():
                    break

                # The timeout must be given to get_message itself; with its
                # default of 0 it returns immediately and the loop spins.
                msg = await pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=POLL_TIMEOUT,
                )
                if msg and msg["type"] == "message":
                    data_str = msg["data"]
                    yield _sse_event("task_update", data_str)
                    if _is_terminal(data_str):
                        break

                now = clock()
                if now >= next_heartbeat:
                    yield _sse_event("heartbeat", "{}")
                    next_heartbeat = now + HEARTBEAT_INTERVAL
        except asyncio.CancelledError:
            # Never swallow cancellation; cleanup happens in ``finally``.
            raise
        except Exception as exc:
            logger.exception("[stream] Error en SSE para tarea %s: %s", task_id, exc)
            yield _sse_event("error", json.dumps({"message": str(exc)}))
        finally:
            try:
                await pubsub.unsubscribe(channel)
            except Exception:
                # Best effort: the connection may already be gone.
                logger.debug(
                    "[stream] unsubscribe failed for task %s", task_id, exc_info=True
                )
            finally:
                await r.aclose()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disables response buffering in Nginx
            "Connection": "keep-alive",
        },
    )