feature/rbac permisos y roles implementados

This commit is contained in:
2026-05-21 07:54:59 -06:00
parent 9bbed42cf3
commit a318b70324
38 changed files with 2596 additions and 901 deletions

View File

@@ -1,100 +1,244 @@
# permissions.py
from rest_framework import permissions
from api.cuser.models import CustomUser
from rest_framework.exceptions import PermissionDenied
from rest_framework.authentication import TokenAuthentication
class IsSameOrganization(permissions.BasePermission):
"""
Permiso personalizado que solo permite acceder a usuarios de la misma organización
o a administradores/staff.
"""
def has_permission(self, request, view):
# Permite listar/crear solo si el usuario está autenticado
return request.user.is_authenticated
def has_object_permission(self, request, view, obj):
# Permite operaciones sobre un objeto específico solo si:
# - El objeto pertenece a la misma organización (acceso por usuario relacionado)
return (getattr(obj, 'dirigido', None) and obj.dirigido.organizacion == request.user.organizacion)
class IsSameOrganizationAndAdmin(permissions.BasePermission):
"""
Permiso personalizado que solo permite acceder a usuarios de la misma organización
o a administradores/staff.
"""
def has_permission(self, request, view):
# Permite listar/crear solo si el usuario está autenticado
return request.user.is_authenticated
# ---------------------------------------------------------------------------
# Helpers centrales — toda la lógica de RBAC pasa por aquí
# ---------------------------------------------------------------------------
def has_object_permission(self, request, view, obj):
# Permite operaciones solo si el usuario es admin, Agente Aduanal o user y la organización coincide
allowed_groups = ['admin', 'Agente Aduanal', 'user']
user_in_group = request.user.groups.filter(name__in=allowed_groups).exists()
if not user_in_group:
return False
if hasattr(obj, 'organizacion'):
return obj.organizacion == request.user.organizacion
def is_internal_service_request(request):
"""True si la petición proviene de un service account (Token auth + superuser).
Misma lógica que IsInternalService, útil en get_queryset() y perform_* methods."""
user = getattr(request, 'user', None)
if not user or not user.is_superuser:
return False
class IsSameOrganizationDeveloper(permissions.BasePermission):
"""
Permiso personalizado que solo permite acceder a usuarios de la misma organización
o a administradores/staff.
"""
def has_permission(self, request, view):
# Permite listar/crear solo si el usuario está autenticado
return request.user.is_authenticated
return isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication)
def has_object_permission(self, request, view, obj):
# Permite operaciones solo si el usuario es developer, Agente Aduanal o user y la organización coincide
allowed_groups = ['developer', 'Agente Aduanal', 'user']
user_in_group = request.user.groups.filter(name__in=allowed_groups).exists()
if not user_in_group:
return False
if hasattr(obj, 'organizacion'):
return obj.organizacion == request.user.organizacion
def get_org_context(user):
"""Retorna la organización activa para filtrado de datos.
Superusuarios usan active_organization; usuarios normales usan organizacion."""
if user.is_superuser:
return getattr(user, 'active_organization', None)
return getattr(user, 'organizacion', None)
def user_has_permission(user, codename):
"""Verifica si un usuario tiene un permiso RBAC por su codename.
Orden de evaluación:
1. is_superuser → True siempre
2. UserPermission deny explícito → False
3. UserPermission grant explícito → True
4. Algún UserRole en su org tiene el permiso → True
5. Denegar
"""
if user.is_superuser:
return True
org = getattr(user, 'organizacion', None)
if not org:
return False
class IsOwnerOrOrgAdmin(permissions.BasePermission):
def has_object_permission(self, request, view, obj):
return (
obj == request.user or
request.user.is_staff or
request.user.groups.filter(name='admin').exists()
)
class IsSuperUser(permissions.BasePermission):
def has_object_permission(self, request, view, obj):
return request.user.is_superuser
class HasStoragePermission(permissions.BasePermission):
"""
Permiso personalizado que permite el acceso a los usuarios que tienen permisos de almacenamiento.
"""
from api.rbac.models import UserPermission, UserRole
try:
override = UserPermission.objects.get(user=user, permission__codename=codename)
return override.granted
except UserPermission.DoesNotExist:
pass
return UserRole.objects.filter(
user=user,
role__organizacion=org,
role__permissions__codename=codename,
).exists()
def user_has_role(user, role_name):
"""Verifica si un usuario tiene un rol por nombre dentro de su organización.
Función puente durante la transición — lee desde UserRole en lugar de auth.Group."""
from api.rbac.models import UserRole
org = getattr(user, 'organizacion', None)
if not org:
return False
return UserRole.objects.filter(
user=user,
role__nombre=role_name,
role__organizacion=org,
).exists()
# ---------------------------------------------------------------------------
# Base compartida — aplica el requisito de org activa a superusuarios
# ---------------------------------------------------------------------------
class OrgScopedPermission(permissions.BasePermission):
"""Base para todas las clases de permiso con scope de organización.
Superusuario sin active_organization recibe 403, EXCEPTO service accounts
(Token auth + superuser) que pasan sin restricción de org."""
message = 'No tienes permiso para realizar esta acción.'
def has_permission(self, request, view):
# Permite el acceso si el usuario tiene el permiso 'can_access_storage'
return request.user.has_perm('api.cuser.can_access_storage')
if not request.user.is_authenticated:
return False
if request.user.is_superuser:
from rest_framework.authentication import TokenAuthentication
# Service account interno: Token auth + superuser → siempre permitido
if isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication):
return True
# Superuser JWT: requiere active_organization
if not getattr(request.user, 'active_organization', None):
return False
return True
# ---------------------------------------------------------------------------
# Clases de permiso
# ---------------------------------------------------------------------------
class IsSameOrganization(OrgScopedPermission):
"""Usuario autenticado con org activa. Cualquier rol pasa (incluyendo Importador)."""
def has_object_permission(self, request, view, obj):
# Permite operaciones sobre un objeto específico si el usuario tiene el permiso
return request.user.has_perm('api.cuser.can_access_storage')
org = get_org_context(request.user)
if not org:
return False
dirigido = getattr(obj, 'dirigido', None)
if dirigido:
return getattr(dirigido, 'organizacion', None) == org
return getattr(obj, 'organizacion', None) == org
class IsSameOrganizationAndInAllowedGroups(permissions.BasePermission):
"""
Permite update/delete solo si el usuario está en TODOS los grupos permitidos
y pertenece a la misma organización que el registro, o es superuser.
"""
allowed_groups = ['admin', 'Agente Aduanal', 'user']
class IsSameOrganizationAndAdmin(OrgScopedPermission):
"""Usuario con rol admin, Agente Aduanal o user en su organización."""
def has_object_permission(self, request, view, obj):
user = request.user
if not user.is_authenticated:
return False
if user.is_superuser:
return True
if not hasattr(user, 'organizacion') or not user.organizacion:
org = get_org_context(user)
if not org:
return False
# Debe tener los tres grupos asignados
for group in self.allowed_groups:
if not user.groups.filter(name=group).exists():
tiene_rol = (
user_has_role(user, 'admin') or
user_has_role(user, 'Agente Aduanal') or
user_has_role(user, 'user')
)
if not tiene_rol:
return False
return getattr(obj, 'organizacion', None) == org
class IsSameOrganizationDeveloper(OrgScopedPermission):
"""Usuario con rol developer, Agente Aduanal o user en su organización."""
def has_object_permission(self, request, view, obj):
user = request.user
if user.is_superuser:
return True
org = get_org_context(user)
if not org:
return False
tiene_rol = (
user_has_role(user, 'developer') or
user_has_role(user, 'Agente Aduanal') or
user_has_role(user, 'user')
)
if not tiene_rol:
return False
return getattr(obj, 'organizacion', None) == org
class IsOwnerOrOrgAdmin(OrgScopedPermission):
"""El propio usuario, staff de Django o usuario con rol admin en la org."""
def has_object_permission(self, request, view, obj):
user = request.user
return (
obj == user or
user.is_staff or
user.is_superuser or
user_has_role(user, 'admin')
)
class IsSuperUser(permissions.BasePermission):
"""Solo superusuarios de Django. No requiere org activa (para endpoints de gestión global)."""
message = 'No tienes permiso para realizar esta acción.'
def has_permission(self, request, view):
return request.user.is_authenticated and request.user.is_superuser
def has_object_permission(self, request, view, obj):
return request.user.is_superuser
class IsInternalService(permissions.BasePermission):
"""
Identifica llamadas internas de microservicio → backend.
Criterio: autenticación via Token (no JWT) + usuario superuser.
Esto garantiza que solo cuentas de servicio predefinidas pasan,
sin depender de flags manuales como is_staff que pueden no estar
configurados en producción.
"""
message = 'Acceso reservado para servicios internos.'
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
from rest_framework.authentication import TokenAuthentication
return (
isinstance(request.successful_authenticator, TokenAuthentication)
and request.user.is_superuser
)
class HasStoragePermission(OrgScopedPermission):
"""Usuarios con acceso a operaciones de almacenamiento (organizacion.view)."""
def has_permission(self, request, view):
if not super().has_permission(request, view):
return False
return user_has_permission(request.user, 'organizacion.view')
def has_object_permission(self, request, view, obj):
return user_has_permission(request.user, 'organizacion.view')
def require_permission(codename):
"""
Devuelve una clase de permiso DRF que exige el codename RBAC indicado.
Uso en permission_classes: require_permission('pedimentos.view')
Uso en get_permissions(): require_permission('pedimentos.create')()
"""
class _RbacPerm(OrgScopedPermission):
def has_permission(self, request, view):
if not super().has_permission(request, view):
return False
return obj.organizacion == user.organizacion
return user_has_permission(request.user, codename)
_RbacPerm.__name__ = f'HasPerm_{codename.replace(".", "_")}'
_RbacPerm.__qualname__ = _RbacPerm.__name__
return _RbacPerm
class IsSameOrganizationAndInAllowedGroups(OrgScopedPermission):
"""Usuario con permiso vucem.manage en su organización.
Reemplaza la lógica rota que requería 3 grupos simultáneamente."""
def has_object_permission(self, request, view, obj):
user = request.user
if user.is_superuser:
return True
org = get_org_context(user)
if not org:
return False
if not user_has_permission(user, 'vucem.manage'):
return False
return getattr(obj, 'organizacion', None) == org