240 lines
8.8 KiB
Python
240 lines
8.8 KiB
Python
"""
|
|
Autenticación vía Hub de Aduanasoft (Keycloak).
|
|
Tokens locales HS256 (~700 bytes) se emiten tras el exchange con el Hub
|
|
para no exceder el límite de 4096 bytes de cookies del browser.
|
|
|
|
ORDEN CRÍTICO en verify_hub_token:
|
|
cache → local HS256 → Hub /auth/me
|
|
Si el token local se manda al Hub primero, Hub responde 401 y rompe la
|
|
sesión SSO silenciosamente.
|
|
"""
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
|
|
import jwt
|
|
import requests
|
|
from django.conf import settings
|
|
from rest_framework.authentication import BaseAuthentication
|
|
from rest_framework.exceptions import AuthenticationFailed
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Cache en memoria: {token: (payload, expires_at)}
|
|
_token_cache: dict = {}
|
|
_CACHE_TTL = 60 # segundos
|
|
|
|
|
|
def _cache_get(token: str) -> Optional[dict]:
|
|
entry = _token_cache.get(token)
|
|
if entry and entry[1] > time.time():
|
|
return entry[0]
|
|
_token_cache.pop(token, None)
|
|
return None
|
|
|
|
|
|
def _cache_set(token: str, payload: dict):
|
|
_token_cache[token] = (payload, time.time() + _CACHE_TTL)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tokens locales
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_local_tokens(user_data: dict) -> dict:
|
|
"""Emite tokens locales compactos HS256. Caben en cookies del browser."""
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
now = datetime.now(timezone.utc)
|
|
base = {
|
|
"sub": str(user_data.get("id") or user_data.get("username", "")),
|
|
"preferred_username": user_data.get("username", ""),
|
|
"email": user_data.get("email", ""),
|
|
"name": user_data.get("name", ""),
|
|
"given_name": user_data.get("first_name", ""),
|
|
"family_name": user_data.get("last_name", ""),
|
|
"is_hub_admin": user_data.get("is_hub_admin", False),
|
|
"tenant_id": user_data.get("tenant_id"),
|
|
"tenant_slug": user_data.get("tenant_slug") or getattr(settings, "HUB_TENANT_SLUG", ""),
|
|
"source": "local",
|
|
"iat": int(now.timestamp()),
|
|
}
|
|
access_payload = {**base, "exp": int((now + timedelta(hours=8)).timestamp())}
|
|
refresh_payload = {**base, "exp": int((now + timedelta(days=30)).timestamp())}
|
|
secret = settings.SECRET_KEY
|
|
|
|
return {
|
|
"access_token": jwt.encode(access_payload, secret, algorithm="HS256"),
|
|
"refresh_token": jwt.encode(refresh_payload, secret, algorithm="HS256"),
|
|
"expires_in": 1800,
|
|
"source": "local",
|
|
}
|
|
|
|
|
|
def _verify_local_token(token: str) -> Optional[dict]:
|
|
"""Decodifica token local HS256. Retorna payload o None si no es local."""
|
|
try:
|
|
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
|
|
if payload.get("source") == "local":
|
|
return payload
|
|
return None
|
|
except jwt.ExpiredSignatureError:
|
|
raise AuthenticationFailed("Token expirado — inicia sesión de nuevo")
|
|
except jwt.InvalidTokenError:
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Verificación contra Hub
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def verify_hub_token(token: str) -> dict:
|
|
"""ORDEN: cache → local HS256 → Hub /auth/me."""
|
|
cached = _cache_get(token)
|
|
if cached:
|
|
return cached
|
|
|
|
# 1. Token local primero (evita 401 del Hub para tokens locales)
|
|
local = _verify_local_token(token)
|
|
if local:
|
|
_cache_set(token, local)
|
|
return local
|
|
|
|
# 2. Validar contra Hub
|
|
hub_url = getattr(settings, "HUB_URL", "https://workspace.aduanasoft.com")
|
|
me_url = f"{hub_url.rstrip('/')}/api/v1/auth/me"
|
|
try:
|
|
r = requests.get(me_url, headers={"Authorization": f"Bearer {token}"}, timeout=5)
|
|
except requests.exceptions.RequestException as exc:
|
|
# Fallback: si hay token local válido lo usamos
|
|
local = _verify_local_token(token)
|
|
if local:
|
|
_cache_set(token, local)
|
|
return local
|
|
logger.error("Hub no disponible: %s", exc)
|
|
raise AuthenticationFailed("Servicio de autenticación no disponible")
|
|
|
|
if r.status_code == 200:
|
|
info = r.json()
|
|
_cache_set(token, info)
|
|
return info
|
|
|
|
if r.status_code in (401, 403):
|
|
raise AuthenticationFailed("Token inválido o sesión expirada")
|
|
|
|
logger.error("Hub respondió %s al verificar token", r.status_code)
|
|
raise AuthenticationFailed("No se pudo verificar el token")
|
|
|
|
|
|
def _get_django_user(hub_data: dict):
|
|
"""Resuelve el CustomUser de Django a partir de los datos del Hub."""
|
|
from api.cuser.models import CustomUser
|
|
|
|
# Token local: sub puede ser Django UUID (login directo) o KC UUID (SSO exchange)
|
|
if hub_data.get("source") == "local":
|
|
from django.db.models import Q
|
|
sub = hub_data.get("sub", "")
|
|
if not sub:
|
|
return None
|
|
# Una sola query: busca por Django UUID o KC UUID simultáneamente
|
|
try:
|
|
return CustomUser.objects.filter(
|
|
Q(id=sub) | Q(keycloak_user_id=sub)
|
|
).first()
|
|
except Exception:
|
|
# sub malformado (no es UUID válido)
|
|
return CustomUser.objects.filter(keycloak_user_id=sub).first()
|
|
|
|
# Token Hub: buscar por keycloak_user_id → email
|
|
kc_id = hub_data.get("keycloak_user_id") or hub_data.get("sub")
|
|
email = hub_data.get("email")
|
|
|
|
if kc_id:
|
|
user = CustomUser.objects.filter(keycloak_user_id=kc_id).first()
|
|
if user:
|
|
return user
|
|
|
|
if email:
|
|
return CustomUser.objects.filter(email=email).first()
|
|
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DRF Authentication Backend
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class HubAuthBackend(BaseAuthentication):
|
|
"""
|
|
Drop-in para reemplazar JWTAuthentication.
|
|
Acepta tokens locales (HS256) y tokens del Hub indistintamente.
|
|
Se añade JUNTO a JWTAuthentication para compatibilidad durante la migración.
|
|
"""
|
|
|
|
def authenticate(self, request):
|
|
token = self._extract_token(request)
|
|
if not token:
|
|
return None
|
|
|
|
# Detectar tokens SimpleJWT sin llamar al Hub.
|
|
# Decodificamos sin verificar firma solo para leer claims.
|
|
# Si el token no tiene source="local" ni claims de KC (realm_access, azp)
|
|
# es un token SimpleJWT legacy → dejar que JWTAuthentication lo maneje.
|
|
try:
|
|
unverified = jwt.decode(
|
|
token,
|
|
options={"verify_signature": False},
|
|
algorithms=["HS256", "RS256"],
|
|
)
|
|
is_hub_token = (
|
|
unverified.get("source") == "local" # token local HS256
|
|
or "realm_access" in unverified # token KC directo
|
|
or "azp" in unverified # token KC (authorized party)
|
|
)
|
|
if not is_hub_token:
|
|
return None # SimpleJWT — pasar al siguiente backend sin tocar el Hub
|
|
except Exception:
|
|
return None # JWT malformado — no es nuestro
|
|
|
|
try:
|
|
hub_data = verify_hub_token(token)
|
|
except AuthenticationFailed:
|
|
return None
|
|
except Exception as exc:
|
|
logger.error("Error inesperado en HubAuthBackend: %s", exc)
|
|
return None
|
|
|
|
user = _get_django_user(hub_data)
|
|
if not user:
|
|
# Retornar None permite que endpoints AllowAny pasen sin bloquear.
|
|
# Los endpoints IsAuthenticated quedarán como "no autenticado" (sin 401 engañoso).
|
|
return None
|
|
|
|
return (user, token)
|
|
|
|
@staticmethod
|
|
def _extract_token(request) -> Optional[str]:
|
|
auth_header = request.META.get("HTTP_AUTHORIZATION", "")
|
|
if auth_header.lower().startswith("bearer "):
|
|
return auth_header[7:].strip() or None
|
|
# Fallback: cookie (flujo SSO con cookies)
|
|
return request.COOKIES.get("access_token")
|
|
|
|
def authenticate_header(self, request):
|
|
return "Bearer"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helper cookies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def set_session_cookies(response, tokens: dict):
|
|
"""Escribe las cookies de sesión HTTP-only."""
|
|
secure = getattr(settings, "COOKIE_SECURE", not settings.DEBUG)
|
|
kw = dict(httponly=True, secure=secure, samesite="Lax")
|
|
response.set_cookie("access_token", tokens["access_token"], max_age=1800, **kw)
|
|
response.set_cookie("refresh_token", tokens["refresh_token"], max_age=60*60*24*7, **kw)
|
|
response.set_cookie("token_type", "bearer", max_age=60*60*24*7,
|
|
httponly=False, secure=secure, samesite="Lax")
|