Files
backend/api/vucem/views.py

231 lines
9.0 KiB
Python

import atexit
import os
import tempfile
from django.shortcuts import render
from ..organization.models import Organizacion
from rest_framework import viewsets
from rest_framework.pagination import PageNumberPagination
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter, OrderingFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.decorators import action
from rest_framework.response import Response
from django.http import FileResponse, Http404
from api.utils.storage_service import storage_service
from .serializers import VucemSerializer, CredencialesImportadorSerializer, CredencialesImportadorSimpleSerializer
from rest_framework import serializers
# Update-only serializer: the ``key`` and ``cer`` uploads are optional here.
class VucemUpdateSerializer(VucemSerializer):
    """Variant of ``VucemSerializer`` used by the update actions.

    Redeclares the ``key`` and ``cer`` file fields as not required and
    nullable, so an update request may omit either file.
    """

    key = serializers.FileField(allow_null=True, required=False)
    cer = serializers.FileField(allow_null=True, required=False)

    class Meta(VucemSerializer.Meta):
        # Expose exactly the same field list as the base serializer.
        fields = VucemSerializer.Meta.fields
from .models import Vucem, CredencialesImportador
from core.permissions import IsSameOrganizationDeveloper
from rest_framework import mixins
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser,
IsSameOrganizationAndInAllowedGroups
)
class CustomVucemPagination(PageNumberPagination):
    """Opt-in page-number pagination for the VUCEM endpoints.

    No page size is configured by default; pagination is only attempted
    when the request carries the ``page_size`` query parameter.
    """

    page_query_param = 'page'
    page_size_query_param = 'page_size'
    page_size = None  # no pagination by default
    max_page_size = 1000

    def paginate_queryset(self, queryset, request, view=None):
        """Paginate ``queryset`` only when the client asked for a page size.

        Returns ``None`` when the page-size query parameter is absent from
        the request; otherwise returns whatever
        ``PageNumberPagination.paginate_queryset`` returns.
        """
        requested_size = request.query_params.get(self.page_size_query_param)
        if requested_size is not None:
            return super().paginate_queryset(queryset, request, view)
        return None
