Compare commits
1 Commits
req--T2025
...
celery-ped
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f8379807f8 |
@@ -1,5 +1,6 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
from django.contrib.auth import get_user_model
|
||||||
|
|
||||||
# Create your models here.
|
# Create your models here.
|
||||||
|
|
||||||
@@ -210,4 +211,45 @@ class Importador(models.Model):
|
|||||||
ordering = ['rfc']
|
ordering = ['rfc']
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f"{self.rfc} - {self.nombre}"
|
return f"{self.rfc} - {self.nombre}"
|
||||||
|
|
||||||
|
# bulk de datos
|
||||||
|
class BulkUploadTask(models.Model):
|
||||||
|
STATUS_CHOICES = [
|
||||||
|
('pending', 'Pendiente'),
|
||||||
|
('processing', 'Procesando'),
|
||||||
|
('completed', 'Completado'),
|
||||||
|
('failed', 'Fallido'),
|
||||||
|
('partial', 'Parcialmente completado'),
|
||||||
|
]
|
||||||
|
|
||||||
|
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE, related_name='bulk_upload_tasks')
|
||||||
|
organizacion = models.ForeignKey('organization.Organizacion', on_delete=models.CASCADE)
|
||||||
|
contribuyente = models.CharField(max_length=255, blank=True, null=True)
|
||||||
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
|
||||||
|
task_type = models.CharField(max_length=50, default='bulk_create')
|
||||||
|
total_files = models.IntegerField(default=0)
|
||||||
|
processed_files = models.IntegerField(default=0)
|
||||||
|
created_pedimentos = models.IntegerField(default=0)
|
||||||
|
created_documents = models.IntegerField(default=0)
|
||||||
|
result = models.JSONField(default=dict, blank=True)
|
||||||
|
failed_files = models.JSONField(default=list, blank=True)
|
||||||
|
error_message = models.TextField(blank=True, null=True)
|
||||||
|
created_at = models.DateTimeField(auto_now_add=True)
|
||||||
|
started_at = models.DateTimeField(null=True, blank=True)
|
||||||
|
finished_at = models.DateTimeField(null=True, blank=True)
|
||||||
|
fecha_pago = models.DateField(null=True, blank=True)
|
||||||
|
clave_pedimento = models.CharField(max_length=50, blank=True, null=True)
|
||||||
|
tipo_operacion_id = models.IntegerField(null=True, blank=True)
|
||||||
|
curp_apoderado = models.CharField(max_length=50, blank=True, null=True)
|
||||||
|
partidas = models.IntegerField(default=0)
|
||||||
|
celery_task_id = models.CharField(max_length=255, blank=True, null=True)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"BulkUpload {self.id} - {self.status}"
|
||||||
|
|
||||||
|
class Meta:
|
||||||
|
verbose_name = "Tarea de Carga Masiva"
|
||||||
|
verbose_name_plural = "Tareas de Carga Masiva"
|
||||||
|
db_table = 'bulk_upload_task'
|
||||||
|
ordering = ['-created_at']
|
||||||
@@ -1,2 +1,3 @@
|
|||||||
from .microservice import *
|
from .microservice import *
|
||||||
from .internal_services import *
|
from .internal_services import *
|
||||||
|
from .bulk_pedimentos import *
|
||||||
421
api/customs/tasks/bulk_pedimentos.py
Normal file
421
api/customs/tasks/bulk_pedimentos.py
Normal file
@@ -0,0 +1,421 @@
|
|||||||
|
# tasks/bulk_pedimentos.py COMPLETO
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import zipfile
|
||||||
|
import shutil
|
||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
from celery import shared_task
|
||||||
|
from django.core.files.base import ContentFile
|
||||||
|
from django.utils import timezone
|
||||||
|
from django.db import transaction
|
||||||
|
import traceback
|
||||||
|
from rarfile import RarFile, RarCannotExec, Error as RarError
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from ..models import BulkUploadTask, Pedimento, Importador, TipoOperacion
|
||||||
|
from ...record.models import DocumentType, Document, Fuente
|
||||||
|
from django.contrib.auth import get_user_model
|
||||||
|
from django.core.files.storage import default_storage
|
||||||
|
|
||||||
|
User = get_user_model()
|
||||||
|
|
||||||
|
def extract_rar_to_dir(rar_path, dest_dir):
|
||||||
|
"""
|
||||||
|
Extrae archivos RAR con múltiples métodos de fallback
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with RarFile(rar_path, 'r') as rar_ref:
|
||||||
|
rar_ref.extractall(dest_dir)
|
||||||
|
return True
|
||||||
|
except (RarCannotExec, RarError):
|
||||||
|
try:
|
||||||
|
subprocess.run(['7z', 'x', rar_path, f'-o{dest_dir}'],
|
||||||
|
check=True, capture_output=True)
|
||||||
|
return True
|
||||||
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||||
|
try:
|
||||||
|
subprocess.run(['unrar', 'x', rar_path, dest_dir],
|
||||||
|
check=True, capture_output=True)
|
||||||
|
return True
|
||||||
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def is_same_document(doc, file_name):
|
||||||
|
"""
|
||||||
|
Determina si un documento es el mismo basado en el nombre
|
||||||
|
"""
|
||||||
|
if not doc.archivo:
|
||||||
|
return False
|
||||||
|
|
||||||
|
doc_name = os.path.basename(doc.archivo.name).lower()
|
||||||
|
new_name = file_name.lower()
|
||||||
|
|
||||||
|
doc_base = os.path.splitext(doc_name)[0]
|
||||||
|
new_base = os.path.splitext(new_name)[0]
|
||||||
|
|
||||||
|
return doc_base == new_base
|
||||||
|
|
||||||
|
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
|
||||||
|
def process_bulk_upload(self, bulk_upload_id):
|
||||||
|
"""
|
||||||
|
Tarea principal para procesar bulk upload de forma asíncrona
|
||||||
|
SOLO recibe el ID, obtiene todo de la base de datos
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
|
||||||
|
bulk_upload = BulkUploadTask.objects.get(id=bulk_upload_id)
|
||||||
|
|
||||||
|
bulk_upload.celery_task_id = self.request.id
|
||||||
|
bulk_upload.status = 'processing'
|
||||||
|
bulk_upload.started_at = timezone.now()
|
||||||
|
bulk_upload.save(update_fields=['celery_task_id', 'status', 'started_at'])
|
||||||
|
|
||||||
|
if bulk_upload.task_type == 'bulk_create_pedimento_desk':
|
||||||
|
result = _process_bulk_pedimento_desk(bulk_upload)
|
||||||
|
else:
|
||||||
|
result = _process_bulk_create(bulk_upload)
|
||||||
|
|
||||||
|
bulk_upload.status = 'completed' if not result['failed_files'] else 'partial'
|
||||||
|
bulk_upload.finished_at = timezone.now()
|
||||||
|
bulk_upload.result = result
|
||||||
|
bulk_upload.created_pedimentos = result.get('created_count', 0)
|
||||||
|
bulk_upload.created_documents = result.get('documents_created', 0)
|
||||||
|
bulk_upload.failed_files = result.get('failed_files', [])
|
||||||
|
bulk_upload.processed_files = result.get('processed_files', 0)
|
||||||
|
bulk_upload.save()
|
||||||
|
|
||||||
|
temp_files_info = bulk_upload.result.get('temp_files', [])
|
||||||
|
for file_info in temp_files_info:
|
||||||
|
try:
|
||||||
|
default_storage.delete(file_info['saved_path'])
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error al limpiar archivo temporal: {str(e)}")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if 'bulk_upload' in locals():
|
||||||
|
bulk_upload.status = 'failed'
|
||||||
|
bulk_upload.error_message = f"{str(e)}\n{traceback.format_exc()}"
|
||||||
|
bulk_upload.finished_at = timezone.now()
|
||||||
|
bulk_upload.save()
|
||||||
|
|
||||||
|
raise self.retry(exc=e, countdown=60)
|
||||||
|
|
||||||
|
def _process_bulk_create(bulk_upload):
|
||||||
|
"""
|
||||||
|
Procesa bulk_create normal - VERSIÓN CORREGIDA
|
||||||
|
"""
|
||||||
|
|
||||||
|
created_pedimentos = []
|
||||||
|
failed_files = []
|
||||||
|
documents_created = 0
|
||||||
|
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
|
||||||
|
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
|
||||||
|
|
||||||
|
try:
|
||||||
|
organizacion = bulk_upload.organizacion
|
||||||
|
contribuyente = bulk_upload.contribuyente
|
||||||
|
temp_files_info = bulk_upload.result.get('temp_files', [])
|
||||||
|
|
||||||
|
try:
|
||||||
|
document_type = DocumentType.objects.get(nombre="Pedimento")
|
||||||
|
except DocumentType.DoesNotExist:
|
||||||
|
document_type = DocumentType.objects.create(
|
||||||
|
nombre="Pedimento",
|
||||||
|
descripcion="Documento de pedimento"
|
||||||
|
)
|
||||||
|
|
||||||
|
for idx, file_info in enumerate(temp_files_info):
|
||||||
|
temp_dir = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
try:
|
||||||
|
temp_file_path = os.path.join(temp_dir, file_info['original_name'])
|
||||||
|
with default_storage.open(file_info['saved_path'], 'rb') as src:
|
||||||
|
with open(temp_file_path, 'wb') as dst:
|
||||||
|
dst.write(src.read())
|
||||||
|
|
||||||
|
archivo_name_sin_extension = os.path.splitext(file_info['original_name'])[0]
|
||||||
|
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
|
||||||
|
os.makedirs(sub_dir, exist_ok=True)
|
||||||
|
|
||||||
|
archivo_name = file_info['original_name'].lower()
|
||||||
|
|
||||||
|
if archivo_name.endswith('.zip'):
|
||||||
|
with zipfile.ZipFile(temp_file_path, 'r') as zip_ref:
|
||||||
|
zip_ref.extractall(sub_dir)
|
||||||
|
|
||||||
|
elif archivo_name.endswith('.rar'):
|
||||||
|
if not extract_rar_to_dir(temp_file_path, sub_dir):
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_info['original_name'],
|
||||||
|
"error": "No se pudo extraer archivo RAR"
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
shutil.move(temp_file_path, os.path.join(sub_dir, file_info['original_name']))
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(sub_dir):
|
||||||
|
for file_name in files:
|
||||||
|
file_path = os.path.join(root, file_name)
|
||||||
|
relative_path = os.path.relpath(file_path, sub_dir)
|
||||||
|
folder_name = archivo_name_sin_extension
|
||||||
|
|
||||||
|
match = nomenclatura_pattern.match(folder_name)
|
||||||
|
match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name)
|
||||||
|
|
||||||
|
if not match and not match_sin_anio:
|
||||||
|
archivo_original = folder_name + os.path.splitext(file_info['original_name'])[1]
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_name,
|
||||||
|
"archivo_original": archivo_original,
|
||||||
|
"error": f"Nomenclatura inválida en nombre del ZIP: {folder_name}. Esperado: anio-aduana-patente-pedimento"
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
if match:
|
||||||
|
anio, aduana, patente, pedimento_num = match.groups()
|
||||||
|
|
||||||
|
try:
|
||||||
|
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
|
||||||
|
fecha_pago = datetime(anio_completo, 1, 1).date()
|
||||||
|
except ValueError:
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_name,
|
||||||
|
"archivo_original": file_info['original_name'],
|
||||||
|
"error": f"Año inválido: {anio}"
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
elif match_sin_anio:
|
||||||
|
aduana, patente, pedimento_num = match_sin_anio.groups()
|
||||||
|
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
|
||||||
|
año_actual = datetime.now().year
|
||||||
|
|
||||||
|
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
|
||||||
|
|
||||||
|
if año_con_digito <= año_actual:
|
||||||
|
año_final = año_con_digito
|
||||||
|
else:
|
||||||
|
año_final = año_con_digito - 10
|
||||||
|
|
||||||
|
anio = año_final % 100
|
||||||
|
fecha_pago = datetime(año_final, 1, 1).date()
|
||||||
|
|
||||||
|
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
|
||||||
|
|
||||||
|
existing_pedimento = Pedimento.objects.filter(
|
||||||
|
pedimento_app=pedimento_app,
|
||||||
|
organizacion=organizacion
|
||||||
|
).first()
|
||||||
|
|
||||||
|
if not existing_pedimento:
|
||||||
|
|
||||||
|
importador, created = Importador.objects.get_or_create(
|
||||||
|
rfc=contribuyente,
|
||||||
|
defaults={
|
||||||
|
'nombre': f"Importador {contribuyente}",
|
||||||
|
'organizacion': organizacion
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
pedimento = Pedimento.objects.create(
|
||||||
|
organizacion=organizacion,
|
||||||
|
contribuyente=importador,
|
||||||
|
pedimento=int(pedimento_num),
|
||||||
|
aduana=int(aduana),
|
||||||
|
patente=int(patente),
|
||||||
|
fecha_pago=fecha_pago,
|
||||||
|
pedimento_app=pedimento_app,
|
||||||
|
agente_aduanal=f"Agente {patente}",
|
||||||
|
clave_pedimento="A1"
|
||||||
|
)
|
||||||
|
|
||||||
|
created_pedimentos.append({
|
||||||
|
"id": str(pedimento.id),
|
||||||
|
"pedimento_app": pedimento_app,
|
||||||
|
"contribuyente": importador.rfc,
|
||||||
|
"contribuyente_nombre": importador.nombre
|
||||||
|
})
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_name,
|
||||||
|
"archivo_original": file_info['original_name'],
|
||||||
|
"error": f"Error al crear pedimento: {str(e)}"
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
pedimento_obj = pedimento
|
||||||
|
else:
|
||||||
|
pedimento_obj = existing_pedimento
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
file_content = f.read()
|
||||||
|
|
||||||
|
extension = os.path.splitext(file_name)[1].lower().lstrip('.')
|
||||||
|
|
||||||
|
existing_documents = Document.objects.filter(
|
||||||
|
pedimento_id=pedimento_obj.id,
|
||||||
|
organizacion=organizacion
|
||||||
|
)
|
||||||
|
|
||||||
|
existing_document = None
|
||||||
|
for doc in existing_documents:
|
||||||
|
if is_same_document(doc, file_name):
|
||||||
|
existing_document = doc
|
||||||
|
break
|
||||||
|
|
||||||
|
django_file = ContentFile(file_content, name=file_name)
|
||||||
|
|
||||||
|
if existing_document:
|
||||||
|
try:
|
||||||
|
if existing_document.archivo and os.path.exists(existing_document.archivo.path):
|
||||||
|
os.remove(existing_document.archivo.path)
|
||||||
|
except (ValueError, OSError) as e:
|
||||||
|
print(f"No se pudo eliminar archivo físico anterior: {str(e)}")
|
||||||
|
|
||||||
|
existing_document.archivo = django_file
|
||||||
|
existing_document.size = len(file_content)
|
||||||
|
existing_document.extension = extension
|
||||||
|
existing_document.updated_at = timezone.now()
|
||||||
|
existing_document.save()
|
||||||
|
|
||||||
|
else:
|
||||||
|
document = Document.objects.create(
|
||||||
|
organizacion=organizacion,
|
||||||
|
pedimento_id=pedimento_obj.id,
|
||||||
|
document_type=document_type,
|
||||||
|
fuente_id=4,
|
||||||
|
archivo=django_file,
|
||||||
|
size=len(file_content),
|
||||||
|
extension=extension
|
||||||
|
)
|
||||||
|
|
||||||
|
documents_created += 1
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_name,
|
||||||
|
"archivo_original": file_info['original_name'],
|
||||||
|
"error": f"Error al crear documento: {str(e)}"
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
if os.path.exists(temp_file_path):
|
||||||
|
os.remove(temp_file_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
default_storage.delete(file_info['saved_path'])
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
bulk_upload.processed_files = idx + 1
|
||||||
|
bulk_upload.save(update_fields=['processed_files'])
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
failed_files.append({
|
||||||
|
"file": file_info['original_name'],
|
||||||
|
"error": str(e)
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if temp_dir and os.path.exists(temp_dir):
|
||||||
|
try:
|
||||||
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error al eliminar directorio temporal: {str(e)}")
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"created_count": len(created_pedimentos),
|
||||||
|
"created_pedimentos": created_pedimentos,
|
||||||
|
"documents_created": documents_created,
|
||||||
|
"failed_files": failed_files,
|
||||||
|
"processed_files": len(temp_files_info),
|
||||||
|
"summary": f"Procesados {len(temp_files_info)} archivo(s): {len(created_pedimentos)} pedimento(s) creado(s), {documents_created} documento(s) asociado(s)"
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
failed_files.append({
|
||||||
|
"file": "global",
|
||||||
|
"error": f"Error global: {str(e)}"
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"created_count": 0,
|
||||||
|
"created_pedimentos": [],
|
||||||
|
"documents_created": 0,
|
||||||
|
"failed_files": failed_files,
|
||||||
|
"processed_files": 0,
|
||||||
|
"summary": f"Error en procesamiento: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
def _process_bulk_pedimento_desk(bulk_upload):
|
||||||
|
"""
|
||||||
|
Procesa bulk_create_pedimento_desk - OBTIENE DATOS DEL MODELO
|
||||||
|
"""
|
||||||
|
created_pedimentos = []
|
||||||
|
failed_files = []
|
||||||
|
documents_created = 0
|
||||||
|
temp_dir = None
|
||||||
|
|
||||||
|
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
|
||||||
|
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
|
||||||
|
|
||||||
|
try:
|
||||||
|
organizacion = bulk_upload.organizacion
|
||||||
|
contribuyente = bulk_upload.contribuyente
|
||||||
|
temp_files_info = bulk_upload.result.get('temp_files', [])
|
||||||
|
|
||||||
|
fecha_pago_input = bulk_upload.fecha_pago
|
||||||
|
clave_pedimento_input = bulk_upload.clave_pedimento
|
||||||
|
tipo_operacion_id = bulk_upload.tipo_operacion_id
|
||||||
|
curp_apoderado_input = bulk_upload.curp_apoderado
|
||||||
|
partidas_input = bulk_upload.partidas
|
||||||
|
|
||||||
|
tipo_operacion_obj = None
|
||||||
|
if tipo_operacion_id:
|
||||||
|
try:
|
||||||
|
tipo_operacion_obj = TipoOperacion.objects.get(id=tipo_operacion_id)
|
||||||
|
except TipoOperacion.DoesNotExist:
|
||||||
|
print(f"TipoOperacion ID {tipo_operacion_id} no encontrado")
|
||||||
|
|
||||||
|
result = {
|
||||||
|
"created_count": len(created_pedimentos),
|
||||||
|
"created_pedimentos": created_pedimentos,
|
||||||
|
"documents_created": documents_created,
|
||||||
|
"failed_files": failed_files,
|
||||||
|
"processed_files": len(temp_files_info),
|
||||||
|
"summary": f"Procesados {len(temp_files_info)} archivo(s): {len(created_pedimentos)} pedimento(s) creado(s), {documents_created} documento(s) asociado(s)"
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
|
||||||
|
failed_files.append({
|
||||||
|
"file": "global",
|
||||||
|
"error": f"Error global: {str(e)}"
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"created_count": 0,
|
||||||
|
"created_pedimentos": [],
|
||||||
|
"documents_created": 0,
|
||||||
|
"failed_files": failed_files,
|
||||||
|
"processed_files": 0,
|
||||||
|
"summary": f"Error en procesamiento: {str(e)}"
|
||||||
|
}
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if temp_dir and os.path.exists(temp_dir):
|
||||||
|
try:
|
||||||
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error al eliminar directorio temporal: {str(e)}")
|
||||||
1205
api/customs/views.py
1205
api/customs/views.py
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user