Files
backend/api/tasks/views.py
2026-03-06 12:48:51 -07:00

105 lines
3.7 KiB
Python

from django.shortcuts import render
from rest_framework import viewsets, filters
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.pagination import PageNumberPagination
from api.logger.mixins import LoggingMixin
from mixins.filtrado_organizacion import OrganizacionFiltradaMixin, ProcesosPorOrganizacionMixin
from .models import Task
from .serializers import TaskSerializer
from .filters import TaskFilter
from rest_framework.permissions import IsAuthenticated
# Create your views here.
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
class TaskPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class TaskViewSet(LoggingMixin,viewsets.ModelViewSet,OrganizacionFiltradaMixin):
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
queryset = Task.objects.select_related('pedimento', 'servicio').all()
serializer_class = TaskSerializer
filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
filterset_class = TaskFilter
pagination_class = TaskPagination
ordering_fields = ['timestamp']
ordering = ['-timestamp'] # ordenamiento por defecto, más reciente primero
my_tags = ['tasks']
def get_queryset(self):
"""
Filtra las tareas según la organización del usuario.
Superusuarios pueden ver todas las tareas.
"""
queryset = self.get_queryset_filtrado_por_organizacion() # Tambien filtra por importador
# if user.is_superuser:
# return self.queryset
# # return self.queryset.filter(organizacion_id=user.organizacion.id)
# else:
# return self.queryset.filter(organizacion_id=user.organizacion.id)
return queryset
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from celery.result import AsyncResult
class TaskStatusView(APIView):
"""
Vista para consultar el estado de tareas de Celery.
"""
permission_classes = [IsAuthenticated]
def get(self, request, task_id):
"""
Consulta el estado de una tarea de Celery.
Returns:
- PENDING: La tarea está esperando ser procesada
- STARTED: La tarea ha sido iniciada
- SUCCESS: La tarea se completó exitosamente
- FAILURE: La tarea falló
- RETRY: La tarea está reintentando
"""
try:
task_result = AsyncResult(task_id)
response_data = {
'task_id': task_id,
'status': task_result.state,
'ready': task_result.ready(),
'successful': task_result.successful() if task_result.ready() else None,
}
if task_result.ready() and task_result.successful():
try:
response_data['result'] = task_result.result
except Exception:
pass
if task_result.state == 'FAILURE':
response_data['error'] = str(task_result.info)
if task_result.state == 'STARTED':
response_data['info'] = str(task_result.info) if task_result.info else None
return Response(response_data, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': f'Error al consultar tarea: {str(e)}'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)