Merge pull request 'fix/forzar el procesamiento de un pedimento cargado por datastage' (#6) from fix/procesar-pedimento into main
Reviewed-on: #6
This commit is contained in:
@@ -1,11 +1,11 @@
|
|||||||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||||||
from schemas.pedimentoSchema import PedimentoRequest
|
from schemas.pedimentoSchema import PedimentoRequest
|
||||||
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
|
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from tasks import pedimento_completo_task
|
from tasks import pedimento_completo_task
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any, List
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
@@ -64,6 +64,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
|
|||||||
detail=f"Error al agendar la tarea: {str(e)}"
|
detail=f"Error al agendar la tarea: {str(e)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@router.post("/async/services/pedimento_completo/multiple")
|
||||||
|
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
|
||||||
|
"""
|
||||||
|
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSONResponse con task_id para consultar el estado de la tarea
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: En caso de errores de validación
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
pedimentos_ids = request.pedimentos
|
||||||
|
organizacion = request.organizacion
|
||||||
|
|
||||||
|
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
|
||||||
|
|
||||||
|
# Agendar la tarea en Celery
|
||||||
|
from tasks import multi_pedimento_completo_task
|
||||||
|
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
|
||||||
|
|
||||||
|
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
|
||||||
|
response_data = {
|
||||||
|
"success": True,
|
||||||
|
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
|
||||||
|
"task_id": task.id,
|
||||||
|
"total_pedimentos": len(pedimentos_ids),
|
||||||
|
"pedimentos": pedimentos_ids,
|
||||||
|
"organizacion": organizacion,
|
||||||
|
"status": "PENDING",
|
||||||
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
|
||||||
|
"check_status_url": f"/async/task-status/{task.id}"
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
|
||||||
|
return JSONResponse(content=response_data, status_code=202)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"Error al agendar la tarea: {str(e)}"
|
||||||
|
)
|
||||||
|
|
||||||
@router.get("/async/task-status/{task_id}")
|
@router.get("/async/task-status/{task_id}")
|
||||||
async def get_task_status(task_id: str):
|
async def get_task_status(task_id: str):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
from schemas.pedimentoSchema import PedimentoRequest
|
from schemas.pedimentoSchema import PedimentoRequest
|
||||||
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
|
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger("app.api")
|
logger = logging.getLogger("app.api")
|
||||||
|
|||||||
@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
|
|||||||
from .services import put_pedimento_data_vu
|
from .services import put_pedimento_data_vu
|
||||||
from api.api_v2.modules.tasks.services import register_task
|
from api.api_v2.modules.tasks.services import register_task
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger("app.api")
|
from celery import group, chord, chain
|
||||||
|
|
||||||
|
logger = logging.getLogger("app.api")
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)
|
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Optional, Union, Dict, Any
|
from typing import Optional, Union, Dict, Any, List
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione
|
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione
|
||||||
|
|||||||
@@ -242,7 +242,10 @@ class APIController:
|
|||||||
"""
|
"""
|
||||||
Método para obtener la lista de servicios desde la API.
|
Método para obtener la lista de servicios desde la API.
|
||||||
"""
|
"""
|
||||||
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
|
# return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
|
||||||
|
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
|
||||||
|
# lo elimine para poder ejecutar la operacion siempre que sea necesario
|
||||||
|
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
|
||||||
|
|
||||||
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
|
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -497,11 +500,13 @@ class APIController:
|
|||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||||
logger.info(f"Haciendo petición {method} a {url}")
|
logger.info(f"Haciendo petición {method} a {url}")
|
||||||
|
print(f"Haciendo petición {method} a {url}")
|
||||||
|
|
||||||
if method.upper() == 'GET':
|
if method.upper() == 'GET':
|
||||||
response = await client.get(url, headers=self.headers)
|
response = await client.get(url, headers=self.headers)
|
||||||
elif method.upper() == 'POST':
|
elif method.upper() == 'POST':
|
||||||
response = await client.post(url, json=data, headers=self.headers)
|
response = await client.post(url, json=data, headers=self.headers)
|
||||||
|
print(f"response >>>> {response}")
|
||||||
elif method.upper() == 'PUT':
|
elif method.upper() == 'PUT':
|
||||||
response = await client.put(url, json=data, headers=self.headers)
|
response = await client.put(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'DELETE':
|
elif method.upper() == 'DELETE':
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
from pydantic import BaseModel, Field, field_validator
|
from pydantic import BaseModel, Field, field_validator
|
||||||
from typing import Optional
|
from typing import Optional, List, Union
|
||||||
|
|
||||||
|
|
||||||
class ServiceBaseSchema(BaseModel):
|
class ServiceBaseSchema(BaseModel):
|
||||||
"""Esquema base para servicios con campos comunes"""
|
"""Esquema base para servicios con campos comunes"""
|
||||||
@@ -44,3 +43,40 @@ class ServiceRemesaSchema(BaseModel):
|
|||||||
if not v or not v.strip():
|
if not v or not v.strip():
|
||||||
raise ValueError('Los campos de texto no pueden estar vacíos')
|
raise ValueError('Los campos de texto no pueden estar vacíos')
|
||||||
return v.strip()
|
return v.strip()
|
||||||
|
|
||||||
|
class MultiPedimentoCompletoSchema(BaseModel):
|
||||||
|
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
|
||||||
|
organizacion: str = Field(..., description="ID de la organización")
|
||||||
|
pedimentos: List[str] = Field(
|
||||||
|
...,
|
||||||
|
description="Lista de IDs de pedimentos a procesar",
|
||||||
|
min_length=1,
|
||||||
|
max_length=200
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator('organizacion')
|
||||||
|
def validate_organizacion(cls, v):
|
||||||
|
if not v or not v.strip():
|
||||||
|
raise ValueError('La organización no puede estar vacía')
|
||||||
|
return v.strip()
|
||||||
|
|
||||||
|
@field_validator('pedimentos')
|
||||||
|
def validate_pedimentos(cls, v):
|
||||||
|
if not v:
|
||||||
|
raise ValueError('Debe proporcionar al menos un pedimento')
|
||||||
|
if len(v) > 200:
|
||||||
|
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
|
||||||
|
# Eliminar duplicados y vacíos
|
||||||
|
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
|
||||||
|
if not pedimentos_limpios:
|
||||||
|
raise ValueError('La lista de pedimentos no puede estar vacía')
|
||||||
|
# Eliminar duplicados
|
||||||
|
return list(set(pedimentos_limpios))
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
json_schema_extra = {
|
||||||
|
"example": {
|
||||||
|
"organizacion": "1",
|
||||||
|
"pedimentos": ["123", "456", "789"]
|
||||||
|
}
|
||||||
|
}
|
||||||
169
tasks.py
169
tasks.py
@@ -2,7 +2,7 @@ from celery import Celery
|
|||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any, List
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from controllers.RESTController import rest_controller
|
from controllers.RESTController import rest_controller
|
||||||
from controllers.SOAPController import soap_controller
|
from controllers.SOAPController import soap_controller
|
||||||
@@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
|
|||||||
|
|
||||||
return run_async_task(_execute_pedimento_completo)
|
return run_async_task(_execute_pedimento_completo)
|
||||||
|
|
||||||
|
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
|
||||||
|
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
|
||||||
|
"""
|
||||||
|
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pedimentos: Lista de IDs de pedimentos a procesar
|
||||||
|
organizacion: ID de la organización
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
results = {
|
||||||
|
"total": len(pedimentos),
|
||||||
|
"successful": [],
|
||||||
|
"failed": [],
|
||||||
|
"started_at": datetime.utcnow().isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
total = len(pedimentos)
|
||||||
|
|
||||||
|
for idx, pedimento_id in enumerate(pedimentos, 1):
|
||||||
|
try:
|
||||||
|
# Actualizar progreso (igual que en tus otras tareas)
|
||||||
|
self.update_state(
|
||||||
|
state='PROGRESS',
|
||||||
|
meta={
|
||||||
|
'status': f'Procesando pedimento {idx}/{total}',
|
||||||
|
'current': idx,
|
||||||
|
'total': total,
|
||||||
|
'current_pedimento': pedimento_id,
|
||||||
|
'percentage': round((idx / total) * 100, 2)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
|
||||||
|
|
||||||
|
# Preparar datos exactamente como lo espera la tarea individual
|
||||||
|
request_data = {
|
||||||
|
"pedimento": pedimento_id,
|
||||||
|
"organizacion": organizacion
|
||||||
|
}
|
||||||
|
|
||||||
|
# Reutilizar la lógica de la tarea individual
|
||||||
|
# Esto ejecuta el mismo código que tu endpoint individual
|
||||||
|
async def _execute():
|
||||||
|
return await _execute_pedimento_completo_logic(request_data)
|
||||||
|
|
||||||
|
result = run_async_task(_execute)
|
||||||
|
|
||||||
|
results["successful"].append({
|
||||||
|
"pedimento_id": pedimento_id,
|
||||||
|
"result": result
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
|
||||||
|
results["failed"].append({
|
||||||
|
"pedimento_id": pedimento_id,
|
||||||
|
"error": str(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
|
||||||
|
results["completed_at"] = datetime.utcnow().isoformat()
|
||||||
|
results["elapsed_seconds"] = round(elapsed_time, 2)
|
||||||
|
results["success_count"] = len(results["successful"])
|
||||||
|
results["failed_count"] = len(results["failed"])
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
|
||||||
|
"""
|
||||||
|
Lógica compartida para procesar un pedimento completo.
|
||||||
|
Esta es la misma lógica que usa tu endpoint individual.
|
||||||
|
"""
|
||||||
|
operation_name = "pedimento_completo"
|
||||||
|
service_data = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
|
||||||
|
|
||||||
|
# Validar datos de entrada
|
||||||
|
await _validate_request_data(request_data)
|
||||||
|
|
||||||
|
# Obtener servicio
|
||||||
|
service_data = await _get_pedimento_service(
|
||||||
|
pedimento_id=request_data['pedimento'],
|
||||||
|
service_type=3,
|
||||||
|
operation_name=operation_name
|
||||||
|
)
|
||||||
|
|
||||||
|
# Actualizar estado a "En proceso"
|
||||||
|
update_success = await _update_service_status(
|
||||||
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||||
|
)
|
||||||
|
|
||||||
|
if not update_success:
|
||||||
|
raise Exception("Error al actualizar estado del servicio")
|
||||||
|
|
||||||
|
# Obtener credenciales VUCEM
|
||||||
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||||
|
if not contribuyente_id:
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||||
|
raise Exception("ID de contribuyente no encontrado")
|
||||||
|
|
||||||
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||||
|
|
||||||
|
# Procesar petición SOAP
|
||||||
|
soap_response = await get_soap_pedimento_completo(
|
||||||
|
credenciales=credentials,
|
||||||
|
response_service=service_data,
|
||||||
|
soap_controller=soap_controller
|
||||||
|
)
|
||||||
|
|
||||||
|
if not soap_response:
|
||||||
|
raise Exception("Error en la petición SOAP")
|
||||||
|
|
||||||
|
# Actualizar datos del pedimento
|
||||||
|
xml_content = soap_response.get('xml_content', {})
|
||||||
|
if xml_content:
|
||||||
|
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
|
||||||
|
update_content['existe_expediente'] = True
|
||||||
|
await rest_controller.put_pedimento(
|
||||||
|
service_data['pedimento']['id'],
|
||||||
|
update_content
|
||||||
|
)
|
||||||
|
|
||||||
|
# Procesar COVEs
|
||||||
|
coves = xml_content.get('coves', [])
|
||||||
|
if coves:
|
||||||
|
await _post_coves(
|
||||||
|
response_service=service_data,
|
||||||
|
coves=coves
|
||||||
|
)
|
||||||
|
|
||||||
|
# Procesar documentos digitalizados
|
||||||
|
identificadores_ed = xml_content.get('identificadores_ed', [])
|
||||||
|
if identificadores_ed:
|
||||||
|
await _post_edocuments(
|
||||||
|
response_service=service_data,
|
||||||
|
identificadores_ed=identificadores_ed
|
||||||
|
)
|
||||||
|
|
||||||
|
# Finalizar servicio
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"pedimento_id": request_data['pedimento'],
|
||||||
|
"message": "Pedimento procesado exitosamente"
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
|
||||||
|
if service_data:
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"pedimento_id": request_data['pedimento'],
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True)
|
||||||
def partidas_task(self, **kwargs):
|
def partidas_task(self, **kwargs):
|
||||||
"""Tarea asíncrona para obtener partidas"""
|
"""Tarea asíncrona para obtener partidas"""
|
||||||
|
|||||||
Reference in New Issue
Block a user