Files
backend/api/customs/views.py
2026-03-06 12:48:51 -07:00

3186 lines
155 KiB
Python

from config.settings import SERVICE_API_URL
from django.shortcuts import render
from rest_framework import viewsets
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.exceptions import PermissionDenied
from rest_framework import status
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework.filters import SearchFilter, OrderingFilter
from core.permissions import (
IsSameOrganization,
IsSameOrganizationDeveloper,
IsSameOrganizationAndAdmin,
IsSuperUser
)
from api.customs.models import (
Pedimento,
TipoOperacion,
ProcesamientoPedimento,
EDocument,
Cove,
Importador,
Partida,
)
from api.customs.serializers import (
PedimentoSerializer,
TipoOperacionSerializer,
ProcesamientoPedimentoSerializer,
EDocumentSerializer,
CoveSerializer,
ImportadorSerializer,
PartidaSerializer
)
from api.logger.mixins import LoggingMixin
from mixins.filtrado_organizacion import OrganizacionFiltradaMixin, ProcesosPorOrganizacionMixin
import requests
import os
import re
import zipfile
import tempfile
import shutil
import subprocess
from datetime import date, datetime
from django.core.files.base import ContentFile
from django.db import transaction
from rest_framework.parsers import MultiPartParser, FormParser
from api.record.models import Document, DocumentType, Fuente
from unicodedata import normalize
from datetime import datetime
from django.utils import timezone
# Importar rarfile de manera opcional
try:
import rarfile
RAR_SUPPORT = True
except ImportError:
RAR_SUPPORT = False
# Importar tarea de procesamiento de pedimento (Celery)
from api.customs.tasks.microservice import procesar_pedimento_completo_individual
def get_available_extractors():
"""
Devuelve lista de extractores disponibles en orden de preferencia
"""
extractors = []
if RAR_SUPPORT:
extractors.append('rarfile')
# Verificar si 'unrar' está disponible
if shutil.which('unrar'):
extractors.append('unrar')
# Verificar si '7z' o '7za' están disponibles
if shutil.which('7z'):
extractors.append('7z')
elif shutil.which('7za'):
extractors.append('7za')
return extractors
def extract_rar_to_dir(rar_path, dest_dir):
"""
Extrae un archivo RAR a `dest_dir` usando varios mecanismos de respaldo:
1) intentará usar la librería `rarfile` si está disponible,
2) intentará ejecutar la utilidad `unrar` si está instalada en el sistema,
3) intentará ejecutar `7z`/`7za` (p7zip) si está instalada.
Lanza Exception con mensaje explicativo si falla.
"""
# Versión que primero verifica herramientas disponibles
available = get_available_extractors()
if not available:
raise Exception("No hay herramientas de extracción disponibles.")
print(f"Extractores disponibles (en orden de preferencia): {available}")
# Intento con rarfile primero si está disponible
# if RAR_SUPPORT:
if 'rarfile' in available and RAR_SUPPORT:
try:
# rarfile puede trabajar con rutas en disco mejor que con file-like
with rarfile.RarFile(rar_path) as rf:
rf.extractall(dest_dir)
try:
if os.path.exists(rar_path):
os.remove(rar_path)
print(f"Archivo original eliminado: {rar_path}")
except OSError as remove_error:
print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
return
except Exception as e:
# Si rarfile falla (por ejemplo RarCannotExec), seguimos con herramientas externas
# Hacer log para depuración
print(f"rarfile extraction failed, will try external tools: {e}")
# Intento con comandos externos
# Probar 'unrar' primero
external_cmds = [
['unrar', 'x', '-o+', rar_path, dest_dir],
['7z', 'x', rar_path, f'-o{dest_dir}', '-y'],
['7za', 'x', rar_path, f'-o{dest_dir}', '-y']
]
# for cmd in external_cmds:
# try:
# subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# try:
# if os.path.exists(rar_path):
# os.remove(rar_path)
# print(f"Archivo original eliminado: {rar_path}")
# except OSError as remove_error:
# print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
# return
# except FileNotFoundError:
# # El ejecutable no existe en PATH, intentar siguiente
# continue
# except subprocess.CalledProcessError as e:
# # El comando falló en la extracción; intentar siguiente
# print(f"External extractor failed ({cmd[0]}): {e}")
# continue
for extractor_name in available:
if extractor_name in external_cmds:
cmd = external_cmds[extractor_name]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
if os.path.exists(rar_path):
os.remove(rar_path)
print(f"Archivo original eliminado: {rar_path}")
except OSError as remove_error:
print(f"Advertencia: No se pudo eliminar '{rar_path}': {remove_error}")
return
except FileNotFoundError:
# El ejecutable no existe en PATH, intentar siguiente
continue
except subprocess.CalledProcessError as e:
# El comando falló en la extracción; intentar siguiente
print(f"External extractor {extractor_name} failed ({cmd[0]}): {e}")
continue
# Si llegamos aquí, ningún método funcionó
raise Exception("No se encontró una herramienta válida para extraer RAR (rarfile sin backend, 'unrar' o '7z' no disponibles o extracción fallida). Instale 'unrar' o 'p7zip' y asegúrese de que estén en PATH, o configure rarfile con un backend.")
from .tasks.microservice_v2 import *
from .tasks.auditoria import crear_partidas_por_pedimento
class CustomPagination(PageNumberPagination):
"""
Paginación personalizada con parámetros flexibles
- Si no se especifica page_size, devuelve todos los resultados (sin paginación)
- Si se especifica page_size, usa paginación normal
"""
page_size = None # Sin paginación por defecto
page_size_query_param = 'page_size'
max_page_size = 10000 # Límite máximo de seguridad
page_query_param = 'page'
def paginate_queryset(self, queryset, request, view=None):
"""
Si no se especifica page_size en los parámetros, devolver None (sin paginación)
Si se especifica, usar paginación normal
"""
# Verificar si se especificó page_size en la query
if self.page_size_query_param not in request.query_params:
# No hay page_size, devolver None para indicar "sin paginación"
return None
# Hay page_size, usar paginación normal
try:
page_size = int(request.query_params[self.page_size_query_param])
if page_size <= 0:
return None
# Establecer el page_size temporalmente para esta request
self.page_size = min(page_size, self.max_page_size)
except (ValueError, TypeError):
return None
return super().paginate_queryset(queryset, request, view)
class PedimentoPagination(PageNumberPagination):
"""
Paginación personalizada con parámetros flexibles
- Si no se especifica page_size, devuelve todos los resultados (sin paginación)
- Si se especifica page_size, usa paginación normal
"""
page_size = None # Sin paginación por defecto
page_size_query_param = 'page_size'
max_page_size = 1000 # Límite máximo de seguridad
page_query_param = 'page'
def paginate_queryset(self, queryset, request, view=None):
"""
Si no se especifica page_size en los parámetros, devolver None (sin paginación)
Si se especifica, usar paginación normal
"""
# Verificar si se especificó page_size en la query
if self.page_size_query_param not in request.query_params:
# No hay page_size, devolver None para indicar "sin paginación"
return None
# Hay page_size, usar paginación normal
try:
page_size = int(request.query_params[self.page_size_query_param])
if page_size <= 0:
return None
# Establecer el page_size temporalmente para esta request
self.page_size = min(page_size, self.max_page_size)
except (ValueError, TypeError):
return None
return super().paginate_queryset(queryset, request, view)
# Create your views here.
class ViewSetPedimento(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltradaMixin): # Pendiente de permisos de creacion
"""
ViewSet for Pedimento model.
Soporta paginación, filtros y búsqueda.
Parámetros disponibles:
- page: Número de página (solo si se especifica page_size)
- page_size: Elementos por página (si NO se especifica, devuelve TODOS los resultados)
- search: Búsqueda en pedimento, contribuyente, agente_aduanal
- pedimento: Filtro por número de pedimento
- existe_expediente: Filtro por expediente (True/False)
- contribuyente: Filtro por contribuyente
- curp_apoderado: Filtro por curp del apoderado
- fecha_pago: Filtro por fecha de pago (YYYY-MM-DD)
- patente: Filtro por patente
- aduana: Filtro por aduana
- tipo_operacion: Filtro por tipo de operación
- clave_pedimento: Filtro por clave de pedimento
- ordering: Ordenar por campo (ej: -created_at, pedimento)
Ejemplos:
- /pedimentos/ → Devuelve TODOS los pedimentos
- /pedimentos/?page_size=10 → Devuelve los primeros 10
- /pedimentos/?page_size=10&page=2 → Devuelve los pedimentos 11-20
- /pedimentos/?pedimento=12345678 → Filtra por número de pedimento
- /pedimentos/?existe_expediente=true → Filtra por expediente existente
- /pedimentos/?contribuyente=EMPRESA → Filtra por contribuyente
- /pedimentos/?curp_apoderado=XXXX → Filtra por curp apoderado
- /pedimentos/?fecha_pago=2025-07-18 → Filtra por fecha de pago
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = PedimentoSerializer
pagination_class = PedimentoPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
model = Pedimento
filterset_fields = ['patente', 'aduana', 'tipo_operacion', 'clave_pedimento', 'pedimento', 'existe_expediente', 'contribuyente', 'curp_apoderado', 'fecha_pago', 'pedimento_app']
search_fields = ['pedimento', 'pedimento_app', 'agente_aduanal', 'clave_pedimento']
# AGREGAR ESTOS CAMPOS PARA ORDENACIÓN
ordering_fields = ['created_at', 'pedimento', 'fecha_pago', 'aduana', 'patente']
ordering = ['-created_at'] # Orden descendente por fecha de creación por defecto
def get_queryset(self):
queryset = self.get_queryset_filtrado_por_organizacion() # Tambien filtra por importador
# pedimento_app_filter = self.request.GET.get('pedimento_app', None)
# if pedimento_app_filter:
# print(f"Filtro por pedimento_app: {pedimento_app_filter}")
# queryset = queryset.filter(pedimento_app__icontains=pedimento_app_filter)
return queryset
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un pedimento.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
data = serializer.validated_data
if not data.get('pedimento_app'):
fecha_pago = data.get('fecha_pago')
aduana = data.get('aduana')
patente = data.get('patente')
pedimento = data.get('pedimento')
if fecha_pago and aduana and patente and pedimento:
pedimento_app = f"{str(fecha_pago.year)[-2:]}-{str(aduana).zfill(2)[-2:]}-{str(patente).zfill(4)[-4:]}-{str(pedimento).zfill(7)[-7:]}"
serializer.save(organizacion=self.request.user.organizacion, pedimento_app=pedimento_app)
try:
# Usar el nombre del servicio de Docker Compose en lugar de localhost
response = procesar_pedimento_completo_individual(serializer.instance.id)
# Verificar si la respuesta fue exitosa
if response.status_code == 200:
print(f"Servicio FastAPI ejecutado exitosamente: {response.status_code}")
print(f"Respuesta: {response.json()}")
elif response.status_code == 201:
print(f"Recurso creado exitosamente en FastAPI: {response.status_code}")
print(f"Respuesta: {response.json()}")
else:
print(f"Servicio FastAPI respondió con error: {response.status_code}")
print(f"Respuesta: {response.text}")
except requests.exceptions.ConnectionError as e:
print(f"No se pudo conectar al servicio FastAPI: {e}")
print(f"Verifica que el servicio FastAPI esté corriendo en {SERVICE_API_URL}")
except requests.exceptions.Timeout as e:
print(f"Timeout al conectar con el servicio FastAPI: {e}")
except requests.exceptions.RequestException as e:
print(f"Error de request al servicio FastAPI: {e}")
except Exception as e:
print(f"Error inesperado al llamar al servicio FastAPI: {e}")
def perform_update(self, serializer):
"""
Ejecuta acciones después de actualizar un pedimento basado en los campos modificados.
"""
# Obtener los campos que se están actualizando
updated_fields = set(serializer.validated_data.keys())
# Guardar los cambios
pedimento = serializer.save()
# Si se actualizó el campo existe_expediente, procesar el pedimento completo
if 'existe_expediente' in updated_fields:
# Iniciar todas las tareas
procesar_remesas_pedimento(pedimento.id)
crear_partidas_por_pedimento(pedimento.id)
procesar_acuse_coves_pedimento(pedimento.id)
procesar_edocs_pedimento(pedimento.id)
procesar_acuses_pedimento(pedimento.id)
procesar_partidas_pedimento(pedimento.id)
procesar_coves_pedimento(pedimento.id)
# Agregar mensaje de tareas iniciadas al serializer
serializer._data = {
**serializer.data,
"message": "Tareas de procesamiento iniciadas",
"tasks": [
"Procesamiento de remesas",
"Creación de partidas",
"Procesamiento de acuses de COVEs",
"Procesamiento de E-documents",
"Procesamiento de acuses",
"Procesamiento de partidas",
"Procesamiento de COVEs"
]
}
@action(detail=True, methods=['post'], url_path='procesar-completo')
def procesar_completo(self, request, pk=None):
"""
Acción para disparar el procesamiento completo de un pedimento existente.
Dispara la tarea `procesar_pedimento_completo_individual` de forma asíncrona
y devuelve el `task_id`.
"""
pedimento = self.get_object()
try:
# Usar el nombre del servicio de Docker Compose en lugar de localhost
task = procesar_pedimento_completo_individual.delay(pedimento.id, pedimento.organizacion.id)
# Verificar si la respuesta fue exitosa
if task.id:
return Response({"status": "Recurso creado exitosamente en API", "task_id": task.id}, status=status.HTTP_202_ACCEPTED)
else:
return Response({"status": "Servicio API respondió con error", "task_id": 0}, status=status.HTTP_202_ACCEPTED)
except Exception as e:
return Response({"error": f"Error inesperado al llamar al servicio API: {str(e)}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@action(detail=False, methods=['post'], url_path='bulk-delete')
def bulk_delete(self, request):
"""
Endpoint para eliminar múltiples pedimentos de manera masiva.
Payload esperado:
{
"ids": ["uuid1", "uuid2", "uuid3", ...]
}
Respuesta exitosa:
{
"message": "Pedimentos eliminados exitosamente",
"deleted_count": 3,
"deleted_ids": ["uuid1", "uuid2", "uuid3"]
}
Respuesta con errores:
{
"message": "Algunos pedimentos no pudieron ser eliminados",
"deleted_count": 2,
"deleted_ids": ["uuid1", "uuid2"],
"failed_ids": ["uuid3"],
"errors": ["No se encontró el pedimento con ID uuid3"]
}
"""
# Obtener los IDs del payload
ids = request.data.get('ids', [])
if not ids:
return Response(
{"error": "Se requiere una lista de IDs para eliminar"},
status=status.HTTP_400_BAD_REQUEST
)
if not isinstance(ids, list):
return Response(
{"error": "El campo 'ids' debe ser una lista"},
status=status.HTTP_400_BAD_REQUEST
)
# Obtener el queryset filtrado por organización
queryset = self.get_queryset()
# Filtrar solo los pedimentos que existen y pertenecen a la organización del usuario
existing_pedimentos = queryset.filter(id__in=ids)
existing_ids = list(existing_pedimentos.values_list('id', flat=True))
# Convertir UUIDs a strings para comparación
existing_ids_str = [str(id) for id in existing_ids]
requested_ids_str = [str(id) for id in ids]
# Identificar IDs que no existen o no pertenecen a la organización
failed_ids = [id for id in requested_ids_str if id not in existing_ids_str]
deleted_count = 0
errors = []
if existing_pedimentos.exists():
try:
# Eliminar los pedimentos encontrados
deleted_count = existing_pedimentos.count()
existing_pedimentos.delete()
except Exception as e:
return Response(
{"error": f"Error al eliminar pedimentos: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# Agregar errores para IDs no encontrados
if failed_ids:
errors = [f"No se encontró el pedimento con ID {id} o no pertenece a su organización" for id in failed_ids]
# Preparar respuesta
response_data = {
"deleted_count": deleted_count,
"deleted_ids": existing_ids_str
}
if failed_ids:
response_data.update({
"message": "Algunos pedimentos no pudieron ser eliminados",
"failed_ids": failed_ids,
"errors": errors
})
response_status = status.HTTP_207_MULTI_STATUS
else:
response_data["message"] = "Pedimentos eliminados exitosamente"
response_status = status.HTTP_200_OK
return Response(response_data, status=response_status)
@action(detail=False, methods=['post'], url_path='bulk-create', parser_classes=[MultiPartParser, FormParser])
def bulk_create(self, request):
"""
Endpoint para crear múltiples pedimentos de manera masiva desde archivos.
FormData esperado:
- contribuyente: string (nombre del contribuyente)
- archivos: files (pueden ser múltiples archivos: zip, rar o individuales)
Nomenclatura esperada de archivos: anio-aduana-patente-pedimento
- anio: 2 dígitos (ej: 24)
- aduana: 2 o 3 dígitos (ej: 01, 123)
- patente: 4 dígitos (ej: 3420)
- pedimento: 7 dígitos (ej: 1234567)
Ejemplo: 24-01-3420-1234567
Nota: Cada archivo ZIP/RAR se procesa independientemente en su propio subdirectorio.
Respuesta exitosa:
{
"message": "Pedimentos creados exitosamente",
"created_count": 5,
"created_pedimentos": [...],
"documents_created": 15,
"processed_files": 3,
"summary": "Procesados 3 archivo(s): 5 pedimento(s) creado(s), 15 documento(s) asociado(s)",
"failed_files": [],
"errors": [],
"already_existing": [] # Nuevo campo para pedimentos que ya existían
}
"""
print(request.data)
# Validar datos requeridos
contribuyente = request.data.get('contribuyente')
archivos = request.FILES.getlist('archivos')
if not contribuyente:
return Response(
{"error": "Se requiere el campo 'contribuyente'"},
status=status.HTTP_400_BAD_REQUEST
)
if not archivos:
return Response(
{"error": "Se requiere al menos un archivo"},
status=status.HTTP_400_BAD_REQUEST
)
# Validar organización del usuario
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{"error": "Usuario no autenticado o sin organización"},
status=status.HTTP_400_BAD_REQUEST
)
organizacion = request.user.organizacion
# Regex para validar nomenclatura: anio-aduana-patente-pedimento
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
created_pedimentos = []
already_existing_pedimentos = [] # Para trackear pedimentos que ya existen
failed_files = []
errors = []
documents_created = 0
temp_dir = None
# Obtener DocumentType ANTES de la transacción atómica
print("Intentando obtener o crear DocumentType...")
try:
# Primero intentar obtener si ya existe
try:
document_type = DocumentType.objects.get(nombre="Pedimento")
print(f"DocumentType obtenido existente: {document_type.nombre} (ID: {document_type.id})")
except DocumentType.DoesNotExist:
# Si no existe, crear uno nuevo
document_type = DocumentType.objects.create(
nombre="Pedimento",
descripcion="Documento de pedimento"
)
print(f"DocumentType creado nuevo: {document_type.nombre} (ID: {document_type.id})")
except Exception as e:
print(f"Error al obtener/crear DocumentType: {str(e)}")
# Como fallback, intentar obtener cualquier DocumentType existente
try:
document_type = DocumentType.objects.first()
if document_type:
print(f"Usando DocumentType existente como fallback: {document_type.nombre} (ID: {document_type.id})")
else:
print("No hay DocumentType disponible")
return Response(
{"error": "No se pudo configurar el tipo de documento y no hay tipos existentes"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except Exception as fallback_error:
print(f"Error en fallback: {str(fallback_error)}")
return Response(
{"error": f"Error crítico al configurar tipo de documento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
print("Iniciando transacción atómica...")
with transaction.atomic():
# Crear directorio temporal
temp_dir = tempfile.mkdtemp()
print(f"Directorio temporal creado: {temp_dir}")
# Procesar cada archivo enviado
for idx, archivo in enumerate(archivos):
archivo_name = archivo.name.lower()
print(f"Procesando archivo {idx + 1}/{len(archivos)}: {archivo_name}")
# Extraer nombre base sin extensión para validación
archivo_name_sin_extension = os.path.splitext(archivo.name)[0]
# Validar nomenclatura del nombre del archivo/folder
match = nomenclatura_pattern.match(archivo_name_sin_extension)
match_sin_anio = nomenclatura_pattern_sin_anio.match(archivo_name_sin_extension)
if not match and not match_sin_anio:
print(f"Nomenclatura inválida en nombre de archivo: {archivo_name_sin_extension}")
failed_files.append({
"archivo_original": archivo.name,
"error": f"Nomenclatura inválida: {archivo_name_sin_extension}. Esperado: anio-aduana-patente-pedimento"
})
continue
# Extraer información del pedimento desde el nombre del archivo
if match:
anio, aduana, patente, pedimento_num = match.groups()
print(f"Extraído del nombre del archivo - Año: {anio}, Aduana: {aduana}, Patente: {patente}, Pedimento: {pedimento_num}")
try:
# Convertir año de 2 dígitos a 4 dígitos
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
fecha_pago = datetime(anio_completo, 1, 1).date()
print(f"Fecha de pago calculada: {fecha_pago}")
except ValueError:
failed_files.append({
"archivo_original": archivo.name,
"error": f"Año inválido: {anio}"
})
continue
elif match_sin_anio:
aduana, patente, pedimento_num = match_sin_anio.groups()
print(f"Extraído del nombre del archivo - Aduana: {aduana}, Patente: {patente}, Pedimento: {pedimento_num}")
# Obtener el primer dígito del pedimento
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
# Usar año actual para fecha_pago y ajustar según el dígito del pedimento
año_actual = datetime.now().year
# Crear año con el dígito del pedimento (reemplazando el último dígito)
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
# Aplicar lógica de comparación
if año_con_digito <= año_actual:
año_final = año_con_digito
else:
año_final = año_con_digito - 10
# Tomar los últimos 2 dígitos del año final
anio = año_final % 100
# Crear fecha de pago (primer día del año)
fecha_pago = datetime(año_final, 1, 1).date()
print(f"Fecha de pago (año actual) calculada: {fecha_pago}")
# Generar pedimento_app
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
print(f"Pedimento_app generado: {pedimento_app}")
# VERIFICAR SI EL PEDIMENTO YA EXISTE ANTES DE PROCESAR EL ARCHIVO
print(f"Buscando pedimento existente con pedimento_app: {pedimento_app} y organización ID: {organizacion.id}")
existing_pedimento = Pedimento.objects.filter(
pedimento_app=pedimento_app,
organizacion=organizacion
).first()
if existing_pedimento:
print(f"⚠️ Pedimento ya existe: ID {existing_pedimento.id}, pedimento_app: {pedimento_app}")
already_existing_pedimentos.append({
"id": str(existing_pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": existing_pedimento.contribuyente.rfc if existing_pedimento.contribuyente else None,
"archivo_original": archivo.name
})
# NO procesamos este archivo, pasamos al siguiente
continue
# Si el pedimento no existe, continuar con el procesamiento normal
print("📝 Pedimento no existe, continuando con procesamiento...")
# Crear subdirectorio para cada archivo usando el nombre del archivo sin extensión
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
os.makedirs(sub_dir, exist_ok=True)
print(f"Subdirectorio creado: {sub_dir}")
if archivo_name.endswith('.zip'):
# Manejar archivo ZIP
print("Es un archivo ZIP")
try:
with zipfile.ZipFile(archivo, 'r') as zip_ref:
zip_ref.extractall(sub_dir)
print("Archivo ZIP extraído exitosamente")
except zipfile.BadZipFile as e:
failed_files.append({
"archivo_original": archivo.name,
"error": f"Archivo ZIP corrupto o inválido: {str(e)}"
})
continue
except Exception as e:
failed_files.append({
"archivo_original": archivo.name,
"error": f"Error al extraer ZIP: {str(e)}"
})
continue
elif archivo_name.endswith('.rar'):
# Manejar archivo RAR: guardar el archivo en disco y usar helper con fallbacks
archivo_temp_path = os.path.join(sub_dir, archivo.name)
with open(archivo_temp_path, 'wb') as f:
for chunk in archivo.chunks():
f.write(chunk)
try:
extract_rar_to_dir(archivo_temp_path, sub_dir)
print(f"Archivo RAR {archivo.name} extraído en {sub_dir}")
except Exception as e:
error_msg = str(e)
help_msg = "Instale 'unrar' o 'p7zip' (7z) y asegúrese de que estén en PATH, o instale y configure 'rarfile' con un backend."
failed_files.append({
"archivo_original": archivo.name,
"error": f"Error al extraer archivo RAR: {error_msg}"
})
continue
else:
# Asumir que es un archivo individual
archivo_path = os.path.join(sub_dir, archivo.name)
with open(archivo_path, 'wb') as f:
for chunk in archivo.chunks():
f.write(chunk)
print(f"Archivo individual {archivo.name} guardado en sub_dir:", archivo_path)
# Ahora crear el pedimento (ya verificamos que no existe)
try:
print("🔄 Iniciando creación de pedimento...")
# Obtener o crear el importador
print(f"🏢 Buscando/creando importador con RFC: {contribuyente}")
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
if created:
print(f"✅ Importador creado: {importador.rfc} - {importador.nombre}")
else:
print(f"♻️ Importador existente: {importador.rfc} - {importador.nombre}")
pedimento = Pedimento.objects.create(
organizacion=organizacion,
contribuyente=importador,
# pedimento=int(pedimento_num),
pedimento=pedimento_num,
aduana=aduana,
# aduana=int(aduana),
# patente=int(patente),
patente=patente,
fecha_pago=fecha_pago,
pedimento_app=pedimento_app,
agente_aduanal=f"Agente {patente}", # Valor por defecto
clave_pedimento="A1" # Valor por defecto
)
print(f"✅ Pedimento creado exitosamente: ID {pedimento.id}, pedimento_app: {pedimento_app}")
created_pedimentos.append({
"id": str(pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": importador.rfc,
"contribuyente_nombre": importador.nombre,
"archivo_original": archivo.name
})
except Exception as e:
print(f"❌ Error al crear pedimento: {str(e)}")
failed_files.append({
"archivo_original": archivo.name,
"error": f"Error al crear pedimento: {str(e)}"
})
continue
# Procesar documentos dentro del directorio
print("Procesando documentos del directorio...")
for root, dirs, files in os.walk(sub_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
print(f"Procesando documento: {file_name}")
try:
# Leer el archivo desde el directorio temporal
with open(file_path, 'rb') as f:
file_content = f.read()
from api.utils.helpers import extraer_info_pedimento_xml
# Extraer info del pedimento desde XML si es aplicable
if file_name.lower().endswith('.xml'):
try:
xml_info = extraer_info_pedimento_xml(file_content)
if xml_info:
if 'numero_operacion' in xml_info:
if 'numero_pedimento' in xml_info:
if xml_info['numero_pedimento'] == str(pedimento.pedimento):
Pedimento.objects.filter(id=pedimento.id).update(
aduana=xml_info.get('aduana_clave', pedimento.aduana)
)
print(f"Información extraída del XML: {xml_info}")
except Exception as e:
print(f"No se pudo extraer información del XML {file_name}: {str(e)}")
# Obtener información del archivo
extension = os.path.splitext(file_name)[1].lower().lstrip('.')
# Buscar si ya existe un documento con el mismo nombre para este pedimento
existing_documents = Document.objects.filter(
pedimento_id=pedimento.id,
organizacion=organizacion
)
existing_document = None
for doc in existing_documents:
if is_same_document(doc, file_name):
existing_document = doc
print(f"✅ Encontrado documento existente: ID {doc.id}")
break
# Crear ContentFile
django_file = ContentFile(file_content, name=file_name)
if existing_document:
# Opcional: Eliminar el archivo físico anterior
try:
if existing_document.archivo and os.path.exists(existing_document.archivo.path):
os.remove(existing_document.archivo.path)
except (ValueError, OSError) as e:
print(f"No se pudo eliminar archivo físico anterior: {str(e)}")
# Actualizar el documento existente
existing_document.archivo = django_file
existing_document.size = len(file_content)
existing_document.extension = extension
existing_document.updated_at = timezone.now() # Si tienes este campo
existing_document.save()
documents_created += 1
print(f"📄 Documento actualizado: {file_name}")
else:
# Crear nuevo documento
document = Document.objects.create(
organizacion=organizacion,
pedimento_id=pedimento.id,
document_type=document_type,
fuente_id=4,
archivo=django_file,
size=len(file_content),
extension=extension
)
documents_created += 1
print(f"📄 Nuevo documento creado: {file_name}")
except Exception as e:
print(f"❌ Error al procesar documento {file_name}: {str(e)}")
# Continuar con otros documentos
print(f"🏁 Procesamiento completado. Archivos procesados en este directorio.")
except Exception as e:
return Response(
{"error": f"Error durante el procesamiento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
finally:
# Limpiar directorio temporal
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# Preparar respuesta
response_data = {
"created_count": len(created_pedimentos),
"created_pedimentos": created_pedimentos,
"already_existing_count": len(already_existing_pedimentos),
"already_existing": already_existing_pedimentos,
"documents_created": documents_created,
"failed_files": failed_files,
"processed_files": len(archivos),
"summary": f"Procesados {len(archivos)} archivo(s): {len(created_pedimentos)} pedimento(s) creado(s), {len(already_existing_pedimentos)} ya existían, {documents_created} documento(s) asociado(s)"
}
try:
# Determinar el mensaje apropiado
if already_existing_pedimentos and not created_pedimentos and not failed_files:
response_data["message"] = "Todos los pedimentos ya existen. No se crearon nuevos pedimentos."
response_status = status.HTTP_200_OK
elif already_existing_pedimentos or failed_files:
response_data.update({
"message": "Procesamiento completado con advertencias",
})
if failed_files:
response_data["errors"] = [item["error"] for item in failed_files]
response_status = status.HTTP_207_MULTI_STATUS
else:
response_data["message"] = "Pedimentos creados exitosamente"
response_status = status.HTTP_201_CREATED
except Exception as e:
return Response(
{"error": f"Error durante el procesamiento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
return Response(response_data, status=response_status)
@action(detail=False, methods=['post'], url_path='bulk-create-pedimento_desk', parser_classes=[MultiPartParser, FormParser])
def bulk_create_pedimento_desk(self, request):
"""
Endpoint para crear múltiples pedimentos desde EFC APP Desk.
FormData esperado:
- contribuyente: string (nombre del contribuyente)
- archivos: files (pueden ser múltiples archivos: zip, rar o individuales)
Nomenclatura esperada de archivos: anio-aduana-patente-pedimento
- anio: 2 dígitos (ej: 24)
- aduana: 2 o 3 dígitos (ej: 01, 123)
- patente: 4 dígitos (ej: 3420)
- pedimento: 7 dígitos (ej: 1234567)
Ejemplo: 24-01-3420-1234567
Nota: Cada archivo ZIP/RAR se procesa independientemente en su propio subdirectorio.
Respuesta exitosa:
{
"message": "Pedimentos creados exitosamente",
"created_count": 5,
"created_pedimentos": [...],
"documents_created": 15,
"processed_files": 3,
"summary": "Procesados 3 archivo(s): 5 pedimento(s) creado(s), 15 documento(s) asociado(s)",
"failed_files": [],
"errors": []
}
"""
print(request.data)
# Validar datos requeridos
contribuyente = request.data.get('contribuyente')
fecha_pago_input = request.data.get('fecha_pago')
clave_pedimento_input = request.data.get('clave_pedimento')
patente_input = request.data.get('patente')
tipo_operacion_input = request.data.get('tipo_operacion')
aduana_input = request.data.get('aduana')
contribuyente_input = request.data.get('contribuyente')
curp_apoderado_input = request.data.get('curp_apoderado')
partidas_input = request.data.get('partidas')
fuente_archivos = request.data.get('partidas')
archivos = request.FILES.getlist('archivos')
# if not contribuyente:
# return Response(
# {"error": "Se requiere el campo 'contribuyente'"},
# status=status.HTTP_400_BAD_REQUEST
# )
if not archivos:
return Response(
{
"tieneError": True,
"error": "Se requiere al menos un archivo"
},
status=status.HTTP_400_BAD_REQUEST
)
# Validar organización del usuario
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{
"tieneError": True,
"error": "Usuario no autenticado o sin organización"
},
status=status.HTTP_400_BAD_REQUEST
)
organizacion = request.user.organizacion
# Regex para validar nomenclatura: anio-aduana-patente-pedimento
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
created_pedimentos = []
failed_files = []
errors = []
documents_created = 0
temp_dir = None
# Obtener DocumentType ANTES de la transacción atómica
# print("Intentando obtener o crear DocumentType...")
try:
# Primero intentar obtener si ya existe
try:
document_type = DocumentType.objects.get(nombre="Pedimento")
# print(f"DocumentType obtenido existente: {document_type.nombre} (ID: {document_type.id})")
except DocumentType.DoesNotExist:
# Si no existe, crear uno nuevo
document_type = DocumentType.objects.create(
nombre="Pedimento",
descripcion="Documento de pedimento"
)
# print(f"DocumentType creado nuevo: {document_type.nombre} (ID: {document_type.id})")
except Exception as e:
# print(f"Error al obtener/crear DocumentType: {str(e)}")
# Como fallback, intentar obtener cualquier DocumentType existente
try:
# document_type = DocumentType.objects.first()
# if document_type:
# print(f"Usando DocumentType existente como fallback: {document_type.nombre} (ID: {document_type.id})")
# else:
# print("No hay DocumentType disponible")
# return Response(
# {"error": "No se pudo configurar el tipo de documento y no hay tipos existentes"},
# status=status.HTTP_500_INTERNAL_SERVER_ERROR
# )
document_type = DocumentType.objects.first()
if not document_type:
return Response(
{"error": "No se pudo configurar el tipo de documento y no hay tipos existentes"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except Exception as fallback_error:
# print(f"Error en fallback: {str(fallback_error)}")
return Response(
{
"tieneError": True,
"error": f"Error crítico al configurar tipo de documento: {str(e)}"
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
# print("Iniciando transacción atómica...")
with transaction.atomic():
# Crear directorio temporal
temp_dir = tempfile.mkdtemp()
# print(f"Directorio temporal creado: {temp_dir}")
# Procesar cada archivo enviado
for idx, archivo in enumerate(archivos):
archivo_name = archivo.name.lower()
# print(f"Procesando archivo {idx + 1}/{len(archivos)}: {archivo_name}")
# Crear subdirectorio para cada archivo usando el nombre del archivo sin extensión
archivo_name_sin_extension = os.path.splitext(archivo.name)[0]
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
os.makedirs(sub_dir, exist_ok=True)
# print(f"Subdirectorio creado: {sub_dir}")
if archivo_name.endswith('.zip'):
# Manejar archivo ZIP
# print("Es un archivo ZIP")
try:
with zipfile.ZipFile(archivo, 'r') as zip_ref:
zip_ref.extractall(sub_dir)
# print("Archivo ZIP extraído exitosamente")
except zipfile.BadZipFile as e:
return Response(
{
"tieneError": True,
"error": f"Archivo ZIP corrupto o inválido: {archivo.name} - {str(e)}"
},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{
"tieneError": True,
"error": f"Error al extraer ZIP {archivo.name}: {str(e)}"
},
status=status.HTTP_400_BAD_REQUEST
)
elif archivo_name.endswith('.rar'):
# Manejar archivo RAR: guardar el archivo en disco y usar helper con fallbacks
# Guardar el archivo subido en un path temporal dentro del sub_dir
archivo_temp_path = os.path.join(sub_dir, archivo.name)
with open(archivo_temp_path, 'wb') as f:
for chunk in archivo.chunks():
f.write(chunk)
try:
extract_rar_to_dir(archivo_temp_path, sub_dir)
# print(f"Archivo RAR {archivo.name} extraído en {sub_dir}")
except Exception as e:
error_msg = str(e)
help_msg = "Instale 'unrar' o 'p7zip' (7z) y asegúrese de que estén en PATH, o instale y configure 'rarfile' con un backend."
return Response(
{
"tieneError": True,
"error": f"Error al extraer archivo RAR {archivo.name}: {error_msg}. {help_msg}"
},
status=status.HTTP_400_BAD_REQUEST
)
else:
# Asumir que es un archivo individual
# Crear el archivo en el subdirectorio
archivo_path = os.path.join(sub_dir, archivo.name)
with open(archivo_path, 'wb') as f:
for chunk in archivo.chunks():
f.write(chunk)
# print(f"Archivo individual {archivo.name} guardado en sub_dir:", archivo_path)
# Recorrer todos los archivos extraídos o el directorio
print("Iniciando recorrido de archivos...")
for root, dirs, files in os.walk(temp_dir):
# print(f"Revisando directorio: {root}")
# print(f"Archivos encontrados: {files}")
for file_name in files:
# print(f"Procesando archivo: {file_name}")
file_path = os.path.join(root, file_name)
# Obtener la ruta relativa para determinar la estructura de carpetas
relative_path = os.path.relpath(file_path, temp_dir)
# print(f"Ruta relativa: {relative_path}")
# Determinar si el archivo está en una carpeta que sigue la nomenclatura
folder_name = None
if os.path.dirname(relative_path):
# El archivo está dentro de una carpeta
folder_parts = relative_path.split(os.sep)
folder_name = folder_parts[0] # Primera carpeta (nombre del archivo ZIP/RAR sin extensión)
else:
# El archivo está en la raíz, usar el nombre del archivo sin extensión
folder_name = os.path.splitext(file_name)[0]
# print(f"Folder name para validación: {folder_name}")
# Validar nomenclatura
match = nomenclatura_pattern.match(folder_name)
match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name)
if not match and not match_sin_anio:
# print(f"Nomenclatura inválida: {folder_name}")
# Determinar el archivo original basado en el subdirectorio
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_files.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Nomenclatura inválida: {folder_name}. Esperado: anio-aduana-patente-pedimento"
})
continue
if match:
# print(f"Nomenclatura válida: {folder_name}")
anio, aduana, patente, pedimento_num = match.groups()
# print(f"Extraído - Año: {anio}, Aduana: {aduana}, Patente: {patente}, Pedimento: {pedimento_num}")
# Formato original: anio-aduana-patente-pedimento
# Crear fecha_pago basada en el año
try:
# Convertir año de 2 dígitos a 4 dígitos
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
fecha_pago = datetime(anio_completo, 1, 1).date()
# print(f"Fecha de pago calculada: {fecha_pago}")
except ValueError:
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_files.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Año inválido: {anio}"
})
continue
elif match_sin_anio:
# print(f"Nomenclatura válida sin año: {folder_name}")
# Formato sin año: aduana-patente-pedimento
aduana, patente, pedimento_num = match_sin_anio.groups()
# print(f"Extraído - Aduana: {aduana}, Patente: {patente}, Pedimento: {pedimento_num}")
# Obtener el primer dígito del pedimento
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
# Usar año actual para fecha_pago y ajustar según el dígito del pedimento
año_actual = datetime.now().year
# Crear año con el dígito del pedimento (reemplazando el último dígito)
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
# Aplicar lógica de comparación
if año_con_digito <= año_actual:
# Si el año con dígito es menor o igual al año actual
año_final = año_con_digito
else:
# Si el año con dígito es mayor al año actual, restar 10
año_final = año_con_digito - 10
# Tomar los últimos 2 dígitos del año final
anio = año_final % 100
# Crear fecha de pago (primer día del año)
fecha_pago = datetime(año_final , 1, 1).date()
# print(f"Fecha de pago (año actual) calculada: {fecha_pago}")
# Generar pedimento_app
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
# print(f"Pedimento_app generado: {pedimento_app}")
# print(f"Buscando pedimento existente con pedimento_app: {pedimento_app} y organización ID: {organizacion.id}")
# Verificar si el pedimento ya existe
existing_pedimento = Pedimento.objects.filter(
# pedimento=int(pedimento_num),
pedimento_app=pedimento_app,
organizacion=organizacion
).first()
# print(f"Pedimento existente: {existing_pedimento is not None}")
if not existing_pedimento:
# print("📝 Pedimento no existe, creando nuevo...")
# Crear nuevo pedimento
try:
# print("🔄 Iniciando creación de pedimento...")
importador = None
if contribuyente:
# Obtener o crear el importador
# print(f"🏢 Buscando/creando importador con RFC: {contribuyente}")
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
# if created:
# print(f"✅ Importador creado: {importador.rfc} - {importador.nombre}")
# else:
# print(f"♻️ Importador existente: {importador.rfc} - {importador.nombre}")
if tipo_operacion_input:
tipo_operacion_input = TipoOperacion.objects.get(id=tipo_operacion_input)
pedimento = Pedimento.objects.create(
organizacion=organizacion,
contribuyente=importador if importador else None,
pedimento=str(pedimento_num),
aduana=str(aduana),
patente=str(patente),
fecha_pago=fecha_pago_input if fecha_pago_input else fecha_pago,
curp_apoderado=curp_apoderado_input if curp_apoderado_input else "",
numero_partidas=partidas_input if partidas_input else 0,
tipo_operacion=tipo_operacion_input if tipo_operacion_input else None,
pedimento_app=pedimento_app,
agente_aduanal=f"Agente {patente}", # Valor por defecto
clave_pedimento=clave_pedimento_input if clave_pedimento_input else "A1" # Valor por defecto
)
# print(f"✅ Pedimento creado exitosamente: ID {pedimento.id}, pedimento_app: {pedimento_app}")
created_pedimentos.append({
"id": str(pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": getattr(importador, 'rfc', None),
"contribuyente_nombre": getattr(importador, 'nombre', None)
})
except Exception as e:
# print(f"❌ Error al crear pedimento: {str(e)}")
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_files.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Error al crear pedimento: {str(e)}"
})
continue
else:
# print(f"♻️ Usando pedimento existente: ID {existing_pedimento.id}")
# Usar pedimento existente
# # Actualizar Importador
if contribuyente:
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
importador_db = existing_pedimento.contribuyente
if importador_db:
if importador_db != importador:
existing_pedimento.contribuyente = importador
else:
existing_pedimento.contribuyente = importador
existing_pedimento.save()
# Actualizar Tipo Operacion
if tipo_operacion_input:
tipo_operacion_input = TipoOperacion.objects.get(id=tipo_operacion_input)
if tipo_operacion_input:
tipo_operacion_db = existing_pedimento.tipo_operacion
if not tipo_operacion_db:
existing_pedimento.tipo_operacion = tipo_operacion_input
existing_pedimento.save()
# Actualizar fecha de pago solo cuando aun no esta actualizado
if fecha_pago_input:
fecha_db = existing_pedimento.fecha_pago
# Verificar si hay fecha en BD
if fecha_db:
# Asegurar que trabajamos con date
if isinstance(fecha_db, datetime):
fecha_db = fecha_db.date()
# Si la fecha existe y es 1 de enero, actualizar
if fecha_db.month == 1 and fecha_db.day == 1:
# Actualizar Fecha
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
else:
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
if clave_pedimento_input:
clavePedimento = existing_pedimento.clave_pedimento
if not clavePedimento:
existing_pedimento.clave_pedimento = clave_pedimento_input
existing_pedimento.save()
if curp_apoderado_input:
curp = existing_pedimento.curp_apoderado
if not curp:
existing_pedimento.curp_apoderado = curp_apoderado_input
existing_pedimento.save()
if partidas_input:
numPartidas = existing_pedimento.numero_partidas
if not numPartidas:
existing_pedimento.numero_partidas = partidas_input
existing_pedimento.save()
elif numPartidas <= 0:
existing_pedimento.numero_partidas = partidas_input
existing_pedimento.save()
pedimento = existing_pedimento
# print(f"🔄 Iniciando creación de documento para pedimento ID: {pedimento.id}")
# Crear documento asociado al pedimento
try:
# print("📖 Leyendo archivo desde directorio temporal...")
# Leer el archivo desde el directorio temporal
with open(file_path, 'rb') as f:
file_content = f.read()
# Verificar si el archivo tiene la nomenclatura especial M8988852.300
file_name_lower = file_name.lower()
tiene_nomenclatura_especial = False
info_extraida = {}
# Patrón: 7 dígitos, punto, 3 dígitos (ej: M8988852.300)
patron_nomenclatura = re.compile(r'^[m|M]\d{7}\.\d{3}$', re.IGNORECASE)
# Separar nombre base y extensión
nombre_base, extension = os.path.splitext(file_name)
if patron_nomenclatura.match(file_name_lower):
tiene_nomenclatura_especial = True
# Procesar el archivo con el método auxiliar
info_extraida = procesar_archivo_m_con_nomenclatura(file_content, pedimento )
if info_extraida.get('tiene_nomenclatura_especial', False):
# Agregar información de procesamiento a los datos de respuesta
if 'procesamiento_archivos' not in locals():
procesamiento_archivos = []
procesamiento_archivos.append({
'archivo': file_name,
'nomenclatura_especial': True,
'registros_encontrados': info_extraida.get('registros_encontrados', []),
'actualizaciones': info_extraida.get('actualizaciones_aplicadas', [])
})
# print(f"📄 Archivo leído: {len(file_content)} bytes")
# Crear ContentFile que Django puede manejar correctamente
django_file = ContentFile(file_content, name=file_name)
fuente, created = Fuente.objects.get_or_create(
nombre="APP-EFC",
descripcion='Transmitido por la app de escritorio'
)
# print(f"Creando documento para archivo: {file_name}")
# Crear documento - Django automáticamente guardará el archivo en media/documents/
document = Document.objects.create(
organizacion=organizacion,
pedimento_id=pedimento.id,
document_type=document_type,
fuente_id=fuente.id,
archivo=django_file,
size=len(file_content),
extension=os.path.splitext(file_name)[1].lower().lstrip('.')
)
# print(f"Documento creado exitosamente: {document.id}")
documents_created += 1
# print(f"📊 Total documentos creados hasta ahora: {documents_created}")
except Exception as e:
# print(f"❌ Error al crear documento: {str(e)}")
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_files.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Error al crear documento: {str(e)}"
})
continue
if documents_created > 0 and existing_pedimento:
existing_pedimento.existe_expediente = True
existing_pedimento.save()
# print(f"🏁 Procesamiento completado. Archivos procesados en este directorio.")
except Exception as e:
return Response(
{ "tieneError": True,
"error": f"Error durante el procesamiento: {str(e)}"
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
finally:
# Limpiar directorio temporal
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# Preparar respuesta
response_data = {
"tieneError": False,
"failed_files": failed_files,
"processed_files": len(archivos),
}
if failed_files:
response_data["tieneError"] = True
response_data.update({
"message": "Procesamiento completado con algunos errores",
"errors": [item["error"] for item in failed_files]
})
response_status = status.HTTP_207_MULTI_STATUS
else:
response_data["message"] = "Pedimentos creados exitosamente"
response_status = status.HTTP_201_CREATED
return Response(response_data, status=response_status)
@action(detail=False, methods=['post'], url_path='bulk-upload-record-zip', parser_classes=[MultiPartParser, FormParser])
def bulk_upload_record(self, request):
"""
Endpoint para subir un archivo zip con documentos de pedimentos.
Se espera un archivo zip con nomenclatura esperada de archivos: anio-aduana-patente-pedimento o aduana-patente-pedimento: ejemplo: 24-07-3420-1234567.zip o 07-3420-1234567.zip
- anio: 2 dígitos (ej: 24)
- aduana: 2 o 3 dígitos (ej: 01, 123)
- patente: 4 dígitos (ej: 3420)
- pedimento: 7 dígitos (ej: 1234567)
El endpoint procesará cada registro, verificando si el pedimento existe y actualizando su estado o creando un nuevo registro según sea necesario.
Respuesta:
{
"message": "Archivo procesado",
"processed_records": 100,
"created_pedimentos": [...],
"updated_pedimentos": [...],
"failed_records": [...]
}
"""
created_pedimentos = []
updated_pedimentos = []
failed_records = []
errors = []
documents_created = 0
temp_dir = None
# Implementación del procesamiento del archivo CSV y actualización/creación de pedimentos
# Este es un ejemplo básico y se puede expandir según los requisitos específicos
archivos = request.FILES.get('archivos')
if archivos and hasattr(archivos, 'name'): # Es un archivo individual
archivos = [archivos] # Convertir a lista para procesar de manera uniforme
else:
archivos = request.FILES.getlist('archivos')
# Validar datos requeridos
contribuyente = request.data.get('contribuyente')
fecha_pago_input = request.data.get('fecha_pago')
clave_pedimento_input = request.data.get('clave_pedimento')
patente_input = request.data.get('patente')
tipo_operacion_input = request.data.get('tipo_operacion')
aduana_input = request.data.get('aduana')
contribuyente_input = request.data.get('contribuyente')
curp_apoderado_input = request.data.get('curp_apoderado')
partidas_input = request.data.get('partidas')
fuente_archivos = request.data.get('partidas')
# Validar organización del usuario
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{
"tieneError": True,
"error": "Usuario no autenticado o sin organización"
},
status=status.HTTP_400_BAD_REQUEST
)
organizacion = request.user.organizacion
# Regex para validar nomenclatura: anio-aduana-patente-pedimento
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
if not archivos:
return Response(
{ "tieneError": True,
"error": "Se requiere un archivo para procesar"
},
status=status.HTTP_400_BAD_REQUEST
)
# Obtener DocumentType ANTES de la transacción atómica
try:
# Primero intentar obtener si ya existe
try:
document_type = DocumentType.objects.get(nombre="Pedimento")
except DocumentType.DoesNotExist:
# Si no existe, crear uno nuevo
document_type = DocumentType.objects.create(
nombre="Pedimento",
descripcion="Documento de pedimento"
)
except Exception as e:
# Como fallback, intentar obtener cualquier DocumentType existente
try:
document_type = DocumentType.objects.first()
if not document_type:
return Response(
{"error": "No se pudo configurar el tipo de documento y no hay tipos existentes"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
except Exception as fallback_error:
return Response(
{
"tieneError": True,
"error": f"Error crítico al configurar tipo de documento: {str(e)}"
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
with transaction.atomic():
# Crear directorio temporal
temp_dir = tempfile.mkdtemp()
# Procesar cada archivo enviado
for idx, archivo in enumerate(archivos):
archivo_name = archivo.name.lower()
# Crear subdirectorio para cada archivo usando el nombre del archivo sin extensión
archivo_name_sin_extension = os.path.splitext(archivo.name)[0]
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
os.makedirs(sub_dir, exist_ok=True)
if archivo_name.endswith('.zip'):
# Manejar archivo ZIP
try:
with zipfile.ZipFile(archivo, 'r') as zip_ref:
zip_ref.extractall(sub_dir)
# print("Archivo ZIP extraído exitosamente")
except zipfile.BadZipFile as e:
return Response(
{
"tieneError": True,
"error": f"Archivo ZIP corrupto o inválido: {archivo.name} - {str(e)}"
},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
return Response(
{
"tieneError": True,
"error": f"Error al extraer ZIP {archivo.name}: {str(e)}"
},
status=status.HTTP_400_BAD_REQUEST
)
else:
return Response(
{ "tieneError": True,
"error": "Solo se admiten archivos ZIP"
},status=status.HTTP_400_BAD_REQUEST
)
# Recorrer todos los archivos extraídos o el directorio
for root, dirs, files in os.walk(temp_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
# Obtener la ruta relativa para determinar la estructura de carpetas
relative_path = os.path.relpath(file_path, temp_dir)
# Determinar si el archivo está en una carpeta que sigue la nomenclatura
folder_name = None
if os.path.dirname(relative_path):
# El archivo está dentro de una carpeta
folder_parts = relative_path.split(os.sep)
folder_name = folder_parts[0] # Primera carpeta (nombre del archivo ZIP/RAR sin extensión)
else:
# El archivo está en la raíz, usar el nombre del archivo sin extensión
folder_name = os.path.splitext(file_name)[0]
# Validar nomenclatura
match = nomenclatura_pattern.match(folder_name)
match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name)
if not match and not match_sin_anio:
# Determinar el archivo original basado en el subdirectorio
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_records.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Nomenclatura inválida: {folder_name}. Esperado: anio-aduana-patente-pedimento"
})
continue
if match:
anio, aduana, patente, pedimento_num = match.groups()
# Formato original: anio-aduana-patente-pedimento
# Crear fecha_pago basada en el año
try:
# Convertir año de 2 dígitos a 4 dígitos
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
fecha_pago = datetime(anio_completo, 1, 1).date()
except ValueError:
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_records.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Año inválido: {anio}"
})
continue
elif match_sin_anio:
# Formato sin año: aduana-patente-pedimento
aduana, patente, pedimento_num = match_sin_anio.groups()
# Obtener el primer dígito del pedimento
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
# Usar año actual para fecha_pago y ajustar según el dígito del pedimento
año_actual = datetime.now().year
# Crear año con el dígito del pedimento (reemplazando el último dígito)
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
# Aplicar lógica de comparación
if año_con_digito <= año_actual:
# Si el año con dígito es menor o igual al año actual
año_final = año_con_digito
else:
# Si el año con dígito es mayor al año actual, restar 10
año_final = año_con_digito - 10
# Tomar los últimos 2 dígitos del año final
anio = año_final % 100
# Crear fecha de pago (primer día del año)
fecha_pago = datetime(año_final , 1, 1).date()
# Generar pedimento_app
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
# Verificar si el pedimento ya existe
existing_pedimento = Pedimento.objects.filter(
pedimento_app=pedimento_app,
organizacion=organizacion
).first()
if not existing_pedimento:
# Crear nuevo pedimento
try:
importador = None
if contribuyente:
# Obtener o crear el importador
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
if tipo_operacion_input:
tipo_operacion_input = TipoOperacion.objects.get(id=tipo_operacion_input)
pedimento = Pedimento.objects.create(
organizacion=organizacion,
contribuyente=importador if importador else None,
pedimento=str(pedimento_num),
aduana=str(aduana),
patente=str(patente),
fecha_pago=fecha_pago_input if fecha_pago_input else fecha_pago,
curp_apoderado=curp_apoderado_input if curp_apoderado_input else "",
numero_partidas=partidas_input if partidas_input else 0,
tipo_operacion=tipo_operacion_input if tipo_operacion_input else None,
pedimento_app=pedimento_app,
agente_aduanal=f"Agente {patente}", # Valor por defecto
clave_pedimento=clave_pedimento_input if clave_pedimento_input else "A1" # Valor por defecto
)
created_pedimentos.append({
"id": str(pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": getattr(importador, 'rfc', None),
"contribuyente_nombre": getattr(importador, 'nombre', None)
})
except Exception as e:
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_records.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Error al crear pedimento: {str(e)}"
})
continue
else: # Usar pedimento existente
# Actualizar Importador
if contribuyente:
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
importador_db = existing_pedimento.contribuyente
if importador_db:
if importador_db != importador:
existing_pedimento.contribuyente = importador
else:
existing_pedimento.contribuyente = importador
existing_pedimento.save()
# Actualizar Tipo Operacion
if tipo_operacion_input:
tipo_operacion_input = TipoOperacion.objects.get(id=tipo_operacion_input)
if tipo_operacion_input:
tipo_operacion_db = existing_pedimento.tipo_operacion
if not tipo_operacion_db:
existing_pedimento.tipo_operacion = tipo_operacion_input
existing_pedimento.save()
# Actualizar fecha de pago solo cuando aun no esta actualizado
if fecha_pago_input:
fecha_db = existing_pedimento.fecha_pago
# Verificar si hay fecha en BD
if fecha_db:
# Asegurar que trabajamos con date
if isinstance(fecha_db, datetime):
fecha_db = fecha_db.date()
# Si la fecha existe y es 1 de enero, actualizar
if fecha_db.month == 1 and fecha_db.day == 1:
# Actualizar Fecha
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
else:
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
if clave_pedimento_input:
clavePedimento = existing_pedimento.clave_pedimento
if not clavePedimento or clavePedimento.strip() != clave_pedimento_input.strip():
existing_pedimento.clave_pedimento = clave_pedimento_input
existing_pedimento.save()
if curp_apoderado_input:
curp = existing_pedimento.curp_apoderado
if not curp:
existing_pedimento.curp_apoderado = curp_apoderado_input
existing_pedimento.save()
if partidas_input:
numPartidas = existing_pedimento.numero_partidas
if not numPartidas:
existing_pedimento.numero_partidas = partidas_input
existing_pedimento.save()
elif numPartidas <= 0:
existing_pedimento.numero_partidas = partidas_input
existing_pedimento.save()
pedimento = existing_pedimento
# Crear documento asociado al pedimento
try:
# Leer el archivo desde el directorio temporal
with open(file_path, 'rb') as f:
file_content = f.read()
# Verificar si el archivo tiene la nomenclatura especial M8988852.300
file_name_lower = file_name.lower()
tiene_nomenclatura_especial = False
info_extraida = {}
# Patrón: 7 dígitos, punto, 3 dígitos (ej: M8988852.300)
patron_nomenclatura = re.compile(r'^[m|M]\d{7}\.\d{3}$', re.IGNORECASE)
# Separar nombre base y extensión
nombre_base, extension = os.path.splitext(file_name)
if patron_nomenclatura.match(file_name_lower):
tiene_nomenclatura_especial = True
# Procesar el archivo con el método auxiliar
info_extraida = procesar_archivo_m_con_nomenclatura(file_content, pedimento)
if info_extraida.get('tiene_nomenclatura_especial', False):
# Agregar información de procesamiento a los datos de respuesta
if 'procesamiento_archivos' not in locals():
procesamiento_archivos = []
procesamiento_archivos.append({
'archivo': file_name,
'nomenclatura_especial': True,
'registros_encontrados': info_extraida.get('registros_encontrados', []),
'actualizaciones': info_extraida.get('actualizaciones_aplicadas', [])
})
# Crear ContentFile que Django puede manejar correctamente
django_file = ContentFile(file_content, name=file_name)
fuente, created = Fuente.objects.get_or_create(
nombre="APP-EFC",
descripcion='Transmitido por la app de escritorio'
)
# Buscar si ya existe un documento con el mismo nombre para este pedimento
existing_documents = Document.objects.filter(
pedimento_id=pedimento.id,
organizacion=organizacion
)
existing_document = None
for doc in existing_documents:
if is_same_document(doc, file_name):
existing_document = doc
break
if existing_document:
# Opcional: Eliminar el archivo físico anterior
try:
if existing_document.archivo and os.path.exists(existing_document.archivo.path):
os.remove(existing_document.archivo.path)
except (ValueError, OSError) as e:
pass
# Actualizar el documento existente con el nuevo archivo y datos
existing_document.archivo = django_file
existing_document.size = len(file_content)
existing_document.extension = extension
existing_document.updated_at = timezone.now() # Si tienes este campo
existing_document.save()
documents_created += 1
else:
# Crear documento - Django automáticamente guardará el archivo en media/documents/
document = Document.objects.create(
organizacion=organizacion,
pedimento_id=pedimento.id,
document_type=document_type,
fuente_id=fuente.id,
archivo=django_file,
size=len(file_content),
extension=os.path.splitext(file_name)[1].lower().lstrip('.')
)
documents_created += 1
except Exception as e:
archivo_original = folder_name + ('.zip' if any(f.endswith('.zip') for f in [a.name for a in archivos]) else '.rar')
failed_records.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Error al crear documento: {str(e)}"
})
continue
if documents_created > 0 and existing_pedimento:
existing_pedimento.existe_expediente = True
existing_pedimento.save()
except Exception as e:
return Response(
{ "tieneError": True,
"error": f"Error durante el procesamiento: {str(e)}"
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
finally:
# Limpiar directorio temporal
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# Preparar respuesta
response_data = {
"tieneError": False,
"failed_files": failed_records,
"processed_files": len(archivos),
}
if failed_records:
response_data["tieneError"] = True
response_data.update({
"message": "Procesamiento completado con algunos errores",
"errors": [item["error"] for item in failed_records]
})
response_status = status.HTTP_207_MULTI_STATUS
else:
response_data["message"] = "Pedimentos creados exitosamente"
response_status = status.HTTP_201_CREATED
return Response(response_data, status=response_status)
@action(detail=False, methods=['post'], url_path='bulk-upload-record-zip-async', parser_classes=[MultiPartParser, FormParser])
def bulk_upload_record_async(self, request):
"""
Endpoint asíncrono para subir archivos ZIP de pedimentos en segundo plano.
Retorna task_id para polling de estado.
Respuesta:
{
"task_id": "uuid-de-la-tarea",
"status": "PENDING",
"message": "Procesamiento iniciado en segundo plano"
}
"""
import uuid
import os
from django.conf import settings
from api.customs.tasks.bulk_upload import bulk_upload_record_task
# Obtener archivos
archivos = request.FILES.get('archivos')
if archivos and hasattr(archivos, 'name'):
archivos = [archivos]
else:
archivos = request.FILES.getlist('archivos')
# Validar datos requeridos
# contribuyente = request.data.get('contribuyente')
# if not contribuyente:
# return Response(
# {"error": "Se requiere el campo 'contribuyente'"},
# status=status.HTTP_400_BAD_REQUEST
# )
if not archivos:
return Response(
{'tieneError': True,
"mensaje": "Se requiere al menos un archivo"},
status=status.HTTP_400_BAD_REQUEST
)
# Validar organización del usuario
if not request.user.is_authenticated or not hasattr(request.user, 'organizacion'):
return Response(
{'tieneError': True,
"mensaje": "Usuario no autenticado o sin organización"},
status=status.HTTP_400_BAD_REQUEST
)
organizacion = request.user.organizacion
# Preparar parámetros
parametros = {
'contribuyente': request.data.get('contribuyente'),
'fecha_pago_input': request.data.get('fecha_pago'),
'clave_pedimento_input': request.data.get('clave_pedimento'),
'patente_input': request.data.get('patente'),
'tipo_operacion_input': request.data.get('tipo_operacion'),
'aduana_input': request.data.get('aduana'),
'curp_apoderado_input': request.data.get('curp_apoderado'),
'partidas_input': request.data.get('partidas')
}
# Guardar archivos temporalmente en MEDIA_ROOT (compartido entre contenedores)
temp_dir = os.path.join(settings.MEDIA_ROOT, 'temp_bulk_upload', str(uuid.uuid4()))
os.makedirs(temp_dir, exist_ok=True)
archivo_paths = []
try:
for archivo in archivos:
if not archivo.name.lower().endswith('.zip'):
return Response(
{'tieneError': True,
"mensaje": f"Solo se admiten archivos ZIP: {archivo.name}"},
status=status.HTTP_400_BAD_REQUEST
)
file_path = os.path.join(temp_dir, archivo.name)
with open(file_path, 'wb') as f:
for chunk in archivo.chunks():
f.write(chunk)
archivo_paths.append(file_path)
# Llamar tarea Celery
task = bulk_upload_record_task.apply_async(
args=[str(organizacion.id), parametros, archivo_paths]
)
return Response({
'tieneError': False,
'task_id': task.id,
'status': 'PENDING',
'mensaje': 'Procesamiento iniciado en segundo plano'
}, status=status.HTTP_202_ACCEPTED)
# bulk_upload_record_task_1(str(organizacion.id), parametros, archivo_paths)
# return Response({
# 'task_id': str(uuid.uuid4()), # Generar un UUID ficticio para la respuesta, ya que no estamos usando Celery en este ejemplo
# 'status': 'PENDING',
# 'message': 'Procesamiento iniciado en segundo plano'
# }, status=status.HTTP_202_ACCEPTED)
except Exception as e:
# Limpiar archivos temporales SOLO si hay error antes de lanzar la tarea
if temp_dir and os.path.exists(temp_dir):
import shutil
shutil.rmtree(temp_dir)
return Response(
{'tieneError': True,
"mensaje": f"Error al iniciar procesamiento: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
my_tags = ['Pedimentos']
class PartidaViewSet(viewsets.ModelViewSet):
"""
ViewSet for Partida model.
Permite filtrar por:
- pedimento: UUID del pedimento (query parameter principal)
- pedimento__id: UUID del pedimento (alternativo)
Ejemplo: GET /api/partidas/?pedimento=6782d22e-5e97-4efc-87c9-bd8497c8ac7e
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
queryset = Partida.objects.all()
serializer_class = PartidaSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = {
'pedimento': ['exact'], # Filtro directo por UUID del pedimento
'pedimento__id': ['exact'], # Filtro alternativo
'numero_partida': ['exact', 'gte', 'lte'], # Filtros por número de partida
'descargado': ['exact'], # Filtro por estado de descarga
'created_at': ['exact', 'gte', 'lte'], # Filtros por fecha de creación
'updated_at': ['exact', 'gte', 'lte'] # Filtros por fecha de actualización
}
search_fields = ['pedimento__pedimento', 'pedimento__pedimento_app']
ordering_fields = ['numero_partida', 'pedimento__pedimento', 'id', 'created_at', 'updated_at']
ordering = ['numero_partida'] # Ordenar por número de partida por defecto
my_tags = ['Partidas']
class ViewSetTipoOperacion(LoggingMixin, viewsets.ModelViewSet):
"""
ViewSet for TipoOperacion model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
queryset = TipoOperacion.objects.all()
serializer_class = TipoOperacionSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['tipo']
search_fields = ['tipo', 'descripcion']
ordering_fields = ['tipo', 'descripcion']
ordering = ['tipo']
my_tags = ['Tipos_Operacion']
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un tipo de operación.
"""
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
# Solo el supoerusuario puede crear tipos de operación
if not self.request.user.is_superuser:
raise PermissionDenied("Solo los superusuarios pueden crear tipos de operación")
serializer.save(organizacion=self.request.user.organizacion)
def perform_update(self, serializer):
"""
Solo el superusuario puede actualizar tipos de operación.
"""
if not self.request.user.is_superuser:
raise PermissionDenied("Solo los superusuarios pueden actualizar tipos de operación")
serializer.save()
class ViewSetProcesamientoPedimento(viewsets.ModelViewSet, ProcesosPorOrganizacionMixin):
"""
ViewSet for ProcesamientoPedimento model.
Soporta paginación, filtros y búsqueda.
Parámetros disponibles:
- page: Número de página (solo si se especifica page_size)
- page_size: Elementos por página (si NO se especifica, devuelve TODOS los resultados)
- pedimento: Filtro por pedimento
- estado: Filtro por estado
- servicio: Filtro por servicio
- tipo_procesamiento: Filtro por tipo de procesamiento
- ordering: Ordenar por campo (ej: -created_at, -updated_at)
Ejemplos:
- /procesamientopedimentos/ → Devuelve TODOS los procesamientos
- /procesamientopedimentos/?page_size=5 → Devuelve los primeros 5
"""
permission_classes = [IsAuthenticated, IsSuperUser | IsSameOrganizationDeveloper ]
serializer_class = ProcesamientoPedimentoSerializer
pagination_class = CustomPagination
model = ProcesamientoPedimento
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = {
'pedimento': ['exact'],
'pedimento__pedimento_app': ['exact', 'icontains'],
'estado': ['exact'],
'servicio': ['exact'],
'tipo_procesamiento': ['exact'],
}
search_fields = ['pedimento__pedimento_app', 'pedimento__pedimento']
ordering_fields = ['created_at', 'updated_at']
ordering = ['-created_at']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna siempre la organización al crear un procesamiento de pedimento.
- Para superusuarios: requiere que la organización venga explícitamente en los datos validados.
- Para usuarios normales: asigna la organización del usuario autenticado.
"""
user = self.request.user
if not user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, debe venir la organización en los datos validados
if user.is_superuser:
organizacion = serializer.validated_data.get('organizacion', None)
if not organizacion:
raise ValueError("El superusuario debe especificar una organización al crear el procesamiento de pedimento.")
serializer.save()
return
# Para usuarios normales, asignar siempre la organización del usuario
if not hasattr(user, 'organizacion') or not user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=user.organizacion)
def perform_update(self, serializer):
"""
Permite actualizar un procesamiento de pedimento, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para actualizar ProcesamientoPedimento")
my_tags = ['Procesamientos_Pedimentos']
class ViewSetEDocument(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for EDocument model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = EDocumentSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['pedimento', 'numero_edocument', 'organizacion']
search_fields = ['numero_edocument', 'descripcion', 'organizacion']
ordering_fields = ['created_at', 'updated_at', 'numero_edocument']
ordering = ['-created_at']
model = EDocument
my_tags = ['EDocuments']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un EDocument.
Para superusuarios, permite especificar una organización diferente.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario y se especifica organizacion en los datos validados
if self.request.user.is_superuser:
# Permitir que el superusuario especifique la organización
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para crear EDocument")
def perform_update(self, serializer):
"""
Permite actualizar un EDocument, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
raise ValueError("Usuario no autenticado o sin permisos para actualizar EDocument")
class ViewSetCove(viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for Cove model.
"""
permission_classes = [IsAuthenticated & (IsSuperUser |IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper )]
serializer_class = CoveSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['pedimento', 'numero_cove', 'organizacion']
search_fields = ['numero_cove', 'descripcion', 'organizacion']
ordering_fields = ['created_at', 'updated_at', 'numero_cove']
ordering = ['-created_at']
model = Cove
my_tags = ['Coves']
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
"""
Asigna automáticamente la organización del usuario autenticado al crear un Cove.
Para superusuarios, permite especificar una organización diferente.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario y se especifica organizacion en los datos validados
if self.request.user.is_superuser:
# Permitir que el superusuario especifique la organización
serializer.save()
return
if (
self.request.user.groups.filter(name='developer').exists()
or self.request.user.groups.filter(name='admin').exists()
or self.request.user.groups.filter(name='user').exists()
) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para crear Cove")
def perform_update(self, serializer):
"""
Permite actualizar un Cove, pero solo si el usuario es superusuario o pertenece a la misma organización.
"""
if not self.request.user.is_authenticated:
raise ValueError("Usuario no autenticado")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user .groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
class ImportadorViewSet(viewsets.ModelViewSet, OrganizacionFiltradaMixin):
"""
ViewSet for Importador model.
"""
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
serializer_class = ImportadorSerializer
pagination_class = CustomPagination
filter_backends = [DjangoFilterBackend, SearchFilter, OrderingFilter]
filterset_fields = ['rfc', 'nombre', 'organizacion']
search_fields = ['rfc', 'nombre']
ordering_fields = ['created_at', 'updated_at', 'rfc']
ordering = ['-created_at']
model = Importador
def get_queryset(self):
return self.get_queryset_filtrado_por_organizacion()
def perform_create(self, serializer):
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
serializer.save(organizacion=self.request.user.organizacion)
def perform_update(self, serializer):
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
# Si es superusuario, permite actualizar sin restricciones
if self.request.user.is_superuser:
serializer.save()
return
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
# Para usuarios normales, usar siempre su organización
if not hasattr(self.request.user, 'organizacion') or not self.request.user.organizacion:
raise ValueError("Usuario sin organización")
serializer.save(organizacion=self.request.user.organizacion)
return
raise ValueError("Usuario no autenticado o sin permisos para actualizar Importador")
my_tags = ['Importadores']
class EjecutarComandoView(APIView):
permission_classes = [IsAuthenticated & (IsSameOrganization | IsSameOrganizationAndAdmin | IsSameOrganizationDeveloper | IsSuperUser)]
"""
View para ejecutar el comando de microservicios desde una petición HTTP.
"""
def post(self, request):
# Obtener organizacion_id del request (si se envía)
organizacion_id_request = request.data.get('organizacionid', None)
procesamiento = request.data.get('procesamiento', None)
todos = request.data.get('todos', False)
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'organizacion'):
raise ValueError("Usuario no autenticado o sin organización")
if organizacion_id_request is None:
return Response(
{"error": 'No se proporcionó la organización a ejecutar el proceso.'},
status=status.HTTP_400_BAD_REQUEST
)
# organizacion_id = self.request.user.organizacion.id
organizacion_id = organizacion_id_request
nombre_organizacion = self.request.user.organizacion.nombre
if procesamiento is None and todos == False:
return Response(
{"message": 'No se detectó el tipo de ejecución de procesamiento.'},
status=status.HTTP_400_BAD_REQUEST
)
procesamiento = str(procesamiento)
from api.customs.tasks import microservice_v2
if todos:
microservice_v2.ejecutar_todos_por_organizacion(organizacion_id)
return Response(
{"message": f'Se estarán ejecutando todos los procesos para la organización {nombre_organizacion} en segundo plano.'},
status=status.HTTP_200_OK
)
elif organizacion_id:
if procesamiento:
microservice_v2.ejecutar_por_organizacion_y_procesamiento(organizacion_id, procesamiento)
return Response(
{"message": f'Se estará ejecutando el procesamiento {procesamiento} para la organización {nombre_organizacion} en segundo plano.'},
status=status.HTTP_200_OK
)
return Response(
{"error": "Parámetros insuficientes. Proporcione 'organizacion' y 'procesamiento', o seleccione 'todos'."},
status=status.HTTP_400_BAD_REQUEST
)
my_tags = ['Procesamientos_Pedimentos']
def bulk_upload_record_task_1(organizacion_id, parametros, archivo_paths):
"""
Procesa archivos ZIP de pedimentos en segundo plano.
Args:
organizacion_id: UUID de la organización
parametros: dict con keys:
- contribuyente
- fecha_pago_input
- clave_pedimento_input
- patente_input
- tipo_operacion_input
- aduana_input
- curp_apoderado_input
- partidas_input
archivo_paths: lista de rutas temporales de archivos ZIP
"""
from api.organization.models import Organizacion
from api.customs.models import Pedimento, Importador, TipoOperacion
from api.record.models import Document, DocumentType, Fuente
created_pedimentos = []
updated_pedimentos = []
failed_records = []
documents_created = 0
temp_dir = None
try:
organizacion = Organizacion.objects.get(id=organizacion_id)
# Extraer parámetros
contribuyente = parametros.get('contribuyente', None)
fecha_pago_input = parametros.get('fecha_pago_input', None)
clave_pedimento_input = parametros.get('clave_pedimento_input', None)
patente_input = parametros.get('patente_input', None)
tipo_operacion_input = parametros.get('tipo_operacion_input', None)
aduana_input = parametros.get('aduana_input', None)
curp_apoderado_input = parametros.get('curp_apoderado_input', None)
partidas_input = parametros.get('partidas_input', None)
# Regex patterns
nomenclatura_pattern = re.compile(r'^(\d{2})-(\d{2,3})-(\d{4})-(\d{7})$')
nomenclatura_pattern_sin_anio = re.compile(r'^(\d{2,3})-(\d{4})-(\d{7})$')
# Obtener DocumentType
try:
document_type = DocumentType.objects.get(nombre="Pedimento")
except DocumentType.DoesNotExist:
document_type = DocumentType.objects.create(
nombre="Pedimento",
descripcion="Documento de pedimento"
)
# Fuente
fuente, _ = Fuente.objects.get_or_create(
nombre="APP-EFC",
descripcion='Transmitido por la app de escritorio'
)
# Usar el directorio donde están los archivos (ya guardado en MEDIA_ROOT)
# El directorio base es el padre del primer archivo
if archivo_paths:
temp_dir = os.path.dirname(archivo_paths[0])
else:
temp_dir = tempfile.mkdtemp()
# Patrón para nomenclatura especial M8988852.300
patron_nomenclatura = re.compile(r'^[m|M]\d{7}\.\d{3}$', re.IGNORECASE)
existing_pedimento = None
for archivo_path in archivo_paths:
archivo_name = os.path.basename(archivo_path).lower()
archivo_name_sin_extension = os.path.splitext(os.path.basename(archivo_path))[0]
sub_dir = os.path.join(temp_dir, archivo_name_sin_extension)
os.makedirs(sub_dir, exist_ok=True)
print(f"Procesando archivo: {archivo_name} en ruta temporal: {archivo_path}")
if archivo_name.endswith('.zip'):
try:
with zipfile.ZipFile(archivo_path, 'r') as zip_ref:
zip_ref.extractall(sub_dir)
os.remove(archivo_path) # Eliminar el archivo ZIP después de extraerlo
except zipfile.BadZipFile as e:
failed_records.append({
"file": archivo_path,
"archivo_original": archivo_name,
"error": f"Archivo ZIP corrupto o inválido: {str(e)}"
})
continue
except Exception as e:
failed_records.append({
"file": archivo_path,
"archivo_original": archivo_name,
"error": f"Error al extraer ZIP: {str(e)}"
})
continue
else:
failed_records.append({
"file": archivo_path,
"archivo_original": archivo_name,
"error": "Solo se admiten archivos ZIP"
})
continue
# Procesar archivos extraídos
for root, dirs, files in os.walk(temp_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
relative_path = os.path.relpath(file_path, temp_dir)
# Determinar folder_name
folder_name = None
if os.path.dirname(relative_path):
folder_parts = relative_path.split(os.sep)
folder_name = folder_parts[0]
else:
folder_name = os.path.splitext(file_name)[0]
# Validar nomenclatura
match = nomenclatura_pattern.match(folder_name)
match_sin_anio = nomenclatura_pattern_sin_anio.match(folder_name)
if not match and not match_sin_anio:
archivo_original = folder_name + '.zip'
failed_records.append({
"file": relative_path,
"archivo_original": archivo_original,
"error": f"Nomenclatura inválida: {folder_name}. Esperado: anio-aduana-patente-pedimento"
})
continue
if match:
anio, aduana, patente, pedimento_num = match.groups()
try:
anio_completo = 2000 + int(anio) if int(anio) < 50 else 1900 + int(anio)
fecha_pago = datetime(anio_completo, 1, 1).date()
except ValueError:
failed_records.append({
"file": relative_path,
"archivo_original": folder_name + '.zip',
"error": f"Año inválido: {anio}"
})
continue
elif match_sin_anio:
aduana, patente, pedimento_num = match_sin_anio.groups()
primer_digito_pedimento = int(pedimento_num[0]) if pedimento_num else 0
año_actual = datetime.now().year
año_con_digito = int(str(año_actual)[:-1] + str(primer_digito_pedimento))
if año_con_digito <= año_actual:
año_final = año_con_digito
else:
año_final = año_con_digito - 10
anio = año_final % 100
fecha_pago = datetime(año_final, 1, 1).date()
# Generar pedimento_app
pedimento_app = f"{anio}-{aduana.zfill(2)}-{patente}-{pedimento_num}"
# Verificar si el pedimento ya existe
existing_pedimento = Pedimento.objects.filter(
pedimento_app=pedimento_app,
organizacion=organizacion
).first()
if not existing_pedimento:
# Crear nuevo pedimento
try:
importador = None
if contribuyente:
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
tipo_op = None
if tipo_operacion_input:
tipo_op = TipoOperacion.objects.get(id=tipo_operacion_input)
pedimento = Pedimento.objects.create(
organizacion=organizacion,
contribuyente=importador if importador else None,
pedimento=str(pedimento_num),
aduana=str(aduana),
patente=str(patente),
fecha_pago=fecha_pago_input if fecha_pago_input else fecha_pago,
curp_apoderado=curp_apoderado_input if curp_apoderado_input else "",
numero_partidas=partidas_input if partidas_input else 0,
tipo_operacion=tipo_op if tipo_op else None,
pedimento_app=pedimento_app,
agente_aduanal=f"Agente {patente}",
clave_pedimento=clave_pedimento_input if clave_pedimento_input else "A1"
)
existing_pedimento = pedimento
created_pedimentos.append({
"id": str(pedimento.id),
"pedimento_app": pedimento_app,
"contribuyente": getattr(importador, 'rfc', None),
"contribuyente_nombre": getattr(importador, 'nombre', None)
})
except Exception as e:
failed_records.append({
"file": relative_path,
"archivo_original": folder_name + '.zip',
"error": f"Error al crear pedimento: {str(e)}"
})
continue
else:
# Actualizar pedimento existente
if contribuyente:
importador, created = Importador.objects.get_or_create(
rfc=contribuyente,
defaults={
'nombre': f"Importador {contribuyente}",
'organizacion': organizacion
}
)
importador_db = existing_pedimento.contribuyente
if importador_db:
if importador_db != importador:
existing_pedimento.contribuyente = importador
else:
existing_pedimento.contribuyente = importador
existing_pedimento.save()
# Actualizar Tipo Operacion
if tipo_operacion_input:
tipo_op = TipoOperacion.objects.get(id=tipo_operacion_input)
if tipo_op and not existing_pedimento.tipo_operacion:
existing_pedimento.tipo_operacion = tipo_op
existing_pedimento.save()
# Actualizar fecha de pago
if fecha_pago_input:
fecha_db = existing_pedimento.fecha_pago
if fecha_db:
if isinstance(fecha_db, datetime):
fecha_db = fecha_db.date()
if fecha_db.month == 1 and fecha_db.day == 1:
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
else:
existing_pedimento.fecha_pago = fecha_pago_input
existing_pedimento.save()
# Actualizar clave_pedimento
if clave_pedimento_input:
clave_pedimento = existing_pedimento.clave_pedimento
if not clave_pedimento or clave_pedimento.strip() != clave_pedimento_input.strip():
existing_pedimento.clave_pedimento = clave_pedimento_input
existing_pedimento.save()
# Actualizar curp_apoderado
if curp_apoderado_input:
if not existing_pedimento.curp_apoderado:
existing_pedimento.curp_apoderado = curp_apoderado_input
existing_pedimento.save()
# Actualizar partidas
if partidas_input:
num_partidas = existing_pedimento.numero_partidas
if not num_partidas or num_partidas <= 0:
existing_pedimento.numero_partidas = partidas_input
existing_pedimento.save()
# Crear documento asociado al pedimento
try:
with open(file_path, 'rb') as f:
file_content = f.read()
file_name_lower = file_name.lower()
tiene_nomenclatura_especial = False
info_extraida = {}
nombre_base, extension = os.path.splitext(file_name)
if patron_nomenclatura.match(file_name_lower):
tiene_nomenclatura_especial = True
info_extraida = procesar_archivo_m_con_nomenclatura(file_content, existing_pedimento)
django_file = ContentFile(file_content, name=file_name)
# Buscar documento existente
existing_documents = Document.objects.filter(
pedimento_id=existing_pedimento.id,
organizacion=organizacion
)
existing_document = None
for doc in existing_documents:
if is_same_document(doc, file_name):
existing_document = doc
break
if existing_document:
# Actualizar documento existente
# try:
# if existing_document.archivo and os.path.exists(existing_document.archivo.path):
# os.remove(existing_document.archivo.path)
# except (ValueError, OSError):
# pass
# existing_document.archivo = django_file
# existing_document.size = len(file_content)
# existing_document.extension = extension
# existing_document.updated_at = timezone.now()
# existing_document.save()
# doc = Document.objects.get(id=existing_document.id)
# doc.archivo.delete(save=False) # Eliminar el archivo anterior
# doc.delete() # Eliminar el registro para crear uno nuevo (evita problemas con archivos en Django)
updated_pedimentos.append({
"id": str(existing_pedimento.id),
"pedimento_app": existing_pedimento.pedimento_app,
"accion": "Documento actualizado",
"documento": file_name
})
documents_created += 1
else:
# Crear nuevo documento
document = Document.objects.create(
organizacion=organizacion,
pedimento_id=existing_pedimento.id,
document_type=document_type,
fuente_id=fuente.id,
archivo=django_file,
size=len(file_content),
extension=os.path.splitext(file_name)[1].lower().lstrip('.')
)
updated_pedimentos.append({
"id": str(existing_pedimento.id),
"pedimento_app": existing_pedimento.pedimento_app,
"accion": "Documento creado",
"documento": file_name
})
documents_created += 1
except Exception as e:
failed_records.append({
"file": relative_path,
"archivo_original": folder_name + '.zip',
"error": f"Error al crear documento: {str(e)}"
})
continue
# Actualizar estado de expediente
if documents_created > 0 and existing_pedimento:
existing_pedimento.existe_expediente = True
existing_pedimento.save()
return {
'status': 'completed',
'created_pedimentos': created_pedimentos,
'updated_pedimentos': updated_pedimentos,
'failed_records': failed_records,
'documents_created': documents_created,
'tieneError': len(failed_records) > 0
}
except Exception as e:
pass
finally:
# Limpiar directorio temporal
if temp_dir and os.path.exists(temp_dir):
try:
shutil.rmtree(temp_dir)
except Exception as e:
pass
# helper | reglas para formato de docuemnto antes de cargarlo
def normalize_filename(filename):
"""
Normaliza el nombre del archivo removiendo caracteres especiales,
espacios y asegurando consistencia.
"""
filename = normalize('NFKD', filename).encode('ASCII', 'ignore').decode('ASCII')
filename = re.sub(r'[^\w\s.-]', '_', filename) # Remover caracteres no alfanuméricos
filename = re.sub(r'[\s()]+', '_', filename) # Reemplazar espacios y paréntesis
filename = re.sub(r'_+', '_', filename) # Consolidar múltiples _
filename = filename.strip('_') # Remover _ al inicio/final
return filename
def get_clean_base_filename(filename):
"""
Obtiene el nombre base limpio sin el sufijo de Django.
"""
normalized = normalize_filename(filename)
name_without_ext, ext = os.path.splitext(normalized)
django_suffix = extract_django_suffix(name_without_ext)
if django_suffix:
base_name = name_without_ext[:-8]
else:
base_name = name_without_ext
base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name)
return base_name.lower().strip('_')
def is_same_document(existing_doc, new_filename):
"""
Compara si un documento existente y un nuevo archivo son el mismo documento.
Args:
existing_doc: Objeto Document existente
new_filename: Nombre del nuevo archivo a subir
Returns:
bool: True si son el mismo documento
"""
existing_basename = os.path.basename(existing_doc.archivo.name)
existing_base = get_clean_base_filename(existing_basename)
new_base = get_clean_base_filename(new_filename)
existing_ext = existing_doc.extension.lower()
new_ext = os.path.splitext(new_filename)[1].lower().lstrip('.')
return existing_base == new_base and existing_ext == new_ext
def extract_django_suffix(filename):
"""
Extrae el sufijo único que Django añade a los archivos.
"""
name_without_ext = os.path.splitext(filename)[0]
match = re.search(r'_([a-zA-Z0-9]{7})$', name_without_ext)
if match:
return match.group(1)
return None
def get_clean_base_filename(filename):
"""
Obtiene el nombre base limpio sin el sufijo de Django.
"""
normalized = normalize_filename(filename)
name_without_ext, ext = os.path.splitext(normalized)
django_suffix = extract_django_suffix(name_without_ext)
if django_suffix:
base_name = name_without_ext[:-8]
else:
base_name = name_without_ext
base_name = re.sub(r'(_copy|_copia|_-_copia|_-_copy)(_\d+)?$', '', base_name)
return base_name.lower().strip('_')
def procesar_archivo_m_con_nomenclatura(content, pedimento_instance):
"""
Procesa archivos con nomenclatura M8988852.300 (7 dígitos, punto, 3 dígitos)
y extrae información de registros específicos para actualizar el pedimento.
Args:
content: bytes del contenido del archivo
pedimento_instance: instancia del modelo Pedimento
Returns:
dict: Diccionario con información extraída
"""
try:
# Decodificar el contenido como texto
content_text = content.decode('utf-8', errors='ignore')
# Buscar todas las líneas que empiezan con los registros solicitados
registros = {}
for line in content_text.splitlines():
line = line.strip()
if not line:
continue
# Dividir por pipe
parts = line.split('|')
if len(parts) < 2:
continue
tipo_registro = parts[0]
# Guardar todos los registros encontrados
if tipo_registro not in registros:
registros[tipo_registro] = []
registros[tipo_registro].append(parts)
# Procesar información específica
info_extraida = {
'tiene_nomenclatura_especial': False,
'registros_encontrados': list(registros.keys()),
'detalles_registro_500': [],
'detalles_registro_506': [],
'detalles_registro_501': [],
'detalles_registro_551': [],
'detalles_registro_800': [],
'detalles_registro_801': [],
'actualizaciones_aplicadas': []
}
# Verificar si hay registros del tipo 500 (indicador de archivo válido)
if '500' in registros:
info_extraida['tiene_nomenclatura_especial'] = True
# Procesar registro 500: Información básica del pedimento
for reg_500 in registros['500']:
if len(reg_500) >= 1:
info_extraida['detalles_registro_500'].append({
'tipo_movimiento': reg_500[1] if len(reg_500) > 1 else None,
'patente': reg_500[2] if len(reg_500) > 1 else None,
'numero_pedimento': reg_500[3] if len(reg_500) > 1 else None,
'aduana_seccion': reg_500[4] if len(reg_500) > 1 else None,
'acuse_electronico': reg_500[5] if len(reg_500) > 1 else None,
})
# Procesar registro 506: Fechas importantes
for reg_506 in registros.get('506', []):
if len(reg_506) >= 1:
info_extraida['detalles_registro_506'].append({
'numero_pedimento': reg_506[1] if len(reg_506) > 1 else None,
'tipo_fecha': reg_506[2] if len(reg_506) > 1 else None,
'fecha': reg_506[3] if len(reg_506) > 1 else None
})
# Procesar registro 501: Información del importador/exportador
for reg_501 in registros.get('501', []):
if len(reg_501) >= 1:
info_extraida['detalles_registro_501'].append({
'patente': reg_501[1] if len(reg_501) > 1 else None,
'numero_pedimento': reg_501[2] if len(reg_501) > 1 else None,
'aduana_seccion': reg_501[3] if len(reg_501) > 1 else None,
'rfc': reg_501[8] if len(reg_501) > 1 else None,
'curp': reg_501[9] if len(reg_501) > 1 else None
})
# Procesar registro 551: Información de partidas
for reg_551 in registros.get('551', []):
if len(reg_551) >= 1:
info_extraida['detalles_registro_551'].append({
'numero_pedimento': reg_501[1] if len(reg_501) > 1 else None,
'fraccion_arancelaria': reg_551[2] if len(reg_551) > 1 else None,
'partida': reg_551[3] if len(reg_551) > 1 else None,
'subfraccion': reg_551[4] if len(reg_551) > 1 else None
})
# Electrónica de Pedimento
for reg_801 in registros.get('800', []):
if len(reg_801) >= 1:
info_extraida['detalles_registro_800'].append({
'numero_pedimento': reg_801[1] if len(reg_801) > 1 else None
})
# Fin de Archivo
for reg_801 in registros.get('801', []):
if len(reg_801) >= 1:
info_extraida['detalles_registro_801'].append({
'total_partidas': reg_801[1] if len(reg_801) > 1 else None
})
# Intentar actualizar campos del pedimento con la información extraída
actualizaciones = actualizar_pedimento_con_registros(pedimento_instance, registros)
info_extraida['actualizaciones_aplicadas'] = actualizaciones
return info_extraida
except Exception as e:
print(f"Error al procesar archivo con nomenclatura especial: {str(e)}")
return {
'tiene_nomenclatura_especial': False,
'error': str(e),
'registros_encontrados': []
}
def actualizar_pedimento_con_registros(pedimento_instance, registros):
"""
Actualiza el pedimento con información extraída de los registros.
Args:
pedimento_instance: Instancia del pedimento a actualizar
registros: Diccionario con registros parseados
Returns:
list: Lista de actualizaciones aplicadas
"""
actualizaciones = []
try:
# Extraer información del registro 500 (si existe)
if '500' in registros and registros['500']:
for reg_500 in registros['500']:
if len(reg_500) >= 1:
# Actualizar número de pedimento si está vacío
if pedimento_instance.pedimento == reg_500[3]:
try:
pedimento_instance.aduana = reg_500[4]
actualizaciones.append(f"aduana actualizada a {reg_500[4]}")
except ValueError:
pass
# Extraer información del registro 501 (importador/exportador)
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
rfc = reg_501[8] if len(reg_501) > 1 else None
# Actualizar importador si hay RFC y no existe
if rfc and not pedimento_instance.contribuyente and pedimento_instance.pedimento == reg_501[2]:
try:
from api.customs.models import Importador
importador, created = Importador.objects.get_or_create(
rfc=rfc,
defaults={
'nombre': f"Importador {rfc}",
'organizacion': pedimento_instance.organizacion
}
)
pedimento_instance.contribuyente = importador
if created:
actualizaciones.append(f"importador creado con RFC {rfc}")
else:
actualizaciones.append(f"importador asociado con RFC {rfc}")
except Exception as e:
print(f"Error al crear/obtener importador: {str(e)}")
# Extraer CURP del registro 501
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
curp = reg_501[9] if len(reg_501) > 1 else None
# Actualizar CURP del apoderado si está vacío
if curp and not pedimento_instance.curp_apoderado and pedimento_instance.pedimento == reg_501[2]:
pedimento_instance.curp_apoderado = curp
actualizaciones.append(f"curp_apoderado actualizado a {curp}")
# Extraer Tipo Operacion del registro 501
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
tipo_operacion = reg_501[4] if len(reg_501) > 1 else None
# Actualizar tipo de operación si no existe
if tipo_operacion and pedimento_instance.pedimento == reg_501[2]:
if tipo_operacion=='1':
nombre_tipo_op = "Importacion"
elif tipo_operacion=='2':
nombre_tipo_op = "Exportacion"
else:
nombre_tipo_op = f"Tipo {tipo_operacion}"
try:
from api.customs.models import TipoOperacion
tipo_op_obj, created = TipoOperacion.objects.get_or_create(
id=tipo_operacion,
tipo=nombre_tipo_op,
defaults={'descripcion': f"Tipo de Operación {tipo_operacion}"}
)
pedimento_instance.tipo_operacion = tipo_op_obj
if created:
actualizaciones.append(f"tipo_operacion creado con tipo {tipo_operacion}")
else:
actualizaciones.append(f"tipo_operacion asociado con tipo {tipo_operacion}")
except Exception as e:
print(f"Error al crear/obtener tipo de operación: {str(e)}")
# Extraer Clave Pedimento
if '501' in registros and registros['501']:
for reg_501 in registros['501']:
if len(reg_501) >= 1:
clave = reg_501[5] if len(reg_501) > 1 else None
# Actualizar clave si no existe
if clave and pedimento_instance.pedimento == reg_501[2]:
pedimento_instance.clave_pedimento = clave
actualizaciones.append(f"clave pedimento actualizada a {clave}")
# Extraer fechas del registro 506
if '506' in registros and registros['506']:
for reg_506 in registros['506']:
if not pedimento_instance.pedimento == reg_506[1]:
continue
if len(reg_506) >= 1:
tipo_fecha = reg_506[2] if len(reg_506) > 1 else None
fecha_str = reg_506[3] if len(reg_506) > 1 else None
if not tipo_fecha == '2':
continue
# Procesar fecha según formato (DDMMYYYY o DDMMYY)
if fecha_str:
try:
# Intentar diferentes formatos de fecha
if len(fecha_str) == 8: # DDMMYYYY
fecha = datetime.strptime(fecha_str, '%d%m%Y').date()
elif len(fecha_str) == 6: # DDMMYY
fecha = datetime.strptime(fecha_str, '%d%m%y').date()
else:
continue
# Asignar como fecha de pago si no existe
# if not pedimento_instance.fecha_pago:
# pedimento_instance.fecha_pago = fecha
# actualizaciones.append(f"fecha_pago actualizada a {fecha}")
pedimento_instance.fecha_pago = fecha
actualizaciones.append(f"fecha_pago actualizada a {fecha}")
except (ValueError, TypeError):
pass
num_partidas = 0
if '551' in registros and registros['551']:
for reg_551 in registros['551']:
if not pedimento_instance.pedimento == reg_551[1]:
continue
num_partidas += 1
pedimento_instance.numero_partidas = num_partidas
actualizaciones.append(f"numero_partidas actualizado a {num_partidas}")
# Guardar los cambios si hubo actualizaciones
if actualizaciones:
pedimento_instance.save()
except Exception as e:
print(f"Error al actualizar pedimento con registros: {str(e)}")
actualizaciones.append(f"error: {str(e)}")
return actualizaciones