from rest_framework import permissions from rest_framework.exceptions import PermissionDenied from rest_framework.authentication import TokenAuthentication # --------------------------------------------------------------------------- # Helpers centrales — toda la lógica de RBAC pasa por aquí # --------------------------------------------------------------------------- def is_internal_service_request(request): """True si la petición proviene de un service account (Token auth + superuser). Misma lógica que IsInternalService, útil en get_queryset() y perform_* methods.""" user = getattr(request, 'user', None) if not user or not user.is_superuser: return False return isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication) def get_org_context(user): """Retorna la organización activa para filtrado de datos. Superusuarios usan active_organization; usuarios normales usan organizacion.""" if user.is_superuser: return getattr(user, 'active_organization', None) return getattr(user, 'organizacion', None) def user_has_permission(user, codename): """Verifica si un usuario tiene un permiso RBAC por su codename. Orden de evaluación: 1. is_superuser → True siempre 2. UserPermission deny explícito → False 3. UserPermission grant explícito → True 4. Algún UserRole en su org tiene el permiso → True 5. Denegar """ if user.is_superuser: return True org = getattr(user, 'organizacion', None) if not org: return False from api.rbac.models import UserPermission, UserRole try: override = UserPermission.objects.get(user=user, permission__codename=codename) return override.granted except UserPermission.DoesNotExist: pass return UserRole.objects.filter( user=user, role__organizacion=org, role__permissions__codename=codename, ).exists() def user_has_role(user, role_name): """Verifica si un usuario tiene un rol por nombre dentro de su organización. Función puente durante la transición — lee desde UserRole en lugar de auth.Group.""" from api.rbac.models import UserRole org = getattr(user, 'organizacion', None) if not org: return False return UserRole.objects.filter( user=user, role__nombre=role_name, role__organizacion=org, ).exists() # --------------------------------------------------------------------------- # Base compartida — aplica el requisito de org activa a superusuarios # --------------------------------------------------------------------------- class OrgScopedPermission(permissions.BasePermission): """Base para todas las clases de permiso con scope de organización. Superusuario sin active_organization recibe 403, EXCEPTO service accounts (Token auth + superuser) que pasan sin restricción de org.""" message = 'No tienes permiso para realizar esta acción.' def has_permission(self, request, view): if not request.user.is_authenticated: return False if request.user.is_superuser: from rest_framework.authentication import TokenAuthentication # Service account interno: Token auth + superuser → siempre permitido if isinstance(getattr(request, 'successful_authenticator', None), TokenAuthentication): return True # Superuser JWT: requiere active_organization if not getattr(request.user, 'active_organization', None): return False return True # --------------------------------------------------------------------------- # Clases de permiso # --------------------------------------------------------------------------- class IsSameOrganization(OrgScopedPermission): """Usuario autenticado con org activa. Cualquier rol pasa (incluyendo Importador).""" def has_object_permission(self, request, view, obj): org = get_org_context(request.user) if not org: return False dirigido = getattr(obj, 'dirigido', None) if dirigido: return getattr(dirigido, 'organizacion', None) == org return getattr(obj, 'organizacion', None) == org class IsSameOrganizationAndAdmin(OrgScopedPermission): """Usuario con rol admin, Agente Aduanal o user en su organización.""" def has_object_permission(self, request, view, obj): user = request.user if user.is_superuser: return True org = get_org_context(user) if not org: return False tiene_rol = ( user_has_role(user, 'admin') or user_has_role(user, 'Agente Aduanal') or user_has_role(user, 'user') ) if not tiene_rol: return False return getattr(obj, 'organizacion', None) == org class IsSameOrganizationDeveloper(OrgScopedPermission): """Usuario con rol developer, Agente Aduanal o user en su organización.""" def has_object_permission(self, request, view, obj): user = request.user if user.is_superuser: return True org = get_org_context(user) if not org: return False tiene_rol = ( user_has_role(user, 'developer') or user_has_role(user, 'Agente Aduanal') or user_has_role(user, 'user') ) if not tiene_rol: return False return getattr(obj, 'organizacion', None) == org class IsOwnerOrOrgAdmin(OrgScopedPermission): """El propio usuario, staff de Django o usuario con rol admin en la org.""" def has_object_permission(self, request, view, obj): user = request.user return ( obj == user or user.is_staff or user.is_superuser or user_has_role(user, 'admin') ) class IsSuperUser(permissions.BasePermission): """Solo superusuarios de Django. No requiere org activa (para endpoints de gestión global).""" message = 'No tienes permiso para realizar esta acción.' def has_permission(self, request, view): return request.user.is_authenticated and request.user.is_superuser def has_object_permission(self, request, view, obj): return request.user.is_superuser class IsInternalService(permissions.BasePermission): """ Identifica llamadas internas de microservicio → backend. Criterio: autenticación via Token (no JWT) + usuario superuser. Esto garantiza que solo cuentas de servicio predefinidas pasan, sin depender de flags manuales como is_staff que pueden no estar configurados en producción. """ message = 'Acceso reservado para servicios internos.' def has_permission(self, request, view): if not request.user or not request.user.is_authenticated: return False from rest_framework.authentication import TokenAuthentication return ( isinstance(request.successful_authenticator, TokenAuthentication) and request.user.is_superuser ) class HasStoragePermission(OrgScopedPermission): """Usuarios con acceso a operaciones de almacenamiento (organizacion.view).""" def has_permission(self, request, view): if not super().has_permission(request, view): return False return user_has_permission(request.user, 'organizacion.view') def has_object_permission(self, request, view, obj): return user_has_permission(request.user, 'organizacion.view') def require_permission(codename): """ Devuelve una clase de permiso DRF que exige el codename RBAC indicado. Uso en permission_classes: require_permission('pedimentos.view') Uso en get_permissions(): require_permission('pedimentos.create')() """ class _RbacPerm(OrgScopedPermission): def has_permission(self, request, view): if not super().has_permission(request, view): return False return user_has_permission(request.user, codename) _RbacPerm.__name__ = f'HasPerm_{codename.replace(".", "_")}' _RbacPerm.__qualname__ = _RbacPerm.__name__ return _RbacPerm class IsSameOrganizationAndInAllowedGroups(OrgScopedPermission): """Usuario con permiso vucem.manage en su organización. Reemplaza la lógica rota que requería 3 grupos simultáneamente.""" def has_object_permission(self, request, view, obj): user = request.user if user.is_superuser: return True org = get_org_context(user) if not org: return False if not user_has_permission(user, 'vucem.manage'): return False return getattr(obj, 'organizacion', None) == org