import io import logging import os import tempfile import traceback from collections import defaultdict import openpyxl from openpyxl.styles import Alignment, Font, PatternFill from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded from django.core.files.uploadedfile import SimpleUploadedFile from django.db.models import Q from django.utils import timezone from api.customs.models import Cove, EDocument, Partida, Pedimento from api.organization.models import Organizacion from api.record.models import Document from api.reports.models import ReportDocument from api.utils.storage_service import storage_service from core.redis_events import publish_task_event logger = logging.getLogger('api.reports.tasks') # ── helpers ─────────────────────────────────────────────────────────────────── def _estado(flag: bool) -> str: return 'RECUPERADO' if flag else 'PENDIENTE' def _build_pedimento_filters(filters: dict) -> Q: q = Q() if filters.get('organizacion_id'): q &= Q(organizacion_id=filters['organizacion_id']) if filters.get('fecha_pago__gte'): q &= Q(fecha_pago__gte=filters['fecha_pago__gte']) if filters.get('fecha_pago__lte'): q &= Q(fecha_pago__lte=filters['fecha_pago__lte']) if filters.get('patente'): q &= Q(patente=filters['patente']) if filters.get('aduana'): q &= Q(aduana=filters['aduana']) if filters.get('pedimento'): q &= Q(pedimento=filters['pedimento']) if filters.get('pedimento_app'): q &= Q(pedimento_app=filters['pedimento_app']) if filters.get('regimen'): q &= Q(regimen=filters['regimen']) if filters.get('tipo_operacion'): q &= Q(tipo_operacion_id=filters['tipo_operacion']) rfc_val = filters.get('contribuyente__rfc') if rfc_val: if rfc_val == 'SIN_RFC': q &= Q(contribuyente__isnull=True) else: q &= Q(contribuyente__rfc=rfc_val) return q def _apply_user_rfc_filter(q: Q, user, requested_rfc: str | None) -> Q: """Restringe el queryset a los importadores visibles del usuario.""" # SIN_RFC ya fue aplicado en _build_pedimento_filters como contribuyente__isnull=True if requested_rfc == 'SIN_RFC': return q user_rfcs = user.rfc.all() if not user_rfcs.exists(): if requested_rfc: q &= Q(contribuyente__rfc=requested_rfc) return q if requested_rfc: if user_rfcs.filter(rfc=requested_rfc).exists(): q &= Q(contribuyente__rfc=requested_rfc) else: q &= Q(contribuyente__in=user_rfcs) else: q &= Q(contribuyente__in=user_rfcs) return q # ── tarea principal ─────────────────────────────────────────────────────────── @shared_task(bind=True, queue='reports', soft_time_limit=600, time_limit=660) def generate_report_document(self, report_id): task_id = self.request.id report = None def _fail(msg, exc=None): """Marca el reporte como error, notifica al frontend y loguea. Sin re-raise.""" tb = traceback.format_exc() if exc else '' full_msg = f"{msg}\n\n{tb}".strip() if tb else msg logger.error('[reporte_cumplimiento] report=%s FALLO: %s', report_id, full_msg) if report: report.status = 'error' report.error_message = full_msg report.finished_at = timezone.now() report.save(update_fields=['status', 'error_message', 'finished_at']) publish_task_event(task_id, 'failed', msg, progress=0) # ── 1. Obtener reporte ──────────────────────────────────────────────────── try: report = ReportDocument.objects.get(id=report_id) except ReportDocument.DoesNotExist: logger.error('[reporte_cumplimiento] ReportDocument %s no existe', report_id) publish_task_event(task_id, 'failed', f'Reporte {report_id} no encontrado', progress=0) return logger.info('[reporte_cumplimiento] Iniciando report=%s user=%s', report_id, report.user_id) report.status = 'processing' report.save(update_fields=['status']) publish_task_event(task_id, 'processing', 'Iniciando generación de reporte...', progress=5) try: filters = report.filters or {} org_id = filters.get('organizacion_id') # ── 2. Filtros y organización ───────────────────────────────────────── q = _build_pedimento_filters(filters) q = _apply_user_rfc_filter(q, report.user, filters.get('contribuyente__rfc')) nombre_org = '' if org_id: try: nombre_org = Organizacion.objects.get(id=org_id).nombre except Organizacion.DoesNotExist: pass logger.info('[reporte_cumplimiento] report=%s org=%s filtros=%s', report_id, nombre_org, filters) publish_task_event(task_id, 'processing', f'Consultando RFCs de {nombre_org}...', progress=10) # ── 3. Listar RFCs (consulta liviana) ──────────────────────────────── rfcs_list = list( Pedimento.objects.filter(q) .exclude(contribuyente__isnull=True) .values_list('contribuyente__rfc', flat=True) .distinct() .order_by('contribuyente__rfc') ) if Pedimento.objects.filter(q, contribuyente__isnull=True).exists(): rfcs_list.append('SIN_RFC') total_rfcs = len(rfcs_list) total_pedimentos = Pedimento.objects.filter(q).count() logger.info('[reporte_cumplimiento] report=%s total_rfcs=%d total_pedimentos=%d', report_id, total_rfcs, total_pedimentos) if total_rfcs == 0: logger.warning('[reporte_cumplimiento] report=%s sin pedimentos para los filtros dados', report_id) publish_task_event( task_id, 'processing', f'{total_rfcs} RFC(s) — {total_pedimentos} pedimentos', progress=15, ) # ── 4. Crear workbook ───────────────────────────────────────────────── wb = openpyxl.Workbook() ws = wb.active ws.title = 'Reporte Cumplimiento' title_fill = PatternFill(start_color='1F4E79', end_color='1F4E79', fill_type='solid') title_font = Font(color='FFFFFF', bold=True, size=12) sub_fill = PatternFill(start_color='2E75B6', end_color='2E75B6', fill_type='solid') sub_font = Font(color='FFFFFF', bold=True, size=10) col_h_fill = PatternFill(start_color='D6E4F0', end_color='D6E4F0', fill_type='solid') col_h_font = Font(bold=True, size=10) footer_fill = PatternFill(start_color='E2EFDA', end_color='E2EFDA', fill_type='solid') center = Alignment(horizontal='center', vertical='center', wrap_text=True) top_left = Alignment(horizontal='left', vertical='top', wrap_text=True) COL_HEADERS = [ 'Año', 'Aduana', 'Patente', 'Pedimento', 'Nomenclatura Completo Pedimento', 'Clav', 'Tipo Operación', 'Expediente Sí', 'Documento', 'Estatus', ] TOTAL_COLS = len(COL_HEADERS) current_row = 1 safe_total = max(total_rfcs, 1) # ── 5. Procesar RFC por RFC ─────────────────────────────────────────── for rfc_idx, rfc in enumerate(rfcs_list): pct = 20 + int((rfc_idx / safe_total) * 65) publish_task_event( task_id, 'processing', f'RFC {rfc_idx + 1}/{total_rfcs}: {rfc}', progress=pct, ) rfc_q = ( q & Q(contribuyente__isnull=True) if rfc == 'SIN_RFC' else q & Q(contribuyente__rfc=rfc) ) peds = list( Pedimento.objects.filter(rfc_q) .select_related('contribuyente', 'tipo_operacion') .order_by('fecha_pago') ) if not peds: logger.warning('[reporte_cumplimiento] report=%s rfc=%s sin pedimentos, omitido', report_id, rfc) continue ped_ids = [p.id for p in peds] razon_social = nombre_org or 'Desconocido' logger.info('[reporte_cumplimiento] report=%s rfc=%s pedimentos=%d', report_id, rfc, len(peds)) # documentos de este RFC solamente coves_map: dict = defaultdict(list) for c in Cove.objects.filter(pedimento_id__in=ped_ids): coves_map[c.pedimento_id].append(c) edocs_map: dict = defaultdict(list) for e in EDocument.objects.filter(pedimento_id__in=ped_ids): edocs_map[e.pedimento_id].append(e) partidas_map: dict = defaultdict(list) for p in Partida.objects.filter(pedimento_id__in=ped_ids).order_by('numero_partida'): partidas_map[p.pedimento_id].append(p) remesa_ped_ids: set = set( Document.objects.filter(pedimento_id__in=ped_ids, document_type_id=15) .values_list('pedimento_id', flat=True) ) total_coves = sum(len(v) for v in coves_map.values()) total_edocs = sum(len(v) for v in edocs_map.values()) total_partidas = sum(len(v) for v in partidas_map.values()) est_rows = len(peds) + total_partidas + total_coves * 2 + total_edocs * 2 + len(remesa_ped_ids) logger.info('[reporte_cumplimiento] report=%s rfc=%s docs coves=%d edocs=%d partidas=%d remesas=%d filas_estimadas=%d', report_id, rfc, total_coves, total_edocs, total_partidas, len(remesa_ped_ids), est_rows) # encabezado sección ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=TOTAL_COLS) cell = ws.cell(row=current_row, column=1, value='Reporte Integración de Expedientes.') cell.fill, cell.font, cell.alignment = title_fill, title_font, center current_row += 1 ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=TOTAL_COLS) cell = ws.cell(row=current_row, column=1, value=f'Razón Social Importador: {razon_social}') cell.fill, cell.font = sub_fill, sub_font current_row += 1 ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=TOTAL_COLS) cell = ws.cell(row=current_row, column=1, value=f'RFC: {rfc}') cell.fill, cell.font = sub_fill, sub_font current_row += 1 for col_i, header in enumerate(COL_HEADERS, 1): cell = ws.cell(row=current_row, column=col_i, value=header) cell.fill, cell.font, cell.alignment = col_h_fill, col_h_font, center current_row += 1 total_exp = len(peds) exp_con_docs = exp_completos = 0 for ped in peds: doc_rows = [('PEDIMENTO COMPLETO', _estado(ped.existe_expediente))] for partida in partidas_map[ped.id]: doc_rows.append((f'PARTIDA{partida.numero_partida}', _estado(partida.descargado))) if ped.remesas: doc_rows.append(('REMESA', _estado(ped.id in remesa_ped_ids))) for cove in coves_map[ped.id]: doc_rows.append((f'COVE{cove.numero_cove}', _estado(cove.cove_descargado))) doc_rows.append((f'ACUSE COVE{cove.numero_cove}', _estado(cove.acuse_cove_descargado))) for edoc in edocs_map[ped.id]: doc_rows.append((f'EDOCUMENTO{edoc.numero_edocument}', _estado(edoc.edocument_descargado))) doc_rows.append((f'ACUSE EDOCUMENTO{edoc.numero_edocument}', _estado(edoc.acuse_descargado))) if len(doc_rows) > 1: exp_con_docs += 1 if all(e == 'RECUPERADO' for _, e in doc_rows): exp_completos += 1 n_rows = len(doc_rows) start_row = current_row anio = ped.fecha_pago.year % 100 if ped.fecha_pago else '' base_vals = [ anio, ped.aduana or '', ped.patente or '', ped.pedimento or '', ped.pedimento_app or '', ped.clave_pedimento or '', ped.tipo_operacion.tipo if ped.tipo_operacion else '', 'SI' if ped.existe_expediente else 'NO', ] # Sin merge_cells — para datasets grandes merge es O(n^2) y cuelga el proceso. # Los datos base solo se escriben en la primera fila; el resto queda vacío, # visualmente equivalente al merge pero sin el costo de memoria/CPU. for offset, (doc_nombre, doc_est) in enumerate(doc_rows): r = start_row + offset if offset == 0: for col, val in enumerate(base_vals, 1): ws.cell(row=r, column=col, value=val) ws.cell(row=r, column=9, value=doc_nombre) ws.cell(row=r, column=10, value=doc_est) current_row += n_rows ws.merge_cells(start_row=current_row, start_column=1, end_row=current_row, end_column=TOTAL_COLS) cell = ws.cell( row=current_row, column=1, value=(f'Total de Expedientes= {total_exp} ' f'Total De Expedientes Con Documentos= {exp_con_docs} ' f'Total De Expedientes Completos= {exp_completos}'), ) cell.fill = footer_fill cell.font = Font(bold=True) current_row += 2 del peds, ped_ids, coves_map, edocs_map, partidas_map, remesa_ped_ids for i, w in enumerate([6, 8, 8, 12, 32, 8, 16, 12, 32, 14], 1): ws.column_dimensions[openpyxl.utils.get_column_letter(i)].width = w # ── 6. Serializar y subir ───────────────────────────────────────────── logger.info('[reporte_cumplimiento] report=%s serializando Excel...', report_id) publish_task_event(task_id, 'processing', 'Serializando Excel...', progress=88) filename = f"reporte_cumplimiento_{report.id}_{timezone.now().strftime('%Y%m%d%H%M%S')}.xlsx" buf = io.BytesIO() wb.save(buf) excel_bytes = buf.getvalue() logger.info('[reporte_cumplimiento] report=%s Excel size=%.1fKB', report_id, len(excel_bytes) / 1024) publish_task_event(task_id, 'processing', 'Subiendo a almacenamiento...', progress=93) ruta = storage_service.save_report( file=SimpleUploadedFile( name=filename, content=excel_bytes, content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', ), organizacion_id=org_id, metadata={ 'report_id': str(report.id), 'report_type': 'cumplimiento', 'user_id': str(report.user.id) if report.user else None, }, ) if ruta: logger.info('[reporte_cumplimiento] report=%s guardado en storage=%s', report_id, ruta) report.file = ruta report.status = 'ready' else: _fail('Error al guardar el archivo en almacenamiento (storage retornó None)') return report.finished_at = timezone.now() report.save(update_fields=['status', 'file', 'finished_at', 'error_message']) resultado = { 'report_id': str(report.id), 'total_rfcs': total_rfcs, 'total_pedimentos': total_pedimentos, } publish_task_event(task_id, 'completed', 'Reporte generado exitosamente.', progress=100, resultado=resultado) logger.info('[reporte_cumplimiento] report=%s COMPLETADO rfcs=%d pedimentos=%d', report_id, total_rfcs, total_pedimentos) return resultado except SoftTimeLimitExceeded: _fail('El reporte tardó más de 10 minutos y fue cancelado. Intenta con un rango de fechas más acotado.') except Exception as exc: _fail(str(exc), exc=exc) # ── reporte de control de pedimentos (sin cambios) ──────────────────────────── @shared_task def generate_report_control_pedimento(report_id): report = None try: report = ReportDocument.objects.get(id=report_id) report.status = 'processing' report.save(update_fields=['status']) filters = report.filters or {} pedimentos_filters = {} if filters.get('organizacion_id'): pedimentos_filters['organizacion_id'] = filters['organizacion_id'] if filters.get('fecha_pago__gte'): pedimentos_filters['fecha_pago__gte'] = filters['fecha_pago__gte'] if filters.get('fecha_pago__lte'): pedimentos_filters['fecha_pago__lte'] = filters['fecha_pago__lte'] if filters.get('pedimento_app'): pedimentos_filters['pedimento_app'] = filters['pedimento_app'] pedimentos_qs = Pedimento.objects.filter(**pedimentos_filters) pedimentos_total = pedimentos_qs.count() pedimento_ids = list(pedimentos_qs.values_list('id', flat=True)) rfcs_raw = list(pedimentos_qs.values_list('agente_aduanal', flat=True)) pedimentos_completos = 0 total_documentos = 0 documentos_sin_descargar = 0 nombre_organizacion = '' if filters.get('organizacion_id'): try: organizacion = Organizacion.objects.get(id=filters['organizacion_id']) nombre_organizacion = organizacion.nombre except Organizacion.DoesNotExist: nombre_organizacion = f"ID: {filters['organizacion_id']}" except Exception as e: nombre_organizacion = f"Error: {str(e)}" rfc_list = ', '.join(sorted(set([rfc for rfc in rfcs_raw if rfc]))) fecha_inicio = '' fecha_fin = '' if pedimentos_qs.exists(): primer_pedimento = pedimentos_qs.order_by('fecha_pago').first() if primer_pedimento and primer_pedimento.fecha_pago: fecha_inicio = primer_pedimento.fecha_pago.strftime('%Y-%m-%d') ultimo_pedimento = pedimentos_qs.order_by('-fecha_pago').first() if ultimo_pedimento and ultimo_pedimento.fecha_pago: fecha_fin = ultimo_pedimento.fecha_pago.strftime('%Y-%m-%d') for pedimento in pedimentos_qs: docs_pedimento = 0 docs_pendientes_pedimento = 0 coves_count = Cove.objects.filter(pedimento_id=pedimento.id).count() coves_pendientes = Cove.objects.filter(pedimento_id=pedimento.id, cove_descargado=False).count() docs_pedimento += coves_count docs_pendientes_pedimento += coves_pendientes partidas_count = Partida.objects.filter(pedimento_id=pedimento.id).count() partidas_pendientes = Partida.objects.filter(pedimento_id=pedimento.id, descargado=False).count() docs_pedimento += partidas_count docs_pendientes_pedimento += partidas_pendientes edocs_count = EDocument.objects.filter(pedimento_id=pedimento.id).count() edocs_pendientes = EDocument.objects.filter(pedimento_id=pedimento.id, edocument_descargado=False).count() docs_pedimento += edocs_count docs_pendientes_pedimento += edocs_pendientes total_documentos += docs_pedimento documentos_sin_descargar += docs_pendientes_pedimento if docs_pendientes_pedimento == 0 and docs_pedimento > 0: pedimentos_completos += 1 porcentaje_faltantes = (documentos_sin_descargar / total_documentos * 100) if total_documentos > 0 else 0 filename = f"report_{report.id}_{timezone.now().strftime('%Y%m%d%H%M%S')}.csv" with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv', encoding='utf-8', newline='') as tmp: tmp_path = tmp.name todas_las_filas = [] for pedimento in pedimentos_qs: datos_base_pedimento = [ pedimento.aduana or '', pedimento.patente or '', pedimento.regimen or '', pedimento.pedimento or '', pedimento.pedimento_app or '', pedimento.clave_pedimento or '', pedimento.tipo_operacion.tipo if pedimento.tipo_operacion else '', str(pedimento.contribuyente_id) if pedimento.contribuyente_id else '' ] coves = Cove.objects.filter(pedimento_id=pedimento.id) for cove in coves: estado = 'VERDADERO' if cove.cove_descargado else 'FALSO' fila = datos_base_pedimento + [cove.numero_cove, 'COVE', estado] todas_las_filas.append(fila) partidas = Partida.objects.filter(pedimento_id=pedimento.id) for partida in partidas: estado = 'VERDADERO' if partida.descargado else 'FALSO' fila = datos_base_pedimento + [partida.numero_partida, 'PARTIDA', estado] todas_las_filas.append(fila) edocuments = EDocument.objects.filter(pedimento_id=pedimento.id) for edoc in edocuments: estado = 'VERDADERO' if edoc.edocument_descargado else 'FALSO' fila = datos_base_pedimento + [edoc.numero_edocument, 'EDOCUMENT', estado] todas_las_filas.append(fila) import csv with open(tmp_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['RESUMEN DEL REPORTE - CONTROL DE PEDIMENTOS']) writer.writerow(['ORGANIZACION:', nombre_organizacion]) writer.writerow([]) writer.writerow(['TOTAL DE EXPEDIENTES:', pedimentos_total]) writer.writerow(['TOTAL DE EXPEDIENTES COMPLETOS:', pedimentos_completos]) writer.writerow(['TOTAL DE DOCUMENTOS:', total_documentos]) writer.writerow(['DOCUMENTOS SIN DESCARGAR:', documentos_sin_descargar]) writer.writerow(['PORCENTAJE DE DOCUMENTOS FALTANTES (%):', f"{porcentaje_faltantes:.2f}%"]) writer.writerow(['DESDE: ', fecha_inicio, ' HASTA: ', fecha_fin]) writer.writerow(['LISTA RFC:', rfc_list]) writer.writerow([]) writer.writerow([]) headers = [ 'ADUANA', 'PATENTE', 'REGIMEN', 'NO. PEDIMENTO', 'PEDIMENTO_APP', 'CLAVE_PEDIMENTO', 'TIPO_OPERACION', 'CONTRIBUYENTE_ID', 'IDENTIFICADOR_DOCUMENTO', 'TIPO_DOCUMENTO', 'ESTADO' ] writer.writerow(headers) for fila in todas_las_filas: writer.writerow(fila) with open(tmp_path, 'rb') as f: file_content = f.read() uploaded_file = SimpleUploadedFile( name=filename, content=file_content, content_type='text/csv' ) ruta = storage_service.save_report( file=uploaded_file, organizacion_id=filters.get('organizacion_id'), metadata={ 'report_id': str(report.id), 'report_type': 'control_pedimento', 'user_id': str(report.user.id) if report.user else None } ) os.unlink(tmp_path) if ruta: report.file = ruta report.status = 'ready' else: report.status = 'error' report.error_message = 'Error al guardar el archivo en storage' report.finished_at = timezone.now() report.save(update_fields=['status', 'file', 'finished_at', 'error_message']) except Exception as e: if report: report.status = 'error' report.error_message = str(e) report.finished_at = timezone.now() report.save(update_fields=['status', 'error_message', 'finished_at'])