feature/T2026-05-016 implementar cargas de tareas en background e implementar y corregir auditoria para datastages

This commit is contained in:
Dulce
2026-05-18 11:54:46 -06:00
parent 3a636c14ae
commit 8cc0b9f573
8 changed files with 317 additions and 63 deletions

View File

@@ -1,6 +1,14 @@
from celery import shared_task, group from celery import shared_task, group
from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocument from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocument
from core.utils import xml_controller from core.utils import xml_controller
from api.customs.tasks.microservice import (
procesar_cove_individual,
procesar_acuse_individual,
procesar_acuse_cove_individual,
procesar_edoc_individual,
procesar_partida_individual,
procesar_remesa_individual,
)
@shared_task @shared_task
def crear_procesamiento_remesa(pedimento_id): def crear_procesamiento_remesa(pedimento_id):
@@ -11,7 +19,7 @@ def crear_procesamiento_remesa(pedimento_id):
if pedimento.remesas: if pedimento.remesas:
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=5, # ID del servicio de remesas servicio_id=5,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -19,10 +27,11 @@ def crear_procesamiento_remesa(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento remesa creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento remesa creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=5, servicio_id=5,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_remesa_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_partida(pedimento_id): def crear_procesamiento_partida(pedimento_id):
@@ -32,7 +41,7 @@ def crear_procesamiento_partida(pedimento_id):
logger.info(f"[TAREA] crear_procesamiento_partida para pedimento {pedimento_id}") logger.info(f"[TAREA] crear_procesamiento_partida para pedimento {pedimento_id}")
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=4, # ID del servicio de partidas servicio_id=4,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -40,10 +49,11 @@ def crear_procesamiento_partida(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento partida creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento partida creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=4, servicio_id=4,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_partida_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_cove(pedimento_id): def crear_procesamiento_cove(pedimento_id):
@@ -54,7 +64,7 @@ def crear_procesamiento_cove(pedimento_id):
if pedimento.coves.exists(): if pedimento.coves.exists():
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=8, # ID del servicio de Coves servicio_id=8,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -62,10 +72,11 @@ def crear_procesamiento_cove(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento cove creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento cove creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=8, servicio_id=8,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_cove_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_acuse(pedimento_id): def crear_procesamiento_acuse(pedimento_id):
@@ -73,10 +84,10 @@ def crear_procesamiento_acuse(pedimento_id):
logger = logging.getLogger('api.customs.async_operations') logger = logging.getLogger('api.customs.async_operations')
pedimento = Pedimento.objects.get(id=pedimento_id) pedimento = Pedimento.objects.get(id=pedimento_id)
logger.info(f"[TAREA] crear_procesamiento_acuse para pedimento {pedimento_id}") logger.info(f"[TAREA] crear_procesamiento_acuse para pedimento {pedimento_id}")
if pedimento.coves.exists(): if pedimento.documentos.exists():
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=6, # ID del servicio de Acuse Cove servicio_id=6,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -84,10 +95,11 @@ def crear_procesamiento_acuse(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento acuse creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento acuse creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=6, servicio_id=6,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_acuse_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_acuse_cove(pedimento_id): def crear_procesamiento_acuse_cove(pedimento_id):
@@ -98,7 +110,7 @@ def crear_procesamiento_acuse_cove(pedimento_id):
if pedimento.coves.exists(): if pedimento.coves.exists():
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=9, # ID del servicio de Acuse Cove servicio_id=9,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -106,10 +118,11 @@ def crear_procesamiento_acuse_cove(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento acuse_cove creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento acuse_cove creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=9, servicio_id=9,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_acuse_cove_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_edocument(pedimento_id): def crear_procesamiento_edocument(pedimento_id):
@@ -120,7 +133,7 @@ def crear_procesamiento_edocument(pedimento_id):
if pedimento.documentos.exists(): if pedimento.documentos.exists():
existe = ProcesamientoPedimento.objects.filter( existe = ProcesamientoPedimento.objects.filter(
pedimento=pedimento, pedimento=pedimento,
servicio_id=7, # ID del servicio de EDocument servicio_id=7,
organizacion=pedimento.organizacion, organizacion=pedimento.organizacion,
estado_id__in=[1, 2, 3, 4] estado_id__in=[1, 2, 3, 4]
).exists() ).exists()
@@ -128,10 +141,11 @@ def crear_procesamiento_edocument(pedimento_id):
logger.info(f"[TAREA] ProcesamientoPedimento edocument creado para pedimento {pedimento_id}") logger.info(f"[TAREA] ProcesamientoPedimento edocument creado para pedimento {pedimento_id}")
ProcesamientoPedimento.objects.create( ProcesamientoPedimento.objects.create(
pedimento=pedimento, pedimento=pedimento,
estado_id=1, # Estado "pendiente" estado_id=1,
servicio_id=7, servicio_id=7,
organizacion=pedimento.organizacion organizacion=pedimento.organizacion
) )
procesar_edoc_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)])
@shared_task @shared_task
def crear_procesamiento_pedimento_completo(organizacion_id): def crear_procesamiento_pedimento_completo(organizacion_id):

