"""Template views and REST endpoints for the IMMEX (A24) application.

The module contains:

* Django class-based views used by the admin web UI (system-per-client
  list/create and client creation).
* Django REST framework endpoints used by client applications to log in,
  register users/devices, check an RFC and administer clients, devices,
  permissions and modules.
"""

# Standard library
import csv
import json
import logging
import traceback
import urllib.parse
from io import StringIO

# Django
from django.shortcuts import render
from django.http import Http404, JsonResponse
from django.urls import reverse_lazy, reverse
from django.core.mail import send_mail
from django.views.generic.edit import CreateView
from django.views.generic.list import ListView
from django.contrib import messages
from django.db.models import Case, When, Value, BooleanField, Q
from django.contrib.auth.models import Permission, User
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import authenticate, login
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.utils import timezone

# Django REST framework
from rest_framework import viewsets
from rest_framework.authentication import TokenAuthentication, BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.views import APIView
from rest_framework.authtoken.models import Token
from rest_framework import authentication
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework import generics

# allauth
from allauth.account.models import EmailConfirmation, EmailAddress
from allauth.account.forms import SignupForm

# Project models, permissions, forms and serializers
from .permissions import ActiveTokenSessionPerm, TokenCheckSession
from .models import (
    Sistemas_por_cliente_A24,
    ClientesA24,
    DeviceA24,
    ActiveTokenSession,
    Modulo,
    Permisos_A24,
)
from Sistemas.models import Sistema, BitacoraErrores
from Sistemas.permissions import ItsAdminToken, HasAuthorizationHeader, CheckPermiso
from .forms import ClienteForm_IMMEX
from .serializers import (
    ClientesA24Serailizer,
    SerialiazerA24,
    SignupSerializer,
    Sistema_Serializer,
    Sistema_Por_Cliente_Serializer,
    DeviceA24_admin_Serialiazer,
    CustomPermissionSerializer,
    ModulosSerializer,
    Permisos_A24_Serializer,
)

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _log_exception(ex, view, extra=''):
    """Record ``ex`` in ``BitacoraErrores`` and in the module logger.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` can see the active exception.

    Args:
        ex: The exception being handled.
        view: Dotted name of the view stored in the ``view`` column.
        extra: Optional text placed before the traceback (e.g. the payload).
    """
    trace = traceback.format_exc()
    if extra:
        trace = f'{extra}\n{trace}'
    logger.exception('Unhandled error in %s', view)
    BitacoraErrores.objects.create(
        level=2, message=str(ex), traceback=trace, view=view)


def _logged_error_response(ex, view):
    """Log ``ex`` and build the HTTP 200 error response used by admin views.

    NOTE(review): the payload is serialized with ``json.dumps`` *before* it is
    handed to ``Response``, so the client receives a JSON string that itself
    contains JSON. This is kept as-is because existing clients may rely on
    it; confirm before switching to a plain dict.
    """
    _log_exception(ex, view)
    payload = json.dumps({'Error': str(ex), 'isError': True})
    return Response(payload, status=status.HTTP_200_OK)


def _logged_request_error_response(ex, request, view):
    """Log ``ex`` together with the request payload and return the error dict."""
    # default=str keeps the logging itself from raising on payload values
    # that are not JSON serializable.
    payload = json.dumps(request.data, default=str)
    _log_exception(ex, view, extra=payload)
    return Response({'Error': f'{ex}', 'isError': True},
                    status=status.HTTP_200_OK)


def _serialize_sistemas_por_cliente(request):
    """Serialize the ``Sistemas_por_cliente_A24`` rows selected by the query.

    The optional ``nombre_sistema`` and ``cliente_rfc`` query parameters
    narrow the result. The ``id_sistema`` and ``cliente`` keys are removed
    from every serialized row.

    Returns:
        A ``(serializer, data)`` tuple; ``data`` is the trimmed row list.
    """
    nombre_sistema = request.query_params.get('nombre_sistema')
    cliente_rfc = request.query_params.get('cliente_rfc')

    registros = Sistemas_por_cliente_A24.objects.all()
    if nombre_sistema:
        registros = registros.filter(id_sistema__nombre_sistema=nombre_sistema)
    if cliente_rfc:
        registros = registros.filter(cliente__RFC=cliente_rfc)

    serializer = Sistema_Por_Cliente_Serializer(
        registros, many=True, context={'request': request})
    data = serializer.data
    for item in data:
        item.pop('id_sistema', None)
        item.pop('cliente', None)
    return serializer, data


