1781 lines
66 KiB
Python
1781 lines
66 KiB
Python
import os
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from drf_yasg import openapi
|
|
from core.permissions import IsSuperUser, IsSameOrganizationDeveloper
|
|
from .tasks.auditoria import (
|
|
crear_partidas,
|
|
crear_partidas_por_pedimento,
|
|
auditar_procesamiento_remesa_por_pedimento,
|
|
auditar_coves,
|
|
auditar_acuse_cove,
|
|
auditar_edocuments,
|
|
auditar_acuse,
|
|
auditar_cove_por_pedimento,
|
|
auditar_acuse_cove_por_pedimento,
|
|
auditar_edocument_por_pedimento,
|
|
auditar_acuse_por_pedimento
|
|
)
|
|
from .tasks.internal_services import auditar_pedimentos
|
|
from .tasks.microservice_v2 import procesar_pedimentos_completos
|
|
from api.customs.models import Pedimento
|
|
from api.organization.models import Organizacion
|
|
from api.record.models import Document
|
|
from .tasks.auditoria import auditar_pedimento_por_id
|
|
from .tasks.auditoria_xml import extraer_info_pedimento_xml
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Crea partidas para todos los pedimentos de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def crear_partidas_organizacion(request):
|
|
"""
|
|
Crea partidas para todos los pedimentos de una organización específica.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea
|
|
task = crear_partidas.delay(organizacion_id)
|
|
message = f"Creación de partidas iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Crea partidas para un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated ])
|
|
def crear_partidas_pedimento(request):
|
|
"""
|
|
Crea partidas para un pedimento específico.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Ejecutar la tarea
|
|
task = crear_partidas_por_pedimento.delay(pedimento_id)
|
|
message = f"Creación de partidas iniciada para el pedimento {pedimento_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita todos los pedimentos de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_pedimentos_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría para todos los pedimentos de una organización.
|
|
Verifica todos los documentos y datos asociados a los pedimentos.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_pedimentos.delay(organizacion_id)
|
|
message = f"Auditoría iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita el procesamiento de remesa para un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento a auditar')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_procesamiento_remesa_pedimento_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría de remesa para un pedimento específico.
|
|
Verifica el estado del procesamiento de remesa y la creación de COVEs.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_procesamiento_remesa_por_pedimento.delay(pedimento_id)
|
|
message = f"Auditoría de remesa iniciada para el pedimento {pedimento_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id,
|
|
'pedimento': {
|
|
'id': str(pedimento.id),
|
|
'pedimento': pedimento.pedimento,
|
|
'tiene_remesas': pedimento.remesas
|
|
}
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita los COVEs de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_coves_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría para los COVEs de una organización.
|
|
Verifica la existencia y validez de los COVEs generados.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_coves.delay(organizacion_id)
|
|
message = f"Auditoría de COVEs iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita los acuses de COVE de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_acuse_cove_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría para los acuses de COVE de una organización.
|
|
Verifica la recepción y validez de los acuses de COVE.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_acuse_cove.delay(organizacion_id)
|
|
message = f"Auditoría de acuses de COVE iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita los EDocuments de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_edocuments_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría para los EDocuments de una organización.
|
|
Verifica la existencia y validez de los EDocuments generados.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_edocuments.delay(organizacion_id)
|
|
message = f"Auditoría de EDocuments iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita los acuses de una organización",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'organizacion_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID de la organización')
|
|
},
|
|
required=['organizacion_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_acuse_endpoint(request):
|
|
"""
|
|
Inicia una tarea de auditoría para los acuses de una organización.
|
|
Verifica la recepción y validez de los acuses.
|
|
"""
|
|
organizacion_id = request.data.get('organizacion_id')
|
|
|
|
if not organizacion_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar organizacion_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos
|
|
user = request.user
|
|
if not user.is_superuser and str(user.organizacion.id) != organizacion_id:
|
|
return Response(
|
|
{'error': 'No tiene permisos para esta organización'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Ejecutar la tarea de auditoría
|
|
task = auditar_acuse.delay(organizacion_id)
|
|
message = f"Auditoría de acuses iniciada para la organización {organizacion_id}"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'task_id': task.id
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita el COVE de un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated])
|
|
def auditar_cove_pedimento_endpoint(request):
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
|
|
except Pedimento.DoesNotExist:
|
|
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
|
|
task = auditar_cove_por_pedimento.delay(pedimento_id)
|
|
return Response({'message': f'Auditoría de COVE iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita el acuse de COVE de un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated])
|
|
def auditar_acuse_cove_pedimento_endpoint(request):
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
|
|
except Pedimento.DoesNotExist:
|
|
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
|
|
task = auditar_acuse_cove_por_pedimento.delay(pedimento_id)
|
|
return Response({'message': f'Auditoría de acuse de COVE iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita el EDocument de un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated])
|
|
def auditar_edocument_pedimento_endpoint(request):
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
|
|
except Pedimento.DoesNotExist:
|
|
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
|
|
task = auditar_edocument_por_pedimento.delay(pedimento_id)
|
|
return Response({'message': f'Auditoría de EDocument iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita el acuse de un pedimento específico",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de auditoría iniciada correctamente'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated])
|
|
def auditar_acuse_pedimento_endpoint(request):
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response({'error': 'Debe proporcionar pedimento_id'}, status=status.HTTP_400_BAD_REQUEST)
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response({'error': 'No tiene permisos para este pedimento'}, status=status.HTTP_403_FORBIDDEN)
|
|
except Pedimento.DoesNotExist:
|
|
return Response({'error': 'Pedimento no encontrado'}, status=status.HTTP_404_NOT_FOUND)
|
|
task = auditar_acuse_por_pedimento.delay(pedimento_id)
|
|
return Response({'message': f'Auditoría de acuse iniciada para el pedimento {pedimento_id}', 'task_id': task.id}, status=status.HTTP_200_OK)
|
|
|
|
### Procesamiento de pedimentos ###
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Procesamiento de todos los pedimentos de todas las organizaciones",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={}
|
|
),
|
|
responses={
|
|
200: openapi.Response('Tarea de procesamiento iniciada correctamente'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('No se encontraron organizaciones')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_procesar_pedimentos_organizacion(request):
|
|
"""
|
|
Inicia una tarea de procesamiento para todos los pedimentos de todas las organizaciones.
|
|
Solo usuarios administradores pueden ejecutar esta función.
|
|
"""
|
|
# Validar permisos (solo superusuarios pueden procesar todas las organizaciones)
|
|
user = request.user
|
|
if not user.is_superuser:
|
|
return Response(
|
|
{'error': 'Solo los superusuarios pueden procesar todas las organizaciones'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
organizaciones = Organizacion.objects.all()
|
|
if not organizaciones.exists():
|
|
return Response(
|
|
{'error': 'No se encontraron organizaciones'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Lista para recopilar todos los task_ids y detalles
|
|
tasks_iniciadas = []
|
|
|
|
for organizacion in organizaciones:
|
|
organizacion_id = str(organizacion.id)
|
|
print(f"Procesando organización: {organizacion_id} - {organizacion.nombre}")
|
|
# Ejecutar la tarea de procesamiento
|
|
task = procesar_pedimentos_completos.delay(organizacion_id)
|
|
|
|
# Agregar información de la tarea a la lista
|
|
tasks_iniciadas.append({
|
|
'organizacion_id': organizacion_id,
|
|
'organizacion_nombre': organizacion.nombre,
|
|
'task_id': task.id
|
|
})
|
|
|
|
# Crear mensaje general y lista de task_ids
|
|
total_organizaciones = len(tasks_iniciadas)
|
|
task_ids = [task['task_id'] for task in tasks_iniciadas]
|
|
|
|
message = f"Procesamiento de pedimentos iniciado para {total_organizaciones} organización(es)"
|
|
|
|
return Response({
|
|
'message': message,
|
|
'total_organizaciones': total_organizaciones,
|
|
'task_ids': task_ids,
|
|
'tasks_detalle': tasks_iniciadas
|
|
}, status=status.HTTP_200_OK)
|
|
### Fin Procesamiento de pedimentos ###
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_peticion_respuesta_pedimento_completo(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
vista_auditar = request.data.get('vista', 'desconocido') # 'completa' o 'resumen'
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
tipo_documento_peticion = None
|
|
tipo_documento_respuesta = None
|
|
vista = 'desconocido'
|
|
|
|
if vista_auditar == 'pc':
|
|
tipo_documento_peticion = 13
|
|
tipo_documento_respuesta = 14
|
|
vista = 'Pedimento Completo'
|
|
elif vista_auditar == 'rm':
|
|
tipo_documento_peticion = 15
|
|
tipo_documento_respuesta = 16
|
|
vista = 'Remesa'
|
|
elif vista_auditar == 'pt':
|
|
tipo_documento_peticion = 17
|
|
tipo_documento_respuesta = 18
|
|
vista = 'Partidas'
|
|
elif vista_auditar == 'cove':
|
|
tipo_documento_peticion = 19
|
|
tipo_documento_respuesta = 20
|
|
vista = 'COVEs'
|
|
elif vista_auditar == 'edoc':
|
|
tipo_documento_peticion = 21
|
|
tipo_documento_respuesta = 22
|
|
vista = 'Edocuments'
|
|
elif vista_auditar == 'ac_cove':
|
|
tipo_documento_peticion = 23
|
|
tipo_documento_respuesta = 24
|
|
vista = 'Acuses COVEs'
|
|
elif vista_auditar == 'ac':
|
|
tipo_documento_peticion = 25
|
|
tipo_documento_respuesta = 26
|
|
vista = 'Acuses'
|
|
|
|
if not tipo_documento_peticion and not tipo_documento_respuesta:
|
|
return Response(
|
|
{'error': 'Tipo de vista no reconocido para auditoría de pedimento'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
|
|
document_type= tipo_documento_peticion, # Tipo de documento para petición de partidas
|
|
organizacion=pedimento.organizacion,
|
|
)
|
|
|
|
documentos_respuesta = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
|
|
document_type= tipo_documento_respuesta, # Tipo de documento para respuesta de partidas
|
|
organizacion=pedimento.organizacion,
|
|
)
|
|
|
|
if not documentos_peticion and not documentos_respuesta:
|
|
return Response(
|
|
{'error': f'Registro de documentos de petición y respuesta de {vista} no encontrado(s)'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Crear lista con todos los documentos encontrados
|
|
documentos_lista_peticiones = []
|
|
for documento in documentos_peticion:
|
|
|
|
nombre_archivo = os.path.basename(documento.archivo.name)
|
|
|
|
documentos_lista_peticiones.append({
|
|
'id': str(documento.id),
|
|
'archivo': documento.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documento.extension,
|
|
'size': documento.size,
|
|
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
|
|
'creado_en': documento.created_at,
|
|
'actualizado_en': documento.updated_at
|
|
})
|
|
|
|
# Crear lista vacía para respuestas (por si se requiere en el futuro)
|
|
documentos_lista_respuestas = []
|
|
for documento in documentos_respuesta:
|
|
|
|
nombre_archivo = os.path.basename(documento.archivo.name)
|
|
|
|
documentos_lista_respuestas.append({
|
|
'id': str(documento.id),
|
|
'archivo': documento.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documento.extension,
|
|
'size': documento.size,
|
|
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
|
|
'creado_en': documento.created_at,
|
|
'actualizado_en': documento.updated_at
|
|
})
|
|
|
|
# return Response({
|
|
# 'id': pedimento.id,
|
|
# 'pedimento_app': pedimento_app,
|
|
# 'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
|
|
# 'organizacion': getattr(pedimento.organizacion, 'nombre', None),
|
|
# 'creado': pedimento.created_at
|
|
# }, status=status.HTTP_200_OK)
|
|
return Response({
|
|
'id': str(pedimento.id),
|
|
'pedimento_id': str(pedimento.id),
|
|
'pedimento': pedimento.pedimento,
|
|
'pedimento_app': pedimento_app,
|
|
'contribuyente': getattr(pedimento.contribuyente, 'rfc', None),
|
|
'organizacion': getattr(pedimento.organizacion, 'nombre', None),
|
|
'creado': pedimento.created_at,
|
|
'total_documentos_peticiones': len(documentos_lista_peticiones),
|
|
'total_documentos_respuestas': len(documentos_lista_respuestas),
|
|
'documentos_peticiones': documentos_lista_peticiones,
|
|
'documentos_respuestas': documentos_lista_respuestas
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_pedimento_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones y respuestas asociadas a un pedimento.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_PC_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_pedimento_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a un pedimento.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_PC_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_remesa_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_RM_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de remesa no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_remesa_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_RM_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de remesa no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_partidas_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
#archivo__icontains= f"VU_PT_{pedimento_app}_REQUEST.xml",
|
|
document_type= 17, # Tipo de documento para petición de partidas
|
|
organizacion=pedimento.organizacion,
|
|
)
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de partidas no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
# Crear lista con todos los documentos encontrados
|
|
documentos_lista = []
|
|
|
|
for documento in documentos_peticion:
|
|
|
|
nombre_archivo = os.path.basename(documento.archivo.name)
|
|
|
|
documentos_lista.append({
|
|
'id': str(documento.id),
|
|
'archivo': documento.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documento.extension,
|
|
'size': documento.size,
|
|
'tipo': documento.document_type.descripcion if documento.document_type else 'Desconocido',
|
|
'creado_en': documento.created_at,
|
|
'actualizado_en': documento.updated_at
|
|
})
|
|
|
|
# nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
# return Response({
|
|
# 'id': documentos_peticion.id,
|
|
# 'archivo': documentos_peticion.archivo.path,
|
|
# 'archivo_original': nombre_archivo,
|
|
# 'extension': documentos_peticion.extension,
|
|
# 'size': documentos_peticion.size,
|
|
# 'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
# }, status=status.HTTP_200_OK)
|
|
return Response({
|
|
'pedimento_id': str(pedimento.id),
|
|
'pedimento': pedimento.pedimento,
|
|
'pedimento_app': pedimento.pedimento_app,
|
|
'total_documentos': len(documentos_lista),
|
|
'documentos': documentos_lista
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_partidas_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_PT_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de partidas no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_acuse_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_AC_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de acuse no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_acuse_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_AC_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de acuse no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_cove_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_COVE_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de cove no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_cove_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_COVE_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de cove no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_acuse_cove_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de acuse cove no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_acuse_cove_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_AC_COVE_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de acuse cove no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_peticion_edocument_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las peticiones asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_ED_{pedimento_app}_REQUEST.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de petición de e-document no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticament
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditor_obtener_respuesta_edocument_vu(request):
|
|
"""
|
|
Backend endpoint para obtener las respuestas asociadas a una remesa.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
# Validar permisos y existencia del pedimento
|
|
try:
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
pedimento_app = pedimento.pedimento_app
|
|
|
|
documentos_peticion = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__icontains= f"VU_ED_{pedimento_app}_ERROR.xml",
|
|
organizacion=pedimento.organizacion,
|
|
).first()
|
|
|
|
if not documentos_peticion:
|
|
return Response(
|
|
{'error': 'Documento de respuesta de e-document no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
|
|
nombre_archivo = os.path.basename(documentos_peticion.archivo.name)
|
|
return Response({
|
|
'id': documentos_peticion.id,
|
|
'archivo': documentos_peticion.archivo.path,
|
|
'archivo_original': nombre_archivo,
|
|
'extension': documentos_peticion.extension,
|
|
'size': documentos_peticion.size,
|
|
'tipo': documentos_peticion.document_type.descripcion # O detectar automáticamente
|
|
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
|
|
@swagger_auto_schema(
|
|
method='post',
|
|
operation_description="Audita un pedimento específico verificando su XML y extrayendo información",
|
|
request_body=openapi.Schema(
|
|
type=openapi.TYPE_OBJECT,
|
|
properties={
|
|
'pedimento_id': openapi.Schema(type=openapi.TYPE_STRING, description='ID del pedimento')
|
|
},
|
|
required=['pedimento_id']
|
|
),
|
|
responses={
|
|
200: openapi.Response('Auditoría completada'),
|
|
400: openapi.Response('Error en los parámetros'),
|
|
403: openapi.Response('No tiene permisos suficientes'),
|
|
404: openapi.Response('Pedimento no encontrado')
|
|
}
|
|
)
|
|
@api_view(['POST'])
|
|
@permission_classes([IsAuthenticated & (IsSuperUser | IsSameOrganizationDeveloper)])
|
|
def auditar_pedimento_endpoint(request):
|
|
"""
|
|
Audita un pedimento específico verificando si existe su XML y extrayendo información.
|
|
"""
|
|
pedimento_id = request.data.get('pedimento_id')
|
|
|
|
if not pedimento_id:
|
|
return Response(
|
|
{'error': 'Debe proporcionar pedimento_id'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
try:
|
|
# Validar permisos y existencia del pedimento
|
|
pedimento = Pedimento.objects.get(id=pedimento_id)
|
|
user = request.user
|
|
|
|
if not user.is_superuser and str(pedimento.organizacion.id) != str(user.organizacion.id):
|
|
return Response(
|
|
{'error': 'No tiene permisos para este pedimento'},
|
|
status=status.HTTP_403_FORBIDDEN
|
|
)
|
|
|
|
# Buscar documentos XML del pedimento
|
|
documentos_xml = Document.objects.filter(
|
|
pedimento=pedimento,
|
|
archivo__endswith='.xml',
|
|
organizacion=pedimento.organizacion
|
|
)
|
|
|
|
if not documentos_xml.exists():
|
|
return Response({
|
|
'pedimento_id': str(pedimento_id),
|
|
'pedimento': pedimento.pedimento,
|
|
'pedimento_app': pedimento.pedimento_app,
|
|
'archivos_xml_encontrados': 0,
|
|
'mensaje': 'No se encontraron archivos XML para este pedimento',
|
|
'auditoria_completa': False
|
|
}, status=status.HTTP_200_OK)
|
|
|
|
# Lista para almacenar información de cada XML
|
|
xmls_analizados = []
|
|
informacion_extraida = []
|
|
|
|
for documento in documentos_xml:
|
|
try:
|
|
xml_info = {
|
|
'documento_id': str(documento.id),
|
|
'nombre_archivo': os.path.basename(documento.archivo.name),
|
|
'tamanio': documento.size,
|
|
'extension': documento.extension,
|
|
'tipo_documento': documento.document_type.descripcion if documento.document_type else 'Desconocido'
|
|
}
|
|
|
|
# Intentar extraer información del XML
|
|
try:
|
|
with open(documento.archivo.path, 'r', encoding='utf-8') as xml_file:
|
|
xml_content = xml_file.read()
|
|
|
|
# Extraer información específica del XML
|
|
info_pedimento = extraer_info_pedimento_xml(xml_content)
|
|
|
|
if info_pedimento:
|
|
xml_info['informacion_extraida'] = info_pedimento
|
|
informacion_extraida.append(info_pedimento)
|
|
|
|
# Actualizar el pedimento con la información encontrada si es necesario
|
|
actualizar_info_pedimento(pedimento, info_pedimento)
|
|
|
|
except Exception as e:
|
|
xml_info['error_lectura'] = str(e)
|
|
|
|
xmls_analizados.append(xml_info)
|
|
|
|
except Exception as e:
|
|
xmls_analizados.append({
|
|
'documento_id': str(documento.id),
|
|
'nombre_archivo': os.path.basename(documento.archivo.name),
|
|
'error': f'Error procesando archivo: {str(e)}'
|
|
})
|
|
|
|
# Ejecutar la tarea de auditoría completa
|
|
task = auditar_pedimento_por_id.delay(pedimento_id)
|
|
|
|
response_data = {
|
|
'pedimento_id': str(pedimento_id),
|
|
'pedimento': pedimento.pedimento,
|
|
'pedimento_app': pedimento.pedimento_app,
|
|
'archivos_xml_encontrados': len(xmls_analizados),
|
|
'xmls_analizados': xmls_analizados,
|
|
'informacion_extraida': informacion_extraida,
|
|
'auditoria_completa': True,
|
|
'task_id': task.id,
|
|
'mensaje': f'Auditoría completada para el pedimento {pedimento.pedimento}'
|
|
}
|
|
|
|
return Response(response_data, status=status.HTTP_200_OK)
|
|
|
|
except Pedimento.DoesNotExist:
|
|
return Response(
|
|
{'error': 'Pedimento no encontrado'},
|
|
status=status.HTTP_404_NOT_FOUND
|
|
)
|
|
except Exception as e:
|
|
return Response(
|
|
{'error': f'Error en la auditoría: {str(e)}'},
|
|
status=status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
def actualizar_info_pedimento(pedimento, info_xml):
|
|
"""
|
|
Actualiza la información del pedimento con los datos extraídos del XML.
|
|
"""
|
|
try:
|
|
actualizado = False
|
|
|
|
# Actualizar información del pedimento si está en el XML y no está ya llena
|
|
if 'numero_operacion' in info_xml and not pedimento.numero_operacion:
|
|
pedimento.numero_operacion = info_xml['numero_operacion']
|
|
actualizado = True
|
|
|
|
# Número de partidas
|
|
if 'numero_partidas' in info_xml and not pedimento.numero_partidas:
|
|
pedimento.numero_partidas = info_xml['numero_partidas']
|
|
actualizado = True
|
|
|
|
# Clave del pedimento
|
|
if 'clave_pedimento' in info_xml and not pedimento.clave_pedimento:
|
|
pedimento.clave_pedimento = info_xml['clave_pedimento']
|
|
actualizado = True
|
|
|
|
# Aduana (patente)
|
|
if 'aduana_clave' in info_xml and not pedimento.aduana:
|
|
pedimento.aduana = info_xml['aduana_clave']
|
|
actualizado = True
|
|
|
|
# RFC Agente Aduanal
|
|
if 'rfc_agente_aduanal' in info_xml and not pedimento.agente_aduanal:
|
|
pedimento.agente_aduanal = info_xml['rfc_agente_aduanal']
|
|
actualizado = True
|
|
|
|
# CURP Apoderado
|
|
if 'curp_apoderado' in info_xml and not pedimento.curp_apoderado:
|
|
pedimento.curp_apoderado = info_xml['curp_apoderado']
|
|
actualizado = True
|
|
|
|
# Fecha de pago
|
|
if 'fecha_pago' in info_xml and not pedimento.fecha_pago:
|
|
try:
|
|
# Convertir formato de fecha (ej: "2024-02-15-06:00")
|
|
fecha_str = info_xml['fecha_pago']
|
|
# Extraer solo la parte de la fecha (antes del primer '-')
|
|
fecha_parts = fecha_str.split('-')
|
|
if len(fecha_parts) >= 3:
|
|
fecha_simple = f"{fecha_parts[0]}-{fecha_parts[1]}-{fecha_parts[2]}"
|
|
from datetime import datetime
|
|
fecha_obj = datetime.strptime(fecha_simple, '%Y-%m-%d').date()
|
|
pedimento.fecha_pago = fecha_obj
|
|
actualizado = True
|
|
except (ValueError, TypeError, IndexError):
|
|
pass
|
|
|
|
# Importe total (valor en dólares)
|
|
if 'valor_dolares' in info_xml and not pedimento.importe_total:
|
|
try:
|
|
pedimento.importe_total = float(info_xml['valor_dolares'])
|
|
actualizado = True
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
if 'contribuyente_rfc' in info_xml and not pedimento.contribuyente:
|
|
try:
|
|
# Buscar o crear el importador
|
|
from api.customs.models import Importador
|
|
importador, created = Importador.objects.get_or_create(
|
|
rfc=info_xml['contribuyente_rfc'],
|
|
organizacion=pedimento.organizacion,
|
|
defaults={'nombre': info_xml.get('contribuyente_nombre', '')}
|
|
)
|
|
pedimento.contribuyente = importador
|
|
actualizado = True
|
|
except Exception:
|
|
pass
|
|
|
|
if 'tipo_operacion' in info_xml and not pedimento.tipo_operacion:
|
|
try:
|
|
from api.customs.models import TipoOperacion
|
|
tipo_op_obj, created = TipoOperacion.objects.get_or_create(
|
|
tipo=info_xml['tipo_operacion'],
|
|
defaults={'descripcion': info_xml['tipo_operacion_descripcion'][:100]} # Limitar a 100 caracteres
|
|
)
|
|
pedimento.tipo_operacion = tipo_op_obj
|
|
actualizado = True
|
|
except Exception:
|
|
pass
|
|
|
|
if actualizado:
|
|
pedimento.save()
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception:
|
|
return False |