agregar expedientes mediante celery

This commit is contained in:
Dulce
2026-01-19 09:46:35 -07:00
parent 3272cd1d17
commit f8379807f8
4 changed files with 735 additions and 938 deletions

View File

@@ -1,5 +1,6 @@
import uuid
from django.db import models
from django.contrib.auth import get_user_model
# Create your models here.
@@ -210,4 +211,45 @@ class Importador(models.Model):
ordering = ['rfc']
def __str__(self):
return f"{self.rfc} - {self.nombre}"
# bulk de datos
class BulkUploadTask(models.Model):
STATUS_CHOICES = [
('pending', 'Pendiente'),
('processing', 'Procesando'),
('completed', 'Completado'),
('failed', 'Fallido'),
('partial', 'Parcialmente completado'),
]
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE, related_name='bulk_upload_tasks')
organizacion = models.ForeignKey('organization.Organizacion', on_delete=models.CASCADE)
contribuyente = models.CharField(max_length=255, blank=True, null=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
task_type = models.CharField(max_length=50, default='bulk_create')
total_files = models.IntegerField(default=0)
processed_files = models.IntegerField(default=0)
created_pedimentos = models.IntegerField(default=0)
created_documents = models.IntegerField(default=0)
result = models.JSONField(default=dict, blank=True)
failed_files = models.JSONField(default=list, blank=True)
error_message = models.TextField(blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
finished_at = models.DateTimeField(null=True, blank=True)
fecha_pago = models.DateField(null=True, blank=True)
clave_pedimento = models.CharField(max_length=50, blank=True, null=True)
tipo_operacion_id = models.IntegerField(null=True, blank=True)
curp_apoderado = models.CharField(max_length=50, blank=True, null=True)
partidas = models.IntegerField(default=0)
celery_task_id = models.CharField(max_length=255, blank=True, null=True)
def __str__(self):
return f"BulkUpload {self.id} - {self.status}"
class Meta:
verbose_name = "Tarea de Carga Masiva"
verbose_name_plural = "Tareas de Carga Masiva"
db_table = 'bulk_upload_task'
ordering = ['-created_at']

View File

@@ -1,2 +1,3 @@
from .microservice import *
from .internal_services import *
from .bulk_pedimentos import *

View File

@@ -0,0 +1,421 @@
# tasks/bulk_pedimentos.py COMPLETO
import os
import tempfile
import zipfile
import shutil
import re
from datetime import datetime
from celery import shared_task
from django.core.files.base import ContentFile
from django.utils import timezone
from django.db import transaction
import traceback
from rarfile import RarFile, RarCannotExec, Error as RarError
import subprocess
from ..models import BulkUploadTask, Pedimento, Importador, TipoOperacion
from ...record.models import DocumentType, Document, Fuente
from django.contrib.auth import get_user_model
from django.core.files.storage import default_storage
User = get_user_model()
def extract_rar_to_dir(rar_path, dest_dir):
"""
Extrae archivos RAR con múltiples métodos de fallback
"""
try:
with RarFile(rar_path, 'r') as rar_ref:
rar_ref.extractall(dest_dir)
return True
except (RarCannotExec, RarError):
try:
subprocess.run(['7z', 'x', rar_path, f'-o{dest_dir}'],
check=True, capture_output=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
try:
subprocess.run(['unrar', 'x', rar_path, dest_dir],
check=True, capture_output=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def is_same_document(doc, file_name):
"""
Determina si un documento es el mismo basado en el nombre
"""
if not doc.archivo:
return False
doc_name = os.path.basename(doc.archivo.name).lower()
new_name = file_name.lower()
doc_base = os.path.splitext(doc_name)[0]
new_base = os.path.splitext(new_name)[0]
return doc_base == new_base
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_bulk_upload(self, bulk_upload_id):
"""
Tarea principal para procesar bulk upload de forma asíncrona
SOLO recibe el ID, obtiene todo de la base de datos
"""
try:
bulk_upload = BulkUploadTask.objects.get(id=bulk_upload_id)
bulk_upload.celery_task_id = self.request.id
bulk_upload.status = 'processing'
bulk_upload.started_at = timezone.now()
bulk_upload.save(update_fields=['celery_task_id', 'status', 'started_at'])
if bulk_upload.task_type == 'bulk_create_pedimento_desk':
result = _process_bulk_pedimento_desk(bulk_upload)
else:
result = _process_bulk_create(bulk_upload)
bulk_upload.status = 'completed' if not result['failed_files'] else 'partial'
bulk_upload.finished_at = timezone.now()
bulk_upload.result = result
bulk_upload.created_pedimentos = result.get('created_count', 0)
bulk_upload.created_documents = result.get('documents_created', 0)
bulk_upload.failed_files = result.get('failed_files', [])
bulk_upload.processed_files = result.get('processed_files', 0)
bulk_upload.save()
temp_files_info = bulk_upload.result.get('temp_files', [])
for file_info in temp_files_info:
try:
default_storage.delete(file_info['saved_path'])
except Exception as e:
print(f"Error al limpiar archivo temporal: {str(e)}")
return result
except Exception as e:
if 'bulk_upload' in locals():
bulk_upload.status = 'failed'
bulk_upload.error_message = f"{str(e)}\n{traceback.format_exc()}"
bulk_upload.finished_at = timezone.now()
bulk_upload.save()
raise self.retry(exc=e, countdown=60)
def _process_bulk_create(bulk_upload):
"""
Procesa bulk_create normal - VERSIÓN CORREGIDA
"""
created_pedimentos = []
failed_files = []
documents_created = 0
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
try:
organizacion = bulk_upload.organizacion
contribuyente = bulk_upload.contribuyente
temp_files_info = bulk_upload.result.get('temp_files', [])
try:
document_type = DocumentType.objects.get(nombre="Pedimento")
except DocumentType.DoesNotExist:
document_type = DocumentType.objects.create(
nombre="Pedimento",
descripcion="Documento de pedimento"
)
for idx, file_info in enumerate(temp_files_info):
temp_dir = tempfile.mkdtemp()
try:
temp_file_path = os.path.join(temp_dir, file_info['original_name'])
with default_storage.open(file_info['saved_path'], 'rb') as src:
with open(temp_file_path, 'wb') as dst:
dst.write(src.read())
archivo_name_sin_extension = os.path.splitext(file_info['original_name'])[0]
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
os.makedirs(sub_dir, exist_ok=True)
archivo_name = file_info['original_name'].lower()
if archivo_name.endswith('.zip'):
with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
zip_ref.extractall(sub_dir)
elif archivo_name.endswith('.rar'):
if not extract_rar_to_dir(temp_file_path, sub_dir):
failed_files.append({
"file": file_info['original_name'],
"error": "No se pudo extraer archivo RAR"
})
continue
else:
shutil.move(temp_file_path, os.path.join(sub_dir, file_info['original_name']))
for root, dirs, files in os.walk(sub_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
relative_path = os.path.relpath(file_path, sub_dir)
folder_name = archivo_name_sin_extension
match = nomenclatura_pattern.match(folder_name)
match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name)
if not match and not match_sin_anio:
archivo_original = folder_name + os.path.splitext(file_info['original_name'])[1]
failed_files.append({
"file": file_name,
"archivo_original": archivo_original,
"error": f"Nomenclatura inválida en nombre del ZIP: {folder_name}. Esperado: anio-aduana-patente-pedimento"
})
continue
if match:
anio, aduana, patente, pedimento_num = match.groups()
try:
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
fecha_pago = datetime(anio_completo, 1, 1).date()
except ValueError:
failed_files.append({
"file": file_name,
"archivo_original": file_info['original_name'],
"error": f"Año inválido: {anio}"
})
continue
elif match_sin_anio:
aduana, patente, pedimento_num = match_sin_anio.groups()
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
año_actual = datetime.now().year
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
if año_con_digito <= año_actual:
año_final = año_con_digito
else:
año_final = año_con_digito - 10
anio = año_final % 100
fecha_pago = datetime(año_final, 1, 1).date()
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
existing_pedimento = Pedimento.objects.filter(
pedimento_app=pedimento_app,
organizacion=organizacion
).first()
if not existing_pedimento:
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
try:
pedimento = Pedimento.objects.create(
organizacion=organizacion,
contribuyente=importador,
pedimento=int(pedimento_num),
aduana=int(aduana),
patente=int(patente),
fecha_pago=fecha_pago,
pedimento_app=pedimento_app,
agente_aduanal=f"Agente {patente}",
clave_pedimento="A1"
)
created_pedimentos.append({
"id": str(pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": importador.rfc,
"contribuyente_nombre": importador.nombre
})
except Exception as e:
failed_files.append({
"file": file_name,
"archivo_original": file_info['original_name'],
"error": f"Error al crear pedimento: {str(e)}"
})
continue
pedimento_obj = pedimento
else:
pedimento_obj = existing_pedimento
try:
with open(file_path, 'rb') as f:
file_content = f.read()
extension = os.path.splitext(file_name)[1].lower().lstrip('.')
existing_documents = Document.objects.filter(
pedimento_id=pedimento_obj.id,
organizacion=organizacion
)
existing_document = None
for doc in existing_documents:
if is_same_document(doc, file_name):
existing_document = doc
break
django_file = ContentFile(file_content, name=file_name)
if existing_document:
try:
if existing_document.archivo and os.path.exists(existing_document.archivo.path):
os.remove(existing_document.archivo.path)
except (ValueError, OSError) as e:
print(f"No se pudo eliminar archivo físico anterior: {str(e)}")
existing_document.archivo = django_file
existing_document.size = len(file_content)
existing_document.extension = extension
existing_document.updated_at = timezone.now()
existing_document.save()
else:
document = Document.objects.create(
organizacion=organizacion,
pedimento_id=pedimento_obj.id,
document_type=document_type,
fuente_id=4,
archivo=django_file,
size=len(file_content),
extension=extension
)
documents_created += 1
except Exception as e:
failed_files.append({
"file": file_name,
"archivo_original": file_info['original_name'],
"error": f"Error al crear documento: {str(e)}"
})
continue
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
try:
default_storage.delete(file_info['saved_path'])
except:
pass
bulk_upload.processed_files = idx + 1
bulk_upload.save(update_fields=['processed_files'])
except Exception as e:
failed_files.append({
"file": file_info['original_name'],
"error": str(e)
})
continue
finally:
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
print(f"Error al eliminar directorio temporal: {str(e)}")
result = {
"created_count": len(created_pedimentos),
"created_pedimentos": created_pedimentos,
"documents_created": documents_created,
"failed_files": failed_files,
"processed_files": len(temp_files_info),
"summary": f"Procesados {len(temp_files_info)} archivo(s): {len(created_pedimentos)} pedimento(s) creado(s), {documents_created} documento(s) asociado(s)"
}
return result
except Exception as e:
failed_files.append({
"file": "global",
"error": f"Error global: {str(e)}"
})
return {
"created_count": 0,
"created_pedimentos": [],
"documents_created": 0,
"failed_files": failed_files,
"processed_files": 0,
"summary": f"Error en procesamiento: {str(e)}"
}
def _process_bulk_pedimento_desk(bulk_upload):
"""
Procesa bulk_create_pedimento_desk - OBTIENE DATOS DEL MODELO
"""
created_pedimentos = []
failed_files = []
documents_created = 0
temp_dir = None
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
try:
organizacion = bulk_upload.organizacion
contribuyente = bulk_upload.contribuyente
temp_files_info = bulk_upload.result.get('temp_files', [])
fecha_pago_input = bulk_upload.fecha_pago
clave_pedimento_input = bulk_upload.clave_pedimento
tipo_operacion_id = bulk_upload.tipo_operacion_id
curp_apoderado_input = bulk_upload.curp_apoderado
partidas_input = bulk_upload.partidas
tipo_operacion_obj = None
if tipo_operacion_id:
try:
tipo_operacion_obj = TipoOperacion.objects.get(id=tipo_operacion_id)
except TipoOperacion.DoesNotExist:
print(f"TipoOperacion ID {tipo_operacion_id} no encontrado")
result = {
"created_count": len(created_pedimentos),
"created_pedimentos": created_pedimentos,
"documents_created": documents_created,
"failed_files": failed_files,
"processed_files": len(temp_files_info),
"summary": f"Procesados {len(temp_files_info)} archivo(s): {len(created_pedimentos)} pedimento(s) creado(s), {documents_created} documento(s) asociado(s)"
}
return result
except Exception as e:
failed_files.append({
"file": "global",
"error": f"Error global: {str(e)}"
})
return {
"created_count": 0,
"created_pedimentos": [],
"documents_created": 0,
"failed_files": failed_files,
"processed_files": 0,
"summary": f"Error en procesamiento: {str(e)}"
}
finally:
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
print(f"Error al eliminar directorio temporal: {str(e)}")

File diff suppressed because it is too large Load Diff