Se agregaron cambios y estats para reportes

This commit is contained in:
2025-10-04 22:16:16 -06:00
parent 0aa28b4394
commit 0c6dd348e7
6 changed files with 604 additions and 34 deletions

View File

@@ -1,6 +1,12 @@
from django.urls import path, include
from .views import ExportModelView
from .views import ExportModelView, dashboard_summary
from .views_stats import documentos_por_fecha
from .views_table import table_summary
urlpatterns = [
path('exportmodel/', ExportModelView.as_view(), name='export-model'),
path('dashboard/summary/', dashboard_summary, name='dashboard-summary'),
path('documentos-por-fecha/', documentos_por_fecha, name='documentos-por-fecha'),
#path('table-summary/', table_summary, name='table-summary'),
]

View File

@@ -1,7 +1,14 @@
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from api.customs.models import Pedimento, Cove, EDocument, Partida
from api.record.models import Document
from api.organization.models import Organizacion
from django.db.models import Count, Q
# Registrar endpoint en urls.py:
# path('dashboard/summary/', dashboard_summary)
import csv
import io
from rest_framework.views import APIView
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from .serializers import ExportModelSerializer
@@ -14,43 +21,136 @@ from django.shortcuts import render
from rest_framework import viewsets
from .serializers import ExportModelSerializer
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
from rest_framework.permissions import IsAuthenticated
import csv
import io
import openpyxl
from django.http import HttpResponse
from django.apps import apps
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from rest_framework.permissions import IsAuthenticated
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
from .serializers import ExportModelSerializer
def export_model_to_csv(request, model_name, fields, filters=None):
model = apps.get_model('datastage', model_name)
queryset = model.objects.filter(**(filters or {})).values(*fields)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="{model_name}.csv"'
writer = csv.DictWriter(response, fieldnames=fields)
writer.writeheader()
for row in queryset:
writer.writerow(row)
return response
def export_model_to_csv(request, model_name, fields, module='datastage', filters=None):
model = apps.get_model(module, model_name)
queryset = model.objects.filter(**(filters or {})).values(*fields)
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = f'attachment; filename="{model_name}.csv"'
writer = csv.DictWriter(response, fieldnames=fields)
writer.writeheader()
for row in queryset:
writer.writerow(row)
return response
def export_model_to_excel(request, model_name, fields, module='datastage', filters=None):
model = apps.get_model(module, model_name)
queryset = model.objects.filter(**(filters or {})).values(*fields)
wb = openpyxl.Workbook()
ws = wb.active
ws.append(fields)
for row in queryset:
# Convertir cada valor a string para asegurar compatibilidad con Excel
row_values = []
for field in fields:
value = row[field]
# Si es UUID u otro objeto, convertirlo a string
if hasattr(value, '__str__'):
value = str(value)
row_values.append(value)
ws.append(row_values)
output = io.BytesIO()
wb.save(output)
output.seek(0)
response = HttpResponse(output.read(
), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
response['Content-Disposition'] = f'attachment; filename="{model_name}.xlsx"'
return response
class ExportModelView(APIView):
my_tags = ['Reportes']
permission_classes = [IsAuthenticated & (
IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
@swagger_auto_schema(
manual_parameters=[
openapi.Parameter('model', openapi.IN_QUERY, description="Nombre del modelo (ejemplo: Registro500)",
type=openapi.TYPE_STRING, required=True)
],
responses={200: openapi.Response('Campos disponibles', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'fields': openapi.Schema(type=openapi.TYPE_ARRAY, items=openapi.Items(type=openapi.TYPE_STRING))
}
))}
)
def get(self, request, *args, **kwargs):
"""
Devuelve los campos disponibles para el modelo solicitado.
Ejemplo: /api/reports/exportmodel/?model=Registro500
"""
model_name = request.query_params.get('model')
module = request.query_params.get('module', 'datastage')
if not model_name:
return Response({'error': 'model is required'}, status=status.HTTP_400_BAD_REQUEST)
try:
model = apps.get_model(module, model_name)
except LookupError:
return Response({'error': f'Model {model_name} not found in app {module}'}, status=status.HTTP_404_NOT_FOUND)
fields = [f.name for f in model._meta.fields]
return Response({'fields': fields})
@swagger_auto_schema(
request_body=ExportModelSerializer,
responses={200: 'Archivo generado (Excel o CSV)'}
)
def post(self, request, *args, **kwargs):
model_name = request.data.get('model')
fields = request.data.get('fields')
filters = request.data.get('filters', {})
export_type = request.data.get('type', 'csv')
module = request.data.get('module', 'datastage')
if not model_name or not fields:
return Response({'error': 'model and fields are required'}, status=status.HTTP_400_BAD_REQUEST)
if export_type == 'excel':
return export_model_to_excel(request, model_name, fields, module, filters)
else:
return export_model_to_csv(request, model_name, fields, module, filters)
def export_model_to_excel(request, model_name, fields, filters=None):
model = apps.get_model('datastage', model_name)
queryset = model.objects.filter(**(filters or {})).values(*fields)
wb = openpyxl.Workbook()
ws = wb.active
ws.append(fields)
for row in queryset:
ws.append([row[field] for field in fields])
output = io.BytesIO()
wb.save(output)
output.seek(0)
response = HttpResponse(output.read(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
response['Content-Disposition'] = f'attachment; filename="{model_name}.xlsx"'
return response
# Create your views here.
from rest_framework.views import APIView
class ExportModelView(APIView):
my_tags = ['Reportes']
permission_classes = [IsAuthenticated & (
IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
@swagger_auto_schema(
manual_parameters=[
openapi.Parameter('model', openapi.IN_QUERY, description="Nombre del modelo (ejemplo: Registro500)", type=openapi.TYPE_STRING, required=True)
openapi.Parameter('model', openapi.IN_QUERY, description="Nombre del modelo (ejemplo: Registro500)",
type=openapi.TYPE_STRING, required=True)
],
responses={200: openapi.Response('Campos disponibles', schema=openapi.Schema(
type=openapi.TYPE_OBJECT,
@@ -87,7 +187,134 @@ class ExportModelView(APIView):
if not model_name or not fields:
return Response({'error': 'model and fields are required'}, status=status.HTTP_400_BAD_REQUEST)
module = request.data.get('module', 'datastage')
if export_type == 'excel':
return export_model_to_excel(request, model_name, fields, filters)
return export_model_to_excel(request, model_name, fields, module, filters)
else:
return export_model_to_csv(request, model_name, fields, filters)
return export_model_to_csv(request, model_name, fields, module, filters)
# Resumen general para dashboard
@api_view(['GET'])
@permission_classes([
IsAuthenticated
])
def dashboard_summary(request):
org_id = request.query_params.get('organizacion_id')
filters = {}
user = request.user
pedimento_app = request.query_params.get('pedimento_app')
aduana = request.query_params.get('aduana')
patente = request.query_params.get('patente')
regimen = request.query_params.get('regimen')
agente_aduanal = request.query_params.get('agente_aduanal')
tipo_operacion = request.query_params.get('tipo_operacion')
fecha_pago_gte = request.query_params.get('fecha_pago__gte')
fecha_pago_lte = request.query_params.get('fecha_pago__lte')
contribuyente__rfc = request.query_params.get('contribuyente__rfc')
# Si no se especifica organización y el usuario tiene organización, usarla
if not org_id and hasattr(user, 'organizacion') and user.organizacion:
org_id = user.organizacion.id
# Si no es superusuario, filtrar por organización
if org_id and not getattr(user, 'is_superuser', False):
filters['organizacion_id'] = org_id
# Si el usuario pertenece al grupo Importador, filtrar por RFC
if user.groups.filter(name='Importador').exists():
rfc = getattr(user, 'rfc', None)
if rfc:
filters['contribuyente__rfc'] = rfc
if pedimento_app:
filters['pedimento_app'] = pedimento_app
if aduana:
filters['aduana'] = aduana
if patente:
filters['patente'] = patente
if regimen:
filters['regimen'] = regimen
if agente_aduanal:
filters['agente_aduanal'] = agente_aduanal
if tipo_operacion:
filters['tipo_operacion__tipo'] = tipo_operacion
if fecha_pago_gte:
filters['fecha_pago__gte'] = fecha_pago_gte
if fecha_pago_lte:
filters['fecha_pago__lte'] = fecha_pago_lte
if contribuyente__rfc:
filters['contribuyente__rfc'] = contribuyente__rfc
# Filtrar pedimentos
pedimentos_qs = Pedimento.objects.filter(**filters)
pedimentos_total = pedimentos_qs.count()
pedimentos_completos = pedimentos_qs.filter(existe_expediente=True).count()
pedimentos_pendientes = pedimentos_total - pedimentos_completos
# Usar los IDs de pedimentos filtrados para los demás modelos
pedimento_ids = list(pedimentos_qs.values_list('id', flat=True))
coves_total = Cove.objects.filter(pedimento_id__in=pedimento_ids).count()
coves_procesados = Cove.objects.filter(pedimento_id__in=pedimento_ids, cove_descargado=True).count()
acuse_coves_procesados = Cove.objects.filter(pedimento_id__in=pedimento_ids, acuse_cove_descargado=True).count()
acuse_coves_pendientes = coves_total - acuse_coves_procesados
coves_pendientes = coves_total - coves_procesados
edocs_total = EDocument.objects.filter(pedimento_id__in=pedimento_ids).count()
edocs_descargados = EDocument.objects.filter(pedimento_id__in=pedimento_ids, edocument_descargado=True).count()
acuse_descargados = EDocument.objects.filter(pedimento_id__in=pedimento_ids, acuse_descargado=True).count()
edocs_pendientes = edocs_total - edocs_descargados
acuses_pendientes = edocs_total - acuse_descargados
remesas_total = Document.objects.filter(document_type__id=3, pedimento_id__in=pedimento_ids).count()
documentos_descargados = Document.objects.filter(pedimento_id__in=pedimento_ids).count()
partidas_total = Partida.objects.filter(pedimento_id__in=pedimento_ids).count()
partidas_descargadas = Partida.objects.filter(pedimento_id__in=pedimento_ids, descargado=True).count()
partidas_pendientes = partidas_total - partidas_descargadas
# Indicadores de cumplimiento
cumplimiento_pedimentos = (pedimentos_completos / pedimentos_total * 100) if pedimentos_total else 0
cumplimiento_acuse_coves = (acuse_coves_procesados / coves_total * 100) if coves_total else 0
cumplimiento_coves = (coves_procesados / coves_total * 100) if coves_total else 0
cumplimiento_edocs = (edocs_descargados / edocs_total * 100) if edocs_total else 0
cumplimiento_acuses = (acuse_descargados / edocs_total * 100) if edocs_total else 0
cumplimiento_partidas = (partidas_descargadas / partidas_total * 100) if partidas_total else 0
return Response({
"pedimentos": {
"total": pedimentos_total,
"completos": pedimentos_completos,
"pendientes": pedimentos_pendientes,
"cumplimiento": round(cumplimiento_pedimentos, 2)
},
"coves": {
"total": coves_total,
"coves_procesados": coves_procesados,
"acuse_coves_procesados": acuse_coves_procesados,
"coves_pendientes": coves_pendientes,
"acuse_coves_pendientes": acuse_coves_pendientes,
"coves_cumplimiento": round(cumplimiento_coves, 2),
"acuse_coves_cumplimiento": round(cumplimiento_acuse_coves, 2)
},
"edocuments": {
"total": edocs_total,
"edocs_descargados": edocs_descargados,
"edocs_pendientes": edocs_pendientes,
"acuse_descargados": acuse_descargados,
"acuses_pendientes": acuses_pendientes,
"edocs_cumplimiento": round(cumplimiento_edocs, 2),
"acuses_cumplimiento": round(cumplimiento_acuses, 2)
},
"remesas": {
"total": remesas_total
},
"documentos": {
"descargados": documentos_descargados
},
"partidas": {
"total": partidas_total,
"partidas_descargadas": partidas_descargadas,
"partidas_pendientes": partidas_pendientes,
"cumplimiento": round(cumplimiento_partidas, 2)
}
})

121
api/reports/views_stats.py Normal file
View File

@@ -0,0 +1,121 @@
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from django.db.models import Count, Sum
from django.db.models.functions import TruncDay, TruncWeek, TruncMonth
from django.db.models import Q
from api.customs.models import Pedimento, Cove, EDocument
from api.record.models import Document
from datetime import datetime, timedelta
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def documentos_por_fecha(request):
"""
Endpoint para obtener datos agregados de documentos por fecha
Parámetros:
- start_date: fecha inicial (YYYY-MM-DD)
- end_date: fecha final (YYYY-MM-DD)
- periodo: 'dia', 'semana', 'mes' (default: dia)
- organizacion_id: ID de la organización
- tipo_documento: ID del tipo de documento
"""
# Obtener parámetros
start_date = request.query_params.get('start_date')
end_date = request.query_params.get('end_date')
periodo = request.query_params.get('periodo', 'dia') # dia, semana, mes
org_id = request.query_params.get('organizacion_id')
tipo_doc = request.query_params.get('tipo_documento')
# Si no hay fechas, usar último mes
if not start_date:
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
if not end_date:
end_date = datetime.now().strftime('%Y-%m-%d')
# Construir filtros base para cada modelo
doc_filters = {}
cove_filters = {}
edoc_filters = {}
if org_id:
doc_filters['organizacion_id'] = org_id
cove_filters['organizacion_id'] = org_id
edoc_filters['organizacion_id'] = org_id
if start_date:
doc_filters['created_at__gte'] = start_date
cove_filters['created_at__gte'] = start_date
edoc_filters['created_at__gte'] = start_date
if end_date:
doc_filters['created_at__lte'] = end_date
cove_filters['created_at__lte'] = end_date
edoc_filters['created_at__lte'] = end_date
if tipo_doc:
doc_filters['document_type__id'] = tipo_doc # Solo para Document
# Obtener datos agregados según el periodo
if periodo == 'dia':
trunc_func = TruncDay
elif periodo == 'semana':
trunc_func = TruncWeek
elif periodo == 'mes':
trunc_func = TruncMonth
else:
trunc_func = TruncDay
# Obtener estadísticas de documentos
documentos = Document.objects.filter(**doc_filters).annotate(
fecha=trunc_func('created_at')
).values('fecha').annotate(
total=Count('id'),
size_total=Sum('size')
).order_by('fecha')
# Obtener estadísticas de coves
coves = Cove.objects.filter(**cove_filters).annotate(
fecha=trunc_func('created_at')
).values('fecha').annotate(
total=Count('id'),
descargados=Count('id', filter=Q(cove_descargado=True)),
acuses=Count('id', filter=Q(acuse_cove_descargado=True))
).order_by('fecha')
# Obtener estadísticas de edocs
edocs = EDocument.objects.filter(**edoc_filters).annotate(
fecha=trunc_func('created_at')
).values('fecha').annotate(
total=Count('id'),
descargados=Count('id', filter=Q(edocument_descargado=True)),
acuses=Count('id', filter=Q(acuse_descargado=True))
).order_by('fecha')
# Calcular totales
totales = {
'documentos': {
'total': sum(d['total'] for d in documentos),
'size_total': sum(d['size_total'] for d in documentos)
},
'coves': {
'total': sum(c['total'] for c in coves),
'descargados': sum(c['descargados'] for c in coves),
'acuses': sum(c['acuses'] for c in coves)
},
'edocs': {
'total': sum(e['total'] for e in edocs),
'descargados': sum(e['descargados'] for e in edocs),
'acuses': sum(e['acuses'] for e in edocs)
}
}
return Response({
'documentos': list(documentos),
'coves': list(coves),
'edocs': list(edocs),
'totales': totales,
'periodo': periodo,
'filtros': {
'start_date': start_date,
'end_date': end_date,
'organizacion_id': org_id,
'tipo_documento': tipo_doc
}
})

169
api/reports/views_table.py Normal file
View File

@@ -0,0 +1,169 @@
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.pagination import PageNumberPagination
from django.db.models import Value, CharField, Q, Exists, OuterRef, Subquery
from django.db.models.functions import Cast
from datetime import datetime, timedelta
from api.customs.models import Pedimento, Cove, EDocument, Partida
class CustomPagination(PageNumberPagination):
page_size = 50
page_size_query_param = 'page_size'
max_page_size = 1000
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def table_summary(request):
"""
Endpoint que devuelve un resumen tabulado de pedimentos y sus documentos asociados.
"""
org_id = request.query_params.get('organizacion_id')
if not org_id:
return Response({"error": "organizacion_id es requerido"}, status=400)
# Obtener filtros de query params
tipo_documento = request.query_params.get('tipo_documento')
rfc = request.query_params.get('contribuyente__rfc')
fecha_pago_gte = request.query_params.get('fecha_pago__gte')
fecha_pago_lte = request.query_params.get('fecha_pago__lte')
patente = request.query_params.get('patente')
aduana = request.query_params.get('aduana')
pedimento = request.query_params.get('pedimento')
pedimento_app = request.query_params.get('pedimento_app')
regimen = request.query_params.get('regimen')
tipo_operacion = request.query_params.get('tipo_operacion')
# Si no se proporcionan fechas, establecer un rango por defecto de los últimos 30 días
if not fecha_pago_gte and not fecha_pago_lte:
fecha_pago_lte = datetime.now().date()
fecha_pago_gte = fecha_pago_lte - timedelta(days=30)
# Construir filtros base para pedimentos
pedimentos_filters = Q(organizacion_id=org_id)
# Añadir filtros de fecha siempre para limitar el conjunto de datos
pedimentos_filters &= Q(fecha_pago__gte=fecha_pago_gte)
pedimentos_filters &= Q(fecha_pago__lte=fecha_pago_lte)
if rfc:
pedimentos_filters &= Q(contribuyente__rfc=rfc)
if patente:
pedimentos_filters &= Q(patente=patente)
if aduana:
pedimentos_filters &= Q(aduana=aduana)
if pedimento:
pedimentos_filters &= Q(pedimento=pedimento)
if pedimento_app:
pedimentos_filters &= Q(pedimento_app=pedimento_app)
if regimen:
pedimentos_filters &= Q(regimen=regimen)
if tipo_operacion:
pedimentos_filters &= Q(tipo_operacion_id=tipo_operacion)
# Query base desde pedimentos con todas las subconsultas necesarias
resultado = Pedimento.objects.filter(pedimentos_filters).values(
'aduana',
'patente',
'regimen',
'pedimento',
'pedimento_app',
'clave_pedimento',
'tipo_operacion_id',
'contribuyente_id'
)
# Generar queries según el tipo de documento solicitado
queries = []
if not tipo_documento or tipo_documento == 'ACUSE COVE':
coves_acuse = resultado.annotate(
identificador=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('numero_cove')[:1]
), CharField()),
documento=Value('ACUSE COVE', CharField()),
estado=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('acuse_cove_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(coves_acuse)
if not tipo_documento or tipo_documento == 'COVE':
coves = resultado.annotate(
identificador=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('numero_cove')[:1]
), CharField()),
documento=Value('COVE', CharField()),
estado=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('cove_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(coves)
if not tipo_documento or tipo_documento == 'ACUSE EDOC':
edocs_acuse = resultado.annotate(
identificador=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('numero_edocument')[:1]
), CharField()),
documento=Value('ACUSE EDOC', CharField()),
estado=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('acuse_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(edocs_acuse)
if not tipo_documento or tipo_documento == 'EDOC':
edocs = resultado.annotate(
identificador=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('numero_edocument')[:1]
), CharField()),
documento=Value('EDOC', CharField()),
estado=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('edocument_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(edocs)
if not tipo_documento or tipo_documento == 'PARTIDA':
partidas = resultado.annotate(
identificador=Cast(Subquery(
Partida.objects.filter(pedimento_id=OuterRef('id')).values('numero_partida')[:1]
), CharField()),
documento=Value('PARTIDA', CharField()),
estado=Cast(Subquery(
Partida.objects.filter(pedimento_id=OuterRef('id')).values('descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(partidas)
# Unir los resultados usando UNION ALL para mejor rendimiento
if not queries:
return Response([])
resultado_final = queries[0]
for query in queries[1:]:
resultado_final = resultado_final.union(query, all=True)
# Aplicar paginación
paginator = CustomPagination()
page = paginator.paginate_queryset(
resultado_final.order_by('pedimento', 'documento'),
request
)
return paginator.get_paginated_response({
"results": page,
"filtros_aplicados": {
"organizacion_id": org_id,
"tipo_documento": tipo_documento,
"contribuyente__rfc": rfc,
"fecha_pago__gte": fecha_pago_gte,
"fecha_pago__lte": fecha_pago_lte,
"patente": patente,
"aduana": aduana,
"pedimento": pedimento,
"pedimento_app": pedimento_app,
"regimen": regimen,
"tipo_operacion": tipo_operacion
}
})