fix: se ajusta enpoint de bulk-create-pedimento_desk para scrapear el archivo de validacion. #21

Merged
jcedilloAS merged 1 commits from T2026-01-098 into main 2026-02-09 19:49:03 +00:00
Showing only changes of commit 1c350cf2bf - Show all commits

View File

@@ -1364,37 +1364,39 @@ class ViewSetPedimento(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltrada
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
file_content = f.read() file_content = f.read()
# Verificar si el archivo tiene la nomenclatura especial M8988852.300
file_name_lower = file_name.lower()
tiene_nomenclatura_especial = False
info_extraida = {}
# Patrón: 7 dígitos, punto, 3 dígitos (ej: M8988852.300)
patron_nomenclatura = re.compile(r'^[m|M]\d{7}\.\d{3}$', re.IGNORECASE)
# Separar nombre base y extensión
nombre_base, extension = os.path.splitext(file_name)
if patron_nomenclatura.match(file_name_lower):
tiene_nomenclatura_especial = True
# Procesar el archivo con el método auxiliar
info_extraida = procesar_archivo_m_con_nomenclatura(file_content, pedimento )
if info_extraida.get('tiene_nomenclatura_especial', False):
# Agregar información de procesamiento a los datos de respuesta
if 'procesamiento_archivos' not in locals():
procesamiento_archivos = []
procesamiento_archivos.append({
'archivo': file_name,
'nomenclatura_especial': True,
'registros_encontrados': info_extraida.get('registros_encontrados', []),
'actualizaciones': info_extraida.get('actualizaciones_aplicadas', [])
})
# print(f"📄 Archivo leído: {len(file_content)} bytes") # print(f"📄 Archivo leído: {len(file_content)} bytes")
# Crear ContentFile que Django puede manejar correctamente # Crear ContentFile que Django puede manejar correctamente
django_file = ContentFile(file_content, name=file_name) django_file = ContentFile(file_content, name=file_name)
# # Verificar si el documento ya existe para este pedimento y archivo
# print("🔍 Verificando existencia previa del documento...")
# # Reemplazar múltiples caracteres
# normalized_file_name = file_name.replace(" ", "_")
# file_name_without_extension = normalized_file_name.rsplit('.', 1)[0]
# extension_file = os.path.splitext(normalized_file_name)[1].lower().lstrip('.')
# existing_document = Document.objects.filter(
# pedimento_id=pedimento.id,
# archivo__contains=file_name_without_extension,
# extension=extension_file
# ).first()
# if existing_document:
# print(f"Documento existente encontrado, omitiendo creación: ID {existing_document.id}")
# continue
# try:
# fuente = Fuente.objects.get(nombre="APP-EFC")
# except Fuente.DoesNotExist:
# fuente = Fuente.objects.create(
# nombre="APP-EFC",
# descripcion='Transmitido por la app de escritorio'
# )
fuente, created = Fuente.objects.get_or_create( fuente, created = Fuente.objects.get_or_create(
nombre="APP-EFC", nombre="APP-EFC",
descripcion='Transmitido por la app de escritorio' descripcion='Transmitido por la app de escritorio'
@@ -1426,6 +1428,10 @@ class ViewSetPedimento(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltrada
}) })
continue continue
if documents_created > 0 and existing_pedimento:
existing_pedimento.existe_expediente = True
existing_pedimento.save()
# print(f"🏁 Procesamiento completado. Archivos procesados en este directorio.") # print(f"🏁 Procesamiento completado. Archivos procesados en este directorio.")
except Exception as e: except Exception as e:
return Response( return Response(
@@ -1917,3 +1923,282 @@ def get_clean_base_filename(filename):
base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name) base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name)
return base_name.lower().strip('_') return base_name.lower().strip('_')
def procesar_archivo_m_con_nomenclatura(content, pedimento_instance):
"""
Procesa archivos con nomenclatura M8988852.300 (7 dígitos, punto, 3 dígitos)
y extrae información de registros específicos para actualizar el pedimento.
Args:
content: bytes del contenido del archivo
pedimento_instance: instancia del modelo Pedimento
Returns:
dict: Diccionario con información extraída
"""
try:
# Decodificar el contenido como texto
content_text = content.decode('utf-8', errors='ignore')
# Buscar todas las líneas que empiezan con los registros solicitados
registros = {}
for line in content_text.splitlines():
line = line.strip()
if not line:
continue
# Dividir por pipe
parts = line.split('|')
if len(parts) < 2:
continue
tipo_registro = parts[0]
# Guardar todos los registros encontrados
if tipo_registro not in registros:
registros[tipo_registro] = []
registros[tipo_registro].append(parts)
# Procesar información específica
info_extraida = {
'tiene_nomenclatura_especial': False,
'registros_encontrados': list(registros.keys()),
'detalles_registro_500': [],
'detalles_registro_506': [],
'detalles_registro_501': [],
'detalles_registro_551': [],
'detalles_registro_800': [],
'detalles_registro_801': [],
'actualizaciones_aplicadas': []
}
# Verificar si hay registros del tipo 500 (indicador de archivo válido)
if '500' in registros:
info_extraida['tiene_nomenclatura_especial'] = True
# Procesar registro 500: Información básica del pedimento
for reg_500 in registros['500']:
if len(reg_500) >= 1:
info_extraida['detalles_registro_500'].append({
'tipo_movimiento': reg_500[1] if len(reg_500) > 1 else None,
'patente': reg_500[2] if len(reg_500) > 1 else None,
'numero_pedimento': reg_500[3] if len(reg_500) > 1 else None,
'aduana_seccion': reg_500[4] if len(reg_500) > 1 else None,
'acuse_electronico': reg_500[5] if len(reg_500) > 1 else None,
})
# Procesar registro 506: Fechas importantes
for reg_506 in registros.get('506', []):
if len(reg_506) >= 1:
info_extraida['detalles_registro_506'].append({
'numero_pedimento': reg_506[1] if len(reg_506) > 1 else None,
'tipo_fecha': reg_506[2] if len(reg_506) > 1 else None,
'fecha': reg_506[3] if len(reg_506) > 1 else None
})
# Procesar registro 501: Información del importador/exportador
for reg_501 in registros.get('501', []):
if len(reg_501) >= 1:
info_extraida['detalles_registro_501'].append({
'patente': reg_501[1] if len(reg_501) > 1 else None,
'numero_pedimento': reg_501[2] if len(reg_501) > 1 else None,
'aduana_seccion': reg_501[3] if len(reg_501) > 1 else None,
'rfc': reg_501[8] if len(reg_501) > 1 else None,
'curp': reg_501[9] if len(reg_501) > 1 else None
})
# Procesar registro 551: Información de partidas
for reg_551 in registros.get('551', []):
if len(reg_551) >= 1:
info_extraida['detalles_registro_551'].append({
'numero_pedimento': reg_501[1] if len(reg_501) > 1 else None,
'fraccion_arancelaria': reg_551[2] if len(reg_551) > 1 else None,
'partida': reg_551[3] if len(reg_551) > 1 else None,
'subfraccion': reg_551[4] if len(reg_551) > 1 else None
})
# Electrónica de Pedimento
for reg_801 in registros.get('800', []):
if len(reg_801) >= 1:
info_extraida['detalles_registro_800'].append({
'numero_pedimento': reg_801[1] if len(reg_801) > 1 else None
})
# Fin de Archivo
for reg_801 in registros.get('801', []):
if len(reg_801) >= 1:
info_extraida['detalles_registro_801'].append({
'total_partidas': reg_801[1] if len(reg_801) > 1 else None
})
# Intentar actualizar campos del pedimento con la información extraída
actualizaciones = actualizar_pedimento_con_registros(pedimento_instance, registros)
info_extraida['actualizaciones_aplicadas'] = actualizaciones
return info_extraida
except Exception as e:
print(f"Error al procesar archivo con nomenclatura especial: {str(e)}")
return {
'tiene_nomenclatura_especial': False,
'error': str(e),
'registros_encontrados': []
}
def actualizar_pedimento_con_registros(pedimento_instance, registros):
"""
Actualiza el pedimento con información extraída de los registros.
Args:
pedimento_instance: Instancia del pedimento a actualizar
registros: Diccionario con registros parseados
Returns:
list: Lista de actualizaciones aplicadas
"""
actualizaciones = []
try:
# Extraer información del registro 500 (si existe)
if '500' in registros and registros['500']:
for reg_500 in registros['500']:
if len(reg_500) >= 1:
# Actualizar número de pedimento si está vacío
if pedimento_instance.pedimento == reg_500[3]:
try:
pedimento_instance.aduana = reg_500[4]
actualizaciones.append(f"aduana actualizada a {reg_500[4]}")
except ValueError:
pass
# Extraer información del registro 501 (importador/exportador)
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
rfc = reg_501[8] if len(reg_501) > 1 else None
# Actualizar importador si hay RFC y no existe
if rfc and not pedimento_instance.contribuyente and pedimento_instance.pedimento == reg_501[2]:
try:
from api.customs.models import Importador
importador, created = Importador.objects.get_or_create(
rfc=rfc,
defaults={
'nombre': f"Importador {rfc}",
'organizacion': pedimento_instance.organizacion
}
)
pedimento_instance.contribuyente = importador
if created:
actualizaciones.append(f"importador creado con RFC {rfc}")
else:
actualizaciones.append(f"importador asociado con RFC {rfc}")
except Exception as e:
print(f"Error al crear/obtener importador: {str(e)}")
# Extraer CURP del registro 501
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
curp = reg_501[9] if len(reg_501) > 1 else None
# Actualizar CURP del apoderado si está vacío
if curp and not pedimento_instance.curp_apoderado and pedimento_instance.pedimento == reg_501[2]:
pedimento_instance.curp_apoderado = curp
actualizaciones.append(f"curp_apoderado actualizado a {curp}")
# Extraer Tipo Operacion del registro 501
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
tipo_operacion = reg_501[4] if len(reg_501) > 1 else None
# Actualizar tipo de operación si no existe
if tipo_operacion and pedimento_instance.pedimento == reg_501[2]:
if tipo_operacion=='1':
nombre_tipo_op = "Importacion"
elif tipo_operacion=='2':
nombre_tipo_op = "Exportacion"
else:
nombre_tipo_op = f"Tipo {tipo_operacion}"
try:
from api.customs.models import TipoOperacion
tipo_op_obj, created = TipoOperacion.objects.get_or_create(
id=tipo_operacion,
tipo=nombre_tipo_op,
defaults={'descripcion': f"Tipo de Operación {tipo_operacion}"}
)
pedimento_instance.tipo_operacion = tipo_op_obj
if created:
actualizaciones.append(f"tipo_operacion creado con tipo {tipo_operacion}")
else:
actualizaciones.append(f"tipo_operacion asociado con tipo {tipo_operacion}")
except Exception as e:
print(f"Error al crear/obtener tipo de operación: {str(e)}")
# Extraer Clave Pedimento
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
clave = reg_501[5] if len(reg_501) > 1 else None
# Actualizar clave si no existe
if clave and pedimento_instance.pedimento == reg_501[2]:
pedimento_instance.clave_pedimento = clave
actualizaciones.append(f"clave pedimento actualizada a {clave}")
# Extraer fechas del registro 506
if '506' in registros and registros['506']:
for reg_506 in registros['506']:
if not pedimento_instance.pedimento == reg_506[1]:
continue
if len(reg_506) >= 1:
tipo_fecha = reg_506[2] if len(reg_506) > 1 else None
fecha_str = reg_506[3] if len(reg_506) > 1 else None
if not tipo_fecha == '2':
continue
# Procesar fecha según formato (DDMMYYYY o DDMMYY)
if fecha_str:
try:
# Intentar diferentes formatos de fecha
if len(fecha_str) == 8: # DDMMYYYY
fecha = datetime.strptime(fecha_str, '%d%m%Y').date()
elif len(fecha_str) == 6: # DDMMYY
fecha = datetime.strptime(fecha_str, '%d%m%y').date()
else:
continue
# Asignar como fecha de pago si no existe
# if not pedimento_instance.fecha_pago:
# pedimento_instance.fecha_pago = fecha
# actualizaciones.append(f"fecha_pago actualizada a {fecha}")
pedimento_instance.fecha_pago = fecha
actualizaciones.append(f"fecha_pago actualizada a {fecha}")
except (ValueError, TypeError):
pass
num_partidas = 0
if '551' in registros and registros['551']:
for reg_551 in registros['551']:
if not pedimento_instance.pedimento == reg_551[1]:
continue
num_partidas += 1
pedimento_instance.numero_partidas = num_partidas
actualizaciones.append(f"numero_partidas actualizado a {num_partidas}")
# Guardar los cambios si hubo actualizaciones
if actualizaciones:
pedimento_instance.save()
except Exception as e:
print(f"Error al actualizar pedimento con registros: {str(e)}")
actualizaciones.append(f"error: {str(e)}")
return actualizaciones