Files
backend/api/datastage/views.py

211 lines
8.4 KiB
Python

import atexit
import tempfile
from api.utils.storage_service import storage_service
from config import settings
from rest_framework.pagination import PageNumberPagination
from api.customs.models import Pedimento, TipoOperacion, Regimen
from django.shortcuts import render
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
from rest_framework.response import Response
from django.http import FileResponse, Http404
import os
from .models import DataStage
from .serializer import DataStageSerializer
from api.logger.mixins import LoggingMixin
from mixins.filtrado_organizacion import OrganizacionFiltradaMixin
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
# Create your views here.
class DataStagePagination(PageNumberPagination):
page_size = 20 # Valor por defecto
page_size_query_param = 'page_size'
max_page_size = 1000
class DataStageViewSet(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for managing DataStage instances.
Provides CRUD operations for DataStage.
"""
serializer_class = DataStageSerializer
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
model = DataStage
my_tags = ['DataStage']
pagination_class = DataStagePagination
def get_queryset(self):
if self.request.user.is_superuser:
return DataStage.objects.all().order_by('-created_at')
if self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='Agente Aduanal').exists():
return DataStage.objects.filter(organizacion=self.request.user.organizacion).order_by('-created_at')
return self.get_queryset_filtrado_por_organizacion().order_by('-created_at')
def perform_create(self, serializer):
"""
Permite que la organización sea opcional en el request, pero si no se envía, se asigna la del usuario autenticado.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
data = serializer.validated_data
organizacion = data.get('organizacion')
if self.request.user.is_superuser:
# Permitir que el superusuario cree sin organización o la especifique
datastage = serializer.save()
self._trigger_processing(datastage)
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
if not organizacion:
datastage = serializer.save(organizacion=self.request.user.organizacion)
else:
datastage = serializer.save()
self._trigger_processing(datastage)
return
raise ValueError("No cuentas con los permisos necesarios para crear un DataStage")
def _trigger_processing(self, datastage):
"""
Método helper para disparar el procesamiento.
"""
from api.datastage.tasks import procesar_datastage_task
user_organizacion = getattr(self.request.user, 'organizacion', None)
user_organizacion_id = user_organizacion.id if user_organizacion else None
datastage.procesado = True
datastage.save()
task = procesar_datastage_task.delay(datastage.id, user_organizacion_id)
def perform_update(self, serializer):
"""
Override to ensure organization is set on update.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
if self.request.user.is_superuser:
# Allow superuser to update without organization
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("No cuentas con los permisos necesarios para actualizar un DataStage")
@action(detail=True, methods=['get'], url_path='download-datastage', url_name='download-datastage')
def download_datastage(self, request, pk=None):
"""
Endpoint para descargar el archivo asociado a un DataStage.
Soporta tanto archivos en MinIO como archivos locales antiguos.
"""
try:
datastage = self.get_object()
if not datastage.archivo:
raise Http404("No hay archivo asociado a este DataStage.")
# Detectar si es ruta de MinIO o local
is_minio_path = datastage.archivo.startswith('org_')
if is_minio_path:
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp_path = tmp.name
success = storage_service.download_file(datastage.archivo, tmp_path)
if not success:
raise Http404("No se pudo descargar el archivo de MinIO")
filename = os.path.basename(datastage.archivo)
response = FileResponse(
open(tmp_path, 'rb'),
as_attachment=True,
filename=filename
)
import atexit
atexit.register(lambda: os.unlink(tmp_path) if os.path.exists(tmp_path) else None)
return response
else:
file_path = os.path.join(settings.MEDIA_ROOT, str(datastage.archivo))
if not os.path.exists(file_path):
raise Http404(f"El archivo no existe: {file_path}")
filename = os.path.basename(file_path)
response = FileResponse(
open(file_path, 'rb'),
as_attachment=True,
filename=filename
)
return response
except Exception as e:
return Response({'detail': str(e)}, status=404)
def perform_destroy(self, instance):
"""
Al eliminar un DataStage, también eliminar su archivo asociado.
"""
if instance.archivo:
storage_service.delete_file(instance.archivo)
instance.delete()
@action(detail=True, methods=['post'], url_path='procesar')
def procesar(self, request, pk=None):
"""
Endpoint para procesar el DataStage de forma asíncrona usando Celery.
"""
# ojo aqui
from api.datastage.tasks import procesar_datastage_task
datastage = self.get_object()
user_organizacion = getattr(self.request.user, 'organizacion', None)
user_organizacion_id = user_organizacion.id if user_organizacion else None
task = procesar_datastage_task.delay(datastage.id, user_organizacion_id)
return Response({
'task_id': task.id,
'detail': 'Procesamiento iniciado. Puede consultar el estado con el task_id.'
})
@action(detail=False, methods=['get'], url_path='task-status')
def task_status(self, request):
"""
Consulta el estado de una tarea de Celery por task_id.
"""
from celery.result import AsyncResult
task_id = request.query_params.get('task_id')
if not task_id:
return Response({'detail': 'Falta el parámetro task_id.'}, status=400)
result = AsyncResult(task_id)
return Response({
'task_id': task_id,
'status': result.status,
'result': result.result if result.successful() else None
})