from celery import shared_task from django.core.files.base import ContentFile from django.utils import timezone import os import zipfile import tempfile import shutil import logging import re import uuid from datetime import datetime logger = logging.getLogger(__name__) def normalize_filename(filename): """ Normaliza el nombre del archivo removiendo caracteres especiales, espacios y asegurando consistencia. """ from unicodedata import normalize filename = normalize('NFKD', filename).encode('ASCII', 'ignore').decode('ASCII') filename = re.sub(r'[^\w\s.-]', '_', filename) filename = re.sub(r'[\s()]+', '_', filename) filename = re.sub(r'_+', '_', filename) filename = filename.strip('_') return filename def get_clean_base_filename(filename): """ Obtiene el nombre base limpio sin el sufijo de Django. """ normalized = normalize_filename(filename) name_without_ext, ext = os.path.splitext(normalized) django_suffix = extract_django_suffix(name_without_ext) if django_suffix: base_name = name_without_ext[:-8] else: base_name = name_without_ext base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name) return base_name.lower().strip('_') def extract_django_suffix(filename): """ Extrae el sufijo único que Django añade a los archivos. """ name_without_ext = os.path.splitext(filename)[0] match = re.search(r'_([a-zA-Z0-9]{7})$', name_without_ext) if match: return match.group(1) return None def is_same_document(existing_doc, new_filename): """ Compara si un documento existente y un nuevo archivo son el mismo documento. """ existing_basename = os.path.basename(existing_doc.archivo.name) existing_base = get_clean_base_filename(existing_basename) new_base = get_clean_base_filename(new_filename) existing_ext = existing_doc.extension.lower() new_ext = os.path.splitext(new_filename)[1].lower().lstrip('.') return existing_base == new_base and existing_ext == new_ext def procesar_archivo_m_con_nomenclatura(content, pedimento_instance): """ Procesa archivos con nomenclatura M8988852.300 (7 dígitos, punto, 3 dígitos) y extrae información de registros específicos para actualizar el pedimento. Args: content: bytes del contenido del archivo pedimento_instance: instancia del modelo Pedimento Returns: dict: Diccionario con información extraída """ try: content_text = content.decode('utf-8', errors='ignore') registros = {} for line in content_text.splitlines(): line = line.strip() if not line: continue parts = line.split('|') if len(parts) < 2: continue tipo_registro = parts[0] if tipo_registro not in registros: registros[tipo_registro] = [] registros[tipo_registro].append(parts) info_extraida = { 'tiene_nomenclatura_especial': False, 'registros_encontrados': list(registros.keys()), 'detalles_registro_500': [], 'detalles_registro_506': [], 'detalles_registro_501': [], 'detalles_registro_551': [], 'detalles_registro_800': [], 'detalles_registro_801': [], 'actualizaciones_aplicadas': [] } if '500' in registros: info_extraida['tiene_nomenclatura_especial'] = True for reg_500 in registros['500']: if len(reg_500) >= 1: info_extraida['detalles_registro_500'].append({ 'tipo_movimiento': reg_500[1] if len(reg_500) > 1 else None, 'patente': reg_500[2] if len(reg_500) > 1 else None, 'numero_pedimento': reg_500[3] if len(reg_500) > 1 else None, 'aduana_seccion': reg_500[4] if len(reg_500) > 1 else None, 'acuse_electronico': reg_500[5] if len(reg_500) > 1 else None, }) for reg_506 in registros.get('506', []): if len(reg_506) >= 1: info_extraida['detalles_registro_506'].append({ 'numero_pedimento': reg_506[1] if len(reg_506) > 1 else None, 'tipo_fecha': reg_506[2] if len(reg_506) > 1 else None, 'fecha': reg_506[3] if len(reg_506) > 1 else None }) for reg_501 in registros.get('501', []): if len(reg_501) >= 1: info_extraida['detalles_registro_501'].append({ 'patente': reg_501[1] if len(reg_501) > 1 else None, 'numero_pedimento': reg_501[2] if len(reg_501) > 1 else None, 'aduana_seccion': reg_501[3] if len(reg_501) > 1 else None, 'rfc': reg_501[8] if len(reg_501) > 1 else None, 'curp': reg_501[9] if len(reg_501) > 1 else None }) for reg_551 in registros.get('551', []): if len(reg_551) >= 1: info_extraida['detalles_registro_551'].append({ 'numero_pedimento': reg_501[1] if len(reg_501) > 1 else None, 'fraccion_arancelaria': reg_551[2] if len(reg_551) > 1 else None, 'partida': reg_551[3] if len(reg_551) > 1 else None, 'subfraccion': reg_551[4] if len(reg_551) > 1 else None }) for reg_801 in registros.get('800', []): if len(reg_801) >= 1: info_extraida['detalles_registro_800'].append({ 'numero_pedimento': reg_801[1] if len(reg_801) > 1 else None }) for reg_801 in registros.get('801', []): if len(reg_801) >= 1: info_extraida['detalles_registro_801'].append({ 'total_partidas': reg_801[1] if len(reg_801) > 1 else None }) actualizaciones = actualizar_pedimento_con_registros(pedimento_instance, registros) info_extraida['actualizaciones_aplicadas'] = actualizaciones return info_extraida except Exception as e: logger.error(f"Error al procesar archivo con nomenclatura especial: {str(e)}") return { 'tiene_nomenclatura_especial': False, 'error': str(e), 'registros_encontrados': [] } def actualizar_pedimento_con_registros(pedimento_instance, registros): """ Actualiza el pedimento con información extraída de los registros. Args: pedimento_instance: Instancia del pedimento a actualizar registros: Diccionario con registros parseados Returns: list: Lista de actualizaciones aplicadas """ actualizaciones = [] try: if '500' in registros and registros['500']: for reg_500 in registros['500']: if len(reg_500) >= 1: if pedimento_instance.pedimento == reg_500[3]: try: pedimento_instance.aduana = reg_500[4] actualizaciones.append(f"aduana actualizada a {reg_500[4]}") except ValueError: pass if '501' in registros and registros['501']: for reg_501 in registros['501']: if len(reg_501) >= 1: rfc = reg_501[8] if len(reg_501) > 1 else None if rfc and not pedimento_instance.contribuyente and pedimento_instance.pedimento == reg_501[2]: try: from api.customs.models import Importador importador, created = Importador.objects.get_or_create( rfc=rfc, defaults={ 'nombre': f"Importador {rfc}", 'organizacion': pedimento_instance.organizacion } ) pedimento_instance.contribuyente = importador if created: actualizaciones.append(f"importador creado con RFC {rfc}") else: actualizaciones.append(f"importador asociado con RFC {rfc}") except Exception as e: logger.error(f"Error al crear/obtener importador: {str(e)}") if '501' in registros and registros['501']: for reg_501 in registros['501']: if len(reg_501) >= 1: curp = reg_501[9] if len(reg_501) > 1 else None if curp and not pedimento_instance.curp_apoderado and pedimento_instance.pedimento == reg_501[2]: pedimento_instance.curp_apoderado = curp actualizaciones.append(f"curp_apoderado actualizado a {curp}") if '501' in registros and registros['501']: for reg_501 in registros['501']: if len(reg_501) >= 1: tipo_operacion = reg_501[4] if len(reg_501) > 1 else None if tipo_operacion and pedimento_instance.pedimento == reg_501[2]: if tipo_operacion=='1': nombre_tipo_op = "Importacion" elif tipo_operacion=='2': nombre_tipo_op = "Exportacion" else: nombre_tipo_op = f"Tipo {tipo_operacion}" try: from api.customs.models import TipoOperacion tipo_op_obj, created = TipoOperacion.objects.get_or_create( id=tipo_operacion, tipo=nombre_tipo_op, defaults={'descripcion': f"Tipo de Operación {tipo_operacion}"} ) pedimento_instance.tipo_operacion = tipo_op_obj if created: actualizaciones.append(f"tipo_operacion creado con tipo {tipo_operacion}") else: actualizaciones.append(f"tipo_operacion asociado con tipo {tipo_operacion}") except Exception as e: logger.error(f"Error al crear/obtener tipo de operación: {str(e)}") if '501' in registros and registros['501']: for reg_501 in registros['501']: if len(reg_501) >= 1: clave = reg_501[5] if len(reg_501) > 1 else None if clave and pedimento_instance.pedimento == reg_501[2]: pedimento_instance.clave_pedimento = clave actualizaciones.append(f"clave pedimento actualizada a {clave}") if '506' in registros and registros['506']: for reg_506 in registros['506']: if not pedimento_instance.pedimento == reg_506[1]: continue if len(reg_506) >= 1: tipo_fecha = reg_506[2] if len(reg_506) > 1 else None fecha_str = reg_506[3] if len(reg_506) > 1 else None if not tipo_fecha == '2': continue if fecha_str: try: if len(fecha_str) == 8: fecha = datetime.strptime(fecha_str, '%d%m%Y').date() elif len(fecha_str) == 6: fecha = datetime.strptime(fecha_str, '%d%m%y').date() else: continue pedimento_instance.fecha_pago = fecha actualizaciones.append(f"fecha_pago actualizada a {fecha}") except (ValueError, TypeError): pass num_partidas = 0 if '551' in registros and registros['551']: for reg_551 in registros['551']: if not pedimento_instance.pedimento == reg_551[1]: continue num_partidas += 1 pedimento_instance.numero_partidas = num_partidas actualizaciones.append(f"numero_partidas actualizado a {num_partidas}") if actualizaciones: pedimento_instance.save() except Exception as e: logger.error(f"Error al actualizar pedimento con registros: {str(e)}") actualizaciones.append(f"error: {str(e)}") return actualizaciones @shared_task(bind=True, max_retries=3, time_limit=600) def bulk_upload_record_task(self, organizacion_id, parametros, archivo_paths): """ Procesa archivos ZIP de pedimentos en segundo plano. Args: organizacion_id: UUID de la organización parametros: dict con keys: - contribuyente - fecha_pago_input - clave_pedimento_input - patente_input - tipo_operacion_input - aduana_input - curp_apoderado_input - partidas_input archivo_paths: lista de rutas temporales de archivos ZIP """ from api.organization.models import Organizacion from api.customs.models import Pedimento, Importador, TipoOperacion from api.record.models import Document, DocumentType, Fuente created_pedimentos = [] updated_pedimentos = [] failed_records = [] documents_created = 0 temp_dir = None try: organizacion = Organizacion.objects.get(id=organizacion_id) # Extraer parámetros contribuyente = parametros.get('contribuyente', None) fecha_pago_input = parametros.get('fecha_pago_input', None) clave_pedimento_input = parametros.get('clave_pedimento_input', None) patente_input = parametros.get('patente_input', None) tipo_operacion_input = parametros.get('tipo_operacion_input', None) aduana_input = parametros.get('aduana_input', None) curp_apoderado_input = parametros.get('curp_apoderado_input', None) partidas_input = parametros.get('partidas_input', None) # Regex patterns nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$') nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$') # Obtener DocumentType try: document_type = DocumentType.objects.get(nombre="Pedimento") except DocumentType.DoesNotExist: document_type = DocumentType.objects.create( nombre="Pedimento", descripcion="Documento de pedimento" ) # Fuente fuente, _ = Fuente.objects.get_or_create( nombre="APP-EFC", descripcion='Transmitido por la app de escritorio' ) # Usar el directorio donde están los archivos (ya guardado en MEDIA_ROOT) # El directorio base es el padre del primer archivo if archivo_paths: temp_dir = os.path.dirname(archivo_paths[0]) else: temp_dir = tempfile.mkdtemp() # Patrón para nomenclatura especial M8988852.300 patron_nomenclatura = re.compile(r'^[m|M]\d{7}\.\d{3}$', re.IGNORECASE) existing_pedimento = None for archivo_path in archivo_paths: archivo_name = os.path.basename(archivo_path).lower() archivo_name_sin_extension = os.path.splitext(os.path.basename(archivo_path))[0] sub_dir = os.path.join(temp_dir, archivo_name_sin_extension) os.makedirs(sub_dir, exist_ok=True) print(f"Procesando archivo: {archivo_name} en ruta temporal: {archivo_path}") if archivo_name.endswith('.zip'): try: with zipfile.ZipFile(archivo_path, 'r') as zip_ref: zip_ref.extractall(sub_dir) os.remove(archivo_path) # Eliminar el archivo ZIP después de extraerlo except zipfile.BadZipFile as e: failed_records.append({ "file": archivo_path, "archivo_original": archivo_name, "error": f"Archivo ZIP corrupto o inválido: {str(e)}" }) continue except Exception as e: failed_records.append({ "file": archivo_path, "archivo_original": archivo_name, "error": f"Error al extraer ZIP: {str(e)}" }) continue else: failed_records.append({ "file": archivo_path, "archivo_original": archivo_name, "error": "Solo se admiten archivos ZIP" }) continue # Procesar archivos extraídos for root, dirs, files in os.walk(temp_dir): for file_name in files: file_path = os.path.join(root, file_name) relative_path = os.path.relpath(file_path, temp_dir) # Determinar folder_name folder_name = None if os.path.dirname(relative_path): folder_parts = relative_path.split(os.sep) folder_name = folder_parts[0] else: folder_name = os.path.splitext(file_name)[0] # Validar nomenclatura match = nomenclatura_pattern.match(folder_name) match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name) if not match and not match_sin_anio: archivo_original = folder_name + '.zip' failed_records.append({ "file": relative_path, "archivo_original": archivo_original, "error": f"Nomenclatura inválida: {folder_name}. Esperado: anio-aduana-patente-pedimento" }) continue if match: anio, aduana, patente, pedimento_num = match.groups() try: anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio) fecha_pago = datetime(anio_completo, 1, 1).date() except ValueError: failed_records.append({ "file": relative_path, "archivo_original": folder_name + '.zip', "error": f"Año inválido: {anio}" }) continue elif match_sin_anio: aduana, patente, pedimento_num = match_sin_anio.groups() primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0 año_actual = datetime.now().year año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento)) if año_con_digito <= año_actual: año_final = año_con_digito else: año_final = año_con_digito - 10 anio = año_final % 100 fecha_pago = datetime(año_final, 1, 1).date() # Generar pedimento_app pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}" # Verificar si el pedimento ya existe existing_pedimento = Pedimento.objects.filter( pedimento_app=pedimento_app, organizacion=organizacion ).first() if not existing_pedimento: # Crear nuevo pedimento try: importador = None if contribuyente: importador, created = Importador.objects.get_or_create( rfc=contribuyente, defaults={ 'nombre': f"Importador {contribuyente}", 'organizacion': organizacion } ) tipo_op = None if tipo_operacion_input: tipo_op = TipoOperacion.objects.get(id=tipo_operacion_input) pedimento = Pedimento.objects.create( organizacion=organizacion, contribuyente=importador if importador else None, pedimento=str(pedimento_num), aduana=str(aduana), patente=str(patente), fecha_pago=fecha_pago_input if fecha_pago_input else fecha_pago, curp_apoderado=curp_apoderado_input if curp_apoderado_input else "", numero_partidas=partidas_input if partidas_input else 0, tipo_operacion=tipo_op if tipo_op else None, pedimento_app=pedimento_app, agente_aduanal=f"Agente {patente}", clave_pedimento=clave_pedimento_input if clave_pedimento_input else "A1" ) existing_pedimento = pedimento created_pedimentos.append({ "id": str(pedimento.id), "pedimento_app": pedimento_app, "contribuyente": getattr(importador, 'rfc', None), "contribuyente_nombre": getattr(importador, 'nombre', None) }) except Exception as e: failed_records.append({ "file": relative_path, "archivo_original": folder_name + '.zip', "error": f"Error al crear pedimento: {str(e)}" }) continue else: # Actualizar pedimento existente if contribuyente: importador, created = Importador.objects.get_or_create( rfc=contribuyente, defaults={ 'nombre': f"Importador {contribuyente}", 'organizacion': organizacion } ) importador_db = existing_pedimento.contribuyente if importador_db: if importador_db != importador: existing_pedimento.contribuyente = importador else: existing_pedimento.contribuyente = importador existing_pedimento.save() # Actualizar Tipo Operacion if tipo_operacion_input: tipo_op = TipoOperacion.objects.get(id=tipo_operacion_input) if tipo_op and not existing_pedimento.tipo_operacion: existing_pedimento.tipo_operacion = tipo_op existing_pedimento.save() # Actualizar fecha de pago if fecha_pago_input: fecha_db = existing_pedimento.fecha_pago if fecha_db: if isinstance(fecha_db, datetime): fecha_db = fecha_db.date() if fecha_db.month == 1 and fecha_db.day == 1: existing_pedimento.fecha_pago = fecha_pago_input existing_pedimento.save() else: existing_pedimento.fecha_pago = fecha_pago_input existing_pedimento.save() # Actualizar clave_pedimento if clave_pedimento_input: clave_pedimento = existing_pedimento.clave_pedimento if not clave_pedimento or clave_pedimento.strip() != clave_pedimento_input.strip(): existing_pedimento.clave_pedimento = clave_pedimento_input existing_pedimento.save() # Actualizar curp_apoderado if curp_apoderado_input: if not existing_pedimento.curp_apoderado: existing_pedimento.curp_apoderado = curp_apoderado_input existing_pedimento.save() # Actualizar partidas if partidas_input: num_partidas = existing_pedimento.numero_partidas if not num_partidas or num_partidas <= 0: existing_pedimento.numero_partidas = partidas_input existing_pedimento.save() # Crear documento asociado al pedimento try: with open(file_path, 'rb') as f: file_content = f.read() file_name_lower = file_name.lower() tiene_nomenclatura_especial = False info_extraida = {} nombre_base, extension = os.path.splitext(file_name) if patron_nomenclatura.match(file_name_lower): tiene_nomenclatura_especial = True info_extraida = procesar_archivo_m_con_nomenclatura(file_content, existing_pedimento) django_file = ContentFile(file_content, name=file_name) # Buscar documento existente existing_documents = Document.objects.filter( pedimento_id=existing_pedimento.id, organizacion=organizacion ) existing_document = None for doc in existing_documents: if is_same_document(doc, file_name): existing_document = doc break if existing_document: # Actualizar documento existente # try: # if existing_document.archivo and os.path.exists(existing_document.archivo.path): # os.remove(existing_document.archivo.path) # except (ValueError, OSError): # pass # existing_document.archivo = django_file # existing_document.size = len(file_content) # existing_document.extension = extension # existing_document.updated_at = timezone.now() # existing_document.save() # doc = Document.objects.get(id=existing_document.id) # doc.archivo.delete(save=False) # Eliminar el archivo anterior # doc.delete() # Eliminar el registro para crear uno nuevo (evita problemas con archivos en Django) updated_pedimentos.append({ "id": str(existing_pedimento.id), "pedimento_app": existing_pedimento.pedimento_app, "accion": "Documento actualizado", "documento": file_name }) documents_created += 1 else: # Crear nuevo documento document = Document.objects.create( organizacion=organizacion, pedimento_id=existing_pedimento.id, document_type=document_type, fuente_id=fuente.id, archivo=django_file, size=len(file_content), extension=os.path.splitext(file_name)[1].lower().lstrip('.') ) updated_pedimentos.append({ "id": str(existing_pedimento.id), "pedimento_app": existing_pedimento.pedimento_app, "accion": "Documento creado", "documento": file_name }) documents_created += 1 except Exception as e: failed_records.append({ "file": relative_path, "archivo_original": folder_name + '.zip', "error": f"Error al crear documento: {str(e)}" }) continue # Actualizar estado de expediente if documents_created > 0 and existing_pedimento: existing_pedimento.existe_expediente = True existing_pedimento.save() return { 'status': 'completed', 'created_pedimentos': created_pedimentos, 'updated_pedimentos': updated_pedimentos, 'failed_records': failed_records, 'documents_created': documents_created, 'tieneError': len(failed_records) > 0 } except Exception as e: logger.error(f"Error en bulk_upload_record_task: {str(e)}") raise self.retry(exc=e, countdown=60) finally: # Limpiar directorio temporal if temp_dir and os.path.exists(temp_dir): try: shutil.rmtree(temp_dir) except Exception as e: logger.warning(f"Error al limpiar directorio temporal: {e}")