This commit is contained in:
2025-10-20 21:14:31 -06:00
parent 265f471ea6
commit 14c06cbf43
2 changed files with 135 additions and 93 deletions

View File

@@ -1,12 +1,12 @@
from django.urls import path, include
from .views import ExportModelView, dashboard_summary
# from .views_stats import documentos_por_fecha
# from .views_table import table_summary
from .views_table import table_summary
urlpatterns = [
path('exportmodel/', ExportModelView.as_view(), name='export-model'),
path('dashboard/summary/', dashboard_summary, name='dashboard-summary'),
#path('documentos-por-fecha/', documentos_por_fecha, name='documentos-por-fecha'),
#path('table-summary/', table_summary, name='table-summary'),
path('table-summary/', table_summary, name='table-summary'),
]

View File

@@ -15,9 +15,16 @@ class CustomPagination(PageNumberPagination):
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def table_summary(request):
# ...existing code...
# Si se solicita CSV, generar archivo y devolverlo (después de definir pedimentos_filters)
my_tags = ['Reportes']
"""
Endpoint que devuelve un resumen tabulado de pedimentos y sus documentos asociados.
"""
org_id = request.query_params.get('organizacion_id')
if not org_id:
return Response({"error": "organizacion_id es requerido"}, status=400)
@@ -40,12 +47,12 @@ def table_summary(request):
fecha_pago_gte = fecha_pago_lte - timedelta(days=30)
# Construir filtros base para pedimentos
pedimentos_filters = Q(organizacion_id=org_id)
# Añadir filtros de fecha siempre para limitar el conjunto de datos
pedimentos_filters &= Q(fecha_pago__gte=fecha_pago_gte)
pedimentos_filters &= Q(fecha_pago__lte=fecha_pago_lte)
pedimentos_filters = Q()
pedimentos_filters &= Q(organizacion_id=org_id)
if fecha_pago_gte:
pedimentos_filters &= Q(fecha_pago__gte=fecha_pago_gte)
if fecha_pago_lte:
pedimentos_filters &= Q(fecha_pago__lte=fecha_pago_lte)
if rfc:
pedimentos_filters &= Q(contribuyente__rfc=rfc)
if patente:
@@ -61,95 +68,130 @@ def table_summary(request):
if tipo_operacion:
pedimentos_filters &= Q(tipo_operacion_id=tipo_operacion)
# Query base desde pedimentos con todas las subconsultas necesarias
resultado = Pedimento.objects.filter(pedimentos_filters).values(
'aduana',
'patente',
'regimen',
'pedimento',
'pedimento_app',
'clave_pedimento',
'tipo_operacion_id',
'contribuyente_id'
)
# Si se solicita los últimos 100 registros actualizados
if request.query_params.get('ultimos') == '1':
pedimentos = Pedimento.objects.filter(pedimentos_filters).order_by('-updated_at')[:100]
else:
pedimentos = Pedimento.objects.filter(pedimentos_filters)
# Generar queries según el tipo de documento solicitado
queries = []
if not tipo_documento or tipo_documento == 'ACUSE COVE':
coves_acuse = resultado.annotate(
identificador=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('numero_cove')[:1]
), CharField()),
documento=Value('ACUSE COVE', CharField()),
estado=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('acuse_cove_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(coves_acuse)
# Serializar pedimentos con documentos relacionados
results = []
for ped in pedimentos:
ped_dict = {
'aduana': ped.aduana,
'patente': ped.patente,
'regimen': ped.regimen,
'pedimento': ped.pedimento,
'pedimento_app': ped.pedimento_app,
'clave_pedimento': ped.clave_pedimento,
'tipo_operacion_id': ped.tipo_operacion_id,
'contribuyente_id': ped.contribuyente_id,
'documentos': []
}
# COVEs
for cove in Cove.objects.filter(pedimento=ped):
ped_dict['documentos'].append({
'tipo': 'COVE',
'numero': cove.numero_cove,
'estado': cove.cove_descargado,
'acuse_estado': cove.acuse_cove_descargado
})
# EDOCs
for edoc in EDocument.objects.filter(pedimento=ped):
ped_dict['documentos'].append({
'tipo': 'EDOC',
'numero': edoc.numero_edocument,
'estado': edoc.edocument_descargado,
'acuse_estado': edoc.acuse_descargado,
})
# PARTIDAS
for partida in Partida.objects.filter(pedimento=ped):
ped_dict['documentos'].append({
'tipo': 'PARTIDA',
'numero': partida.numero_partida,
'estado': partida.descargado
})
results.append(ped_dict)
if not tipo_documento or tipo_documento == 'COVE':
coves = resultado.annotate(
identificador=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('numero_cove')[:1]
), CharField()),
documento=Value('COVE', CharField()),
estado=Cast(Subquery(
Cove.objects.filter(pedimento_id=OuterRef('id')).values('cove_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(coves)
if request.query_params.get('csv') == '1':
import csv
from django.http import HttpResponse
headers = [
'aduana', 'patente', 'regimen', 'pedimento', 'pedimento_app', 'clave_pedimento',
'tipo_operacion_id', 'contribuyente_id', 'tipo_documento', 'numero_documento', 'estado', 'acuse_estado'
]
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename=table_summary.csv'
writer = csv.writer(response)
writer.writerow(headers)
# Llenar filas
if request.query_params.get('ultimos') == '1':
pedimentos = Pedimento.objects.filter(pedimentos_filters).order_by('-updated_at')[:100]
else:
pedimentos = Pedimento.objects.filter(pedimentos_filters)
for ped in pedimentos:
# COVEs
for cove in Cove.objects.filter(pedimento=ped):
writer.writerow([
ped.aduana, ped.patente, ped.regimen, ped.pedimento, ped.pedimento_app,
ped.clave_pedimento, ped.tipo_operacion_id, ped.contribuyente_id,
'COVE', cove.numero_cove, cove.cove_descargado, cove.acuse_cove_descargado
])
# EDOCs
for edoc in EDocument.objects.filter(pedimento=ped):
writer.writerow([
ped.aduana, ped.patente, ped.regimen, ped.pedimento, ped.pedimento_app,
ped.clave_pedimento, ped.tipo_operacion_id, ped.contribuyente_id,
'EDOC', edoc.numero_edocument, edoc.edocument_descargado, edoc.acuse_descargado
])
# PARTIDAS
for partida in Partida.objects.filter(pedimento=ped):
writer.writerow([
ped.aduana, ped.patente, ped.regimen, ped.pedimento, ped.pedimento_app,
ped.clave_pedimento, ped.tipo_operacion_id, ped.contribuyente_id,
'PARTIDA', partida.numero_partida, partida.descargado, ''
])
return response
# Si se solicita Excel, generar archivo y devolverlo
if request.query_params.get('excel') == '1':
import openpyxl
from openpyxl.utils import get_column_letter
from django.http import HttpResponse
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Resumen"
# Encabezados
headers = [
'aduana', 'patente', 'regimen', 'pedimento', 'pedimento_app', 'clave_pedimento',
'tipo_operacion_id', 'contribuyente_id', 'tipo_documento', 'numero_documento', 'estado', 'acuse_estado'
]
ws.append(headers)
# Llenar filas
for ped in results:
for doc in ped['documentos']:
ws.append([
ped['aduana'], ped['patente'], ped['regimen'], ped['pedimento'], ped['pedimento_app'],
ped['clave_pedimento'], ped['tipo_operacion_id'], ped['contribuyente_id'],
doc.get('tipo'), doc.get('numero'), doc.get('estado'), doc.get('acuse_estado')
])
# Ajustar ancho de columnas
for i, col in enumerate(headers, 1):
ws.column_dimensions[get_column_letter(i)].width = max(12, len(col) + 2)
# Guardar en memoria y devolver como respuesta
from io import BytesIO
output = BytesIO()
wb.save(output)
output.seek(0)
response = HttpResponse(
output.read(),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
response['Content-Disposition'] = 'attachment; filename=table_summary.xlsx'
return response
if not tipo_documento or tipo_documento == 'ACUSE EDOC':
edocs_acuse = resultado.annotate(
identificador=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('numero_edocument')[:1]
), CharField()),
documento=Value('ACUSE EDOC', CharField()),
estado=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('acuse_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(edocs_acuse)
if not tipo_documento or tipo_documento == 'EDOC':
edocs = resultado.annotate(
identificador=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('numero_edocument')[:1]
), CharField()),
documento=Value('EDOC', CharField()),
estado=Cast(Subquery(
EDocument.objects.filter(pedimento_id=OuterRef('id')).values('edocument_descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(edocs)
if not tipo_documento or tipo_documento == 'PARTIDA':
partidas = resultado.annotate(
identificador=Cast(Subquery(
Partida.objects.filter(pedimento_id=OuterRef('id')).values('numero_partida')[:1]
), CharField()),
documento=Value('PARTIDA', CharField()),
estado=Cast(Subquery(
Partida.objects.filter(pedimento_id=OuterRef('id')).values('descargado')[:1]
), CharField())
).filter(identificador__isnull=False)
queries.append(partidas)
# Unir los resultados usando UNION ALL para mejor rendimiento
if not queries:
return Response([])
resultado_final = queries[0]
for query in queries[1:]:
resultado_final = resultado_final.union(query, all=True)
# Aplicar paginación
# Aplicar paginación manual sobre results
paginator = CustomPagination()
page = paginator.paginate_queryset(
resultado_final.order_by('pedimento', 'documento'),
request
)
page = paginator.paginate_queryset(results, request)
return paginator.get_paginated_response({
"results": page,