Files
backend/api/customs/tasks/auditoria.py

686 lines
27 KiB
Python

import os
from datetime import datetime
from django.db import models
from celery import shared_task, group
from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocument
from core.utils import xml_controller
import requests
from core.utils import xml_remesas_controller
from core.redis_events import publish_task_event
import logging
logger = logging.getLogger(__name__)
def _crear_notificacion_auditoria(user_id: str, task_id: str, label: str, resultado: dict):
"""Crea una Notificacion persistente cuando una tarea de auditoría masiva completa."""
try:
from api.notificaciones.models import Notificacion, TipoNotificacion
from api.cuser.models import CustomUser
tipo, _ = TipoNotificacion.objects.get_or_create(
tipo="auditoria_completada",
defaults={"descripcion": "Auditoría masiva completada"},
)
usuario = CustomUser.objects.filter(id=user_id).first()
if not usuario:
return
total = resultado.get('total_pedimentos', 0)
completados = resultado.get('completados', resultado.get('procesados', 0))
pendientes = resultado.get('con_pendientes', 0)
errores = resultado.get('con_errores', 0)
partes = [f"Auditoría de {label} completada — {completados}/{total} pedimentos"]
if pendientes:
partes.append(f"{pendientes} con pendientes")
if errores:
partes.append(f"{errores} con errores")
Notificacion.objects.create(
tipo=tipo,
dirigido=usuario,
mensaje=", ".join(partes),
datos={
"task_id": task_id,
"label": label,
"resultado": resultado,
},
)
except Exception as exc:
logger.error(f"[auditoria] Error creando notificación para tarea {task_id}: {exc}")
def obtener_pedimentos(organizacion_id):
return Pedimento.objects.filter(organizacion_id=organizacion_id)
def extraer_coves(pedimento):
remesas = pedimento.documents.filter(document_type=3).first()
with open(f'./media/{remesas.archivo}', 'r') as f:
xml_content = f.read()
xml_data = xml_remesas_controller.extract_remesas(xml_content)
return xml_data
def modificar_estado_procesamiento(pedimento, servicio_id, nuevo_estado):
procesamiento = ProcesamientoPedimento.objects.filter(
pedimento=pedimento,
servicio_id=servicio_id,
organizacion=pedimento.organizacion
).first()
if procesamiento:
procesamiento.estado_id = nuevo_estado
procesamiento.save()
return True
return False
def auditor_descargas(pedimento, servicio, related_name, variable, mensaje):
pedimento_id = pedimento.id
docs = getattr(pedimento, related_name).all()
print(f"pedimento: {pedimento}, servicio: {servicio}, related_name: {related_name}, variable: {variable}, mensaje: {mensaje}")
logger.info(f"pedimento: {pedimento}, servicio: {servicio}, related_name: {related_name}, variable: {variable}, mensaje: {mensaje}")
# Si no hay documentos, marcar como completado
if not docs.exists():
proceso = modificar_estado_procesamiento(pedimento, servicio_id=servicio, nuevo_estado=3) # Estado "completado"
print(f"✓ Pedimento {pedimento_id} no tiene {mensaje}s para procesar.")
logger.info(f"✓ Pedimento {pedimento_id} no tiene {mensaje}s para procesar.")
else:
all_docs = all(getattr(doc, variable) for doc in docs)
if all_docs:
proceso = modificar_estado_procesamiento(pedimento, servicio_id=servicio, nuevo_estado=3) # Estado "completado"
print(f"✓ Pedimento {pedimento_id} tiene todos sus {mensaje} descargados.")
logger.info(f"✓ Pedimento {pedimento_id} tiene todos sus {mensaje} descargados.")
else:
proceso = modificar_estado_procesamiento(pedimento, servicio_id=servicio, nuevo_estado=4) # Estado "en progreso"
print(f"✗ Pedimento {pedimento_id} NO tiene todos sus {mensaje} descargados.")
logger.info(f"✗ Pedimento {pedimento_id} NO tiene todos sus {mensaje} descargados.")
if proceso:
print(f"✓ Proceso de auditoría para pedimento {pedimento_id} completado.")
logger.info(f"✓ Proceso de auditoría para pedimento {pedimento_id} completado.")
else:
print(f"✗ No se encontró proceso de auditoría para pedimento {pedimento_id}.")
logger.info(f"✗ No se encontró proceso de auditoría para pedimento {pedimento_id}.")
## Auditar pedimentos
@shared_task
def auditar_procesamiento_remesa_por_pedimento(pedimento_id):
"""
Audita el procesamiento de remesa para un pedimento específico.
Args:
pedimento_id: UUID del pedimento a auditar
Returns:
dict: Resultado de la auditoría con detalles del procesamiento
"""
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
resultado = {
'pedimento_id': str(pedimento_id),
'pedimento_numero': pedimento.pedimento,
'tiene_remesas': pedimento.remesas,
'procesamiento_creado': False,
'coves_creados': []
}
if not pedimento.remesas:
resultado['mensaje'] = 'El pedimento no tiene remesas para procesar'
return resultado
# Verificar documento tipo remesa
if not pedimento.documents.filter(document_type=3).exists():
# Crear procesamiento si no existe documento de remesa
procesamiento, creado = ProcesamientoPedimento.objects.get_or_create(
pedimento=pedimento,
servicio_id=5, # ID del servicio de remesas
organizacion=pedimento.organizacion_id
)
resultado['procesamiento_creado'] = creado
resultado['mensaje'] = 'Procesamiento de remesa creado - documento no encontrado'
else:
# Procesar XML de remesas
xml_data = extraer_coves(pedimento)
if xml_data:
for remesa in xml_data:
numero_cove = remesa.get('remesaSA')
cove, creado = Cove.objects.get_or_create(
pedimento=pedimento,
numero_cove=numero_cove,
organizacion=pedimento.organizacion_id
)
if creado:
resultado['coves_creados'].append(numero_cove)
resultado['mensaje'] = f"Procesados {len(xml_data)} remesas, creados {len(resultado['coves_creados'])} COVEs nuevos"
else:
resultado['mensaje'] = 'No se encontraron datos de remesas en el XML'
return resultado
except Pedimento.DoesNotExist:
return {
'error': f'Pedimento con ID {pedimento_id} no encontrado',
'pedimento_id': str(pedimento_id)
}
except Exception as e:
return {
'error': f'Error procesando pedimento {pedimento_id}: {str(e)}',
'pedimento_id': str(pedimento_id)
}
@shared_task(bind=True)
def crear_partidas(self, organizacion_id, user_id=None):
from api.customs.models import Partida
task_id = self.request.id
pedimentos = obtener_pedimentos(organizacion_id)
total_pedimentos = pedimentos.count()
publish_task_event(task_id, "processing", f"Creando partidas: {total_pedimentos} pedimentos", progress=0)
completados = []
con_pendientes = []
sin_datos = []
errores = []
for idx, pedimento in enumerate(pedimentos):
try:
if not pedimento.numero_partidas or pedimento.numero_partidas <= 0:
sin_datos.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'razon': f'numero_partidas inválido ({pedimento.numero_partidas})',
})
continue
for i in range(1, pedimento.numero_partidas + 1):
Partida.objects.get_or_create(
pedimento=pedimento,
numero_partida=i,
defaults={'organizacion_id': organizacion_id}
)
partidas = list(pedimento.partidas.order_by('numero_partida'))
no_descargadas = [p.numero_partida for p in partidas if not p.descargado]
if not no_descargadas:
completados.append(str(pedimento.id))
else:
con_pendientes.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'total_partidas': len(partidas),
'descargadas': len(partidas) - len(no_descargadas),
'no_descargadas': no_descargadas,
})
except Exception as e:
errores.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'error': str(e),
})
logger.error(f"Error creando partidas para pedimento {pedimento.id}: {e}")
if total_pedimentos > 0 and (idx + 1) % 10 == 0:
pct = int(((idx + 1) / total_pedimentos) * 100)
publish_task_event(task_id, "processing", f"Creando partidas: {idx + 1}/{total_pedimentos}", progress=pct)
resultado = {
'organizacion_id': str(organizacion_id),
'auditoria': 'partidas',
'total_pedimentos': total_pedimentos,
'completados': len(completados),
'con_pendientes': len(con_pendientes),
'sin_datos': len(sin_datos),
'con_errores': len(errores),
'detalle_pendientes': con_pendientes,
'detalle_sin_datos': sin_datos,
'detalle_errores': errores,
}
publish_task_event(task_id, "completed", "Creación de partidas completada", resultado=resultado, progress=100)
if user_id:
_crear_notificacion_auditoria(user_id, task_id, "Partidas", resultado)
return resultado
@shared_task
def crear_partidas_por_pedimento(pedimento_id):
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
except Pedimento.DoesNotExist:
print(f"Error: Pedimento con ID {pedimento_id} no encontrado")
return
print(f"Procesando pedimento individual {pedimento_id}...")
logger.info(f"Procesando pedimento individual {pedimento_id}...")
partidas_agregadas = 0
# Validar que numero_partidas no sea None y sea mayor que 0
if pedimento.numero_partidas is not None and pedimento.numero_partidas > 0:
partidas_existentes = pedimento.partidas.count()
if pedimento.numero_partidas > partidas_existentes:
print(f"Pedimento {pedimento_id} - Partidas existentes: {partidas_existentes}, Requeridas: {pedimento.numero_partidas}")
logger.info(f"Pedimento {pedimento_id} - Partidas existentes: {partidas_existentes}, Requeridas: {pedimento.numero_partidas}")
for i in range(1, pedimento.numero_partidas + 1):
from api.customs.models import Partida
partida, created = Partida.objects.get_or_create(
pedimento=pedimento,
numero_partida=i,
organizacion_id=pedimento.organizacion_id
)
if created:
partidas_agregadas += 1
print(f"✓ Partidas agregadas para pedimento {pedimento_id}: {partidas_agregadas}")
logger.info(f"✓ Partidas agregadas para pedimento {pedimento_id}: {partidas_agregadas}")
else:
print(f"Pedimento {pedimento_id} ya tiene todas sus partidas ({partidas_existentes}/{pedimento.numero_partidas})")
logger.info(f"Pedimento {pedimento_id} ya tiene todas sus partidas ({partidas_existentes}/{pedimento.numero_partidas})")
else:
print(f"Error: Pedimento {pedimento_id} tiene numero_partidas inválido: {pedimento.numero_partidas}")
logger.info(f"Error: Pedimento {pedimento_id} tiene numero_partidas inválido: {pedimento.numero_partidas}")
def _auditar_organizacion(organizacion_id, servicio, related_name, variable, label, task_id=None, user_id=None):
"""
Itera todos los pedimentos de una organización auditando el campo `variable`
en la relación `related_name`. Retorna un resumen estructurado por pedimento.
Publica eventos SSE en Redis si se proporciona task_id.
"""
pedimentos = obtener_pedimentos(organizacion_id)
total_pedimentos = pedimentos.count()
if task_id:
publish_task_event(task_id, "processing", f"Auditando {label}: {total_pedimentos} pedimentos", progress=0)
completados = []
pendientes = []
errores = []
for idx, pedimento in enumerate(pedimentos):
try:
docs = list(getattr(pedimento, related_name).all())
total = len(docs)
faltantes = [
getattr(doc, 'numero_cove', None) or getattr(doc, 'numero_edocument', None)
for doc in docs if not getattr(doc, variable)
]
if total == 0 or len(faltantes) == 0:
nuevo_estado = 3
completados.append(str(pedimento.id))
else:
nuevo_estado = 4
pendientes.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
f'faltantes_{label}': faltantes,
'total': total,
'descargados': total - len(faltantes),
})
modificar_estado_procesamiento(pedimento, servicio_id=servicio, nuevo_estado=nuevo_estado)
# Publicar progreso cada 10 pedimentos para no saturar Redis
if task_id and total_pedimentos > 0 and (idx + 1) % 10 == 0:
pct = int(((idx + 1) / total_pedimentos) * 100)
publish_task_event(task_id, "processing", f"Auditando {label}: {idx + 1}/{total_pedimentos}", progress=pct)
except Exception as e:
errores.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'error': str(e),
})
logger.error(f"Error auditando pedimento {pedimento.id} [{label}]: {e}")
resultado = {
'organizacion_id': str(organizacion_id),
'auditoria': label,
'total_pedimentos': total_pedimentos,
'completados': len(completados),
'con_pendientes': len(pendientes),
'con_errores': len(errores),
'detalle_pendientes': pendientes,
'detalle_errores': errores,
}
if task_id:
publish_task_event(task_id, "completed", f"Auditoría de {label} completada", resultado=resultado, progress=100)
if user_id:
_crear_notificacion_auditoria(user_id, task_id, label, resultado)
return resultado
@shared_task(bind=True)
def auditar_coves(self, organizacion_id, user_id=None):
return _auditar_organizacion(
organizacion_id,
servicio=8,
related_name='coves',
variable='cove_descargado',
label='cove',
task_id=self.request.id,
user_id=user_id,
)
@shared_task(bind=True)
def auditar_acuse_cove(self, organizacion_id, user_id=None):
return _auditar_organizacion(
organizacion_id,
servicio=9,
related_name='coves',
variable='acuse_cove_descargado',
label='acuse_cove',
task_id=self.request.id,
user_id=user_id,
)
@shared_task(bind=True)
def auditar_edocuments(self, organizacion_id, user_id=None):
return _auditar_organizacion(
organizacion_id,
servicio=7,
related_name='documentos',
variable='edocument_descargado',
label='edocument',
task_id=self.request.id,
user_id=user_id,
)
@shared_task(bind=True)
def auditar_acuse(self, organizacion_id, user_id=None):
return _auditar_organizacion(
organizacion_id,
servicio=6,
related_name='documentos',
variable='acuse_descargado',
label='acuse',
task_id=self.request.id,
user_id=user_id,
)
@shared_task(bind=True)
def auditar_remesas(self, organizacion_id, user_id=None):
"""
Audita el estado de descarga de remesas para todos los pedimentos de una organización.
A diferencia de coves/edocuments, las remesas no tienen campo booleano propio —
se verifica la existencia de un documento de tipo 3 (Remesa) en el pedimento.
"""
task_id = self.request.id
pedimentos = obtener_pedimentos(organizacion_id)
total_pedimentos = pedimentos.count()
if task_id:
publish_task_event(task_id, "processing", f"Auditando remesas: {total_pedimentos} pedimentos", progress=0)
completados = []
pendientes = []
errores = []
for idx, pedimento in enumerate(pedimentos):
try:
if not pedimento.remesas:
modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=3)
completados.append(str(pedimento.id))
elif pedimento.documents.filter(document_type=3).exists():
modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=3)
completados.append(str(pedimento.id))
else:
modificar_estado_procesamiento(pedimento, servicio_id=5, nuevo_estado=4)
pendientes.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
})
except Exception as e:
errores.append({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'error': str(e),
})
logger.error(f"Error auditando remesa de pedimento {pedimento.id}: {e}")
if task_id and total_pedimentos > 0 and (idx + 1) % 10 == 0:
pct = int(((idx + 1) / total_pedimentos) * 100)
publish_task_event(task_id, "processing", f"Auditando remesas: {idx + 1}/{total_pedimentos}", progress=pct)
resultado = {
'organizacion_id': str(organizacion_id),
'auditoria': 'remesa',
'total_pedimentos': total_pedimentos,
'completados': len(completados),
'con_pendientes': len(pendientes),
'con_errores': len(errores),
'detalle_pendientes': pendientes,
'detalle_errores': errores,
}
if task_id:
publish_task_event(task_id, "completed", "Auditoría de remesas completada", resultado=resultado, progress=100)
if user_id:
_crear_notificacion_auditoria(user_id, task_id, "Remesas", resultado)
return resultado
@shared_task
def auditar_cove_por_pedimento(pedimento_id):
try:
print(f"auditar_cove_por_pedimento >>>> {pedimento_id}")
logger.info(f"auditar_cove_por_pedimento >>>> {pedimento_id}")
from api.customs.models import Pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
auditor_descargas(
pedimento,
servicio=8,
related_name='coves',
variable='cove_descargado',
mensaje='COVE'
)
return {'success': True, 'pedimento_id': str(pedimento_id)}
except Exception as e:
return {'success': False, 'error': str(e), 'pedimento_id': str(pedimento_id)}
@shared_task
def auditar_acuse_cove_por_pedimento(pedimento_id):
try:
from api.customs.models import Pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
auditor_descargas(
pedimento,
servicio=9,
related_name='coves',
variable='acuse_cove_descargado',
mensaje='acuse de COVE'
)
return {'success': True, 'pedimento_id': str(pedimento_id)}
except Exception as e:
return {'success': False, 'error': str(e), 'pedimento_id': str(pedimento_id)}
@shared_task
def auditar_edocument_por_pedimento(pedimento_id):
try:
from api.customs.models import Pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
auditor_descargas(
pedimento,
servicio=7,
related_name='documentos',
variable='edocument_descargado',
mensaje='EDocument'
)
return {'success': True, 'pedimento_id': str(pedimento_id)}
except Exception as e:
return {'success': False, 'error': str(e), 'pedimento_id': str(pedimento_id)}
@shared_task
def auditar_acuse_por_pedimento(pedimento_id):
try:
from api.customs.models import Pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
auditor_descargas(
pedimento,
servicio=6,
related_name='documentos',
variable='acuse_descargado',
mensaje='acuse'
)
return {'success': True, 'pedimento_id': str(pedimento_id)}
except Exception as e:
return {'success': False, 'error': str(e), 'pedimento_id': str(pedimento_id)}
@shared_task
def auditar_pedimento_por_id(pedimento_id):
"""
Tarea para auditar un pedimento específico verificando todos sus documentos y datos.
"""
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
resultado = {
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'organizacion': str(pedimento.organizacion.id),
'fecha_auditoria': datetime.now().isoformat(),
'estado_general': 'EN_PROGRESO',
'detalles': {}
}
# 1. Verificar documentos XML
from api.record.models import Document
documentos_xml = Document.objects.filter(
pedimento=pedimento,
archivo__endswith='.xml'
)
resultado['detalles']['documentos_xml'] = {
'total': documentos_xml.count(),
'archivos': []
}
for doc in documentos_xml:
try:
xml_info = {
'id': str(doc.id),
'nombre': os.path.basename(doc.archivo.name),
'tamanio': doc.size,
'extension': doc.extension,
'tipo': doc.document_type.descripcion if doc.document_type else 'Desconocido'
}
# Verificar si el archivo existe físicamente
if os.path.exists(doc.archivo.path):
xml_info['existe_fisicamente'] = True
# Intentar leer el XML
try:
with open(doc.archivo.path, 'r', encoding='utf-8') as f:
content = f.read()
xml_info['es_xml_valido'] = '<?xml' in content[:100]
xml_info['tamanio_bytes'] = len(content)
except Exception as e:
xml_info['error_lectura'] = str(e)
else:
xml_info['existe_fisicamente'] = False
except Exception as e:
xml_info['error'] = str(e)
resultado['detalles']['documentos_xml']['archivos'].append(xml_info)
# 2. Verificar si hay documentos asociados
resultado['detalles']['documentos_totales'] = {
'total': pedimento.documents.count(),
'por_tipo': {}
}
for doc_type in pedimento.documents.values('document_type__descripcion').annotate(total=models.Count('id')):
tipo = doc_type['document_type__descripcion'] or 'Sin tipo'
resultado['detalles']['documentos_totales']['por_tipo'][tipo] = doc_type['total']
# 3. Verificar COVEs
resultado['detalles']['coves'] = {
'total': pedimento.coves.count(),
'descargados': pedimento.coves.filter(cove_descargado=True).count(),
'con_acuse': pedimento.coves.filter(acuse_cove_descargado=True).count()
}
# 4. Verificar EDocuments
resultado['detalles']['edocuments'] = {
'total': pedimento.documentos.count(),
'descargados': pedimento.documentos.filter(edocument_descargado=True).count(),
'con_acuse': pedimento.documentos.filter(acuse_descargado=True).count()
}
# 5. Verificar procesamientos
resultado['detalles']['procesamientos'] = {
'total': pedimento.procesamientos.count(),
'por_estado': {}
}
for proc in pedimento.procesamientos.values('estado__estado').annotate(total=models.Count('id')):
estado = proc['estado__estado'] or 'Sin estado'
resultado['detalles']['procesamientos']['por_estado'][estado] = proc['total']
# 6. Verificar campos importantes del pedimento
campos_revisados = {
'numero_operacion': bool(pedimento.numero_operacion),
'numero_partidas': bool(pedimento.numero_partidas),
'importe_total': bool(pedimento.importe_total),
'contribuyente': bool(pedimento.contribuyente),
'tiene_remesas': pedimento.remesas,
'partidas_creadas': pedimento.partidas.count() > 0,
'fecha_pago': bool(pedimento.fecha_pago)
}
resultado['detalles']['campos_pedimento'] = campos_revisados
resultado['detalles']['campos_completos'] = sum(campos_revisados.values())
resultado['detalles']['campos_totales'] = len(campos_revisados)
# 7. Determinar estado general
campos_completos = resultado['detalles']['campos_completos']
total_campos = resultado['detalles']['campos_totales']
if documentos_xml.count() == 0:
resultado['estado_general'] = 'SIN_XML'
resultado['mensaje'] = 'No se encontraron documentos XML'
elif campos_completos == total_campos:
resultado['estado_general'] = 'COMPLETO'
resultado['mensaje'] = 'Pedimento completamente procesado'
elif campos_completos >= total_campos * 0.7:
resultado['estado_general'] = 'PARCIAL'
resultado['mensaje'] = 'Pedimento parcialmente procesado'
else:
resultado['estado_general'] = 'INCOMPLETO'
resultado['mensaje'] = 'Pedimento con información incompleta'
resultado['porcentaje_completitud'] = (campos_completos / total_campos) * 100 if total_campos > 0 else 0
# 8. Sugerencias
sugerencias = []
if not pedimento.numero_operacion:
sugerencias.append("Falta el número de operación")
if not pedimento.numero_partidas:
sugerencias.append("Falta el número de partidas")
if pedimento.numero_partidas and pedimento.numero_partidas > pedimento.partidas.count():
sugerencias.append(f"Faltan partidas: {pedimento.numero_partidas - pedimento.partidas.count()} de {pedimento.numero_partidas}")
if not pedimento.contribuyente:
sugerencias.append("Falta el contribuyente asociado")
resultado['sugerencias'] = sugerencias
return resultado
except Pedimento.DoesNotExist:
return {
'error': f'Pedimento con ID {pedimento_id} no encontrado',
'pedimento_id': str(pedimento_id)
}
except Exception as e:
return {
'error': f'Error auditar pedimento {pedimento_id}: {str(e)}',
'pedimento_id': str(pedimento_id)
}