import os
import time
from itertools import islice
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

from django.core.management.base import BaseCommand
from django.conf import settings
from minio import Minio

from api.record.models import Document
from api.datastage.models import DataStage
from api.vucem.models import Vucem
from api.reports.models import ReportDocument


class Command(BaseCommand):
    """Migrate locally stored files (Document, DataStage, Vucem, ReportDocument)
    to MinIO.

    For every record whose file field still points at a local path (i.e. does
    not start with ``org_``), the file is uploaded to the configured bucket and
    the field is rewritten to the new ``org_<org_id>/...`` object name.
    Supports dry-run, per-model filtering, batching, parallel uploads and an
    offset for resuming interrupted runs.
    """

    help = 'Migra archivos existentes del sistema local a MinIO (versión optimizada)'

    def add_arguments(self, parser):
        parser.add_argument('--dry-run', action='store_true', help='Solo muestra lo que se migraría')
        parser.add_argument('--model', type=str, help='Document, DataStage, Vucem, ReportDocument')
        parser.add_argument('--limit', type=int, help='Límite de registros')
        parser.add_argument('--batch-size', type=int, default=200, help='Tamaño del lote (default: 200)')
        parser.add_argument('--workers', type=int, default=3, help='Número de workers (default: 3)')
        parser.add_argument('--offset', type=int, default=0, help='Offset inicial (para reanudar)')

    def __init__(self, *args, **kwargs):
        # BUGFIX: the original signature was __init__(self) which silently
        # dropped BaseCommand's constructor arguments (stdout, stderr,
        # no_color, ...). Accept and forward them.
        super().__init__(*args, **kwargs)
        self.client = None       # lazily created MinIO client (see _init_minio_client)
        self.bucket_name = None  # resolved together with the client

    def _init_minio_client(self):
        """Lazily initialize the MinIO client and target bucket name from env vars."""
        if self.client is None:
            self.client = Minio(
                endpoint=os.getenv('MINIO_ENDPOINT', 'minio:9000'),
                access_key=os.getenv('MINIO_ACCESS_KEY'),
                secret_key=os.getenv('MINIO_SECRET_KEY'),
                secure=os.getenv('MINIO_SECURE', 'false').lower() == 'true',
            )
            self.bucket_name = os.getenv('MINIO_BUCKET_NAME', 'efc-backend-dev')

    def handle(self, *args, **options):
        """Entry point: run the per-model migrations and print a summary."""
        dry_run = options.get('dry_run', False)
        model_filter = options.get('model')
        limit = options.get('limit')
        batch_size = options.get('batch_size', 200)
        workers = options.get('workers', 3)
        offset = options.get('offset', 0)

        self.stdout.write(self.style.WARNING('=' * 60))
        self.stdout.write(self.style.WARNING('INICIANDO MIGRACIÓN A MINIO (OPTIMIZADA)'))
        self.stdout.write(self.style.WARNING(f'Batch size: {batch_size} | Workers: {workers} | Offset: {offset}'))
        if dry_run:
            self.stdout.write(self.style.WARNING('MODO: DRY RUN (sin cambios)'))
        self.stdout.write(self.style.WARNING('=' * 60))

        results = {}
        # No --model means "migrate everything"; matching is case-insensitive.
        if not model_filter or model_filter.lower() == 'document':
            results['Document'] = self.migrate_documents(dry_run, limit, batch_size, workers, offset)
        if not model_filter or model_filter.lower() == 'datastage':
            results['DataStage'] = self.migrate_datastage(dry_run, limit, batch_size, workers, offset)
        if not model_filter or model_filter.lower() == 'vucem':
            results['Vucem'] = self.migrate_vucem(dry_run, limit, workers)
        if not model_filter or model_filter.lower() == 'reportdocument':
            results['ReportDocument'] = self.migrate_reports(dry_run, limit, batch_size, workers, offset)

        # Resumen final
        self.stdout.write('\n' + '=' * 60)
        self.stdout.write(self.style.SUCCESS('RESUMEN DE MIGRACIÓN'))
        self.stdout.write('=' * 60)
        total_migrados = 0
        total_no_encontrados = 0
        total_errores = 0
        for model_name, stats in results.items():
            self.stdout.write(f"\n📁 {model_name}:")
            self.stdout.write(f" ✅ Migrados: {stats['migrated']}")
            self.stdout.write(f" ⚠️ No encontrados: {stats['not_found']}")
            self.stdout.write(f" ❌ Errores: {stats['errors']}")
            total_migrados += stats['migrated']
            total_no_encontrados += stats['not_found']
            total_errores += stats['errors']
        self.stdout.write('\n' + '-' * 40)
        self.stdout.write(f"📊 TOTAL Migrados: {total_migrados}")
        self.stdout.write(f"📊 TOTAL No encontrados: {total_no_encontrados}")
        self.stdout.write(f"📊 TOTAL Errores: {total_errores}")
        if dry_run:
            self.stdout.write('\n' + self.style.WARNING('⚠️ MODO DRY RUN - No se realizaron cambios'))

    def get_local_file_path(self, path_str):
        """Return the absolute local path for a media-relative file path."""
        return Path(settings.MEDIA_ROOT) / path_str

    def _run_batched(self, queryset, progress_label, build_item, upload_fn,
                     dry_run, batch_size, workers, offset, limit):
        """Shared batched + parallel migration loop.

        :param queryset: ordered queryset of records still pointing at local files.
        :param progress_label: format string with a ``{total}`` placeholder.
        :param build_item: callable(record) -> work-item dict, or ``None`` when
            the local file is missing (counted as ``not_found``).
        :param upload_fn: callable(item) -> ``{'success': bool, ...}``.
        :returns: stats dict with ``migrated`` / ``not_found`` / ``errors``.
        """
        stats = {'migrated': 0, 'not_found': 0, 'errors': 0}
        if offset:
            queryset = queryset[offset:]
        if limit:
            queryset = queryset[:limit]
        total = queryset.count()
        self.stdout.write(progress_label.format(total=total))
        if total == 0:
            return stats

        start_time = time.time()
        processed = 0
        # BUGFIX: migrated rows drop out of the queryset (their field now
        # starts with 'org_'), so re-slicing with a growing offset per batch
        # skipped records. Evaluate the query once and chunk the iterator.
        rows = queryset.iterator(chunk_size=batch_size)
        while True:
            batch_docs = list(islice(rows, batch_size))
            if not batch_docs:
                break

            if dry_run:
                # Dry run mirrors the original behavior: the whole batch is
                # counted as "would migrate" without checking local files.
                stats['migrated'] += len(batch_docs)
                processed += len(batch_docs)
                self._print_progress(processed, total, start_time, stats)
                continue

            items = []
            for record in batch_docs:
                item = build_item(record)
                if item is None:
                    stats['not_found'] += 1
                else:
                    items.append(item)

            # Upload the batch in parallel.
            if items:
                with ThreadPoolExecutor(max_workers=workers) as executor:
                    futures = {executor.submit(upload_fn, item): item for item in items}
                    for future in as_completed(futures):
                        result = future.result()
                        if result['success']:
                            stats['migrated'] += 1
                        else:
                            stats['errors'] += 1

            processed += len(batch_docs)
            self._print_progress(processed, total, start_time, stats)

        total_time = time.time() - start_time
        self.stdout.write(f"\n ✅ Completado en {total_time/60:.1f} minutos")
        return stats

    def migrate_documents(self, dry_run, limit, batch_size, workers, offset):
        """Migrate files of the Document model."""
        self._init_minio_client()
        queryset = (
            Document.objects.exclude(archivo='')
            .exclude(archivo__isnull=True)
            .exclude(archivo__startswith='org_')
            .order_by('created_at')
        )

        def build_item(doc):
            local_path = self.get_local_file_path(str(doc.archivo))
            if not local_path.exists():
                return None
            pedimento_app = doc.pedimento.pedimento_app if doc.pedimento else 'unknown'
            return {'doc': doc, 'local_path': local_path, 'pedimento_app': pedimento_app}

        return self._run_batched(
            queryset, "\n📄 Procesando {total} documentos...",
            build_item, self._upload_document,
            dry_run, batch_size, workers, offset, limit,
        )

    def _upload_document(self, item):
        """Upload a single Document file to MinIO and update the record."""
        doc = item['doc']  # unpack before try: the except clause references doc.id
        try:
            local_path = item['local_path']
            filename = local_path.name
            # BUGFIX: the object name previously ended in a literal
            # "(unknown)" instead of the file name, colliding every upload.
            object_name = f"org_{doc.organizacion_id}/documents/{item['pedimento_app']}/{filename}"
            self.client.fput_object(
                bucket_name=self.bucket_name,
                object_name=object_name,
                file_path=str(local_path),
            )
            # Only touch the DB once the upload has succeeded.
            doc.archivo = object_name
            doc.save(update_fields=['archivo'])
            return {'success': True, 'doc_id': doc.id}
        except Exception as e:
            return {'success': False, 'doc_id': doc.id, 'error': str(e)}

    def migrate_datastage(self, dry_run, limit, batch_size, workers, offset):
        """Migrate files of the DataStage model."""
        self._init_minio_client()
        queryset = (
            DataStage.objects.exclude(archivo='')
            .exclude(archivo__isnull=True)
            .exclude(archivo__startswith='org_')
            .order_by('created_at')
        )

        def build_item(ds):
            local_path = self.get_local_file_path(str(ds.archivo))
            if not local_path.exists():
                return None
            return {'ds': ds, 'local_path': local_path}

        return self._run_batched(
            queryset, "\n📦 Procesando {total} archivos DataStage...",
            build_item, self._upload_datastage,
            dry_run, batch_size, workers, offset, limit,
        )

    def _upload_datastage(self, item):
        """Upload a single DataStage file to MinIO and update the record."""
        ds = item['ds']  # unpack before try: the except clause references ds.id
        try:
            local_path = item['local_path']
            filename = local_path.name
            # BUGFIX: use the real file name (was a literal "(unknown)").
            object_name = f"org_{ds.organizacion_id}/datastage/{filename}"
            self.client.fput_object(
                bucket_name=self.bucket_name,
                object_name=object_name,
                file_path=str(local_path),
            )
            ds.archivo = object_name
            ds.save(update_fields=['archivo'])
            return {'success': True, 'id': ds.id}
        except Exception as e:
            return {'success': False, 'id': ds.id, 'error': str(e)}

    def migrate_vucem(self, dry_run, limit, workers):
        """Migrate the key and cer files of the Vucem model."""
        self._init_minio_client()
        stats = {'migrated': 0, 'not_found': 0, 'errors': 0}
        queryset = Vucem.objects.all()
        if limit:
            queryset = queryset[:limit]
        total = queryset.count() * 2  # each record owns up to two files
        self.stdout.write(f"\n🔐 Procesando {queryset.count()} registros VUCEM (key + cer)...")
        if total == 0:
            return stats

        items = []
        for vucem in queryset:
            for tipo in ('key', 'cer'):
                value = getattr(vucem, tipo)
                # Skip empty fields and files already migrated ('org_' prefix).
                if value and not str(value).startswith('org_'):
                    local_path = self.get_local_file_path(str(value))
                    if local_path.exists():
                        items.append({'vucem': vucem, 'local_path': local_path, 'tipo': tipo})
                    else:
                        stats['not_found'] += 1

        if dry_run:
            stats['migrated'] = len(items)
            self.stdout.write(f" 📝 [DRY RUN] Se migrarían {len(items)} archivos")
            return stats

        if items:
            with ThreadPoolExecutor(max_workers=workers) as executor:
                futures = {executor.submit(self._upload_vucem, item): item for item in items}
                for future in as_completed(futures):
                    result = future.result()
                    if result['success']:
                        stats['migrated'] += 1
                        self.stdout.write(self.style.SUCCESS(f" ✅ {result['tipo']} migrado: {result['id']}"))
                    else:
                        stats['errors'] += 1
        return stats

    def _upload_vucem(self, item):
        """Upload a single VUCEM key/cer file to MinIO and update the record."""
        vucem = item['vucem']  # unpack before try: the except clause needs these
        tipo = item['tipo']
        try:
            local_path = item['local_path']
            filename = local_path.name
            folder = 'vucem_keys' if tipo == 'key' else 'vucem_certs'
            # BUGFIX: use the real file name (was a literal "(unknown)").
            object_name = f"org_{vucem.organizacion_id}/{folder}/{filename}"
            # BUGFIX: the original saved the record BEFORE uploading, so a
            # failed fput_object left the DB pointing at a nonexistent object.
            # Upload first, persist only on success.
            self.client.fput_object(
                bucket_name=self.bucket_name,
                object_name=object_name,
                file_path=str(local_path),
            )
            if tipo == 'key':
                vucem.key = object_name
                vucem.save(update_fields=['key'])
            else:
                vucem.cer = object_name
                vucem.save(update_fields=['cer'])
            return {'success': True, 'id': vucem.id, 'tipo': tipo}
        except Exception as e:
            return {'success': False, 'id': vucem.id, 'tipo': tipo, 'error': str(e)}

    def migrate_reports(self, dry_run, limit, batch_size, workers, offset):
        """Migrate files of the ReportDocument model."""
        self._init_minio_client()
        queryset = (
            ReportDocument.objects.exclude(file='')
            .exclude(file__isnull=True)
            .exclude(file__startswith='org_')
            .order_by('created_at')
        )

        def build_item(report):
            local_path = self.get_local_file_path(str(report.file))
            if not local_path.exists():
                return None
            return {'report': report, 'local_path': local_path}

        return self._run_batched(
            queryset, "\n📊 Procesando {total} reportes...",
            build_item, self._upload_report,
            dry_run, batch_size, workers, offset, limit,
        )

    def _upload_report(self, item):
        """Upload a single report file to MinIO and update the record."""
        report = item['report']  # unpack before try: the except clause references report.id
        try:
            local_path = item['local_path']
            filename = local_path.name
            # ReportDocument has no direct org FK; the org id lives in the
            # report's filters payload (fallback: 'unknown').
            filters = report.filters or {}
            org_id = filters.get('organizacion_id', 'unknown')
            # BUGFIX: use the real file name (was a literal "(unknown)").
            object_name = f"org_{org_id}/reports/{filename}"
            self.client.fput_object(
                bucket_name=self.bucket_name,
                object_name=object_name,
                file_path=str(local_path),
            )
            report.file = object_name
            report.save(update_fields=['file'])
            return {'success': True, 'id': report.id}
        except Exception as e:
            return {'success': False, 'id': report.id, 'error': str(e)}

    def _print_progress(self, processed, total, start_time, stats):
        """Print a one-line progress report (count, percent, rate, stats)."""
        elapsed = time.time() - start_time
        rate = processed / elapsed if elapsed > 0 else 0
        pct = processed * 100 / total if total > 0 else 0
        self.stdout.write(
            f" 📊 {processed}/{total} ({pct:.1f}%) | "
            f"{rate:.0f} docs/seg | "
            f"✅ {stats['migrated']} | "
            f"⚠️ {stats['not_found']} | "
            f"❌ {stats['errors']}"
        )