diff --git a/api/customs/tasks/internal_services.py b/api/customs/tasks/internal_services.py index 89b3043..cf20ac4 100644 --- a/api/customs/tasks/internal_services.py +++ b/api/customs/tasks/internal_services.py @@ -1,6 +1,14 @@ from celery import shared_task, group from api.customs.models import ProcesamientoPedimento, Pedimento, Cove, EDocument from core.utils import xml_controller +from api.customs.tasks.microservice import ( + procesar_cove_individual, + procesar_acuse_individual, + procesar_acuse_cove_individual, + procesar_edoc_individual, + procesar_partida_individual, + procesar_remesa_individual, +) @shared_task def crear_procesamiento_remesa(pedimento_id): @@ -11,7 +19,7 @@ def crear_procesamiento_remesa(pedimento_id): if pedimento.remesas: existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=5, # ID del servicio de remesas + servicio_id=5, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -19,10 +27,11 @@ def crear_procesamiento_remesa(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento remesa creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=5, organizacion=pedimento.organizacion ) + procesar_remesa_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_partida(pedimento_id): @@ -32,7 +41,7 @@ def crear_procesamiento_partida(pedimento_id): logger.info(f"[TAREA] crear_procesamiento_partida para pedimento {pedimento_id}") existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=4, # ID del servicio de partidas + servicio_id=4, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -40,10 +49,11 @@ def crear_procesamiento_partida(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento partida creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=4, organizacion=pedimento.organizacion ) + procesar_partida_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_cove(pedimento_id): @@ -54,7 +64,7 @@ def crear_procesamiento_cove(pedimento_id): if pedimento.coves.exists(): existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=8, # ID del servicio de Coves + servicio_id=8, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -62,10 +72,11 @@ def crear_procesamiento_cove(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento cove creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=8, organizacion=pedimento.organizacion ) + procesar_cove_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_acuse(pedimento_id): @@ -73,10 +84,10 @@ def crear_procesamiento_acuse(pedimento_id): logger = logging.getLogger('api.customs.async_operations') pedimento = Pedimento.objects.get(id=pedimento_id) logger.info(f"[TAREA] crear_procesamiento_acuse para pedimento {pedimento_id}") - if pedimento.coves.exists(): + if pedimento.documentos.exists(): existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=6, # ID del servicio de Acuse Cove + servicio_id=6, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -84,10 +95,11 @@ def crear_procesamiento_acuse(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento acuse creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=6, organizacion=pedimento.organizacion ) + procesar_acuse_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_acuse_cove(pedimento_id): @@ -98,7 +110,7 @@ def crear_procesamiento_acuse_cove(pedimento_id): if pedimento.coves.exists(): existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=9, # ID del servicio de Acuse Cove + servicio_id=9, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -106,10 +118,11 @@ def crear_procesamiento_acuse_cove(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento acuse_cove creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=9, organizacion=pedimento.organizacion ) + procesar_acuse_cove_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_edocument(pedimento_id): @@ -120,7 +133,7 @@ def crear_procesamiento_edocument(pedimento_id): if pedimento.documentos.exists(): existe = ProcesamientoPedimento.objects.filter( pedimento=pedimento, - servicio_id=7, # ID del servicio de EDocument + servicio_id=7, organizacion=pedimento.organizacion, estado_id__in=[1, 2, 3, 4] ).exists() @@ -128,10 +141,11 @@ def crear_procesamiento_edocument(pedimento_id): logger.info(f"[TAREA] ProcesamientoPedimento edocument creado para pedimento {pedimento_id}") ProcesamientoPedimento.objects.create( pedimento=pedimento, - estado_id=1, # Estado "pendiente" + estado_id=1, servicio_id=7, organizacion=pedimento.organizacion ) + procesar_edoc_individual.apply_async(args=[str(pedimento.id), str(pedimento.organizacion.id)]) @shared_task def crear_procesamiento_pedimento_completo(organizacion_id): diff --git a/api/customs/tests.py b/api/customs/tests.py index bcbbebf..959adcb 100644 --- a/api/customs/tests.py +++ b/api/customs/tests.py @@ -3,7 +3,12 @@ from django.urls import reverse from rest_framework.test import APITestCase, APIClient from rest_framework import status from django.contrib.auth import get_user_model +from django.core.files.uploadedfile import SimpleUploadedFile +from unittest.mock import patch +from io import BytesIO +import zipfile from api.organization.models import Organizacion +from api.licence.models import Licencia from .models import Pedimento, TipoOperacion, ProcesamientoPedimento, EDocument User = get_user_model() @@ -75,3 +80,147 @@ class CustomsViewsTests(APITestCase): self.client.force_authenticate(user=self.admin) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) + + +# --------------------------------------------------------------------------- +# Tests de integración para bulk-create (ViewSetPedimento.bulk_create) +# Verifica que al re-cargar un pedimento existente sus documentos se actualicen +# --------------------------------------------------------------------------- + +class BulkCreateDocumentReplaceTests(APITestCase): + """Verifica que bulk-create actualiza los documentos de pedimentos existentes + en vez de ignorarlos, y que no quedan archivos residuales en el storage.""" + + PEDIMENTO_APP = "24-01-3420-1234567" + + def setUp(self): + self.licencia = Licencia.objects.create(nombre="Lic100GB", almacenamiento=100) + self.org = Organizacion.objects.create( + nombre="OrgBulkCreate", + licencia=self.licencia, + is_active=True, + is_verified=True, + ) + self.user = User.objects.create_user( + username="bulkcreateuser", password="pass", organizacion=self.org + ) + self.pedimento = Pedimento.objects.create( + organizacion=self.org, + pedimento="1234567", + pedimento_app=self.PEDIMENTO_APP, + ) + from api.record.models import DocumentType, Fuente + self.doc_type = DocumentType.objects.get_or_create(nombre="Pedimento")[0] + # bulk_create usa fuente_id=4 hardcodeado; debe existir en la DB de test + Fuente.objects.get_or_create(id=4, defaults={"nombre": "Bulk Create"}) + self.url = reverse("Pedimento-bulk-create") + self.client.force_authenticate(user=self.user) + + def _make_zip(self, files_dict): + """Crea un ZIP en memoria. files_dict = {nombre_archivo: contenido_bytes}""" + buf = BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + for name, content in files_dict.items(): + zf.writestr(name, content) + buf.seek(0) + return SimpleUploadedFile( + f"{self.PEDIMENTO_APP}.zip", buf.read(), content_type="application/zip" + ) + + def _post_zip(self, files_dict): + return self.client.post( + self.url, + {"contribuyente": "XAXX010101000", "archivos": [self._make_zip(files_dict)]}, + format="multipart", + ) + + @patch("api.customs.views.storage_service") + def test_existing_pedimento_not_duplicated(self, mock_st): + """Re-subir un pedimento existente NO debe crear un segundo Pedimento.""" + mock_st.save_document_from_path.return_value = "org_1/documents/ped/informe_a1b2c3d4.pdf" + + self._post_zip({"informe.pdf": b"contenido"}) + + self.assertEqual( + Pedimento.objects.filter( + organizacion=self.org, pedimento_app=self.PEDIMENTO_APP + ).count(), + 1, + ) + + @patch("api.customs.views.storage_service") + def test_existing_pedimento_document_replaced_not_duplicated(self, mock_st): + """Documento existente con el mismo nombre base se reemplaza, no se duplica.""" + from api.record.models import Document + + old_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_a1b2c3d4.pdf" + old_doc = Document.objects.create( + organizacion=self.org, + pedimento=self.pedimento, + document_type=self.doc_type, + archivo=old_path, + size=500, + extension="pdf", + ) + new_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_b5c6d7e8.pdf" + mock_st.save_document_from_path.return_value = new_path + mock_st.delete_file.return_value = True + + self._post_zip({"informe.pdf": b"contenido actualizado"}) + + docs = Document.objects.filter(pedimento=self.pedimento) + # Sin duplicados + self.assertEqual(docs.count(), 1) + # Mismo registro + self.assertEqual(docs.first().id, old_doc.id) + # Archivo actualizado + old_doc.refresh_from_db() + self.assertEqual(old_doc.archivo.name, new_path) + + @patch("api.customs.views.storage_service") + def test_existing_pedimento_stale_file_deleted_from_storage(self, mock_st): + """Al reemplazar un documento, el archivo viejo debe eliminarse del storage.""" + from api.record.models import Document + + old_path = f"org_1/documents/{self.PEDIMENTO_APP}/informe_a1b2c3d4.pdf" + Document.objects.create( + organizacion=self.org, + pedimento=self.pedimento, + document_type=self.doc_type, + archivo=old_path, + size=500, + extension="pdf", + ) + mock_st.save_document_from_path.return_value = f"org_1/documents/{self.PEDIMENTO_APP}/informe_b5c6d7e8.pdf" + mock_st.delete_file.return_value = True + + self._post_zip({"informe.pdf": b"contenido"}) + + # delete_file debe haberse llamado con la ruta del archivo viejo + mock_st.delete_file.assert_called() + called_arg = str(mock_st.delete_file.call_args[0][0]) + self.assertIn("informe_a1b2c3d4", called_arg) + + @patch("api.customs.views.storage_service") + def test_existing_pedimento_new_file_added(self, mock_st): + """Archivo nuevo en el ZIP se añade al pedimento existente.""" + from api.record.models import Document + + mock_st.save_document_from_path.return_value = "org_1/documents/ped/nuevo_b5c6d7e8.pdf" + + self._post_zip({"nuevo_documento.pdf": b"contenido nuevo"}) + + self.assertGreaterEqual( + Document.objects.filter(pedimento=self.pedimento).count(), 1 + ) + + @patch("api.customs.views.storage_service") + def test_already_existing_count_in_response(self, mock_st): + """La respuesta debe indicar que el pedimento ya existía (already_existing_count >= 1).""" + mock_st.save_document_from_path.return_value = "org_1/documents/ped/f_a1b2c3d4.pdf" + + response = self._post_zip({"archivo.pdf": b"contenido"}) + + self.assertIn(response.status_code, [status.HTTP_200_OK, status.HTTP_207_MULTI_STATUS, status.HTTP_201_CREATED]) + data = response.json() + self.assertGreaterEqual(data.get("already_existing_count", 0), 1) diff --git a/api/customs/urls.py b/api/customs/urls.py index 89c45a4..f706dac 100644 --- a/api/customs/urls.py +++ b/api/customs/urls.py @@ -39,6 +39,7 @@ from .views_auditor import ( auditar_acuse_cove_endpoint, auditar_edocuments_endpoint, auditar_acuse_endpoint, + auditar_remesas_endpoint, auditar_cove_pedimento_endpoint, auditar_acuse_cove_pedimento_endpoint, auditar_edocument_pedimento_endpoint, @@ -72,6 +73,7 @@ urlpatterns = [ path('auditor/auditar-acuse-cove/', auditar_acuse_cove_endpoint, name='auditar-acuse-cove'), path('auditor/auditar-edocuments/', auditar_edocuments_endpoint, name='auditar-edocuments'), path('auditor/auditar-acuse/', auditar_acuse_endpoint, name='auditar-acuse'), + path('auditor/auditar-remesas/', auditar_remesas_endpoint, name='auditar-remesas'), path('auditor/auditar-cove/pedimento/', auditar_cove_pedimento_endpoint, name='auditar-cove-pedimento'), path('auditor/auditar-acuse-cove/pedimento/', auditar_acuse_cove_pedimento_endpoint, name='auditar-acuse-cove-pedimento'), path('auditor/auditar-edocument/pedimento/', auditar_edocument_pedimento_endpoint, name='auditar-edocument-pedimento'), diff --git a/api/datastage/models.py b/api/datastage/models.py index 383fdbc..ab68c1d 100644 --- a/api/datastage/models.py +++ b/api/datastage/models.py @@ -84,7 +84,9 @@ class Registro501(models.Model): organizacion = models.ForeignKey('organization.Organizacion', on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro501s', null=True, blank=True) - + + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro501' @@ -104,6 +106,8 @@ class Registro502(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro502s', null=True, blank=True) patente = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro502' @@ -120,6 +124,8 @@ class Registro503(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro503s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro503' @@ -136,6 +142,8 @@ class Registro504(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro504s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro504' @@ -165,6 +173,8 @@ class Registro505(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro505s', null=True, blank=True) patente = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro505' @@ -181,6 +191,8 @@ class Registro506(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro506s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro506' @@ -199,6 +211,8 @@ class Registro507(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro507s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro507' @@ -223,6 +237,8 @@ class Registro508(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro508s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro508' @@ -241,6 +257,8 @@ class Registro509(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro509s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro509' @@ -261,6 +279,8 @@ class Registro510(models.Model): forma_pago = models.CharField(max_length=3, null=True, blank=True) importe_pago = models.CharField(max_length=12, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro510' @@ -278,6 +298,8 @@ class Registro511(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro511s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro511' @@ -301,6 +323,8 @@ class Registro512(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro512s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro512' @@ -363,6 +387,8 @@ class Registro551(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro551s', null=True, blank=True) entidad_fed_destino = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro551' @@ -381,6 +407,8 @@ class Registro552(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro552s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro552' @@ -402,6 +430,8 @@ class Registro553(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro553s', null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro553' @@ -421,6 +451,8 @@ class Registro554(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro554s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro554' @@ -446,6 +478,8 @@ class Registro555(models.Model): created_by = models.IntegerField(null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro555' @@ -465,6 +499,8 @@ class Registro556(models.Model): fraccion = models.CharField(max_length=8, null=True, blank=True) secuencia_fraccion = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro556' @@ -484,6 +520,8 @@ class Registro557(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro557s', null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro557' @@ -502,6 +540,8 @@ class Registro558(models.Model): datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro558s', null=True, blank=True) consulta = models.CharField(max_length=50, null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro558' @@ -522,6 +562,8 @@ class RegistroSel(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro_sel', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro_sel' @@ -546,6 +588,8 @@ class Registro701(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro701s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro701' @@ -564,6 +608,8 @@ class Registro702(models.Model): consulta = models.CharField(max_length=50, null=True, blank=True) datastage = models.ForeignKey(DataStage, on_delete=models.CASCADE, related_name='registro702s', null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + class Meta: db_table = 'registro702' diff --git a/api/datastage/tasks.py b/api/datastage/tasks.py index c884990..e95f814 100644 --- a/api/datastage/tasks.py +++ b/api/datastage/tasks.py @@ -9,6 +9,8 @@ import zipfile import re from api.utils.storage_service import storage_service +logger = logging.getLogger(__name__) + @shared_task def procesar_datastage_task(datastage_id, user_organizacion_id=None): import traceback @@ -167,15 +169,22 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name): continue if first: - field_names = [f for f in line_decoded.split('|')] + field_names = line_decoded.split('|') + # Eliminar columnas vacías del final (líneas terminan con |) + while field_names and field_names[-1] == '': + field_names.pop() field_names_snake = [to_snake_case(f) for f in field_names] first = False continue - + values = line_decoded.split('|') while values and values[-1] == '': values.pop() if len(values) != len(field_names_snake): + logger.debug( + "%s línea %d: esperados %d campos, recibidos %d — se omite", + asc_name, line_count, len(field_names_snake), len(values) + ) continue data = dict(zip(field_names_snake, values)) @@ -185,28 +194,36 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name): if hasattr(Model, 'datastage_id'): data['datastage_id'] = datastage.id - # Limpiar fechas vacías + # Parsear y normalizar todos los campos de fecha/datetime for field in Model._meta.get_fields(): - if hasattr(field, 'get_internal_type') and field.get_internal_type() in ["DateField", "DateTimeField"]: - if data.get(field.name) == "": - data[field.name] = None - - # Convertir fecha_pago_real - if 'fecha_pago_real' in data and data['fecha_pago_real']: - fecha_val = data['fecha_pago_real'] - if isinstance(fecha_val, str): - try: - dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d %H:%M:%S') - except ValueError: + if not hasattr(field, 'get_internal_type'): + continue + field_type = field.get_internal_type() + val = data.get(field.name) + if val == '' or val is None: + data[field.name] = None + continue + if field_type == 'DateTimeField' and isinstance(val, str): + dt = None + for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'): try: - dt = datetime.datetime.strptime(fecha_val, '%Y-%m-%d') - except Exception: - dt = None + dt = datetime.datetime.strptime(val, fmt) + break + except ValueError: + continue if dt and timezone.is_naive(dt): dt = timezone.make_aware(dt) - if dt: - data['fecha_pago_real'] = dt + data[field.name] = dt + # Filtrar data para solo incluir campos válidos del modelo + valid_fields = set() + for f in Model._meta.get_fields(): + if hasattr(f, 'name'): + valid_fields.add(f.name) + if hasattr(f, 'attname'): + valid_fields.add(f.attname) + data = {k: v for k, v in data.items() if k in valid_fields} + try: obj = Model(**data) objects_to_create.append(obj) @@ -284,8 +301,9 @@ def procesar_archivo_asc_task(datastage_id, user_organizacion_id, asc_name): try: Pedimento.objects.create(**pedimento_data) except Exception as ped_exc: - pass + logger.warning("No se pudo crear Pedimento %s: %s", pedimento_app, ped_exc) except Exception as e: + logger.error("%s línea %d: error creando objeto %s: %s", asc_name, line_count, model_name, e) continue # Bulk create diff --git a/api/tasks/views.py b/api/tasks/views.py index 2a68fa7..fefb1d1 100644 --- a/api/tasks/views.py +++ b/api/tasks/views.py @@ -57,46 +57,61 @@ from celery.result import AsyncResult class TaskStatusView(APIView): - """ - Vista para consultar el estado de tareas de Celery. - """ permission_classes = [IsAuthenticated] - + def get(self, request, task_id): """ - Consulta el estado de una tarea de Celery. - - Returns: - - PENDING: La tarea está esperando ser procesada - - STARTED: La tarea ha sido iniciada - - SUCCESS: La tarea se completó exitosamente - - FAILURE: La tarea falló - - RETRY: La tarea está reintentando + Consulta el estado de una tarea Celery. + + Estados posibles: + PENDING — en cola, aún no inició + STARTED — worker la tomó y está ejecutando + SUCCESS — terminó correctamente, `result` contiene el resumen + FAILURE — lanzó una excepción no capturada, `error` describe el problema + RETRY — el worker la está reintentando """ try: task_result = AsyncResult(task_id) - + state = task_result.state + response_data = { 'task_id': task_id, - 'status': task_result.state, + 'status': state, 'ready': task_result.ready(), 'successful': task_result.successful() if task_result.ready() else None, } - - if task_result.ready() and task_result.successful(): - try: - response_data['result'] = task_result.result - except Exception: - pass - - if task_result.state == 'FAILURE': + + if state == 'SUCCESS': + result = task_result.result + response_data['result'] = result + + # Resumen legible cuando es auditoría masiva de organización + if isinstance(result, dict) and 'total_pedimentos' in result: + total = result.get('total_pedimentos', 0) + completados = result.get('completados', 0) + con_pendientes = result.get('con_pendientes', 0) + con_errores = result.get('con_errores', 0) + + if con_pendientes == 0 and con_errores == 0: + mensaje = f'Auditoría completa — {completados}/{total} pedimentos sin pendientes' + else: + partes = [] + if con_pendientes: + partes.append(f'{con_pendientes} con documentos pendientes') + if con_errores: + partes.append(f'{con_errores} con error') + mensaje = f'{completados}/{total} pedimentos completos — {", ".join(partes)}' + + response_data['mensaje'] = mensaje + + elif state == 'FAILURE': response_data['error'] = str(task_result.info) - - if task_result.state == 'STARTED': + + elif state == 'STARTED': response_data['info'] = str(task_result.info) if task_result.info else None - + return Response(response_data, status=status.HTTP_200_OK) - + except Exception as e: return Response( {'error': f'Error al consultar tarea: {str(e)}'}, diff --git a/config/celery.py b/config/celery.py index fb276c1..0ff9f13 100644 --- a/config/celery.py +++ b/config/celery.py @@ -1,8 +1,11 @@ import os from celery import Celery +from datetime import timedelta os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') app = Celery('config') app.config_from_object('django.conf:settings', namespace='CELERY') +# corroborar que las tareas esten programadas, se cambio el horario a hora denver +# print("Beat schedule cargado:", app.conf.beat_schedule) app.autodiscover_tasks() diff --git a/config/settings.py b/config/settings.py index 28f599a..16296b0 100644 --- a/config/settings.py +++ b/config/settings.py @@ -30,8 +30,14 @@ from celery.schedules import crontab from config.stg.storage import * CELERY_BEAT_SCHEDULE = { - - + 'process_all_organizations': { + 'task': 'api.customs.tasks.microservice_v2.process_all_organizations', + 'schedule': crontab(hour=7, minute=1), # analizar si se requiere otra en un futuro + }, + # 'process_all_organizations': { + # 'task': 'api.customs.tasks.microservice_v2.process_all_organizations', + # 'schedule': crontab(hour=11, minute=39), # analizar si se requiere otra en un futuro + # }, } # Cargar variables de entorno desde un archivo .env @@ -305,7 +311,8 @@ DEFAULT_FROM_EMAIL = EMAIL_HOST_USER # Configuración Celery CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://redis:6379/0') CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0') -CELERY_TIMEZONE = 'America/Mexico_City' +# CELERY_TIMEZONE = 'America/Mexico_City' +CELERY_TIMEZONE = 'America/Denver' # Configuración para procesamiento asíncrono nativo de Django ASGI_APPLICATION = 'config.asgi.application'