class _AdminSoftGroupTestMixin:
    """``test_func`` shared by views restricted to the ``admin_soft`` group.

    Must be listed before ``UserPassesTestMixin`` in the bases so that this
    ``test_func`` takes precedence in the MRO.
    """

    def test_func(self):
        """Return True when the user is in ``admin_soft``; else flash an error."""
        is_admin = self.request.user.groups.filter(name='admin_soft').exists()
        if not is_admin:
            messages.error(
                self.request,
                'Lo sentimos. La página que buscas no está disponible o no cuentas con los permisos.')
        return is_admin


# ---------------------------------------------------------------------------
# Template views
# ---------------------------------------------------------------------------

class Sistemas_xCliente_IMMEX_ListView(_AdminSoftGroupTestMixin, UserPassesTestMixin,
                                       LoginRequiredMixin, ListView):
    """Paginated list of systems per client, for ``admin_soft`` users only."""

    model = Sistemas_por_cliente_A24
    paginate_by = 100
    template_name = 'IMMEX/xclientes/lista.html'


class Sistemas_xCliente_IMMEX_CreateView(_AdminSoftGroupTestMixin, UserPassesTestMixin,
                                         LoginRequiredMixin, CreateView):
    """Create a system-per-client record, for ``admin_soft`` users only."""

    model = Sistemas_por_cliente_A24
    fields = ['id_sistema', 'cliente', 'num_licencias']
    template_name = 'IMMEX/xclientes/sistema_create_IMMEX.html'
    success_url = reverse_lazy('sistemasXcli_IMMEX')


class ClientesIMMEX_CreateView(CreateView):
    """Create a ``ClientesA24`` record; answers with JSON for AJAX requests."""

    model = ClientesA24
    form_class = ClienteForm_IMMEX
    success_url = '/IMMEX/'
    template_name = 'IMMEX/xclientes/edit_cliente.html'

    def _is_ajax(self):
        """Return True when the request carries the XMLHttpRequest marker."""
        return self.request.headers.get('X-Requested-With') == 'XMLHttpRequest'

    def form_valid(self, form):
        """Save the client; AJAX callers get the new record as JSON."""
        response = super().form_valid(form)
        if not self._is_ajax():
            return response
        cliente = self.object
        return JsonResponse({
            'id': cliente.id,
            'RFC': cliente.RFC,
            'Nombre': cliente.Nombre,
            'Activo': cliente.Activo,
        })

    def form_invalid(self, form):
        """Re-render the form; AJAX callers get the errors as text in JSON."""
        response = super().form_invalid(form)
        if not self._is_ajax():
            return response
        errors = form.errors.as_text()
        # Status 200 is intentional here: the error travels in the body.
        return JsonResponse({'errors': f'{errors}'}, status=200,
                            content_type='application/json')


# ---------------------------------------------------------------------------
# API views
# ---------------------------------------------------------------------------

class ChecarPermisos(APIView):
    """Confirm that the caller's token passes the session permission checks."""

    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated, HasAuthorizationHeader, TokenCheckSession]

    def get(self, request):
        """Return an OK marker; a ``Response-Type`` header selects the content type."""
        if 'Response-Type' not in request.headers:
            return Response({"ACCESO": "OK"})
        return Response("ACCESS:OK", content_type=request.headers['Response-Type'])


