import os import gc import time from collections import defaultdict from django.core.management.base import BaseCommand from django.conf import settings from django.apps import apps from django.db import models, connection class Command(BaseCommand): help = 'Elimina archivos de media que no están referenciados en la base de datos (versión optimizada)' def add_arguments(self, parser): parser.add_argument( '--dry-run', action='store_true', help='Muestra qué archivos se eliminarían sin eliminarlos', ) parser.add_argument( '--verbose', action='store_true', help='Muestra información detallada del proceso', ) parser.add_argument( '--limit', type=int, help='Limitar el número de archivos huérfanos a procesar', ) parser.add_argument( '--sample-size', type=int, default=10000, help='Tamaño de muestra para análisis rápido (default: 10000)', ) parser.add_argument( '--quick-scan', action='store_true', help='Hacer un escaneo rápido con muestra limitada', ) def handle(self, *args, **options): media_root = settings.MEDIA_ROOT if not os.path.exists(media_root): self.stdout.write( self.style.ERROR(f'El directorio MEDIA_ROOT no existe: {media_root}') ) return start_time = time.time() self.stdout.write(f'Iniciando análisis en: {media_root}') if options['quick_scan']: self.stdout.write(f'Modo escaneo rápido - muestra de {options["sample_size"]} archivos') orphaned_files = self._quick_scan(media_root, options) else: # Obtener archivos de BD usando SQL directo files_in_db = self._get_db_files_sql(options) if options['verbose']: self.stdout.write(f'Archivos referenciados en BD: {len(files_in_db)}') # Encontrar archivos huérfanos orphaned_files = self._find_orphaned_files(media_root, files_in_db, options) if not orphaned_files: self.stdout.write( self.style.SUCCESS('No se encontraron archivos huérfanos') ) return # Aplicar límite si se especifica if options['limit'] and len(orphaned_files) > options['limit']: orphaned_files = list(orphaned_files)[:options['limit']] self.stdout.write(f'Limitando a {options["limit"]} archivos') # Calcula el tamaño total total_size = self._calculate_size_sample(orphaned_files, options) size_mb = total_size / (1024 * 1024) elapsed_time = time.time() - start_time self.stdout.write( f'Archivos huérfanos encontrados: {len(orphaned_files)} ' f'({size_mb:.2f} MB) - Tiempo: {elapsed_time:.2f}s' ) if options['dry_run']: self._show_dry_run_results(orphaned_files, media_root) else: self._delete_files(orphaned_files, media_root, options) def _get_db_files_sql(self, options): """Obtiene archivos usando consultas SQL directas""" files_in_db = set() media_root = settings.MEDIA_ROOT # Mapear modelos a sus campos de archivo file_field_map = {} for model in apps.get_models(): file_fields = [ field for field in model._meta.get_fields() if isinstance(field, (models.FileField, models.ImageField)) ] if file_fields: file_field_map[model] = file_fields if options['verbose']: self.stdout.write(f'Encontrados {len(file_field_map)} modelos con campos de archivo') with connection.cursor() as cursor: for model, fields in file_field_map.items(): if options['verbose']: self.stdout.write(f'Procesando {model.__name__}...') table_name = model._meta.db_table field_names = [field.column for field in fields] # Construir query SQL field_selects = ', '.join(field_names) where_conditions = [] for field in field_names: where_conditions.append(f'{field} IS NOT NULL AND {field} != \'\'') where_clause = ' OR '.join(where_conditions) query = f""" SELECT {field_selects} FROM {table_name} WHERE {where_clause} """ cursor.execute(query) rows = cursor.fetchall() for row in rows: for file_path in row: if file_path: full_path = os.path.join(media_root, str(file_path)) files_in_db.add(full_path) if options['verbose']: self.stdout.write(f' {model.__name__}: {len(rows)} registros procesados') return files_in_db def _quick_scan(self, media_root, options): """Escaneo rápido con muestra limitada""" sample_files = [] count = 0 target_size = options['sample_size'] self.stdout.write('Obteniendo muestra de archivos...') for root, dirs, files in os.walk(media_root): for file in files: if file.startswith('.'): continue sample_files.append(os.path.join(root, file)) count += 1 if count >= target_size: break if count >= target_size: break self.stdout.write(f'Muestra obtenida: {len(sample_files)} archivos') # Obtener archivos de BD para comparar files_in_db = self._get_db_files_sql(options) # Encontrar huérfanos en la muestra orphaned_files = [f for f in sample_files if f not in files_in_db] return orphaned_files def _find_orphaned_files(self, media_root, files_in_db, options): """Encuentra archivos huérfanos optimizado""" orphaned_files = [] processed_count = 0 self.stdout.write('Buscando archivos huérfanos...') for root, dirs, files in os.walk(media_root): for file in files: if file.startswith('.'): continue full_path = os.path.join(root, file) if full_path not in files_in_db: orphaned_files.append(full_path) processed_count += 1 if options['verbose'] and processed_count % 100000 == 0: self.stdout.write(f'Procesados {processed_count} archivos, encontrados {len(orphaned_files)} huérfanos...') # Liberar memoria cada 500k archivos if processed_count % 500000 == 0: gc.collect() return orphaned_files def _calculate_size_sample(self, orphaned_files, options): """Calcula tamaño usando muestra si hay muchos archivos""" if len(orphaned_files) <= 1000: # Si hay pocos archivos, calcular tamaño exacto total_size = 0 for file_path in orphaned_files: try: total_size += os.path.getsize(file_path) except OSError: pass return total_size else: # Si hay muchos archivos, usar muestra para estimar sample_size = min(1000, len(orphaned_files)) sample_files = orphaned_files[:sample_size] sample_total = 0 for file_path in sample_files: try: sample_total += os.path.getsize(file_path) except OSError: pass # Extrapolar al total avg_size = sample_total / sample_size if sample_size > 0 else 0 estimated_total = avg_size * len(orphaned_files) self.stdout.write(f'Tamaño estimado basado en muestra de {sample_size} archivos') return estimated_total def _show_dry_run_results(self, orphaned_files, media_root): """Muestra resultados del dry run""" self.stdout.write('\n--- MODO PRUEBA: Los siguientes archivos se eliminarían ---') show_limit = 20 for i, file_path in enumerate(sorted(orphaned_files)): if i >= show_limit: remaining = len(orphaned_files) - show_limit self.stdout.write(f' ... y {remaining} archivos más') break relative_path = os.path.relpath(file_path, media_root) self.stdout.write(f' - {relative_path}') def _delete_files(self, orphaned_files, media_root, options): """Elimina archivos con confirmación""" confirm = input( f'\n¿Estás seguro de que quieres eliminar {len(orphaned_files)} archivos? (s/N): ' ) if confirm.lower() not in ['s', 'si', 'sí', 'y', 'yes']: self.stdout.write('Operación cancelada') return deleted_count = 0 deleted_size = 0 for i, file_path in enumerate(orphaned_files): try: file_size = os.path.getsize(file_path) os.remove(file_path) deleted_count += 1 deleted_size += file_size if options['verbose'] and deleted_count % 1000 == 0: progress = (i + 1) / len(orphaned_files) * 100 self.stdout.write(f'Progreso: {progress:.1f}% ({deleted_count} eliminados)') except OSError as e: relative_path = os.path.relpath(file_path, media_root) self.stdout.write( self.style.ERROR(f'Error eliminando {relative_path}: {e}') ) deleted_mb = deleted_size / (1024 * 1024) self.stdout.write( self.style.SUCCESS( f'Eliminados {deleted_count} archivos huérfanos ({deleted_mb:.2f} MB)' ) )