""" Tarea Celery: auto-corrección de pedimentos incompletos a partir de sus XMLs. Busca pedimentos con consultar_vucem=False, analiza su documento XML más reciente en busca de una respuesta consultarPedimentoCompleto de VUCEM, y si el número de pedimento coincide, auto-corrige los campos faltantes en BD y reclasifica el documento. Campos corregidos (solo si están vacíos/nulos en BD): numero_operacion, aduana, clave_pedimento, regimen, contribuyente (por RFC). Acciones sobre el documento si el tipo no es 2 (Pedimento Completo): - Renombra el archivo en MinIO: vu_PC_{pedimento_app}.xml - Actualiza document_type_id → 2 - Actualiza vu → False (tipo 2 no es VUCEM directo) Al finalizar activa consultar_vucem=True en el pedimento. """ import io import logging import posixpath import xml.etree.ElementTree as ET from celery import shared_task from django.db import transaction from api.customs.models import Importador, Pedimento, Regimen from api.record.models import Document from api.utils.minio_client import minio_client from core.redis_events import publish_task_event logger = logging.getLogger('api.customs.tasks.auto_corregir') _DOC_TYPE_PC = 2 # Pedimento Completo (ya procesado — no volver a procesar) _PROGRESS_INTERVAL = 10 # Emitir progreso cada N pedimentos # Tipos excluidos de la búsqueda: # 1 = Pedimento Partida (no contiene respuesta PC) # 2 = Pedimento Completo (ya procesado) # 13–26 = Tipos VUCEM: requests, errors de VU (peticiones salientes, no respuestas de contenido) _EXCLUDE_DOC_TYPES = frozenset(range(13, 27)) | {1, _DOC_TYPE_PC} # ────────────────────────────────────────────── # Helpers XML (namespace-agnostic) # ────────────────────────────────────────────── def _local(tag): return tag.split('}')[-1] if '}' in tag else tag def _find_text(root, local_name): """Primer elemento con ese nombre local; retorna su texto o None.""" for el in root.iter(): if _local(el.tag) == local_name: text = (el.text or '').strip() return text or None return None def _find_child_text(root, parent_name, child_name): """Texto del hijo directo child_name dentro del primer parent_name encontrado.""" for el in root.iter(): if _local(el.tag) == parent_name: for child in el: if _local(child.tag) == child_name: text = (child.text or '').strip() return text or None return None def _find_pedimento_number(root): """ Extrae el número de pedimento de la estructura anidada: ← contenedor XXXX ← número """ for el in root.iter(): if _local(el.tag) == 'pedimento': for child in el: if _local(child.tag) == 'pedimento': text = (child.text or '').strip() return text or None return None # ────────────────────────────────────────────── # Helpers MinIO # ────────────────────────────────────────────── def _read_from_minio(object_name): if not minio_client.file_exists(object_name): return None response = minio_client._client.get_object(minio_client._bucket_name, object_name) try: return response.read() finally: response.close() response.release_conn() def _rename_in_minio(old_name, new_name, content): if old_name == new_name: return old_name # Si ya existe en destino (ejecución previa parcial): limpiar origen if minio_client.file_exists(new_name): if minio_client.file_exists(old_name): minio_client.delete_file(old_name) return new_name minio_client.upload_file(new_name, file_data=io.BytesIO(content), content_type='application/xml') minio_client.delete_file(old_name) return new_name def _resolve_regimen(clave_pedimento, tipo_operacion_raw): """ Convierte clave_documento + tipo_operacion del XML al código de régimen, replicando la lógica de carga de datastage: Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).regimenped """ if not clave_pedimento or not tipo_operacion_raw: return None try: tipo_int = int(tipo_operacion_raw) except (ValueError, TypeError): return None regimen_obj = Regimen.objects.filter(claveped=clave_pedimento, tipo=tipo_int).first() return regimen_obj.regimenped if regimen_obj else None def _find_pc_document(pedimento): """ Busca entre los XMLs del pedimento el primero que contenga una respuesta consultarPedimentoCompleto de VUCEM. Tipos incluidos: 3–12 (documentos de contenido: pedimento, remesas, acuse, edocument, estado, cove, digitalizacion, error, general). Tipos excluidos: 1 (partida), 2 (ya procesado), 13–26 (peticiones/errores VU). Retorna (doc, content_bytes, object_name, hay_candidatos): - hay_candidatos=False → ningún XML candidato en BD - hay_candidatos=True, doc=None → hay XMLs pero ninguno es respuesta PC - doc!=None → encontrado """ qs = ( Document.objects.filter( pedimento=pedimento, archivo__iendswith='.xml', ) .exclude(document_type_id__in=_EXCLUDE_DOC_TYPES) .order_by('-created_at') ) hay_candidatos = False for doc in qs: if not doc.archivo: continue hay_candidatos = True object_name = doc.archivo.name try: content = _read_from_minio(object_name) except Exception as exc: logger.debug(f"[find_pc] {pedimento.pedimento_app} — error MinIO {object_name}: {exc}") continue if not content: continue if b'consultarPedimentoCompletoRespuesta' in content: return doc, content, object_name, True return None, None, None, hay_candidatos # ────────────────────────────────────────────── # Tarea principal # ────────────────────────────────────────────── @shared_task(bind=True, name='auto_corregir_pedamentos') def auto_corregir_pedamentos_task(self, organizacion_id, pedimento_id=None): """ Itera pedimentos con consultar_vucem=False de la organización. Si se proporciona pedimento_id, procesa solo ese pedimento. Por cada uno verifica si tiene un XML de pedimento completo válido y corrige BD + storage. """ task_id = self.request.id revisados = 0 corregidos = 0 ignorados = 0 detalles = [] qs = Pedimento.objects.filter(consultar_vucem=False).order_by('pedimento_app') if pedimento_id: qs = qs.filter(id=pedimento_id) else: qs = qs.filter(organizacion_id=organizacion_id) total = qs.count() logger.info(f"[auto_corregir] org={organizacion_id} — {total} pedimentos a revisar") publish_task_event(task_id, 'processing', f'Iniciando: {total} pedimentos a revisar', progress=0) for idx, pedimento in enumerate(qs.iterator(chunk_size=100)): revisados += 1 if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1): pct = int(((idx + 1) / total) * 95) publish_task_event( task_id, 'processing', f'Revisando {idx + 1}/{total}: {pedimento.pedimento_app}', progress=pct, ) # Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero) try: candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento) except Exception as exc: logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — error buscando PC: {exc}") ignorados += 1 continue if not candidato: ignorados += 1 continue try: root = ET.fromstring(content) except ET.ParseError as exc: logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — XML inválido: {exc}") ignorados += 1 continue tiene_error = _find_text(root, 'tieneError') if tiene_error and tiene_error.lower() == 'true': ignorados += 1 continue pedimento_xml = _find_pedimento_number(root) pedimento_bd = (pedimento.pedimento or '').strip() if not pedimento_xml or pedimento_xml != pedimento_bd: logger.info( f"[auto_corregir] {pedimento.pedimento_app} — número no coincide " f"(XML={pedimento_xml!r}, BD={pedimento_bd!r})" ) ignorados += 1 continue # ── Extracción de campos ────────────────── numero_operacion = _find_text(root, 'numeroOperacion') aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave') clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave') tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave') regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw) rfc = _find_child_text(root, 'importadorExportador', 'rfc') ped_fields = [] if numero_operacion and not pedimento.numero_operacion: pedimento.numero_operacion = numero_operacion ped_fields.append('numero_operacion') if aduana and aduana != (pedimento.aduana or '').strip(): pedimento.aduana = aduana ped_fields.append('aduana') if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip(): pedimento.clave_pedimento = clave_pedimento ped_fields.append('clave_pedimento') if regimen and not pedimento.regimen: pedimento.regimen = regimen ped_fields.append('regimen') if rfc: try: importador = Importador.objects.get(rfc=rfc) if pedimento.contribuyente_id != importador.rfc: pedimento.contribuyente_id = importador.rfc ped_fields.append('contribuyente') except Importador.DoesNotExist: pass pedimento.consultar_vucem = True ped_fields.append('consultar_vucem') # ── Renombrado de documento si no es tipo 2 ── doc_fields = ['document_type_id', 'vu'] final_object_name = object_name if candidato.document_type_id != _DOC_TYPE_PC: dir_part = posixpath.dirname(object_name) new_filename = f"vu_PC_{pedimento.pedimento_app}.xml" new_object_name = posixpath.join(dir_part, new_filename) try: final_object_name = _rename_in_minio(object_name, new_object_name, content) doc_fields.append('archivo') except Exception as exc: logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error renombrando en MinIO: {exc}") # ── Persistir cambios en BD ─────────────── try: with transaction.atomic(): pedimento.save(update_fields=ped_fields) candidato.document_type_id = _DOC_TYPE_PC candidato.vu = False if 'archivo' in doc_fields: candidato.archivo = final_object_name candidato.save(update_fields=doc_fields) except Exception as exc: logger.error(f"[auto_corregir] {pedimento.pedimento_app} — error guardando en BD: {exc}") ignorados += 1 continue corregidos += 1 detalles.append({ 'pedimento': pedimento.pedimento_app, 'accion': 'corregido', 'campos_pedimento': ped_fields, 'documento_final': final_object_name, }) logger.info(f"[auto_corregir] {pedimento.pedimento_app} — corregido: {ped_fields}") # Modo individual: encolar el procesamiento completo (remesas, partidas, # coves, edocs) forzando aunque ya exista el documento tipo 2. if pedimento_id: try: from .microservice_v2 import procesar_pedimento_completo_individual procesar_pedimento_completo_individual.delay(str(pedimento.id), force=True) logger.info(f"[auto_corregir] {pedimento.pedimento_app} — PC completo encolado (force)") except Exception as exc: logger.warning(f"[auto_corregir] {pedimento.pedimento_app} — no se pudo encolar PC: {exc}") resultado = { 'total_revisados': revisados, 'corregidos': corregidos, 'ignorados': ignorados, 'detalles': detalles, } logger.info(f"[auto_corregir] org={organizacion_id} finalizado — {resultado}") publish_task_event(task_id, 'completed', 'Auto-corrección finalizada', resultado=resultado, progress=100) return resultado # ────────────────────────────────────────────── # Tarea de análisis (sin modificar nada) # ────────────────────────────────────────────── def _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc): """Retorna la lista de campos que se corregirían y los valores que se asignarían.""" campos = [] if numero_operacion and not pedimento.numero_operacion: campos.append({'campo': 'numero_operacion', 'valor_actual': None, 'valor_nuevo': numero_operacion}) if aduana and aduana != (pedimento.aduana or '').strip(): campos.append({'campo': 'aduana', 'valor_actual': pedimento.aduana, 'valor_nuevo': aduana}) if clave_pedimento and clave_pedimento != (pedimento.clave_pedimento or '').strip(): campos.append({'campo': 'clave_pedimento', 'valor_actual': pedimento.clave_pedimento, 'valor_nuevo': clave_pedimento}) if regimen and not pedimento.regimen: campos.append({'campo': 'regimen', 'valor_actual': None, 'valor_nuevo': regimen}) if rfc: try: importador = Importador.objects.get(rfc=rfc) if pedimento.contribuyente_id != importador.rfc: campos.append({ 'campo': 'contribuyente', 'valor_actual': pedimento.contribuyente_id, 'valor_nuevo': rfc, }) except Importador.DoesNotExist: pass return campos @shared_task(bind=True, name='auditar_pedamentos_incompletos') def auditar_pedamentos_incompletos_task(self, organizacion_id, pedimento_id=None): """ Análisis de solo lectura: reporta qué pedimentos serían corregidos y qué cambios se aplicarían, sin modificar BD ni storage. Si se proporciona pedimento_id, analiza solo ese pedimento. """ task_id = self.request.id revisados = 0 corregibles = [] sin_xml = 0 xml_sin_pc = 0 num_no_coincide = 0 con_error_vucem = 0 # Individual: analiza el pedimento específico sin importar su estado de corrección. # Masivo: solo los pendientes (consultar_vucem=False). if pedimento_id: qs = Pedimento.objects.filter(id=pedimento_id).order_by('pedimento_app') else: qs = Pedimento.objects.filter( organizacion_id=organizacion_id, consultar_vucem=False ).order_by('pedimento_app') total = qs.count() logger.info(f"[auditar_incompletos] org={organizacion_id} — {total} pedimentos a analizar") publish_task_event(task_id, 'processing', f'Iniciando análisis: {total} pedimentos', progress=0) for idx, pedimento in enumerate(qs.iterator(chunk_size=100)): revisados += 1 if total > 0 and (idx % _PROGRESS_INTERVAL == 0 or idx == total - 1): pct = int(((idx + 1) / total) * 95) publish_task_event( task_id, 'processing', f'Analizando {idx + 1}/{total}: {pedimento.pedimento_app}', progress=pct, ) # Buscar XML con respuesta de pedimento completo (evalúa todos, VUCEM primero) try: candidato, content, object_name, hay_candidatos = _find_pc_document(pedimento) except Exception as exc: logger.warning(f"[auditar_incompletos] {pedimento.pedimento_app} — error buscando PC: {exc}") sin_xml += 1 continue if not candidato: if hay_candidatos: xml_sin_pc += 1 else: sin_xml += 1 continue try: root = ET.fromstring(content) except ET.ParseError: xml_sin_pc += 1 continue tiene_error = _find_text(root, 'tieneError') if tiene_error and tiene_error.lower() == 'true': con_error_vucem += 1 continue pedimento_xml = _find_pedimento_number(root) pedimento_bd = (pedimento.pedimento or '').strip() if not pedimento_xml or pedimento_xml != pedimento_bd: num_no_coincide += 1 continue numero_operacion = _find_text(root, 'numeroOperacion') aduana = _find_child_text(root, 'aduanaEntradaSalida', 'clave') clave_pedimento = _find_child_text(root, 'claveDocumento', 'clave') tipo_operacion_raw = _find_child_text(root, 'tipoOperacion', 'clave') regimen = _resolve_regimen(clave_pedimento, tipo_operacion_raw) rfc = _find_child_text(root, 'importadorExportador', 'rfc') campos = _campos_a_corregir(pedimento, numero_operacion, aduana, clave_pedimento, regimen, rfc) dir_part = posixpath.dirname(object_name) nombre_pc = posixpath.join(dir_part, f"vu_PC_{pedimento.pedimento_app}.xml") corregibles.append({ 'pedimento_app': pedimento.pedimento_app, 'pedimento_id': str(pedimento.id), 'documento_actual': { 'id': str(candidato.id), 'archivo': object_name, 'document_type_id': candidato.document_type_id, }, 'documento_nuevo_nombre': nombre_pc if candidato.document_type_id != _DOC_TYPE_PC else None, 'campos_a_corregir': campos, 'consultar_vucem': True, }) resultado = { 'total_revisados': revisados, 'corregibles': len(corregibles), 'sin_xml_o_ilegible': sin_xml, 'xml_no_es_pedimento_completo': xml_sin_pc, 'numero_pedimento_no_coincide': num_no_coincide, 'con_error_vucem': con_error_vucem, 'pedimentos': corregibles, } logger.info(f"[auditar_incompletos] org={organizacion_id} finalizado — {resultado}") publish_task_event(task_id, 'completed', 'Análisis finalizado', resultado=resultado, progress=100) return resultado