# 312 lines, 14 KiB, Python
import tempfile
|
|
from celery import group
|
|
from celery import shared_task
|
|
import logging
|
|
from django.apps import apps
|
|
from django.utils import timezone
|
|
import os
|
|
import zipfile
|
|
import re
|
|
from api.utils.storage_service import storage_service
|
|
|
|
@shared_task
def procesar_datastage_task(datastage_id, user_organizacion_id=None):
    """
    Descarga el ZIP asociado a un DataStage y lanza una subtarea Celery
    por cada archivo .asc que contenga.

    Args:
        datastage_id: PK del DataStage a procesar.
        user_organizacion_id: PK opcional de la Organizacion del usuario;
            se propaga tal cual a cada subtarea.

    Returns:
        dict con 'group_id', 'subtask_ids' y 'detail' si se lanzaron
        subtareas; o un dict con 'detail'/'error' describiendo el problema.
    """
    import traceback

    logger = logging.getLogger(__name__)
    tmp_path = None
    try:
        # Imports locales para evitar ciclos de importación al registrar tareas.
        from api.datastage.models import DataStage
        from api.organization.models import Organizacion

        # Obtener el DataStage; las tareas Celery devuelven el error en vez
        # de propagar la excepción.
        try:
            datastage = DataStage.objects.get(id=datastage_id)
        except DataStage.DoesNotExist:
            return {'error': f'DataStage {datastage_id} no encontrado'}

        # Validar que exista un archivo y que sea un .zip.
        if not datastage.archivo:
            logger.warning("DataStage no tiene archivo asociado")
            return {'detail': 'No hay archivo asociado a este DataStage.'}

        ruta_archivo = str(datastage.archivo)
        if not ruta_archivo.lower().endswith('.zip'):
            return {'detail': 'El archivo no es un .zip.'}

        # Reservar una ruta temporal (el archivo se cierra de inmediato y
        # el storage escribe sobre esa ruta).
        with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp:
            tmp_path = tmp.name

        success = storage_service.download_file(ruta_archivo, tmp_path)
        if not success:
            logger.error("No se pudo descargar: %s", ruta_archivo)
            return {'detail': f'No se pudo descargar el archivo: {ruta_archivo}'}

        file_path = tmp_path

        # Resolver la organización del usuario (opcional); si no existe solo
        # se registra el aviso y se continúa sin ella.
        user_organizacion = None
        if user_organizacion_id:
            try:
                user_organizacion = Organizacion.objects.get(id=user_organizacion_id)
            except Organizacion.DoesNotExist:
                logger.warning("Organización no encontrada: %s", user_organizacion_id)

        # Leer el ZIP y preparar una firma de subtarea por cada .asc.
        subtasks = []
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            for asc_name in zip_ref.namelist():
                if asc_name.endswith('.asc'):
                    subtasks.append(
                        procesar_archivo_asc_task.s(datastage_id, user_organizacion_id, asc_name)
                    )

        if subtasks:
            # Lanzar todas las subtareas en paralelo como un grupo Celery.
            job = group(subtasks).apply_async()
            logger.info("Grupo de tareas lanzado: %s", job.id)
            return {
                'group_id': job.id,
                'subtask_ids': [t.id for t in job.results],
                'detail': f'Procesamiento lanzado. {len(subtasks)} archivos .ASC en cola.'
            }

        logger.info("No se encontraron archivos .ASC")
        return {'detail': 'No se encontraron archivos .asc'}

    except Exception as e:
        # Cualquier fallo inesperado se devuelve como resultado de la tarea.
        return {'error': str(e), 'traceback': traceback.format_exc()}

    finally:
        # Limpiar el ZIP temporal descargado, exista o no resultado.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError as e:
                logging.getLogger(__name__).warning("No se pudo eliminar temporal: %s", e)
