feature/implementacion de gestor de informacion y archivos minIO

This commit is contained in:
Dulce
2026-04-22 11:10:05 -06:00
parent 69d07f2713
commit 39504e196c
23 changed files with 2272 additions and 391 deletions

View File

@@ -1,3 +1,4 @@
import tempfile
from celery import group
from celery import shared_task
import logging
@@ -6,81 +7,130 @@ from django.utils import timezone
import os
import zipfile
import re
from api.utils.storage_service import storage_service
@shared_task
def procesar_datastage_task(datastage_id, user_organizacion_id=None):
import traceback
tmp_path = None
try:
logger = logging.getLogger(__name__)
from api.datastage.models import DataStage
from api.organization.models import Organizacion
from api.customs.models import Pedimento, TipoOperacion, Regimen
datastage = DataStage.objects.get(id=datastage_id)
# Obtener datastage
try:
datastage = DataStage.objects.get(id=datastage_id)
except DataStage.DoesNotExist:
return {'error': f'DataStage {datastage_id} no encontrado'}
# Validar archivo
if not datastage.archivo:
print("DataStage no tiene archivo asociado")
return {'detail': 'No hay archivo asociado a este DataStage.'}
file_path = datastage.archivo.path
if not os.path.exists(file_path):
return {'detail': 'El archivo no existe en el servidor.'}
if not file_path.endswith('.zip'):
ruta_archivo = str(datastage.archivo)
if not ruta_archivo.lower().endswith('.zip'):
return {'detail': 'El archivo no es un .zip.'}
documentos_encontrados = []
registros_cargados = {}
registros_por_archivo = {}
errores_por_archivo = {}
errores_pedimento = []
# Descargar archivo
with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp:
tmp_path = tmp.name
success = storage_service.download_file(ruta_archivo, tmp_path)
if not success:
print(f"No se pudo descargar: {ruta_archivo}")
return {'detail': f'No se pudo descargar el archivo: {ruta_archivo}'}
file_path = tmp_path
# Obtener organización
user_organizacion = None
if user_organizacion_id:
user_organizacion = Organizacion.objects.get(id=user_organizacion_id)
try:
user_organizacion = Organizacion.objects.get(id=user_organizacion_id)
except Organizacion.DoesNotExist:
print(f"Organización no encontrada: {user_organizacion_id}")
def to_snake_case(name):
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
s2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1)
return s2.replace('__', '_').lower()
# Lanzar una subtarea por cada archivo ASC
# Leer ZIP y lanzar subtareas
subtasks = []
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for asc_name in zip_ref.namelist():
namelist = zip_ref.namelist()
for asc_name in namelist:
if asc_name.endswith('.asc'):
subtasks.append(procesar_archivo_asc_task.s(datastage_id, user_organizacion_id, asc_name))
subtasks.append(
procesar_archivo_asc_task.s(datastage_id, user_organizacion_id, asc_name)
)
if subtasks:
job = group(subtasks).apply_async()
print(f"Grupo de tareas lanzado: {job.id}")
return {
'group_id': job.id,
'subtask_ids': [t.id for t in job.results],
'detail': 'Procesamiento lanzado. Monitorea el estado de cada subtask_id.'
'detail': f'Procesamiento lanzado. {len(subtasks)} archivos .ASC en cola.'
}
print("No se encontraron archivos .ASC")
return {'detail': 'No se encontraron archivos .asc'}
except Exception as e:
import traceback
return {'error': str(e), 'traceback': traceback.format_exc()}
finally:
# Limpiar temporal
if tmp_path and os.path.exists(tmp_path):
try:
os.unlink(tmp_path)
except Exception as e:
print(f"No se pudo eliminar temporal: {e}")
@shared_task
def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
import traceback
"""
Procesa un archivo .ASC individual dentro del ZIP
"""
tmp_path = None
try:
logger = logging.getLogger(__name__)
from api.datastage.models import DataStage
from api.organization.models import Organizacion
from api.customs.models import Pedimento, TipoOperacion, Regimen
from django.apps import apps
import zipfile
import re
import datetime
# Obtener datastage
datastage = DataStage.objects.get(id=datastage_id)
user_organizacion = None
if user_organizacion_id:
user_organizacion = Organizacion.objects.get(id=user_organizacion_id)
file_path = datastage.archivo.path
ruta_archivo = str(datastage.archivo)
# Descargar archivo
with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp:
tmp_path = tmp.name
success = storage_service.download_file(ruta_archivo, tmp_path)
if not success:
return {'errores': [f'No se pudo descargar el archivo: {ruta_archivo}']}
file_path = tmp_path
def to_snake_case(name):
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
s2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1)
return s2.replace('__', '_').lower()
objects_to_create = []
with zipfile.ZipFile(file_path, 'r') as zip_ref:
if asc_name not in zip_ref.namelist():
print(f"{asc_name} no encontrado en el ZIP")
return {'errores': [f'{asc_name} no encontrado en el zip']}
# Determinar modelo
match = re.match(r'.*_(\d+)\.asc$', asc_name)
if match:
registro_key = match.group(1)
@@ -96,53 +146,53 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
Model = apps.get_model('datastage', model_name)
except LookupError:
return {'errores': [f"No existe el modelo para {model_name}"]}
# Procesar archivo
with zip_ref.open(asc_name) as asc_file:
first = True
field_names = []
field_names_snake = []
objects_to_create = []
errores_pedimento = []
line_count = 0
for line in asc_file:
line_decoded = None
line_count += 1
try:
line_decoded = line.decode('utf-8').strip()
except UnicodeDecodeError:
try:
line_decoded = line.decode('latin-1').strip()
except Exception as e:
except Exception:
continue
except Exception as e:
continue
if not line_decoded:
continue
if first:
field_names = [f for f in line_decoded.split('|')]
field_names_snake = [to_snake_case(f) for f in field_names]
first = False
continue
values = line_decoded.split('|')
while values and values[-1] == '':
values.pop()
if len(values) == len(field_names_snake) + 1 and values[-1] == '':
values = values[:-1]
if len(values) < len(field_names_snake):
values += [None] * (len(field_names_snake) - len(values))
if len(values) != len(field_names_snake):
continue
data = dict(zip(field_names_snake, values))
if hasattr(Model, 'organizacion_id'):
data['organizacion_id'] = user_organizacion.id if user_organizacion else None
if hasattr(Model, 'datastage_id'):
data['datastage_id'] = datastage.id
# Limpiar campos de fecha vacíos ('') a None
# Limpiar fechas vacías
for field in Model._meta.get_fields():
if hasattr(field, 'get_internal_type') and field.get_internal_type() in ["DateField", "DateTimeField"]:
if data.get(field.name) == "":
data[field.name] = None
# Convertir fecha_pago_real a timezone-aware si existe
# Convertir fecha_pago_real
if 'fecha_pago_real' in data and data['fecha_pago_real']:
from django.utils import timezone
import datetime
fecha_val = data['fecha_pago_real']
if isinstance(fecha_val, str):
try:
@@ -156,11 +206,11 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
dt = timezone.make_aware(dt)
if dt:
data['fecha_pago_real'] = dt
elif isinstance(fecha_val, datetime.datetime) and timezone.is_naive(fecha_val):
data['fecha_pago_real'] = timezone.make_aware(fecha_val)
try:
obj = Model(**data)
objects_to_create.append(obj)
# Si es Registro501, crear Pedimento
if model_name == 'Registro501':
organizacion_instance = None
@@ -169,7 +219,7 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
try:
organizacion_instance = Organizacion.objects.get(id=org_id)
except Exception as org_exc:
logger.warning(f"No se encontró la organización con id {org_id}: {org_exc}")
print(f"No se encontró la organización con id {org_id}: {org_exc}")
if not organizacion_instance:
organizacion_instance = user_organizacion
fecha_pago_raw = data.get('fecha_pago_real')
@@ -198,7 +248,7 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
pedimento_app = f"{year[-2:]}-{str(aduana).zfill(2)[:2]}-{str(patente).zfill(4)[-4:]}-{str(pedimento_num).zfill(7)[-7:]}"
# logger.info(f"pedimento_app >>>> {pedimento_app}")
except Exception as ped_app_exc:
logger.warning(f"No se pudo generar pedimento_app: {ped_app_exc}")
print(f"No se pudo generar pedimento_app: {ped_app_exc}")
tipo_operacion_val = data.get('tipo_operacion')
tipo_operacion = TipoOperacion.objects.filter(id=int(tipo_operacion_val)).first() if tipo_operacion_val else None
regimen = Regimen.objects.filter(claveped=data.get('clave_documento', '').strip(), tipo=tipo_operacion.id if tipo_operacion else None).first() if tipo_operacion else None
@@ -237,11 +287,14 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
pass
except Exception as e:
continue
if objects_to_create:
try:
Model.objects.bulk_create(objects_to_create, batch_size=1000)
except Exception as e:
return {'archivo': asc_name, 'error': str(e), 'traceback': traceback.format_exc()}
# Bulk create
if objects_to_create:
try:
Model.objects.bulk_create(objects_to_create, batch_size=1000)
except Exception as e:
return {'archivo': asc_name, 'error': str(e)}
return {
'archivo': asc_name,
'insertados': len(objects_to_create)
@@ -249,33 +302,11 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
except Exception as e:
import traceback
return {'archivo': asc_name, 'error': str(e), 'traceback': traceback.format_exc()}
detalles = {}
for key in ['502', '503', '504']:
model_name = f'Registro{key}'
asc_file = None
encabezado = None
errores = []
for asc_name in registros_por_archivo:
if asc_name.endswith(f'_{key}.asc'):
asc_file = asc_name
break
if asc_file:
finally:
# Limpiar temporal
if tmp_path and os.path.exists(tmp_path):
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
with zip_ref.open(asc_file) as f:
for line in f:
try:
encabezado = line.decode('utf-8').strip()
except UnicodeDecodeError:
encabezado = line.decode('latin-1').strip()
break
os.unlink(tmp_path)
except Exception as e:
encabezado = f'Error leyendo encabezado: {e}'
errores = errores_por_archivo.get(asc_file, [])
detalles[model_name] = {
'archivo': asc_file,
'encabezado': encabezado,
'errores': errores
}
return {'registros_cargados': registros_cargados, 'errores_pedimento': errores_pedimento}
print(f"No se pudo eliminar temporal: {e}")