Files
backend/api/customs/views.py
2026-01-19 09:46:35 -07:00

1196 lines
52 KiB
Python

from config.settings import SERVICE_API_URL
from django.shortcuts import render
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.exceptions import PermissionDenied
from rest_framework import status
from django_filters.rest_framework import DjangoFilterBackend
from django.core.files.storage import default_storage
from rest_framework.filters import SearchFilter, OrderingFilter
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
from api.customs.models import (
Pedimento,
TipoOperacion,
ProcesamientoPedimento,
EDocument,
Cove,
Importador,
Partida,
)
from api.customs.serializers import (
PedimentoSerializer,
TipoOperacionSerializer,
ProcesamientoPedimentoSerializer,
EDocumentSerializer,
CoveSerializer,
ImportadorSerializer,
PartidaSerializer
)
from api.logger.mixins import LoggingMixin
from mixins.filtrado_organizacion import OrganizacionFiltradaMixin, ProcesosPorOrganizacionMixin
import requests
import os
import re
import zipfile
import tempfile
import shutil
import subprocess
from datetime import date, datetime, time
from django.core.files.base import ContentFile
from django.db import transaction
from rest_framework.parsers import MultiPartParser, FormParser
from api.record.models import Document, DocumentType, Fuente
from unicodedata import normalize
from django.utils import timezone
from .tasks.bulk_pedimentos import process_bulk_upload
# Importar rarfile de manera opcional
try:
import rarfile
RAR_SUPPORT = True
except ImportError:
RAR_SUPPORT = False
# Importar tarea de procesamiento de pedimento (Celery)
from api.customs.tasks.microservice import procesar_pedimento_completo_individual
def get_available_extractors():
"""
Devuelve lista de extractores disponibles en orden de preferencia
"""
extractors = []
if RAR_SUPPORT:
extractors.append('rarfile')
# Verificar si 'unrar' está disponible
if shutil.which('unrar'):
extractors.append('unrar')
# Verificar si '7z' o '7za' están disponibles
if shutil.which('7z'):
extractors.append('7z')
elif shutil.which('7za'):
extractors.append('7za')
return extractors
def extract_rar_to_dir(rar_path, dest_dir):
"""
Extrae un archivo RAR a `dest_dir` usando varios mecanismos de respaldo:
1) intentará usar la librería `rarfile` si está disponible,
2) intentará ejecutar la utilidad `unrar` si está instalada en el sistema,
3) intentará ejecutar `7z`/`7za` (p7zip) si está instalada.
Lanza Exception con mensaje explicativo si falla.
"""
# Versión que primero verifica herramientas disponibles
available = get_available_extractors()
if not available:
raise Exception("No hay herramientas de extracción disponibles.")
print(f"Extractores disponibles (en orden de preferencia): {available}")
# Intento con rarfile primero si está disponible
# if RAR_SUPPORT:
if 'rarfile' in available and RAR_SUPPORT:
try:
# rarfile puede trabajar con rutas en disco mejor que con file-like
with rarfile.RarFile(rar_path) as rf:
rf.extractall(dest_dir)
try:
if os.path.exists(rar_path):
os.remove(rar_path)
print(f"Archivo original eliminado: {rar_path}")
except OSError as remove_error:
print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
return
except Exception as e:
# Si rarfile falla (por ejemplo RarCannotExec), seguimos con herramientas externas
# Hacer log para depuración
print(f"rarfile extraction failed, will try external tools: {e}")
# Intento con comandos externos
# Probar 'unrar' primero
external_cmds = [
['unrar', 'x', '-o+', rar_path, dest_dir],
['7z', 'x', rar_path, f'-o{dest_dir}', '-y'],
['7za', 'x', rar_path, f'-o{dest_dir}', '-y']
]
# for cmd in external_cmds:
# try:
# subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# try:
# if os.path.exists(rar_path):
# os.remove(rar_path)
# print(f"Archivo original eliminado: {rar_path}")
# except OSError as remove_error:
# print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
# return
# except FileNotFoundError:
# # El ejecutable no existe en PATH, intentar siguiente
# continue
# except subprocess.CalledProcessError as e:
# # El comando falló en la extracción; intentar siguiente
# print(f"External extractor failed ({cmd[0]}): {e}")
# continue
for extractor_name in available:
if extractor_name in external_cmds:
cmd = external_cmds[extractor_name]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
if os.path.exists(rar_path):
os.remove(rar_path)
print(f"Archivo original eliminado: {rar_path}")
except OSError as remove_error:
print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
return
except FileNotFoundError:
# El ejecutable no existe en PATH, intentar siguiente
continue
except subprocess.CalledProcessError as e:
# El comando falló en la extracción; intentar siguiente
print(f"External extractor {extractor_name} failed ({cmd[0]}): {e}")
continue
# Si llegamos aquí, ningún método funcionó
raise Exception("No se encontró una herramienta válida para extraer RAR (rarfile sin backend, 'unrar' o '7z' no disponibles o extracción fallida). Instale 'unrar' o 'p7zip' y asegúrese de que estén en PATH, o configure rarfile con un backend.")
from .tasks.microservice_v2 import *
from .tasks.auditoria import crear_partidas_por_pedimento
class CustomPagination(PageNumberPagination):
"""
Paginación personalizada con parámetros flexibles
- Si no se especifica page_size, devuelve todos los resultados (sin paginación)
- Si se especifica page_size, usa paginación normal
"""
page_size = None # Sin paginación por defecto
page_size_query_param = 'page_size'
max_page_size = 10000 # Límite máximo de seguridad
page_query_param = 'page'
def paginate_queryset(self, queryset, request, view=None):
"""
Si no se especifica page_size en los parámetros, devolver None (sin paginación)
Si se especifica, usar paginación normal
"""
# Verificar si se especificó page_size en la query
if self.page_size_query_param not in request.query_params:
# No hay page_size, devolver None para indicar "sin paginación"
return None
# Hay page_size, usar paginación normal
try:
page_size = int(request.query_params[self.page_size_query_param])
if page_size <= 0:
return None
# Establecer el page_size temporalmente para esta request
self.page_size = min(page_size, self.max_page_size)
except (ValueError, TypeError):
return None
return super().paginate_queryset(queryset, request, view)
class PedimentoPagination(PageNumberPagination):
"""
Paginación personalizada con parámetros flexibles
- Si no se especifica page_size, devuelve todos los resultados (sin paginación)
- Si se especifica page_size, usa paginación normal
"""
page_size = None # Sin paginación por defecto
page_size_query_param = 'page_size'
max_page_size = 1000 # Límite máximo de seguridad
page_query_param = 'page'
def paginate_queryset(self, queryset, request, view=None):
"""
Si no se especifica page_size en los parámetros, devolver None (sin paginación)
Si se especifica, usar paginación normal
"""
# Verificar si se especificó page_size en la query
if self.page_size_query_param not in request.query_params:
# No hay page_size, devolver None para indicar "sin paginación"
return None
# Hay page_size, usar paginación normal
try:
page_size = int(request.query_params[self.page_size_query_param])
if page_size <= 0:
return None
# Establecer el page_size temporalmente para esta request
self.page_size = min(page_size, self.max_page_size)
except (ValueError, TypeError):
return None
return super().paginate_queryset(queryset, request, view)
# Create your views here.
class ViewSetPedimento(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltradaMixin): # Pendiente de permisos de creacion
"""
ViewSet for Pedimento model.
Soporta paginación, filtros y búsqueda.
Parámetros disponibles:
- page: Número de página (solo si se especifica page_size)
- page_size: Elementos por página (si NO se especifica, devuelve TODOS los resultados)
- search: Búsqueda en pedimento, contribuyente, agente_aduanal
- pedimento: Filtro por número de pedimento
- existe_expediente: Filtro por expediente (True/False)
- contribuyente: Filtro por contribuyente
- curp_apoderado: Filtro por curp del apoderado
- fecha_pago: Filtro por fecha de pago (YYYY-MM-DD)
- patente: Filtro por patente
- aduana: Filtro por aduana
- tipo_operacion: Filtro por tipo de operación
- clave_pedimento: Filtro por clave de pedimento
- ordering: Ordenar por campo (ej: -created_at, pedimento)
Ejemplos:
- /pedimentos/ → Devuelve TODOS los pedimentos
- /pedimentos/?page_size=10 → Devuelve los primeros 10
- /pedimentos/?page_size=10&page=2 → Devuelve los pedimentos 11-20
- /pedimentos/?pedimento=12345678 → Filtra por número de pedimento
- /pedimentos/?existe_expediente=true → Filtra por expediente existente
- /pedimentos/?contribuyente=EMPRESA → Filtra por contribuyente
- /pedimentos/?curp_apoderado=XXXX → Filtra por curp apoderado
- /pedimentos/?fecha_pago=2025-07-18 → Filtra por fecha de pago
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = PedimentoSerializer
pagination_class = PedimentoPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
model = Pedimento
filterset_fields = ['patente', 'aduana', 'tipo_operacion', 'clave_pedimento', 'pedimento', 'existe_expediente', 'contribuyente', 'curp_apoderado', 'fecha_pago', 'pedimento_app']
search_fields = ['pedimento', 'pedimento_app', 'agente_aduanal', 'clave_pedimento']
# AGREGAR ESTOS CAMPOS PARA ORDENACIÓN
ordering_fields = ['created_at', 'pedimento', 'fecha_pago', 'aduana', 'patente']
ordering = ['-created_at'] # Orden descendente por fecha de creación por defecto
def get_queryset(self):
queryset = self.get_queryset_filtrado_por_organizacion() # Tambien filtra por importador
# pedimento_app_filter = self.request.GET.get('pedimento_app', None)
# if pedimento_app_filter:
# print(f"Filtro por pedimento_app: {pedimento_app_filter}")
# queryset = queryset.filter(pedimento_app__icontains=pedimento_app_filter)
return queryset
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un pedimento.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
data = serializer.validated_data
if not data.get('pedimento_app'):
fecha_pago = data.get('fecha_pago')
aduana = data.get('aduana')
patente = data.get('patente')
pedimento = data.get('pedimento')
if fecha_pago and aduana and patente and pedimento:
pedimento_app = f"{str(fecha_pago.year)[-2:]}-{str(aduana).zfill(2)[-2:]}-{str(patente).zfill(4)[-4:]}-{str(pedimento).zfill(7)[-7:]}"
serializer.save(organizacion=self.request.user.organizacion, pedimento_app=pedimento_app)
try:
# Usar el nombre del servicio de Docker Compose en lugar de localhost
response = procesar_pedimento_completo_individual(serializer.instance.id)
# Verificar si la respuesta fue exitosa
if response.status_code == 200:
print(f"Servicio FastAPI ejecutado exitosamente: {response.status_code}")
print(f"Respuesta: {response.json()}")
elif response.status_code == 201:
print(f"Recurso creado exitosamente en FastAPI: {response.status_code}")
print(f"Respuesta: {response.json()}")
else:
print(f"Servicio FastAPI respondió con error: {response.status_code}")
print(f"Respuesta: {response.text}")
except requests.exceptions.ConnectionError as e:
print(f"No se pudo conectar al servicio FastAPI: {e}")
print(f"Verifica que el servicio FastAPI esté corriendo en {SERVICE_API_URL}")
except requests.exceptions.Timeout as e:
print(f"Timeout al conectar con el servicio FastAPI: {e}")
except requests.exceptions.RequestException as e:
print(f"Error de request al servicio FastAPI: {e}")
except Exception as e:
print(f"Error inesperado al llamar al servicio FastAPI: {e}")
def perform_update(self, serializer):
"""
Ejecuta acciones después de actualizar un pedimento basado en los campos modificados.
"""
# Obtener los campos que se están actualizando
updated_fields = set(serializer.validated_data.keys())
# Guardar los cambios
pedimento = serializer.save()
# Si se actualizó el campo existe_expediente, procesar el pedimento completo
if 'existe_expediente' in updated_fields:
# Iniciar todas las tareas
procesar_remesas_pedimento(pedimento.id)
crear_partidas_por_pedimento(pedimento.id)
procesar_acuse_coves_pedimento(pedimento.id)
procesar_edocs_pedimento(pedimento.id)
procesar_acuses_pedimento(pedimento.id)
procesar_partidas_pedimento(pedimento.id)
procesar_coves_pedimento(pedimento.id)
# Agregar mensaje de tareas iniciadas al serializer
serializer._data = {
**serializer.data,
"message": "Tareas de procesamiento iniciadas",
"tasks": [
"Procesamiento de remesas",
"Creación de partidas",
"Procesamiento de acuses de COVEs",
"Procesamiento de E-documents",
"Procesamiento de acuses",
"Procesamiento de partidas",
"Procesamiento de COVEs"
]
}
@action(detail=True, methods=['post'], url_path='procesar-completo')
def procesar_completo(self, request, pk=None):
"""
Acción para disparar el procesamiento completo de un pedimento existente.
Dispara la tarea `procesar_pedimento_completo_individual` de forma asíncrona
y devuelve el `task_id`.
"""
pedimento = self.get_object()
try:
# Usar el nombre del servicio de Docker Compose en lugar de localhost
task = procesar_pedimento_completo_individual.delay(pedimento.id, pedimento.organizacion.id)
# Verificar si la respuesta fue exitosa
if task.id:
return Response({"status": "Recurso creado exitosamente en API", "task_id": task.id}, status=status.HTTP_202_ACCEPTED)
else:
return Response({"status": "Servicio API respondió con error", "task_id": 0}, status=status.HTTP_202_ACCEPTED)
except Exception as e:
return Response({"error": f"Error inesperado al llamar al servicio API: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['post'], url_path='bulk-delete')
def bulk_delete(self, request):
"""
Endpoint para eliminar múltiples pedimentos de manera masiva.
Payload esperado:
{
"ids": ["uuid1", "uuid2", "uuid3", ...]
}
Respuesta exitosa:
{
"message": "Pedimentos eliminados exitosamente",
"deleted_count": 3,
"deleted_ids": ["uuid1", "uuid2", "uuid3"]
}
Respuesta con errores:
{
"message": "Algunos pedimentos no pudieron ser eliminados",
"deleted_count": 2,
"deleted_ids": ["uuid1", "uuid2"],
"failed_ids": ["uuid3"],
"errors": ["No se encontró el pedimento con ID uuid3"]
}
"""
# Obtener los IDs del payload
ids = request.data.get('ids', [])
if not ids:
return Response(
{"error": "Se requiere una lista de IDs para eliminar"},
status=status.HTTP_400_BAD_REQUEST
)
if not isinstance(ids, list):
return Response(
{"error": "El campo 'ids' debe ser una lista"},
status=status.HTTP_400_BAD_REQUEST
)
# Obtener el queryset filtrado por organización
queryset = self.get_queryset()
# Filtrar solo los pedimentos que existen y pertenecen a la organización del usuario
existing_pedimentos = queryset.filter(id__in=ids)
existing_ids = list(existing_pedimentos.values_list('id', flat=True))
# Convertir UUIDs a strings para comparación
existing_ids_str = [str(id) for id in existing_ids]
requested_ids_str = [str(id) for id in ids]
# Identificar IDs que no existen o no pertenecen a la organización
failed_ids = [id for id in requested_ids_str if id not in existing_ids_str]
deleted_count = 0
errors = []
if existing_pedimentos.exists():
try:
# Eliminar los pedimentos encontrados
deleted_count = existing_pedimentos.count()
existing_pedimentos.delete()
except Exception as e:
return Response(
{"error": f"Error al eliminar pedimentos: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# Agregar errores para IDs no encontrados
if failed_ids:
errors = [f"No se encontró el pedimento con ID {id} o no pertenece a su organización" for id in failed_ids]
# Preparar respuesta
response_data = {
"deleted_count": deleted_count,
"deleted_ids": existing_ids_str
}
if failed_ids:
response_data.update({
"message": "Algunos pedimentos no pudieron ser eliminados",
"failed_ids": failed_ids,
"errors": errors
})
response_status = status.HTTP_207_MULTI_STATUS
else:
response_data["message"] = "Pedimentos eliminados exitosamente"
response_status = status.HTTP_200_OK
return Response(response_data, status=response_status)
@action(detail=False, methods=['post'], url_path='bulk-create', parser_classes=[MultiPartParser, FormParser])
def bulk_create(self, request):
"""
Version asincrona con Celery
"""
from django.utils import timezone
import time as python_time
contribuyente = request.data.get('contribuyente')
archivos = request.FILES.getlist('archivos')
if not archivos:
return Response(
{"error": "Se requiere al menos un archivo"},
status=status.HTTP_400_BAD_REQUEST
)
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{"error": "Usuario no autenticado o sin organización"},
status=status.HTTP_400_BAD_REQUEST
)
temp_files_info = []
try:
for archivo in archivos:
timestamp = int(timezone.now().timestamp() * 1000)
unique_name = f"temp_{timestamp}_{archivo.name}"
saved_path = f"temp_uploads/{unique_name}"
default_storage.save(saved_path, ContentFile(archivo.read()))
temp_files_info.append({
'original_name': archivo.name,
'saved_path': saved_path,
'size': archivo.size
})
bulk_upload = BulkUploadTask.objects.create(
user=request.user,
organizacion=request.user.organizacion,
contribuyente=contribuyente,
task_type='bulk_create',
total_files=len(archivos),
status='pending',
result={
'temp_files': temp_files_info,
'original_filenames': [archivo.name for archivo in archivos]
}
)
process_bulk_upload.delay(
bulk_upload_id=bulk_upload.id
)
return Response({
"message": "Procesamiento iniciado en segundo plano",
"task_id": bulk_upload.id,
"status": "pending",
"check_status_url": f"/api/pedimentos/bulk-upload-status/{bulk_upload.id}/"
}, status=status.HTTP_202_ACCEPTED)
except Exception as e:
for file_info in temp_files_info:
try:
default_storage.delete(file_info['saved_path'])
except:
pass
return Response(
{"error": f"Error al iniciar procesamiento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@action(detail=False, methods=['post'], url_path='bulk-create-pedimento_desk', parser_classes=[MultiPartParser, FormParser])
def bulk_create_pedimento_desk(self, request):
"""
Versión asíncrona para EFC APP Desk
"""
print("🔧 Iniciando bulk-create-pedimento_desk asíncrono...")
from django.utils import timezone
archivos = request.FILES.getlist('archivos')
if not archivos:
return Response(
{"tieneError": True, "error": "Se requiere al menos un archivo"},
status=status.HTTP_400_BAD_REQUEST
)
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{"tieneError": True, "error": "Usuario no autenticado o sin organización"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
temp_files_info = []
try:
for archivo in archivos:
timestamp = int(timezone.now().timestamp() * 1000)
unique_name = f"temp_{timestamp}_{archivo.name}"
saved_path = f"temp_uploads/{unique_name}"
default_storage.save(saved_path, ContentFile(archivo.read()))
temp_files_info.append({
'original_name': archivo.name,
'saved_path': saved_path,
'size': archivo.size
})
bulk_upload = BulkUploadTask.objects.create(
user=request.user,
organizacion=request.user.organizacion,
contribuyente=request.data.get('contribuyente'),
task_type='bulk_create_pedimento_desk',
total_files=len(archivos),
status='pending',
fecha_pago=request.data.get('fecha_pago'),
clave_pedimento=request.data.get('clave_pedimento'),
tipo_operacion_id=request.data.get('tipo_operacion'),
curp_apoderado=request.data.get('curp_apoderado'),
partidas=request.data.get('partidas', 0),
result={
'temp_files': temp_files_info,
'original_filenames': [archivo.name for archivo in archivos]
}
)
process_bulk_upload.delay(
bulk_upload_id=bulk_upload.id
)
return Response({
"tieneError": False,
"message": "Procesamiento iniciado en segundo plano",
"task_id": bulk_upload.id,
"status": "pending",
"check_status_url": f"/api/pedimentos/bulk-upload-status/{bulk_upload.id}/"
}, status=status.HTTP_202_ACCEPTED)
except Exception as e:
for file_info in temp_files_info:
try:
default_storage.delete(file_info['saved_path'])
except:
pass
return Response(
{"tieneError": True, "error": f"Error al iniciar procesamiento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@action(detail=False, methods=['get'], url_path='bulk-upload-status(?:/(?P<task_id>[^/.]+))?')
def bulk_upload_status(self, request, task_id=None):
"""
Endpoint que puede manejar:
1. Un task_id específico: /bulk-upload-status/<task_id>/
2. Todos los tasks pendientes del usuario: /bulk-upload-status/
Respuestas:
- Con task_id: Información detallada de una tarea específica
- Sin task_id: Lista de todas las tareas del usuario con status != 'completed'
"""
try:
if task_id:
bulk_upload = BulkUploadTask.objects.get(id=task_id, user=request.user)
estimated_documents = bulk_upload.created_documents
estimated_percentage = 0
if bulk_upload.status == 'processing' and bulk_upload.started_at:
elapsed = (timezone.now() - bulk_upload.started_at).total_seconds()
if elapsed > 10 and bulk_upload.created_documents > 0:
docs_per_second = bulk_upload.created_documents / elapsed
estimated_total_seconds = 300 # 5 minutos para 1000 docs
estimated_documents = int(docs_per_second * min(elapsed, estimated_total_seconds))
estimated_percentage = min(95, (elapsed / estimated_total_seconds) * 100)
elif bulk_upload.status == 'completed' and bulk_upload.result:
estimated_documents = bulk_upload.created_documents
estimated_percentage = 100
response_data = {
"task_id": bulk_upload.id,
"status": bulk_upload.status,
"celery_status": {
'state': 'PROCESSING' if bulk_upload.status == 'processing' else bulk_upload.status.upper(),
'info': bulk_upload.result if bulk_upload.status == 'completed' else None
},
"progress": {
"total_files": bulk_upload.total_files,
"processed_files": bulk_upload.processed_files,
"created_pedimentos": bulk_upload.created_pedimentos,
"created_documents": bulk_upload.created_documents,
"estimated_documents": estimated_documents,
"percentage_files": (bulk_upload.processed_files / bulk_upload.total_files * 100) if bulk_upload.total_files > 0 else 0,
"percentage_documents_estimated": estimated_percentage,
"batch_info": {
"batch_size": 50,
"current_batch": (bulk_upload.created_documents // 50) + 1,
"next_update_at": bulk_upload.created_documents + (50 - (bulk_upload.created_documents % 50))
}
},
"metadata": {
"task_type": bulk_upload.task_type,
"contribuyente": bulk_upload.contribuyente,
"created_at": bulk_upload.created_at,
"started_at": bulk_upload.started_at,
"finished_at": bulk_upload.finished_at,
"elapsed_seconds": (timezone.now() - bulk_upload.started_at).total_seconds() if bulk_upload.started_at else 0
}
}
if bulk_upload.status in ['completed', 'partial', 'failed']:
response_data.update({
"result": bulk_upload.result,
"failed_files": bulk_upload.failed_files,
"error_message": bulk_upload.error_message
})
if bulk_upload.status == 'processing':
response_data["note"] = "El progreso se actualiza cada 50 documentos para optimizar rendimiento"
response_data["next_update_in"] = 50 - (bulk_upload.created_documents % 50)
return Response(response_data)
else:
pending_tasks = BulkUploadTask.objects.filter(
user=request.user
).exclude(
status='completed'
).order_by('-created_at')
if not pending_tasks.exists():
return Response({
"pending_tasks": [],
"total_pending": 0,
"message": "No hay tareas de carga pendientes"
})
tasks_list = []
for task in pending_tasks:
estimated_documents = task.created_documents
estimated_percentage = 0
if task.status == 'processing' and task.started_at:
elapsed = (timezone.now() - task.started_at).total_seconds()
if elapsed > 10 and task.created_documents > 0:
docs_per_second = task.created_documents / elapsed
estimated_total_seconds = 300
estimated_documents = int(docs_per_second * min(elapsed, estimated_total_seconds))
estimated_percentage = min(95, (elapsed / estimated_total_seconds) * 100)
elif task.status == 'completed':
estimated_percentage = 100
task_data = {
"task_id": task.id,
"status": task.status,
"task_type": task.task_type,
"contribuyente": task.contribuyente,
"total_files": task.total_files,
"created_at": task.created_at,
"started_at": task.started_at,
"finished_at": task.finished_at,
"progress": {
"processed_files": task.processed_files,
"created_pedimentos": task.created_pedimentos,
"created_documents": task.created_documents,
"estimated_documents": estimated_documents,
"percentage_files": (task.processed_files / task.total_files * 100) if task.total_files > 0 else 0,
"percentage_documents_estimated": estimated_percentage,
},
"metadata": {
"elapsed_seconds": (timezone.now() - task.started_at).total_seconds() if task.started_at else 0,
"batch_size": 50,
"next_batch_update": task.created_documents + (50 - (task.created_documents % 50))
}
}
if task.error_message:
task_data["error_message"] = task.error_message
tasks_list.append(task_data)
return Response({
"pending_tasks": tasks_list,
"total_pending": len(tasks_list),
"summary": {
"processing": len([t for t in tasks_list if t["status"] == "processing"]),
"pending": len([t for t in tasks_list if t["status"] == "pending"]),
"partial": len([t for t in tasks_list if t["status"] == "partial"]),
"failed": len([t for t in tasks_list if t["status"] == "failed"]),
}
})
except BulkUploadTask.DoesNotExist:
return Response(
{"error": "Task no encontrada"},
status=status.HTTP_404_NOT_FOUND
)
except Exception as e:
return Response(
{"error": f"Error interno: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
my_tags = ['Pedimentos']
class PartidaViewSet(viewsets.ModelViewSet):
"""
ViewSet for Partida model.
Permite filtrar por:
- pedimento: UUID del pedimento (query parameter principal)
- pedimento__id: UUID del pedimento (alternativo)
Ejemplo: GET /api/partidas/?pedimento=6782d22e-5e97-4efc-87c9-bd8497c8ac7e
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
queryset = Partida.objects.all()
serializer_class = PartidaSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = {
'pedimento': ['exact'], # Filtro directo por UUID del pedimento
'pedimento__id': ['exact'], # Filtro alternativo
'numero_partida': ['exact', 'gte', 'lte'], # Filtros por número de partida
'descargado': ['exact'], # Filtro por estado de descarga
'created_at': ['exact', 'gte', 'lte'], # Filtros por fecha de creación
'updated_at': ['exact', 'gte', 'lte'] # Filtros por fecha de actualización
}
search_fields = ['pedimento__pedimento', 'pedimento__pedimento_app']
ordering_fields = ['numero_partida', 'pedimento__pedimento', 'id', 'created_at', 'updated_at']
ordering = ['numero_partida'] # Ordenar por número de partida por defecto
my_tags = ['Partidas']
class ViewSetTipoOperacion(LoggingMixin, viewsets.ModelViewSet):
"""
ViewSet for TipoOperacion model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
queryset = TipoOperacion.objects.all()
serializer_class = TipoOperacionSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['tipo']
search_fields = ['tipo', 'descripcion']
ordering_fields = ['tipo', 'descripcion']
ordering = ['tipo']
my_tags = ['Tipos_Operacion']
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un tipo de operación.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
# Solo el supoerusuario puede crear tipos de operación
if not self.request.user.is_superuser:
raise PermissionDenied("Solo los superusuarios pueden crear tipos de operación")
serializer.save(organizacion=self.request.user.organizacion)
def perform_update(self, serializer):
"""
Solo el superusuario puede actualizar tipos de operación.
"""
if not self.request.user.is_superuser:
raise PermissionDenied("Solo los superusuarios pueden actualizar tipos de operación")
serializer.save()
class ViewSetProcesamientoPedimento(viewsets.ModelViewSet, ProcesosPorOrganizacionMixin):
"""
ViewSet for ProcesamientoPedimento model.
Soporta paginación, filtros y búsqueda.
Parámetros disponibles:
- page: Número de página (solo si se especifica page_size)
- page_size: Elementos por página (si NO se especifica, devuelve TODOS los resultados)
- pedimento: Filtro por pedimento
- estado: Filtro por estado
- servicio: Filtro por servicio
- tipo_procesamiento: Filtro por tipo de procesamiento
- ordering: Ordenar por campo (ej: -created_at, -updated_at)
Ejemplos:
- /procesamientopedimentos/ → Devuelve TODOS los procesamientos
- /procesamientopedimentos/?page_size=5 → Devuelve los primeros 5
"""
permission_classes = [IsAuthenticated, IsSuperUser | IsSameOrganizationDeveloper ]
serializer_class = ProcesamientoPedimentoSerializer
pagination_class = CustomPagination
model = ProcesamientoPedimento
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = {
'pedimento': ['exact'],
'pedimento__pedimento_app': ['exact', 'icontains'],
'estado': ['exact'],
'servicio': ['exact'],
'tipo_procesamiento': ['exact'],
}
search_fields = ['pedimento__pedimento_app', 'pedimento__pedimento']
ordering_fields = ['created_at', 'updated_at']
ordering = ['-created_at']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna siempre la organización al crear un procesamiento de pedimento.
- Para superusuarios: requiere que la organización venga explícitamente en los datos validados.
- Para usuarios normales: asigna la organización del usuario autenticado.
"""
user = self.request.user
if not user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, debe venir la organización en los datos validados
if user.is_superuser:
organizacion = serializer.validated_data.get('organizacion', None)
if not organizacion:
raise ValueError("El superusuario debe especificar una organización al crear el procesamiento de pedimento.")
serializer.save()
return
# Para usuarios normales, asignar siempre la organización del usuario
if not hasattr(user, 'organizacion') or not user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=user.organizacion)
def perform_update(self, serializer):
"""
Permite actualizar un procesamiento de pedimento, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para actualizar ProcesamientoPedimento")
my_tags = ['Procesamientos_Pedimentos']
class ViewSetEDocument(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for EDocument model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = EDocumentSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['pedimento', 'numero_edocument', 'organizacion']
search_fields = ['numero_edocument', 'descripcion', 'organizacion']
ordering_fields = ['created_at', 'updated_at', 'numero_edocument']
ordering = ['-created_at']
model = EDocument
my_tags = ['EDocuments']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un EDocument.
Para superusuarios, permite especificar una organización diferente.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario y se especifica organizacion en los datos validados
if self.request.user.is_superuser:
# Permitir que el superusuario especifique la organización
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para crear EDocument")
def perform_update(self, serializer):
"""
Permite actualizar un EDocument, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
raise ValueError("Usuario no autenticado o sin permisos para actualizar EDocument")
class ViewSetCove(viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for Cove model.
"""
permission_classes = [IsAuthenticated & (IsSuperUser |IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper )]
serializer_class = CoveSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['pedimento', 'numero_cove', 'organizacion']
search_fields = ['numero_cove', 'descripcion', 'organizacion']
ordering_fields = ['created_at', 'updated_at', 'numero_cove']
ordering = ['-created_at']
model = Cove
my_tags = ['Coves']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un Cove.
Para superusuarios, permite especificar una organización diferente.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario y se especifica organizacion en los datos validados
if self.request.user.is_superuser:
# Permitir que el superusuario especifique la organización
serializer.save()
return
if (
self.request.user.groups.filter(name='developer').exists()
or self.request.user.groups.filter(name='admin').exists()
or self.request.user.groups.filter(name='user').exists()
) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para crear Cove")
def perform_update(self, serializer):
"""
Permite actualizar un Cove, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user .groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
class ImportadorViewSet(viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for Importador model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = ImportadorSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['rfc', 'nombre', 'organizacion']
search_fields = ['rfc', 'nombre']
ordering_fields = ['created_at', 'updated_at', 'rfc']
ordering = ['-created_at']
model = Importador
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
serializer.save(organizacion=self.request.user.organizacion)
def perform_update(self, serializer):
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para actualizar Importador")
my_tags = ['Importadores']
# helper | reglas para formato de docuemnto antes de cargarlo
def normalize_filename(filename):
"""
Normaliza el nombre del archivo removiendo caracteres especiales,
espacios y asegurando consistencia.
"""
filename = normalize('NFKD', filename).encode('ASCII', 'ignore').decode('ASCII')
filename = re.sub(r'[^\w\s.-]', '_', filename) # Remover caracteres no alfanuméricos
filename = re.sub(r'[\s()]+', '_', filename) # Reemplazar espacios y paréntesis
filename = re.sub(r'_+', '_', filename) # Consolidar múltiples _
filename = filename.strip('_') # Remover _ al inicio/final
return filename
def get_clean_base_filename(filename):
"""
Obtiene el nombre base limpio sin el sufijo de Django.
"""
normalized = normalize_filename(filename)
name_without_ext, ext = os.path.splitext(normalized)
django_suffix = extract_django_suffix(name_without_ext)
if django_suffix:
base_name = name_without_ext[:-8]
else:
base_name = name_without_ext
base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name)
return base_name.lower().strip('_')
def is_same_document(existing_doc, new_filename):
"""
Compara si un documento existente y un nuevo archivo son el mismo documento.
Args:
existing_doc: Objeto Document existente
new_filename: Nombre del nuevo archivo a subir
Returns:
bool: True si son el mismo documento
"""
existing_basename = os.path.basename(existing_doc.archivo.name)
existing_base = get_clean_base_filename(existing_basename)
new_base = get_clean_base_filename(new_filename)
existing_ext = existing_doc.extension.lower()
new_ext = os.path.splitext(new_filename)[1].lower().lstrip('.')
return existing_base == new_base and existing_ext == new_ext
def extract_django_suffix(filename):
"""
Extrae el sufijo único que Django añade a los archivos.
"""
name_without_ext = os.path.splitext(filename)[0]
match = re.search(r'_([a-zA-Z0-9]{7})$', name_without_ext)
if match:
return match.group(1)
return None
def get_clean_base_filename(filename):
"""
Obtiene el nombre base limpio sin el sufijo de Django.
"""
normalized = normalize_filename(filename)
name_without_ext, ext = os.path.splitext(normalized)
django_suffix = extract_django_suffix(name_without_ext)
if django_suffix:
base_name = name_without_ext[:-8]
else:
base_name = name_without_ext
base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name)
return base_name.lower().strip('_')