feature/pedimentos-corregidos #11
@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
|
|||||||
from api.api_v2.modules.partidas.routers import router as partidas_router
|
from api.api_v2.modules.partidas.routers import router as partidas_router
|
||||||
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
||||||
from api.api_v2.modules.remesas.routers import router as remesas_router
|
from api.api_v2.modules.remesas.routers import router as remesas_router
|
||||||
|
from api.api_v2.modules.stream.routers import router as stream_router
|
||||||
|
|
||||||
api_router = APIRouter()
|
api_router = APIRouter()
|
||||||
|
|
||||||
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
|
|||||||
api_router.include_router(partidas_router, tags=["Partidas"])
|
api_router.include_router(partidas_router, tags=["Partidas"])
|
||||||
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
||||||
api_router.include_router(remesas_router, tags=["Remesas"])
|
api_router.include_router(remesas_router, tags=["Remesas"])
|
||||||
|
api_router.include_router(stream_router, tags=["Stream"])
|
||||||
|
|||||||
0
api/api_v2/modules/stream/__init__.py
Normal file
0
api/api_v2/modules/stream/__init__.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
from fastapi import APIRouter, Request
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
HEARTBEAT_INTERVAL = 25 # segundos
|
||||||
|
MAX_STREAM_SECONDS = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _redis_url() -> str:
|
||||||
|
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
|
||||||
|
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
|
||||||
|
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
|
||||||
|
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
|
||||||
|
return f"redis://{host}:{port}/{db}"
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/stream/tasks/{task_id}")
|
||||||
|
async def stream_task_events(task_id: str, request: Request):
|
||||||
|
"""
|
||||||
|
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
|
||||||
|
en tiempo real vía Redis Pub/Sub.
|
||||||
|
|
||||||
|
Cabeceras requeridas en Nginx upstream:
|
||||||
|
proxy_read_timeout 7200;
|
||||||
|
proxy_buffering off;
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def event_generator():
|
||||||
|
r = aioredis.from_url(_redis_url(), decode_responses=True)
|
||||||
|
pubsub = r.pubsub()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Enviar estado actual si ya existe (cliente que llega tarde)
|
||||||
|
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
|
||||||
|
if current_raw:
|
||||||
|
current = json.loads(current_raw)
|
||||||
|
yield f"event: task_update\ndata: {current_raw}\n\n"
|
||||||
|
if current.get("status") in ("completed", "failed"):
|
||||||
|
return
|
||||||
|
|
||||||
|
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
|
||||||
|
tick = 0
|
||||||
|
while tick < MAX_STREAM_SECONDS:
|
||||||
|
if await request.is_disconnected():
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(
|
||||||
|
pubsub.get_message(ignore_subscribe_messages=True),
|
||||||
|
timeout=1.0,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
msg = None
|
||||||
|
|
||||||
|
if msg and msg["type"] == "message":
|
||||||
|
data_str = msg["data"]
|
||||||
|
yield f"event: task_update\ndata: {data_str}\n\n"
|
||||||
|
parsed = json.loads(data_str)
|
||||||
|
if parsed.get("status") in ("completed", "failed"):
|
||||||
|
break
|
||||||
|
|
||||||
|
tick += 1
|
||||||
|
if tick % HEARTBEAT_INTERVAL == 0:
|
||||||
|
yield f"event: heartbeat\ndata: {{}}\n\n"
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
|
||||||
|
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await r.aclose()
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
@@ -4,6 +4,8 @@ from typing import Dict, Any
|
|||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from ..common import create_error_response
|
from ..common import create_error_response
|
||||||
from core.config import settings
|
from core.config import settings
|
||||||
|
from core.redis_events import publish_task_event
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
async def update_task(
|
async def update_task(
|
||||||
@@ -61,8 +63,9 @@ async def update_task(
|
|||||||
return await _create_and_update_task(
|
return await _create_and_update_task(
|
||||||
task_id, message, status, pedimento_id, organizacion_id, servicio
|
task_id, message, status, pedimento_id, organizacion_id, servicio
|
||||||
)
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
publish_task_event(task_id, status, message)
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
except httpx.HTTPError as e:
|
except httpx.HTTPError as e:
|
||||||
|
|||||||
44
core/redis_events.py
Normal file
44
core/redis_events.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import redis
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
STATE_TTL = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _get_sync_redis():
|
||||||
|
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
|
||||||
|
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
|
||||||
|
return redis.Redis(
|
||||||
|
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
|
||||||
|
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
|
||||||
|
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
|
||||||
|
decode_responses=True,
|
||||||
|
socket_connect_timeout=2,
|
||||||
|
socket_timeout=2,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
|
||||||
|
"""
|
||||||
|
Publica un evento de progreso de tarea en Redis Pub/Sub.
|
||||||
|
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
|
||||||
|
"""
|
||||||
|
payload: dict = {"task_id": task_id, "status": status, "message": message}
|
||||||
|
if resultado is not None:
|
||||||
|
payload["resultado"] = resultado
|
||||||
|
if progress is not None:
|
||||||
|
payload["progress"] = progress
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = _get_sync_redis()
|
||||||
|
serialized = json.dumps(payload)
|
||||||
|
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
|
||||||
|
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
|
||||||
|
client.close()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")
|
||||||
@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
|
|||||||
Returns:
|
Returns:
|
||||||
bool: True si no hay errores, False en caso contrario
|
bool: True si no hay errores, False en caso contrario
|
||||||
"""
|
"""
|
||||||
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text:
|
# Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
|
||||||
return True
|
if 'tieneerror>true<' in soap_response.text.lower():
|
||||||
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
|
|
||||||
return True
|
return True
|
||||||
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
||||||
return True
|
return True
|
||||||
|
|||||||
Reference in New Issue
Block a user