102 lines
3.6 KiB
Python
102 lines
3.6 KiB
Python
"""
|
|
Sincroniza el catálogo de permisos de roles.py con la base de datos.
|
|
|
|
Uso básico (solo catálogo):
|
|
python manage.py sync_rbac
|
|
|
|
Con propagación a roles existentes (agrega permisos nuevos a roles que ya existen):
|
|
python manage.py sync_rbac --roles
|
|
|
|
Con listado de lo que hay actualmente:
|
|
python manage.py sync_rbac --list
|
|
"""
|
|
from django.core.management.base import BaseCommand
|
|
|
|
from api.rbac.roles import DEFAULT_ROLES, PERMISSIONS_CATALOG
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = 'Sincroniza el catálogo de permisos (roles.py → BD) sin necesidad de migración.'
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
'--roles',
|
|
action='store_true',
|
|
help='Propaga los permisos nuevos a los OrganizationRoles existentes que coincidan con DEFAULT_ROLES.',
|
|
)
|
|
parser.add_argument(
|
|
'--list',
|
|
action='store_true',
|
|
help='Lista los permisos actuales en la BD agrupados por módulo.',
|
|
)
|
|
|
|
def handle(self, *args, **options):
|
|
from api.rbac.models import OrganizationRole, RolePermission
|
|
|
|
if options['list']:
|
|
self._list_permisos(RolePermission)
|
|
return
|
|
|
|
self._sync_catalogo(RolePermission)
|
|
|
|
if options['roles']:
|
|
self._sync_roles(RolePermission, OrganizationRole)
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
def _sync_catalogo(self, RolePermission):
|
|
creados = 0
|
|
existentes = 0
|
|
|
|
for codename, descripcion, modulo in PERMISSIONS_CATALOG:
|
|
_, created = RolePermission.objects.get_or_create(
|
|
codename=codename,
|
|
defaults={'descripcion': descripcion, 'modulo': modulo},
|
|
)
|
|
if created:
|
|
self.stdout.write(self.style.SUCCESS(f' [+] {codename} ({modulo})'))
|
|
creados += 1
|
|
else:
|
|
existentes += 1
|
|
|
|
self.stdout.write(
|
|
self.style.SUCCESS(f'\nCatálogo: {creados} permisos creados, {existentes} ya existían.')
|
|
)
|
|
|
|
def _sync_roles(self, RolePermission, OrganizationRole):
|
|
perms_map = {p.codename: p for p in RolePermission.objects.all()}
|
|
roles_actualizados = 0
|
|
permisos_agregados = 0
|
|
|
|
for org_role in OrganizationRole.objects.select_related('organizacion').prefetch_related('permissions'):
|
|
config = DEFAULT_ROLES.get(org_role.nombre)
|
|
if not config:
|
|
continue
|
|
|
|
esperados = {c: perms_map[c] for c in config['permissions'] if c in perms_map}
|
|
actuales = {p.codename for p in org_role.permissions.all()}
|
|
nuevos = {c: p for c, p in esperados.items() if c not in actuales}
|
|
|
|
if nuevos:
|
|
org_role.permissions.add(*nuevos.values())
|
|
roles_actualizados += 1
|
|
permisos_agregados += len(nuevos)
|
|
self.stdout.write(
|
|
f' Rol "{org_role.nombre}" en {org_role.organizacion}: '
|
|
f'+{len(nuevos)} → {", ".join(nuevos.keys())}'
|
|
)
|
|
|
|
self.stdout.write(
|
|
self.style.SUCCESS(
|
|
f'\nRoles: {roles_actualizados} roles actualizados, {permisos_agregados} asignaciones nuevas.'
|
|
)
|
|
)
|
|
|
|
def _list_permisos(self, RolePermission):
|
|
modulo_actual = None
|
|
for perm in RolePermission.objects.order_by('modulo', 'codename'):
|
|
if perm.modulo != modulo_actual:
|
|
modulo_actual = perm.modulo
|
|
self.stdout.write(self.style.HTTP_INFO(f'\n {modulo_actual}'))
|
|
self.stdout.write(f' {perm.codename:<40} {perm.descripcion}')
|