fix/de los tickets T2026-05-027, T2025-09-004 y T2025-09-056

This commit is contained in:
2026-06-15 11:18:58 -06:00
parent 7644446267
commit 23ed52c78a
29 changed files with 2992 additions and 987 deletions

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.2.3 on 2026-06-11 14:42
from django.db import migrations, models
class Migration(migrations.Migration):
    """Alter ``ReportDocument.report_type`` in the ``reports`` app.

    The field is redefined as a ``CharField`` (``max_length=30``) whose
    choices are ``cumplimiento``, ``control_pedimento`` and ``datastage``,
    with ``cumplimiento`` as the default.
    """

    # Applied after migration 0003 of the ``reports`` app.
    dependencies = [
        ('reports', '0003_alter_reportdocument_file'),
    ]
    operations = [
        migrations.AlterField(
            model_name='reportdocument',
            name='report_type',
            field=models.CharField(choices=[('cumplimiento', 'cumplimiento'), ('control_pedimento', 'control_pedimento'), ('datastage', 'datastage')], default='cumplimiento', max_length=30),
        ),
    ]

View File

@@ -12,6 +12,7 @@ class ReportDocument(models.Model):
TYPE_REPORT = [
('cumplimiento', 'cumplimiento'),
('control_pedimento', 'control_pedimento'),
('datastage', 'datastage'),
]
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE, related_name='report_documents')
filters = models.JSONField(blank=True, null=True)

View File

View File

@@ -0,0 +1,557 @@
"""
Lógica de exportación de reportes DataStage, extraída de ExportDataStageView
para poder ejecutarse dentro de una task Celery (sin request/HttpResponse).
Cada builder devuelve una tupla (content_bytes, filename, content_type, total_rows).
El aislamiento multi-tenant viene resuelto en global_filters['organizacion']
(la vista lo resuelve con get_org_context antes de encolar).
"""
import csv
import datetime
import hashlib
import io
import uuid
import zipfile
import openpyxl
from django.apps import apps
from django.core.paginator import Paginator
from api.organization.models import Organizacion
MAX_RECORDS_PER_FILE = 500000 # Row limit per Excel file before the export is partitioned into a ZIP
# MIME types returned by the builders together with the generated content.
XLSX_CONTENT_TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
CSV_CONTENT_TYPE = 'text/csv; charset=utf-8'
ZIP_CONTENT_TYPE = 'application/zip'
# Fields that make up the grouping key in multi-model exports; they are also
# emitted first, unprefixed, in the combined header.
RELATION_FIELDS = ['seccion_aduanera', 'patente', 'pedimento']
def safe_excel_value(value):
    """Return *value* as a string that can be written to an Excel/CSV cell.

    ``None`` becomes the empty string, UUIDs are stringified, objects exposing
    a ``uuid`` or ``id`` attribute are rendered through that attribute
    (``uuid`` wins over ``id``), dates and datetimes use ISO-8601, and
    everything else falls back to ``str(value)``.
    """
    if value is None:
        return ''
    if isinstance(value, uuid.UUID):
        return str(value)
    # Identifier-bearing objects are checked before dates, in this order.
    for attr in ('uuid', 'id'):
        if hasattr(value, attr):
            return str(getattr(value, attr))
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    # dicts, lists and any other type share the generic representation.
    return str(value)
def apply_global_filters_to_model(global_filters, model):
    """Translate the global export filters into ORM lookups valid for *model*.

    Only filters whose target field exists on *model* are emitted, so the
    returned dict can be passed straight to ``model.objects.filter(**filters)``.

    Args:
        global_filters: Mapping with the optional keys ``organizacion``,
            ``rfc``, ``patente``, ``pedimento``, ``fecha_pago_desde`` and
            ``fecha_pago_hasta``. Falsy values are ignored.
        model: Django model class (anything exposing ``_meta.get_fields()``
            and ``_meta.get_field()``).

    Returns:
        dict mapping ORM lookup names to values; empty when nothing applies.
    """
    filters = {}
    model_fields = {f.name for f in model._meta.get_fields()}

    # Organization: a relational field is filtered through ``organizacion_id``
    # (as a UUID when parseable); a plain column uses the raw value.
    org_value = global_filters.get('organizacion')
    if org_value and 'organizacion' in model_fields:
        field = model._meta.get_field('organizacion')
        # ``related_model`` exists on every Django field (``None`` when the
        # field is not a relation), so ``hasattr`` cannot tell a ForeignKey
        # from a CharField; ``is_relation`` can.
        if getattr(field, 'is_relation', False):
            try:
                filters['organizacion_id'] = uuid.UUID(org_value)
            except (ValueError, TypeError, AttributeError):
                # Not a UUID string (or already a UUID/other type): pass as-is.
                filters['organizacion_id'] = org_value
        else:
            filters['organizacion'] = org_value

    # Equality filters are skipped when the model lacks the column; filtering
    # on a missing field would raise FieldError at query time.
    for key in ('rfc', 'patente', 'pedimento'):
        value = global_filters.get(key)
        if value and key in model_fields:
            filters[key] = value

    if 'fecha_pago_real' in model_fields:
        if global_filters.get('fecha_pago_desde'):
            filters['fecha_pago_real__gte'] = global_filters['fecha_pago_desde']
        if global_filters.get('fecha_pago_hasta'):
            filters['fecha_pago_real__lte'] = global_filters['fecha_pago_hasta']
    return filters
def apply_related_filters(global_filters, model, related_keys):
    """Build ORM lookups for multi-model mode: global filters plus cross keys.

    Organization, RFC and payment-date filters are applied whenever *model*
    has the matching field. If *related_keys* carries at least one non-empty
    collection, the ``patente`` / ``pedimento`` / ``datastage_id`` lookups use
    ``__in`` against those collections; otherwise the plain ``patente`` and
    ``pedimento`` global filters are used.

    Args:
        global_filters: Mapping of global filter values (falsy values ignored).
        model: Django model class (anything exposing ``_meta.get_fields()``
            and ``_meta.get_field()``).
        related_keys: Mapping with the optional keys ``patentes``,
            ``pedimentos`` and ``datastage_ids``; ``None`` is treated as empty.

    Returns:
        dict mapping ORM lookup names to values; empty when nothing applies.
    """
    filters = {}
    related_keys = related_keys or {}
    model_fields = {f.name for f in model._meta.get_fields()}

    org_value = global_filters.get('organizacion')
    if org_value and 'organizacion' in model_fields:
        field = model._meta.get_field('organizacion')
        # ``related_model`` exists on every Django field (``None`` when the
        # field is not a relation), so ``hasattr`` cannot tell a ForeignKey
        # from a CharField; ``is_relation`` can.
        if getattr(field, 'is_relation', False):
            try:
                filters['organizacion_id'] = uuid.UUID(org_value)
            except (ValueError, TypeError, AttributeError):
                # Not a UUID string (or already a UUID/other type): pass as-is.
                filters['organizacion_id'] = org_value
        else:
            filters['organizacion'] = org_value

    if 'rfc' in model_fields and global_filters.get('rfc'):
        filters['rfc'] = global_filters['rfc']

    if 'fecha_pago_real' in model_fields:
        if global_filters.get('fecha_pago_desde'):
            filters['fecha_pago_real__gte'] = global_filters['fecha_pago_desde']
        if global_filters.get('fecha_pago_hasta'):
            filters['fecha_pago_real__lte'] = global_filters['fecha_pago_hasta']

    if any(related_keys.values()):
        # Cross keys take precedence over the single-value global filters.
        for keys_name, column in (
            ('patentes', 'patente'),
            ('pedimentos', 'pedimento'),
            ('datastage_ids', 'datastage_id'),
        ):
            if related_keys.get(keys_name) and column in model_fields:
                filters[f'{column}__in'] = related_keys[keys_name]
    else:
        for column in ('patente', 'pedimento'):
            if column in model_fields and global_filters.get(column):
                filters[column] = global_filters[column]
    return filters
def get_related_keys_from_filters(global_filters, models_data):
    """Collect the patente / pedimento / datastage_id values used as cross keys.

    A model may seed the cross keys only if it has a field for every active
    filter: with an RFC filter it must have ``rfc``, with a payment-date
    filter it must have ``fecha_pago_real``. Models lacking those fields are
    skipped here so they cannot contribute keys that bypass the filter.

    Returns:
        ``{}`` when no meaningful global filter is set; a dict of three empty
        sets when no seed model produced records; otherwise a dict holding
        only the non-empty collections, each converted to a list.
    """
    meaningful = [v for v in global_filters.values() if v not in [None, '']]
    if not any(meaningful):
        return {}

    needs_rfc = bool(global_filters.get('rfc'))
    needs_date = bool(
        global_filters.get('fecha_pago_desde') or global_filters.get('fecha_pago_hasta')
    )

    seed_rows = []
    for entry in models_data:
        try:
            model = apps.get_model('datastage', entry.get('model'))
            available = {f.name for f in model._meta.get_fields() if hasattr(f, 'name')}
            if needs_rfc and 'rfc' not in available:
                continue
            if needs_date and 'fecha_pago_real' not in available:
                continue
            lookups = apply_global_filters_to_model(global_filters, model)
            if lookups:
                matches = model.objects.filter(**lookups).values(
                    'patente', 'pedimento', 'datastage_id'
                )
                seed_rows.extend(list(matches))
        except LookupError:
            # Unknown model name: it simply does not contribute keys.
            continue

    if not seed_rows:
        return {'patentes': set(), 'pedimentos': set(), 'datastage_ids': set()}

    collected = {'patentes': set(), 'pedimentos': set(), 'datastage_ids': set()}
    buckets = (
        ('patentes', 'patente'),
        ('pedimentos', 'pedimento'),
        ('datastage_ids', 'datastage_id'),
    )
    for row in seed_rows:
        for bucket, column in buckets:
            value = row.get(column)
            if value:
                collected[bucket].add(value)
    return {bucket: list(values) for bucket, values in collected.items() if values}
