feat: comando dedup_documents para duplicados legados (T2025-09-004)
Limpia documentos duplicados (misma sub-entidad + mismo document_type) creados ANTES del fix de reemplazo del microservicio (jun-2026). Conserva el mas reciente con archivo valido en storage, borra el resto (archivo MinIO si no lo referencia otra fila + fila + ajuste de cuota). --dry-run, conteo previo, idempotente; solo docs ligados a entidad (partida/cove/edocument). La creacion ya reemplaza desde jun-2026: verificado 0 duplicados posteriores al fix. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
138
api/customs/management/commands/dedup_documents.py
Normal file
138
api/customs/management/commands/dedup_documents.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""
|
||||
Limpieza de documentos duplicados legados (T2025-09-004).
|
||||
|
||||
Un mismo documento (misma sub-entidad + mismo document_type) quedó con varias
|
||||
filas porque, antes del fix de reemplazo (microservicio, jun-2026), cada descarga
|
||||
re-creaba en vez de reemplazar. La creación ya está corregida; esto solo limpia lo
|
||||
viejo.
|
||||
|
||||
Estrategia: encuentra los grupos duplicados con UNA agregación global por FK
|
||||
(no itera pedimento por pedimento — hay ~110k pedimentos y casi ninguno tiene
|
||||
duplicados). Por cada grupo (FK partida/cove/edocument + document_type, con >1
|
||||
fila): conserva el MÁS RECIENTE cuyo archivo exista en storage (si ninguno existe,
|
||||
conserva el más reciente y NO borra el grupo entero) y elimina el resto —archivo
|
||||
en MinIO (si no lo referencia otra fila) + fila + ajuste de cuota vía Document.delete().
|
||||
|
||||
Solo toca documentos ligados a una entidad (partida/cove/edoc). NO toca documentos
|
||||
nativos del pedimento ni subidas sin FK.
|
||||
|
||||
Uso:
|
||||
python manage.py dedup_documents [--pedimento UUID] [--organizacion UUID]
|
||||
[--offset N] [--limit N] [--dry-run]
|
||||
"""
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import transaction
|
||||
from django.db.models import Count
|
||||
|
||||
from api.record.models import Document
|
||||
from api.utils.storage_service import storage_service
|
||||
|
||||
# Campos FK de sub-entidad sobre los que se detectan duplicados.
|
||||
_CAMPOS_FK = ('partida_id', 'cove_id', 'edocument_id')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Elimina documentos duplicados legados (misma entidad + mismo tipo), conservando el más reciente con archivo válido."
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('--pedimento', type=str, default=None)
|
||||
parser.add_argument('--organizacion', type=str, default=None)
|
||||
parser.add_argument('--offset', type=int, default=0, help='Saltar los primeros N grupos')
|
||||
parser.add_argument('--limit', type=int, default=None, help='Procesar máximo N grupos')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Reporta sin borrar')
|
||||
|
||||
def handle(self, *args, **opts):
|
||||
dry_run = opts['dry_run']
|
||||
base = Document.objects.all()
|
||||
if opts['pedimento']:
|
||||
base = base.filter(pedimento_id=opts['pedimento'])
|
||||
if opts['organizacion']:
|
||||
base = base.filter(pedimento__organizacion_id=opts['organizacion'])
|
||||
|
||||
offset = opts['offset'] or 0
|
||||
limit = opts['limit']
|
||||
if dry_run:
|
||||
self.stdout.write(self.style.WARNING("== DRY-RUN: no se borra nada =="))
|
||||
|
||||
stats = {'grupos': 0, 'eliminados': 0, 'bytes': 0, 'sin_archivo_valido': 0}
|
||||
visto = 0 # índice global de grupos (para offset/limit)
|
||||
for campo in _CAMPOS_FK:
|
||||
# Una sola agregación: todos los grupos duplicados de esta FK.
|
||||
grupos = (
|
||||
base.filter(**{campo + '__isnull': False})
|
||||
.values('pedimento_id', campo, 'document_type_id')
|
||||
.annotate(n=Count('id'))
|
||||
.filter(n__gt=1)
|
||||
.order_by() # limpia el ordering por defecto del modelo (rompe el GROUP BY)
|
||||
)
|
||||
for g in grupos.iterator():
|
||||
if visto < offset:
|
||||
visto += 1
|
||||
continue
|
||||
if limit is not None and stats['grupos'] >= limit:
|
||||
break
|
||||
visto += 1
|
||||
self._dedup_grupo(campo, g, dry_run, stats)
|
||||
if limit is not None and stats['grupos'] >= limit:
|
||||
break
|
||||
|
||||
mb = stats['bytes'] / (1024 * 1024)
|
||||
self.stdout.write("")
|
||||
self.stdout.write(self.style.SUCCESS(
|
||||
"Grupos con duplicados: %d | filas eliminadas: %d | espacio liberado: %.1f MB | grupos sin archivo válido (se conservó el más reciente): %d"
|
||||
% (stats['grupos'], stats['eliminados'], mb, stats['sin_archivo_valido'])
|
||||
))
|
||||
|
||||
def _dedup_grupo(self, campo, g, dry_run, stats):
|
||||
docs = list(
|
||||
Document.objects.filter(
|
||||
pedimento_id=g['pedimento_id'],
|
||||
document_type_id=g['document_type_id'],
|
||||
**{campo: g[campo]},
|
||||
).select_related('pedimento').order_by('-created_at')
|
||||
)
|
||||
if len(docs) < 2:
|
||||
return
|
||||
conservado = self._elegir_conservado(docs, dry_run, stats)
|
||||
a_borrar = [d for d in docs if d.id != conservado.id]
|
||||
if not a_borrar:
|
||||
return
|
||||
stats['grupos'] += 1
|
||||
# SELECT + COUNT previo (estándar de la organización): reportar antes de borrar.
|
||||
self.stdout.write(
|
||||
" ped=%s %s=%s type=%s: conservar %s, eliminar %d (%s)"
|
||||
% (str(g['pedimento_id'])[:8], campo, g[campo], g['document_type_id'],
|
||||
str(conservado.id)[:8], len(a_borrar), ', '.join(str(d.id)[:8] for d in a_borrar))
|
||||
)
|
||||
if not dry_run:
|
||||
for d in a_borrar:
|
||||
self._borrar(d, stats)
|
||||
|
||||
def _elegir_conservado(self, docs_desc, dry_run, stats):
|
||||
"""docs_desc viene ordenado por -created_at. En dry-run conserva el más
|
||||
reciente sin tocar storage; en ejecución real, el más reciente cuyo
|
||||
archivo exista en MinIO (fallback: el más reciente, para no borrar todo)."""
|
||||
if dry_run:
|
||||
return docs_desc[0]
|
||||
for d in docs_desc:
|
||||
try:
|
||||
if d.archivo and storage_service.file_exists(d.archivo.name):
|
||||
return d
|
||||
except Exception:
|
||||
continue
|
||||
stats['sin_archivo_valido'] += 1
|
||||
return docs_desc[0]
|
||||
|
||||
def _borrar(self, doc, stats):
|
||||
with transaction.atomic():
|
||||
# Borrar el archivo en MinIO solo si ninguna OTRA fila lo referencia.
|
||||
nombre = doc.archivo.name if doc.archivo else None
|
||||
if nombre and not Document.objects.filter(archivo=nombre).exclude(id=doc.id).exists():
|
||||
try:
|
||||
storage_service.delete_file(nombre)
|
||||
except Exception as e:
|
||||
self.stdout.write(self.style.WARNING(f" no se pudo borrar de storage {nombre}: {e}"))
|
||||
stats['bytes'] += doc.size or 0
|
||||
doc.delete() # ajusta la cuota de almacenamiento (UsoAlmacenamiento)
|
||||
stats['eliminados'] += 1
|
||||
@@ -780,3 +780,52 @@ class DocumentFKResolutionTests(TestCase):
|
||||
call_command("backfill_document_links", pedimento=str(self.pedimento.id), stdout=StringIO())
|
||||
d_pt.refresh_from_db()
|
||||
self.assertEqual(d_pt.partida_id, self.partida.id)
|
||||
|
||||
def _tres_copias_edoc(self):
|
||||
"""3 copias del mismo edoc (type 5) con created_at d1<d2<d3 y archivos distintos."""
|
||||
from django.utils import timezone
|
||||
import datetime
|
||||
d1 = self._doc(f"vu_ED_{self.APP}_EDOC001_aaa.pdf", 5)
|
||||
d2 = self._doc(f"vu_ED_{self.APP}_EDOC001_bbb.pdf", 5)
|
||||
d3 = self._doc(f"vu_ED_{self.APP}_EDOC001_ccc.pdf", 5)
|
||||
base = timezone.now()
|
||||
Document.objects.filter(id=d1.id).update(created_at=base - datetime.timedelta(days=2))
|
||||
Document.objects.filter(id=d2.id).update(created_at=base - datetime.timedelta(days=1))
|
||||
Document.objects.filter(id=d3.id).update(created_at=base)
|
||||
return d1, d2, d3
|
||||
|
||||
def test_dedup_conserva_mas_reciente_y_es_idempotente(self):
|
||||
from unittest.mock import patch
|
||||
d1, d2, d3 = self._tres_copias_edoc()
|
||||
self.assertEqual(Document.objects.filter(edocument=self.edoc, document_type_id=5).count(), 3)
|
||||
with patch('api.customs.management.commands.dedup_documents.storage_service') as st:
|
||||
st.file_exists.return_value = True
|
||||
call_command('dedup_documents', pedimento=str(self.pedimento.id), stdout=StringIO())
|
||||
self.assertEqual(st.delete_file.call_count, 2) # borró los 2 viejos de MinIO
|
||||
restantes = list(Document.objects.filter(edocument=self.edoc, document_type_id=5))
|
||||
self.assertEqual(len(restantes), 1)
|
||||
self.assertEqual(restantes[0].id, d3.id) # conservó el más reciente
|
||||
# idempotente: re-correr no borra nada
|
||||
with patch('api.customs.management.commands.dedup_documents.storage_service') as st2:
|
||||
st2.file_exists.return_value = True
|
||||
call_command('dedup_documents', pedimento=str(self.pedimento.id), stdout=StringIO())
|
||||
self.assertEqual(st2.delete_file.call_count, 0)
|
||||
|
||||
def test_dedup_dry_run_no_borra(self):
|
||||
from unittest.mock import patch
|
||||
self._tres_copias_edoc()
|
||||
with patch('api.customs.management.commands.dedup_documents.storage_service') as st:
|
||||
call_command('dedup_documents', pedimento=str(self.pedimento.id), dry_run=True, stdout=StringIO())
|
||||
self.assertEqual(st.delete_file.call_count, 0)
|
||||
self.assertEqual(Document.objects.filter(edocument=self.edoc, document_type_id=5).count(), 3)
|
||||
|
||||
def test_dedup_conserva_el_que_tiene_archivo_en_storage(self):
|
||||
from unittest.mock import patch
|
||||
d1, d2, d3 = self._tres_copias_edoc() # d3 el más reciente
|
||||
# En storage solo existe el de d1 (el más viejo); los más nuevos no.
|
||||
with patch('api.customs.management.commands.dedup_documents.storage_service') as st:
|
||||
st.file_exists.side_effect = lambda nombre: nombre.endswith('_aaa.pdf')
|
||||
call_command('dedup_documents', pedimento=str(self.pedimento.id), stdout=StringIO())
|
||||
restantes = list(Document.objects.filter(edocument=self.edoc, document_type_id=5))
|
||||
self.assertEqual(len(restantes), 1)
|
||||
self.assertEqual(restantes[0].id, d1.id) # conservó el único con archivo válido
|
||||
|
||||
Reference in New Issue
Block a user