class LoginIMMEX(APIView):
    """Log a user in and hand out the DRF auth token."""

    authentication_classes = [TokenAuthentication]
    permission_classes = [IsAuthenticated, HasAuthorizationHeader, ActiveTokenSessionPerm]

    def get(self, request):
        """Return the authenticated username, or an OK marker when
        ``Response-Type`` is present."""
        if 'Response-Type' not in request.headers:
            return Response({'username': request.user.username})
        return Response("ACCESS:OK", content_type=request.headers['Response-Type'])

    def post(self, request):
        """Validate ``username``/``password`` and return the user's token.

        Every outcome is reported in the body through the ``access`` flag and
        a ``message``; the token is only issued when the user's first email
        address is verified.
        """
        try:
            user = authenticate(
                request,
                username=request.data.get('username'),
                password=request.data.get('password'),
            )
            if user is None:
                return Response({'access': False, 'message': 'Credenciales de inicio de sesión inválidas.'})

            email_address = user.emailaddress_set.first()
            if not email_address:
                return Response({'access': False, 'message': 'No se encuentra una dirección de correo asociada con este usuario.'})
            if not email_address.verified:
                return Response({'access': False, 'message': 'El correo asociado con este usuario no está verificado.'})

            # Authenticated and verified: reuse the existing token if any.
            token, _created = Token.objects.get_or_create(user=user)
            return Response({'access': True, 'message': f'Bienvenido {user.first_name}', 'token': token.key})
        except AuthenticationFailed as ex:
            return Response({'access': False, 'message': 'Error de autenticación: ' + str(ex)})
        except Exception as ex:
            return Response({'access': False, 'message': 'Error durante el inicio de sesión: ' + str(ex)})


class RegistroUsuarios(APIView):
    """Endpoint used to register users from the utilities applications."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [IsAuthenticated, ItsAdminToken]

    def post(self, request, *args, **kwargs):
        """Create the user and send the email-confirmation message."""
        try:
            serializer = SignupSerializer(data=request.data, context={'request': request})
            if not serializer.is_valid():
                return Response({'access': False, 'message': 'Error de validación', 'Error': serializer.errors})

            user = serializer.save()
            # Build the confirmation for the address created during signup.
            email_address = EmailAddress.objects.get(user=user, email=user.email)
            email_confirmation = EmailConfirmation.create(email_address)
            self.send_email_confirmation(request, email_confirmation)
            return Response({'access': True, 'message': f'Registro exitoso, te enviamos un correo electronico "{user.email}" favor confirme su correo. '})
        except Exception as E:
            return Response({'Error': f'Error al registro con datos del usuario {E}', 'isError': True, 'access': False})

    def send_email_confirmation(self, request, email_confirmation):
        """Stamp ``email_confirmation`` as sent and mail its confirmation link."""
        recipient = email_confirmation.email_address.email
        email_confirmation.sent = timezone.now()
        email_confirmation.save()

        email_confirmation_url = request.build_absolute_uri(
            reverse('account_confirm_email', args=[email_confirmation.key]))
        message = f"Por favor, confirma tu correo electrónico en el siguiente enlace: {email_confirmation_url}"
        send_mail(
            subject='Confirmación de correo electrónico',
            message=message,
            from_email='aduanasoftpruebas@gmail.com',
            recipient_list=[recipient],
        )


class Check_IMMEX_RFC(APIView):
    """Check that the client is allowed to stamp (timbrar)."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [IsAuthenticated]

    def post(self, request, *args, **kwargs):
        """Return the client for ``RFC``, creating it as active if missing.

        A newly created client gets ``Activo=True`` and the RFC as its name.
        Failures are reported in the body with ``isError``.
        """
        rfc = request.data.get('RFC')
        try:
            clienteA24, created = ClientesA24.objects.get_or_create(RFC=rfc)
            if created:
                clienteA24.Activo = True
                clienteA24.Nombre = rfc
                clienteA24.save()
            # Read-only serialization of a model instance: there is no input
            # data to validate, so no is_valid() step applies here.
            return Response(ClientesA24Serailizer(clienteA24).data)
        except Exception as E:
            return Response({'Error': f'check_RFC:{E} RFC:{rfc}', 'isError': True})


