fix/forzar el procesamiento de un pedimento cargado por datastage

This commit is contained in:
Dulce
2026-04-16 07:14:56 -06:00
parent 2aebef8b26
commit d29cfcb00c
7 changed files with 267 additions and 10 deletions

View File

@@ -1,11 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi import APIRouter, HTTPException, BackgroundTasks
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from celery_app import celery_app from celery_app import celery_app
from tasks import pedimento_completo_task from tasks import pedimento_completo_task
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -64,6 +64,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
detail=f"Error al agendar la tarea: {str(e)}" detail=f"Error al agendar la tarea: {str(e)}"
) )
@router.post("/async/services/pedimento_completo/multiple")
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
"""
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
Args:
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
Returns:
JSONResponse con task_id para consultar el estado de la tarea
Raises:
HTTPException: En caso de errores de validación
"""
try:
pedimentos_ids = request.pedimentos
organizacion = request.organizacion
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
# Agendar la tarea en Celery
from tasks import multi_pedimento_completo_task
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
response_data = {
"success": True,
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
"task_id": task.id,
"total_pedimentos": len(pedimentos_ids),
"pedimentos": pedimentos_ids,
"organizacion": organizacion,
"status": "PENDING",
"timestamp": datetime.utcnow().isoformat(),
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
"check_status_url": f"/async/task-status/{task.id}"
}
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
return JSONResponse(content=response_data, status_code=202)
except Exception as e:
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.get("/async/task-status/{task_id}") @router.get("/async/task-status/{task_id}")
async def get_task_status(task_id: str): async def get_task_status(task_id: str):
""" """

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
import asyncio import asyncio
import logging import logging
logger = logging.getLogger("app.api") logger = logging.getLogger("app.api")

View File

@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
from .services import put_pedimento_data_vu from .services import put_pedimento_data_vu
from api.api_v2.modules.tasks.services import register_task from api.api_v2.modules.tasks.services import register_task
import logging import logging
logger = logging.getLogger("app.api") from celery import group, chord, chain
logger = logging.getLogger("app.api")
router = APIRouter() router = APIRouter()
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED) @router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, Dict, Any from typing import Optional, Union, Dict, Any, List
from uuid import UUID from uuid import UUID
from datetime import datetime from datetime import datetime
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione # CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione

View File

@@ -242,7 +242,10 @@ class APIController:
""" """
Método para obtener la lista de servicios desde la API. Método para obtener la lista de servicios desde la API.
""" """
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}') # return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
# lo elimine para poder ejecutar la operacion siempre que sea necesario
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]: async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
""" """
@@ -497,11 +500,13 @@ class APIController:
try: try:
async with httpx.AsyncClient(timeout=self.timeout) as client: async with httpx.AsyncClient(timeout=self.timeout) as client:
logger.info(f"Haciendo petición {method} a {url}") logger.info(f"Haciendo petición {method} a {url}")
print(f"Haciendo petición {method} a {url}")
if method.upper() == 'GET': if method.upper() == 'GET':
response = await client.get(url, headers=self.headers) response = await client.get(url, headers=self.headers)
elif method.upper() == 'POST': elif method.upper() == 'POST':
response = await client.post(url, json=data, headers=self.headers) response = await client.post(url, json=data, headers=self.headers)
print(f"response >>>> {response}")
elif method.upper() == 'PUT': elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers) response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE': elif method.upper() == 'DELETE':

View File

@@ -1,6 +1,5 @@
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from typing import Optional from typing import Optional, List, Union
class ServiceBaseSchema(BaseModel): class ServiceBaseSchema(BaseModel):
"""Esquema base para servicios con campos comunes""" """Esquema base para servicios con campos comunes"""
@@ -44,3 +43,40 @@ class ServiceRemesaSchema(BaseModel):
if not v or not v.strip(): if not v or not v.strip():
raise ValueError('Los campos de texto no pueden estar vacíos') raise ValueError('Los campos de texto no pueden estar vacíos')
return v.strip() return v.strip()
class MultiPedimentoCompletoSchema(BaseModel):
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
organizacion: str = Field(..., description="ID de la organización")
pedimentos: List[str] = Field(
...,
description="Lista de IDs de pedimentos a procesar",
min_length=1,
max_length=200
)
@field_validator('organizacion')
def validate_organizacion(cls, v):
if not v or not v.strip():
raise ValueError('La organización no puede estar vacía')
return v.strip()
@field_validator('pedimentos')
def validate_pedimentos(cls, v):
if not v:
raise ValueError('Debe proporcionar al menos un pedimento')
if len(v) > 200:
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
# Eliminar duplicados y vacíos
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
if not pedimentos_limpios:
raise ValueError('La lista de pedimentos no puede estar vacía')
# Eliminar duplicados
return list(set(pedimentos_limpios))
class Config:
json_schema_extra = {
"example": {
"organizacion": "1",
"pedimentos": ["123", "456", "789"]
}
}

