Files
backend/api/customs/views_auditor.py

1781 lines
66 KiB
Python

import os
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from drf_yasg.utils import swagger_auto_schema
from drf_yasg import openapi
from core.permissions import IsSuperUser, IsSameOrganizationDeveloper
from .tasks.auditoria import (
crear_partidas,
crear_partidas_por_pedimento,
auditar_procesamiento_remesa_por_pedimento,
auditar_coves,
auditar_acuse_cove,
auditar_edocuments,
auditar_acuse,
auditar_cove_por_pedimento,
auditar_acuse_cove_por_pedimento,
auditar_edocument_por_pedimento,
auditar_acuse_por_pedimento
)
from .tasks.internal_services import auditar_pedimentos
from .tasks.microservice_v2 import procesar_pedimentos_completos
from api.customs.models import Pedimento
from api.organization.models import Organizacion
from api.record.models import Document
from .tasks.auditoria import auditar_pedimento_por_id
from .tasks.auditoria_xml import extraer_info_pedimento_xml
@swagger_auto_schema(
method='post',
operation_description="Crea partidas para todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def crear_partidas_organizacion(request):
"""
Crea partidas para todos los pedimentos de una organización específica.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea
task = crear_partidas.delay(organizacion_id)
message = f"Creación de partidas iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Crea partidas para un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated ])
def crear_partidas_pedimento(request):
"""
Crea partidas para un pedimento específico.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
# Ejecutar la tarea
task = crear_partidas_por_pedimento.delay(pedimento_id)
message = f"Creación de partidas iniciada para el pedimento {pedimento_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita todos los pedimentos de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_pedimentos_endpoint(request):
"""
Inicia una tarea de auditoría para todos los pedimentos de una organización.
Verifica todos los documentos y datos asociados a los pedimentos.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_pedimentos.delay(organizacion_id)
message = f"Auditoría iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el procesamiento de remesa para un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento a auditar')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_procesamiento_remesa_pedimento_endpoint(request):
"""
Inicia una tarea de auditoría de remesa para un pedimento específico.
Verifica el estado del procesamiento de remesa y la creación de COVEs.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
# Ejecutar la tarea de auditoría
task = auditar_procesamiento_remesa_por_pedimento.delay(pedimento_id)
message = f"Auditoría de remesa iniciada para el pedimento {pedimento_id}"
return Response({
'message': message,
'task_id': task.id,
'pedimento': {
'id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'tiene_remesas': pedimento.remesas
}
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita los COVEs de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_coves_endpoint(request):
"""
Inicia una tarea de auditoría para los COVEs de una organización.
Verifica la existencia y validez de los COVEs generados.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_coves.delay(organizacion_id)
message = f"Auditoría de COVEs iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita los acuses de COVE de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_acuse_cove_endpoint(request):
"""
Inicia una tarea de auditoría para los acuses de COVE de una organización.
Verifica la recepción y validez de los acuses de COVE.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_acuse_cove.delay(organizacion_id)
message = f"Auditoría de acuses de COVE iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita los EDocuments de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_edocuments_endpoint(request):
"""
Inicia una tarea de auditoría para los EDocuments de una organización.
Verifica la existencia y validez de los EDocuments generados.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_edocuments.delay(organizacion_id)
message = f"Auditoría de EDocuments iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita los acuses de una organización",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
},
required=['organizacion_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_acuse_endpoint(request):
"""
Inicia una tarea de auditoría para los acuses de una organización.
Verifica la recepción y validez de los acuses.
"""
organizacion_id = request.data.get('organizacion_id')
if not organizacion_id:
return Response(
{'error': 'Debe proporcionar organizacion_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos
user = request.user
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
return Response(
{'error': 'No tiene permisos para esta organización'},
status=status.HTTP_403_FORBIDDEN
)
# Ejecutar la tarea de auditoría
task = auditar_acuse.delay(organizacion_id)
message = f"Auditoría de acuses iniciada para la organización {organizacion_id}"
return Response({
'message': message,
'task_id': task.id
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el COVE de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_cove_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
task = auditar_cove_por_pedimento.delay(pedimento_id)
return Response({'message': f'Auditoría de COVE iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el acuse de COVE de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_acuse_cove_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
task = auditar_acuse_cove_por_pedimento.delay(pedimento_id)
return Response({'message': f'Auditoría de acuse de COVE iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el EDocument de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_edocument_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
task = auditar_edocument_por_pedimento.delay(pedimento_id)
return Response({'message': f'Auditoría de EDocument iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita el acuse de un pedimento específico",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def auditar_acuse_pedimento_endpoint(request):
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
except Pedimento.DoesNotExist:
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
task = auditar_acuse_por_pedimento.delay(pedimento_id)
return Response({'message': f'Auditoría de acuse iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
### Procesamiento de pedimentos ###
@swagger_auto_schema(
method='post',
operation_description="Procesamiento de todos los pedimentos de todas las organizaciones",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={}
),
responses={
200: openapi.Response('Tarea de procesamiento iniciada correctamente'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('No se encontraron organizaciones')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_procesar_pedimentos_organizacion(request):
"""
Inicia una tarea de procesamiento para todos los pedimentos de todas las organizaciones.
Solo usuarios administradores pueden ejecutar esta función.
"""
# Validar permisos (solo superusuarios pueden procesar todas las organizaciones)
user = request.user
if not user.is_superuser:
return Response(
{'error': 'Solo los superusuarios pueden procesar todas las organizaciones'},
status=status.HTTP_403_FORBIDDEN
)
organizaciones = Organizacion.objects.all()
if not organizaciones.exists():
return Response(
{'error': 'No se encontraron organizaciones'},
status=status.HTTP_404_NOT_FOUND
)
# Lista para recopilar todos los task_ids y detalles
tasks_iniciadas = []
for organizacion in organizaciones:
organizacion_id = str(organizacion.id)
print(f"Procesando organización: {organizacion_id} - {organizacion.nombre}")
# Ejecutar la tarea de procesamiento
task = procesar_pedimentos_completos.delay(organizacion_id)
# Agregar información de la tarea a la lista
tasks_iniciadas.append({
'organizacion_id': organizacion_id,
'organizacion_nombre': organizacion.nombre,
'task_id': task.id
})
# Crear mensaje general y lista de task_ids
total_organizaciones = len(tasks_iniciadas)
task_ids = [task['task_id'] for task in tasks_iniciadas]
message = f"Procesamiento de pedimentos iniciado para {total_organizaciones} organización(es)"
return Response({
'message': message,
'total_organizaciones': total_organizaciones,
'task_ids': task_ids,
'tasks_detalle': tasks_iniciadas
}, status=status.HTTP_200_OK)
### Fin Procesamiento de pedimentos ###
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_peticion_respuesta_pedimento_completo(request):
"""
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
vista_auditar = request.data.get('vista', 'desconocido') # 'completa' o 'resumen'
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
tipo_documento_peticion = None
tipo_documento_respuesta = None
vista = 'desconocido'
if vista_auditar == 'pc':
tipo_documento_peticion = 13
tipo_documento_respuesta = 14
vista = 'Pedimento Completo'
elif vista_auditar == 'rm':
tipo_documento_peticion = 15
tipo_documento_respuesta = 16
vista = 'Remesa'
elif vista_auditar == 'pt':
tipo_documento_peticion = 17
tipo_documento_respuesta = 18
vista = 'Partidas'
elif vista_auditar == 'cove':
tipo_documento_peticion = 19
tipo_documento_respuesta = 20
vista = 'COVEs'
elif vista_auditar == 'edoc':
tipo_documento_peticion = 21
tipo_documento_respuesta = 22
vista = 'Edocuments'
elif vista_auditar == 'ac_cove':
tipo_documento_peticion = 23
tipo_documento_respuesta = 24
vista = 'Acuses COVEs'
elif vista_auditar == 'ac':
tipo_documento_peticion = 25
tipo_documento_respuesta = 26
vista = 'Acuses'
if not tipo_documento_peticion and not tipo_documento_respuesta:
return Response(
{'error': 'Tipo de vista no reconocido para auditoría de pedimento'},
status=status.HTTP_400_BAD_REQUEST
)
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= tipo_documento_peticion, # Tipo de documento para petición de partidas
organizacion=pedimento.organizacion,
)
documentos_respuesta = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= tipo_documento_respuesta, # Tipo de documento para respuesta de partidas
organizacion=pedimento.organizacion,
)
if not documentos_peticion and not documentos_respuesta:
return Response(
{'error': f'Registro de documentos de petición y respuesta de {vista} no encontrado(s)'},
status=status.HTTP_404_NOT_FOUND
)
# Crear lista con todos los documentos encontrados
documentos_lista_peticiones = []
for documento in documentos_peticion:
nombre_archivo = os.path.basename(documento.archivo.name)
documentos_lista_peticiones.append({
'id': str(documento.id),
'archivo': documento.archivo.path,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# Crear lista vacía para respuestas (por si se requiere en el futuro)
documentos_lista_respuestas = []
for documento in documentos_respuesta:
nombre_archivo = os.path.basename(documento.archivo.name)
documentos_lista_respuestas.append({
'id': str(documento.id),
'archivo': documento.archivo.path,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# return Response({
# 'id': pedimento.id,
# 'pedimento_app': pedimento_app,
# 'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
# 'organizacion': getattr(pedimento.organizacion, 'nombre', None),
# 'creado': pedimento.created_at
# }, status=status.HTTP_200_OK)
return Response({
'id': str(pedimento.id),
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento_app,
'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
'organizacion': getattr(pedimento.organizacion, 'nombre', None),
'creado': pedimento.created_at,
'total_documentos_peticiones': len(documentos_lista_peticiones),
'total_documentos_respuestas': len(documentos_lista_respuestas),
'documentos_peticiones': documentos_lista_peticiones,
'documentos_respuestas': documentos_lista_respuestas
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_pedimento_vu(request):
"""
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PC_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_pedimento_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a un pedimento.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PC_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_remesa_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_RM_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de remesa no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_remesa_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_RM_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de remesa no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_partidas_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
document_type= 17, # Tipo de documento para petición de partidas
organizacion=pedimento.organizacion,
)
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de partidas no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
# Crear lista con todos los documentos encontrados
documentos_lista = []
for documento in documentos_peticion:
nombre_archivo = os.path.basename(documento.archivo.name)
documentos_lista.append({
'id': str(documento.id),
'archivo': documento.archivo.path,
'archivo_original': nombre_archivo,
'extension': documento.extension,
'size': documento.size,
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
'creado_en': documento.created_at,
'actualizado_en': documento.updated_at
})
# nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
# return Response({
# 'id': documentos_peticion.id,
# 'archivo': documentos_peticion.archivo.path,
# 'archivo_original': nombre_archivo,
# 'extension': documentos_peticion.extension,
# 'size': documentos_peticion.size,
# 'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
# }, status=status.HTTP_200_OK)
return Response({
'pedimento_id': str(pedimento.id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'total_documentos': len(documentos_lista),
'documentos': documentos_lista
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_partidas_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_PT_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de partidas no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_acuse_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de acuse no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_acuse_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de acuse no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_cove_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_COVE_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_cove_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_COVE_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_acuse_cove_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de acuse cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_acuse_cove_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de acuse cove no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_peticion_edocument_vu(request):
"""
Backend endpoint para obtener las peticiones asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_ED_{pedimento_app}_REQUEST.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de petición de e-document no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
}, status=status.HTTP_200_OK)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditor_obtener_respuesta_edocument_vu(request):
"""
Backend endpoint para obtener las respuestas asociadas a una remesa.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
# Validar permisos y existencia del pedimento
try:
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
pedimento_app = pedimento.pedimento_app
documentos_peticion = Document.objects.filter(
pedimento=pedimento,
archivo__icontains= f"VU_ED_{pedimento_app}_ERROR.xml",
organizacion=pedimento.organizacion,
).first()
if not documentos_peticion:
return Response(
{'error': 'Documento de respuesta de e-document no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
return Response({
'id': documentos_peticion.id,
'archivo': documentos_peticion.archivo.path,
'archivo_original': nombre_archivo,
'extension': documentos_peticion.extension,
'size': documentos_peticion.size,
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
}, status=status.HTTP_200_OK)
@swagger_auto_schema(
method='post',
operation_description="Audita un pedimento específico verificando su XML y extrayendo información",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
},
required=['pedimento_id']
),
responses={
200: openapi.Response('Auditoría completada'),
400: openapi.Response('Error en los parámetros'),
403: openapi.Response('No tiene permisos suficientes'),
404: openapi.Response('Pedimento no encontrado')
}
)
@api_view(['POST'])
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
def auditar_pedimento_endpoint(request):
"""
Audita un pedimento específico verificando si existe su XML y extrayendo información.
"""
pedimento_id = request.data.get('pedimento_id')
if not pedimento_id:
return Response(
{'error': 'Debe proporcionar pedimento_id'},
status=status.HTTP_400_BAD_REQUEST
)
try:
# Validar permisos y existencia del pedimento
pedimento = Pedimento.objects.get(id=pedimento_id)
user = request.user
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
return Response(
{'error': 'No tiene permisos para este pedimento'},
status=status.HTTP_403_FORBIDDEN
)
# Buscar documentos XML del pedimento
documentos_xml = Document.objects.filter(
pedimento=pedimento,
archivo__endswith='.xml',
organizacion=pedimento.organizacion
)
if not documentos_xml.exists():
return Response({
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'archivos_xml_encontrados': 0,
'mensaje': 'No se encontraron archivos XML para este pedimento',
'auditoria_completa': False
}, status=status.HTTP_200_OK)
# Lista para almacenar información de cada XML
xmls_analizados = []
informacion_extraida = []
for documento in documentos_xml:
try:
xml_info = {
'documento_id': str(documento.id),
'nombre_archivo': os.path.basename(documento.archivo.name),
'tamanio': documento.size,
'extension': documento.extension,
'tipo_documento': documento.document_type.descripcion if documento.document_type else 'Desconocido'
}
# Intentar extraer información del XML
try:
with open(documento.archivo.path, 'r', encoding='utf-8') as xml_file:
xml_content = xml_file.read()
# Extraer información específica del XML
info_pedimento = extraer_info_pedimento_xml(xml_content)
if info_pedimento:
xml_info['informacion_extraida'] = info_pedimento
informacion_extraida.append(info_pedimento)
# Actualizar el pedimento con la información encontrada si es necesario
actualizar_info_pedimento(pedimento, info_pedimento)
except Exception as e:
xml_info['error_lectura'] = str(e)
xmls_analizados.append(xml_info)
except Exception as e:
xmls_analizados.append({
'documento_id': str(documento.id),
'nombre_archivo': os.path.basename(documento.archivo.name),
'error': f'Error procesando archivo: {str(e)}'
})
# Ejecutar la tarea de auditoría completa
task = auditar_pedimento_por_id.delay(pedimento_id)
response_data = {
'pedimento_id': str(pedimento_id),
'pedimento': pedimento.pedimento,
'pedimento_app': pedimento.pedimento_app,
'archivos_xml_encontrados': len(xmls_analizados),
'xmls_analizados': xmls_analizados,
'informacion_extraida': informacion_extraida,
'auditoria_completa': True,
'task_id': task.id,
'mensaje': f'Auditoría completada para el pedimento {pedimento.pedimento}'
}
return Response(response_data, status=status.HTTP_200_OK)
except Pedimento.DoesNotExist:
return Response(
{'error': 'Pedimento no encontrado'},
status=status.HTTP_404_NOT_FOUND
)
except Exception as e:
return Response(
{'error': f'Error en la auditoría: {str(e)}'},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def actualizar_info_pedimento(pedimento, info_xml):
"""
Actualiza la información del pedimento con los datos extraídos del XML.
"""
try:
actualizado = False
# Actualizar información del pedimento si está en el XML y no está ya llena
if 'numero_operacion' in info_xml and not pedimento.numero_operacion:
pedimento.numero_operacion = info_xml['numero_operacion']
actualizado = True
# Número de partidas
if 'numero_partidas' in info_xml and not pedimento.numero_partidas:
pedimento.numero_partidas = info_xml['numero_partidas']
actualizado = True
# Clave del pedimento
if 'clave_pedimento' in info_xml and not pedimento.clave_pedimento:
pedimento.clave_pedimento = info_xml['clave_pedimento']
actualizado = True
# Aduana (patente)
if 'aduana_clave' in info_xml and not pedimento.aduana:
pedimento.aduana = info_xml['aduana_clave']
actualizado = True
# RFC Agente Aduanal
if 'rfc_agente_aduanal' in info_xml and not pedimento.agente_aduanal:
pedimento.agente_aduanal = info_xml['rfc_agente_aduanal']
actualizado = True
# CURP Apoderado
if 'curp_apoderado' in info_xml and not pedimento.curp_apoderado:
pedimento.curp_apoderado = info_xml['curp_apoderado']
actualizado = True
# Fecha de pago
if 'fecha_pago' in info_xml and not pedimento.fecha_pago:
try:
# Convertir formato de fecha (ej: "2024-02-15-06:00")
fecha_str = info_xml['fecha_pago']
# Extraer solo la parte de la fecha (antes del primer '-')
fecha_parts = fecha_str.split('-')
if len(fecha_parts) >= 3:
fecha_simple = f"{fecha_parts[0]}-{fecha_parts[1]}-{fecha_parts[2]}"
from datetime import datetime
fecha_obj = datetime.strptime(fecha_simple, '%Y-%m-%d').date()
pedimento.fecha_pago = fecha_obj
actualizado = True
except (ValueError, TypeError, IndexError):
pass
# Importe total (valor en dólares)
if 'valor_dolares' in info_xml and not pedimento.importe_total:
try:
pedimento.importe_total = float(info_xml['valor_dolares'])
actualizado = True
except (ValueError, TypeError):
pass
if 'contribuyente_rfc' in info_xml and not pedimento.contribuyente:
try:
# Buscar o crear el importador
from api.customs.models import Importador
importador, created = Importador.objects.get_or_create(
rfc=info_xml['contribuyente_rfc'],
organizacion=pedimento.organizacion,
defaults={'nombre': info_xml.get('contribuyente_nombre', '')}
)
pedimento.contribuyente = importador
actualizado = True
except Exception:
pass
if 'tipo_operacion' in info_xml and not pedimento.tipo_operacion:
try:
from api.customs.models import TipoOperacion
tipo_op_obj, created = TipoOperacion.objects.get_or_create(
tipo=info_xml['tipo_operacion'],
defaults={'descripcion': info_xml['tipo_operacion_descripcion'][:100]} # Limitar a 100 caracteres
)
pedimento.tipo_operacion = tipo_op_obj
actualizado = True
except Exception:
pass
if actualizado:
pedimento.save()
return True
return False
except Exception:
return False