Se agregaron partidas y modelos

This commit is contained in:
2025-10-02 21:58:44 -06:00
parent 76909d0618
commit 466e12e623
30 changed files with 1090 additions and 8 deletions

View File

@@ -0,0 +1,243 @@
import os
import gc
from django.core.management.base import BaseCommand
from django.conf import settings
from django.apps import apps
from django.db import models
from django.db import connection
class Command(BaseCommand):
help = 'Elimina archivos de media que no están referenciados en la base de datos'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Muestra qué archivos se eliminarían sin eliminarlos',
)
parser.add_argument(
'--verbose',
action='store_true',
help='Muestra información detallada del proceso',
)
parser.add_argument(
'--batch-size',
type=int,
default=10000,
help='Tamaño del lote para procesar archivos (default: 10000)',
)
parser.add_argument(
'--limit',
type=int,
help='Limitar el número de archivos huérfanos a procesar',
)
def handle(self, *args, **options):
media_root = settings.MEDIA_ROOT
if not os.path.exists(media_root):
self.stdout.write(
self.style.ERROR(f'El directorio MEDIA_ROOT no existe: {media_root}')
)
return
if options['verbose']:
self.stdout.write(f'Analizando archivos en: {media_root}')
# Obtener archivos de BD de forma más eficiente
files_in_db = self._get_db_files_optimized(options)
if options['verbose']:
self.stdout.write(f'Archivos referenciados en BD: {len(files_in_db)}')
# Procesar archivos en lotes para evitar problemas de memoria
orphaned_files = self._find_orphaned_files_batch(media_root, files_in_db, options)
if not orphaned_files:
self.stdout.write(
self.style.SUCCESS('No se encontraron archivos huérfanos')
)
return
# Aplicar límite si se especifica
if options['limit'] and len(orphaned_files) > options['limit']:
orphaned_files = list(orphaned_files)[:options['limit']]
self.stdout.write(f'Limitando a {options["limit"]} archivos')
# Calcula el tamaño total de archivos huérfanos
total_size = self._calculate_total_size(orphaned_files, options)
size_mb = total_size / (1024 * 1024)
self.stdout.write(
f'Archivos huérfanos encontrados: {len(orphaned_files)} '
f'({size_mb:.2f} MB)'
)
if options['dry_run']:
self.stdout.write('\n--- MODO PRUEBA: Los siguientes archivos se eliminarían ---')
for i, file_path in enumerate(sorted(orphaned_files)):
if i >= 50: # Limitar salida en dry-run
remaining = len(orphaned_files) - 50
self.stdout.write(f' ... y {remaining} archivos más')
break
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(f' - {relative_path}')
else:
# Pide confirmación antes de eliminar
confirm = input(
f'\n¿Estás seguro de que quieres eliminar {len(orphaned_files)} archivos? (s/N): '
)
if confirm.lower() not in ['s', 'si', '', 'y', 'yes']:
self.stdout.write('Operación cancelada')
return
self._delete_files_batch(orphaned_files, media_root, options)
def _get_db_files_optimized(self, options):
"""Obtiene archivos de BD de forma optimizada"""
files_in_db = set()
media_root = settings.MEDIA_ROOT
for model in apps.get_models():
file_fields = [
field for field in model._meta.get_fields()
if isinstance(field, (models.FileField, models.ImageField))
]
if not file_fields:
continue
if options['verbose']:
self.stdout.write(f'Procesando modelo {model.__name__}...')
# Procesar en lotes para evitar cargar todo en memoria
batch_size = options['batch_size']
offset = 0
while True:
field_names = [field.name for field in file_fields]
queryset = model.objects.values_list(*field_names)[offset:offset + batch_size]
batch = list(queryset)
if not batch:
break
for row in batch:
for file_path in row:
if file_path:
full_path = os.path.join(media_root, str(file_path))
files_in_db.add(full_path)
offset += batch_size
if options['verbose'] and offset % (batch_size * 10) == 0:
self.stdout.write(f' Procesados {offset} registros...')
# Forzar liberación de memoria
gc.collect()
return files_in_db
def _find_orphaned_files_batch(self, media_root, files_in_db, options):
"""Encuentra archivos huérfanos procesando en lotes"""
orphaned_files = []
processed_count = 0
self.stdout.write('Buscando archivos huérfanos...')
for root, dirs, files in os.walk(media_root):
for file in files:
if file.startswith('.'):
continue
full_path = os.path.join(root, file)
if full_path not in files_in_db:
orphaned_files.append(full_path)
processed_count += 1
if options['verbose'] and processed_count % 50000 == 0:
self.stdout.write(f'Procesados {processed_count} archivos, encontrados {len(orphaned_files)} huérfanos...')
# Liberar memoria periódicamente
if processed_count % 100000 == 0:
gc.collect()
return orphaned_files
def _calculate_total_size(self, orphaned_files, options):
"""Calcula el tamaño total de archivos huérfanos"""
total_size = 0
count = 0
for file_path in orphaned_files:
try:
total_size += os.path.getsize(file_path)
count += 1
if options['verbose'] and count % 10000 == 0:
self.stdout.write(f'Calculando tamaño... {count}/{len(orphaned_files)}')
except OSError:
pass
return total_size
def _delete_files_batch(self, orphaned_files, media_root, options):
"""Elimina archivos en lotes"""
deleted_count = 0
deleted_size = 0
batch_size = 1000
total_files = len(orphaned_files)
for i in range(0, total_files, batch_size):
batch = orphaned_files[i:i + batch_size]
for file_path in batch:
try:
file_size = os.path.getsize(file_path)
os.remove(file_path)
deleted_count += 1
deleted_size += file_size
if options['verbose'] and deleted_count % 5000 == 0:
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(f'Progreso: {deleted_count}/{total_files} - Eliminado: {relative_path}')
except OSError as e:
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(
self.style.ERROR(f'Error eliminando {relative_path}: {e}')
)
# Mostrar progreso por lotes
progress = (i + batch_size) / total_files * 100
self.stdout.write(f'Progreso: {min(progress, 100):.1f}% ({deleted_count} eliminados)')
# Forzar liberación de memoria
gc.collect()
# Elimina directorios vacíos
self._remove_empty_dirs(media_root)
deleted_mb = deleted_size / (1024 * 1024)
self.stdout.write(
self.style.SUCCESS(
f'Eliminados {deleted_count} archivos huérfanos ({deleted_mb:.2f} MB)'
)
)
def _remove_empty_dirs(self, path):
"""Elimina directorios vacíos recursivamente"""
for root, dirs, files in os.walk(path, topdown=False):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
try:
if not os.listdir(dir_path):
os.rmdir(dir_path)
except OSError:
pass