from django.db.models import Count from rest_framework import status from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from api.rbac.models import OrganizationRole, RolePermission, UserPermission, UserRole from api.rbac.serializers import ( MyPermissionsSerializer, OrganizationRoleSerializer, OrganizationRoleWriteSerializer, RolePermissionSerializer, UserPermissionSerializer, UserRoleSerializer, ) from core.permissions import OrgScopedPermission, get_org_context, is_internal_service_request, require_permission, user_has_permission def _require_manage_roles(user): """Retorna True si el usuario puede gestionar roles/permisos en su org.""" return user.is_superuser or user_has_permission(user, 'usuarios.manage_roles') # --------------------------------------------------------------------------- # Catálogo de permisos (lectura para todos los autenticados con org) # --------------------------------------------------------------------------- class RolePermissionViewSet(ReadOnlyModelViewSet): """Lista el catálogo global de permisos disponibles, agrupados por módulo.""" my_tags = ['RBAC'] serializer_class = RolePermissionSerializer permission_classes = [IsAuthenticated, require_permission('usuarios.manage_roles')] def get_queryset(self): return RolePermission.objects.all().order_by('modulo', 'codename') @action(detail=False, methods=['get'], url_path='by-module') def by_module(self, request): """Devuelve el catálogo agrupado por módulo.""" perms = self.get_queryset() result = {} for p in perms: result.setdefault(p.modulo, []).append( RolePermissionSerializer(p).data ) return Response(result) # --------------------------------------------------------------------------- # Roles de la organización # --------------------------------------------------------------------------- class OrganizationRoleViewSet(ModelViewSet): """ CRUD de roles de la organización activa. Solo usuarios con usuarios.manage_roles pueden crear/editar/eliminar. """ my_tags = ['RBAC'] permission_classes = [IsAuthenticated, require_permission('usuarios.manage_roles')] def get_queryset(self): if is_internal_service_request(self.request): return ( OrganizationRole.objects .annotate(user_count=Count('user_roles')) .prefetch_related('permissions') .order_by('nombre') ) org = get_org_context(self.request.user) if not org: return OrganizationRole.objects.none() return ( OrganizationRole.objects .filter(organizacion=org) .annotate(user_count=Count('user_roles')) .prefetch_related('permissions') .order_by('nombre') ) def get_serializer_class(self): if self.action in ('create', 'update', 'partial_update'): return OrganizationRoleWriteSerializer return OrganizationRoleSerializer def _check_manage_roles(self): if not _require_manage_roles(self.request.user): return Response( {'detail': 'Se requiere el permiso usuarios.manage_roles.'}, status=status.HTTP_403_FORBIDDEN, ) return None def create(self, request, *args, **kwargs): err = self._check_manage_roles() if err: return err org = get_org_context(request.user) if not org: return Response({'detail': 'Sin organización activa.'}, status=status.HTTP_403_FORBIDDEN) serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) serializer.save(organizacion=org) return Response( OrganizationRoleSerializer(serializer.instance).data, status=status.HTTP_201_CREATED, ) def update(self, request, *args, **kwargs): err = self._check_manage_roles() if err: return err instance = self.get_object() # No se puede cambiar nombre ni permisos de un rol is_admin_role if instance.is_admin_role and not request.user.is_superuser: return Response( {'detail': 'No se puede modificar un rol de administrador.'}, status=status.HTTP_403_FORBIDDEN, ) return super().update(request, *args, **kwargs) def destroy(self, request, *args, **kwargs): err = self._check_manage_roles() if err: return err instance = self.get_object() if instance.is_admin_role and not request.user.is_superuser: return Response( {'detail': 'No se puede eliminar un rol de administrador.'}, status=status.HTTP_403_FORBIDDEN, ) if instance.user_roles.exists(): return Response( {'detail': 'No se puede eliminar un rol con usuarios asignados.'}, status=status.HTTP_400_BAD_REQUEST, ) return super().destroy(request, *args, **kwargs) # --------------------------------------------------------------------------- # Asignación de roles a usuarios # --------------------------------------------------------------------------- class UserRoleViewSet(ModelViewSet): """ Asigna y revoca roles de usuarios en la organización activa. Solo usuarios con usuarios.manage_roles pueden modificar. """ my_tags = ['RBAC'] serializer_class = UserRoleSerializer permission_classes = [IsAuthenticated, require_permission('usuarios.manage_roles')] http_method_names = ['get', 'post', 'delete', 'head', 'options'] def get_queryset(self): if is_internal_service_request(self.request): qs = UserRole.objects.select_related('user', 'role') user_id = self.request.query_params.get('user_id') if user_id: qs = qs.filter(user_id=user_id) return qs org = get_org_context(self.request.user) if not org: return UserRole.objects.none() qs = ( UserRole.objects .filter(role__organizacion=org) .select_related('user', 'role') ) user_id = self.request.query_params.get('user_id') if user_id: qs = qs.filter(user_id=user_id) return qs def create(self, request, *args, **kwargs): if not _require_manage_roles(request.user): return Response( {'detail': 'Se requiere el permiso usuarios.manage_roles.'}, status=status.HTTP_403_FORBIDDEN, ) org = get_org_context(request.user) if not org: return Response({'detail': 'Sin organización activa.'}, status=status.HTTP_403_FORBIDDEN) user_id = request.data.get('user_id') role_id = request.data.get('role_id') if not user_id or not role_id: return Response({'detail': 'user_id y role_id son requeridos.'}, status=status.HTTP_400_BAD_REQUEST) # Verificar que el rol pertenece a la misma org try: role = OrganizationRole.objects.get(id=role_id, organizacion=org) except OrganizationRole.DoesNotExist: return Response({'detail': 'El rol no pertenece a esta organización.'}, status=status.HTTP_404_NOT_FOUND) # Verificar que el usuario pertenece a la misma org from api.cuser.models import CustomUser try: target_user = CustomUser.objects.get(id=user_id, organizacion=org) except CustomUser.DoesNotExist: return Response({'detail': 'El usuario no pertenece a esta organización.'}, status=status.HTTP_404_NOT_FOUND) user_role, created = UserRole.objects.get_or_create(user=target_user, role=role) serializer = self.get_serializer(user_role) return Response(serializer.data, status=status.HTTP_201_CREATED if created else status.HTTP_200_OK) def destroy(self, request, *args, **kwargs): if not _require_manage_roles(request.user): return Response( {'detail': 'Se requiere el permiso usuarios.manage_roles.'}, status=status.HTTP_403_FORBIDDEN, ) instance = self.get_object() org = get_org_context(request.user) # Proteger al owner de la org: no se le puede quitar el rol admin if org and hasattr(org, 'owner') and org.owner and instance.user == org.owner: if instance.role.is_admin_role: return Response( {'detail': 'No se puede revocar el rol de administrador al propietario de la organización.'}, status=status.HTTP_403_FORBIDDEN, ) return super().destroy(request, *args, **kwargs) # --------------------------------------------------------------------------- # Permisos singulares (overrides por usuario) # --------------------------------------------------------------------------- class UserPermissionViewSet(ModelViewSet): """ Otorga o deniega permisos singulares a usuarios, sin necesidad de crear un rol. granted=true → otorgar; granted=false → denegar explícitamente (override sobre roles). Solo usuarios con usuarios.manage_roles pueden modificar. """ my_tags = ['RBAC'] serializer_class = UserPermissionSerializer permission_classes = [IsAuthenticated, require_permission('usuarios.manage_roles')] http_method_names = ['get', 'post', 'patch', 'delete', 'head', 'options'] def get_queryset(self): if is_internal_service_request(self.request): qs = UserPermission.objects.select_related('user', 'permission') user_id = self.request.query_params.get('user_id') if user_id: qs = qs.filter(user_id=user_id) return qs org = get_org_context(self.request.user) if not org: return UserPermission.objects.none() qs = ( UserPermission.objects .filter(user__organizacion=org) .select_related('user', 'permission') ) user_id = self.request.query_params.get('user_id') if user_id: qs = qs.filter(user_id=user_id) return qs def _check(self): if not _require_manage_roles(self.request.user): return Response( {'detail': 'Se requiere el permiso usuarios.manage_roles.'}, status=status.HTTP_403_FORBIDDEN, ) return None def create(self, request, *args, **kwargs): err = self._check() if err: return err org = get_org_context(request.user) if not org: return Response({'detail': 'Sin organización activa.'}, status=status.HTTP_403_FORBIDDEN) user_id = request.data.get('user_id') permission_id = request.data.get('permission_id') granted = request.data.get('granted', True) if not user_id or not permission_id: return Response({'detail': 'user_id y permission_id son requeridos.'}, status=status.HTTP_400_BAD_REQUEST) from api.cuser.models import CustomUser try: target_user = CustomUser.objects.get(id=user_id, organizacion=org) except CustomUser.DoesNotExist: return Response({'detail': 'El usuario no pertenece a esta organización.'}, status=status.HTTP_404_NOT_FOUND) try: perm = RolePermission.objects.get(id=permission_id) except RolePermission.DoesNotExist: return Response({'detail': 'Permiso no encontrado.'}, status=status.HTTP_404_NOT_FOUND) override, created = UserPermission.objects.update_or_create( user=target_user, permission=perm, defaults={'granted': granted}, ) serializer = self.get_serializer(override) return Response(serializer.data, status=status.HTTP_201_CREATED if created else status.HTTP_200_OK) def partial_update(self, request, *args, **kwargs): err = self._check() if err: return err return super().partial_update(request, *args, **kwargs) def destroy(self, request, *args, **kwargs): err = self._check() if err: return err return super().destroy(request, *args, **kwargs) # --------------------------------------------------------------------------- # Mis permisos efectivos (para el frontend) # --------------------------------------------------------------------------- class MyPermissionsView(APIView): """ Retorna los permisos efectivos del usuario autenticado. El frontend usa esto para decidir qué mostrar/ocultar. """ my_tags = ['RBAC'] permission_classes = [IsAuthenticated & OrgScopedPermission] def get(self, request): user = request.user org = get_org_context(user) if user.is_superuser: all_perms = list(RolePermission.objects.values_list('codename', flat=True)) return Response({'permissions': all_perms, 'roles': ['superuser']}) if not org: return Response({'permissions': [], 'roles': []}) # Roles del usuario en la org roles = list( UserRole.objects.filter(user=user, role__organizacion=org) .values_list('role__nombre', flat=True) ) # Permisos de roles perms_set = set( UserRole.objects.filter(user=user, role__organizacion=org) .values_list('role__permissions__codename', flat=True) ) perms_set.discard(None) # Aplicar overrides singulares for override in UserPermission.objects.filter(user=user).select_related('permission'): if override.granted: perms_set.add(override.permission.codename) else: perms_set.discard(override.permission.codename) return Response({'permissions': sorted(perms_set), 'roles': roles}) # --------------------------------------------------------------------------- # Switch de organización (solo superusuarios) # --------------------------------------------------------------------------- class SwitchOrganizationView(APIView): """ Permite a un superusuario cambiar su organización activa. POST { "organization_id": "" } → actualiza active_organization del superuser. DELETE → limpia active_organization (el superuser queda sin contexto de org). """ my_tags = ['RBAC'] permission_classes = [IsAuthenticated] def post(self, request): if not request.user.is_superuser: return Response( {'detail': 'Solo superusuarios pueden cambiar de organización.'}, status=status.HTTP_403_FORBIDDEN, ) org_id = request.data.get('organization_id') if not org_id: return Response({'detail': 'organization_id es requerido.'}, status=status.HTTP_400_BAD_REQUEST) from api.organization.models import Organizacion try: import uuid as _uuid org = Organizacion.objects.get(id=_uuid.UUID(str(org_id))) except (Organizacion.DoesNotExist, ValueError): return Response({'detail': 'Organización no encontrada.'}, status=status.HTTP_404_NOT_FOUND) request.user.active_organization = org request.user.save(update_fields=['active_organization']) return Response({ 'detail': f'Organización activa actualizada a: {org.nombre}', 'organization': {'id': str(org.id), 'nombre': org.nombre}, }) def delete(self, request): if not request.user.is_superuser: return Response( {'detail': 'Solo superusuarios pueden limpiar la organización activa.'}, status=status.HTTP_403_FORBIDDEN, ) request.user.active_organization = None request.user.save(update_fields=['active_organization']) return Response({'detail': 'Organización activa removida.'})