Files
backend/api/customs/management/commands/cleanup_media.py

243 lines
8.9 KiB
Python

import os
import gc
from django.core.management.base import BaseCommand
from django.conf import settings
from django.apps import apps
from django.db import models
from django.db import connection
class Command(BaseCommand):
help = 'Elimina archivos de media que no están referenciados en la base de datos'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Muestra qué archivos se eliminarían sin eliminarlos',
)
parser.add_argument(
'--verbose',
action='store_true',
help='Muestra información detallada del proceso',
)
parser.add_argument(
'--batch-size',
type=int,
default=10000,
help='Tamaño del lote para procesar archivos (default: 10000)',
)
parser.add_argument(
'--limit',
type=int,
help='Limitar el número de archivos huérfanos a procesar',
)
def handle(self, *args, **options):
media_root = settings.MEDIA_ROOT
if not os.path.exists(media_root):
self.stdout.write(
self.style.ERROR(f'El directorio MEDIA_ROOT no existe: {media_root}')
)
return
if options['verbose']:
self.stdout.write(f'Analizando archivos en: {media_root}')
# Obtener archivos de BD de forma más eficiente
files_in_db = self._get_db_files_optimized(options)
if options['verbose']:
self.stdout.write(f'Archivos referenciados en BD: {len(files_in_db)}')
# Procesar archivos en lotes para evitar problemas de memoria
orphaned_files = self._find_orphaned_files_batch(media_root, files_in_db, options)
if not orphaned_files:
self.stdout.write(
self.style.SUCCESS('No se encontraron archivos huérfanos')
)
return
# Aplicar límite si se especifica
if options['limit'] and len(orphaned_files) > options['limit']:
orphaned_files = list(orphaned_files)[:options['limit']]
self.stdout.write(f'Limitando a {options["limit"]} archivos')
# Calcula el tamaño total de archivos huérfanos
total_size = self._calculate_total_size(orphaned_files, options)
size_mb = total_size / (1024 * 1024)
self.stdout.write(
f'Archivos huérfanos encontrados: {len(orphaned_files)} '
f'({size_mb:.2f} MB)'
)
if options['dry_run']:
self.stdout.write('\n--- MODO PRUEBA: Los siguientes archivos se eliminarían ---')
for i, file_path in enumerate(sorted(orphaned_files)):
if i >= 50: # Limitar salida en dry-run
remaining = len(orphaned_files) - 50
self.stdout.write(f' ... y {remaining} archivos más')
break
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(f' - {relative_path}')
else:
# Pide confirmación antes de eliminar
confirm = input(
f'\n¿Estás seguro de que quieres eliminar {len(orphaned_files)} archivos? (s/N): '
)
if confirm.lower() not in ['s', 'si', '', 'y', 'yes']:
self.stdout.write('Operación cancelada')
return
self._delete_files_batch(orphaned_files, media_root, options)
def _get_db_files_optimized(self, options):
"""Obtiene archivos de BD de forma optimizada"""
files_in_db = set()
media_root = settings.MEDIA_ROOT
for model in apps.get_models():
file_fields = [
field for field in model._meta.get_fields()
if isinstance(field, (models.FileField, models.ImageField))
]
if not file_fields:
continue
if options['verbose']:
self.stdout.write(f'Procesando modelo {model.__name__}...')
# Procesar en lotes para evitar cargar todo en memoria
batch_size = options['batch_size']
offset = 0
while True:
field_names = [field.name for field in file_fields]
queryset = model.objects.values_list(*field_names)[offset:offset + batch_size]
batch = list(queryset)
if not batch:
break
for row in batch:
for file_path in row:
if file_path:
full_path = os.path.join(media_root, str(file_path))
files_in_db.add(full_path)
offset += batch_size
if options['verbose'] and offset % (batch_size * 10) == 0:
self.stdout.write(f' Procesados {offset} registros...')
# Forzar liberación de memoria
gc.collect()
return files_in_db
def _find_orphaned_files_batch(self, media_root, files_in_db, options):
"""Encuentra archivos huérfanos procesando en lotes"""
orphaned_files = []
processed_count = 0
self.stdout.write('Buscando archivos huérfanos...')
for root, dirs, files in os.walk(media_root):
for file in files:
if file.startswith('.'):
continue
full_path = os.path.join(root, file)
if full_path not in files_in_db:
orphaned_files.append(full_path)
processed_count += 1
if options['verbose'] and processed_count % 50000 == 0:
self.stdout.write(f'Procesados {processed_count} archivos, encontrados {len(orphaned_files)} huérfanos...')
# Liberar memoria periódicamente
if processed_count % 100000 == 0:
gc.collect()
return orphaned_files
def _calculate_total_size(self, orphaned_files, options):
"""Calcula el tamaño total de archivos huérfanos"""
total_size = 0
count = 0
for file_path in orphaned_files:
try:
total_size += os.path.getsize(file_path)
count += 1
if options['verbose'] and count % 10000 == 0:
self.stdout.write(f'Calculando tamaño... {count}/{len(orphaned_files)}')
except OSError:
pass
return total_size
def _delete_files_batch(self, orphaned_files, media_root, options):
"""Elimina archivos en lotes"""
deleted_count = 0
deleted_size = 0
batch_size = 1000
total_files = len(orphaned_files)
for i in range(0, total_files, batch_size):
batch = orphaned_files[i:i + batch_size]
for file_path in batch:
try:
file_size = os.path.getsize(file_path)
os.remove(file_path)
deleted_count += 1
deleted_size += file_size
if options['verbose'] and deleted_count % 5000 == 0:
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(f'Progreso: {deleted_count}/{total_files} - Eliminado: {relative_path}')
except OSError as e:
relative_path = os.path.relpath(file_path, media_root)
self.stdout.write(
self.style.ERROR(f'Error eliminando {relative_path}: {e}')
)
# Mostrar progreso por lotes
progress = (i + batch_size) / total_files * 100
self.stdout.write(f'Progreso: {min(progress, 100):.1f}% ({deleted_count} eliminados)')
# Forzar liberación de memoria
gc.collect()
# Elimina directorios vacíos
self._remove_empty_dirs(media_root)
deleted_mb = deleted_size / (1024 * 1024)
self.stdout.write(
self.style.SUCCESS(
f'Eliminados {deleted_count} archivos huérfanos ({deleted_mb:.2f} MB)'
)
)
def _remove_empty_dirs(self, path):
"""Elimina directorios vacíos recursivamente"""
for root, dirs, files in os.walk(path, topdown=False):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
try:
if not os.listdir(dir_path):
os.rmdir(dir_path)
except OSError:
pass