# ---------------------------------------------------------------------------
# Exportación simple (un solo modelo)
# ---------------------------------------------------------------------------
def build_simple_export(model_name, fields, global_filters, export_format, progress_cb=None):
    """Export a single ``datastage`` model as Excel, partitioned Excel or CSV.

    Args:
        model_name: Name of the model inside the ``datastage`` app.
        fields: Field names to export (passed to ``QuerySet.values``).
        global_filters: Global filter mapping translated into ORM lookups.
        export_format: ``'excel'`` for XLSX output; anything else yields CSV.
        progress_cb: Optional ``callable(percent, message)``.

    Returns:
        Tuple ``(content_bytes, filename, content_type, total_records)``.

    Raises:
        ValueError: If *model_name* is not a model of the ``datastage`` app.
    """
    notify = progress_cb or (lambda p, m: None)
    try:
        model = apps.get_model('datastage', model_name)
    except LookupError:
        raise ValueError(f'Modelo {model_name} no encontrado')

    lookups = apply_global_filters_to_model(global_filters, model)
    queryset = model.objects.filter(**lookups).values(*fields)
    total_records = queryset.count()
    notify(20, f'{model_name}: {total_records} registros encontrados')

    if export_format != 'excel':
        # CSV has no row limit, so it is always a single file.
        writer = _simple_csv
    elif total_records > MAX_RECORDS_PER_FILE:
        writer = _simple_excel_partitioned
    else:
        writer = _simple_excel
    content, filename, content_type = writer(model_name, fields, queryset, notify)
    return content, filename, content_type, total_records
def _simple_excel(model_name, fields, queryset, progress_cb):
    """Write *queryset* into a single-sheet workbook.

    The first row holds *fields* as headers; each following row holds the
    record values passed through ``safe_excel_value``.

    Returns:
        Tuple ``(xlsx_bytes, '<model_name>.xlsx', XLSX_CONTENT_TYPE)``.
    """
    progress_cb(40, f'Escribiendo Excel de {model_name}...')
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.append(fields)
    for record in queryset:
        cells = [safe_excel_value(record[name]) for name in fields]
        sheet.append(cells)
    progress_cb(88, 'Serializando archivo...')
    stream = io.BytesIO()
    workbook.save(stream)
    payload = stream.getvalue()
    return payload, f'{model_name}.xlsx', XLSX_CONTENT_TYPE
def _simple_csv(model_name, fields, queryset, progress_cb):
    """Serialize *queryset* (an iterable of dict rows) as one UTF-8 CSV file.

    Rows are written as-is through ``csv.DictWriter`` with *fields* as the
    header.

    Returns:
        Tuple ``(csv_bytes, '<model_name>.csv', CSV_CONTENT_TYPE)``.
    """
    progress_cb(40, f'Escribiendo CSV de {model_name}...')
    stream = io.StringIO()
    dict_writer = csv.DictWriter(stream, fieldnames=fields)
    dict_writer.writeheader()
    dict_writer.writerows(queryset)
    progress_cb(88, 'Serializando archivo...')
    encoded = stream.getvalue().encode('utf-8')
    return encoded, f'{model_name}.csv', CSV_CONTENT_TYPE
def _simple_excel_partitioned(model_name, fields, queryset, progress_cb):
    """Split *queryset* into several XLSX files bundled in a ZIP archive.

    Each part holds at most ``MAX_RECORDS_PER_FILE`` rows plus a header row
    and is stored in the archive as ``<model_name>_part<N>.xlsx``. Progress is
    reported between 25 and 80 percent while parts are written.

    Returns:
        Tuple ``(zip_bytes, '<model_name>_particionado.zip', ZIP_CONTENT_TYPE)``.
    """
    # Pages are fetched with independent LIMIT/OFFSET queries. Without a
    # deterministic order the database may return rows in a different order
    # per query, duplicating or dropping records across parts.
    if not getattr(queryset, 'ordered', True):
        queryset = queryset.order_by('pk')

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        paginator = Paginator(queryset, MAX_RECORDS_PER_FILE)
        total_pages = paginator.num_pages
        for page_num in paginator.page_range:
            pct = 25 + int((page_num / total_pages) * 55)
            progress_cb(pct, f'Particionando {model_name}: parte {page_num}/{total_pages}')
            page = paginator.page(page_num)
            wb = openpyxl.Workbook()
            ws = wb.active
            # Excel limits sheet titles to 31 characters.
            ws.title = f'Parte_{page_num}'[:31]
            ws.append(fields)
            for row in page.object_list:
                ws.append([safe_excel_value(row[field]) for field in fields])
            part_buffer = io.BytesIO()
            wb.save(part_buffer)
            zip_file.writestr(f'{model_name}_part{page_num}.xlsx', part_buffer.getvalue())
    progress_cb(88, 'Serializando archivo...')
    return zip_buffer.getvalue(), f'{model_name}_particionado.zip', ZIP_CONTENT_TYPE
def _simple_csv_partitioned(model_name, fields, queryset, progress_cb):
    """Split *queryset* into several CSV files bundled in a ZIP archive.

    Each part holds at most ``MAX_RECORDS_PER_FILE`` rows plus a header row
    and is stored in the archive as ``<model_name>_part<N>.csv``. Values are
    passed through ``safe_excel_value``.

    NOTE(review): no caller of this helper is visible in this module's export
    path (the simple CSV export always writes a single file) - confirm whether
    it is still needed.

    Returns:
        Tuple ``(zip_bytes, '<model_name>_particionado.zip', ZIP_CONTENT_TYPE)``.
    """
    # Pages are fetched with independent LIMIT/OFFSET queries. Without a
    # deterministic order the database may return rows in a different order
    # per query, duplicating or dropping records across parts.
    if not getattr(queryset, 'ordered', True):
        queryset = queryset.order_by('pk')

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        paginator = Paginator(queryset, MAX_RECORDS_PER_FILE)
        total_pages = paginator.num_pages
        for page_num in paginator.page_range:
            pct = 25 + int((page_num / total_pages) * 55)
            progress_cb(pct, f'Particionando {model_name}: parte {page_num}/{total_pages}')
            page = paginator.page(page_num)
            csv_buffer = io.StringIO()
            writer = csv.writer(csv_buffer)
            writer.writerow(fields)
            writer.writerows(
                [safe_excel_value(row[field]) for field in fields]
                for row in page.object_list
            )
            zip_file.writestr(f'{model_name}_part{page_num}.csv', csv_buffer.getvalue())
    progress_cb(88, 'Serializando archivo...')
    return zip_buffer.getvalue(), f'{model_name}_particionado.zip', ZIP_CONTENT_TYPE
# ---------------------------------------------------------------------------
# Exportación múltiple (varios modelos agrupados por llaves de cruce)
# ---------------------------------------------------------------------------
def _collect_multiple_data(models_data, global_filters, related_keys, progress_cb):
    """
    Collect the records of every requested model and group them by the key
    seccion_aduanera + patente + pedimento. ``organizacion_id`` values are
    replaced by the organization name.

    Returns a dict keyed by the grouping key. Each value has the shape
    ``{'relation_fields': {field: raw_value}, 'model_records':
    {model_name: [processed_record, ...]}}``, where the processed records use
    ``<model_name>_<field>`` column names except for the relation fields.
    Entries without a model name or fields, and unknown models, are skipped.
    """
    # Preload id -> name for every organization to avoid one query per record.
    org_mapping = {str(org.id): org.nombre for org in Organizacion.objects.all()}
    all_models_data = {}
    total_models = len(models_data) or 1
    for idx, model_data in enumerate(models_data):
        model_name = model_data.get('model')
        fields = model_data.get('fields', [])
        if not model_name or not fields:
            continue
        # Normalize fields: 'organizacion' -> 'organizacion_id', without duplicates
        normalized_fields = []
        for f in fields:
            key = f.strip() if isinstance(f, str) else f
            if isinstance(key, str) and key.lower() == 'organizacion':
                if 'organizacion_id' not in normalized_fields:
                    normalized_fields.append('organizacion_id')
            else:
                if key not in normalized_fields:
                    normalized_fields.append(key)
        fields = normalized_fields
        # The relation fields are always requested so the grouping key can be built.
        for req_field in RELATION_FIELDS:
            if req_field not in fields:
                fields.append(req_field)
        try:
            model = apps.get_model('datastage', model_name)
            model_field_names = [f.name for f in model._meta.get_fields() if hasattr(f, 'name')]
            # NOTE(review): ``get_fields()`` presumably reports a ForeignKey as
            # 'organizacion' rather than 'organizacion_id', in which case this
            # condition never matches - confirm the intended behavior.
            if 'organizacion_id' not in fields and 'organizacion_id' in model_field_names:
                fields.append('organizacion_id')
            filters = apply_related_filters(global_filters, model, related_keys)
            # With no applicable filter the model contributes nothing (never a full-table export).
            queryset = model.objects.filter(**filters).values(*fields) if filters else model.objects.none()
            count = queryset.count()
            pct = 20 + int((idx / total_models) * 55)
            progress_cb(pct, f'Modelo {idx + 1}/{total_models}: {model_name} ({count} registros)')
            if count == 0:
                continue
            relation_fields = [fn for fn in RELATION_FIELDS if fn in fields]
            # NOTE(review): every RELATION_FIELDS entry was appended to ``fields``
            # above, so this fallback looks unreachable - confirm.
            if not relation_fields:
                relation_fields = ['datastage_id'] if 'datastage_id' in fields else [fields[0]]
            for record in queryset:
                # Grouping key: non-null relation values joined with '_'.
                key_parts = [str(record[rf]) for rf in relation_fields if rf in record and record[rf] is not None]
                if not key_parts:
                    # No relation value at all: derive a key from the record
                    # content (MD5 is used only as a fingerprint here).
                    key = hashlib.md5(str(sorted(record.items())).encode()).hexdigest()[:10]
                else:
                    key = "_".join(key_parts)
                processed_record = {}
                for field_name, value in record.items():
                    if field_name == 'organizacion_id' and value:
                        org_id_str = str(value)
                        if org_id_str in org_mapping:
                            processed_value = org_mapping[org_id_str]
                        else:
                            # Not in the preloaded mapping: look it up once and cache the result.
                            try:
                                org = Organizacion.objects.filter(id=value).first()
                                processed_value = org.nombre if org else org_id_str
                                org_mapping[org_id_str] = processed_value
                            except Exception:
                                processed_value = org_id_str
                    else:
                        processed_value = value
                    # Relation fields keep their name; other columns are prefixed with the model name.
                    if field_name in relation_fields:
                        prefixed_field_name = field_name
                    else:
                        prefixed_field_name = f"{model_name}_{field_name}"
                    if field_name == 'organizacion_id':
                        prefixed_field_name = prefixed_field_name.replace('organizacion_id', 'organizacion_nombre')
                    processed_record[prefixed_field_name] = safe_excel_value(processed_value)
                if key not in all_models_data:
                    all_models_data[key] = {'relation_fields': {}, 'model_records': {}}
                for rel_field in relation_fields:
                    if rel_field in record:
                        all_models_data[key]['relation_fields'][rel_field] = record[rel_field]
                if model_name not in all_models_data[key]['model_records']:
                    all_models_data[key]['model_records'][model_name] = []
                all_models_data[key]['model_records'][model_name].append(processed_record)
        except LookupError:
            # Unknown model name: skip it and continue with the next model.
            continue
    return all_models_data
