"""Authentication via the Aduanasoft Hub (Keycloak).

Compact local HS256 tokens (~700 bytes) are issued after the exchange with
the Hub so that the session cookies stay below the browser's 4096-byte
cookie limit.

CRITICAL ORDER in ``verify_hub_token``: cache -> local HS256 -> Hub /auth/me.
If a local token is sent to the Hub first, the Hub answers 401 and silently
breaks the SSO session.
"""

import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
import requests
from django.conf import settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed

logger = logging.getLogger(__name__)

# In-memory cache: {token: (payload, expires_at)}
_token_cache: dict = {}
_CACHE_TTL = 60  # seconds
# Upper bound on cached tokens so the process-wide dict cannot grow forever.
_CACHE_MAX_ENTRIES = 10_000

# Lifetimes embedded in the local JWTs ("exp" claim).
_ACCESS_TOKEN_LIFETIME = timedelta(hours=8)
_REFRESH_TOKEN_LIFETIME = timedelta(days=30)

# Cookie lifetimes / advertised "expires_in", in seconds.
# NOTE(review): these are shorter than the JWT lifetimes above (30 min vs 8 h,
# 7 days vs 30 days). Values are kept exactly as they were; confirm which
# pair is the intended one.
_ACCESS_COOKIE_MAX_AGE = 1800
_REFRESH_COOKIE_MAX_AGE = 60 * 60 * 24 * 7


def _cache_get(token: str) -> Optional[dict]:
    """Return the cached payload for ``token``, or None if absent or expired.

    An expired entry is evicted as a side effect of the lookup.
    """
    entry = _token_cache.get(token)
    if entry is None:
        return None
    payload, expires_at = entry
    if expires_at > time.time():
        return payload
    _token_cache.pop(token, None)
    return None


def _prune_cache(now: float) -> None:
    """Drop expired cache entries; clear everything if still at capacity."""
    # Iterate over a snapshot so concurrent inserts cannot break the loop.
    for key, (_payload, expires_at) in list(_token_cache.items()):
        if expires_at <= now:
            _token_cache.pop(key, None)
    if len(_token_cache) >= _CACHE_MAX_ENTRIES:
        # Entries live at most _CACHE_TTL seconds, so a full reset is cheap.
        _token_cache.clear()


def _cache_set(token: str, payload: dict, max_expiry: Optional[float] = None) -> None:
    """Cache ``payload`` for ``token`` for at most ``_CACHE_TTL`` seconds.

    Args:
        token: Raw bearer token used as the cache key.
        payload: Verified claims / user info to store.
        max_expiry: Optional Unix timestamp after which the entry must not be
            served (e.g. the token's own ``exp``). The entry expires at
            whichever comes first: ``now + _CACHE_TTL`` or ``max_expiry``.
    """
    now = time.time()
    expires_at = now + _CACHE_TTL
    if max_expiry is not None:
        expires_at = min(expires_at, float(max_expiry))
    if len(_token_cache) >= _CACHE_MAX_ENTRIES:
        _prune_cache(now)
    _token_cache[token] = (payload, expires_at)


# ---------------------------------------------------------------------------
# Local tokens
# ---------------------------------------------------------------------------

def create_local_tokens(user_data: dict) -> dict:
    """Issue compact local HS256 tokens that fit in browser cookies.

    Args:
        user_data: Mapping with the user's attributes. Keys read here:
            ``id``, ``username``, ``email``, ``name``, ``first_name``,
            ``last_name``, ``is_hub_admin``, ``tenant_id``, ``tenant_slug``.
            Missing keys fall back to empty/None defaults.

    Returns:
        Dict with ``access_token``, ``refresh_token``, ``expires_in`` and
        ``source`` (always ``"local"``). Both tokens are signed with
        ``settings.SECRET_KEY``.
    """
    now = datetime.now(timezone.utc)
    base = {
        "sub": str(user_data.get("id") or user_data.get("username", "")),
        "preferred_username": user_data.get("username", ""),
        "email": user_data.get("email", ""),
        "name": user_data.get("name", ""),
        "given_name": user_data.get("first_name", ""),
        "family_name": user_data.get("last_name", ""),
        "is_hub_admin": user_data.get("is_hub_admin", False),
        "tenant_id": user_data.get("tenant_id"),
        "tenant_slug": user_data.get("tenant_slug")
        or getattr(settings, "HUB_TENANT_SLUG", ""),
        "source": "local",
        "iat": int(now.timestamp()),
    }
    # NOTE(review): access and refresh tokens carry identical claims apart
    # from "exp", so a refresh token is also accepted wherever an access
    # token is. Consider adding a distinguishing claim.
    access_payload = {**base, "exp": int((now + _ACCESS_TOKEN_LIFETIME).timestamp())}
    refresh_payload = {**base, "exp": int((now + _REFRESH_TOKEN_LIFETIME).timestamp())}

    secret = settings.SECRET_KEY
    return {
        "access_token": jwt.encode(access_payload, secret, algorithm="HS256"),
        "refresh_token": jwt.encode(refresh_payload, secret, algorithm="HS256"),
        "expires_in": _ACCESS_COOKIE_MAX_AGE,
        "source": "local",
    }