View File

@@ -3,7 +3,12 @@ from django.urls import reverse
from rest_framework.test import APITestCase, APIClient from rest_framework.test import APITestCase, APIClient
from rest_framework import status from rest_framework import status
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.core.files.uploadedfile import SimpleUploadedFile
from unittest.mock import patch
from io import BytesIO
import zipfile
from api.organization.models import Organizacion from api.organization.models import Organizacion
from api.licence.models import Licencia
from .models import Pedimento, TipoOperacion, ProcesamientoPedimento, EDocument from .models import Pedimento, TipoOperacion, ProcesamientoPedimento, EDocument
User = get_user_model() User = get_user_model()
@@ -75,3 +80,147 @@ class CustomsViewsTests(APITestCase):
self.client.force_authenticate(user=self.admin) self.client.force_authenticate(user=self.admin)
response = self.client.get(url) response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
# ---------------------------------------------------------------------------
# Tests de integración para bulk-create (ViewSetPedimento.bulk_create)
# Verifica que al re-cargar un pedimento existente sus documentos se actualicen
# ---------------------------------------------------------------------------
class BulkCreateDocumentReplaceTests(APITestCase):
"""Verifica que bulk-create actualiza los documentos de pedimentos existentes
en vez de ignorarlos, y que no quedan archivos residuales en el storage."""
PEDIMENTO_APP = "24-01-3420-1234567"
def setUp(self):
self.licencia = Licencia.objects.create(nombre="Lic100GB", almacenamiento=100)
self.org = Organizacion.objects.create(
nombre="OrgBulkCreate",
licencia=self.licencia,
is_active=True,
is_verified=True,
)
self.user = User.objects.create_user(
username="bulkcreateuser", password="pass", organizacion=self.org
)
self.pedimento = Pedimento.objects.create(
organizacion=self.org,
pedimento="1234567",
pedimento_app=self.PEDIMENTO_APP,
)
from api.record.models import DocumentType, Fuente
self.doc_type = DocumentType.objects.get_or_create(nombre="Pedimento")[0]
# bulk_create usa fuente_id=4 hardcodeado; debe existir en la DB de test
Fuente.objects.get_or_create(id=4, defaults={"nombre": "Bulk Create"})
self.url = reverse("Pedimento-bulk-create")
self.client.force_authenticate(user=self.user)
def _make_zip(self, files_dict):
"""Crea un ZIP en memoria. files_dict = {nombre_archivo: contenido_bytes}"""
buf = BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
for name, content in files_dict.items():
zf.writestr(name, content)
buf.seek(0)
return SimpleUploadedFile(
f"{self.PEDIMENTO_APP}.zip", buf.read(), content_type="application/zip"
)
def _post_zip(self, files_dict):
return self.client.post(
self.url,
{"contribuyente": "XAXX010101000", "archivos": [self._make_zip(files_dict)]},
format="multipart",
)
@patch("api.customs.views.storage_service")
def test_existing_pedimento_not_duplicated(self, mock_st):
"""Re-subir un pedimento existente NO debe crear un segundo Pedimento."""
mock_st.save_document_from_path.return_value = "org_1/documents/ped/informe_a1b2c3d4.pdf"
self._post_zip({"informe.pdf": b"contenido"})
self.assertEqual(
Pedimento.objects.filter(
organizacion=self.org, pedimento_app=self.PEDIMENTO_APP
).count(),
1,
)
@patch("api.customs.views.storage_service")
def test_existing_pedimento_document_replaced_not_duplicated(self, mock_st):
"""Documento existente con el mismo nombre base se reemplaza, no se duplica."""
from api.record.models import Document
old_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_a1b2c3d4.pdf"
old_doc = Document.objects.create(
organizacion=self.org,
pedimento=self.pedimento,
document_type=self.doc_type,
archivo=old_path,
size=500,
extension="pdf",
)
new_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_b5c6d7e8.pdf"
mock_st.save_document_from_path.return_value = new_path
mock_st.delete_file.return_value = True
self._post_zip({"informe.pdf": b"contenido actualizado"})
docs = Document.objects.filter(pedimento=self.pedimento)
# Sin duplicados
self.assertEqual(docs.count(), 1)
# Mismo registro
self.assertEqual(docs.first().id, old_doc.id)
# Archivo actualizado
old_doc.refresh_from_db()
self.assertEqual(old_doc.archivo.name, new_path)
@patch("api.customs.views.storage_service")
def test_existing_pedimento_stale_file_deleted_from_storage(self, mock_st):
"""Al reemplazar un documento, el archivo viejo debe eliminarse del storage."""
from api.record.models import Document
old_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_a1b2c3d4.pdf"
Document.objects.create(
organizacion=self.org,
pedimento=self.pedimento,
document_type=self.doc_type,
archivo=old_path,
size=500,
extension="pdf",
)
mock_st.save_document_from_path.return_value = f"org_1/documents/{self.PEDIMENTO_APP}/informe_b5c6d7e8.pdf"
mock_st.delete_file.return_value = True
self._post_zip({"informe.pdf": b"contenido"})
# delete_file debe haberse llamado con la ruta del archivo viejo
mock_st.delete_file.assert_called()
called_arg = str(mock_st.delete_file.call_args[0][0])
self.assertIn("informe_a1b2c3d4", called_arg)
@patch("api.customs.views.storage_service")
def test_existing_pedimento_new_file_added(self, mock_st):
"""Archivo nuevo en el ZIP se añade al pedimento existente."""
from api.record.models import Document
mock_st.save_document_from_path.return_value = "org_1/documents/ped/nuevo_b5c6d7e8.pdf"
self._post_zip({"nuevo_documento.pdf": b"contenido nuevo"})
self.assertGreaterEqual(
Document.objects.filter(pedimento=self.pedimento).count(), 1
)
@patch("api.customs.views.storage_service")
def test_already_existing_count_in_response(self, mock_st):
"""La respuesta debe indicar que el pedimento ya existía (already_existing_count >= 1)."""
mock_st.save_document_from_path.return_value = "org_1/documents/ped/f_a1b2c3d4.pdf"
response = self._post_zip({"archivo.pdf": b"contenido"})
self.assertIn(response.status_code, [status.HTTP_200_OK, status.HTTP_207_MULTI_STATUS, status.HTTP_201_CREATED])
data = response.json()
self.assertGreaterEqual(data.get("already_existing_count", 0), 1)