# Create your views here.
class VucemView(viewsets.ModelViewSet):
    """CRUD endpoints for VUCEM credentials plus download actions for their files.

    Visibility is scoped in ``get_queryset``: superusers see every record,
    members of the ``Importador`` group see only the records of their
    organization whose ``usuario`` equals their RFC, and every other user
    sees the records of their own organization.
    """

    permission_classes = [
        IsAuthenticated,
        (
            IsSuperUser
            | IsSameOrganization
            | IsSameOrganizationAndAdmin
            | IsSameOrganizationDeveloper
        ),
    ]
    queryset = Vucem.objects.all()
    pagination_class = CustomVucemPagination
    filterset_fields = [
        'organizacion',
        'patente',
        'usuario',
        'is_importador',
        'acusecove',
        'acuseedocument',
        'is_active',
    ]
    search_fields = ['usuario', 'patente']
    ordering_fields = ['created_at', 'updated_at', 'usuario', 'patente']
    ordering = ['-created_at']

    def get_serializer_class(self):
        """Return the update serializer (optional files) for update actions."""
        if self.action in ['update', 'partial_update']:
            return VucemUpdateSerializer
        return VucemSerializer

    def get_permissions(self):
        """Use the allowed-groups permission for write actions.

        Every other action falls back to the permission classes configured
        on the view (or on the ``@action`` decorator).
        """
        if self.action in ['create', 'update', 'partial_update', 'destroy']:
            return [IsAuthenticated(), IsSameOrganizationAndInAllowedGroups()]
        return super().get_permissions()

    def get_queryset(self):
        """Return the records the requesting user is allowed to see.

        An optional ``importador`` query parameter (an RFC) further narrows
        the result through the ``usuarios_importadores`` relation.
        """
        user = self.request.user
        if not user.is_authenticated:
            return self.queryset.none()

        queryset = self.queryset
        if user.is_superuser:
            queryset = queryset.all()
        elif not getattr(user, 'organizacion', None):
            # No organization assigned: nothing is visible.
            return queryset.none()
        elif user.groups.filter(name='Importador').exists():
            queryset = queryset.filter(organizacion=user.organizacion, usuario=user.rfc)
        else:
            queryset = queryset.filter(organizacion=user.organizacion)

        # Optional filter by importer RFC.
        importador_rfc = self.request.query_params.get('importador')
        if importador_rfc:
            queryset = queryset.filter(
                usuarios_importadores__rfc__rfc=importador_rfc
            ).distinct()
        return queryset

    def perform_create(self, serializer):
        """Save a new record, stamping organization and audit users.

        Superusers must name the target organization through the
        ``organizacion_id`` request field; everyone else creates records in
        their own organization.

        Raises:
            serializers.ValidationError: when the user has no organization
                attribute, a superuser omits ``organizacion_id``, or the
                given organization cannot be found. Raising a DRF exception
                (instead of ``ValueError``) lets the framework answer with
                a 400 response rather than an unhandled server error.
        """
        user = self.request.user
        if not user.is_authenticated or not hasattr(user, 'organizacion'):
            raise serializers.ValidationError(
                "El usuario debe estar autenticado y tener una organización asignada."
            )

        if user.is_superuser:
            organizacion_id = self.request.data.get('organizacion_id')
            if not organizacion_id:
                raise serializers.ValidationError(
                    "Los superusuarios deben especificar una organización"
                )
            try:
                organizacion = Organizacion.objects.get(id=organizacion_id)
            except (Organizacion.DoesNotExist, ValueError, TypeError) as exc:
                # A malformed id is reported the same way as an unknown one.
                raise serializers.ValidationError(
                    {"organizacion": "Organización no encontrada"}
                ) from exc
        else:
            organizacion = user.organizacion

        serializer.save(
            organizacion=organizacion,
            created_by=user,
            updated_by=user,
        )

    def perform_update(self, serializer):
        """Save changes, keeping the original creator and stamping the editor.

        Non-superusers additionally have the record's organization forced
        to their own.

        Raises:
            serializers.ValidationError: when the user has no organization
                attribute.
        """
        user = self.request.user
        if not user.is_authenticated or not hasattr(user, 'organizacion'):
            raise serializers.ValidationError(
                "El usuario debe estar autenticado y tener una organización asignada."
            )

        # The serializer already holds the instance being updated, so there
        # is no need to fetch it (and re-run permission checks) again.
        save_kwargs = {
            'created_by': serializer.instance.created_by,
            'updated_by': user,
        }
        if not user.is_superuser:
            save_kwargs['organizacion'] = user.organizacion
        serializer.save(**save_kwargs)

    def _send_stored_file(self, stored_file, missing_detail):
        """Build the download response for one stored credential file.

        Args:
            stored_file: the model's file value (``vucem.cer`` / ``vucem.key``).
            missing_detail: message returned with a 404 when no file is set.

        Returns:
            A 404 ``Response`` when ``stored_file`` is empty, otherwise a
            ``FileResponse`` that serves the file as an attachment.

        Raises:
            Http404: when the storage service reports a failed download.
        """
        import io

        if not stored_file:
            return Response({"detail": missing_detail}, status=404)

        ruta = str(stored_file)
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = tmp.name
        try:
            if not storage_service.download_file(ruta, tmp_path):
                raise Http404("No se pudo descargar el archivo")
            # The content is served from memory so the temporary file can be
            # removed right away instead of lingering until process exit.
            # NOTE(review): assumes cer/key files are small -- confirm.
            with open(tmp_path, 'rb') as downloaded:
                payload = downloaded.read()
        finally:
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

        return FileResponse(
            io.BytesIO(payload),
            as_attachment=True,
            filename=os.path.basename(ruta),
        )

    @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated])
    def download_cer(self, request, pk=None):
        """Download the ``cer`` file of the selected record."""
        vucem = self.get_object()
        return self._send_stored_file(vucem.cer, "No hay archivo cer disponible.")

    # NOTE(review): this action only requires IsAuthenticated; access to the
    # key file is limited solely by get_queryset scoping -- confirm intended.
    @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated])
    def download_key(self, request, pk=None):
        """Download the ``key`` file of the selected record."""
        vucem = self.get_object()
        return self._send_stored_file(vucem.key, "No hay archivo key disponible.")

    def perform_destroy(self, instance):
        """Delete the record and then its stored ``key`` / ``cer`` files.

        The database row is removed first: if that fails, the stored files
        are still intact and the record remains usable.
        """
        stored_paths = [
            str(stored) for stored in (instance.key, instance.cer) if stored
        ]
        instance.delete()
        for path in stored_paths:
            storage_service.delete_file(path)
class CredencialesImportadorViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for per-importer credentials.

    Superusers see every record; other users only see the records that
    belong to their own organization.
    """

    permission_classes = [IsAuthenticated]
    queryset = CredencialesImportador.objects.all()
    serializer_class = CredencialesImportadorSimpleSerializer
    filterset_fields = ['organizacion', 'vucem', 'rfc']
    search_fields = ['rfc']
    ordering_fields = ['created_at', 'updated_at', 'rfc']
    ordering = ['-created_at']
    my_tags = ['Credenciales por Importador']

    def get_permissions(self):
        """Return the permission instances for the current action.

        Write actions explicitly require authentication; the remaining
        actions use the view's configured ``permission_classes``.
        """
        if self.action in ['create', 'update', 'partial_update', 'destroy']:
            return [IsAuthenticated()]
        return super().get_permissions()

    def get_queryset(self):
        """Return the credentials visible to the requesting user.

        Users that are unauthenticated or have no organization get an empty
        queryset. Filtering by a ``None`` organization would otherwise match
        records whose organization is NULL.
        """
        user = self.request.user
        if user.is_superuser:
            # Superusers get every record.
            return self.queryset.all()

        organizacion = getattr(user, 'organizacion', None)
        if not user.is_authenticated or not organizacion:
            return self.queryset.none()
        return self.queryset.filter(organizacion=organizacion)

    def perform_create(self, serializer):
        """Save a new record under the requesting user's organization.

        Raises:
            serializers.ValidationError: when the user has no organization
                attribute. A DRF exception (instead of ``ValueError``) lets
                the framework answer with a 400 response rather than an
                unhandled server error.
        """
        user = self.request.user
        if not user.is_authenticated or not hasattr(user, 'organizacion'):
            raise serializers.ValidationError(
                "El usuario debe estar autenticado y tener una organización asignada."
            )
        serializer.save(organizacion=user.organizacion)