def _verify_local_token(token: str) -> Optional[dict]:
    """Decode a local HS256 token.

    Returns:
        The payload when the signature is valid and ``source == "local"``;
        None when the token is not a local one (bad signature, malformed,
        or a different ``source``).

    Raises:
        AuthenticationFailed: The token is correctly signed but expired.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
    except jwt.ExpiredSignatureError as exc:
        raise AuthenticationFailed("Token expirado — inicia sesión de nuevo") from exc
    except jwt.InvalidTokenError:
        return None
    if payload.get("source") == "local":
        return payload
    return None


# ---------------------------------------------------------------------------
# Verification against the Hub
# ---------------------------------------------------------------------------

def verify_hub_token(token: str) -> dict:
    """Verify ``token`` and return its claims / user info.

    ORDER: cache -> local HS256 -> Hub /auth/me.

    Args:
        token: Raw bearer token (local HS256 or issued by the Hub).

    Returns:
        The local token payload, or the JSON object returned by the Hub's
        ``/api/v1/auth/me`` endpoint.

    Raises:
        AuthenticationFailed: The token is expired or rejected, the Hub is
            unreachable, or the Hub returned an unusable response.
    """
    cached = _cache_get(token)
    if cached is not None:
        return cached

    # 1. Local token first (avoids a Hub 401 for local tokens).
    local = _verify_local_token(token)
    if local:
        # Never serve a cached local token past its own expiry.
        _cache_set(token, local, max_expiry=local.get("exp"))
        return local

    # 2. Validate against the Hub.
    hub_url = getattr(settings, "HUB_URL", "https://workspace.aduanasoft.com")
    me_url = f"{hub_url.rstrip('/')}/api/v1/auth/me"
    try:
        r = requests.get(me_url, headers={"Authorization": f"Bearer {token}"}, timeout=5)
    except requests.exceptions.RequestException as exc:
        # The token already failed local verification in step 1, so there is
        # no local fallback left to try here.
        logger.error("Hub no disponible: %s", exc)
        raise AuthenticationFailed("Servicio de autenticación no disponible") from exc

    if r.status_code == 200:
        try:
            info = r.json()
        except ValueError as exc:
            logger.error("Hub respondió 200 con un cuerpo no JSON al verificar token")
            raise AuthenticationFailed("No se pudo verificar el token") from exc
        if not isinstance(info, dict):
            logger.error("Hub respondió 200 con un JSON que no es un objeto")
            raise AuthenticationFailed("No se pudo verificar el token")
        _cache_set(token, info)
        return info

    if r.status_code in (401, 403):
        raise AuthenticationFailed("Token inválido o sesión expirada")

    logger.error("Hub respondió %s al verificar token", r.status_code)
    raise AuthenticationFailed("No se pudo verificar el token")


def _get_django_user(hub_data: dict):
    """Resolve the Django ``CustomUser`` that matches the verified token data.

    Args:
        hub_data: Payload returned by ``verify_hub_token``.

    Returns:
        The matching ``CustomUser`` instance, or None when no user matches.
    """
    # Imported lazily, as in the original code, rather than at module import.
    from api.cuser.models import CustomUser
    from django.db.models import Q

    # Local token: "sub" may be the Django UUID (direct login) or the
    # Keycloak UUID (SSO exchange).
    if hub_data.get("source") == "local":
        sub = hub_data.get("sub", "")
        if not sub:
            return None
        # Single query: match the Django id or the Keycloak id at once.
        try:
            return CustomUser.objects.filter(
                Q(id=sub) | Q(keycloak_user_id=sub)
            ).first()
        except Exception:
            # Malformed "sub" (not a valid UUID for the id column). Kept
            # deliberately broad so a bad claim never becomes a 500.
            return CustomUser.objects.filter(keycloak_user_id=sub).first()

    # Hub token: look up by keycloak_user_id, then fall back to email.
    kc_id = hub_data.get("keycloak_user_id") or hub_data.get("sub")
    email = hub_data.get("email")

    if kc_id:
        user = CustomUser.objects.filter(keycloak_user_id=kc_id).first()
        if user:
            return user

    if email:
        return CustomUser.objects.filter(email=email).first()

    return None


# ---------------------------------------------------------------------------
# DRF Authentication Backend
# ---------------------------------------------------------------------------

class HubAuthBackend(BaseAuthentication):
    """Drop-in replacement for JWTAuthentication.

    Accepts local (HS256) tokens and Hub tokens interchangeably. It is meant
    to be configured ALONGSIDE JWTAuthentication for compatibility during
    the migration.
    """

    def authenticate(self, request):
        """Authenticate ``request`` from its bearer token or cookie.

        Returns:
            ``(user, token)`` on success; None when there is no token, the
            token is not a Hub/local one, verification fails, or no Django
            user matches. Returning None lets the next backend try.
        """
        token = self._extract_token(request)
        if not token:
            return None

        # Detect SimpleJWT tokens without calling the Hub: decode WITHOUT
        # verifying the signature, only to read the claims.
        try:
            unverified = jwt.decode(
                token,
                options={"verify_signature": False},
                algorithms=["HS256", "RS256"],
            )
        except Exception:
            return None  # Malformed JWT: not ours.

        is_hub_token = (
            unverified.get("source") == "local"  # local HS256 token
            or "realm_access" in unverified      # direct Keycloak token
            or "azp" in unverified               # Keycloak (authorized party)
        )
        if not is_hub_token:
            # Legacy SimpleJWT token: hand over to the next backend without
            # touching the Hub.
            return None

        try:
            hub_data = verify_hub_token(token)
        except AuthenticationFailed:
            return None
        except Exception as exc:
            logger.exception("Error inesperado en HubAuthBackend: %s", exc)
            return None

        user = _get_django_user(hub_data)
        if not user:
            # Returning None lets AllowAny endpoints through; IsAuthenticated
            # endpoints simply see an unauthenticated request (no misleading
            # 401 raised from here).
            return None

        return (user, token)

    @staticmethod
    def _extract_token(request) -> Optional[str]:
        """Return the token from the Authorization header, else the cookie.

        A ``Bearer`` header with an empty value yields None without falling
        back to the cookie.
        """
        auth_header = request.META.get("HTTP_AUTHORIZATION", "")
        if auth_header.lower().startswith("bearer "):
            return auth_header[7:].strip() or None
        # Fallback: cookie (cookie-based SSO flow).
        return request.COOKIES.get("access_token")

    def authenticate_header(self, request):
        """Return the scheme advertised in ``WWW-Authenticate`` on 401."""
        return "Bearer"


# ---------------------------------------------------------------------------
# Cookie helper
# ---------------------------------------------------------------------------

def set_session_cookies(response, tokens: dict):
    """Write the session cookies onto ``response``.

    Args:
        response: Response object exposing ``set_cookie``.
        tokens: Dict containing ``access_token`` and ``refresh_token``.

    ``access_token`` and ``refresh_token`` are HTTP-only; ``token_type`` is
    readable from JavaScript. ``Secure`` follows ``settings.COOKIE_SECURE``
    and defaults to ``not settings.DEBUG``.
    """
    secure = getattr(settings, "COOKIE_SECURE", not settings.DEBUG)
    kw = dict(httponly=True, secure=secure, samesite="Lax")
    response.set_cookie(
        "access_token", tokens["access_token"], max_age=_ACCESS_COOKIE_MAX_AGE, **kw
    )
    response.set_cookie(
        "refresh_token", tokens["refresh_token"], max_age=_REFRESH_COOKIE_MAX_AGE, **kw
    )
    response.set_cookie(
        "token_type",
        "bearer",
        max_age=_REFRESH_COOKIE_MAX_AGE,
        httponly=False,
        secure=secure,
        samesite="Lax",
    )