# Imports de Django from django.shortcuts import render from django.http import Http404 from django.urls import reverse_lazy, reverse from django.core.mail import send_mail from django.views.generic.edit import CreateView from django.views.generic.list import ListView from django.contrib import messages from django.db.models import Case, When, Value, BooleanField from django.contrib.auth.models import Permission, User from django.contrib.contenttypes.models import ContentType from django.contrib.auth import login from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin from django.utils import timezone # Imports de Django REST framework from rest_framework import viewsets from rest_framework.authentication import TokenAuthentication, BasicAuthentication from rest_framework.views import APIView from rest_framework.authtoken.models import Token from rest_framework import authentication from rest_framework.response import Response from rest_framework import status from rest_framework.permissions import IsAuthenticated from rest_framework.pagination import PageNumberPagination from rest_framework import generics # Imports de allauth from allauth.account.models import EmailConfirmation, EmailAddress from allauth.account.forms import SignupForm # Imports de tus modelos y serializadores from .permissions import ActiveTokenSessionPerm, TokenCheckSession from .models import Sistemas_por_cliente_A24, ClientesA24, DeviceA24, ActiveTokenSession, Modulo from Sistemas.models import Sistema, BitacoraErrores from Sistemas.permissions import ItsAdminToken, HasAuthorizationHeader, CheckPermiso from .forms import ClienteForm_IMMEX from .serializers import (ClientesA24Serailizer, SerialiazerA24, SignupSerializer, Sistema_Serializer, Sistema_Por_Cliente_Serializer, DeviceA24_admin_Serialiazer, CustomPermissionSerializer, ModulosSerializer ) # Otras bibliotecas y módulos import urllib.parse import traceback import json from io import StringIO import csv class Sistemas_xCliente_IMMEX_ListView(UserPassesTestMixin,LoginRequiredMixin, ListView): model = Sistemas_por_cliente_A24 paginate_by = 100 template_name = 'IMMEX/xclientes/lista.html' def test_func(self): res = self.request.user.groups.filter(name= 'admin_soft') if not res: messages.error(self.request, 'Lo sentimos. La página que buscas no está disponible o no cuentas con los permisos.') return res class Sistemas_xCliente_IMMEX_CreateView(UserPassesTestMixin,LoginRequiredMixin,CreateView): model = Sistemas_por_cliente_A24 fields = ['id_sistema', 'cliente', 'num_licencias'] template_name = 'IMMEX/xclientes/sistema_create_IMMEX.html' success_url = reverse_lazy('sistemasXcli_IMMEX') def test_func(self): res = self.request.user.groups.filter(name= 'admin_soft') if not res: messages.error(self.request, 'Lo sentimos. La página que buscas no está disponible o no cuentas con los permisos.') return res class ClientesIMMEX_CreateView(CreateView): model = ClientesA24 form_class = ClienteForm_IMMEX success_url = '/IMMEX/' template_name = 'IMMEX/xclientes/edit_cliente.html' def form_valid(self, form): response = super().form_valid(form) if self.request.headers.get('X-Requested-With') == 'XMLHttpRequest': data={ 'id':self.object.id, 'RFC':self.object.RFC, 'Nombre':self.object.Nombre, 'Activo':self.object.Activo, } return JsonResponse(data) else: return response def form_invalid(self,form): response = super().form_invalid(form) if self.request.headers.get('X-Requested-With') == 'XMLHttpRequest': errors = form.errors.as_text() return JsonResponse({'errors':f'{errors}'},status=200,content_type='application/json') else: return response """---------API VIEWS---------""" class ChecarPermisos(APIView): authentication_classes = [TokenAuthentication] permission_classes = [IsAuthenticated, HasAuthorizationHeader, TokenCheckSession] def get(self,request): if 'Response-Type' not in request.headers: Response({"ACCESO":"OK"}) else: ct= request.headers['Response-Type'] response = Response("ACCESS:OK", content_type=ct) return response class LoginIMMEX(APIView): authentication_classes = [TokenAuthentication] permission_classes = [IsAuthenticated, HasAuthorizationHeader, ActiveTokenSessionPerm] def get(self,request): if 'Response-Type' not in request.headers: return Response({'username':request.user.username}) else: print(request.headers['Response-Type']) ct= request.headers['Response-Type'] response = Response("ACCESS:OK", content_type=ct) return response def post(self, request): try: username = request.data.get('username') password = request.data.get('password') user = authentication.authenticate(request, username=username, password=password) if user is not None: email_address = user.emailaddress_set.first() if email_address: if email_address.verified: # User is authenticated and email is verified # Proceed with session creation or any other logic token, created = Token.objects.get_or_create(user=user) return Response({'access': True, 'message':f'Bienvenido {user.first_name}','token': token.key}) else: return Response({'access': False, 'message': 'El correo asociado con este usuario no está verificado.'}) else: return Response({'access': False, 'message': 'No se encuentra una dirección de correo asociada con este usuario.'}) else: return Response({'access': False, 'message': 'Credenciales de inicio de sesión inválidas.'}) except authentication.exceptions.AuthenticationFailed as ex: return Response({'access': False, 'message': 'Error de autenticación: ' + str(ex)}) except Exception as ex: return Response({'access': False, 'message': 'Error durante el inicio de sesión: ' + str(ex)}) class RegistroUsuarios(APIView): authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[IsAuthenticated,ItsAdminToken] def post(self,request, *args, **kwargs): try: serializer = SignupSerializer(data=request.data,context={'request':request}) if serializer.is_valid(): user = serializer.save() # Generar la confirmación de correo electrónico email_address = EmailAddress.objects.get(user=user, email=user.email) email_confirmation = EmailConfirmation.create(email_address) self.send_email_confirmation(request, email_confirmation) return Response({'access':True, 'message': f'Registro exitoso, te enviamos un correo electronico "{user.email}" favor confirme su correo. '}) else: return Response({'access':False, 'message': 'Error de validación', 'Error': serializer.errors}) except Exception as E: return Response({'Error':f'Error al registro con datos del usuario {E}', 'isError':True, 'access':False}) def send_email_confirmation(self, request, email_confirmation): email_address = email_confirmation.email_address email = email_address.email email_confirmation.sent = timezone.now() email_confirmation.save() email_confirmation_url = request.build_absolute_uri(reverse('account_confirm_email', args=[email_confirmation.key])) message = f"Por favor, confirma tu correo electrónico en el siguiente enlace: {email_confirmation_url}" send_mail( subject='Confirmación de correo electrónico', message=message, from_email='aduanasoftpruebas@gmail.com', recipient_list=[email], ) class Check_IMMEX_RFC(APIView): """Verifica que el cliente pueda Timbrar""" authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[IsAuthenticated] def post(self,request,*args, **kwargs): rfc= request.data.get('RFC') try: clienteA24 , created = ClientesA24.objects.get_or_create(RFC=rfc) serializer = ClientesA24Serailizer(clienteA24) if created: clienteA24.Activo =True clienteA24.Nombre = rfc clienteA24.save() if not serializer.is_valid: return Response({'Error':f'{serializer.errors}','isError':True},status=200) return Response(serializer.data) except Exception as E: return Response({'Error':f'check_RFC:{E} RFC:{rfc}','isError':True}) class RegisterIMMEX_Device_APIView(APIView): """Register IMMEX Devices se manda el siguiente JSON { "dataBase" : "CuentaGastosIII", "deviceIP" : "192.168.1.28", "deviceOS" : "Microsoft Windows NT 6.1.7601 Service Pack 1", "deviceName" : "ASJUA-WEB05", "clienteA24" : "C&A010417QS6", "sistema" : "CFDI", "MAC" : "F8BC12952BE2" } este es un ejemplo, el clienteA24 y sistema deben ser nombres validos para sus tablas en IMMEX, es decir deben estar dados de alta en AS Admin en IMMEX """ #authentication_classes = (BasicAuthentication, TokenAuthentication, ) #permission_classes=[ItsAdminToken] def post(self,request): try: serializer = SerialiazerA24(data=request.data, context={'request':request}) if serializer.is_valid(): instance =serializer.save() token = instance.token.key if 'Response-Type' not in request.headers: return Response({'token':token}, status=status.HTTP_201_CREATED) else: response = Response(instance.token.key, content_type='text/plain') return response else: return Response({'Error':f'{serializer.errors}','isError':True}, status=status.HTTP_200_OK) except Exception as ex: data_json = json.dumps(request.data) traceback_info = f'{data_json}\n{traceback.format_exc()}' BitacoraErrores.objects.create(level=2, message=str(ex), traceback=traceback_info, view='IMMEX.RegisterIMMEX_Device_APIView') return Response({'Error':f'{ex}','isError':True}, status=status.HTTP_200_OK) def get(self, request): nombre_sistema = request.query_params.get('nombre_sistema') cliente_rfc = request.query_params.get('cliente_rfc') db = request.query_params.get('db') MAC = request.query_params.get('MAC') sistemas_por_cliente = Sistemas_por_cliente_A24.objects.all() if nombre_sistema: sistemas_por_cliente = sistemas_por_cliente.filter(id_sistema__nombre_sistema=nombre_sistema) if cliente_rfc: sistemas_por_cliente = sistemas_por_cliente.filter(cliente__RFC=cliente_rfc) dispositivos = DeviceA24.objects.filter(sistema__nombre_sistema=nombre_sistema, clienteA24__RFC=cliente_rfc) print('dispositivos:',dispositivos) serializer = Sistema_Por_Cliente_Serializer(sistemas_por_cliente, many=True, context={'request': request}) data=serializer.data for item in data: del item['id_sistema'] del item['cliente'] if 'Response-Type' not in request.headers: return Response(data, status=status.HTTP_200_OK) else: print(request.headers['Response-Type']) csv_buffer = StringIO() writer = csv.DictWriter(csv_buffer, fieldnames=serializer.child.fields.keys()) # Escribe los encabezados del CSV #writer.writeheader() # Escribe los datos en el CSV for row in data: writer.writerow(row) # Coloca el puntero del archivo al principio del archivo csv_buffer.seek(0) # Lee la cadena CSV desde el objeto StringIO y devuelve como respuesta HTTP csv_data = csv_buffer.getvalue() print(csv_data) csv_data = csv_data.replace("\r\n", "") response = Response(csv_data, content_type='text/csv') return response class Sistemas_IMMEX_List_APIView(APIView): authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[IsAuthenticated] def get(self, request): sistemas = Sistema.objects.all() serializer = Sistema_Serializer(sistemas,many=True) return Response(serializer.data, status=status.HTTP_200_OK) class Sistema_por_cliente_APIView(APIView): authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[IsAuthenticated] def get(self, request): nombre_sistema = request.query_params.get('nombre_sistema') cliente_rfc = request.query_params.get('cliente_rfc') db = request.query_params.get('db') MAC = request.query_params.get('MAC') sistemas_por_cliente = Sistemas_por_cliente_A24.objects.all() if nombre_sistema: sistemas_por_cliente = sistemas_por_cliente.filter(id_sistema__nombre_sistema=nombre_sistema) if cliente_rfc: sistemas_por_cliente = sistemas_por_cliente.filter(cliente__RFC=cliente_rfc) dispositivos = DeviceA24.objects.filter(sistema__nombre_sistema=nombre_sistema, clienteA24__RFC=cliente_rfc) serializer = Sistema_Por_Cliente_Serializer(sistemas_por_cliente, many=True, context={'request': request}) data=serializer.data for item in data: del item['id_sistema'] del item['cliente'] return Response(serializer.data, status=status.HTTP_200_OK) def post(self, request): try: context = { 'clienteA24':request.data.get('clienteA24'), 'DeviceA24':request.data.get('DeviceA24') } serializer = Sistema_Por_Cliente_Serializer(data=request.data, context=context) if serializer.is_valid(): serializer.save() return Response(serializer.data, status=status.HTTP_201_CREATED) else: return Response({'Error':f'{serializer.errors}','isError':True}, status=status.HTTP_200_OK) except Exception as ex: data_json = json.dumps(request.data) traceback_info = f'{data_json}\n{traceback.format_exc()}' BitacoraErrores.objects.create(level=2, message=str(ex), traceback=traceback_info, \ view='IMMEX.Sistema_por_cliente_APIView') return Response({'Error':f'{ex}','isError':True}, status=status.HTTP_200_OK) #CRUD Clientes IMMEX class MyPage(PageNumberPagination): page_size =1 page_size_query_param = 'page_size' max_page_size = 1 class ClientesA24List(generics.ListCreateAPIView): authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[ItsAdminToken] queryset = ClientesA24.objects.all() serializer_class = ClientesA24Serailizer pagination_class = MyPage ordering_fields = ('RFC',"Nombre", 'Activo') def get_queryset(self): queryset = ClientesA24.objects.all() # Filtrar por RFC si se proporciona como parámetro de consulta rfc = self.request.query_params.get('RFC') Nombre = self.request.query_params.get('Nombre') if rfc: queryset = queryset.filter(RFC__icontains=rfc) if Nombre: queryset = queryset.filter(Nombre__icontains=Nombre) # Aplicar ordenación si se proporciona como parámetro de consulta ordering = self.request.query_params.get('ordering') if ordering: queryset = queryset.order_by(ordering) return queryset class ClientesA24Detail(APIView): authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[ ItsAdminToken] def get_object(self, pk): try: return ClientesA24.objects.get(pk=pk) except ClientesA24.DoesNotExist: raise Http404 # Obtener un cliente por ID (GET) def get(self, request, pk): print(request.headers) cliente = self.get_object(pk) serializer = ClientesA24Serailizer(cliente) return Response(serializer.data) # Actualizar un cliente por ID (PUT) def put(self, request, pk): cliente = self.get_object(pk) print(cliente) serializer = ClientesA24Serailizer(cliente, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data,status=status.HTTP_200_OK) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # Eliminar un cliente por ID (DELETE) def delete(self, request, pk): cliente = self.get_object(pk) cliente.delete() return Response({"pk":pk},status=status.HTTP_200_OK) #-----ADMIN AREA class DeviceA24List(generics.ListCreateAPIView): #queryset = DeviceA24.objects.all() serializer_class = DeviceA24_admin_Serialiazer pagination_class = MyPage authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[ ItsAdminToken] def get_queryset(self): queryset = DeviceA24.objects.all() # Filtrar por clienteA24 si se proporciona como parámetro de consulta clienteA24 = self.request.query_params.get('clienteA24') if clienteA24: queryset = queryset.filter(clienteA24__RFC__icontains=clienteA24) # # Aplicar ordenación si se proporciona como parámetro de consulta # ordering = self.request.query_params.get('ordering') # if ordering: # queryset = queryset.order_by(ordering) return queryset class DeviceA24Detail(generics.RetrieveUpdateDestroyAPIView): queryset = DeviceA24.objects.all() serializer_class = DeviceA24_admin_Serialiazer authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[ ItsAdminToken] # Método para recuperar un registro def retrieve(self, request, *args, **kwargs): instance = self.get_object() serializer = self.get_serializer(instance) return Response(serializer.data) # Método para actualizar un registro def update(self, request, *args, **kwargs): instance = self.get_object() serializer = self.get_serializer(instance, data=request.data) serializer.is_valid(raise_exception=True) serializer.save() return Response(serializer.data) # Método para eliminar un registro def destroy(self, request, *args, **kwargs): instance = self.get_object() instance.delete() return Response(status=204) # Método para listar registros (opcional, dependiendo de tus necesidades) def list(self, request, *args, **kwargs): queryset = self.get_queryset() serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) class PermissionListCreateAPIView(viewsets.ModelViewSet): queryset = Permission.objects.all() # Asegúrate de tener un serializer adecuado serializer_class = CustomPermissionSerializer authentication_classes = (BasicAuthentication, TokenAuthentication, ) permission_classes=[ ItsAdminToken] def get_queryset(self): app_label = self.request.query_params.get('app_label') user_id = self.request.query_params.get('user_id') queryset = Permission.objects.all() if app_label: content_types = ContentType.objects.filter(app_label=app_label) queryset = queryset.filter(content_type__in=content_types) if user_id: param_user = User.objects.get(id=user_id) # Anotamos los permisos con True si el usuario los tiene, False en caso contrario queryset = queryset.annotate( activo=Case( When(user=param_user, then=Value(True)), default=Value(False), output_field=BooleanField() ) ) return queryset def list(self, request, *args, **kwargs): # Obtén la lista de permisos queryset = self.get_queryset() serializer = self.get_serializer(queryset, many=True) # Agrega datos personalizados a la respuesta data = { "status": "success", "message": "Lista de permisos recuperada exitosamente", "data": serializer.data } return Response(data, status=status.HTTP_200_OK) def create(self, request, *args, **kwargs): # Obtén el ID del usuario del JSON de la solicitud user_id = request.data.get('user_id') try: # Recupera el usuario por su ID user = User.objects.get(id=user_id) # Obtén la lista de permisos del JSON de la solicitud permissions_data = request.data.get('permissions', []) for perm_data in permissions_data: # Recupera el ID del permiso de cada objeto en la lista permission_id = perm_data.get('id') print(permission_id) try: # Recupera el permiso por su ID permission = Permission.objects.get(id=permission_id) # Asigna el permiso al usuario user.user_permissions.add(permission) except Permission.DoesNotExist: return Response({"error": f"El permiso con ID {permission_id} no existe"}, status=status.HTTP_400_BAD_REQUEST) return Response({"message": "Permisos asignados correctamente al usuario"}, status=status.HTTP_201_CREATED) except User.DoesNotExist: return Response({"error": "El usuario no existe"}, status=status.HTTP_400_BAD_REQUEST) class ModulosListCreateAPIView(viewsets.ModelViewSet): queryset = Modulo.objects.all() pagination_class = MyPage serializer_class = ModulosSerializer def get_queryset(self): queryset = Modulo.objects.all() return queryset