def _build_combined_rows(all_models_data):
    """Flatten the grouped data into one dict per output row.

    For every grouping key, as many rows are produced as the model with the
    most records has. A model with fewer records repeats its last record
    instead of leaving its columns empty. Relation values are passed through
    ``safe_excel_value``; model columns are copied unchanged.
    """
    rows = []
    for bundle in all_models_data.values():
        relation_values = bundle['relation_fields']
        per_model = bundle['model_records']
        depth = max(map(len, per_model.values()), default=1)
        for position in range(depth):
            merged = {
                name: safe_excel_value(raw) for name, raw in relation_values.items()
            }
            for entries in per_model.values():
                # Use the record at this position, or the last one available.
                source = entries[position] if position < len(entries) else entries[-1]
                merged.update(source)
            rows.append(merged)
    return rows
def _ordered_fields(combined_rows):
    """Return the header order for the combined rows.

    Relation fields come first (in ``RELATION_FIELDS`` order), then every
    column whose name contains ``organizacion`` (sorted), then the remaining
    columns (sorted).
    """
    remaining = set()
    for row in combined_rows:
        remaining |= set(row.keys())

    leading = [name for name in RELATION_FIELDS if name in remaining]
    remaining.difference_update(leading)

    org_columns = sorted(name for name in remaining if 'organizacion' in name.lower())
    remaining.difference_update(org_columns)

    return leading + org_columns + sorted(remaining)
def build_multiple_export(models_data, global_filters, export_format, progress_cb=None):
    """Export several models combined into rows grouped by the cross keys.

    Args:
        models_data: List of ``{'model': ..., 'fields': [...]}`` entries.
        global_filters: Global filter mapping.
        export_format: ``'excel'`` for XLSX output; anything else yields CSV.
        progress_cb: Optional ``callable(percent, message)``.

    Returns:
        Tuple ``(content_bytes, filename, content_type, total_rows)``. When no
        data matches, a file containing only a message is returned with
        ``total_rows == 0`` instead of raising.
    """
    notify = progress_cb or (lambda p, m: None)
    wants_excel = export_format == 'excel'

    notify(15, 'Resolviendo llaves de cruce entre modelos...')
    related_keys = get_related_keys_from_filters(global_filters, models_data)
    grouped = _collect_multiple_data(models_data, global_filters, related_keys, notify)

    if not grouped:
        # No data: still produce a file carrying the message, not an error.
        empty_message = 'No se encontraron datos para los filtros especificados'
        if not wants_excel:
            text = io.StringIO()
            csv.writer(text).writerow([empty_message])
            return text.getvalue().encode('utf-8'), 'datastage_sin_datos.csv', CSV_CONTENT_TYPE, 0
        workbook = openpyxl.Workbook()
        sheet = workbook.active
        sheet.title = "Sin datos"
        sheet.append([empty_message])
        stream = io.BytesIO()
        workbook.save(stream)
        return stream.getvalue(), 'datastage_sin_datos.xlsx', XLSX_CONTENT_TYPE, 0

    notify(80, 'Combinando filas...')
    rows = _build_combined_rows(grouped)
    headers = _ordered_fields(rows)
    row_count = len(rows)
    serializer = _multiple_excel if wants_excel else _multiple_csv
    content, filename, content_type = serializer(rows, headers, notify)
    return content, filename, content_type, row_count
