Files
backend/api/customs/views_auditor.py
2026-05-18 11:51:30 -06:00

1880 lines
71 KiB
Python

import os
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from core.permissions import IsSuperUser, IsSameOrganizationDeveloper
from .tasks.auditoria import (
crear_partidas,
auditar_coves,
auditar_acuse_cove,
auditar_edocuments,
auditar_acuse,
auditar_remesas,
)
from .tasks.internal_services import auditar_pedimentos
from .tasks.microservice_v2 import procesar_pedimentos_completos
from api.customs.models import Pedimento
from api.organization.models import Organizacion
from api.record.models import Document
from .tasks.auditoria_xml import extraer_info_pedimento_xml
import tempfile
import os
from api.utils.storage_service import storage_service
import logging
import uuid
logger = logging.getLogger('api.customs.views_auditor')
def get_document_content(documento):
"""
Obtiene el contenido de un documento (MinIO o local).
Retorna el contenido como string o bytes.
"""
ruta = str(documento.archivo)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp_path = tmp.name
try:
success = storage_service.download_file(ruta, tmp_path)
if not success:
return None
with open(tmp_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return content
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
def get_document_path(documento):
"""
Obtiene la ruta temporal de un documento para lectura.
Retorna la ruta del archivo temporal descargado.
"""
ruta = str(documento.archivo)
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp_path = tmp.name
tmp.close()
success = storage_service.download_file(ruta, tmp_path)
if not success:
return None
return tmp_path
@swagger_auto_schema(
method='post',
operation_description="Crea partidas faltantes para todos los pedimentos de una organización e informa cuáles están descargadas",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def crear_partidas_organizacion(request):
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response({'error': 'Debe proporcionar organizacion_id'}, status=status.HTTP_400_BAD_REQUEST)
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response({'error': 'No tiene permisos para esta organización'}, status=status.HTTP_403_FORBIDDEN)
task = crear_partidas.delay(organizacion_id)
return Response({
'organizacion_id': organizacion_id,
'auditoria': 'partidas',
'task_id': task.id,
'mensaje': f'Creación de partidas iniciada. Consulta el resultado en GET /api/tasks/status/{task.id}/',
}, status=status.HTTP_202_ACCEPTED)
@swagger_auto_schema(
method='post',
operation_description="Crea partidas faltantes para un pedimento e informa cuáles están descargadas",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Resultado de creación y estado de descarga de partidas'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def crear_partidas_pedimento(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.prefetch_related('partidas').select_related('organizacion').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
if not pedimento.numero_partidas or pedimento.numero_partidas <= 0:
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'estado': 'sin_datos',
'mensaje': f'El pedimento no tiene número de partidas definido (numero_partidas={pedimento.numero_partidas})',
}, status=status.HTTP_200_OK)
# Crear partidas faltantes (get_or_create por número)
from api.customs.models import Partida
partidas_creadas = 0
for i in range(1, pedimento.numero_partidas + 1):
_, created = Partida.objects.get_or_create(
pedimento=pedimento,
numero_partida=i,
defaults={'organizacion_id': pedimento.organizacion_id}
)
if created:
partidas_creadas += 1
# Evaluar estado de descarga sobre el conjunto completo
partidas = list(pedimento.partidas.order_by('numero_partida'))
total = len(partidas)
descargadas = [p.numero_partida for p in partidas if p.descargado]
no_descargadas = [p.numero_partida for p in partidas if not p.descargado]
if not no_descargadas:
estado = 'completado'
mensaje = f'Todas las partidas están descargadas ({total}/{total})'
else:
estado = 'en_proceso'
mensaje = f'{len(no_descargadas)} de {total} partidas pendientes de descarga'
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'estado': estado,
'mensaje': mensaje,
'resumen': {
'total_partidas': total,
'partidas_creadas_ahora': partidas_creadas,
'descargadas': len(descargadas),
'no_descargadas': len(no_descargadas),
},
'no_descargadas': no_descargadas,
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_pedimentos_endpoint(request):
"""
Inicia una tarea de auditoría para todos los pedimentos de una organización.
Verifica todos los documentos y datos asociados a los pedimentos.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_pedimentos.delay(organizacion_id)
message = f"Auditoría iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de procesamiento de remesa de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento a auditar')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Estado de procesamiento de remesa del pedimento'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_procesamiento_remesa_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.select_related('organizacion').prefetch_related('coves').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
if not pedimento.remesas:
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'tiene_remesas': False,
'estado': 'completado',
'mensaje': 'El pedimento no tiene remesas para procesar',
}, status=status.HTTP_200_OK)
tiene_documento_remesa = pedimento.documents.filter(document_type=3).exists()
coves = list(pedimento.coves.all())
total_coves = len(coves)
if not tiene_documento_remesa:
estado = 'en_proceso'
mensaje = 'Documento XML de remesa aún no descargado'
elif total_coves == 0:
estado = 'en_proceso'
mensaje = 'Documento de remesa disponible pero no se han creado COVEs'
else:
estado = 'completado'
mensaje = f'Remesa procesada — {total_coves} COVE(s) registrados'
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'tiene_remesas': True,
'estado': estado,
'mensaje': mensaje,
'resumen': {
'tiene_documento_remesa': tiene_documento_remesa,
'total_coves_registrados': total_coves,
},
'coves': [c.numero_cove for c in coves],
}, status=status.HTTP_200_OK)
def _lanzar_auditoria_organizacion(request, task_fn, label):
"""Helper compartido para los 4 endpoints de auditoría masiva por organización."""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response({'error': 'Debe proporcionar organizacion_id'}, status=status.HTTP_400_BAD_REQUEST)
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response({'error': 'No tiene permisos para esta organización'}, status=status.HTTP_403_FORBIDDEN)
task = task_fn.delay(organizacion_id)
return Response({
'organizacion_id': organizacion_id,
'auditoria': label,
'task_id': task.id,
'mensaje': f'Auditoría de {label} iniciada. Consulta el resultado en GET /api/tasks/status/{task.id}/',
}, status=status.HTTP_202_ACCEPTED)
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de COVEs de todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING)},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_coves_endpoint(request):
return _lanzar_auditoria_organizacion(request, auditar_coves, 'COVEs')
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de acuses de COVE de todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING)},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_acuse_cove_endpoint(request):
return _lanzar_auditoria_organizacion(request, auditar_acuse_cove, 'acuses de COVE')
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de EDocuments de todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING)},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_edocuments_endpoint(request):
return _lanzar_auditoria_organizacion(request, auditar_edocuments, 'EDocuments')
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de acuses de EDocument de todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING)},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_acuse_endpoint(request):
return _lanzar_auditoria_organizacion(request, auditar_acuse, 'acuses de EDocument')
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de remesas de todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING)},
required=['organizacion_id']
),
responses={
202: openapi.Response('Tarea iniciada — usar task_id para consultar resultado'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_remesas_endpoint(request):
return _lanzar_auditoria_organizacion(request, auditar_remesas, 'remesas')
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de COVEs de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Estado de descarga de COVEs del pedimento'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_cove_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.select_related('organizacion').prefetch_related('coves').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
coves = list(pedimento.coves.all())
total = len(coves)
descargados = sum(1 for c in coves if c.cove_descargado)
pendientes = [c.numero_cove for c in coves if not c.cove_descargado]
if total == 0:
nuevo_estado = 3
mensaje = 'El pedimento no tiene COVEs registrados'
elif descargados == total:
nuevo_estado = 3
mensaje = 'Todos los COVEs están descargados'
else:
nuevo_estado = 4
mensaje = f'{total - descargados} de {total} COVEs pendientes de descarga'
from api.customs.tasks.auditoria import modificar_estado_procesamiento
modificar_estado_procesamiento(pedimento, servicio_id=8, nuevo_estado=nuevo_estado)
return Response({
'pedimento_id': str(pedimento_id),
'estado': 'completado' if nuevo_estado == 3 else 'en_proceso',
'mensaje': mensaje,
'resumen': {
'total_coves': total,
'coves_descargados': descargados,
},
'pendientes': pendientes,
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de acuses de COVE de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Estado de descarga de acuses de COVE del pedimento'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_acuse_cove_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.select_related('organizacion').prefetch_related('coves').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
coves = list(pedimento.coves.all())
total = len(coves)
descargados = sum(1 for c in coves if c.acuse_cove_descargado)
pendientes = [c.numero_cove for c in coves if not c.acuse_cove_descargado]
if total == 0:
nuevo_estado = 3
mensaje = 'El pedimento no tiene COVEs registrados, no hay acuses que auditar'
elif descargados == total:
nuevo_estado = 3
mensaje = 'Todos los acuses de COVE están descargados'
else:
nuevo_estado = 4
mensaje = f'{total - descargados} de {total} acuses de COVE pendientes de descarga'
from api.customs.tasks.auditoria import modificar_estado_procesamiento
modificar_estado_procesamiento(pedimento, servicio_id=9, nuevo_estado=nuevo_estado)
return Response({
'pedimento_id': str(pedimento_id),
'estado': 'completado' if nuevo_estado == 3 else 'en_proceso',
'mensaje': mensaje,
'resumen': {
'total_coves': total,
'acuses_descargados': descargados,
},
'pendientes': pendientes,
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de EDocuments de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Estado de descarga de EDocuments del pedimento'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_edocument_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.select_related('organizacion').prefetch_related('documentos').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
edocuments = list(pedimento.documentos.all())
total = len(edocuments)
descargados = sum(1 for d in edocuments if d.edocument_descargado)
pendientes = [d.numero_edocument for d in edocuments if not d.edocument_descargado]
if total == 0:
nuevo_estado = 3
mensaje = 'El pedimento no tiene EDocuments registrados'
elif descargados == total:
nuevo_estado = 3
mensaje = 'Todos los EDocuments están descargados'
else:
nuevo_estado = 4
mensaje = f'{total - descargados} de {total} EDocuments pendientes de descarga'
from api.customs.tasks.auditoria import modificar_estado_procesamiento
modificar_estado_procesamiento(pedimento, servicio_id=7, nuevo_estado=nuevo_estado)
return Response({
'pedimento_id': str(pedimento_id),
'estado': 'completado' if nuevo_estado == 3 else 'en_proceso',
'mensaje': mensaje,
'resumen': {
'total_edocuments': total,
'edocuments_descargados': descargados,
},
'pendientes': pendientes,
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el estado de descarga de acuses de EDocument de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Estado de descarga de acuses de EDocument del pedimento'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_acuse_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.select_related('organizacion').prefetch_related('documentos').get(id=pedimento_id)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
edocuments = list(pedimento.documentos.all())
total = len(edocuments)
descargados = sum(1 for d in edocuments if d.acuse_descargado)
pendientes = [d.numero_edocument for d in edocuments if not d.acuse_descargado]
if total == 0:
nuevo_estado = 3
mensaje = 'El pedimento no tiene EDocuments registrados, no hay acuses que auditar'
elif descargados == total:
nuevo_estado = 3
mensaje = 'Todos los acuses de EDocument están descargados'
else:
nuevo_estado = 4
mensaje = f'{total - descargados} de {total} acuses de EDocument pendientes de descarga'
from api.customs.tasks.auditoria import modificar_estado_procesamiento
modificar_estado_procesamiento(pedimento, servicio_id=6, nuevo_estado=nuevo_estado)
return Response({
'pedimento_id': str(pedimento_id),
'estado': 'completado' if nuevo_estado == 3 else 'en_proceso',
'mensaje': mensaje,
'resumen': {
'total_edocuments': total,
'acuses_descargados': descargados,
},
'pendientes': pendientes,
}, status=status.HTTP_200_OK)
### Procesamiento de pedimentos ###
@swagger_auto_schema(
method='post',
operation_description="Procesamiento de todos los pedimentos de todas las organizaciones",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={}
),
responses={
200: openapi.Response('Tarea de procesamiento iniciada correctamente'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('No se encontraron organizaciones')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_procesar_pedimentos_organizacion(request):
"""
Inicia una tarea de procesamiento para todos los pedimentos de todas las organizaciones.
Solo usuarios administradores pueden ejecutar esta función.
"""
# Validar permisos (solo superusuarios pueden procesar todas las organizaciones)
user = request.user
if not user.is_superuser:
return Response(
{'error': 'Solo los superusuarios pueden procesar todas las organizaciones'},
status=status.HTTP_403_FORBIDDEN
)
organizaciones = Organizacion.objects.all()
if not organizaciones.exists():
return Response(
{'error': 'No se encontraron organizaciones'},
status=status.HTTP_404_NOT_FOUND
)
# Lista para recopilar todos los task_ids y detalles
tasks_iniciadas = []
for organizacion in organizaciones:
organizacion_id = str(organizacion.id)
print(f"Procesando organización: {organizacion_id} - {organizacion.nombre}")
# Ejecutar la tarea de procesamiento
task = procesar_pedimentos_completos.delay(organizacion_id)
# Agregar información de la tarea a la lista
tasks_iniciadas.append({
'organizacion_id': organizacion_id,
'organizacion_nombre': organizacion.nombre,
'task_id': task.id
})
# Crear mensaje general y lista de task_ids
total_organizaciones = len(tasks_iniciadas)
task_ids = [task['task_id'] for task in tasks_iniciadas]
message = f"Procesamiento de pedimentos iniciado para {total_organizaciones} organización(es)"
return Response({
'message': message,
'total_organizaciones': total_organizaciones,
'task_ids': task_ids,
'tasks_detalle': tasks_iniciadas
}, status=status.HTTP_200_OK)
### Fin Procesamiento de pedimentos ###
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_peticion_respuesta_pedimento_completo(request):
"""
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
vista_auditar = request.data.get('vista', 'desconocido') # 'completa' o 'resumen'
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
tipo_documento_peticion = None
tipo_documento_respuesta = None
vista = 'desconocido'
if vista_auditar == 'pc':
tipo_documento_peticion = 13
tipo_documento_respuesta = 14
vista = 'Pedimento Completo'
elif vista_auditar == 'rm':
tipo_documento_peticion = 15
tipo_documento_respuesta = 16
vista = 'Remesa'
elif vista_auditar == 'pt':
tipo_documento_peticion = 17
tipo_documento_respuesta = 18
vista = 'Partidas'
elif vista_auditar == 'cove':
tipo_documento_peticion = 19
tipo_documento_respuesta = 20
vista = 'COVEs'
elif vista_auditar == 'edoc':
tipo_documento_peticion = 21
tipo_documento_respuesta = 22
vista = 'Edocuments'
elif vista_auditar == 'ac_cove':
tipo_documento_peticion = 23
tipo_documento_respuesta = 24
vista = 'Acuses COVEs'
elif vista_auditar == 'ac':
tipo_documento_peticion = 25
tipo_documento_respuesta = 26
vista = 'Acuses'
if not tipo_documento_peticion and not tipo_documento_respuesta:
return Response(
{'error': 'Tipo de vista no reconocido para auditoría de pedimento'},
status=status.HTTP_400_BAD_REQUEST
)
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= tipo_documento_peticion, # Tipo de documento para petición de partidas
organizacion=pedimento.organizacion,
)
documentos_respuesta = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= tipo_documento_respuesta, # Tipo de documento para respuesta de partidas
organizacion=pedimento.organizacion,
)
if not documentos_peticion and not documentos_respuesta:
return Response(
{'error': f'Registro de documentos de petición y respuesta de {vista} no encontrado(s)'},
status=status.HTTP_404_NOT_FOUND
)
# Crear lista con todos los documentos encontrados
documentos_lista_peticiones = []
for documento in documentos_peticion:
nombre_archivo = os.path.basename(documento.archivo.name)
ruta_temporal = get_document_path(documento)
documentos_lista_peticiones.append({
'id': str(documento.id),
'archivo': ruta_temporal,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# Crear lista vacía para respuestas (por si se requiere en el futuro)
documentos_lista_respuestas = []
for documento in documentos_respuesta:
nombre_archivo = os.path.basename(documento.archivo.name)
documentos_lista_respuestas.append({
'id': str(documento.id),
'archivo': documento.archivo.path,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# return Response({
# 'id': pedimento.id,
# 'pedimento_app': pedimento_app,
# 'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
# 'organizacion': getattr(pedimento.organizacion, 'nombre', None),
# 'creado': pedimento.created_at
# }, status=status.HTTP_200_OK)
return Response({
'id': str(pedimento.id),
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento_app,
'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
'organizacion': getattr(pedimento.organizacion, 'nombre', None),
'creado': pedimento.created_at,
'total_documentos_peticiones': len(documentos_lista_peticiones),
'total_documentos_respuestas': len(documentos_lista_respuestas),
'documentos_peticiones': documentos_lista_peticiones,
'documentos_respuestas': documentos_lista_respuestas
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_pedimento_vu(request):
"""
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PC_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_pedimento_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PC_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_remesa_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_RM_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de remesa no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_remesa_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_RM_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de remesa no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_partidas_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= 17, # Tipo de documento para petición de partidas
organizacion=pedimento.organizacion,
)
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de partidas no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
# Crear lista con todos los documentos encontrados
documentos_lista = []
for documento in documentos_peticion:
nombre_archivo = os.path.basename(documento.archivo.name)
documentos_lista.append({
'id': str(documento.id),
'archivo': documento.archivo.path,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
# return Response({
# 'id': documentos_peticion.id,
# 'archivo': documentos_peticion.archivo.path,
# 'archivo_original': nombre_archivo,
# 'extension': documentos_peticion.extension,
# 'size': documentos_peticion.size,
# 'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
# }, status=status.HTTP_200_OK)
return Response({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'total_documentos': len(documentos_lista),
'documentos': documentos_lista
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_partidas_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PT_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de partidas no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_acuse_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de acuse no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_acuse_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de acuse no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_cove_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_COVE_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_cove_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_COVE_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_acuse_cove_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de acuse cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_acuse_cove_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de acuse cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_edocument_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_ED_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de e-document no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_edocument_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_ED_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de e-document no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita un pedimento específico verificando su XML y extrayendo información",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Auditoría completada'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_pedimento_endpoint(request):
"""
Audita un pedimento específico verificando si existe su XML y extrayendo información.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
try:
# Validar permisos y existencia del pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
# Buscar documentos XML del pedimento
documentos_xml = Document.objects.filter(
pedimento=pedimento,
archivo__endswith='.xml',
organizacion=pedimento.organizacion
)
if not documentos_xml.exists():
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'archivos_xml_encontrados': 0,
'mensaje': 'No se encontraron archivos XML para este pedimento',
'auditoria_completa': False
}, status=status.HTTP_200_OK)
# Lista para almacenar información de cada XML
xmls_analizados = []
informacion_extraida = []
for documento in documentos_xml:
print(f"documento >>>> {documento}")
logger.info(f"documento >>>> {documento}")
try:
xml_info = {
'documento_id': str(documento.id),
'nombre_archivo': os.path.basename(str(documento.archivo)),
'tamanio': documento.size,
'extension': documento.extension,
'tipo_documento': documento.document_type.descripcion if documento.document_type else 'Desconocido'
}
xml_content = get_document_content(documento)
if xml_content is None:
xml_info['error_lectura'] = 'No se pudo descargar el archivo'
else:
info_pedimento = extraer_info_pedimento_xml(xml_content)
if info_pedimento:
xml_info['informacion_extraida'] = info_pedimento
informacion_extraida.append(info_pedimento)
# Actualizar el pedimento con la información encontrada si es necesario
actualizar_info_pedimento(pedimento, info_pedimento)
xmls_analizados.append(xml_info)
except Exception as e:
xmls_analizados.append({
'documento_id': str(documento.id),
'nombre_archivo': os.path.basename(str(documento.archivo)),
'error': f'Error procesando archivo: {str(e)}'
})
response_data = {
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'archivos_xml_encontrados': len(xmls_analizados),
'xmls_analizados': xmls_analizados,
'informacion_extraida': informacion_extraida,
'auditoria_completa': True,
'mensaje': f'Auditoría completada para el pedimento {pedimento.pedimento}'
}
return Response(response_data, status=status.HTTP_200_OK)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
except Exception as e:
return Response(
{'error': f'Error en la auditoría: {str(e)}'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def actualizar_info_pedimento(pedimento, info_xml):
"""
Actualiza la información del pedimento con los datos extraídos del XML.
"""
try:
actualizado = False
# Actualizar información del pedimento si está en el XML y no está ya llena
if 'numero_operacion' in info_xml and not pedimento.numero_operacion:
pedimento.numero_operacion = info_xml['numero_operacion']
actualizado = True
# Número de partidas
if 'numero_partidas' in info_xml and not pedimento.numero_partidas:
pedimento.numero_partidas = info_xml['numero_partidas']
actualizado = True
# Clave del pedimento
if 'clave_pedimento' in info_xml and not pedimento.clave_pedimento:
pedimento.clave_pedimento = info_xml['clave_pedimento']
actualizado = True
# Aduana (patente)
if 'aduana_clave' in info_xml and not pedimento.aduana:
pedimento.aduana = info_xml['aduana_clave']
actualizado = True
# RFC Agente Aduanal
if 'rfc_agente_aduanal' in info_xml and not pedimento.agente_aduanal:
pedimento.agente_aduanal = info_xml['rfc_agente_aduanal']
actualizado = True
# CURP Apoderado
if 'curp_apoderado' in info_xml and not pedimento.curp_apoderado:
pedimento.curp_apoderado = info_xml['curp_apoderado']
actualizado = True
# Fecha de pago
if 'fecha_pago' in info_xml and not pedimento.fecha_pago:
try:
# Convertir formato de fecha (ej: "2024-02-15-06:00")
fecha_str = info_xml['fecha_pago']
# Extraer solo la parte de la fecha (antes del primer '-')
fecha_parts = fecha_str.split('-')
if len(fecha_parts) >= 3:
fecha_simple = f"{fecha_parts[0]}-{fecha_parts[1]}-{fecha_parts[2]}"
from datetime import datetime
fecha_obj = datetime.strptime(fecha_simple, '%Y-%m-%d').date()
pedimento.fecha_pago = fecha_obj
actualizado = True
except (ValueError, TypeError, IndexError):
pass
# Importe total (valor en dólares)
if 'valor_dolares' in info_xml and not pedimento.importe_total:
try:
pedimento.importe_total = float(info_xml['valor_dolares'])
actualizado = True
except (ValueError, TypeError):
pass
if 'contribuyente_rfc' in info_xml and not pedimento.contribuyente:
try:
# Buscar o crear el importador
from api.customs.models import Importador
importador, created = Importador.objects.get_or_create(
rfc=info_xml['contribuyente_rfc'],
organizacion=pedimento.organizacion,
defaults={'nombre': info_xml.get('contribuyente_nombre', '')}
)
pedimento.contribuyente = importador
actualizado = True
except Exception:
pass
if 'tipo_operacion' in info_xml and not pedimento.tipo_operacion:
try:
from api.customs.models import TipoOperacion
tipo_op_obj, created = TipoOperacion.objects.get_or_create(
tipo=info_xml['tipo_operacion'],
defaults={'descripcion': info_xml['tipo_operacion_descripcion'][:100]} # Limitar a 100 caracteres
)
pedimento.tipo_operacion = tipo_op_obj
actualizado = True
except Exception:
pass
if actualizado:
pedimento.save()
return True
return False
except Exception:
return False