View File

@@ -39,6 +39,7 @@ from .views_auditor import (
auditar_acuse_cove_endpoint, auditar_acuse_cove_endpoint,
auditar_edocuments_endpoint, auditar_edocuments_endpoint,
auditar_acuse_endpoint, auditar_acuse_endpoint,
auditar_remesas_endpoint,
auditar_cove_pedimento_endpoint, auditar_cove_pedimento_endpoint,
auditar_acuse_cove_pedimento_endpoint, auditar_acuse_cove_pedimento_endpoint,
auditar_edocument_pedimento_endpoint, auditar_edocument_pedimento_endpoint,
@@ -72,6 +73,7 @@ urlpatterns = [
path('auditor/auditar-acuse-cove/', auditar_acuse_cove_endpoint, name='auditar-acuse-cove'), path('auditor/auditar-acuse-cove/', auditar_acuse_cove_endpoint, name='auditar-acuse-cove'),
path('auditor/auditar-edocuments/', auditar_edocuments_endpoint, name='auditar-edocuments'), path('auditor/auditar-edocuments/', auditar_edocuments_endpoint, name='auditar-edocuments'),
path('auditor/auditar-acuse/', auditar_acuse_endpoint, name='auditar-acuse'), path('auditor/auditar-acuse/', auditar_acuse_endpoint, name='auditar-acuse'),
path('auditor/auditar-remesas/', auditar_remesas_endpoint, name='auditar-remesas'),
path('auditor/auditar-cove/pedimento/', auditar_cove_pedimento_endpoint, name='auditar-cove-pedimento'), path('auditor/auditar-cove/pedimento/', auditar_cove_pedimento_endpoint, name='auditar-cove-pedimento'),
path('auditor/auditar-acuse-cove/pedimento/', auditar_acuse_cove_pedimento_endpoint, name='auditar-acuse-cove-pedimento'), path('auditor/auditar-acuse-cove/pedimento/', auditar_acuse_cove_pedimento_endpoint, name='auditar-acuse-cove-pedimento'),
path('auditor/auditar-edocument/pedimento/', auditar_edocument_pedimento_endpoint, name='auditar-edocument-pedimento'), path('auditor/auditar-edocument/pedimento/', auditar_edocument_pedimento_endpoint, name='auditar-edocument-pedimento'),

View File

@@ -84,7 +84,9 @@ class Registro501(models.Model):
organizacion = models.ForeignKey('organization.Organizacion', on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True) organizacion = models.ForeignKey('organization.Organizacion', on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True)
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro501' db_table = 'registro501'
@@ -104,6 +106,8 @@ class Registro502(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro502s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro502s', null=True, blank=True)
patente = models.CharField(max_length=50, null=True, blank=True) patente = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro502' db_table = 'registro502'
@@ -120,6 +124,8 @@ class Registro503(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro503s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro503s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro503' db_table = 'registro503'
@@ -136,6 +142,8 @@ class Registro504(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro504s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro504s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro504' db_table = 'registro504'
@@ -165,6 +173,8 @@ class Registro505(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro505s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro505s', null=True, blank=True)
patente = models.CharField(max_length=50, null=True, blank=True) patente = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro505' db_table = 'registro505'
@@ -181,6 +191,8 @@ class Registro506(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro506s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro506s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro506' db_table = 'registro506'
@@ -199,6 +211,8 @@ class Registro507(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro507s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro507s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro507' db_table = 'registro507'
@@ -223,6 +237,8 @@ class Registro508(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro508s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro508s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro508' db_table = 'registro508'
@@ -241,6 +257,8 @@ class Registro509(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro509s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro509s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro509' db_table = 'registro509'
@@ -261,6 +279,8 @@ class Registro510(models.Model):
forma_pago = models.CharField(max_length=3, null=True, blank=True) forma_pago = models.CharField(max_length=3, null=True, blank=True)
importe_pago = models.CharField(max_length=12, null=True, blank=True) importe_pago = models.CharField(max_length=12, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro510' db_table = 'registro510'
@@ -278,6 +298,8 @@ class Registro511(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro511s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro511s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro511' db_table = 'registro511'
@@ -301,6 +323,8 @@ class Registro512(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro512s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro512s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro512' db_table = 'registro512'
@@ -363,6 +387,8 @@ class Registro551(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro551s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro551s', null=True, blank=True)
entidad_fed_destino = models.CharField(max_length=50, null=True, blank=True) entidad_fed_destino = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro551' db_table = 'registro551'
@@ -381,6 +407,8 @@ class Registro552(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro552s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro552s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro552' db_table = 'registro552'
@@ -402,6 +430,8 @@ class Registro553(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro553s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro553s', null=True, blank=True)
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro553' db_table = 'registro553'
@@ -421,6 +451,8 @@ class Registro554(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro554s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro554s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro554' db_table = 'registro554'
@@ -446,6 +478,8 @@ class Registro555(models.Model):
created_by = models.IntegerField(null=True, blank=True) created_by = models.IntegerField(null=True, blank=True)
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro555' db_table = 'registro555'
@@ -465,6 +499,8 @@ class Registro556(models.Model):
fraccion = models.CharField(max_length=8, null=True, blank=True) fraccion = models.CharField(max_length=8, null=True, blank=True)
secuencia_fraccion = models.CharField(max_length=50, null=True, blank=True) secuencia_fraccion = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro556' db_table = 'registro556'
@@ -484,6 +520,8 @@ class Registro557(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro557s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro557s', null=True, blank=True)
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro557' db_table = 'registro557'
@@ -502,6 +540,8 @@ class Registro558(models.Model):
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro558s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro558s', null=True, blank=True)
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro558' db_table = 'registro558'
@@ -522,6 +562,8 @@ class RegistroSel(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro_sel', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro_sel', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro_sel' db_table = 'registro_sel'
@@ -546,6 +588,8 @@ class Registro701(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro701s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro701s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro701' db_table = 'registro701'
@@ -564,6 +608,8 @@ class Registro702(models.Model):
consulta = models.CharField(max_length=50, null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True)
datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro702s', null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro702s', null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta: class Meta:
db_table = 'registro702' db_table = 'registro702'

View File

@@ -9,6 +9,8 @@ import zipfile
import re import re
from api.utils.storage_service import storage_service from api.utils.storage_service import storage_service
logger = logging.getLogger(__name__)
@shared_task @shared_task
def procesar_datastage_task(datastage_id, user_organizacion_id=None): def procesar_datastage_task(datastage_id, user_organizacion_id=None):
import traceback import traceback
@@ -167,15 +169,22 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
continue continue
if first: if first:
field_names = [f for f in line_decoded.split('|')] field_names = line_decoded.split('|')
# Eliminar columnas vacías del final (líneas terminan con |)
while field_names and field_names[-1] == '':
field_names.pop()
field_names_snake = [to_snake_case(f) for f in field_names] field_names_snake = [to_snake_case(f) for f in field_names]
first = False first = False
continue continue
values = line_decoded.split('|') values = line_decoded.split('|')
while values and values[-1] == '': while values and values[-1] == '':
values.pop() values.pop()
if len(values) != len(field_names_snake): if len(values) != len(field_names_snake):
logger.debug(
"%s línea %d: esperados %d campos, recibidos %d — se omite",
asc_name, line_count, len(field_names_snake), len(values)
)
continue continue
data = dict(zip(field_names_snake, values)) data = dict(zip(field_names_snake, values))
@@ -185,28 +194,36 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
if hasattr(Model, 'datastage_id'): if hasattr(Model, 'datastage_id'):
data['datastage_id'] = datastage.id data['datastage_id'] = datastage.id
# Limpiar fechas vacías # Parsear y normalizar todos los campos de fecha/datetime
for field in Model._meta.get_fields(): for field in Model._meta.get_fields():
if hasattr(field, 'get_internal_type') and field.get_internal_type() in ["DateField", "DateTimeField"]: if not hasattr(field, 'get_internal_type'):
if data.get(field.name) == "": continue
data[field.name] = None field_type = field.get_internal_type()
val = data.get(field.name)
# Convertir fecha_pago_real if val == '' or val is None:
if 'fecha_pago_real' in data and data['fecha_pago_real']: data[field.name] = None
fecha_val = data['fecha_pago_real'] continue
if isinstance(fecha_val, str): if field_type == 'DateTimeField' and isinstance(val, str):
try: dt = None
dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d %H:%M:%S') for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
except ValueError:
try: try:
dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d') dt = datetime.datetime.strptime(val, fmt)
except Exception: break
dt = None except ValueError:
continue
if dt and timezone.is_naive(dt): if dt and timezone.is_naive(dt):
dt = timezone.make_aware(dt) dt = timezone.make_aware(dt)
if dt: data[field.name] = dt
data['fecha_pago_real'] = dt
# Filtrar data para solo incluir campos válidos del modelo
valid_fields = set()
for f in Model._meta.get_fields():
if hasattr(f, 'name'):
valid_fields.add(f.name)
if hasattr(f, 'attname'):
valid_fields.add(f.attname)
data = {k: v for k, v in data.items() if k in valid_fields}
try: try:
obj = Model(**data) obj = Model(**data)
objects_to_create.append(obj) objects_to_create.append(obj)
@@ -284,8 +301,9 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name):
try: try:
Pedimento.objects.create(**pedimento_data) Pedimento.objects.create(**pedimento_data)
except Exception as ped_exc: except Exception as ped_exc:
pass logger.warning("No se pudo crear Pedimento %s: %s", pedimento_app, ped_exc)
except Exception as e: except Exception as e:
logger.error("%s línea %d: error creando objeto %s: %s", asc_name, line_count, model_name, e)
continue continue
# Bulk create # Bulk create

View File

@@ -57,46 +57,61 @@ from celery.result import AsyncResult
class TaskStatusView(APIView): class TaskStatusView(APIView):
"""
Vista para consultar el estado de tareas de Celery.
"""
permission_classes = [IsAuthenticated] permission_classes = [IsAuthenticated]
def get(self, request, task_id): def get(self, request, task_id):
""" """
Consulta el estado de una tarea de Celery. Consulta el estado de una tarea Celery.
Returns: Estados posibles:
- PENDING: La tarea está esperando ser procesada PENDING — en cola, aún no inició
- STARTED: La tarea ha sido iniciada STARTED — worker la tomó y está ejecutando
- SUCCESS: La tarea se completó exitosamente SUCCESS — terminó correctamente, `result` contiene el resumen
- FAILURE: La tarea falló FAILURE — lanzó una excepción no capturada, `error` describe el problema
- RETRY: La tarea está reintentando RETRY — el worker la está reintentando
""" """
try: try:
task_result = AsyncResult(task_id) task_result = AsyncResult(task_id)
state = task_result.state
response_data = { response_data = {
'task_id': task_id, 'task_id': task_id,
'status': task_result.state, 'status': state,
'ready': task_result.ready(), 'ready': task_result.ready(),
'successful': task_result.successful() if task_result.ready() else None, 'successful': task_result.successful() if task_result.ready() else None,
} }
if task_result.ready() and task_result.successful(): if state == 'SUCCESS':
try: result = task_result.result
response_data['result'] = task_result.result response_data['result'] = result
except Exception:
pass # Resumen legible cuando es auditoría masiva de organización
if isinstance(result, dict) and 'total_pedimentos' in result:
if task_result.state == 'FAILURE': total = result.get('total_pedimentos', 0)
completados = result.get('completados', 0)
con_pendientes = result.get('con_pendientes', 0)
con_errores = result.get('con_errores', 0)
if con_pendientes == 0 and con_errores == 0:
mensaje = f'Auditoría completa — {completados}/{total} pedimentos sin pendientes'
else:
partes = []
if con_pendientes:
partes.append(f'{con_pendientes} con documentos pendientes')
if con_errores:
partes.append(f'{con_errores} con error')
mensaje = f'{completados}/{total} pedimentos completos — {", ".join(partes)}'
response_data['mensaje'] = mensaje
elif state == 'FAILURE':
response_data['error'] = str(task_result.info) response_data['error'] = str(task_result.info)
if task_result.state == 'STARTED': elif state == 'STARTED':
response_data['info'] = str(task_result.info) if task_result.info else None response_data['info'] = str(task_result.info) if task_result.info else None
return Response(response_data, status=status.HTTP_200_OK) return Response(response_data, status=status.HTTP_200_OK)
except Exception as e: except Exception as e:
return Response( return Response(
{'error': f'Error al consultar tarea: {str(e)}'}, {'error': f'Error al consultar tarea: {str(e)}'},

View File

@@ -1,8 +1,11 @@
import os import os
from celery import Celery from celery import Celery
from datetime import timedelta
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config') app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY') app.config_from_object('django.conf:settings', namespace='CELERY')
# corroborar que las tareas esten programadas, se cambio el horario a hora denver
# print("Beat schedule cargado:", app.conf.beat_schedule)
app.autodiscover_tasks() app.autodiscover_tasks()

View File

@@ -30,8 +30,14 @@ from celery.schedules import crontab
from config.stg.storage import * from config.stg.storage import *
CELERY_BEAT_SCHEDULE = { CELERY_BEAT_SCHEDULE = {
'process_all_organizations': {
'task': 'api.customs.tasks.microservice_v2.process_all_organizations',
'schedule': crontab(hour=7, minute=1), # analizar si se requiere otra en un futuro
},
# 'process_all_organizations': {
# 'task': 'api.customs.tasks.microservice_v2.process_all_organizations',
# 'schedule': crontab(hour=11, minute=39), # analizar si se requiere otra en un futuro
# },
} }
# Cargar variables de entorno desde un archivo .env # Cargar variables de entorno desde un archivo .env
@@ -305,7 +311,8 @@ DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# Configuración Celery # Configuración Celery
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0') CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0')
CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0') CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')
CELERY_TIMEZONE = 'America/Mexico_City' # CELERY_TIMEZONE = 'America/Mexico_City'
CELERY_TIMEZONE = 'America/Denver'
# Configuración para procesamiento asíncrono nativo de Django # Configuración para procesamiento asíncrono nativo de Django
ASGI_APPLICATION = 'config.asgi.application' ASGI_APPLICATION = 'config.asgi.application'