From 709a5dedabd572aad7919f155e8011fe8011ed24 Mon Sep 17 00:00:00 2001 From: marcos Date: Thu, 28 May 2026 07:10:39 -0600 Subject: [PATCH] feature/pedimentos-correccion-partidas --- .../0005_customuser_rfc_fk_to_m2m.py | 57 +++ .../0006_customuser_active_organization.py | 25 + .../management/commands/fix_archivo_case.py | 117 +++++ .../management/commands/fix_partidas_error.py | 382 ++++++++++++++ api/customs/migrations/0017_bulkuploadtask.py | 50 ++ ...lter_pedimento_unique_together_and_more.py | 21 + .../0019_pedimento_consultar_vucem.py | 18 + api/customs/tasks/auditoria.py | 135 ++++- api/customs/tasks/auto_corregir.py | 477 ++++++++++++++++++ api/customs/tasks/internal_services.py | 120 +++-- api/customs/tasks/microservice_v2.py | 4 +- api/customs/urls.py | 8 + api/customs/views_auditor.py | 245 ++++++++- .../0012_alter_datastage_archivo.py | 18 + .../0013_registro501_add_timestamps.py | 26 + .../0014_all_registros_add_created_at.py | 44 ++ .../migrations/0002_notificacion_datos.py | 18 + api/notificaciones/models.py | 1 + api/notificaciones/serializers.py | 3 +- api/notificaciones/views.py | 12 +- .../0003_organizacion_apply_auto_download.py | 18 + .../migrations/0004_organizacion_owner.py | 25 + .../0005_alter_rolepermission_id.py | 18 + api/record/migrations/0003_document_vu.py | 18 + api/record/views.py | 34 +- .../0002_reportdocument_report_type.py | 18 + .../0003_alter_reportdocument_file.py | 18 + .../0012_alter_vucem_cer_alter_vucem_key.py | 23 + core/redis_events.py | 42 ++ 29 files changed, 1908 insertions(+), 87 deletions(-) create mode 100644 api/cuser/migrations/0005_customuser_rfc_fk_to_m2m.py create mode 100644 api/cuser/migrations/0006_customuser_active_organization.py create mode 100644 api/customs/management/commands/fix_archivo_case.py create mode 100644 api/customs/management/commands/fix_partidas_error.py create mode 100644 api/customs/migrations/0017_bulkuploadtask.py create mode 100644 api/customs/migrations/0018_alter_pedimento_unique_together_and_more.py create mode 100644 api/customs/migrations/0019_pedimento_consultar_vucem.py create mode 100644 api/customs/tasks/auto_corregir.py create mode 100644 api/datastage/migrations/0012_alter_datastage_archivo.py create mode 100644 api/datastage/migrations/0013_registro501_add_timestamps.py create mode 100644 api/datastage/migrations/0014_all_registros_add_created_at.py create mode 100644 api/notificaciones/migrations/0002_notificacion_datos.py create mode 100644 api/organization/migrations/0003_organizacion_apply_auto_download.py create mode 100644 api/organization/migrations/0004_organizacion_owner.py create mode 100644 api/rbac/migrations/0005_alter_rolepermission_id.py create mode 100644 api/record/migrations/0003_document_vu.py create mode 100644 api/reports/migrations/0002_reportdocument_report_type.py create mode 100644 api/reports/migrations/0003_alter_reportdocument_file.py create mode 100644 api/vucem/migrations/0012_alter_vucem_cer_alter_vucem_key.py create mode 100644 core/redis_events.py diff --git a/api/cuser/migrations/0005_customuser_rfc_fk_to_m2m.py b/api/cuser/migrations/0005_customuser_rfc_fk_to_m2m.py new file mode 100644 index 0000000..9f1ceca --- /dev/null +++ b/api/cuser/migrations/0005_customuser_rfc_fk_to_m2m.py @@ -0,0 +1,57 @@ +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +def copiar_rfc_a_m2m(apps, schema_editor): + """Copia el RFC singular (FK) al lado M2M antes de eliminar el FK.""" + CustomUser = apps.get_model('cuser', 'CustomUser') + db_alias = schema_editor.connection.alias + for user in CustomUser.objects.using(db_alias).filter(rfc_old__isnull=False): + user.rfc.add(user.rfc_old) + + +def revertir_m2m_a_fk(apps, schema_editor): + """En reversa: toma el primer RFC del M2M y lo pone de vuelta en el FK temporal.""" + CustomUser = apps.get_model('cuser', 'CustomUser') + db_alias = schema_editor.connection.alias + for user in CustomUser.objects.using(db_alias).prefetch_related('rfc'): + primer_rfc = user.rfc.first() + if primer_rfc: + user.rfc_old = primer_rfc + user.save(update_fields=['rfc_old']) + + +class Migration(migrations.Migration): + + dependencies = [ + ('cuser', '0004_alter_customuser_rfc'), + ('customs', '0015_partida_updated_at'), + ] + + operations = [ + # 1. Renombrar el FK actual a rfc_old para preservar los datos + migrations.RenameField( + model_name='customuser', + old_name='rfc', + new_name='rfc_old', + ), + # 2. Crear el nuevo campo M2M + migrations.AddField( + model_name='customuser', + name='rfc', + field=models.ManyToManyField( + blank=True, + help_text='RFCs de importadores asociados al usuario', + related_name='users', + to='customs.importador', + ), + ), + # 3. Copiar datos del FK al M2M + migrations.RunPython(copiar_rfc_a_m2m, revertir_m2m_a_fk), + # 4. Eliminar el FK temporal + migrations.RemoveField( + model_name='customuser', + name='rfc_old', + ), + ] diff --git a/api/cuser/migrations/0006_customuser_active_organization.py b/api/cuser/migrations/0006_customuser_active_organization.py new file mode 100644 index 0000000..fdb91bc --- /dev/null +++ b/api/cuser/migrations/0006_customuser_active_organization.py @@ -0,0 +1,25 @@ +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('cuser', '0005_customuser_rfc_fk_to_m2m'), + ('organization', '0003_organizacion_apply_auto_download'), + ] + + operations = [ + migrations.AddField( + model_name='customuser', + name='active_organization', + field=models.ForeignKey( + blank=True, + help_text='Solo superusuarios: organización activa para contexto de trabajo', + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name='superusers_activos', + to='organization.organizacion', + ), + ), + ] diff --git a/api/customs/management/commands/fix_archivo_case.py b/api/customs/management/commands/fix_archivo_case.py new file mode 100644 index 0000000..7692206 --- /dev/null +++ b/api/customs/management/commands/fix_archivo_case.py @@ -0,0 +1,117 @@ +""" +Corrige el mismatch de case entre el campo `archivo` en BD y los nombres +reales de los objetos en MinIO. + +Causa habitual: transferencia de archivos de producción a local lowercaseó +los filenames, pero la BD conserva los nombres originales con mayúsculas. + +Estrategia: para cada Document cuyo `archivo` no exista en MinIO con el +nombre exacto, intenta el filename en minúsculas. Si lo encuentra, actualiza +el campo en BD. Los archivos que ya coinciden no se tocan. + +Uso: + python manage.py fix_archivo_case --pedimento --dry-run + python manage.py fix_archivo_case --pedimento + python manage.py fix_archivo_case --organizacion --dry-run + python manage.py fix_archivo_case --organizacion +""" +import posixpath + +from django.core.management.base import BaseCommand, CommandError + +from api.customs.models import Pedimento +from api.record.models import Document +from api.utils.minio_client import minio_client + + +class Command(BaseCommand): + help = "Corrige mismatch de case entre campo archivo en BD y MinIO." + + def add_arguments(self, parser): + parser.add_argument( + "--pedimento", metavar="UUID", + help="UUID del pedimento a corregir.", + ) + parser.add_argument( + "--organizacion", metavar="UUID", + help="UUID de la organización.", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Solo diagnóstico, sin aplicar cambios.", + ) + + def handle(self, *args, **options): + ped_id = options.get("pedimento") + org_id = options.get("organizacion") + dry_run = options["dry_run"] + + if dry_run: + self.stdout.write(self.style.WARNING( + "=== MODO PRUEBA (--dry-run): Sin cambios en BD ===\n" + )) + + qs = Document.objects.all() + if ped_id: + try: + ped = Pedimento.objects.get(id=ped_id) + except Pedimento.DoesNotExist: + raise CommandError(f"Pedimento {ped_id!r} no encontrado.") + qs = qs.filter(pedimento=ped) + self.stdout.write(f"Pedimento: {ped.pedimento_app}\n") + elif org_id: + qs = qs.filter(organizacion_id=org_id) + + total = qs.count() + self.stdout.write(f"Documentos a revisar: {total}\n") + + ok = mismatch = not_found = 0 + + for doc in qs.iterator(chunk_size=500): + name = doc.archivo.name if doc.archivo else None + if not name: + continue + + if minio_client.file_exists(name): + ok += 1 + continue + + lower_name = self._lower_filename(name) + if lower_name == name: + not_found += 1 + continue + + if minio_client.file_exists(lower_name): + mismatch += 1 + self.stdout.write( + f" {'[DRY]' if dry_run else '[FIX]'} doc {doc.id}:\n" + f" BD : {name}\n" + f" MinIO : {lower_name}\n" + ) + if not dry_run: + doc.archivo.name = lower_name + doc.save(update_fields=["archivo"]) + else: + not_found += 1 + + self.stdout.write( + f"\n{'─' * 60}\nRESUMEN\n" + f" Coinciden exacto : {ok}\n" + f" Mismatch de case : {mismatch}\n" + f" No encontrados : {not_found}\n" + ) + + if dry_run and mismatch: + self.stdout.write(self.style.WARNING( + "\nEjecuta sin --dry-run para aplicar los cambios." + )) + elif not dry_run and mismatch: + self.stdout.write(self.style.SUCCESS( + f"\n{mismatch} registros actualizados en BD." + )) + + def _lower_filename(self, name): + """Lowercase solo el filename, preserva el path del directorio.""" + dir_part = posixpath.dirname(name) + filename = posixpath.basename(name) + return posixpath.join(dir_part, filename.lower()) diff --git a/api/customs/management/commands/fix_partidas_error.py b/api/customs/management/commands/fix_partidas_error.py new file mode 100644 index 0000000..a66b01b --- /dev/null +++ b/api/customs/management/commands/fix_partidas_error.py @@ -0,0 +1,382 @@ +""" +Diagnóstico y corrección de partidas con descargado=True cuyos documentos +de respuesta VUCEM contienen true. + +Convenciones de nomenclatura del microservicio: + - REQUEST (type 17): vu_PT_{pedimento_app}_{partida}_REQUEST.xml + - ERROR (type 18): vu_PT_{pedimento_app}_{partida}_ERROR.xml + - Éxito (type 1): vu_PT_{pedimento_app}_{partida}.xml + +Acciones por cada documento con error VUCEM encontrado: + - document_type_id: actual → 18 (PT ERROR) + - archivo: renombrado a vu_PT_{pedimento_app}_{partida}_ERROR.xml + - Partida.descargado: True → False + +Criterio de pedimento malformado (cualquiera de): + - aduana: nulo/vacío o len < 3 + - numero_operacion: nulo o vacío + - patente: nulo/vacío o len < 4 + - pedimento (campo): nulo/vacío o len < 7 + +Uso: + python manage.py fix_partidas_error --pedimento --dry-run + python manage.py fix_partidas_error --organizacion --dry-run + python manage.py fix_partidas_error --organizacion + python manage.py fix_partidas_error --dry-run # todas las orgs +""" +import io +import posixpath + +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction +from django.db.models import Q +from django.db.models.functions import Length + +from api.customs.models import Partida, Pedimento +from api.record.models import Document +from api.utils.minio_client import minio_client + +_PT_REQUEST = 17 +_PT_ERROR = 18 + + +class Command(BaseCommand): + help = "Corrección de partidas descargado=True con respuestas de error VUCEM." + + def add_arguments(self, parser): + parser.add_argument( + "--organizacion", metavar="UUID", + help="UUID de la organización. Sin este arg: todas las orgs.", + ) + parser.add_argument( + "--pedimento", metavar="UUID", + help="UUID del pedimento a diagnosticar/corregir.", + ) + # Filtros de fecha (aplican sobre fecha_pago del pedimento) + parser.add_argument( + "--fecha-desde", metavar="YYYY-MM-DD", + help="Procesar pedimentos con fecha_pago >= esta fecha.", + ) + parser.add_argument( + "--fecha-hasta", metavar="YYYY-MM-DD", + help="Procesar pedimentos con fecha_pago <= esta fecha.", + ) + # Control de lote + parser.add_argument( + "--offset", type=int, default=0, + help="Saltar los primeros N pedimentos malformados (default: 0).", + ) + parser.add_argument( + "--limit", type=int, default=0, + help="Procesar máximo N pedimentos (default: 0 = todos).", + ) + parser.add_argument( + "--dry-run", action="store_true", + help="Solo diagnóstico, sin aplicar cambios.", + ) + + # ------------------------------------------------------------------ # + # Entry point + # ------------------------------------------------------------------ # + + def handle(self, *args, **options): + org_id = options.get("organizacion") + ped_id = options.get("pedimento") + fecha_desde = options.get("fecha_desde") + fecha_hasta = options.get("fecha_hasta") + offset = options["offset"] + limit = options["limit"] + dry_run = options["dry_run"] + + if dry_run: + self.stdout.write(self.style.WARNING( + "=== MODO PRUEBA (--dry-run): Sin cambios en BD ni storage ===\n" + )) + + if ped_id: + self._handle_single(ped_id, dry_run) + return + + ped_qs = self._malformed_qs() + + if org_id: + ped_qs = ped_qs.filter(organizacion_id=org_id) + if fecha_desde: + ped_qs = ped_qs.filter(fecha_pago__gte=fecha_desde) + if fecha_hasta: + ped_qs = ped_qs.filter(fecha_pago__lte=fecha_hasta) + + ped_qs = ped_qs.select_related("organizacion").order_by("fecha_pago", "pedimento_app") + + total_sin_filtro = ped_qs.count() + + if offset: + ped_qs = ped_qs[offset:] + if limit: + ped_qs = ped_qs[:limit] + + total = ped_qs.count() if not (offset or limit) else min( + limit or total_sin_filtro, max(0, total_sin_filtro - offset) + ) + + self.stdout.write( + f"Pedimentos malformados (total): {total_sin_filtro}\n" + f"Procesando este lote : {total}" + + (f" [offset={offset}]" if offset else "") + + (f" [limit={limit}]" if limit else "") + + "\n" + ) + + if total == 0: + self.stdout.write(self.style.SUCCESS("Nada que corregir en este lote.")) + return + + total_partidas = total_docs = 0 + for ped in ped_qs: + p, d = self._process_pedimento(ped, dry_run) + total_partidas += p + total_docs += d + + self._print_summary(total, total_partidas, total_docs, dry_run) + + # ------------------------------------------------------------------ # + # Flujo --pedimento + # ------------------------------------------------------------------ # + + def _handle_single(self, ped_id, dry_run): + try: + ped = Pedimento.objects.get(id=ped_id) + except Pedimento.DoesNotExist: + raise CommandError(f"Pedimento {ped_id!r} no encontrado.") + + checks = self._field_checks(ped) + self._print_ped_diagnosis(ped, checks) + if not any(checks.values()): + return + self._process_pedimento(ped, dry_run) + + # ------------------------------------------------------------------ # + # Queryset de pedimentos malformados + # ------------------------------------------------------------------ # + + def _malformed_qs(self): + return Pedimento.objects.annotate( + aduana_len=Length("aduana"), + patente_len=Length("patente"), + pedimento_len=Length("pedimento"), + ).filter( + Q(aduana__isnull=True) | Q(aduana="") | Q(aduana_len__lt=3) + | Q(numero_operacion__isnull=True) | Q(numero_operacion="") + | Q(patente__isnull=True) | Q(patente="") | Q(patente_len__lt=4) + | Q(pedimento__isnull=True) | Q(pedimento="") | Q(pedimento_len__lt=7) + ) + + # ------------------------------------------------------------------ # + # Diagnóstico de un pedimento + # ------------------------------------------------------------------ # + + def _field_checks(self, ped): + return { + "aduana (debe tener 3 dígitos)": not ped.aduana or len(ped.aduana.strip()) < 3, + "numero_operacion (obligatorio)": not ped.numero_operacion or not ped.numero_operacion.strip(), + "patente (debe tener 4 dígitos)": not ped.patente or len(ped.patente.strip()) < 4, + "pedimento_fld (debe tener 7 dígitos)": not ped.pedimento or len(ped.pedimento.strip()) < 7, + } + + def _print_ped_diagnosis(self, ped, checks): + es_malo = any(checks.values()) + estado = self.style.ERROR("MALFORMADO") if es_malo else self.style.SUCCESS("VÁLIDO") + self.stdout.write( + f"Pedimento {ped.pedimento_app} (id={ped.id}) → {estado}\n" + f" aduana = {ped.aduana!r} (len={len(ped.aduana or '')})\n" + f" patente = {ped.patente!r} (len={len(ped.patente or '')})\n" + f" numero_op = {ped.numero_operacion!r}\n" + f" pedimento_fld = {ped.pedimento!r} (len={len(ped.pedimento or '')})\n" + ) + for campo, malo in checks.items(): + marca = self.style.ERROR("✗") if malo else self.style.SUCCESS("✓") + self.stdout.write(f" {marca} {campo}") + self.stdout.write("") + + # ------------------------------------------------------------------ # + # Procesamiento de un pedimento malformado + # ------------------------------------------------------------------ # + + def _process_pedimento(self, ped, dry_run): + self.stdout.write( + f"Pedimento: {ped.pedimento_app} | " + f"aduana={ped.aduana!r} patente={ped.patente!r} num_op={ped.numero_operacion!r}" + ) + partidas = Partida.objects.filter(pedimento=ped, descargado=True) + n_partidas = partidas.count() + + if n_partidas == 0: + self.stdout.write(" → Sin partidas con descargado=True\n") + return 0, 0 + + self.stdout.write(f" Partidas con descargado=True: {n_partidas}") + total_docs_error = 0 + + for partida in partidas: + # Documentos de respuesta: excluir REQUEST (17) y los ya marcados ERROR (18) + patron = f"vu_PT_{ped.pedimento_app}_{partida.numero_partida}_" + candidatos = list( + Document.objects.filter( + pedimento=ped, + archivo__icontains=patron, + ).exclude(document_type_id__in=[_PT_REQUEST, _PT_ERROR]) + ) + + self.stdout.write( + f"\n Partida {partida.numero_partida}: {len(candidatos)} doc(s) candidatos a revisar" + ) + + docs_con_error = [] + for doc in candidatos: + # estado: "error" | "ok" | "no_verificable" + estado, motivo = self._check_vucem_error(doc) + if estado == "error": + icono = self.style.ERROR("✗ ERROR VUCEM") + elif estado == "ok": + icono = self.style.SUCCESS("✓ ok") + else: + icono = self.style.WARNING("⚠ sin archivo en storage") + + self.stdout.write(f" [{icono}] type={doc.document_type_id} | {doc.archivo.name}") + + if estado == "error": + self.stdout.write(f" motivo : {motivo}") + new_name = self._build_error_filename( + doc.archivo.name, ped.pedimento_app, partida.numero_partida, len(docs_con_error) + ) + self.stdout.write(f" → {new_name}") + docs_con_error.append(doc) + elif estado == "no_verificable": + self.stdout.write(f" {motivo} — ejecuta en producción para verificar") + + total_docs_error += len(docs_con_error) + + if not dry_run and docs_con_error: + self._apply_fix(partida, docs_con_error, ped.pedimento_app) + + self.stdout.write("") + return n_partidas, total_docs_error + + # ------------------------------------------------------------------ # + # Detección de error VUCEM en el XML + # ------------------------------------------------------------------ # + + def _check_vucem_error(self, doc): + """ + Lee el XML desde MinIO y verifica si VUCEM devolvió un error. + Retorna ("error" | "ok" | "no_verificable", motivo: str | None). + """ + try: + name = doc.archivo.name + if not minio_client.file_exists(name): + return "no_verificable", "archivo no encontrado en storage" + response = minio_client._client.get_object(minio_client._bucket_name, name) + try: + content = response.read() + finally: + response.close() + response.release_conn() + text = content.decode("utf-8", errors="replace") + if "tieneError>true<" in text: + return "error", "tieneError=true detectado en XML" + return "ok", None + except Exception as e: + return "no_verificable", f"excepción al leer archivo: {e}" + + # ------------------------------------------------------------------ # + # Construcción del nombre de archivo de error + # ------------------------------------------------------------------ # + + def _build_error_filename(self, old_name, pedimento_app, numero_partida, index=0): + """ + Retorna la ruta con nomenclatura de error: + index=0 → {dir}/vu_PT_{pedimento_app}_{numero_partida}_ERROR.xml + index>0 → {dir}/vu_PT_{pedimento_app}_{numero_partida}_ERROR_{index}.xml + El índice evita colisión cuando una partida tiene más de un doc con error. + """ + dir_part = posixpath.dirname(old_name) + suffix = f"_{index}" if index > 0 else "" + new_filename = f"vu_PT_{pedimento_app}_{numero_partida}_ERROR{suffix}.xml" + return posixpath.join(dir_part, new_filename) + + # ------------------------------------------------------------------ # + # Aplicación de correcciones + # ------------------------------------------------------------------ # + + @transaction.atomic + def _apply_fix(self, partida, docs, pedimento_app): + """ + Renombra archivos en storage y actualiza BD dentro de una transacción. + Nota: si la transacción revierte, los cambios en storage NO se deshacen. + """ + for idx, doc in enumerate(docs): + new_name = self._build_error_filename( + doc.archivo.name, pedimento_app, partida.numero_partida, idx + ) + final_name = self._rename_in_storage(doc.archivo.name, new_name) + doc.archivo = final_name + doc.document_type_id = _PT_ERROR + doc.vu = True + doc.save(update_fields=["archivo", "document_type_id", "vu"]) + self.stdout.write(self.style.SUCCESS( + f" ✓ Doc {doc.id}: type=18 | {final_name}" + )) + + partida.descargado = False + partida.save(update_fields=["descargado"]) + self.stdout.write(self.style.SUCCESS( + f" ✓ Partida {partida.numero_partida}: descargado=False" + )) + + def _rename_in_storage(self, old_name, new_name): + if old_name == new_name: + return old_name + + if minio_client.file_exists(new_name): + # Rename ya ocurrió en ejecución previa parcial + self.stderr.write(self.style.WARNING( + f" ⚠ ERROR ya existe en storage, usando: {new_name}" + )) + if minio_client.file_exists(old_name): + minio_client.delete_file(old_name) + return new_name + + if not minio_client.file_exists(old_name): + self.stderr.write(self.style.WARNING( + f" ⚠ Archivo no encontrado en storage: {old_name}" + )) + return old_name + + response = minio_client._client.get_object(minio_client._bucket_name, old_name) + try: + content = response.read() + finally: + response.close() + response.release_conn() + + minio_client.upload_file(new_name, file_data=io.BytesIO(content), content_type="application/xml") + minio_client.delete_file(old_name) + return new_name + + # ------------------------------------------------------------------ # + # Resumen final + # ------------------------------------------------------------------ # + + def _print_summary(self, total_peds, total_partidas, total_docs, dry_run): + self.stdout.write( + f"\n{'─' * 60}\nRESUMEN\n" + f" Pedimentos malformados : {total_peds}\n" + f" Partidas con descargado=True : {total_partidas}\n" + f" Documentos con error VUCEM : {total_docs}\n" + ) + if dry_run: + self.stdout.write(self.style.WARNING( + "\nMODO PRUEBA: ejecuta sin --dry-run para aplicar los cambios." + )) + else: + self.stdout.write(self.style.SUCCESS("\nCorrección completada.")) diff --git a/api/customs/migrations/0017_bulkuploadtask.py b/api/customs/migrations/0017_bulkuploadtask.py new file mode 100644 index 0000000..c5387d4 --- /dev/null +++ b/api/customs/migrations/0017_bulkuploadtask.py @@ -0,0 +1,50 @@ +# Generated by Django 5.2.3 on 2026-01-16 00:36 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('customs', '0016_alter_pedimento_unique_together'), + ('organization', '0002_remove_organizacion_membretado_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='BulkUploadTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('contribuyente', models.CharField(blank=True, max_length=255, null=True)), + ('status', models.CharField(choices=[('pending', 'Pendiente'), ('processing', 'Procesando'), ('completed', 'Completado'), ('failed', 'Fallido'), ('partial', 'Parcialmente completado')], default='pending', max_length=20)), + ('task_type', models.CharField(default='bulk_create', max_length=50)), + ('total_files', models.IntegerField(default=0)), + ('processed_files', models.IntegerField(default=0)), + ('created_pedimentos', models.IntegerField(default=0)), + ('created_documents', models.IntegerField(default=0)), + ('result', models.JSONField(blank=True, default=dict)), + ('failed_files', models.JSONField(blank=True, default=list)), + ('error_message', models.TextField(blank=True, null=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('started_at', models.DateTimeField(blank=True, null=True)), + ('finished_at', models.DateTimeField(blank=True, null=True)), + ('fecha_pago', models.DateField(blank=True, null=True)), + ('clave_pedimento', models.CharField(blank=True, max_length=50, null=True)), + ('tipo_operacion_id', models.IntegerField(blank=True, null=True)), + ('curp_apoderado', models.CharField(blank=True, max_length=50, null=True)), + ('partidas', models.IntegerField(default=0)), + ('celery_task_id', models.CharField(blank=True, max_length=255, null=True)), + ('organizacion', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='organization.organizacion')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='bulk_upload_tasks', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'verbose_name': 'Tarea de Carga Masiva', + 'verbose_name_plural': 'Tareas de Carga Masiva', + 'db_table': 'bulk_upload_task', + 'ordering': ['-created_at'], + }, + ), + ] diff --git a/api/customs/migrations/0018_alter_pedimento_unique_together_and_more.py b/api/customs/migrations/0018_alter_pedimento_unique_together_and_more.py new file mode 100644 index 0000000..c333f74 --- /dev/null +++ b/api/customs/migrations/0018_alter_pedimento_unique_together_and_more.py @@ -0,0 +1,21 @@ +# Generated by Django 5.2.3 on 2026-03-06 19:35 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('customs', '0017_bulkuploadtask'), + ('organization', '0002_remove_organizacion_membretado_and_more'), + ] + + operations = [ + migrations.AlterUniqueTogether( + name='pedimento', + unique_together={('organizacion', 'pedimento_app')}, + ), + migrations.DeleteModel( + name='BulkUploadTask', + ), + ] diff --git a/api/customs/migrations/0019_pedimento_consultar_vucem.py b/api/customs/migrations/0019_pedimento_consultar_vucem.py new file mode 100644 index 0000000..5d43b82 --- /dev/null +++ b/api/customs/migrations/0019_pedimento_consultar_vucem.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-05-19 14:24 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('customs', '0018_alter_pedimento_unique_together_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='pedimento', + name='consultar_vucem', + field=models.BooleanField(default=False, help_text='Solo pedimentos originados desde datastage deben consultar VUCEM automáticamente'), + ), + ] diff --git a/api/customs/tasks/auditoria.py b/api/customs/tasks/auditoria.py index aa9e770..aaf2ff2 100644 --- a/api/customs/tasks/auditoria.py +++ b/api/customs/tasks/auditoria.py @@ -6,9 +6,50 @@ from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocumen from core.utils import xml_controller import requests from core.utils import xml_remesas_controller +from core.redis_events import publish_task_event import logging + logger = logging.getLogger(__name__) + +def _crear_notificacion_auditoria(user_id: str, task_id: str, label: str, resultado: dict): + """Crea una Notificacion persistente cuando una tarea de auditoría masiva completa.""" + try: + from api.notificaciones.models import Notificacion, TipoNotificacion + from api.cuser.models import CustomUser + + tipo, _ = TipoNotificacion.objects.get_or_create( + tipo="auditoria_completada", + defaults={"descripcion": "Auditoría masiva completada"}, + ) + usuario = CustomUser.objects.filter(id=user_id).first() + if not usuario: + return + + total = resultado.get('total_pedimentos', 0) + completados = resultado.get('completados', resultado.get('procesados', 0)) + pendientes = resultado.get('con_pendientes', 0) + errores = resultado.get('con_errores', 0) + + partes = [f"Auditoría de {label} completada — {completados}/{total} pedimentos"] + if pendientes: + partes.append(f"{pendientes} con pendientes") + if errores: + partes.append(f"{errores} con errores") + + Notificacion.objects.create( + tipo=tipo, + dirigido=usuario, + mensaje=", ".join(partes), + datos={ + "task_id": task_id, + "label": label, + "resultado": resultado, + }, + ) + except Exception as exc: + logger.error(f"[auditoria] Error creando notificación para tarea {task_id}: {exc}") + def obtener_pedimentos(organizacion_id): return Pedimento.objects.filter(organizacion_id=organizacion_id) @@ -129,19 +170,22 @@ def auditar_procesamiento_remesa_por_pedimento(pedimento_id): 'pedimento_id': str(pedimento_id) } -@shared_task -def crear_partidas(organizacion_id): +@shared_task(bind=True) +def crear_partidas(self, organizacion_id, user_id=None): from api.customs.models import Partida + task_id = self.request.id pedimentos = obtener_pedimentos(organizacion_id) total_pedimentos = pedimentos.count() + publish_task_event(task_id, "processing", f"Creando partidas: {total_pedimentos} pedimentos", progress=0) + completados = [] con_pendientes = [] sin_datos = [] errores = [] - for pedimento in pedimentos: + for idx, pedimento in enumerate(pedimentos): try: if not pedimento.numero_partidas or pedimento.numero_partidas <= 0: sin_datos.append({ @@ -180,8 +224,13 @@ def crear_partidas(organizacion_id): }) logger.error(f"Error creando partidas para pedimento {pedimento.id}: {e}") - return { + if total_pedimentos > 0 and (idx + 1) % 10 == 0: + pct = int(((idx + 1) / total_pedimentos) * 100) + publish_task_event(task_id, "processing", f"Creando partidas: {idx + 1}/{total_pedimentos}", progress=pct) + + resultado = { 'organizacion_id': str(organizacion_id), + 'auditoria': 'partidas', 'total_pedimentos': total_pedimentos, 'completados': len(completados), 'con_pendientes': len(con_pendientes), @@ -192,6 +241,12 @@ def crear_partidas(organizacion_id): 'detalle_errores': errores, } + publish_task_event(task_id, "completed", "Creación de partidas completada", resultado=resultado, progress=100) + if user_id: + _crear_notificacion_auditoria(user_id, task_id, "Partidas", resultado) + + return resultado + @shared_task def crear_partidas_por_pedimento(pedimento_id): try: @@ -230,19 +285,23 @@ def crear_partidas_por_pedimento(pedimento_id): print(f"Error: Pedimento {pedimento_id} tiene numero_partidas inválido: {pedimento.numero_partidas}") logger.info(f"Error: Pedimento {pedimento_id} tiene numero_partidas inválido: {pedimento.numero_partidas}") -def _auditar_organizacion(organizacion_id, servicio, related_name, variable, label): +def _auditar_organizacion(organizacion_id, servicio, related_name, variable, label, task_id=None, user_id=None): """ Itera todos los pedimentos de una organización auditando el campo `variable` en la relación `related_name`. Retorna un resumen estructurado por pedimento. + Publica eventos SSE en Redis si se proporciona task_id. """ pedimentos = obtener_pedimentos(organizacion_id) total_pedimentos = pedimentos.count() + if task_id: + publish_task_event(task_id, "processing", f"Auditando {label}: {total_pedimentos} pedimentos", progress=0) + completados = [] pendientes = [] errores = [] - for pedimento in pedimentos: + for idx, pedimento in enumerate(pedimentos): try: docs = list(getattr(pedimento, related_name).all()) total = len(docs) @@ -266,6 +325,11 @@ def _auditar_organizacion(organizacion_id, servicio, related_name, variable, lab modificar_estado_procesamiento(pedimento, servicio_id=servicio, nuevo_estado=nuevo_estado) + # Publicar progreso cada 10 pedimentos para no saturar Redis + if task_id and total_pedimentos > 0 and (idx + 1) % 10 == 0: + pct = int(((idx + 1) / total_pedimentos) * 100) + publish_task_event(task_id, "processing", f"Auditando {label}: {idx + 1}/{total_pedimentos}", progress=pct) + except Exception as e: errores.append({ 'pedimento_id': str(pedimento.id), @@ -274,7 +338,7 @@ def _auditar_organizacion(organizacion_id, servicio, related_name, variable, lab }) logger.error(f"Error auditando pedimento {pedimento.id} [{label}]: {e}") - return { + resultado = { 'organizacion_id': str(organizacion_id), 'auditoria': label, 'total_pedimentos': total_pedimentos, @@ -285,73 +349,89 @@ def _auditar_organizacion(organizacion_id, servicio, related_name, variable, lab 'detalle_errores': errores, } + if task_id: + publish_task_event(task_id, "completed", f"Auditoría de {label} completada", resultado=resultado, progress=100) + if user_id: + _crear_notificacion_auditoria(user_id, task_id, label, resultado) -@shared_task -def auditar_coves(organizacion_id): + return resultado + + +@shared_task(bind=True) +def auditar_coves(self, organizacion_id, user_id=None): return _auditar_organizacion( organizacion_id, servicio=8, related_name='coves', variable='cove_descargado', label='cove', + task_id=self.request.id, + user_id=user_id, ) -@shared_task -def auditar_acuse_cove(organizacion_id): +@shared_task(bind=True) +def auditar_acuse_cove(self, organizacion_id, user_id=None): return _auditar_organizacion( organizacion_id, servicio=9, related_name='coves', variable='acuse_cove_descargado', label='acuse_cove', + task_id=self.request.id, + user_id=user_id, ) -@shared_task -def auditar_edocuments(organizacion_id): +@shared_task(bind=True) +def auditar_edocuments(self, organizacion_id, user_id=None): return _auditar_organizacion( organizacion_id, servicio=7, related_name='documentos', variable='edocument_descargado', label='edocument', + task_id=self.request.id, + user_id=user_id, ) -@shared_task -def auditar_acuse(organizacion_id): +@shared_task(bind=True) +def auditar_acuse(self, organizacion_id, user_id=None): return _auditar_organizacion( organizacion_id, servicio=6, related_name='documentos', variable='acuse_descargado', label='acuse', + task_id=self.request.id, + user_id=user_id, ) -@shared_task -def auditar_remesas(organizacion_id): +@shared_task(bind=True) +def auditar_remesas(self, organizacion_id, user_id=None): """ Audita el estado de descarga de remesas para todos los pedimentos de una organización. A diferencia de coves/edocuments, las remesas no tienen campo booleano propio — se verifica la existencia de un documento de tipo 3 (Remesa) en el pedimento. """ + task_id = self.request.id pedimentos = obtener_pedimentos(organizacion_id) total_pedimentos = pedimentos.count() + if task_id: + publish_task_event(task_id, "processing", f"Auditando remesas: {total_pedimentos} pedimentos", progress=0) + completados = [] pendientes = [] errores = [] - for pedimento in pedimentos: + for idx, pedimento in enumerate(pedimentos): try: if not pedimento.remesas: - # El pedimento no declara remesas — no aplica, marcar como completado modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=3) completados.append(str(pedimento.id)) elif pedimento.documents.filter(document_type=3).exists(): - # Documento de remesa ya descargado modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=3) completados.append(str(pedimento.id)) else: - # Tiene remesas declaradas pero el documento aún no existe modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=4) pendientes.append({ 'pedimento_id': str(pedimento.id), @@ -365,7 +445,11 @@ def auditar_remesas(organizacion_id): }) logger.error(f"Error auditando remesa de pedimento {pedimento.id}: {e}") - return { + if task_id and total_pedimentos > 0 and (idx + 1) % 10 == 0: + pct = int(((idx + 1) / total_pedimentos) * 100) + publish_task_event(task_id, "processing", f"Auditando remesas: {idx + 1}/{total_pedimentos}", progress=pct) + + resultado = { 'organizacion_id': str(organizacion_id), 'auditoria': 'remesa', 'total_pedimentos': total_pedimentos, @@ -376,6 +460,13 @@ def auditar_remesas(organizacion_id): 'detalle_errores': errores, } + if task_id: + publish_task_event(task_id, "completed", "Auditoría de remesas completada", resultado=resultado, progress=100) + if user_id: + _crear_notificacion_auditoria(user_id, task_id, "Remesas", resultado) + + return resultado + @shared_task def auditar_cove_por_pedimento(pedimento_id): try: diff --git a/api/customs/tasks/auto_corregir.py b/api/customs/tasks/auto_corregir.py new file mode 100644 index 0000000..0deaff9 --- /dev/null +++ b/api/customs/tasks/auto_corregir.py @@ -0,0 +1,477 @@ +""" +Tarea Celery: auto-corrección de pedimentos incompletos a partir de sus XMLs. + +Busca pedimentos con consultar_vucem=False, analiza su documento XML más reciente +en busca de una respuesta consultarPedimentoCompleto de VUCEM, y si el número de +pedimento coincide, auto-corrige los campos faltantes en BD y reclasifica el documento. + +Campos corregidos (solo si están vacíos/nulos en BD): + numero_operacion, aduana, clave_pedimento, regimen, contribuyente (por RFC). + +Acciones sobre el documento si el tipo no es 2 (Pedimento Completo): + - Renombra el archivo en MinIO: vu_PC_{pedimento_app}.xml + - Actualiza document_type_id → 2 + - Actualiza vu → False (tipo 2 no es VUCEM directo) + +Al finalizar activa consultar_vucem=True en el pedimento. +""" +import io +import logging +import posixpath +import xml.etree.ElementTree as ET + +from celery import shared_task +from django.db import transaction + +from api.customs.models import Importador, Pedimento, Regimen +from api.record.models import Document +from api.utils.minio_client import minio_client +from core.redis_events import publish_task_event + +logger = logging.getLogger('api.customs.tasks.auto_corregir') + +_DOC_TYPE_PC = 2 # Pedimento Completo (ya procesado — no volver a procesar) +_PROGRESS_INTERVAL = 10 # Emitir progreso cada N pedimentos + +# Tipos excluidos de la búsqueda: +# 1 = Pedimento Partida (no contiene respuesta PC) +# 2 = Pedimento Completo (ya procesado) +# 13–26 = Tipos VUCEM: requests, errors de VU (peticiones salientes, no respuestas de contenido) +_EXCLUDE_DOC_TYPES = frozenset(range(13, 27)) | {1, _DOC_TYPE_PC} + + +# ────────────────────────────────────────────── +# Helpers XML (namespace-agnostic) +# ────────────────────────────────────────────── + +def _local(tag): + return tag.split('}')[-1] if '}' in tag else tag + + +def _find_text(root, local_name): + """Primer elemento con ese nombre local; retorna su texto o None.""" + for el in root.iter(): + if _local(el.tag) == local_name: + text = (el.text or '').strip() + return text or None + return None + + +def _find_child_text(root, parent_name, child_name): + """Texto del hijo directo child_name dentro del primer parent_name encontrado.""" + for el in root.iter(): + if _local(el.tag) == parent_name: + for child in el: + if _local(child.tag) == child_name: + text = (child.text or '').strip() + return text or None + return None + + +def _find_pedimento_number(root): + """ + Extrae el número de pedimento de la estructura anidada: + ← contenedor + XXXX ← número + """ + for el in root.iter(): + if _local(el.tag) == 'pedimento': + for child in el: + if _local(child.tag) == 'pedimento': + text = (child.text or '').strip() + return text or None + return None + + +# ────────────────────────────────────────────── +# Helpers MinIO +# ────────────────────────────────────────────── + +def _read_from_minio(object_name): + if not minio_client.file_exists(object_name): + return None + response = minio_client._client.get_object(minio_client._bucket_name, object_name) + try: + return response.read() + finally: + response.close() + response.release_conn() + + +def _rename_in_minio(old_name, new_name, content): + if old_name == new_name: + return old_name + # Si ya existe en destino (ejecución previa parcial): limpiar origen + if minio_client.file_exists(new_name): + if minio_client.file_exists(old_name): + minio_client.delete_file(old_name) + return new_name + minio_client.upload_file(new_name, file_data=io.BytesIO(content), content_type='application/xml') + minio_client.delete_file(old_name) + return new_name + + +def _resolve_regimen(clave_pedimento, tipo_operacion_raw): + """ + Convierte clave_documento + tipo_operacion del XML al código de régimen, + replicando la lógica de carga de datastage: + Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).regimenped + """ + if not clave_pedimento or not tipo_operacion_raw: + return None + try: + tipo_int = int(tipo_operacion_raw) + except (ValueError, TypeError): + return None + regimen_obj = Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).first() + return regimen_obj.regimenped if regimen_obj else None + + +def _find_pc_document(pedimento): + """ + Busca entre los XMLs del pedimento el primero que contenga una respuesta + consultarPedimentoCompleto de VUCEM. + + Tipos incluidos: 3–12 (documentos de contenido: pedimento, remesas, acuse, + edocument, estado, cove, digitalizacion, error, general). + Tipos excluidos: 1 (partida), 2 (ya procesado), 13–26 (peticiones/errores VU). + + Retorna (doc, content_bytes, object_name, hay_candidatos): + - hay_candidatos=False → ningún XML candidato en BD + - hay_candidatos=True, doc=None → hay XMLs pero ninguno es respuesta PC + - doc!=None → encontrado + """ + qs = ( + Document.objects.filter( + pedimento=pedimento, + archivo__iendswith='.xml', + ) + .exclude(document_type_id__in=_EXCLUDE_DOC_TYPES) + .order_by('-created_at') + ) + + hay_candidatos = False + for doc in qs: + if not doc.archivo: + continue + hay_candidatos = True + object_name = doc.archivo.name + try: + content = _read_from_minio(object_name) + except Exception as exc: + logger.debug(f"[find_pc] {pedimento.pedimento_app} — error MinIO {object_name}: {exc}") + continue + if not content: + continue + if b'consultarPedimentoCompletoRespuesta' in content: + return doc, content, object_name, True + + return None, None, None, hay_candidatos + + +# ────────────────────────────────────────────── +# Tarea principal +# ────────────────────────────────────────────── + +@shared_task(bind=True, name='auto_corregir_pedamentos') +def auto_corregir_pedamentos_task(self, organizacion_id, pedimento_id=None): + """ + Itera pedimentos con consultar_vucem=False de la organización. + Si se proporciona pedimento_id, procesa solo ese pedimento. + Por cada uno verifica si tiene un XML de pedimento completo válido + y corrige BD + storage. + """ + task_id = self.request.id + revisados = 0 + corregidos = 0 + ignorados = 0 + detalles = [] + + qs = Pedimento.objects.filter(consultar_vucem=False).order_by('pedimento_app') + if pedimento_id: + qs = qs.filter(id=pedimento_id) + else: + qs = qs.filter(organizacion_id=organizacion_id) + + total = qs.count() + logger.info(f"[auto_corregir] org={organizacion_id} — {total} pedimentos a revisar") + + publish_task_event(task_id, 'processing', f'Iniciando: {total} pedimentos a revisar', progress=0) + + for idx, pedimento in enumerate(qs.iterator(chunk_size=100)): + revisados += 1 + + if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1): + pct = int(((idx + 1) / total) * 95) + publish_task_event( + task_id, 'processing', + f'Revisando {idx + 1}/{total}: {pedimento.pedimento_app}', + progress=pct, + ) + + # Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero) + try: + candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento) + except Exception as exc: + logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — error buscando PC: {exc}") + ignorados += 1 + continue + + if not candidato: + ignorados += 1 + continue + + try: + root = ET.fromstring(content) + except ET.ParseError as exc: + logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — XML inválido: {exc}") + ignorados += 1 + continue + + tiene_error = _find_text(root, 'tieneError') + if tiene_error and tiene_error.lower() == 'true': + ignorados += 1 + continue + + pedimento_xml = _find_pedimento_number(root) + pedimento_bd = (pedimento.pedimento or '').strip() + if not pedimento_xml or pedimento_xml != pedimento_bd: + logger.info( + f"[auto_corregir] {pedimento.pedimento_app} — número no coincide " + f"(XML={pedimento_xml!r}, BD={pedimento_bd!r})" + ) + ignorados += 1 + continue + + # ── Extracción de campos ────────────────── + numero_operacion = _find_text(root, 'numeroOperacion') + aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave') + clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave') + tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave') + regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw) + rfc = _find_child_text(root, 'importadorExportador', 'rfc') + + ped_fields = [] + if numero_operacion and not pedimento.numero_operacion: + pedimento.numero_operacion = numero_operacion + ped_fields.append('numero_operacion') + if aduana and aduana != (pedimento.aduana or '').strip(): + pedimento.aduana = aduana + ped_fields.append('aduana') + if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip(): + pedimento.clave_pedimento = clave_pedimento + ped_fields.append('clave_pedimento') + if regimen and not pedimento.regimen: + pedimento.regimen = regimen + ped_fields.append('regimen') + + if rfc: + try: + importador = Importador.objects.get(rfc=rfc) + if pedimento.contribuyente_id != importador.rfc: + pedimento.contribuyente_id = importador.rfc + ped_fields.append('contribuyente') + except Importador.DoesNotExist: + pass + + pedimento.consultar_vucem = True + ped_fields.append('consultar_vucem') + + # ── Renombrado de documento si no es tipo 2 ── + doc_fields = ['document_type_id', 'vu'] + final_object_name = object_name + + if candidato.document_type_id != _DOC_TYPE_PC: + dir_part = posixpath.dirname(object_name) + new_filename = f"vu_PC_{pedimento.pedimento_app}.xml" + new_object_name = posixpath.join(dir_part, new_filename) + try: + final_object_name = _rename_in_minio(object_name, new_object_name, content) + doc_fields.append('archivo') + except Exception as exc: + logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error renombrando en MinIO: {exc}") + + # ── Persistir cambios en BD ─────────────── + try: + with transaction.atomic(): + pedimento.save(update_fields=ped_fields) + candidato.document_type_id = _DOC_TYPE_PC + candidato.vu = False + if 'archivo' in doc_fields: + candidato.archivo = final_object_name + candidato.save(update_fields=doc_fields) + except Exception as exc: + logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error guardando en BD: {exc}") + ignorados += 1 + continue + + corregidos += 1 + detalles.append({ + 'pedimento': pedimento.pedimento_app, + 'accion': 'corregido', + 'campos_pedimento': ped_fields, + 'documento_final': final_object_name, + }) + logger.info(f"[auto_corregir] {pedimento.pedimento_app} — corregido: {ped_fields}") + + # Modo individual: encolar el procesamiento completo (remesas, partidas, + # coves, edocs) forzando aunque ya exista el documento tipo 2. + if pedimento_id: + try: + from .microservice_v2 import procesar_pedimento_completo_individual + procesar_pedimento_completo_individual.delay(str(pedimento.id), force=True) + logger.info(f"[auto_corregir] {pedimento.pedimento_app} — PC completo encolado (force)") + except Exception as exc: + logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — no se pudo encolar PC: {exc}") + + resultado = { + 'total_revisados': revisados, + 'corregidos': corregidos, + 'ignorados': ignorados, + 'detalles': detalles, + } + logger.info(f"[auto_corregir] org={organizacion_id} finalizado — {resultado}") + + publish_task_event(task_id, 'completed', 'Auto-corrección finalizada', resultado=resultado, progress=100) + return resultado + + +# ────────────────────────────────────────────── +# Tarea de análisis (sin modificar nada) +# ────────────────────────────────────────────── + +def _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc): + """Retorna la lista de campos que se corregirían y los valores que se asignarían.""" + campos = [] + if numero_operacion and not pedimento.numero_operacion: + campos.append({'campo': 'numero_operacion', 'valor_actual': None, 'valor_nuevo': numero_operacion}) + if aduana and aduana != (pedimento.aduana or '').strip(): + campos.append({'campo': 'aduana', 'valor_actual': pedimento.aduana, 'valor_nuevo': aduana}) + if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip(): + campos.append({'campo': 'clave_pedimento', 'valor_actual': pedimento.clave_pedimento, 'valor_nuevo': clave_pedimento}) + if regimen and not pedimento.regimen: + campos.append({'campo': 'regimen', 'valor_actual': None, 'valor_nuevo': regimen}) + if rfc: + try: + importador = Importador.objects.get(rfc=rfc) + if pedimento.contribuyente_id != importador.rfc: + campos.append({ + 'campo': 'contribuyente', + 'valor_actual': pedimento.contribuyente_id, + 'valor_nuevo': rfc, + }) + except Importador.DoesNotExist: + pass + return campos + + +@shared_task(bind=True, name='auditar_pedamentos_incompletos') +def auditar_pedamentos_incompletos_task(self, organizacion_id, pedimento_id=None): + """ + Análisis de solo lectura: reporta qué pedimentos serían corregidos y qué + cambios se aplicarían, sin modificar BD ni storage. + Si se proporciona pedimento_id, analiza solo ese pedimento. + """ + task_id = self.request.id + revisados = 0 + corregibles = [] + sin_xml = 0 + xml_sin_pc = 0 + num_no_coincide = 0 + con_error_vucem = 0 + + # Individual: analiza el pedimento específico sin importar su estado de corrección. + # Masivo: solo los pendientes (consultar_vucem=False). + if pedimento_id: + qs = Pedimento.objects.filter(id=pedimento_id).order_by('pedimento_app') + else: + qs = Pedimento.objects.filter( + organizacion_id=organizacion_id, consultar_vucem=False + ).order_by('pedimento_app') + + total = qs.count() + logger.info(f"[auditar_incompletos] org={organizacion_id} — {total} pedimentos a analizar") + + publish_task_event(task_id, 'processing', f'Iniciando análisis: {total} pedimentos', progress=0) + + for idx, pedimento in enumerate(qs.iterator(chunk_size=100)): + revisados += 1 + + if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1): + pct = int(((idx + 1) / total) * 95) + publish_task_event( + task_id, 'processing', + f'Analizando {idx + 1}/{total}: {pedimento.pedimento_app}', + progress=pct, + ) + + # Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero) + try: + candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento) + except Exception as exc: + logger.warning(f"[auditar_incompletos] {pedimento.pedimento_app} — error buscando PC: {exc}") + sin_xml += 1 + continue + + if not candidato: + if hay_candidatos: + xml_sin_pc += 1 + else: + sin_xml += 1 + continue + + try: + root = ET.fromstring(content) + except ET.ParseError: + xml_sin_pc += 1 + continue + + tiene_error = _find_text(root, 'tieneError') + if tiene_error and tiene_error.lower() == 'true': + con_error_vucem += 1 + continue + + pedimento_xml = _find_pedimento_number(root) + pedimento_bd = (pedimento.pedimento or '').strip() + if not pedimento_xml or pedimento_xml != pedimento_bd: + num_no_coincide += 1 + continue + + numero_operacion = _find_text(root, 'numeroOperacion') + aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave') + clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave') + tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave') + regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw) + rfc = _find_child_text(root, 'importadorExportador', 'rfc') + + campos = _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc) + + dir_part = posixpath.dirname(object_name) + nombre_pc = posixpath.join(dir_part, f"vu_PC_{pedimento.pedimento_app}.xml") + + corregibles.append({ + 'pedimento_app': pedimento.pedimento_app, + 'pedimento_id': str(pedimento.id), + 'documento_actual': { + 'id': str(candidato.id), + 'archivo': object_name, + 'document_type_id': candidato.document_type_id, + }, + 'documento_nuevo_nombre': nombre_pc if candidato.document_type_id != _DOC_TYPE_PC else None, + 'campos_a_corregir': campos, + 'consultar_vucem': True, + }) + + resultado = { + 'total_revisados': revisados, + 'corregibles': len(corregibles), + 'sin_xml_o_ilegible': sin_xml, + 'xml_no_es_pedimento_completo': xml_sin_pc, + 'numero_pedimento_no_coincide': num_no_coincide, + 'con_error_vucem': con_error_vucem, + 'pedimentos': corregibles, + } + logger.info(f"[auditar_incompletos] org={organizacion_id} finalizado — {resultado}") + + publish_task_event(task_id, 'completed', 'Análisis finalizado', resultado=resultado, progress=100) + return resultado diff --git a/api/customs/tasks/internal_services.py b/api/customs/tasks/internal_services.py index cf20ac4..25f3cd9 100644 --- a/api/customs/tasks/internal_services.py +++ b/api/customs/tasks/internal_services.py @@ -1,6 +1,9 @@ +import logging from celery import shared_task, group from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocument from core.utils import xml_controller +from core.redis_events import publish_task_event +from api.customs.tasks.auditoria import _crear_notificacion_auditoria from api.customs.tasks.microservice import ( procesar_cove_individual, procesar_acuse_individual, @@ -180,51 +183,88 @@ def crear_servicios(organizacion_id): crear_procesamiento_acuse_cove.apply_async(args=[str(pedimento.id)]) crear_procesamiento_edocument.apply_async(args=[str(pedimento.id)]) -@shared_task -def auditar_pedimentos(organizacion_id): +@shared_task(bind=True) +def auditar_pedimentos(self, organizacion_id, user_id=None): + _logger = logging.getLogger('api.customs.async_operations') + task_id = self.request.id pedimentos = Pedimento.objects.filter(organizacion_id=organizacion_id) - for pedimento in pedimentos: + total_pedimentos = pedimentos.count() + + publish_task_event(task_id, "processing", f"Auditando pedimentos: {total_pedimentos} pedimentos", progress=0) + + procesados = 0 + sin_xml = 0 + errores = [] + + for idx, pedimento in enumerate(pedimentos): pc = pedimento.documents.filter(document_type__id=2).first() if pc: - with open(f'./media/{pc.archivo}', 'r') as f: - xml_content = f.read() - - xml_data = xml_controller.extract_data(xml_content) - - pedimento.numero_operacion = xml_data.get('numero_operacion') - pedimento.curp_apoderado = xml_data.get('curp_apoderado') - pedimento.agente_aduanal = xml_data.get('agente_aduanal') - pedimento.numero_partidas = xml_data.get('numero_partidas') - pedimento.remesas = xml_data.get('remesas') - pedimento.tipo_operacion__id = xml_data.get('tipo_operacion') - pedimento.fecha_pago = xml_data.get('fecha_pago') - pedimento.pedimento_app = xml_data.get('fecha_pago')[2:4] + "-" + pedimento.aduana[:2] + "-" + pedimento.patente + "-" + pedimento.pedimentodd - - for edoc in xml_data.get('edocuments', []): - EDocument.objects.get_or_create( - pedimento=pedimento, - organizacion=pedimento.organizacion, - clave=edoc.get('clave'), - descripcion=edoc.get('descripcion'), - numero_edocument=edoc.get('complemento1') - ) - - from django.db import IntegrityError try: - for cove in xml_data.get('coves', []): - try: - Cove.objects.get_or_create( - pedimento=pedimento, - organizacion=pedimento.organizacion, - numero_cove=cove - ) - except IntegrityError: - # Si ya existe por unique, recupera el objeto existente - Cove.objects.get(numero_cove=cove) - except: - # Si ya existe por unique, recupera el objeto existente - pass + with open(f'./media/{pc.archivo}', 'r') as f: + xml_content = f.read() + + xml_data = xml_controller.extract_data(xml_content) + + pedimento.numero_operacion = xml_data.get('numero_operacion') + pedimento.curp_apoderado = xml_data.get('curp_apoderado') + pedimento.agente_aduanal = xml_data.get('agente_aduanal') + pedimento.numero_partidas = xml_data.get('numero_partidas') + pedimento.remesas = xml_data.get('remesas') + pedimento.tipo_operacion__id = xml_data.get('tipo_operacion') + pedimento.fecha_pago = xml_data.get('fecha_pago') + pedimento.pedimento_app = xml_data.get('fecha_pago')[2:4] + "-" + pedimento.aduana[:2] + "-" + pedimento.patente + "-" + pedimento.pedimentodd + + for edoc in xml_data.get('edocuments', []): + EDocument.objects.get_or_create( + pedimento=pedimento, + organizacion=pedimento.organizacion, + clave=edoc.get('clave'), + descripcion=edoc.get('descripcion'), + numero_edocument=edoc.get('complemento1') + ) + + from django.db import IntegrityError + try: + for cove in xml_data.get('coves', []): + try: + Cove.objects.get_or_create( + pedimento=pedimento, + organizacion=pedimento.organizacion, + numero_cove=cove + ) + except IntegrityError: + # Si ya existe por unique, recupera el objeto existente + Cove.objects.get(numero_cove=cove) + except: + pass + + procesados += 1 + except Exception as e: + errores.append({'pedimento_id': str(pedimento.id), 'error': str(e)}) + _logger.error(f"Error auditando pedimento {pedimento.id}: {e}") + else: + sin_xml += 1 + + if total_pedimentos > 0 and (idx + 1) % 10 == 0: + pct = int(((idx + 1) / total_pedimentos) * 100) + publish_task_event(task_id, "processing", f"Auditando pedimentos: {idx + 1}/{total_pedimentos}", progress=pct) + + resultado = { + 'organizacion_id': str(organizacion_id), + 'auditoria': 'pedimentos', + 'total_pedimentos': total_pedimentos, + 'procesados': procesados, + 'sin_xml': sin_xml, + 'con_errores': len(errores), + 'detalle_errores': errores, + } + + publish_task_event(task_id, "completed", "Auditoría de pedimentos completada", resultado=resultado, progress=100) + if user_id: + _crear_notificacion_auditoria(user_id, task_id, "Pedimentos", resultado) + + return resultado @shared_task def crear_todos_los_servicios(): diff --git a/api/customs/tasks/microservice_v2.py b/api/customs/tasks/microservice_v2.py index 1ff99a8..8e1daa4 100644 --- a/api/customs/tasks/microservice_v2.py +++ b/api/customs/tasks/microservice_v2.py @@ -256,9 +256,9 @@ def procesar_remesas_pedimento(pedimento_id): raise @shared_task -def procesar_pedimento_completo_individual(pedimento_id): +def procesar_pedimento_completo_individual(pedimento_id, force=False): pedimento = Pedimento.objects.get(id=pedimento_id) - if not pedimento.documents.filter(document_type=2).exists(): # Tipo 2: Pedimento Completo + if force or not pedimento.documents.filter(document_type=2).exists(): # Tipo 2: Pedimento Completo pedimento_dict = pedimento_to_dict(pedimento) credenciales = Vucem.objects.filter( id=CredencialesImportador.objects.filter(rfc=pedimento.contribuyente).first().vucem.id diff --git a/api/customs/urls.py b/api/customs/urls.py index 1859957..de45209 100644 --- a/api/customs/urls.py +++ b/api/customs/urls.py @@ -63,6 +63,10 @@ from .views_auditor import ( auditor_obtener_respuesta_edocument_vu, auditar_pedimento_endpoint, procesar_pedimento_completo_endpoint, + auto_corregir_pedamentos_endpoint, + auditar_pedamentos_incompletos_endpoint, + auditar_pedamento_incompleto_endpoint, + auto_corregir_pedamento_endpoint, ) urlpatterns = [ @@ -82,6 +86,10 @@ urlpatterns = [ path('auditor/auditar-remesa/pedimento/', auditar_procesamiento_remesa_pedimento_endpoint, name='auditar-remesa-pedimento'), path('auditor/auditar-pedimento/', auditar_pedimento_endpoint, name='auditar-pedimento'), path('auditor/procesar-pedimento-completo/pedimento/', procesar_pedimento_completo_endpoint, name='procesar-pedimento-completo-pedimento'), + path('auditor/auto-corregir-pedamentos/', auto_corregir_pedamentos_endpoint, name='auto-corregir-pedamentos'), + path('auditor/auditar-pedamentos-incompletos/', auditar_pedamentos_incompletos_endpoint, name='auditar-pedamentos-incompletos'), + path('auditor/auto-corregir-pedamento/', auto_corregir_pedamento_endpoint, name='auto-corregir-pedamento'), + path('auditor/auditar-pedamento-incompleto/', auditar_pedamento_incompleto_endpoint, name='auditar-pedamento-incompleto'), path('auditor/procesar-pedimentos/organizaciones/', auditor_procesar_pedimentos_organizacion, name='procesar-pedimentos-organizaciones'), path('auditor/peticion-respuesta/pedimento-vu/', auditar_peticion_respuesta_pedimento_completo, name='peticion-respuesta-pedimento-vu'), diff --git a/api/customs/views_auditor.py b/api/customs/views_auditor.py index 21be114..5d16030 100644 --- a/api/customs/views_auditor.py +++ b/api/customs/views_auditor.py @@ -16,6 +16,7 @@ from .tasks.auditoria import ( ) from .tasks.internal_services import auditar_pedimentos from .tasks.microservice_v2 import procesar_pedimentos_completos, procesar_pedimento_completo_individual +from .tasks.auto_corregir import auto_corregir_pedamentos_task, auditar_pedamentos_incompletos_task from api.customs.models import Pedimento from api.organization.models import Organizacion from api.record.models import Document @@ -98,7 +99,7 @@ def crear_partidas_organizacion(request): if not user.is_superuser and str(user.organizacion.id) != organizacion_id: return Response({'error': 'No tiene permisos para esta organización'}, status=status.HTTP_403_FORBIDDEN) - task = crear_partidas.delay(organizacion_id) + task = crear_partidas.delay(organizacion_id, user_id=str(user.id)) return Response({ 'organizacion_id': organizacion_id, @@ -228,7 +229,7 @@ def auditar_pedimentos_endpoint(request): ) # Ejecutar la tarea de auditoría - task = auditar_pedimentos.delay(organizacion_id) + task = auditar_pedimentos.delay(organizacion_id, user_id=str(user.id)) message = f"Auditoría iniciada para la organización {organizacion_id}" return Response({ @@ -309,7 +310,7 @@ def auditar_procesamiento_remesa_pedimento_endpoint(request): def _lanzar_auditoria_organizacion(request, task_fn, label): - """Helper compartido para los 4 endpoints de auditoría masiva por organización.""" + """Helper compartido para los endpoints de auditoría masiva por organización.""" organizacion_id = request.data.get('organizacion_id') if not organizacion_id: return Response({'error': 'Debe proporcionar organizacion_id'}, status=status.HTTP_400_BAD_REQUEST) @@ -318,12 +319,12 @@ def _lanzar_auditoria_organizacion(request, task_fn, label): if not user.is_superuser and str(user.organizacion.id) != organizacion_id: return Response({'error': 'No tiene permisos para esta organización'}, status=status.HTTP_403_FORBIDDEN) - task = task_fn.delay(organizacion_id) + task = task_fn.delay(organizacion_id, user_id=str(user.id)) return Response({ 'organizacion_id': organizacion_id, 'auditoria': label, 'task_id': task.id, - 'mensaje': f'Auditoría de {label} iniciada. Consulta el resultado en GET /api/tasks/status/{task.id}/', + 'mensaje': f'Auditoría de {label} iniciada. Usa el stream SSE para seguimiento en tiempo real.', }, status=status.HTTP_202_ACCEPTED) @@ -1973,12 +1974,14 @@ def procesar_pedimento_completo_endpoint(request): 'pc_descargado': pc_descargado, } - # Ya descargado — devolver diagnóstico sin encolar - if pc_descargado: + force = bool(request.data.get('force', False)) + + # Ya descargado — solo bloquear si no es forzado + if pc_descargado and not force: return Response({ **base_response, 'estado': 'ya_descargado', - 'mensaje': 'El pedimento completo ya fue descargado', + 'mensaje': 'El pedimento completo ya fue descargado. Usa force=true para reprocesar remesas, partidas y documentos derivados.', }, status=status.HTTP_200_OK) # No puede procesar — devolver diagnóstico con razones @@ -1990,7 +1993,7 @@ def procesar_pedimento_completo_endpoint(request): }, status=status.HTTP_200_OK) # Todo en orden — encolar - task = procesar_pedimento_completo_individual.delay(str(pedimento_id)) + task = procesar_pedimento_completo_individual.delay(str(pedimento_id), force=force) logger.info(f"Procesamiento PC encolado: {pedimento.pedimento} (task={task.id})") return Response({ @@ -2091,8 +2094,226 @@ def actualizar_info_pedimento(pedimento, info_xml): if actualizado: pedimento.save() return True - + return False - + except Exception: - return False \ No newline at end of file + return False + + +# ────────────────────────────────────────────────────────────────────────────── +# Auto-corrección de pedimentos incompletos +# ────────────────────────────────────────────────────────────────────────────── + +@swagger_auto_schema( + method='post', + operation_description=( + "Encola una tarea Celery que analiza los XMLs de pedimentos con " + "consultar_vucem=False, extrae datos del pedimento completo VUCEM y " + "auto-corrige los campos faltantes (numero_operacion, aduana, " + "clave_pedimento, regimen, contribuyente). El documento se reclasifica " + "a tipo 2 (Pedimento Completo) y se activa consultar_vucem=True." + ), + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + required=['organizacion_id'], + properties={ + 'organizacion_id': openapi.Schema( + type=openapi.TYPE_STRING, + description='UUID de la organización a procesar', + ), + }, + ), + responses={ + 202: openapi.Response('Tarea encolada correctamente'), + 400: openapi.Response('organizacion_id faltante'), + 404: openapi.Response('Organización no encontrada'), + }, +) +@api_view(['POST']) +@permission_classes([IsAuthenticated, require_permission('auditoria.view')]) +def auto_corregir_pedamentos_endpoint(request): + organizacion_id = request.data.get('organizacion_id') + if not organizacion_id: + return Response( + {'error': 'organizacion_id es requerido'}, + status=status.HTTP_400_BAD_REQUEST, + ) + + try: + Organizacion.objects.get(id=organizacion_id) + except Organizacion.DoesNotExist: + return Response( + {'error': 'Organización no encontrada'}, + status=status.HTTP_404_NOT_FOUND, + ) + + task = auto_corregir_pedamentos_task.delay(str(organizacion_id)) + logger.info( + f"[auto_corregir] tarea encolada — org={organizacion_id} task={task.id}" + ) + + return Response( + { + 'task_id': task.id, + 'organizacion': str(organizacion_id), + 'mensaje': ( + 'Tarea encolada. Se analizarán los pedimentos con ' + 'consultar_vucem=False de la organización.' + ), + }, + status=status.HTTP_202_ACCEPTED, + ) + + +@swagger_auto_schema( + method='post', + operation_description=( + "Análisis de solo lectura: detecta pedimentos con consultar_vucem=False " + "que podrían corregirse automáticamente. No modifica BD ni storage. " + "Retorna el listado de pedimentos corregibles, los campos que cambiarían " + "y el nuevo nombre de documento que se asignaría." + ), + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + required=['organizacion_id'], + properties={ + 'organizacion_id': openapi.Schema( + type=openapi.TYPE_STRING, + description='UUID de la organización a analizar', + ), + }, + ), + responses={ + 202: openapi.Response('Tarea de análisis encolada correctamente'), + 400: openapi.Response('organizacion_id faltante'), + 404: openapi.Response('Organización no encontrada'), + }, +) +@api_view(['POST']) +@permission_classes([IsAuthenticated, require_permission('auditoria.view')]) +def auditar_pedamentos_incompletos_endpoint(request): + organizacion_id = request.data.get('organizacion_id') + if not organizacion_id: + return Response( + {'error': 'organizacion_id es requerido'}, + status=status.HTTP_400_BAD_REQUEST, + ) + + try: + Organizacion.objects.get(id=organizacion_id) + except Organizacion.DoesNotExist: + return Response( + {'error': 'Organización no encontrada'}, + status=status.HTTP_404_NOT_FOUND, + ) + + task = auditar_pedamentos_incompletos_task.delay(str(organizacion_id)) + logger.info( + f"[auditar_incompletos] tarea encolada — org={organizacion_id} task={task.id}" + ) + + return Response( + { + 'task_id': task.id, + 'organizacion': str(organizacion_id), + 'mensaje': ( + 'Tarea de análisis encolada. Se reportarán los pedimentos con ' + 'consultar_vucem=False que podrían corregirse automáticamente, ' + 'sin modificar nada.' + ), + }, + status=status.HTTP_202_ACCEPTED, + ) + + +@swagger_auto_schema( + method='post', + operation_description="Analiza un pedimento específico para auto-corrección (solo lectura, sin modificar BD). Acepta pedimento_id (UUID) o pedimento_app.", + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + 'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='UUID del pedimento'), + 'pedimento_app': openapi.Schema(type=openapi.TYPE_STRING, description='Número de pedimento (ej: 21-80-3452-1004463)'), + }, + ), + responses={ + 202: openapi.Response('Tarea de análisis encolada'), + 400: openapi.Response('Parámetro faltante'), + 404: openapi.Response('Pedimento no encontrado'), + }, +) +@api_view(['POST']) +@permission_classes([IsAuthenticated, require_permission('auditoria.view')]) +def auditar_pedamento_incompleto_endpoint(request): + pedimento_id = request.data.get('pedimento_id') + pedimento_app = request.data.get('pedimento_app') + if not pedimento_id and not pedimento_app: + return Response({'error': 'pedimento_id o pedimento_app es requerido'}, status=status.HTTP_400_BAD_REQUEST) + + try: + if pedimento_id: + pedimento = Pedimento.objects.get(id=pedimento_id) + else: + pedimento = Pedimento.objects.get(pedimento_app=pedimento_app) + except Pedimento.DoesNotExist: + return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND) + + task = auditar_pedamentos_incompletos_task.delay(str(pedimento.organizacion_id), str(pedimento.id)) + logger.info(f"[auditar_incompletos] individual — ped={pedimento.pedimento_app} task={task.id}") + + return Response( + { + 'task_id': task.id, + 'pedimento_id': str(pedimento.id), + 'pedimento': pedimento.pedimento_app, + 'mensaje': 'Análisis individual encolado. Sin modificar nada.', + }, + status=status.HTTP_202_ACCEPTED, + ) + + +@swagger_auto_schema( + method='post', + operation_description="Auto-corrige un pedimento específico: extrae campos del XML y actualiza BD + storage. Acepta pedimento_id (UUID) o pedimento_app.", + request_body=openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + 'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='UUID del pedimento'), + 'pedimento_app': openapi.Schema(type=openapi.TYPE_STRING, description='Número de pedimento (ej: 21-80-3452-1004463)'), + }, + ), + responses={ + 202: openapi.Response('Tarea de corrección encolada'), + 400: openapi.Response('Parámetro faltante'), + 404: openapi.Response('Pedimento no encontrado'), + }, +) +@api_view(['POST']) +@permission_classes([IsAuthenticated, require_permission('auditoria.view')]) +def auto_corregir_pedamento_endpoint(request): + pedimento_id = request.data.get('pedimento_id') + pedimento_app = request.data.get('pedimento_app') + if not pedimento_id and not pedimento_app: + return Response({'error': 'pedimento_id o pedimento_app es requerido'}, status=status.HTTP_400_BAD_REQUEST) + + try: + if pedimento_id: + pedimento = Pedimento.objects.get(id=pedimento_id) + else: + pedimento = Pedimento.objects.get(pedimento_app=pedimento_app) + except Pedimento.DoesNotExist: + return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND) + + task = auto_corregir_pedamentos_task.delay(str(pedimento.organizacion_id), str(pedimento.id)) + logger.info(f"[auto_corregir] individual — ped={pedimento.pedimento_app} task={task.id}") + + return Response( + { + 'task_id': task.id, + 'pedimento_id': str(pedimento.id), + 'pedimento': pedimento.pedimento_app, + 'mensaje': 'Corrección individual encolada.', + }, + status=status.HTTP_202_ACCEPTED, + ) \ No newline at end of file diff --git a/api/datastage/migrations/0012_alter_datastage_archivo.py b/api/datastage/migrations/0012_alter_datastage_archivo.py new file mode 100644 index 0000000..c83bd0e --- /dev/null +++ b/api/datastage/migrations/0012_alter_datastage_archivo.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-04-20 16:34 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('datastage', '0011_alter_registro502_fecha_pago_real_and_more'), + ] + + operations = [ + migrations.AlterField( + model_name='datastage', + name='archivo', + field=models.CharField(blank=True, max_length=500, null=True), + ), + ] diff --git a/api/datastage/migrations/0013_registro501_add_timestamps.py b/api/datastage/migrations/0013_registro501_add_timestamps.py new file mode 100644 index 0000000..0113d81 --- /dev/null +++ b/api/datastage/migrations/0013_registro501_add_timestamps.py @@ -0,0 +1,26 @@ +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('datastage', '0012_alter_datastage_archivo'), + ] + + operations = [ + # La columna created_at ya existe en la BD (NOT NULL, sin DEFAULT). + # Solo actualizamos el estado interno de Django para que auto_now_add + # inserte el valor al hacer bulk_create. + migrations.SeparateDatabaseAndState( + state_operations=[ + migrations.AddField( + model_name='registro501', + name='created_at', + field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), + preserve_default=False, + ), + ], + database_operations=[], + ), + ] diff --git a/api/datastage/migrations/0014_all_registros_add_created_at.py b/api/datastage/migrations/0014_all_registros_add_created_at.py new file mode 100644 index 0000000..f91a947 --- /dev/null +++ b/api/datastage/migrations/0014_all_registros_add_created_at.py @@ -0,0 +1,44 @@ +from django.db import migrations, models +import django.utils.timezone + + +class Migration(migrations.Migration): + """ + Las columnas created_at ya existen en la BD como NOT NULL sin DEFAULT. + Solo actualizamos el estado interno de Django para que auto_now_add + inserte el timestamp al hacer bulk_create. + """ + + dependencies = [ + ('datastage', '0013_registro501_add_timestamps'), + ] + + operations = [ + migrations.SeparateDatabaseAndState( + state_operations=[ + migrations.AddField(model_name='registro502', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro503', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro504', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro505', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro506', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro507', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro508', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro509', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro510', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro511', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro512', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro551', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro552', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro553', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro554', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro555', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro556', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro557', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro558', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registrosel', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro701', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + migrations.AddField(model_name='registro702', name='created_at', field=models.DateTimeField(auto_now_add=True, default=django.utils.timezone.now), preserve_default=False), + ], + database_operations=[], + ), + ] diff --git a/api/notificaciones/migrations/0002_notificacion_datos.py b/api/notificaciones/migrations/0002_notificacion_datos.py new file mode 100644 index 0000000..6cbfc28 --- /dev/null +++ b/api/notificaciones/migrations/0002_notificacion_datos.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-05-26 13:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('notificaciones', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='notificacion', + name='datos', + field=models.JSONField(blank=True, null=True), + ), + ] diff --git a/api/notificaciones/models.py b/api/notificaciones/models.py index 61d3933..395210a 100644 --- a/api/notificaciones/models.py +++ b/api/notificaciones/models.py @@ -21,6 +21,7 @@ class Notificacion(models.Model): mensaje = models.TextField(help_text="Mensaje de la notificación") + datos = models.JSONField(null=True, blank=True) fecha_envio = models.DateTimeField(blank=True, null=True, help_text="Fecha de envío de la notificación") created_at = models.DateTimeField(auto_now_add=True, help_text="Fecha de creación de la notificación") visto = models.BooleanField(default=False, help_text="Indica si la notificación ha sido vista") diff --git a/api/notificaciones/serializers.py b/api/notificaciones/serializers.py index e09291e..77df38d 100644 --- a/api/notificaciones/serializers.py +++ b/api/notificaciones/serializers.py @@ -16,10 +16,11 @@ class NotificacionSerializer(serializers.ModelSerializer): 'tipo', 'dirigido', 'mensaje', + 'datos', 'fecha_envio', 'created_at', 'visto' ] - read_only_fields = ['id', 'created_at', 'tipo', 'dirigido', 'fecha_envio', 'mensaje'] + read_only_fields = ['id', 'created_at', 'tipo', 'dirigido', 'fecha_envio', 'mensaje', 'datos'] \ No newline at end of file diff --git a/api/notificaciones/views.py b/api/notificaciones/views.py index 0a0c11c..3ae0412 100644 --- a/api/notificaciones/views.py +++ b/api/notificaciones/views.py @@ -1,6 +1,8 @@ -from rest_framework import viewsets +from rest_framework import viewsets, status +from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated from rest_framework.exceptions import PermissionDenied +from rest_framework.response import Response from .models import Notificacion, TipoNotificacion from .serializers import NotificacionSerializer, TipoNotificacionSerializer @@ -45,3 +47,11 @@ class NotificacionViewSet(viewsets.ModelViewSet): serializer.save() return raise PermissionDenied("No tienes permiso para crear notificaciones para otros usuarios") + + @action(detail=False, methods=['get'], url_path=r'by-task/(?P[^/.]+)') + def by_task(self, request, task_id=None): + """Recupera la notificación de una tarea de auditoría por su task_id (Celery).""" + notif = self.get_queryset().filter(datos__task_id=task_id).first() + if not notif: + return Response({'detail': 'No encontrada.'}, status=status.HTTP_404_NOT_FOUND) + return Response(self.get_serializer(notif).data) diff --git a/api/organization/migrations/0003_organizacion_apply_auto_download.py b/api/organization/migrations/0003_organizacion_apply_auto_download.py new file mode 100644 index 0000000..25fd3b3 --- /dev/null +++ b/api/organization/migrations/0003_organizacion_apply_auto_download.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-05-19 13:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('organization', '0002_remove_organizacion_membretado_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='organizacion', + name='apply_auto_download', + field=models.BooleanField(default=False), + ), + ] diff --git a/api/organization/migrations/0004_organizacion_owner.py b/api/organization/migrations/0004_organizacion_owner.py new file mode 100644 index 0000000..0b61e3b --- /dev/null +++ b/api/organization/migrations/0004_organizacion_owner.py @@ -0,0 +1,25 @@ +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('organization', '0003_organizacion_apply_auto_download'), + ('cuser', '0005_customuser_rfc_fk_to_m2m'), + ] + + operations = [ + migrations.AddField( + model_name='organizacion', + name='owner', + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.PROTECT, + related_name='organizaciones_que_administra', + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/api/rbac/migrations/0005_alter_rolepermission_id.py b/api/rbac/migrations/0005_alter_rolepermission_id.py new file mode 100644 index 0000000..257e50c --- /dev/null +++ b/api/rbac/migrations/0005_alter_rolepermission_id.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-05-26 13:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('rbac', '0004_auditoria_permissions'), + ] + + operations = [ + migrations.AlterField( + model_name='rolepermission', + name='id', + field=models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'), + ), + ] diff --git a/api/record/migrations/0003_document_vu.py b/api/record/migrations/0003_document_vu.py new file mode 100644 index 0000000..7830e96 --- /dev/null +++ b/api/record/migrations/0003_document_vu.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-03-06 19:35 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('record', '0002_fuente_document_fuente'), + ] + + operations = [ + migrations.AddField( + model_name='document', + name='vu', + field=models.BooleanField(default=False), + ), + ] diff --git a/api/record/views.py b/api/record/views.py index 9e8a0cc..ef16e2e 100644 --- a/api/record/views.py +++ b/api/record/views.py @@ -1321,11 +1321,13 @@ class DocumentViewSet(viewsets.ModelViewSet, DocumentosFiltradosMixin): nombre="Documento General", defaults={'descripcion': "Documento general sin tipo específico"} ) - + uploaded_documents = [] failed_files = [] errors = [] total_space_used = 0 + created_count = 0 + replaced_count = 0 try: with transaction.atomic(): @@ -1410,6 +1412,8 @@ class DocumentViewSet(viewsets.ModelViewSet, DocumentosFiltradosMixin): else: raise Exception(f"Error al guardar archivo: {file.name}") document = existing_doc + replaced_count += 1 + was_replaced = True else: # Crear nuevo documento document = Document.objects.create( @@ -1431,6 +1435,8 @@ class DocumentViewSet(viewsets.ModelViewSet, DocumentosFiltradosMixin): else: document.delete() raise Exception(f"Error al guardar archivo: {file.name}") + created_count += 1 + was_replaced = False # Actualizar espacio usado espacio_usado_temp += file.size @@ -1441,7 +1447,8 @@ class DocumentViewSet(viewsets.ModelViewSet, DocumentosFiltradosMixin): "filename": file.name, "size": file.size, "extension": extension, - "document_type": document.document_type.nombre if document.document_type else None + "document_type": document.document_type.nombre if document.document_type else None, + "replaced": was_replaced, }) except Exception as e: @@ -1463,27 +1470,36 @@ class DocumentViewSet(viewsets.ModelViewSet, DocumentosFiltradosMixin): space_used_mb = round(total_space_used / (1024 * 1024), 2) # Preparar respuesta + partes = [] + if created_count: + partes.append(f"{created_count} documento(s) creado(s) exitosamente") + if replaced_count: + partes.append(f"{replaced_count} documento(s) reemplazado(s) exitosamente") + mensaje_exito = " y ".join(partes) if partes else "Sin cambios" + response_data = { "uploaded_count": len(uploaded_documents), + "created_count": created_count, + "replaced_count": replaced_count, "uploaded_documents": uploaded_documents, "space_used_mb": space_used_mb, "pedimento_id": str(pedimento_id), - "document_type": document_type.nombre + "document_type": document_type.nombre, } - + if failed_files: response_data.update({ - "message": "Algunos documentos no pudieron ser subidos", + "message": f"Algunos documentos no pudieron ser subidos. {mensaje_exito}", "failed_files": failed_files, - "errors": errors + "errors": errors, }) response_status = status.HTTP_207_MULTI_STATUS else: - response_data["message"] = "Documentos subidos exitosamente" + response_data["message"] = mensaje_exito response_status = status.HTTP_201_CREATED - + return Response(response_data, status=response_status) - + @action(detail=False, methods=['post'], url_path='bulk-upload-vu', parser_classes=[MultiPartParser]) def bulk_upload_vu(self, request): """ diff --git a/api/reports/migrations/0002_reportdocument_report_type.py b/api/reports/migrations/0002_reportdocument_report_type.py new file mode 100644 index 0000000..4897e7b --- /dev/null +++ b/api/reports/migrations/0002_reportdocument_report_type.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2025-11-21 14:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('reports', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='reportdocument', + name='report_type', + field=models.CharField(choices=[('cumplimiento', 'cumplimiento'), ('control_pedimento', 'control_pedimento')], default='cumplimiento', max_length=30), + ), + ] diff --git a/api/reports/migrations/0003_alter_reportdocument_file.py b/api/reports/migrations/0003_alter_reportdocument_file.py new file mode 100644 index 0000000..f656a10 --- /dev/null +++ b/api/reports/migrations/0003_alter_reportdocument_file.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.3 on 2026-04-21 22:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('reports', '0002_reportdocument_report_type'), + ] + + operations = [ + migrations.AlterField( + model_name='reportdocument', + name='file', + field=models.CharField(blank=True, max_length=500, null=True), + ), + ] diff --git a/api/vucem/migrations/0012_alter_vucem_cer_alter_vucem_key.py b/api/vucem/migrations/0012_alter_vucem_cer_alter_vucem_key.py new file mode 100644 index 0000000..1427b43 --- /dev/null +++ b/api/vucem/migrations/0012_alter_vucem_cer_alter_vucem_key.py @@ -0,0 +1,23 @@ +# Generated by Django 5.2.3 on 2026-04-21 14:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('vucem', '0011_alter_credencialesimportador_rfc'), + ] + + operations = [ + migrations.AlterField( + model_name='vucem', + name='cer', + field=models.CharField(blank=True, help_text='Certificado de VUCEM', max_length=500, null=True), + ), + migrations.AlterField( + model_name='vucem', + name='key', + field=models.CharField(blank=True, help_text='Llave privada de VUCEM', max_length=500, null=True), + ), + ] diff --git a/core/redis_events.py b/core/redis_events.py new file mode 100644 index 0000000..48f03dd --- /dev/null +++ b/core/redis_events.py @@ -0,0 +1,42 @@ +import json +import os +import logging + +logger = logging.getLogger(__name__) + +CHANNEL_PREFIX = "audit_task:" +STATE_PREFIX = "audit_task_state:" +STATE_TTL = 7200 # 2 horas + + +def _get_client(): + import redis + return redis.Redis( + host=os.getenv("REDIS_HOST", "localhost"), + port=int(os.getenv("REDIS_PORT", 6379)), + db=int(os.getenv("REDIS_DB", 0)), + decode_responses=True, + socket_connect_timeout=2, + socket_timeout=2, + ) + + +def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None): + """ + Publica un evento de progreso de tarea en Redis Pub/Sub. + El microservicio SSE usa el mismo canal para streamear al frontend. + """ + payload: dict = {"task_id": task_id, "status": status, "message": message} + if resultado is not None: + payload["resultado"] = resultado + if progress is not None: + payload["progress"] = progress + + try: + client = _get_client() + serialized = json.dumps(payload) + client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized) + client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized) + client.close() + except Exception as exc: + logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}") -- 2.49.1