procesar datastage completo
This commit is contained in:
@@ -6,4 +6,4 @@ class CustomsConfig(AppConfig):
|
|||||||
name = 'api.customs'
|
name = 'api.customs'
|
||||||
|
|
||||||
def ready(self):
|
def ready(self):
|
||||||
import api.customs.signals
|
import api.customs.signals.procesamiento
|
||||||
@@ -3,7 +3,7 @@ from django.dispatch import receiver
|
|||||||
from django.db import transaction
|
from django.db import transaction
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
from api.customs.models import Pedimento, ProcesamientoPedimento, Cove, EDocument
|
from api.customs.models import EstadoDeProcesamiento, Pedimento, ProcesamientoPedimento, Cove, EDocument
|
||||||
from api.customs.tasks.internal_services import (
|
from api.customs.tasks.internal_services import (
|
||||||
crear_procesamiento_remesa,
|
crear_procesamiento_remesa,
|
||||||
crear_procesamiento_partida,
|
crear_procesamiento_partida,
|
||||||
@@ -20,8 +20,49 @@ from api.customs.tasks.microservice import (
|
|||||||
|
|
||||||
@receiver(post_save, sender=Pedimento)
|
@receiver(post_save, sender=Pedimento)
|
||||||
def trigger_celery_task_on_create(sender, instance, created, **kwargs):
|
def trigger_celery_task_on_create(sender, instance, created, **kwargs):
|
||||||
if created:
|
|
||||||
procesar_pedimento_completo_individual.apply_async(args=[instance.id, instance.organizacion.id])
|
if not created:
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger('api.customs.async_operations')
|
||||||
|
logger.info("NO es creación de pedimento, no se crea procesamiento.")
|
||||||
|
return
|
||||||
|
|
||||||
|
def crear_procesamiento():
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger('api.customs.async_operations')
|
||||||
|
logger.info(f"Pedimento confirmado en BD: {instance.id}, creando procesamiento...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
estado, _ = EstadoDeProcesamiento.objects.get_or_create(
|
||||||
|
estado='En Espera'
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
estado = EstadoDeProcesamiento.objects.first()
|
||||||
|
|
||||||
|
try:
|
||||||
|
ProcesamientoPedimento.objects.get_or_create(
|
||||||
|
pedimento=instance,
|
||||||
|
organizacion=instance.organizacion,
|
||||||
|
defaults={
|
||||||
|
'estado': estado,
|
||||||
|
'servicio_id': 3,
|
||||||
|
'tipo_procesamiento_id': 2,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(
|
||||||
|
f"No se pudo crear ProcesamientoPedimento "
|
||||||
|
f"para pedimento {instance.id}: {e}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Disparar la tarea asíncrona existente
|
||||||
|
try:
|
||||||
|
procesar_pedimento_completo_individual.apply_async(args=[instance.id, instance.organizacion.id])
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(f"Error al encolar procesar_pedimento_completo_individual: {e}")
|
||||||
|
|
||||||
|
transaction.on_commit(crear_procesamiento)
|
||||||
|
|
||||||
|
|
||||||
@receiver(post_save, sender=Pedimento)
|
@receiver(post_save, sender=Pedimento)
|
||||||
def trigger_celery_task_on_update(sender, instance, created,**kwargs):
|
def trigger_celery_task_on_update(sender, instance, created,**kwargs):
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ from datetime import datetime
|
|||||||
# ===================
|
# ===================
|
||||||
@shared_task
|
@shared_task
|
||||||
def procesar_pedimento_completo_individual(pedimento_id, organizacion_id):
|
def procesar_pedimento_completo_individual(pedimento_id, organizacion_id):
|
||||||
|
import logging
|
||||||
|
logger = logging.getLogger('api.customs.async_operations')
|
||||||
|
logger.info(f"Pedimento a monitorear: {pedimento_id}, org:: {organizacion_id}, verificando servicios a crear...")
|
||||||
response = requests.post(
|
response = requests.post(
|
||||||
f"{SERVICE_API_URL}/async/services/pedimento_completo",
|
f"{SERVICE_API_URL}/async/services/pedimento_completo",
|
||||||
json={"pedimento": str(pedimento_id), "organizacion": str(organizacion_id)}
|
json={"pedimento": str(pedimento_id), "organizacion": str(organizacion_id)}
|
||||||
|
|||||||
@@ -61,18 +61,35 @@ class DataStageViewSet(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltrada
|
|||||||
|
|
||||||
if self.request.user.is_superuser:
|
if self.request.user.is_superuser:
|
||||||
# Permitir que el superusuario cree sin organización o la especifique
|
# Permitir que el superusuario cree sin organización o la especifique
|
||||||
serializer.save()
|
datastage = serializer.save()
|
||||||
|
self._trigger_processing(datastage)
|
||||||
return
|
return
|
||||||
|
|
||||||
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
|
if (self.request.user.groups.filter(name='developer').exists() or self.request.user.groups.filter(name='admin').exists() or self.request.user.groups.filter(name='user').exists()) and self.request.user.groups.filter(name='Agente Aduanal').exists():
|
||||||
if not organizacion:
|
if not organizacion:
|
||||||
serializer.save(organizacion=self.request.user.organizacion)
|
datastage = serializer.save(organizacion=self.request.user.organizacion)
|
||||||
else:
|
else:
|
||||||
serializer.save()
|
datastage = serializer.save()
|
||||||
|
|
||||||
|
self._trigger_processing(datastage)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
raise ValueError("No cuentas con los permisos necesarios para crear un DataStage")
|
raise ValueError("No cuentas con los permisos necesarios para crear un DataStage")
|
||||||
|
|
||||||
|
def _trigger_processing(self, datastage):
|
||||||
|
"""
|
||||||
|
Método helper para disparar el procesamiento.
|
||||||
|
"""
|
||||||
|
from api.datastage.tasks import procesar_datastage_task
|
||||||
|
user_organizacion = getattr(self.request.user, 'organizacion', None)
|
||||||
|
user_organizacion_id = user_organizacion.id if user_organizacion else None
|
||||||
|
|
||||||
|
datastage.procesado = True
|
||||||
|
datastage.save()
|
||||||
|
|
||||||
|
task = procesar_datastage_task.delay(datastage.id, user_organizacion_id)
|
||||||
|
|
||||||
def perform_update(self, serializer):
|
def perform_update(self, serializer):
|
||||||
"""
|
"""
|
||||||
Override to ensure organization is set on update.
|
Override to ensure organization is set on update.
|
||||||
@@ -113,6 +130,7 @@ class DataStageViewSet(LoggingMixin, viewsets.ModelViewSet, OrganizacionFiltrada
|
|||||||
"""
|
"""
|
||||||
Endpoint para procesar el DataStage de forma asíncrona usando Celery.
|
Endpoint para procesar el DataStage de forma asíncrona usando Celery.
|
||||||
"""
|
"""
|
||||||
|
# ojo aqui
|
||||||
from api.datastage.tasks import procesar_datastage_task
|
from api.datastage.tasks import procesar_datastage_task
|
||||||
datastage = self.get_object()
|
datastage = self.get_object()
|
||||||
user_organizacion = getattr(self.request.user, 'organizacion', None)
|
user_organizacion = getattr(self.request.user, 'organizacion', None)
|
||||||
|
|||||||
Reference in New Issue
Block a user