|
|
@shared_task
def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
    """
    Procesa un archivo .ASC individual dentro del ZIP de un DataStage.

    Descarga el ZIP, localiza `asc_name`, deduce el modelo destino
    ('Registro<NNN>' o 'Registro<Nombre>' según el sufijo del archivo),
    parsea las líneas delimitadas por '|' (la primera línea es el
    encabezado) y hace bulk_create de los registros. Para Registro501
    además intenta crear un Pedimento por cada fila.

    Args:
        datastage_id: PK del DataStage dueño del ZIP.
        user_organizacion_id: PK opcional de la Organizacion del usuario.
        asc_name: nombre del miembro .asc dentro del ZIP.

    Returns:
        dict con 'archivo' e 'insertados' si tuvo éxito; o un dict con
        'errores'/'error' (y 'traceback' en fallos inesperados).
    """
    import traceback

    logger = logging.getLogger(__name__)
    tmp_path = None
    try:
        # Imports locales para evitar ciclos de importación.
        from api.datastage.models import DataStage
        from api.organization.models import Organizacion
        from api.customs.models import Pedimento, TipoOperacion, Regimen
        import datetime

        datastage = DataStage.objects.get(id=datastage_id)
        user_organizacion = None
        if user_organizacion_id:
            user_organizacion = Organizacion.objects.get(id=user_organizacion_id)

        ruta_archivo = str(datastage.archivo)

        # Reservar una ruta temporal y descargar el ZIP completo.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp:
            tmp_path = tmp.name

        success = storage_service.download_file(ruta_archivo, tmp_path)
        if not success:
            return {'errores': [f'No se pudo descargar el archivo: {ruta_archivo}']}

        file_path = tmp_path

        def to_snake_case(name):
            # CamelCase -> snake_case; colapsa dobles guiones bajos.
            s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
            s2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1)
            return s2.replace('__', '_').lower()

        objects_to_create = []

        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            if asc_name not in zip_ref.namelist():
                logger.error("❌ %s no encontrado en el ZIP", asc_name)
                return {'errores': [f'{asc_name} no encontrado en el zip']}

            # Determinar el modelo destino por el sufijo del nombre:
            # "*_501.asc" -> Registro501; "*_Nombre.asc" -> RegistroNombre.
            match = re.match(r'.*_(\d+)\.asc$', asc_name)
            if match:
                model_name = f'Registro{match.group(1)}'
            else:
                match2 = re.match(r'.*_([A-Za-z]+)\.asc$', asc_name)
                if match2:
                    model_name = f'Registro{match2.group(1).capitalize()}'
                else:
                    return {'errores': ["No se pudo determinar el modelo"]}

            try:
                Model = apps.get_model('datastage', model_name)
            except LookupError:
                return {'errores': [f"No existe el modelo para {model_name}"]}

            # Campos de fecha del modelo, calculados una sola vez (antes se
            # recorrían los fields del modelo en cada línea del archivo).
            date_fields = [
                f.name for f in Model._meta.get_fields()
                if hasattr(f, 'get_internal_type')
                and f.get_internal_type() in ("DateField", "DateTimeField")
            ]

            # Procesar el archivo línea a línea.
            with zip_ref.open(asc_name) as asc_file:
                first = True
                field_names_snake = []
                line_count = 0

                for line in asc_file:
                    line_count += 1
                    # Decodificar con utf-8 y caer a latin-1 si falla.
                    try:
                        line_decoded = line.decode('utf-8').strip()
                    except UnicodeDecodeError:
                        try:
                            line_decoded = line.decode('latin-1').strip()
                        except Exception:
                            continue

                    if not line_decoded:
                        continue

                    if first:
                        # La primera línea es el encabezado con los nombres
                        # de columna; se normalizan a snake_case.
                        field_names_snake = [
                            to_snake_case(f) for f in line_decoded.split('|')
                        ]
                        first = False
                        continue

                    values = line_decoded.split('|')
                    # Descartar columnas vacías al final de la fila.
                    while values and values[-1] == '':
                        values.pop()
                    if len(values) != len(field_names_snake):
                        # Fila malformada: se ignora.
                        continue

                    data = dict(zip(field_names_snake, values))

                    if hasattr(Model, 'organizacion_id'):
                        data['organizacion_id'] = user_organizacion.id if user_organizacion else None
                    if hasattr(Model, 'datastage_id'):
                        data['datastage_id'] = datastage.id

                    # Normalizar fechas vacías a None.
                    for fname in date_fields:
                        if data.get(fname) == "":
                            data[fname] = None

                    # Convertir fecha_pago_real (texto) a datetime aware.
                    if data.get('fecha_pago_real'):
                        fecha_val = data['fecha_pago_real']
                        # FIX: inicializar dt; antes quedaba sin definir si
                        # fecha_val no era str y producía NameError.
                        dt = None
                        if isinstance(fecha_val, str):
                            try:
                                dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d %H:%M:%S')
                            except ValueError:
                                try:
                                    dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d')
                                except Exception:
                                    dt = None
                        if dt and timezone.is_naive(dt):
                            dt = timezone.make_aware(dt)
                        if dt:
                            data['fecha_pago_real'] = dt

                    try:
                        objects_to_create.append(Model(**data))

                        # Para Registro501 se crea además un Pedimento resumen.
                        if model_name == 'Registro501':
                            organizacion_instance = None
                            org_id = data.get('organizacion_id')
                            if org_id:
                                try:
                                    organizacion_instance = Organizacion.objects.get(id=org_id)
                                except Exception as org_exc:
                                    logger.warning(
                                        "No se encontró la organización con id %s: %s",
                                        org_id, org_exc,
                                    )
                            if not organizacion_instance:
                                organizacion_instance = user_organizacion

                            # fecha_pago: solo la parte de fecha del valor.
                            fecha_pago_raw = data.get('fecha_pago_real')
                            fecha_pago = None
                            if fecha_pago_raw:
                                if isinstance(fecha_pago_raw, str):
                                    fecha_pago = fecha_pago_raw.split(' ')[0]
                                elif hasattr(fecha_pago_raw, 'date'):
                                    fecha_pago = fecha_pago_raw.date()
                                else:
                                    fecha_pago = fecha_pago_raw

                            aduana = data.get('seccion_aduanera')
                            patente = data.get('patente')
                            pedimento_num = data.get('pedimento')

                            # Construir la clave "AA-AD-PPPP-NNNNNNN";
                            # la aduana conserva sus dos primeros dígitos.
                            pedimento_app = ""
                            try:
                                if fecha_pago and aduana and patente and pedimento_num:
                                    if isinstance(fecha_pago, str):
                                        year = fecha_pago[:4]
                                    else:
                                        year = str(fecha_pago.year)
                                    pedimento_app = (
                                        f"{year[-2:]}-{str(aduana).zfill(2)[:2]}-"
                                        f"{str(patente).zfill(4)[-4:]}-"
                                        f"{str(pedimento_num).zfill(7)[-7:]}"
                                    )
                            except Exception as ped_app_exc:
                                logger.warning("No se pudo generar pedimento_app: %s", ped_app_exc)

                            tipo_operacion_val = data.get('tipo_operacion')
                            tipo_operacion = (
                                TipoOperacion.objects.filter(id=int(tipo_operacion_val)).first()
                                if tipo_operacion_val else None
                            )
                            regimen = (
                                Regimen.objects.filter(
                                    claveped=data.get('clave_documento', '').strip(),
                                    tipo=tipo_operacion.id,
                                ).first()
                                if tipo_operacion else None
                            )

                            # Buscar o crear el Importador para el RFC de la fila.
                            importador_instance = None
                            rfc = data.get('rfc')
                            if rfc:
                                from api.customs.models import Importador
                                importador_instance = Importador.objects.filter(rfc=rfc).first()
                                if not importador_instance and organizacion_instance:
                                    importador_instance = Importador.objects.create(
                                        rfc=rfc, organizacion=organizacion_instance
                                    )

                            # FIX: el dict original repetía la clave 'patente'.
                            pedimento_data = {
                                'pedimento': pedimento_num,
                                'patente': patente,
                                'aduana': aduana,
                                'regimen': regimen.regimenped if regimen else None,
                                'clave_pedimento': data.get('clave_documento'),
                                'pedimento_app': pedimento_app,
                                'organizacion': organizacion_instance,
                                'fecha_pago': fecha_pago,
                                'alerta': False,
                                'contribuyente': importador_instance,
                                'agente_aduanal': data.get('curp_agente_a'),
                                'tipo_operacion': tipo_operacion,
                                'numero_partidas': data.get('numero_partidas', 0),
                                'importe_total': data.get('importe_total', 0.0),
                                'saldo_disponible': data.get('saldo_disponible', 0.0),
                                'importe_pedimento': data.get('importe_pedimento', 0.0),
                                'existe_expediente': data.get('existe_expediente', False),
                                'remesas': data.get('remesas', False),
                            }
                            try:
                                Pedimento.objects.create(**pedimento_data)
                            except Exception as ped_exc:
                                # Best-effort: se registra en vez de ocultar el fallo.
                                logger.warning("No se pudo crear Pedimento: %s", ped_exc)
                    except Exception as row_exc:
                        # Fila problemática: se omite y se continúa (best-effort,
                        # igual que antes, pero dejando rastro en el log).
                        logger.debug(
                            "Fila %s de %s omitida: %s", line_count, asc_name, row_exc
                        )
                        continue

        # Insertar todos los registros del archivo en lote.
        if objects_to_create:
            try:
                Model.objects.bulk_create(objects_to_create, batch_size=1000)
            except Exception as e:
                return {'archivo': asc_name, 'error': str(e)}

        return {
            'archivo': asc_name,
            'insertados': len(objects_to_create)
        }

    except Exception as e:
        # Cualquier fallo inesperado se devuelve como resultado de la tarea.
        return {'archivo': asc_name, 'error': str(e), 'traceback': traceback.format_exc()}

    finally:
        # Limpiar el ZIP temporal descargado.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError as e:
                logging.getLogger(__name__).warning("No se pudo eliminar temporal: %s", e)