478 lines
19 KiB
Python
478 lines
19 KiB
Python
"""
|
||
Tarea Celery: auto-corrección de pedimentos incompletos a partir de sus XMLs.
|
||
|
||
Busca pedimentos con consultar_vucem=False, analiza su documento XML más reciente
|
||
en busca de una respuesta consultarPedimentoCompleto de VUCEM, y si el número de
|
||
pedimento coincide, auto-corrige los campos faltantes en BD y reclasifica el documento.
|
||
|
||
Campos corregidos (solo si están vacíos/nulos en BD):
|
||
numero_operacion, aduana, clave_pedimento, regimen, contribuyente (por RFC).
|
||
|
||
Acciones sobre el documento si el tipo no es 2 (Pedimento Completo):
|
||
- Renombra el archivo en MinIO: vu_PC_{pedimento_app}.xml
|
||
- Actualiza document_type_id → 2
|
||
- Actualiza vu → False (tipo 2 no es VUCEM directo)
|
||
|
||
Al finalizar activa consultar_vucem=True en el pedimento.
|
||
"""
|
||
import io
|
||
import logging
|
||
import posixpath
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from celery import shared_task
|
||
from django.db import transaction
|
||
|
||
from api.customs.models import Importador, Pedimento, Regimen
|
||
from api.record.models import Document
|
||
from api.utils.minio_client import minio_client
|
||
from core.redis_events import publish_task_event
|
||
|
||
logger = logging.getLogger('api.customs.tasks.auto_corregir')
|
||
|
||
_DOC_TYPE_PC = 2 # Pedimento Completo (ya procesado — no volver a procesar)
|
||
_PROGRESS_INTERVAL = 10 # Emitir progreso cada N pedimentos
|
||
|
||
# Tipos excluidos de la búsqueda:
|
||
# 1 = Pedimento Partida (no contiene respuesta PC)
|
||
# 2 = Pedimento Completo (ya procesado)
|
||
# 13–26 = Tipos VUCEM: requests, errors de VU (peticiones salientes, no respuestas de contenido)
|
||
_EXCLUDE_DOC_TYPES = frozenset(range(13, 27)) | {1, _DOC_TYPE_PC}
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Helpers XML (namespace-agnostic)
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _local(tag):
|
||
return tag.split('}')[-1] if '}' in tag else tag
|
||
|
||
|
||
def _find_text(root, local_name):
|
||
"""Primer elemento con ese nombre local; retorna su texto o None."""
|
||
for el in root.iter():
|
||
if _local(el.tag) == local_name:
|
||
text = (el.text or '').strip()
|
||
return text or None
|
||
return None
|
||
|
||
|
||
def _find_child_text(root, parent_name, child_name):
|
||
"""Texto del hijo directo child_name dentro del primer parent_name encontrado."""
|
||
for el in root.iter():
|
||
if _local(el.tag) == parent_name:
|
||
for child in el:
|
||
if _local(child.tag) == child_name:
|
||
text = (child.text or '').strip()
|
||
return text or None
|
||
return None
|
||
|
||
|
||
def _find_pedimento_number(root):
|
||
"""
|
||
Extrae el número de pedimento de la estructura anidada:
|
||
<ns2:pedimento> ← contenedor
|
||
<ns2:pedimento>XXXX</ns2:pedimento> ← número
|
||
"""
|
||
for el in root.iter():
|
||
if _local(el.tag) == 'pedimento':
|
||
for child in el:
|
||
if _local(child.tag) == 'pedimento':
|
||
text = (child.text or '').strip()
|
||
return text or None
|
||
return None
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Helpers MinIO
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _read_from_minio(object_name):
|
||
if not minio_client.file_exists(object_name):
|
||
return None
|
||
response = minio_client._client.get_object(minio_client._bucket_name, object_name)
|
||
try:
|
||
return response.read()
|
||
finally:
|
||
response.close()
|
||
response.release_conn()
|
||
|
||
|
||
def _rename_in_minio(old_name, new_name, content):
|
||
if old_name == new_name:
|
||
return old_name
|
||
# Si ya existe en destino (ejecución previa parcial): limpiar origen
|
||
if minio_client.file_exists(new_name):
|
||
if minio_client.file_exists(old_name):
|
||
minio_client.delete_file(old_name)
|
||
return new_name
|
||
minio_client.upload_file(new_name, file_data=io.BytesIO(content), content_type='application/xml')
|
||
minio_client.delete_file(old_name)
|
||
return new_name
|
||
|
||
|
||
def _resolve_regimen(clave_pedimento, tipo_operacion_raw):
|
||
"""
|
||
Convierte clave_documento + tipo_operacion del XML al código de régimen,
|
||
replicando la lógica de carga de datastage:
|
||
Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).regimenped
|
||
"""
|
||
if not clave_pedimento or not tipo_operacion_raw:
|
||
return None
|
||
try:
|
||
tipo_int = int(tipo_operacion_raw)
|
||
except (ValueError, TypeError):
|
||
return None
|
||
regimen_obj = Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).first()
|
||
return regimen_obj.regimenped if regimen_obj else None
|
||
|
||
|
||
def _find_pc_document(pedimento):
|
||
"""
|
||
Busca entre los XMLs del pedimento el primero que contenga una respuesta
|
||
consultarPedimentoCompleto de VUCEM.
|
||
|
||
Tipos incluidos: 3–12 (documentos de contenido: pedimento, remesas, acuse,
|
||
edocument, estado, cove, digitalizacion, error, general).
|
||
Tipos excluidos: 1 (partida), 2 (ya procesado), 13–26 (peticiones/errores VU).
|
||
|
||
Retorna (doc, content_bytes, object_name, hay_candidatos):
|
||
- hay_candidatos=False → ningún XML candidato en BD
|
||
- hay_candidatos=True, doc=None → hay XMLs pero ninguno es respuesta PC
|
||
- doc!=None → encontrado
|
||
"""
|
||
qs = (
|
||
Document.objects.filter(
|
||
pedimento=pedimento,
|
||
archivo__iendswith='.xml',
|
||
)
|
||
.exclude(document_type_id__in=_EXCLUDE_DOC_TYPES)
|
||
.order_by('-created_at')
|
||
)
|
||
|
||
hay_candidatos = False
|
||
for doc in qs:
|
||
if not doc.archivo:
|
||
continue
|
||
hay_candidatos = True
|
||
object_name = doc.archivo.name
|
||
try:
|
||
content = _read_from_minio(object_name)
|
||
except Exception as exc:
|
||
logger.debug(f"[find_pc] {pedimento.pedimento_app} — error MinIO {object_name}: {exc}")
|
||
continue
|
||
if not content:
|
||
continue
|
||
if b'consultarPedimentoCompletoRespuesta' in content:
|
||
return doc, content, object_name, True
|
||
|
||
return None, None, None, hay_candidatos
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Tarea principal
|
||
# ──────────────────────────────────────────────
|
||
|
||
@shared_task(bind=True, name='auto_corregir_pedamentos')
|
||
def auto_corregir_pedamentos_task(self, organizacion_id, pedimento_id=None):
|
||
"""
|
||
Itera pedimentos con consultar_vucem=False de la organización.
|
||
Si se proporciona pedimento_id, procesa solo ese pedimento.
|
||
Por cada uno verifica si tiene un XML de pedimento completo válido
|
||
y corrige BD + storage.
|
||
"""
|
||
task_id = self.request.id
|
||
revisados = 0
|
||
corregidos = 0
|
||
ignorados = 0
|
||
detalles = []
|
||
|
||
qs = Pedimento.objects.filter(consultar_vucem=False).order_by('pedimento_app')
|
||
if pedimento_id:
|
||
qs = qs.filter(id=pedimento_id)
|
||
else:
|
||
qs = qs.filter(organizacion_id=organizacion_id)
|
||
|
||
total = qs.count()
|
||
logger.info(f"[auto_corregir] org={organizacion_id} — {total} pedimentos a revisar")
|
||
|
||
publish_task_event(task_id, 'processing', f'Iniciando: {total} pedimentos a revisar', progress=0)
|
||
|
||
for idx, pedimento in enumerate(qs.iterator(chunk_size=100)):
|
||
revisados += 1
|
||
|
||
if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1):
|
||
pct = int(((idx + 1) / total) * 95)
|
||
publish_task_event(
|
||
task_id, 'processing',
|
||
f'Revisando {idx + 1}/{total}: {pedimento.pedimento_app}',
|
||
progress=pct,
|
||
)
|
||
|
||
# Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero)
|
||
try:
|
||
candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento)
|
||
except Exception as exc:
|
||
logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — error buscando PC: {exc}")
|
||
ignorados += 1
|
||
continue
|
||
|
||
if not candidato:
|
||
ignorados += 1
|
||
continue
|
||
|
||
try:
|
||
root = ET.fromstring(content)
|
||
except ET.ParseError as exc:
|
||
logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — XML inválido: {exc}")
|
||
ignorados += 1
|
||
continue
|
||
|
||
tiene_error = _find_text(root, 'tieneError')
|
||
if tiene_error and tiene_error.lower() == 'true':
|
||
ignorados += 1
|
||
continue
|
||
|
||
pedimento_xml = _find_pedimento_number(root)
|
||
pedimento_bd = (pedimento.pedimento or '').strip()
|
||
if not pedimento_xml or pedimento_xml != pedimento_bd:
|
||
logger.info(
|
||
f"[auto_corregir] {pedimento.pedimento_app} — número no coincide "
|
||
f"(XML={pedimento_xml!r}, BD={pedimento_bd!r})"
|
||
)
|
||
ignorados += 1
|
||
continue
|
||
|
||
# ── Extracción de campos ──────────────────
|
||
numero_operacion = _find_text(root, 'numeroOperacion')
|
||
aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave')
|
||
clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave')
|
||
tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave')
|
||
regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw)
|
||
rfc = _find_child_text(root, 'importadorExportador', 'rfc')
|
||
|
||
ped_fields = []
|
||
if numero_operacion and not pedimento.numero_operacion:
|
||
pedimento.numero_operacion = numero_operacion
|
||
ped_fields.append('numero_operacion')
|
||
if aduana and aduana != (pedimento.aduana or '').strip():
|
||
pedimento.aduana = aduana
|
||
ped_fields.append('aduana')
|
||
if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip():
|
||
pedimento.clave_pedimento = clave_pedimento
|
||
ped_fields.append('clave_pedimento')
|
||
if regimen and not pedimento.regimen:
|
||
pedimento.regimen = regimen
|
||
ped_fields.append('regimen')
|
||
|
||
if rfc:
|
||
try:
|
||
importador = Importador.objects.get(rfc=rfc)
|
||
if pedimento.contribuyente_id != importador.rfc:
|
||
pedimento.contribuyente_id = importador.rfc
|
||
ped_fields.append('contribuyente')
|
||
except Importador.DoesNotExist:
|
||
pass
|
||
|
||
pedimento.consultar_vucem = True
|
||
ped_fields.append('consultar_vucem')
|
||
|
||
# ── Renombrado de documento si no es tipo 2 ──
|
||
doc_fields = ['document_type_id', 'vu']
|
||
final_object_name = object_name
|
||
|
||
if candidato.document_type_id != _DOC_TYPE_PC:
|
||
dir_part = posixpath.dirname(object_name)
|
||
new_filename = f"vu_PC_{pedimento.pedimento_app}.xml"
|
||
new_object_name = posixpath.join(dir_part, new_filename)
|
||
try:
|
||
final_object_name = _rename_in_minio(object_name, new_object_name, content)
|
||
doc_fields.append('archivo')
|
||
except Exception as exc:
|
||
logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error renombrando en MinIO: {exc}")
|
||
|
||
# ── Persistir cambios en BD ───────────────
|
||
try:
|
||
with transaction.atomic():
|
||
pedimento.save(update_fields=ped_fields)
|
||
candidato.document_type_id = _DOC_TYPE_PC
|
||
candidato.vu = False
|
||
if 'archivo' in doc_fields:
|
||
candidato.archivo = final_object_name
|
||
candidato.save(update_fields=doc_fields)
|
||
except Exception as exc:
|
||
logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error guardando en BD: {exc}")
|
||
ignorados += 1
|
||
continue
|
||
|
||
corregidos += 1
|
||
detalles.append({
|
||
'pedimento': pedimento.pedimento_app,
|
||
'accion': 'corregido',
|
||
'campos_pedimento': ped_fields,
|
||
'documento_final': final_object_name,
|
||
})
|
||
logger.info(f"[auto_corregir] {pedimento.pedimento_app} — corregido: {ped_fields}")
|
||
|
||
# Modo individual: encolar el procesamiento completo (remesas, partidas,
|
||
# coves, edocs) forzando aunque ya exista el documento tipo 2.
|
||
if pedimento_id:
|
||
try:
|
||
from .microservice_v2 import procesar_pedimento_completo_individual
|
||
procesar_pedimento_completo_individual.delay(str(pedimento.id), force=True)
|
||
logger.info(f"[auto_corregir] {pedimento.pedimento_app} — PC completo encolado (force)")
|
||
except Exception as exc:
|
||
logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — no se pudo encolar PC: {exc}")
|
||
|
||
resultado = {
|
||
'total_revisados': revisados,
|
||
'corregidos': corregidos,
|
||
'ignorados': ignorados,
|
||
'detalles': detalles,
|
||
}
|
||
logger.info(f"[auto_corregir] org={organizacion_id} finalizado — {resultado}")
|
||
|
||
publish_task_event(task_id, 'completed', 'Auto-corrección finalizada', resultado=resultado, progress=100)
|
||
return resultado
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Tarea de análisis (sin modificar nada)
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc):
|
||
"""Retorna la lista de campos que se corregirían y los valores que se asignarían."""
|
||
campos = []
|
||
if numero_operacion and not pedimento.numero_operacion:
|
||
campos.append({'campo': 'numero_operacion', 'valor_actual': None, 'valor_nuevo': numero_operacion})
|
||
if aduana and aduana != (pedimento.aduana or '').strip():
|
||
campos.append({'campo': 'aduana', 'valor_actual': pedimento.aduana, 'valor_nuevo': aduana})
|
||
if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip():
|
||
campos.append({'campo': 'clave_pedimento', 'valor_actual': pedimento.clave_pedimento, 'valor_nuevo': clave_pedimento})
|
||
if regimen and not pedimento.regimen:
|
||
campos.append({'campo': 'regimen', 'valor_actual': None, 'valor_nuevo': regimen})
|
||
if rfc:
|
||
try:
|
||
importador = Importador.objects.get(rfc=rfc)
|
||
if pedimento.contribuyente_id != importador.rfc:
|
||
campos.append({
|
||
'campo': 'contribuyente',
|
||
'valor_actual': pedimento.contribuyente_id,
|
||
'valor_nuevo': rfc,
|
||
})
|
||
except Importador.DoesNotExist:
|
||
pass
|
||
return campos
|
||
|
||
|
||
@shared_task(bind=True, name='auditar_pedamentos_incompletos')
|
||
def auditar_pedamentos_incompletos_task(self, organizacion_id, pedimento_id=None):
|
||
"""
|
||
Análisis de solo lectura: reporta qué pedimentos serían corregidos y qué
|
||
cambios se aplicarían, sin modificar BD ni storage.
|
||
Si se proporciona pedimento_id, analiza solo ese pedimento.
|
||
"""
|
||
task_id = self.request.id
|
||
revisados = 0
|
||
corregibles = []
|
||
sin_xml = 0
|
||
xml_sin_pc = 0
|
||
num_no_coincide = 0
|
||
con_error_vucem = 0
|
||
|
||
# Individual: analiza el pedimento específico sin importar su estado de corrección.
|
||
# Masivo: solo los pendientes (consultar_vucem=False).
|
||
if pedimento_id:
|
||
qs = Pedimento.objects.filter(id=pedimento_id).order_by('pedimento_app')
|
||
else:
|
||
qs = Pedimento.objects.filter(
|
||
organizacion_id=organizacion_id, consultar_vucem=False
|
||
).order_by('pedimento_app')
|
||
|
||
total = qs.count()
|
||
logger.info(f"[auditar_incompletos] org={organizacion_id} — {total} pedimentos a analizar")
|
||
|
||
publish_task_event(task_id, 'processing', f'Iniciando análisis: {total} pedimentos', progress=0)
|
||
|
||
for idx, pedimento in enumerate(qs.iterator(chunk_size=100)):
|
||
revisados += 1
|
||
|
||
if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1):
|
||
pct = int(((idx + 1) / total) * 95)
|
||
publish_task_event(
|
||
task_id, 'processing',
|
||
f'Analizando {idx + 1}/{total}: {pedimento.pedimento_app}',
|
||
progress=pct,
|
||
)
|
||
|
||
# Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero)
|
||
try:
|
||
candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento)
|
||
except Exception as exc:
|
||
logger.warning(f"[auditar_incompletos] {pedimento.pedimento_app} — error buscando PC: {exc}")
|
||
sin_xml += 1
|
||
continue
|
||
|
||
if not candidato:
|
||
if hay_candidatos:
|
||
xml_sin_pc += 1
|
||
else:
|
||
sin_xml += 1
|
||
continue
|
||
|
||
try:
|
||
root = ET.fromstring(content)
|
||
except ET.ParseError:
|
||
xml_sin_pc += 1
|
||
continue
|
||
|
||
tiene_error = _find_text(root, 'tieneError')
|
||
if tiene_error and tiene_error.lower() == 'true':
|
||
con_error_vucem += 1
|
||
continue
|
||
|
||
pedimento_xml = _find_pedimento_number(root)
|
||
pedimento_bd = (pedimento.pedimento or '').strip()
|
||
if not pedimento_xml or pedimento_xml != pedimento_bd:
|
||
num_no_coincide += 1
|
||
continue
|
||
|
||
numero_operacion = _find_text(root, 'numeroOperacion')
|
||
aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave')
|
||
clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave')
|
||
tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave')
|
||
regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw)
|
||
rfc = _find_child_text(root, 'importadorExportador', 'rfc')
|
||
|
||
campos = _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc)
|
||
|
||
dir_part = posixpath.dirname(object_name)
|
||
nombre_pc = posixpath.join(dir_part, f"vu_PC_{pedimento.pedimento_app}.xml")
|
||
|
||
corregibles.append({
|
||
'pedimento_app': pedimento.pedimento_app,
|
||
'pedimento_id': str(pedimento.id),
|
||
'documento_actual': {
|
||
'id': str(candidato.id),
|
||
'archivo': object_name,
|
||
'document_type_id': candidato.document_type_id,
|
||
},
|
||
'documento_nuevo_nombre': nombre_pc if candidato.document_type_id != _DOC_TYPE_PC else None,
|
||
'campos_a_corregir': campos,
|
||
'consultar_vucem': True,
|
||
})
|
||
|
||
resultado = {
|
||
'total_revisados': revisados,
|
||
'corregibles': len(corregibles),
|
||
'sin_xml_o_ilegible': sin_xml,
|
||
'xml_no_es_pedimento_completo': xml_sin_pc,
|
||
'numero_pedimento_no_coincide': num_no_coincide,
|
||
'con_error_vucem': con_error_vucem,
|
||
'pedimentos': corregibles,
|
||
}
|
||
logger.info(f"[auditar_incompletos] org={organizacion_id} finalizado — {resultado}")
|
||
|
||
publish_task_event(task_id, 'completed', 'Análisis finalizado', resultado=resultado, progress=100)
|
||
return resultado
|