class RegisterIMMEX_Device_APIView(APIView):
    """Register IMMEX devices.

    The following JSON is sent::

        {
            "dataBase" : "CuentaGastosIII",
            "deviceIP" : "192.168.1.28",
            "deviceOS" : "Microsoft Windows NT 6.1.7601 Service Pack 1",
            "deviceName" : "ASJUA-WEB05",
            "clienteA24" : "C&A010417QS6",
            "sistema" : "CFDI",
            "MAC" : "F8BC12952BE2"
        }

    This is an example; ``clienteA24`` and ``sistema`` must be valid names
    for their IMMEX tables, i.e. they must already be registered in AS Admin
    in IMMEX.
    """

    # NOTE(review): authentication and permissions are disabled for this
    # view, so the project-wide DRF defaults apply. Confirm this is intended.
    #authentication_classes = (BasicAuthentication, TokenAuthentication, )
    #permission_classes=[ItsAdminToken]

    def post(self, request):
        """Register the device and return its token.

        The token is returned as JSON (HTTP 201) or, when a ``Response-Type``
        header is present, as a bare ``text/plain`` body.
        """
        try:
            serializer = SerialiazerA24(data=request.data, context={'request': request})
            if not serializer.is_valid():
                return Response({'Error': f'{serializer.errors}', 'isError': True},
                                status=status.HTTP_200_OK)

            instance = serializer.save()
            token = instance.token.key
            if 'Response-Type' not in request.headers:
                return Response({'token': token}, status=status.HTTP_201_CREATED)
            return Response(token, content_type='text/plain')
        except Exception as ex:
            return _logged_request_error_response(
                ex, request, 'IMMEX.RegisterIMMEX_Device_APIView')

    def get(self, request):
        """List systems per client, optionally filtered by query parameters.

        With a ``Response-Type`` header the rows are written as CSV without a
        header row.
        """
        serializer, data = _serialize_sistemas_por_cliente(request)
        if 'Response-Type' not in request.headers:
            return Response(data, status=status.HTTP_200_OK)

        csv_buffer = StringIO()
        writer = csv.DictWriter(
            csv_buffer, fieldnames=list(serializer.child.fields.keys()))
        # No header row is written, only the data rows.
        writer.writerows(data)
        # NOTE(review): row terminators are stripped, so several rows end up
        # concatenated on one line. Kept because clients may expect it.
        csv_data = csv_buffer.getvalue().replace("\r\n", "")
        return Response(csv_data, content_type='text/csv')


class Sistemas_IMMEX_List_APIView(APIView):
    """List the registered systems."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return all systems, or only those matching ``nombre_sistema``."""
        nombre_sistema = request.query_params.get('nombre_sistema', None)
        sistemas = Sistema.objects.all()
        if nombre_sistema:
            sistemas = sistemas.filter(nombre_sistema=nombre_sistema)
        serializer = Sistema_Serializer(sistemas, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)


class Sistema_por_cliente_APIView(APIView):
    """List and create system-per-client records."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [IsAuthenticated]

    def get(self, request):
        """List systems per client, optionally filtered by query parameters."""
        _serializer, data = _serialize_sistemas_por_cliente(request)
        return Response(data, status=status.HTTP_200_OK)

    def post(self, request):
        """Create a system-per-client record from the request payload."""
        try:
            context = {
                'clienteA24': request.data.get('clienteA24'),
                'DeviceA24': request.data.get('DeviceA24'),
            }
            serializer = Sistema_Por_Cliente_Serializer(data=request.data, context=context)
            if not serializer.is_valid():
                return Response({'Error': f'{serializer.errors}', 'isError': True},
                                status=status.HTTP_200_OK)
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        except Exception as ex:
            return _logged_request_error_response(
                ex, request, 'IMMEX.Sistema_por_cliente_APIView')


# CRUD for IMMEX clients (WINDEV 27 app)

class MyPage(PageNumberPagination):
    """Page-number pagination shared by the admin list endpoints."""

    page_size = 100
    page_size_query_param = 'page_size'
    # NOTE(review): with max_page_size = 1 any client-supplied ``page_size``
    # is capped at 1, far below the default of 100. This looks unintended;
    # confirm the desired upper bound.
    max_page_size = 1


class ClientesA24List(generics.ListCreateAPIView):
    """List (with filters and ordering) and create ``ClientesA24`` records."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]
    queryset = ClientesA24.objects.all()
    serializer_class = ClientesA24Serailizer
    pagination_class = MyPage
    ordering_fields = ('RFC', "Nombre", 'Activo')

    def get_queryset(self):
        """Apply the ``RFC``, ``Nombre`` and ``ordering`` query parameters.

        ``ordering`` accepts a comma-separated list of names taken from
        ``ordering_fields``, each optionally prefixed with ``-``. Unknown
        names are ignored instead of reaching ``order_by``, which would raise
        ``FieldError``.
        """
        params = self.request.query_params
        queryset = ClientesA24.objects.all()

        rfc = params.get('RFC')
        if rfc:
            queryset = queryset.filter(RFC__icontains=rfc)
        nombre = params.get('Nombre')
        if nombre:
            queryset = queryset.filter(Nombre__icontains=nombre)

        ordering = params.get('ordering')
        if ordering:
            terms = [term.strip() for term in ordering.split(',')]
            allowed = [
                term for term in terms
                if (term[1:] if term.startswith('-') else term) in self.ordering_fields
            ]
            if allowed:
                queryset = queryset.order_by(*allowed)
        return queryset


