import os import gc from django.core.management.base import BaseCommand from django.conf import settings from django.apps import apps from django.db import models from django.db import connection class Command(BaseCommand): help = 'Elimina archivos de media que no están referenciados en la base de datos' def add_arguments(self, parser): parser.add_argument( '--dry-run', action='store_true', help='Muestra qué archivos se eliminarían sin eliminarlos', ) parser.add_argument( '--verbose', action='store_true', help='Muestra información detallada del proceso', ) parser.add_argument( '--batch-size', type=int, default=10000, help='Tamaño del lote para procesar archivos (default: 10000)', ) parser.add_argument( '--limit', type=int, help='Limitar el número de archivos huérfanos a procesar', ) def handle(self, *args, **options): media_root = settings.MEDIA_ROOT if not os.path.exists(media_root): self.stdout.write( self.style.ERROR(f'El directorio MEDIA_ROOT no existe: {media_root}') ) return if options['verbose']: self.stdout.write(f'Analizando archivos en: {media_root}') # Obtener archivos de BD de forma más eficiente files_in_db = self._get_db_files_optimized(options) if options['verbose']: self.stdout.write(f'Archivos referenciados en BD: {len(files_in_db)}') # Procesar archivos en lotes para evitar problemas de memoria orphaned_files = self._find_orphaned_files_batch(media_root, files_in_db, options) if not orphaned_files: self.stdout.write( self.style.SUCCESS('No se encontraron archivos huérfanos') ) return # Aplicar límite si se especifica if options['limit'] and len(orphaned_files) > options['limit']: orphaned_files = list(orphaned_files)[:options['limit']] self.stdout.write(f'Limitando a {options["limit"]} archivos') # Calcula el tamaño total de archivos huérfanos total_size = self._calculate_total_size(orphaned_files, options) size_mb = total_size / (1024 * 1024) self.stdout.write( f'Archivos huérfanos encontrados: {len(orphaned_files)} ' f'({size_mb:.2f} MB)' ) if options['dry_run']: self.stdout.write('\n--- MODO PRUEBA: Los siguientes archivos se eliminarían ---') for i, file_path in enumerate(sorted(orphaned_files)): if i >= 50: # Limitar salida en dry-run remaining = len(orphaned_files) - 50 self.stdout.write(f' ... y {remaining} archivos más') break relative_path = os.path.relpath(file_path, media_root) self.stdout.write(f' - {relative_path}') else: # Pide confirmación antes de eliminar confirm = input( f'\n¿Estás seguro de que quieres eliminar {len(orphaned_files)} archivos? (s/N): ' ) if confirm.lower() not in ['s', 'si', 'sí', 'y', 'yes']: self.stdout.write('Operación cancelada') return self._delete_files_batch(orphaned_files, media_root, options) def _get_db_files_optimized(self, options): """Obtiene archivos de BD de forma optimizada""" files_in_db = set() media_root = settings.MEDIA_ROOT for model in apps.get_models(): file_fields = [ field for field in model._meta.get_fields() if isinstance(field, (models.FileField, models.ImageField)) ] if not file_fields: continue if options['verbose']: self.stdout.write(f'Procesando modelo {model.__name__}...') # Procesar en lotes para evitar cargar todo en memoria batch_size = options['batch_size'] offset = 0 while True: field_names = [field.name for field in file_fields] queryset = model.objects.values_list(*field_names)[offset:offset + batch_size] batch = list(queryset) if not batch: break for row in batch: for file_path in row: if file_path: full_path = os.path.join(media_root, str(file_path)) files_in_db.add(full_path) offset += batch_size if options['verbose'] and offset % (batch_size * 10) == 0: self.stdout.write(f' Procesados {offset} registros...') # Forzar liberación de memoria gc.collect() return files_in_db def _find_orphaned_files_batch(self, media_root, files_in_db, options): """Encuentra archivos huérfanos procesando en lotes""" orphaned_files = [] processed_count = 0 self.stdout.write('Buscando archivos huérfanos...') for root, dirs, files in os.walk(media_root): for file in files: if file.startswith('.'): continue full_path = os.path.join(root, file) if full_path not in files_in_db: orphaned_files.append(full_path) processed_count += 1 if options['verbose'] and processed_count % 50000 == 0: self.stdout.write(f'Procesados {processed_count} archivos, encontrados {len(orphaned_files)} huérfanos...') # Liberar memoria periódicamente if processed_count % 100000 == 0: gc.collect() return orphaned_files def _calculate_total_size(self, orphaned_files, options): """Calcula el tamaño total de archivos huérfanos""" total_size = 0 count = 0 for file_path in orphaned_files: try: total_size += os.path.getsize(file_path) count += 1 if options['verbose'] and count % 10000 == 0: self.stdout.write(f'Calculando tamaño... {count}/{len(orphaned_files)}') except OSError: pass return total_size def _delete_files_batch(self, orphaned_files, media_root, options): """Elimina archivos en lotes""" deleted_count = 0 deleted_size = 0 batch_size = 1000 total_files = len(orphaned_files) for i in range(0, total_files, batch_size): batch = orphaned_files[i:i + batch_size] for file_path in batch: try: file_size = os.path.getsize(file_path) os.remove(file_path) deleted_count += 1 deleted_size += file_size if options['verbose'] and deleted_count % 5000 == 0: relative_path = os.path.relpath(file_path, media_root) self.stdout.write(f'Progreso: {deleted_count}/{total_files} - Eliminado: {relative_path}') except OSError as e: relative_path = os.path.relpath(file_path, media_root) self.stdout.write( self.style.ERROR(f'Error eliminando {relative_path}: {e}') ) # Mostrar progreso por lotes progress = (i + batch_size) / total_files * 100 self.stdout.write(f'Progreso: {min(progress, 100):.1f}% ({deleted_count} eliminados)') # Forzar liberación de memoria gc.collect() # Elimina directorios vacíos self._remove_empty_dirs(media_root) deleted_mb = deleted_size / (1024 * 1024) self.stdout.write( self.style.SUCCESS( f'Eliminados {deleted_count} archivos huérfanos ({deleted_mb:.2f} MB)' ) ) def _remove_empty_dirs(self, path): """Elimina directorios vacíos recursivamente""" for root, dirs, files in os.walk(path, topdown=False): for dir_name in dirs: dir_path = os.path.join(root, dir_name) try: if not os.listdir(dir_path): os.rmdir(dir_path) except OSError: pass