245 lines
8.6 KiB
Python
245 lines
8.6 KiB
Python
from rest_framework import permissions
|
|
from rest_framework.exceptions import PermissionDenied
|
|
from rest_framework.authentication import TokenAuthentication
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers centrales — toda la lógica de RBAC pasa por aquí
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_internal_service_request(request):
|
|
"""True si la petición proviene de un service account (Token auth + superuser).
|
|
Misma lógica que IsInternalService, útil en get_queryset() y perform_* methods."""
|
|
user = getattr(request, 'user', None)
|
|
if not user or not user.is_superuser:
|
|
return False
|
|
return isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication)
|
|
|
|
|
|
def get_org_context(user):
|
|
"""Retorna la organización activa para filtrado de datos.
|
|
Superusuarios usan active_organization; usuarios normales usan organizacion."""
|
|
if user.is_superuser:
|
|
return getattr(user, 'active_organization', None)
|
|
return getattr(user, 'organizacion', None)
|
|
|
|
|
|
def user_has_permission(user, codename):
|
|
"""Verifica si un usuario tiene un permiso RBAC por su codename.
|
|
|
|
Orden de evaluación:
|
|
1. is_superuser → True siempre
|
|
2. UserPermission deny explícito → False
|
|
3. UserPermission grant explícito → True
|
|
4. Algún UserRole en su org tiene el permiso → True
|
|
5. Denegar
|
|
"""
|
|
if user.is_superuser:
|
|
return True
|
|
|
|
org = getattr(user, 'organizacion', None)
|
|
if not org:
|
|
return False
|
|
|
|
from api.rbac.models import UserPermission, UserRole
|
|
|
|
try:
|
|
override = UserPermission.objects.get(user=user, permission__codename=codename)
|
|
return override.granted
|
|
except UserPermission.DoesNotExist:
|
|
pass
|
|
|
|
return UserRole.objects.filter(
|
|
user=user,
|
|
role__organizacion=org,
|
|
role__permissions__codename=codename,
|
|
).exists()
|
|
|
|
|
|
def user_has_role(user, role_name):
|
|
"""Verifica si un usuario tiene un rol por nombre dentro de su organización.
|
|
Función puente durante la transición — lee desde UserRole en lugar de auth.Group."""
|
|
from api.rbac.models import UserRole
|
|
|
|
org = getattr(user, 'organizacion', None)
|
|
if not org:
|
|
return False
|
|
return UserRole.objects.filter(
|
|
user=user,
|
|
role__nombre=role_name,
|
|
role__organizacion=org,
|
|
).exists()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Base compartida — aplica el requisito de org activa a superusuarios
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OrgScopedPermission(permissions.BasePermission):
|
|
"""Base para todas las clases de permiso con scope de organización.
|
|
Superusuario sin active_organization recibe 403, EXCEPTO service accounts
|
|
(Token auth + superuser) que pasan sin restricción de org."""
|
|
|
|
message = 'No tienes permiso para realizar esta acción.'
|
|
|
|
def has_permission(self, request, view):
|
|
if not request.user.is_authenticated:
|
|
return False
|
|
if request.user.is_superuser:
|
|
from rest_framework.authentication import TokenAuthentication
|
|
# Service account interno: Token auth + superuser → siempre permitido
|
|
if isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication):
|
|
return True
|
|
# Superuser JWT: requiere active_organization
|
|
if not getattr(request.user, 'active_organization', None):
|
|
return False
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Clases de permiso
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class IsSameOrganization(OrgScopedPermission):
|
|
"""Usuario autenticado con org activa. Cualquier rol pasa (incluyendo Importador)."""
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
org = get_org_context(request.user)
|
|
if not org:
|
|
return False
|
|
dirigido = getattr(obj, 'dirigido', None)
|
|
if dirigido:
|
|
return getattr(dirigido, 'organizacion', None) == org
|
|
return getattr(obj, 'organizacion', None) == org
|
|
|
|
|
|
class IsSameOrganizationAndAdmin(OrgScopedPermission):
|
|
"""Usuario con rol admin, Agente Aduanal o user en su organización."""
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
user = request.user
|
|
if user.is_superuser:
|
|
return True
|
|
org = get_org_context(user)
|
|
if not org:
|
|
return False
|
|
tiene_rol = (
|
|
user_has_role(user, 'admin') or
|
|
user_has_role(user, 'Agente Aduanal') or
|
|
user_has_role(user, 'user')
|
|
)
|
|
if not tiene_rol:
|
|
return False
|
|
return getattr(obj, 'organizacion', None) == org
|
|
|
|
|
|
class IsSameOrganizationDeveloper(OrgScopedPermission):
|
|
"""Usuario con rol developer, Agente Aduanal o user en su organización."""
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
user = request.user
|
|
if user.is_superuser:
|
|
return True
|
|
org = get_org_context(user)
|
|
if not org:
|
|
return False
|
|
tiene_rol = (
|
|
user_has_role(user, 'developer') or
|
|
user_has_role(user, 'Agente Aduanal') or
|
|
user_has_role(user, 'user')
|
|
)
|
|
if not tiene_rol:
|
|
return False
|
|
return getattr(obj, 'organizacion', None) == org
|
|
|
|
|
|
class IsOwnerOrOrgAdmin(OrgScopedPermission):
|
|
"""El propio usuario, staff de Django o usuario con rol admin en la org."""
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
user = request.user
|
|
return (
|
|
obj == user or
|
|
user.is_staff or
|
|
user.is_superuser or
|
|
user_has_role(user, 'admin')
|
|
)
|
|
|
|
|
|
class IsSuperUser(permissions.BasePermission):
|
|
"""Solo superusuarios de Django. No requiere org activa (para endpoints de gestión global)."""
|
|
|
|
message = 'No tienes permiso para realizar esta acción.'
|
|
|
|
def has_permission(self, request, view):
|
|
return request.user.is_authenticated and request.user.is_superuser
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
return request.user.is_superuser
|
|
|
|
|
|
class IsInternalService(permissions.BasePermission):
|
|
"""
|
|
Identifica llamadas internas de microservicio → backend.
|
|
|
|
Criterio: autenticación via Token (no JWT) + usuario superuser.
|
|
Esto garantiza que solo cuentas de servicio predefinidas pasan,
|
|
sin depender de flags manuales como is_staff que pueden no estar
|
|
configurados en producción.
|
|
"""
|
|
|
|
message = 'Acceso reservado para servicios internos.'
|
|
|
|
def has_permission(self, request, view):
|
|
if not request.user or not request.user.is_authenticated:
|
|
return False
|
|
from rest_framework.authentication import TokenAuthentication
|
|
return (
|
|
isinstance(request.successful_authenticator, TokenAuthentication)
|
|
and request.user.is_superuser
|
|
)
|
|
|
|
|
|
class HasStoragePermission(OrgScopedPermission):
|
|
"""Usuarios con acceso a operaciones de almacenamiento (organizacion.view)."""
|
|
|
|
def has_permission(self, request, view):
|
|
if not super().has_permission(request, view):
|
|
return False
|
|
return user_has_permission(request.user, 'organizacion.view')
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
return user_has_permission(request.user, 'organizacion.view')
|
|
|
|
|
|
def require_permission(codename):
|
|
"""
|
|
Devuelve una clase de permiso DRF que exige el codename RBAC indicado.
|
|
Uso en permission_classes: require_permission('pedimentos.view')
|
|
Uso en get_permissions(): require_permission('pedimentos.create')()
|
|
"""
|
|
class _RbacPerm(OrgScopedPermission):
|
|
def has_permission(self, request, view):
|
|
if not super().has_permission(request, view):
|
|
return False
|
|
return user_has_permission(request.user, codename)
|
|
_RbacPerm.__name__ = f'HasPerm_{codename.replace(".", "_")}'
|
|
_RbacPerm.__qualname__ = _RbacPerm.__name__
|
|
return _RbacPerm
|
|
|
|
|
|
class IsSameOrganizationAndInAllowedGroups(OrgScopedPermission):
|
|
"""Usuario con permiso vucem.manage en su organización.
|
|
Reemplaza la lógica rota que requería 3 grupos simultáneamente."""
|
|
|
|
def has_object_permission(self, request, view, obj):
|
|
user = request.user
|
|
if user.is_superuser:
|
|
return True
|
|
org = get_org_context(user)
|
|
if not org:
|
|
return False
|
|
if not user_has_permission(user, 'vucem.manage'):
|
|
return False
|
|
return getattr(obj, 'organizacion', None) == org
|