class ClientesA24Detail(APIView):
    """Retrieve, update or delete a single ``ClientesA24`` record."""

    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    def get_object(self, pk):
        """Return the client with primary key ``pk`` or raise ``Http404``."""
        try:
            return ClientesA24.objects.get(pk=pk)
        except ClientesA24.DoesNotExist:
            raise Http404

    def get(self, request, pk):
        """Return the client identified by ``pk``."""
        try:
            cliente = self.get_object(pk)
            return Response(ClientesA24Serailizer(cliente).data)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.ClientesA24Detail.get')

    def put(self, request, pk):
        """Update the client identified by ``pk``; HTTP 400 on invalid data."""
        try:
            cliente = self.get_object(pk)
            serializer = ClientesA24Serailizer(cliente, data=request.data)
            if serializer.is_valid():
                serializer.save()
                return Response(serializer.data, status=status.HTTP_200_OK)
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.ClientesA24Detail.put')

    def delete(self, request, pk):
        """Delete the client identified by ``pk`` and echo the key back."""
        try:
            self.get_object(pk).delete()
            return Response({"pk": pk}, status=status.HTTP_200_OK)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.ClientesA24Detail.delete')


# ----- ADMIN AREA

class DeviceA24List(generics.ListAPIView):
    """Paginated list of registered devices for administrators."""

    #queryset = DeviceA24.objects.all()
    serializer_class = DeviceA24_admin_Serialiazer
    pagination_class = MyPage
    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    def get_queryset(self):
        """Return the devices, optionally filtered by the ``clienteA24`` RFC.

        Failures are logged and re-raised: ``get_queryset`` has to return a
        queryset, so a ``Response`` cannot be returned from here.
        """
        try:
            queryset = DeviceA24.objects.all()
            clienteA24 = self.request.query_params.get('clienteA24')
            if clienteA24:
                queryset = queryset.filter(clienteA24__RFC__icontains=clienteA24)
            return queryset
        except Exception as ex:
            _log_exception(ex, 'IMMEX.DeviceA24List')
            raise