def _multiple_excel(combined_rows, all_fields, progress_cb):
    """Serialize the combined rows as XLSX, or as a ZIP of XLSX parts.

    Every sheet starts with a title row, a generation-timestamp row, a blank
    row and the header row, followed by the data. Column widths are sized to
    the longest cell, capped at 50. A single workbook is returned when the
    rows fit in ``MAX_RECORDS_PER_FILE``; otherwise one workbook per page is
    stored in a ZIP archive.

    Returns:
        Tuple ``(content_bytes, filename, content_type)``.
    """
    generated_at = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
    preamble = (["Reporte Datastage"], [f"Generado: {generated_at}"], [], all_fields)

    def _fill(sheet, title, rows):
        # Excel limits sheet titles to 31 characters.
        sheet.title = title[:31]
        for line in preamble:
            sheet.append(line)
        for entry in rows:
            sheet.append([entry.get(name, '') for name in all_fields])
        for cells in sheet.columns:
            letter = cells[0].column_letter
            widest = 0
            for cell in cells:
                try:
                    length = len(str(cell.value))
                    if length > widest:
                        widest = length
                except Exception:
                    pass
            sheet.column_dimensions[letter].width = min(widest + 2, 50)

    # Plain workbook when everything fits in one file; ZIP only when partitioning is needed.
    paginator = Paginator(combined_rows, MAX_RECORDS_PER_FILE)
    if paginator.num_pages == 1:
        progress_cb(88, 'Serializando archivo...')
        workbook = openpyxl.Workbook()
        _fill(workbook.active, "Datastage", paginator.page(1).object_list)
        stream = io.BytesIO()
        workbook.save(stream)
        return stream.getvalue(), 'datastage_reporte.xlsx', XLSX_CONTENT_TYPE

    archive_stream = io.BytesIO()
    with zipfile.ZipFile(archive_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
        for number in paginator.page_range:
            progress_cb(80 + int((number / paginator.num_pages) * 8),
                        f'Particionando: parte {number}/{paginator.num_pages}')
            chunk = paginator.page(number)
            part = openpyxl.Workbook()
            _fill(part.active, f"Datastage_p{number}", chunk.object_list)
            part_stream = io.BytesIO()
            part.save(part_stream)
            archive.writestr(f"datastage_part{number}.xlsx", part_stream.getvalue())
    progress_cb(88, 'Serializando archivo...')
    return archive_stream.getvalue(), 'datastage_combinado.zip', ZIP_CONTENT_TYPE
def _multiple_csv(combined_rows, all_fields, progress_cb):
    """Serialize the combined rows as a single UTF-8 CSV file.

    *all_fields* is written as the header; a column missing from a row is
    written as an empty string.

    Returns:
        Tuple ``(csv_bytes, 'datastage_reporte.csv', CSV_CONTENT_TYPE)``.
    """
    progress_cb(88, 'Serializando archivo...')
    stream = io.StringIO()
    out = csv.writer(stream)
    out.writerow(all_fields)
    out.writerows(
        [entry.get(name, '') for name in all_fields] for entry in combined_rows
    )
    encoded = stream.getvalue().encode('utf-8')
    return encoded, 'datastage_reporte.csv', CSV_CONTENT_TYPE
# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------
def build_datastage_export(payload, progress_cb=None):
    """
    Dispatch a DataStage export according to the payload stored in
    ReportDocument.filters.

    ``payload['modo']`` selects the builder (``'multiple'`` or, by default,
    the simple one), ``payload['format']`` defaults to ``'csv'`` and
    ``payload['globalFilters']`` defaults to an empty dict.

    Raises ValueError when the payload lacks ``models`` (multiple mode) or
    ``model`` / ``fields`` (simple mode).
    Returns (content_bytes, filename, content_type, total_rows).
    """
    export_format = payload.get('format', 'csv')
    global_filters = payload.get('globalFilters') or {}

    if payload.get('modo', 'simple') != 'multiple':
        model_name = payload.get('model')
        fields = payload.get('fields')
        if not (model_name and fields):
            raise ValueError('model y fields son requeridos para exportación simple')
        return build_simple_export(model_name, fields, global_filters, export_format, progress_cb)

    models_data = payload.get('models') or []
    if not models_data:
        raise ValueError('models es requerido para exportación múltiple')
    return build_multiple_export(models_data, global_filters, export_format, progress_cb)

View File

@@ -0,0 +1,3 @@
# Importa los módulos de tasks para que autodiscover_tasks() los registre en el worker
from .report_document import generate_report_document, generate_report_control_pedimento
from .report_datastage import generate_report_datastage

View File

@@ -0,0 +1,105 @@
import logging
import traceback
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.core.files.uploadedfile import SimpleUploadedFile
from django.utils import timezone
from api.reports.models import ReportDocument
from api.reports.services.datastage_export import build_datastage_export
from api.utils.storage_service import storage_service
from core.redis_events import publish_task_event
# Logger shared by this task module, registered under 'api.reports.tasks'.
logger = logging.getLogger('api.reports.tasks')
@shared_task(bind=True, queue='reports', soft_time_limit=1800, time_limit=1860)
def generate_report_datastage(self, report_id):
    """Generate the DataStage export for a ReportDocument and store the file.

    Steps: load the report, mark it ``processing``, build the file from
    ``report.filters`` with ``build_datastage_export``, upload it through
    ``storage_service.save_report`` and mark the report ``ready``. Progress
    and the final outcome are published with ``publish_task_event``.

    Args:
        report_id: Primary key of the ReportDocument to process.

    Returns:
        dict with ``report_id``, ``total_registros`` and ``archivo`` on
        success; ``None`` when the report does not exist or generation fails.
        Failures are recorded on the report and published, not re-raised.
    """
    task_id = self.request.id
    report = None

    def _fail(msg, exc=None):
        """Mark the report as errored, publish a 'failed' event and log. No re-raise."""
        # The traceback is only collected when an exception object is passed in.
        tb = traceback.format_exc() if exc else ''
        full_msg = f"{msg}\n\n{tb}".strip() if tb else msg
        logger.error('[reporte_datastage] report=%s FALLO: %s', report_id, full_msg)
        if report:
            report.status = 'error'
            report.error_message = full_msg
            report.finished_at = timezone.now()
            report.save(update_fields=['status', 'error_message', 'finished_at'])
        # The published event carries the short message, not the traceback.
        publish_task_event(task_id, 'failed', msg, progress=0)

    # ── 1. Load the report ────────────────────────────────────────────────────
    try:
        report = ReportDocument.objects.get(id=report_id)
    except ReportDocument.DoesNotExist:
        logger.error('[reporte_datastage] ReportDocument %s no existe', report_id)
        publish_task_event(task_id, 'failed', f'Reporte {report_id} no encontrado', progress=0)
        return
    logger.info('[reporte_datastage] Iniciando report=%s user=%s', report_id, report.user_id)
    report.status = 'processing'
    report.save(update_fields=['status'])
    publish_task_event(task_id, 'processing', 'Iniciando generación de reporte...', progress=5)
    try:
        # The organization already comes resolved in the payload (the view sets it before enqueuing)
        payload = report.filters or {}
        org_id = payload.get('organizacion_id')

        def _progress(pct, msg):
            # Forward builder progress as 'processing' events.
            publish_task_event(task_id, 'processing', msg, progress=pct)

        # ── 2. Build the file (xlsx / csv / zip depending on mode, format and volume) ──
        content, filename, content_type, total_rows = build_datastage_export(payload, _progress)
        # ── 3. Upload to storage ──────────────────────────────────────────────
        logger.info('[reporte_datastage] report=%s archivo=%s size=%.1fKB filas=%d',
                    report_id, filename, len(content) / 1024, total_rows)
        publish_task_event(task_id, 'processing', 'Subiendo a almacenamiento...', progress=93)
        final_name = f"datastage_{report.id}_{timezone.now().strftime('%Y%m%d%H%M%S')}_{filename}"
        ruta = storage_service.save_report(
            file=SimpleUploadedFile(
                name=final_name,
                content=content,
                content_type=content_type,
            ),
            organizacion_id=org_id,
            metadata={
                'report_id': str(report.id),
                'report_type': 'datastage',
                'user_id': str(report.user.id) if report.user else None,
            },
        )
        if ruta:
            logger.info('[reporte_datastage] report=%s guardado en storage=%s', report_id, ruta)
            report.file = ruta
            report.status = 'ready'
        else:
            # A falsy path from storage is treated as an upload failure.
            _fail('Error al guardar el archivo en almacenamiento (storage retornó None)')
            return
        report.finished_at = timezone.now()
        # NOTE(review): 'error_message' is saved here although this path never
        # modifies it - confirm whether it was meant to be cleared on success.
        report.save(update_fields=['status', 'file', 'finished_at', 'error_message'])
        resultado = {
            'report_id': str(report.id),
            'total_registros': total_rows,
            'archivo': final_name,
        }
        publish_task_event(task_id, 'completed', 'Reporte generado exitosamente.', progress=100, resultado=resultado)
        logger.info('[reporte_datastage] report=%s COMPLETADO filas=%d', report_id, total_rows)
        return resultado
    except SoftTimeLimitExceeded:
        # Raised when the 1800-second soft time limit set on the task is reached.
        _fail('El reporte tardó más de 30 minutos y fue cancelado. Intenta con filtros más acotados.')
    except ValueError as exc:
        # ValueError is recorded without a traceback.
        _fail(str(exc))
    except Exception as exc:
        # Anything else is recorded together with its traceback.
        _fail(str(exc), exc=exc)

View File

@@ -0,0 +1,348 @@
"""
Tests para el reporte DataStage asíncrono (Celery + SSE).
Ejecución:
python manage.py test api.reports.tests_datastage
"""
import csv
import io
from unittest.mock import MagicMock, patch
import openpyxl
from django.apps import apps
from django.contrib.auth import get_user_model
from django.db import connection
from django.test import TestCase
from rest_framework.test import APIClient
from api.licence.models import Licencia
from api.organization.models import Organizacion
from api.reports.models import ReportDocument
from api.reports.tasks.report_datastage import generate_report_datastage
# Active user model of the project, used by the fixture helpers below.
User = get_user_model()
# Path returned by the patched storage_service.save_report in the task tests.
FAKE_PATH = 'org_x/reports/datastage_test.xlsx'
# ── fixtures ──────────────────────────────────────────────────────────────────
def _ensure_registro_created_at():
    """Add the ``created_at`` column to registro501/registro502 if missing.

    Per the original author's note, datastage migrations 0013/0014 add
    ``created_at`` only at state level (SeparateDatabaseAndState) because the
    column already existed in the real database, so the test database needs
    it created explicitly before records can be inserted.
    """
    statements = (
        'ALTER TABLE registro501 ADD COLUMN IF NOT EXISTS created_at timestamptz',
        'ALTER TABLE registro502 ADD COLUMN IF NOT EXISTS created_at timestamptz',
    )
    with connection.cursor() as cursor:
        for sql in statements:
            cursor.execute(sql)
def _org(nombre='Org DataStage'):
    """Create and return an active, verified Organizacion with its own Licencia."""
    licencia = Licencia.objects.create(nombre=f'Lic {nombre}', almacenamiento=10)
    org_kwargs = {
        'nombre': nombre,
        'is_active': True,
        'is_verified': True,
        'licencia': licencia,
    }
    return Organizacion.objects.create(**org_kwargs)
def _user(org, username='ds_user', superuser=False):
    """Create a user tied to *org*; a superuser gets *org* as active organization."""
    if not superuser:
        return User.objects.create_user(username=username, password='pass', organizacion=org)

    admin = User.objects.create_superuser(
        username=username, password='pass', email=f'{username}@test.mx'
    )
    # A superuser JWT requires active_organization (OrgScopedPermission).
    admin.active_organization = org
    admin.save(update_fields=['active_organization'])
    return admin
def _registro501(org, pedimento='1000001', rfc='XAXX010101000', patente='3910'):
    """Insert and return a Registro501 row for *org* in customs section '160'."""
    model = apps.get_model('datastage', 'Registro501')
    attrs = {
        'organizacion': org,
        'patente': patente,
        'pedimento': pedimento,
        'seccion_aduanera': '160',
        'rfc': rfc,
    }
    return model.objects.create(**attrs)
def _registro502(org, pedimento='1000001', patente='3910', transportista='Transportes Test'):
    """Insert and return a Registro502 row for *org* in customs section '160'."""
    model = apps.get_model('datastage', 'Registro502')
    attrs = {
        'organizacion': org,
        'patente': patente,
        'pedimento': pedimento,
        'seccion_aduanera': '160',
        'nombre_transportista': transportista,
    }
    return model.objects.create(**attrs)
def _reporte(user, payload):
    """Create a pending 'datastage' ReportDocument whose filters are *payload*."""
    attrs = {
        'user': user,
        'filters': payload,
        'status': 'pending',
        'report_type': 'datastage',
    }
    return ReportDocument.objects.create(**attrs)
def _payload_simple(org, fmt='excel', model='Registro501', fields=None):
    """Build a simple-mode export payload scoped to *org*.

    *fields* defaults to ``['patente', 'pedimento', 'rfc']`` when falsy.
    """
    org_id = str(org.id)
    selected = fields or ['patente', 'pedimento', 'rfc']
    payload = {'modo': 'simple', 'format': fmt}
    payload['globalFilters'] = {'organizacion': org_id}
    payload['organizacion_id'] = org_id
    payload['model'] = model
    payload['fields'] = selected
    return payload
def _payload_multiple(org, fmt='excel'):
    """Build a multiple-mode export payload (Registro501 + Registro502) for *org*."""
    org_id = str(org.id)
    general = {
        'model': 'Registro501',
        'name': 'Datos generales',
        'fields': ['rfc', 'patente', 'pedimento'],
    }
    transport = {
        'model': 'Registro502',
        'name': 'Transporte',
        'fields': ['nombre_transportista'],
    }
    return {
        'modo': 'multiple',
        'format': fmt,
        'globalFilters': {'organizacion': org_id},
        'organizacion_id': org_id,
        'models': [general, transport],
    }
def _archivo_desde_mock(mock_save):
    """Return (name, bytes) of the file passed to storage_service.save_report."""
    call_kwargs = mock_save.call_args[1]
    uploaded = call_kwargs['file']
    name = uploaded.name
    content = uploaded.read()
    return name, content
# ── 1. Task Celery ────────────────────────────────────────────────────────────
# Se mockean Redis (publish_task_event) y MinIO (storage_service.save_report).
@patch('api.reports.tasks.report_datastage.publish_task_event')
@patch('api.reports.tasks.report_datastage.storage_service.save_report',
return_value=FAKE_PATH)
class TestGenerateReportDatastage(TestCase):
@classmethod
def setUpTestData(cls):
_ensure_registro_created_at()
cls.org = _org()
cls.user = _user(cls.org)
def _run(self, report):
generate_report_datastage.apply(args=[str(report.id)])
report.refresh_from_db()
# ── 1.1 Simple / Excel ────────────────────────────────────────────────────
def test_simple_excel_status_ready_y_archivo_xlsx(self, mock_save, mock_pub):
_registro501(self.org)
report = _reporte(self.user, _payload_simple(self.org, fmt='excel'))
self._run(report)
self.assertEqual(report.status, 'ready')
self.assertEqual(report.file, FAKE_PATH)
self.assertIsNotNone(report.finished_at)
nombre, contenido = _archivo_desde_mock(mock_save)
self.assertTrue(nombre.endswith('.xlsx'), f'Esperado .xlsx, recibido: {nombre}')
wb = openpyxl.load_workbook(io.BytesIO(contenido))
valores = [str(c.value) for row in wb.active.iter_rows() for c in row if c.value]
self.assertIn('XAXX010101000', valores)
# ── 1.2 Simple / CSV ──────────────────────────────────────────────────────
def test_simple_csv_status_ready_y_archivo_csv(self, mock_save, mock_pub):
_registro501(self.org)
report = _reporte(self.user, _payload_simple(self.org, fmt='csv'))
self._run(report)
self.assertEqual(report.status, 'ready')
nombre, contenido = _archivo_desde_mock(mock_save)
self.assertTrue(nombre.endswith('.csv'), f'Esperado .csv, recibido: {nombre}')
rows = list(csv.reader(io.StringIO(contenido.decode('utf-8'))))
self.assertEqual(rows[0], ['patente', 'pedimento', 'rfc'])
self.assertIn('XAXX010101000', rows[1])
# ── 1.3 Aislamiento por organización ──────────────────────────────────────
def test_simple_no_incluye_datos_de_otra_organizacion(self, mock_save, mock_pub):
_registro501(self.org, rfc='XAXX010101000')
otra_org = _org('Otra Org')
_registro501(otra_org, pedimento='9999999', rfc='XEXX010101000')
report = _reporte(self.user, _payload_simple(self.org, fmt='csv'))
self._run(report)
_, contenido = _archivo_desde_mock(mock_save)
texto = contenido.decode('utf-8')
self.assertIn('XAXX010101000', texto)
self.assertNotIn('XEXX010101000', texto)
# ── 1.4 Múltiple / Excel ──────────────────────────────────────────────────
def test_multiple_excel_combina_modelos_por_llave(self, mock_save, mock_pub):
_registro501(self.org)
_registro502(self.org)
report = _reporte(self.user, _payload_multiple(self.org, fmt='excel'))
self._run(report)
self.assertEqual(report.status, 'ready')
nombre, contenido = _archivo_desde_mock(mock_save)
self.assertTrue(nombre.endswith('datastage_reporte.xlsx'), f'Nombre inesperado: {nombre}')
wb = openpyxl.load_workbook(io.BytesIO(contenido))
valores = [str(c.value) for row in wb.active.iter_rows() for c in row if c.value]
# Campos de ambos modelos en la misma hoja (prefijados por modelo)
self.assertIn('Registro501_rfc', valores)
self.assertIn('Registro502_nombre_transportista', valores)
self.assertIn('XAXX010101000', valores)
self.assertIn('Transportes Test', valores)
# ── 1.5 Múltiple / CSV ────────────────────────────────────────────────────
def test_multiple_csv_combina_modelos(self, mock_save, mock_pub):
_registro501(self.org)
_registro502(self.org)
report = _reporte(self.user, _payload_multiple(self.org, fmt='csv'))
self._run(report)
self.assertEqual(report.status, 'ready')
nombre, contenido = _archivo_desde_mock(mock_save)
self.assertTrue(nombre.endswith('datastage_reporte.csv'), f'Nombre inesperado: {nombre}')
texto = contenido.decode('utf-8')
self.assertIn('Registro501_rfc', texto)
self.assertIn('Transportes Test', texto)
# ── 1.6 Sin datos ─────────────────────────────────────────────────────────
def test_multiple_sin_datos_genera_archivo_sin_datos_y_ready(self, mock_save, mock_pub):
report = _reporte(self.user, _payload_multiple(self.org, fmt='excel'))
self._run(report)
self.assertEqual(report.status, 'ready')
nombre, _ = _archivo_desde_mock(mock_save)
self.assertIn('sin_datos', nombre)
# ── 1.7 Payload inválido ──────────────────────────────────────────────────
def test_modelo_inexistente_marca_error_y_publica_failed(self, mock_save, mock_pub):
    """An unknown model name leaves the report in 'error' and publishes 'failed', never 'completed'."""
    payload = _payload_simple(self.org, model='NoExiste')
    report = _reporte(self.user, payload)
    self._run(report)

    self.assertEqual(report.status, 'error')
    self.assertIn('NoExiste', report.error_message)

    # The status is the second positional argument of every publish call.
    statuses = []
    for llamada in mock_pub.call_args_list:
        statuses.append(llamada.args[1])
    self.assertIn('failed', statuses)
    self.assertNotIn('completed', statuses)
def test_payload_sin_model_marca_error(self, mock_save, mock_pub):
    """A simple payload without the 'model' key ends in 'error' and nothing is saved."""
    payload = _payload_simple(self.org)
    payload.pop('model')
    report = _reporte(self.user, payload)

    self._run(report)

    self.assertEqual(report.status, 'error')
    mock_save.assert_not_called()
# ── 1.8 Eventos de progreso ───────────────────────────────────────────────
def test_ultimo_evento_es_completed_con_100_y_resultado(self, mock_save, mock_pub):
    """The last published event is 'completed' with progress 100 and the result payload."""
    _registro501(self.org)
    payload = _payload_simple(self.org)
    report = _reporte(self.user, payload)
    self._run(report)

    ultima_llamada = mock_pub.call_args_list[-1]
    posicionales = ultima_llamada.args
    nombrados = ultima_llamada.kwargs

    self.assertEqual(posicionales[1], 'completed')
    self.assertEqual(nombrados.get('progress'), 100)

    resultado = nombrados.get('resultado')
    self.assertEqual(resultado['report_id'], str(report.id))
    self.assertEqual(resultado['total_registros'], 1)
def test_se_publican_eventos_de_progreso(self, mock_save, mock_pub):
    """Running a simple export publishes at least four events."""
    _registro501(self.org)
    payload = _payload_simple(self.org)
    report = _reporte(self.user, payload)
    self._run(report)

    total_eventos = mock_pub.call_count
    self.assertGreaterEqual(total_eventos, 4, 'Se esperan mínimo 4 eventos')
# ── 1.9 Storage falla ─────────────────────────────────────────────────────
def test_storage_none_deja_status_error_y_failed(self, mock_save, mock_pub):
    """When the storage mock returns None the report ends in 'error' and 'failed' is published."""
    mock_save.return_value = None
    _registro501(self.org)
    payload = _payload_simple(self.org)
    report = _reporte(self.user, payload)
    self._run(report)

    self.assertEqual(report.status, 'error')
    self.assertIn('almacenamiento', report.error_message)

    # The status is the second positional argument of every publish call.
    statuses = []
    for llamada in mock_pub.call_args_list:
        statuses.append(llamada.args[1])
    self.assertIn('failed', statuses)
# ── 2. Vista (encolado 202) ───────────────────────────────────────────────────
class TestExportDataStageView202(TestCase):
    """POST to the DataStage export endpoint: enqueue (202) and validation errors.

    ``generate_report_datastage.delay`` is patched for every request, so no
    task runs; the tests check the HTTP response, the persisted
    ``ReportDocument`` and whether the task was enqueued.
    """

    @classmethod
    def setUpTestData(cls):
        cls.org = _org('Org Vista')
        cls.user = _user(cls.org, username='vista_admin', superuser=True)

    def setUp(self):
        self.client = APIClient()
        self.client.force_authenticate(user=self.user)

    def _post(self, body):
        """POST ``body`` as JSON with the Celery ``delay`` patched; return (response, delay mock)."""
        with patch('api.reports.views.generate_report_datastage.delay',
                   return_value=MagicMock(id='fake-task-id')) as mock_delay:
            res = self.client.post('/api/v1/reports/exportmodel/datastage/', body, format='json')
        return res, mock_delay

    def _assert_nada_persistido(self):
        """Assert that no DataStage ReportDocument exists (rejected requests must not persist)."""
        self.assertFalse(ReportDocument.objects.filter(report_type='datastage').exists())

    def test_post_simple_responde_202_con_task_y_report(self):
        body = {
            'modo': 'simple', 'format': 'excel',
            'globalFilters': {'organizacion': str(self.org.id)},
            'model': 'Registro501', 'fields': ['patente', 'pedimento'],
        }
        res, mock_delay = self._post(body)

        self.assertEqual(res.status_code, 202)
        self.assertEqual(res.data['task_id'], 'fake-task-id')
        self.assertEqual(res.data['status'], 'pending')

        report = ReportDocument.objects.get(id=res.data['report_id'])
        self.assertEqual(report.report_type, 'datastage')
        self.assertEqual(report.user_id, self.user.id)
        self.assertEqual(report.filters['organizacion_id'], str(self.org.id))
        mock_delay.assert_called_once_with(report.id)

    def test_post_multiple_persiste_models_en_filters(self):
        body = {
            'modo': 'multiple', 'format': 'csv',
            'globalFilters': {'organizacion': str(self.org.id)},
            'models': [{'model': 'Registro501', 'fields': ['rfc']}],
        }
        res, _ = self._post(body)

        self.assertEqual(res.status_code, 202)
        report = ReportDocument.objects.get(id=res.data['report_id'])
        self.assertEqual(report.filters['modo'], 'multiple')
        self.assertEqual(report.filters['models'][0]['model'], 'Registro501')

    def test_post_simple_sin_fields_responde_400(self):
        body = {
            'modo': 'simple', 'format': 'excel',
            'globalFilters': {'organizacion': str(self.org.id)},
            'model': 'Registro501',
        }
        res, mock_delay = self._post(body)

        self.assertEqual(res.status_code, 400)
        mock_delay.assert_not_called()
        self._assert_nada_persistido()

    def test_post_multiple_sin_models_responde_400(self):
        body = {
            'modo': 'multiple', 'format': 'excel',
            'globalFilters': {'organizacion': str(self.org.id)},
        }
        res, mock_delay = self._post(body)

        self.assertEqual(res.status_code, 400)
        mock_delay.assert_not_called()
        # Same guarantee as the simple-mode 400: a rejected payload leaves no report behind.
        self._assert_nada_persistido()

    def test_post_modelo_inexistente_responde_404(self):
        body = {
            'modo': 'simple', 'format': 'excel',
            'globalFilters': {'organizacion': str(self.org.id)},
            'model': 'NoExiste', 'fields': ['x'],
        }
        res, mock_delay = self._post(body)

        self.assertEqual(res.status_code, 404)
        mock_delay.assert_not_called()
        # The model lookup fails before the ReportDocument is created.
        self._assert_nada_persistido()

View File

@@ -25,7 +25,10 @@ from core.permissions import (
require_permission,
user_has_permission,
)
from .models import ReportDocument
from .serializers import ExportModelSerializer
from .services import datastage_export
from .tasks.report_datastage import generate_report_datastage
def export_model_to_csv(request, model_name, fields, module='datastage', filters=None):
model = apps.get_model(module, model_name)
@@ -90,28 +93,13 @@ class ExportDataStageView(APIView):
return [IsAuthenticated(), require_permission('reportes.view')()]
return [IsAuthenticated(), require_permission('reportes.export')()]
# Constantes para partición
# MAX_RECORDS_PER_FILE = 100 # Límite seguro por archivo
MAX_RECORDS_PER_FILE = 120000 # Límite seguro por archivo
# La lógica de exportación vive en services/datastage_export.py (la usa la
# task Celery generate_report_datastage); estos delegados conservan la
# interfaz para los métodos legacy de esta clase.
MAX_RECORDS_PER_FILE = datastage_export.MAX_RECORDS_PER_FILE
def safe_excel_value(self, value):
    """
    Convert any value to an Excel-safe format.

    Thin delegate kept so the legacy methods of this class keep calling
    ``self.safe_excel_value``; the conversion itself is performed by
    ``datastage_export.safe_excel_value``.
    """
    # The former inline if/elif chain returned on every branch, which left this
    # delegation unreachable and duplicated the service's logic.
    return datastage_export.safe_excel_value(value)
def get(self, request, *args, **kwargs):
"""Retorna RFCs distintos de Registro501 para la organización activa del usuario."""
@@ -134,19 +122,67 @@ class ExportDataStageView(APIView):
except LookupError:
return Response({'rfcs': []})
@swagger_auto_schema(request_body=ExportModelSerializer, responses={202: 'Reporte encolado (Celery)'})
def post(self, request, *args, **kwargs):
    """
    Enqueue the asynchronous generation of a DataStage report (Celery + SSE).

    Responds 202 with report_id and task_id; progress is followed over SSE
    (/stream/tasks/{task_id}) and the file is downloaded afterwards via
    /reports/report-document-download/{report_id}/.

    Validation failures are answered before anything is persisted or enqueued:
    400 when required keys are missing, 404 when the model (simple mode) does
    not exist, or whatever error response ``_resolve_org_filter`` returns.
    """
    modo = request.data.get('modo', 'simple')
    export_format = request.data.get('format', 'csv')
    # ``or {}`` also covers an explicit JSON null, not only a missing key.
    global_filters = request.data.get('globalFilters') or {}

    # Validate the payload before enqueueing (same errors as the synchronous flow).
    if modo == 'multiple':
        models_data = request.data.get('models', [])
        if not models_data:
            return Response({'error': 'models are required for multiple export'}, status=status.HTTP_400_BAD_REQUEST)
    else:
        model_name = request.data.get('model')
        fields = request.data.get('fields')
        if not model_name or not fields:
            return Response({'error': 'model and fields are required'}, status=status.HTTP_400_BAD_REQUEST)
        try:
            apps.get_model('datastage', model_name)
        except LookupError:
            return Response({'error': f'Model {model_name} not found'}, status=status.HTTP_404_NOT_FOUND)

    global_filters, err = self._resolve_org_filter(global_filters, request.user)
    if err:
        return err

    # The resolved organization travels in the payload: the task has no request.user.
    payload = {
        'modo': modo,
        'format': export_format,
        'globalFilters': global_filters,
        'organizacion_id': global_filters.get('organizacion'),
    }
    if modo == 'multiple':
        payload['models'] = models_data
    else:
        payload['model'] = model_name
        payload['fields'] = fields

    report = ReportDocument.objects.create(
        user=request.user,
        filters=payload,
        status='pending',
        report_type='datastage',
    )

    task = generate_report_datastage.delay(report.id)

    return Response({
        'report_id': report.id,
        'task_id': task.id,
        'status': report.status,
        'created_at': report.created_at,
        'download_url': None,
    }, status=status.HTTP_202_ACCEPTED)
def _resolve_org_filter(self, global_filters, user):
"""
Devuelve los global_filters asegurando que siempre haya una organización.
@@ -164,63 +200,6 @@ class ExportDataStageView(APIView):
filters['organizacion'] = str(org.id)
return filters, None
def handle_simple_export(self, request):
    """Handle a simple DataStage export (a single model).

    Answers 400 when 'model' or 'fields' is missing and 404 when the model
    does not exist; otherwise returns the response of the matching exporter,
    using the partitioned one when the row count exceeds MAX_RECORDS_PER_FILE.
    """
    data = request.data
    model_name = data.get('model')
    fields = data.get('fields')
    global_filters = data.get('globalFilters', {})
    export_type = data.get('format', 'csv')
    module = 'datastage'

    if not (model_name and fields):
        return Response({'error': 'model and fields are required'}, status=status.HTTP_400_BAD_REQUEST)

    global_filters, err = self._resolve_org_filter(global_filters, request.user)
    if err:
        return err

    try:
        model = apps.get_model(module, model_name)
        filters = self.apply_global_filters_to_model(global_filters, model, request.user)
        total_records = model.objects.filter(**filters).values(*fields).count()
        needs_partition = total_records > self.MAX_RECORDS_PER_FILE

        if export_type == 'excel':
            if needs_partition:
                return self.export_single_model_partitioned(request, model_name, fields, filters, total_records)
            return export_model_to_excel(request, model_name, fields, module, filters)

        if needs_partition:
            return self.export_single_model_csv_partitioned(request, model_name, fields, filters, total_records)
        return export_model_to_csv(request, model_name, fields, module, filters)
    except LookupError:
        return Response({'error': f'Model {model_name} not found'}, status=status.HTTP_404_NOT_FOUND)
def handle_multiple_export(self, request):
    """Handle a multiple DataStage export (several models).

    Answers 400 when 'models' is empty; otherwise resolves the organization
    filter, builds the related keys and dispatches to the Excel or CSV
    combined exporter according to 'format'.
    """
    data = request.data
    models_data = data.get('models', [])
    export_type = data.get('format', 'csv')
    global_filters = data.get('globalFilters', {})

    if not models_data:
        return Response({'error': 'models are required for multiple export'}, status=status.HTTP_400_BAD_REQUEST)

    global_filters, err = self._resolve_org_filter(global_filters, request.user)
    if err:
        return err

    related_keys = self.get_related_keys_from_filters(global_filters, models_data, request.user)

    if export_type != 'excel':
        return self.export_datastage_multiple_to_csv_combined(request, models_data, global_filters, related_keys)
    return self.export_datastage_multiple_partitioned_excel_agrupados(request, models_data, global_filters, related_keys)
def estimate_total_records(self, models_data, global_filters, related_keys, user):
"""Estima el total de registros para todos los modelos"""
total = 0
@@ -297,235 +276,6 @@ class ExportDataStageView(APIView):
response['Content-Disposition'] = 'attachment; filename="datastage_related_report.xlsx"'
return response
def export_datastage_multiple_partitioned_excel_agrupados(self, request, models_data, global_filters, related_keys):
"""Export multiple DataStage models grouped into the same Excel sheet, partitioned by a record limit"""
# Overview: rows of every requested model are grouped by their relation-field values,
# merged into combined rows, and returned as one .xlsx, or as a ZIP of .xlsx parts when
# the rows exceed self.MAX_RECORDS_PER_FILE. With no data, a one-cell "Sin datos" workbook is returned.
try:
from api.organization.models import Organizacion
# Preload id → name for every organization so organizacion_id values can be shown as names.
org_mapping = {str(org.id): org.nombre for org in Organizacion.objects.all()}
# 1. Collect all the data OUTSIDE the ZIP context
all_models_data = {}
# NOTE(review): model_field_mappings is filled below but not read again in this method.
model_field_mappings = {}
for model_data in models_data:
model_name = model_data.get('model')
fields = model_data.get('fields', [])
if not model_name or not fields:
continue
# Normalize the requested fields: strip strings, map 'organizacion' to 'organizacion_id', drop duplicates.
normalized_fields = []
for f in fields:
try:
key = f.strip() if isinstance(f, str) else f
except Exception:
key = f
if isinstance(key, str) and key.lower() == 'organizacion':
if 'organizacion_id' not in normalized_fields:
normalized_fields.append('organizacion_id')
else:
if key not in normalized_fields:
normalized_fields.append(key)
fields = normalized_fields
# The three relation columns are always fetched, even when not requested.
required_fields = ['seccion_aduanera', 'patente', 'pedimento']
for field in required_fields:
if field not in fields:
fields.append(field)
# NOTE(review): this apps.get_model call precedes the `try` whose `except LookupError` skips
# unknown models, so an unknown model_name here appears to reach the outer `except Exception` — confirm.
if 'organizacion_id' not in fields and 'organizacion_id' in [f.name for f in apps.get_model('datastage', model_name)._meta.get_fields()]:
fields.append('organizacion_id')
try:
model = apps.get_model('datastage', model_name)
filters = self.apply_related_filters(global_filters, model, related_keys, request.user)
# An empty filter dict yields an empty queryset instead of the whole table.
if filters:
queryset = model.objects.filter(**filters).values(*fields)
else:
queryset = model.objects.none()
if queryset.count() == 0:
continue
relation_fields = [fn for fn in ['seccion_aduanera', 'patente', 'pedimento'] if fn in fields]
if not relation_fields:
relation_fields = ['datastage_id'] if 'datastage_id' in fields else [fields[0]]
if model_name not in model_field_mappings:
model_field_mappings[model_name] = fields
for record in queryset:
# Group key: the non-null relation-field values joined with '_';
# when there are none, the first 10 hex chars of an md5 of the record.
key_parts = [str(record[rf]) for rf in relation_fields if rf in record and record[rf] is not None]
if not key_parts:
import hashlib
key = hashlib.md5(str(sorted(record.items())).encode()).hexdigest()[:10]
else:
key = "_".join(key_parts)
processed_record = {}
for field_name, value in record.items():
# organizacion_id values are replaced by the organization name (cached in org_mapping).
if field_name == 'organizacion_id' and value:
org_id_str = str(value)
if org_id_str in org_mapping:
processed_value = org_mapping[org_id_str]
else:
try:
org = Organizacion.objects.filter(id=value).first()
processed_value = org.nombre if org else org_id_str
org_mapping[org_id_str] = processed_value
except Exception:
processed_value = org_id_str
else:
processed_value = value
# Relation fields keep their bare name; every other column is prefixed with the model name.
if field_name in relation_fields:
prefixed_field_name = field_name
else:
prefixed_field_name = f"{model_name}_{field_name}"
if field_name == 'organizacion_id':
prefixed_field_name = prefixed_field_name.replace('organizacion_id', 'organizacion_nombre')
processed_record[prefixed_field_name] = self.safe_excel_value(processed_value)
if key not in all_models_data:
all_models_data[key] = {'relation_fields': {}, 'model_records': {}}
for rel_field in relation_fields:
if rel_field in record:
all_models_data[key]['relation_fields'][rel_field] = record[rel_field]
if model_name not in all_models_data[key]['model_records']:
all_models_data[key]['model_records'][model_name] = []
all_models_data[key]['model_records'][model_name].append(processed_record)
except LookupError:
continue
# 2. No data → empty Excel (not a JSON 404, which breaks the download in the frontend)
if not all_models_data:
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Sin datos"
ws.append(["No se encontraron datos para los filtros especificados"])
output = io.BytesIO()
wb.save(output)
output.seek(0)
resp = HttpResponse(
output.read(),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
resp['Content-Disposition'] = 'attachment; filename="datastage_sin_datos.xlsx"'
return resp
# 3. Build combined rows — repeat the last record instead of leaving blanks
combined_rows = []
for key, data in all_models_data.items():
relation_fields_data = data['relation_fields']
model_records = data['model_records']
# One output row per index up to the largest record count among the models of this key.
max_records_per_key = max((len(recs) for recs in model_records.values()), default=1)
for i in range(max_records_per_key):
row_data = {}
for rel_field, rel_value in relation_fields_data.items():
row_data[rel_field] = self.safe_excel_value(rel_value)
for model_name, records in model_records.items():
# Use position i or the last available record
record = records[i] if i < len(records) else records[-1]
for field_name, value in record.items():
row_data[field_name] = value
combined_rows.append(row_data)
# 4. Ordered headers
# Order: relation fields, then columns containing 'organizacion' (sorted), then the rest (sorted).
all_fields_set = set()
for row in combined_rows:
all_fields_set.update(row.keys())
all_fields = []
for rel_field in ['seccion_aduanera', 'patente', 'pedimento']:
if rel_field in all_fields_set:
all_fields.append(rel_field)
all_fields_set.discard(rel_field)
org_fields = sorted(f for f in all_fields_set if 'organizacion' in f.lower())
for org_field in org_fields:
all_fields.append(org_field)
all_fields_set.discard(org_field)
all_fields.extend(sorted(all_fields_set))
# 5. Title and generation-date rows
now_str = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
title_row = ["Reporte Datastage"]
date_row = [f"Generado: {now_str}"]
# Writes title, date, a blank row, the header and the data rows, then sets each
# column width to its longest cell text plus 2, capped at 50.
def _write_sheet(ws, sheet_name, page_rows):
# Sheet titles are truncated to 31 characters.
ws.title = sheet_name[:31]
ws.append(title_row)
ws.append(date_row)
ws.append([])
ws.append(all_fields)
for row_data in page_rows:
ws.append([row_data.get(field, '') for field in all_fields])
for column in ws.columns:
max_length = 0
col_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except Exception:
pass
ws.column_dimensions[col_letter].width = min(max_length + 2, 50)
# 6. Plain Excel if it fits in one file; ZIP only when partitioning is needed
from django.core.paginator import Paginator
paginator = Paginator(combined_rows, self.MAX_RECORDS_PER_FILE)
if paginator.num_pages == 1:
wb = openpyxl.Workbook()
_write_sheet(wb.active, "Datastage", paginator.page(1).object_list)
output = io.BytesIO()
wb.save(output)
output.seek(0)
resp = HttpResponse(
output.read(),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
resp['Content-Disposition'] = 'attachment; filename="datastage_reporte.xlsx"'
return resp
# More than one page: one workbook per page, all packed into a single ZIP.
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for page_num in paginator.page_range:
page = paginator.page(page_num)
current_wb = openpyxl.Workbook()
_write_sheet(current_wb.active, f"Datastage_p{page_num}", page.object_list)
part_buffer = io.BytesIO()
current_wb.save(part_buffer)
part_buffer.seek(0)
zip_file.writestr(f"datastage_part{page_num}.xlsx", part_buffer.getvalue())
zip_buffer.seek(0)
resp = HttpResponse(zip_buffer.read(), content_type='application/zip')
resp['Content-Disposition'] = 'attachment; filename="datastage_combinado.zip"'
return resp
# Any failure is logged with its traceback and answered as HTTP 500 with the error text.
except Exception as e:
import traceback
import logging
logging.getLogger(__name__).error("Error en exportación combinada: %s", traceback.format_exc())
return Response({'error': f'Error en exportación combinada: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def export_datastage_multiple_partitioned_excel_test_3(self, request, models_data, global_filters, related_keys):
"""Exporta múltiples modelos de DataStage agrupados en la misma hoja de Excel, con particionado por límite de registros"""
try:
@@ -1215,144 +965,6 @@ class ExportDataStageView(APIView):
except Exception as e:
return Response({'error': f'Error en exportación particionada: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def export_datastage_multiple_to_csv_combined(self, request, models_data, global_filters, related_keys):
"""Export multiple models combined into a single flat CSV (same grouping logic as the Excel export)."""
# Overview: rows of every requested model are grouped by their relation-field values,
# merged into combined rows and written to one CSV response; with no data, a CSV that
# only contains a message row is returned.
import hashlib
import logging
import traceback
logger = logging.getLogger(__name__)
try:
from api.organization.models import Organizacion
# Preload id → name for every organization so organizacion_id values can be shown as names.
org_mapping = {str(org.id): org.nombre for org in Organizacion.objects.all()}
all_models_data = {}
# NOTE(review): model_field_mappings is filled below but not read again in this method.
model_field_mappings = {}
for model_data in models_data:
model_name = model_data.get('model')
fields = model_data.get('fields', [])
if not model_name or not fields:
continue
# Normalize the requested fields: strip strings, map 'organizacion' to 'organizacion_id', drop duplicates.
normalized_fields = []
for f in fields:
key = f.strip() if isinstance(f, str) else f
if isinstance(key, str) and key.lower() == 'organizacion':
if 'organizacion_id' not in normalized_fields:
normalized_fields.append('organizacion_id')
else:
if key not in normalized_fields:
normalized_fields.append(key)
fields = normalized_fields
# The three relation columns are always fetched, even when not requested.
for req_field in ['seccion_aduanera', 'patente', 'pedimento']:
if req_field not in fields:
fields.append(req_field)
try:
model = apps.get_model('datastage', model_name)
model_field_names = [f.name for f in model._meta.get_fields() if hasattr(f, 'name')]
if 'organizacion_id' not in fields and 'organizacion_id' in model_field_names:
fields.append('organizacion_id')
filters = self.apply_related_filters(global_filters, model, related_keys, request.user)
# An empty filter dict yields an empty queryset instead of the whole table.
queryset = model.objects.filter(**filters).values(*fields) if filters else model.objects.none()
if queryset.count() == 0:
continue
relation_fields = [fn for fn in ['seccion_aduanera', 'patente', 'pedimento'] if fn in fields]
if not relation_fields:
relation_fields = ['datastage_id'] if 'datastage_id' in fields else [fields[0]]
if model_name not in model_field_mappings:
model_field_mappings[model_name] = fields
for record in queryset:
# Group key: the non-null relation-field values joined with '_';
# when there are none, the first 10 hex chars of an md5 of the record.
key_parts = [str(record[rf]) for rf in relation_fields if rf in record and record[rf] is not None]
key = "_".join(key_parts) if key_parts else hashlib.md5(str(sorted(record.items())).encode()).hexdigest()[:10]
processed_record = {}
for field_name, value in record.items():
# organizacion_id values are replaced by the preloaded name, or kept as the id string when unknown.
if field_name == 'organizacion_id' and value:
org_id_str = str(value)
processed_value = org_mapping.get(org_id_str, org_id_str)
else:
processed_value = value
# Relation fields keep their bare name; every other column is prefixed with the model name.
if field_name in relation_fields:
prefixed = field_name
else:
prefixed = f"{model_name}_{field_name}"
if field_name == 'organizacion_id':
prefixed = prefixed.replace('organizacion_id', 'organizacion_nombre')
processed_record[prefixed] = self.safe_excel_value(processed_value)
if key not in all_models_data:
all_models_data[key] = {'relation_fields': {}, 'model_records': {}}
for rel_field in relation_fields:
if rel_field in record:
all_models_data[key]['relation_fields'][rel_field] = record[rel_field]
if model_name not in all_models_data[key]['model_records']:
all_models_data[key]['model_records'][model_name] = []
all_models_data[key]['model_records'][model_name].append(processed_record)
# Unknown model names are skipped.
except LookupError:
continue
# No data → CSV with a message, not an HTTP error
if not all_models_data:
buf = io.StringIO()
csv.writer(buf).writerow(['No se encontraron datos para los filtros especificados'])
resp = HttpResponse(buf.getvalue(), content_type='text/csv; charset=utf-8')
resp['Content-Disposition'] = 'attachment; filename="datastage_sin_datos.csv"'
return resp
# Build flat rows
combined_rows = []
for key, data in all_models_data.items():
relation_fields_data = data['relation_fields']
model_records = data['model_records']
# One output row per index up to the largest record count among the models of this key.
max_records = max((len(recs) for recs in model_records.values()), default=1)
for i in range(max_records):
row_data = {}
for rel_field, rel_value in relation_fields_data.items():
row_data[rel_field] = self.safe_excel_value(rel_value)
for mn, records in model_records.items():
# A model with fewer records than the longest one repeats its last record.
record = records[i] if i < len(records) else records[-1]
for field_name, value in record.items():
row_data[field_name] = value
combined_rows.append(row_data)
# Headers: relation fields first, then org, then the rest
all_fields_set = set()
for row in combined_rows:
all_fields_set.update(row.keys())
all_fields = []
for rel_field in ['seccion_aduanera', 'patente', 'pedimento']:
if rel_field in all_fields_set:
all_fields.append(rel_field)
all_fields_set.discard(rel_field)
org_fields = sorted(f for f in all_fields_set if 'organizacion' in f.lower())
for org_field in org_fields:
all_fields.append(org_field)
all_fields_set.discard(org_field)
all_fields.extend(sorted(all_fields_set))
# Header row followed by one line per combined row; missing columns are written as ''.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(all_fields)
for row_data in combined_rows:
writer.writerow([row_data.get(field, '') for field in all_fields])
resp = HttpResponse(buf.getvalue(), content_type='text/csv; charset=utf-8')
resp['Content-Disposition'] = 'attachment; filename="datastage_reporte.csv"'
return resp
# Any failure is logged with its traceback and answered as HTTP 500 with the error text.
except Exception as e:
logger.error("Error en exportación CSV combinada: %s", traceback.format_exc())
return Response({'error': f'Error en exportación CSV combinada: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def export_datastage_multiple_to_csv(self, request, models_data, global_filters, related_keys):
"""Exporta múltiples modelos de DataStage a múltiples archivos CSV en ZIP"""
zip_buffer = io.BytesIO()
@@ -1472,254 +1084,14 @@ class ExportDataStageView(APIView):
except Exception as e:
return Response({'error': f'Error en exportación CSV particionada: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def export_single_model_partitioned(self, request, model_name, fields, filters, total_records):
    """Export a single model as a ZIP of Excel parts.

    Each part holds at most ``self.MAX_RECORDS_PER_FILE`` rows and is stored
    as ``{model_name}_part{n}.xlsx``. ``request`` and ``total_records`` are
    not used here; they are kept for interface compatibility.

    Returns an ``HttpResponse`` with the ZIP, or a 500 ``Response`` carrying
    the error text if anything fails.
    """
    try:
        from django.core.paginator import Paginator

        model = apps.get_model('datastage', model_name)
        # Pagination slices the queryset once per page; without an explicit order
        # the database may return rows in a different order for each slice, so
        # parts could repeat or miss rows. Ordering by pk makes the pages stable.
        queryset = model.objects.filter(**filters).values(*fields).order_by('pk')
        paginator = Paginator(queryset, self.MAX_RECORDS_PER_FILE)

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for page_num in paginator.page_range:
                page = paginator.page(page_num)

                # Build the workbook for this part (sheet titles are cut to 31 characters).
                wb = openpyxl.Workbook()
                ws = wb.active
                ws.title = f"Parte_{page_num}"[:31]
                ws.append(fields)
                for row in page.object_list:
                    ws.append([self.safe_excel_value(row[field]) for field in fields])

                part_buffer = io.BytesIO()
                wb.save(part_buffer)
                zip_file.writestr(f"{model_name}_part{page_num}.xlsx", part_buffer.getvalue())

        zip_content = zip_buffer.getvalue()
        response = HttpResponse(zip_content, content_type='application/zip')
        response['Content-Disposition'] = f'attachment; filename="{model_name}_particionado.zip"'
        response['Content-Length'] = len(zip_content)
        return response
    except Exception as e:
        return Response({'error': f'Error exportando modelo: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def export_single_model_csv_partitioned(self, request, model_name, fields, filters, total_records):
    """Export a single model as a ZIP of CSV parts.

    Each part holds at most ``self.MAX_RECORDS_PER_FILE`` rows and is stored
    as ``{model_name}_part{n}.csv``, with ``fields`` as its header row.
    ``request`` and ``total_records`` are not used here; they are kept for
    interface compatibility.

    Returns an ``HttpResponse`` with the ZIP, or a 500 ``Response`` carrying
    the error text if anything fails.
    """
    try:
        from django.core.paginator import Paginator

        model = apps.get_model('datastage', model_name)
        # Pagination slices the queryset once per page; without an explicit order
        # the database may return rows in a different order for each slice, so
        # parts could repeat or miss rows. Ordering by pk makes the pages stable.
        queryset = model.objects.filter(**filters).values(*fields).order_by('pk')
        paginator = Paginator(queryset, self.MAX_RECORDS_PER_FILE)

        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for page_num in paginator.page_range:
                page = paginator.page(page_num)

                csv_buffer = io.StringIO()
                writer = csv.writer(csv_buffer)
                writer.writerow(fields)
                for row in page.object_list:
                    writer.writerow([self.safe_excel_value(row[field]) for field in fields])

                # Add this part to the ZIP.
                zip_file.writestr(f"{model_name}_part{page_num}.csv", csv_buffer.getvalue())

        zip_content = zip_buffer.getvalue()
        response = HttpResponse(zip_content, content_type='application/zip')
        response['Content-Disposition'] = f'attachment; filename="{model_name}_particionado.zip"'
        response['Content-Length'] = len(zip_content)
        return response
    except Exception as e:
        return Response({'error': f'Error exportando modelo CSV: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_related_keys_from_filters(self, global_filters, models_data, user):
    """
    Build the related keys used to cross-reference the requested models.

    Thin delegate: the work is done by
    ``datastage_export.get_related_keys_from_filters``. ``user`` is not
    forwarded; it is kept only so existing callers keep working.
    """
    # The delegating return used to be stranded inside the legacy docstring, so the
    # duplicated legacy body was what actually ran.
    return datastage_export.get_related_keys_from_filters(global_filters, models_data)
def apply_global_filters_to_model(self, global_filters, model, user):
    """
    Translate the global filters into ORM filter kwargs for ``model``.

    Thin delegate: the work is done by
    ``datastage_export.apply_global_filters_to_model``. ``user`` is not
    forwarded; it is kept only so existing callers keep working.
    """
    # The legacy inline body ended in `return filters`, which made this delegation
    # unreachable and kept a second copy of the filter rules in the view.
    return datastage_export.apply_global_filters_to_model(global_filters, model)
def apply_related_filters(self, global_filters, model, related_keys, user):
    """
    Build the ORM filter kwargs for ``model`` from the global filters and the related keys.

    Thin delegate: the work is done by
    ``datastage_export.apply_related_filters``. ``user`` is not forwarded; it
    is kept only so existing callers keep working.
    """
    # The legacy inline body ended in `return filters`, which made this delegation
    # unreachable and kept a second copy of the filter rules in the view.
    return datastage_export.apply_related_filters(global_filters, model, related_keys)
def estimate_excel_file_size(self, num_records, num_columns):
"""Estima tamaño aproximado del archivo Excel"""