169
tasks.py
View File

@@ -2,7 +2,7 @@ from celery import Celery
from celery_app import celery_app from celery_app import celery_app
import asyncio import asyncio
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from controllers.RESTController import rest_controller from controllers.RESTController import rest_controller
from controllers.SOAPController import soap_controller from controllers.SOAPController import soap_controller
@@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
return run_async_task(_execute_pedimento_completo) return run_async_task(_execute_pedimento_completo)
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
"""
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
Args:
pedimentos: Lista de IDs de pedimentos a procesar
organizacion: ID de la organización
"""
import time
from datetime import datetime
start_time = time.time()
results = {
"total": len(pedimentos),
"successful": [],
"failed": [],
"started_at": datetime.utcnow().isoformat()
}
total = len(pedimentos)
for idx, pedimento_id in enumerate(pedimentos, 1):
try:
# Actualizar progreso (igual que en tus otras tareas)
self.update_state(
state='PROGRESS',
meta={
'status': f'Procesando pedimento {idx}/{total}',
'current': idx,
'total': total,
'current_pedimento': pedimento_id,
'percentage': round((idx / total) * 100, 2)
}
)
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
# Preparar datos exactamente como lo espera la tarea individual
request_data = {
"pedimento": pedimento_id,
"organizacion": organizacion
}
# Reutilizar la lógica de la tarea individual
# Esto ejecuta el mismo código que tu endpoint individual
async def _execute():
return await _execute_pedimento_completo_logic(request_data)
result = run_async_task(_execute)
results["successful"].append({
"pedimento_id": pedimento_id,
"result": result
})
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
except Exception as e:
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
results["failed"].append({
"pedimento_id": pedimento_id,
"error": str(e)
})
elapsed_time = time.time() - start_time
results["completed_at"] = datetime.utcnow().isoformat()
results["elapsed_seconds"] = round(elapsed_time, 2)
results["success_count"] = len(results["successful"])
results["failed_count"] = len(results["failed"])
return results
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
"""
Lógica compartida para procesar un pedimento completo.
Esta es la misma lógica que usa tu endpoint individual.
"""
operation_name = "pedimento_completo"
service_data = None
try:
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
# Validar datos de entrada
await _validate_request_data(request_data)
# Obtener servicio
service_data = await _get_pedimento_service(
pedimento_id=request_data['pedimento'],
service_type=3,
operation_name=operation_name
)
# Actualizar estado a "En proceso"
update_success = await _update_service_status(
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
)
if not update_success:
raise Exception("Error al actualizar estado del servicio")
# Obtener credenciales VUCEM
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
if not contribuyente_id:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
raise Exception("ID de contribuyente no encontrado")
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
# Procesar petición SOAP
soap_response = await get_soap_pedimento_completo(
credenciales=credentials,
response_service=service_data,
soap_controller=soap_controller
)
if not soap_response:
raise Exception("Error en la petición SOAP")
# Actualizar datos del pedimento
xml_content = soap_response.get('xml_content', {})
if xml_content:
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
update_content['existe_expediente'] = True
await rest_controller.put_pedimento(
service_data['pedimento']['id'],
update_content
)
# Procesar COVEs
coves = xml_content.get('coves', [])
if coves:
await _post_coves(
response_service=service_data,
coves=coves
)
# Procesar documentos digitalizados
identificadores_ed = xml_content.get('identificadores_ed', [])
if identificadores_ed:
await _post_edocuments(
response_service=service_data,
identificadores_ed=identificadores_ed
)
# Finalizar servicio
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
return {
"success": True,
"pedimento_id": request_data['pedimento'],
"message": "Pedimento procesado exitosamente"
}
except Exception as e:
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
if service_data:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
return {
"success": False,
"pedimento_id": request_data['pedimento'],
"error": str(e)
}
@celery_app.task(bind=True) @celery_app.task(bind=True)
def partidas_task(self, **kwargs): def partidas_task(self, **kwargs):
"""Tarea asíncrona para obtener partidas""" """Tarea asíncrona para obtener partidas"""