Files
AS_timbres/IMMEX/views.py
2023-11-30 10:42:06 -06:00

584 lines
23 KiB
Python

# Imports de Django
from django.shortcuts import render
from django.http import Http404
from django.urls import reverse_lazy, reverse
from django.core.mail import send_mail
from django.views.generic.edit import CreateView
from django.views.generic.list import ListView
from django.contrib import messages
from django.db.models import Case, When, Value, BooleanField
from django.contrib.auth.models import Permission, User
from django.contrib.contenttypes.models import ContentType
from django.contrib.auth import login
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.utils import timezone
# Imports de Django REST framework
from rest_framework import viewsets
from rest_framework.authentication import TokenAuthentication, BasicAuthentication
from rest_framework.views import APIView
from rest_framework.authtoken.models import Token
from rest_framework import authentication
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework import generics
# Imports de allauth
from allauth.account.models import EmailConfirmation, EmailAddress
from allauth.account.forms import SignupForm
# Imports de tus modelos y serializadores
from .permissions import ActiveTokenSessionPerm, TokenCheckSession
from .models import Sistemas_por_cliente_A24, ClientesA24, DeviceA24, ActiveTokenSession, Modulo
from Sistemas.models import Sistema, BitacoraErrores
from Sistemas.permissions import ItsAdminToken, HasAuthorizationHeader, CheckPermiso
from .forms import ClienteForm_IMMEX
from .serializers import (ClientesA24Serailizer, SerialiazerA24, SignupSerializer,
Sistema_Serializer, Sistema_Por_Cliente_Serializer,
DeviceA24_admin_Serialiazer, CustomPermissionSerializer,
ModulosSerializer
)
# Otras bibliotecas y módulos
import urllib.parse
import traceback
import json
from io import StringIO
import csv
class Sistemas_xCliente_IMMEX_ListView(UserPassesTestMixin,LoginRequiredMixin, ListView):
model = Sistemas_por_cliente_A24
paginate_by = 100
template_name = 'IMMEX/xclientes/lista.html'
def test_func(self):
res = self.request.user.groups.filter(name= 'admin_soft')
if not res:
messages.error(self.request, 'Lo sentimos. La página que buscas no está disponible o no cuentas con los permisos.')
return res
class Sistemas_xCliente_IMMEX_CreateView(UserPassesTestMixin,LoginRequiredMixin,CreateView):
model = Sistemas_por_cliente_A24
fields = ['id_sistema', 'cliente', 'num_licencias']
template_name = 'IMMEX/xclientes/sistema_create_IMMEX.html'
success_url = reverse_lazy('sistemasXcli_IMMEX')
def test_func(self):
res = self.request.user.groups.filter(name= 'admin_soft')
if not res:
messages.error(self.request, 'Lo sentimos. La página que buscas no está disponible o no cuentas con los permisos.')
return res
class ClientesIMMEX_CreateView(CreateView):
model = ClientesA24
form_class = ClienteForm_IMMEX
success_url = '/IMMEX/'
template_name = 'IMMEX/xclientes/edit_cliente.html'
def form_valid(self, form):
response = super().form_valid(form)
if self.request.headers.get('X-Requested-With') == 'XMLHttpRequest':
data={
'id':self.object.id,
'RFC':self.object.RFC,
'Nombre':self.object.Nombre,
'Activo':self.object.Activo,
}
return JsonResponse(data)
else:
return response
def form_invalid(self,form):
response = super().form_invalid(form)
if self.request.headers.get('X-Requested-With') == 'XMLHttpRequest':
errors = form.errors.as_text()
return JsonResponse({'errors':f'{errors}'},status=200,content_type='application/json')
else:
return response
"""---------API VIEWS---------"""
class ChecarPermisos(APIView):
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated, HasAuthorizationHeader, TokenCheckSession]
def get(self,request):
if 'Response-Type' not in request.headers:
Response({"ACCESO":"OK"})
else:
ct= request.headers['Response-Type']
response = Response("ACCESS:OK", content_type=ct)
return response
class LoginIMMEX(APIView):
authentication_classes = [TokenAuthentication]
permission_classes = [IsAuthenticated, HasAuthorizationHeader, ActiveTokenSessionPerm]
def get(self,request):
if 'Response-Type' not in request.headers:
return Response({'username':request.user.username})
else:
print(request.headers['Response-Type'])
ct= request.headers['Response-Type']
response = Response("ACCESS:OK", content_type=ct)
return response
def post(self, request):
try:
username = request.data.get('username')
password = request.data.get('password')
user = authentication.authenticate(request, username=username, password=password)
if user is not None:
email_address = user.emailaddress_set.first()
if email_address:
if email_address.verified:
# User is authenticated and email is verified
# Proceed with session creation or any other logic
token, created = Token.objects.get_or_create(user=user)
return Response({'access': True, 'message':f'Bienvenido {user.first_name}','token': token.key})
else:
return Response({'access': False, 'message': 'El correo asociado con este usuario no está verificado.'})
else:
return Response({'access': False, 'message': 'No se encuentra una dirección de correo asociada con este usuario.'})
else:
return Response({'access': False, 'message': 'Credenciales de inicio de sesión inválidas.'})
except authentication.exceptions.AuthenticationFailed as ex:
return Response({'access': False, 'message': 'Error de autenticación: ' + str(ex)})
except Exception as ex:
return Response({'access': False, 'message': 'Error durante el inicio de sesión: ' + str(ex)})
class RegistroUsuarios(APIView):
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[IsAuthenticated,ItsAdminToken]
def post(self,request, *args, **kwargs):
try:
serializer = SignupSerializer(data=request.data,context={'request':request})
if serializer.is_valid():
user = serializer.save()
# Generar la confirmación de correo electrónico
email_address = EmailAddress.objects.get(user=user, email=user.email)
email_confirmation = EmailConfirmation.create(email_address)
self.send_email_confirmation(request, email_confirmation)
return Response({'access':True, 'message': f'Registro exitoso, te enviamos un correo electronico "{user.email}" favor confirme su correo. '})
else:
return Response({'access':False, 'message': 'Error de validación', 'Error': serializer.errors})
except Exception as E:
return Response({'Error':f'Error al registro con datos del usuario {E}', 'isError':True, 'access':False})
def send_email_confirmation(self, request, email_confirmation):
email_address = email_confirmation.email_address
email = email_address.email
email_confirmation.sent = timezone.now()
email_confirmation.save()
email_confirmation_url = request.build_absolute_uri(reverse('account_confirm_email', args=[email_confirmation.key]))
message = f"Por favor, confirma tu correo electrónico en el siguiente enlace: {email_confirmation_url}"
send_mail(
subject='Confirmación de correo electrónico',
message=message,
from_email='aduanasoftpruebas@gmail.com',
recipient_list=[email],
)
class Check_IMMEX_RFC(APIView):
"""Verifica que el cliente pueda Timbrar"""
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[IsAuthenticated]
def post(self,request,*args, **kwargs):
rfc= request.data.get('RFC')
try:
clienteA24 , created = ClientesA24.objects.get_or_create(RFC=rfc)
serializer = ClientesA24Serailizer(clienteA24)
if created:
clienteA24.Activo =True
clienteA24.Nombre = rfc
clienteA24.save()
if not serializer.is_valid:
return Response({'Error':f'{serializer.errors}','isError':True},status=200)
return Response(serializer.data)
except Exception as E:
return Response({'Error':f'check_RFC:{E} RFC:{rfc}','isError':True})
class RegisterIMMEX_Device_APIView(APIView):
"""Register IMMEX Devices
se manda el siguiente JSON
{ "dataBase" : "CuentaGastosIII",
"deviceIP" : "192.168.1.28",
"deviceOS" : "Microsoft Windows NT 6.1.7601 Service Pack 1",
"deviceName" : "ASJUA-WEB05",
"clienteA24" : "C&A010417QS6",
"sistema" : "CFDI",
"MAC" : "F8BC12952BE2"
}
este es un ejemplo, el clienteA24 y sistema deben ser nombres validos para
sus tablas en IMMEX, es decir deben estar dados de alta en AS Admin en IMMEX
"""
#authentication_classes = (BasicAuthentication, TokenAuthentication, )
#permission_classes=[ItsAdminToken]
def post(self,request):
try:
serializer = SerialiazerA24(data=request.data, context={'request':request})
if serializer.is_valid():
instance =serializer.save()
token = instance.token.key
if 'Response-Type' not in request.headers:
return Response({'token':token}, status=status.HTTP_201_CREATED)
else:
response = Response(instance.token.key, content_type='text/plain')
return response
else:
return Response({'Error':f'{serializer.errors}','isError':True}, status=status.HTTP_200_OK)
except Exception as ex:
data_json = json.dumps(request.data)
traceback_info = f'{data_json}\n{traceback.format_exc()}'
BitacoraErrores.objects.create(level=2, message=str(ex), traceback=traceback_info,
view='IMMEX.RegisterIMMEX_Device_APIView')
return Response({'Error':f'{ex}','isError':True}, status=status.HTTP_200_OK)
def get(self, request):
nombre_sistema = request.query_params.get('nombre_sistema')
cliente_rfc = request.query_params.get('cliente_rfc')
db = request.query_params.get('db')
MAC = request.query_params.get('MAC')
sistemas_por_cliente = Sistemas_por_cliente_A24.objects.all()
if nombre_sistema:
sistemas_por_cliente = sistemas_por_cliente.filter(id_sistema__nombre_sistema=nombre_sistema)
if cliente_rfc:
sistemas_por_cliente = sistemas_por_cliente.filter(cliente__RFC=cliente_rfc)
dispositivos = DeviceA24.objects.filter(sistema__nombre_sistema=nombre_sistema, clienteA24__RFC=cliente_rfc)
print('dispositivos:',dispositivos)
serializer = Sistema_Por_Cliente_Serializer(sistemas_por_cliente, many=True,
context={'request': request})
data=serializer.data
for item in data:
del item['id_sistema']
del item['cliente']
if 'Response-Type' not in request.headers:
return Response(data, status=status.HTTP_200_OK)
else:
print(request.headers['Response-Type'])
csv_buffer = StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=serializer.child.fields.keys())
# Escribe los encabezados del CSV
#writer.writeheader()
# Escribe los datos en el CSV
for row in data:
writer.writerow(row)
# Coloca el puntero del archivo al principio del archivo
csv_buffer.seek(0)
# Lee la cadena CSV desde el objeto StringIO y devuelve como respuesta HTTP
csv_data = csv_buffer.getvalue()
print(csv_data)
csv_data = csv_data.replace("\r\n", "")
response = Response(csv_data, content_type='text/csv')
return response
class Sistemas_IMMEX_List_APIView(APIView):
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[IsAuthenticated]
def get(self, request):
sistemas = Sistema.objects.all()
serializer = Sistema_Serializer(sistemas,many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
class Sistema_por_cliente_APIView(APIView):
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[IsAuthenticated]
def get(self, request):
nombre_sistema = request.query_params.get('nombre_sistema')
cliente_rfc = request.query_params.get('cliente_rfc')
db = request.query_params.get('db')
MAC = request.query_params.get('MAC')
sistemas_por_cliente = Sistemas_por_cliente_A24.objects.all()
if nombre_sistema:
sistemas_por_cliente = sistemas_por_cliente.filter(id_sistema__nombre_sistema=nombre_sistema)
if cliente_rfc:
sistemas_por_cliente = sistemas_por_cliente.filter(cliente__RFC=cliente_rfc)
dispositivos = DeviceA24.objects.filter(sistema__nombre_sistema=nombre_sistema, clienteA24__RFC=cliente_rfc)
serializer = Sistema_Por_Cliente_Serializer(sistemas_por_cliente, many=True,
context={'request': request})
data=serializer.data
for item in data:
del item['id_sistema']
del item['cliente']
return Response(serializer.data, status=status.HTTP_200_OK)
def post(self, request):
try:
context = {
'clienteA24':request.data.get('clienteA24'),
'DeviceA24':request.data.get('DeviceA24')
}
serializer = Sistema_Por_Cliente_Serializer(data=request.data, context=context)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
else:
return Response({'Error':f'{serializer.errors}','isError':True}, status=status.HTTP_200_OK)
except Exception as ex:
data_json = json.dumps(request.data)
traceback_info = f'{data_json}\n{traceback.format_exc()}'
BitacoraErrores.objects.create(level=2, message=str(ex), traceback=traceback_info, \
view='IMMEX.Sistema_por_cliente_APIView')
return Response({'Error':f'{ex}','isError':True}, status=status.HTTP_200_OK)
#CRUD Clientes IMMEX
class MyPage(PageNumberPagination):
page_size =1
page_size_query_param = 'page_size'
max_page_size = 1
class ClientesA24List(generics.ListCreateAPIView):
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[ItsAdminToken]
queryset = ClientesA24.objects.all()
serializer_class = ClientesA24Serailizer
pagination_class = MyPage
ordering_fields = ('RFC',"Nombre", 'Activo')
def get_queryset(self):
queryset = ClientesA24.objects.all()
# Filtrar por RFC si se proporciona como parámetro de consulta
rfc = self.request.query_params.get('RFC')
Nombre = self.request.query_params.get('Nombre')
if rfc:
queryset = queryset.filter(RFC__icontains=rfc)
if Nombre:
queryset = queryset.filter(Nombre__icontains=Nombre)
# Aplicar ordenación si se proporciona como parámetro de consulta
ordering = self.request.query_params.get('ordering')
if ordering:
queryset = queryset.order_by(ordering)
return queryset
class ClientesA24Detail(APIView):
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[ ItsAdminToken]
def get_object(self, pk):
try:
return ClientesA24.objects.get(pk=pk)
except ClientesA24.DoesNotExist:
raise Http404
# Obtener un cliente por ID (GET)
def get(self, request, pk):
print(request.headers)
cliente = self.get_object(pk)
serializer = ClientesA24Serailizer(cliente)
return Response(serializer.data)
# Actualizar un cliente por ID (PUT)
def put(self, request, pk):
cliente = self.get_object(pk)
print(cliente)
serializer = ClientesA24Serailizer(cliente, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data,status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# Eliminar un cliente por ID (DELETE)
def delete(self, request, pk):
cliente = self.get_object(pk)
cliente.delete()
return Response({"pk":pk},status=status.HTTP_200_OK)
#-----ADMIN AREA
class DeviceA24List(generics.ListCreateAPIView):
#queryset = DeviceA24.objects.all()
serializer_class = DeviceA24_admin_Serialiazer
pagination_class = MyPage
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[ ItsAdminToken]
def get_queryset(self):
queryset = DeviceA24.objects.all()
# Filtrar por clienteA24 si se proporciona como parámetro de consulta
clienteA24 = self.request.query_params.get('clienteA24')
if clienteA24:
queryset = queryset.filter(clienteA24__RFC__icontains=clienteA24)
# # Aplicar ordenación si se proporciona como parámetro de consulta
# ordering = self.request.query_params.get('ordering')
# if ordering:
# queryset = queryset.order_by(ordering)
return queryset
class DeviceA24Detail(generics.RetrieveUpdateDestroyAPIView):
queryset = DeviceA24.objects.all()
serializer_class = DeviceA24_admin_Serialiazer
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[ ItsAdminToken]
# Método para recuperar un registro
def retrieve(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
# Método para actualizar un registro
def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
serializer.is_valid(raise_exception=True)
serializer.save()
return Response(serializer.data)
# Método para eliminar un registro
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
instance.delete()
return Response(status=204)
# Método para listar registros (opcional, dependiendo de tus necesidades)
def list(self, request, *args, **kwargs):
queryset = self.get_queryset()
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class PermissionListCreateAPIView(viewsets.ModelViewSet):
queryset = Permission.objects.all()
# Asegúrate de tener un serializer adecuado
serializer_class = CustomPermissionSerializer
authentication_classes = (BasicAuthentication, TokenAuthentication, )
permission_classes=[ ItsAdminToken]
def get_queryset(self):
app_label = self.request.query_params.get('app_label')
user_id = self.request.query_params.get('user_id')
queryset = Permission.objects.all()
if app_label:
content_types = ContentType.objects.filter(app_label=app_label)
queryset = queryset.filter(content_type__in=content_types)
if user_id:
param_user = User.objects.get(id=user_id)
# Anotamos los permisos con True si el usuario los tiene, False en caso contrario
queryset = queryset.annotate(
activo=Case(
When(user=param_user, then=Value(True)),
default=Value(False),
output_field=BooleanField()
)
)
return queryset
def list(self, request, *args, **kwargs):
# Obtén la lista de permisos
queryset = self.get_queryset()
serializer = self.get_serializer(queryset, many=True)
# Agrega datos personalizados a la respuesta
data = {
"status": "success",
"message": "Lista de permisos recuperada exitosamente",
"data": serializer.data
}
return Response(data, status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
# Obtén el ID del usuario del JSON de la solicitud
user_id = request.data.get('user_id')
try:
# Recupera el usuario por su ID
user = User.objects.get(id=user_id)
# Obtén la lista de permisos del JSON de la solicitud
permissions_data = request.data.get('permissions', [])
for perm_data in permissions_data:
# Recupera el ID del permiso de cada objeto en la lista
permission_id = perm_data.get('id')
print(permission_id)
try:
# Recupera el permiso por su ID
permission = Permission.objects.get(id=permission_id)
# Asigna el permiso al usuario
user.user_permissions.add(permission)
except Permission.DoesNotExist:
return Response({"error": f"El permiso con ID {permission_id} no existe"}, status=status.HTTP_400_BAD_REQUEST)
return Response({"message": "Permisos asignados correctamente al usuario"}, status=status.HTTP_201_CREATED)
except User.DoesNotExist:
return Response({"error": "El usuario no existe"}, status=status.HTTP_400_BAD_REQUEST)
class ModulosListCreateAPIView(viewsets.ModelViewSet):
queryset = Modulo.objects.all()
pagination_class = MyPage
serializer_class = ModulosSerializer
def get_queryset(self):
queryset = Modulo.objects.all()
return queryset