class DeviceA24Detail(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve, update or delete a single device."""

    queryset = DeviceA24.objects.all()
    serializer_class = DeviceA24_admin_Serialiazer
    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    # Fields the client may not change. To add or remove entries, the
    # matching serializer field must also be declared with required=False.
    _excluded_fields = ('id', 'clienteA24', 'sistema', 'username', 'timestamp', 'token')

    def retrieve(self, request, *args, **kwargs):
        """Return the serialized device."""
        try:
            instance = self.get_object()
            return Response(self.get_serializer(instance).data)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.DeviceA24Detail.retrieve')

    def update(self, request, *args, **kwargs):
        """Update the device, ignoring the protected fields in the payload."""
        try:
            instance = self.get_object()
            # Work on a copy: request.data is an immutable QueryDict for
            # form-encoded requests and must not be modified in place.
            data = request.data.copy()
            for field in self._excluded_fields:
                data.pop(field, None)
            serializer = self.get_serializer(instance, data=data)
            serializer.is_valid(raise_exception=True)
            serializer.save()
            return Response(serializer.data)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.DeviceA24Detail.update')

    def destroy(self, request, *args, **kwargs):
        """Delete only the ``DeviceA24`` instance.

        A query parameter to also delete the whole User is still pending.
        """
        try:
            self.get_object().delete()
            return Response(status=204)
        except Exception as ex:
            return _logged_error_response(ex, 'IMMEX.DeviceA24Detail.destroy')

    def list(self, request, *args, **kwargs):
        """Return every device, unpaginated (optional helper)."""
        serializer = self.get_serializer(self.get_queryset(), many=True)
        return Response(serializer.data)


class PermissionListCreateAPIView(viewsets.ModelViewSet):
    """List a user's Django permissions and assign permissions to a user."""

    queryset = Permission.objects.all()
    serializer_class = CustomPermissionSerializer
    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    def get_queryset(self):
        """Filter permissions by the ``app_label`` and ``user_id`` parameters.

        An unknown or malformed ``user_id`` yields an empty queryset instead
        of an unhandled exception.
        """
        params = self.request.query_params
        app_label = params.get('app_label')
        user_id = params.get('user_id')

        queryset = Permission.objects.all()
        if app_label:
            content_types = ContentType.objects.filter(app_label=app_label)
            queryset = queryset.filter(content_type__in=content_types)
        if user_id:
            try:
                param_user = User.objects.get(id=user_id)
            except (User.DoesNotExist, ValueError):
                return queryset.none()
            # Keep only the permissions granted directly to the user.
            queryset = queryset.filter(Q(pk__in=param_user.user_permissions.all()))
        return queryset

    def list(self, request, *args, **kwargs):
        """Return the permission list; ``user_id`` is required for GET."""
        if not self.request.query_params.get('user_id'):
            data = {"error": "favor de proporcionar el 'user_id' en los query params, en este caso solo para el GET"}
            return Response(data, status=status.HTTP_200_OK)

        serializer = self.get_serializer(self.get_queryset(), many=True)
        data = {
            "status": "success",
            "message": "Lista de permisos recuperada exitosamente",
            "data": serializer.data,
        }
        return Response(data, status=status.HTTP_200_OK)

    def create(self, request, *args, **kwargs):
        """Assign the permissions listed in the payload to a user.

        The body must carry ``user_id`` and a ``permissions`` list of objects
        with an ``id`` key. Every id is resolved before anything is assigned,
        so an unknown id no longer leaves the user with a partial set.
        """
        user_id = request.data.get('user_id')
        try:
            user = User.objects.get(id=user_id)
        except (User.DoesNotExist, ValueError, TypeError):
            return Response({"error": "El usuario no existe"},
                            status=status.HTTP_400_BAD_REQUEST)

        permissions = []
        for perm_data in request.data.get('permissions', []):
            permission_id = perm_data.get('id')
            try:
                permissions.append(Permission.objects.get(id=permission_id))
            except (Permission.DoesNotExist, ValueError, TypeError):
                return Response({"error": f"El permiso con ID {permission_id} no existe"},
                                status=status.HTTP_400_BAD_REQUEST)

        user.user_permissions.add(*permissions)
        return Response({"message": "Permisos asignados correctamente al usuario"},
                        status=status.HTTP_201_CREATED)


class ModulosListCreateAPIView(viewsets.ModelViewSet):
    """CRUD endpoints for ``Modulo`` records."""

    queryset = Modulo.objects.all()
    pagination_class = MyPage
    serializer_class = ModulosSerializer
    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    def get_queryset(self):
        """Return every ``Modulo``."""
        return Modulo.objects.all()

    def create(self, request, *args, **kwargs):
        """Create a module, passing ``request.data`` to the serializer explicitly."""
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)


class Permisos_A24_ModelViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for ``Permisos_A24`` records."""

    queryset = Permisos_A24.objects.all()
    pagination_class = MyPage
    serializer_class = Permisos_A24_Serializer
    authentication_classes = (BasicAuthentication, TokenAuthentication, )
    permission_classes = [ItsAdminToken]

    def get_queryset(self):
        """Return the permissions, optionally filtered by ``permiso`` (name)."""
        queryset = super().get_queryset()
        permiso = self.request.query_params.get('permiso')
        if permiso:
            queryset = queryset.filter(nombre=permiso)
        return queryset

    def create(self, request, *args, **kwargs):
        """Bulk-create the permissions listed under the ``permisos`` key.

        ``create`` is overridden so the serializer can be driven explicitly
        in case the module has to be sent and assigned as well.
        """
        serializer = self.get_serializer(data=request.data.get('permisos', []), many=True)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

    # def update(self,request,*args,**kwargs):
    #     pk=