From d29cfcb00c5ce26c95be29573356f0e62559c901 Mon Sep 17 00:00:00 2001 From: Dulce Date: Thu, 16 Apr 2026 07:14:56 -0600 Subject: [PATCH] fix/forzar el procesamiento de un pedimento cargado por datastage --- api/api_v1/endpoints/async_pedimentos.py | 52 ++++++- api/api_v1/endpoints/pedimentos.py | 2 +- api/api_v2/modules/pedimentos/routers.py | 3 +- api/api_v2/modules/pedimentos/schemas.py | 2 +- controllers/RESTController.py | 7 +- schemas/serviceSchema.py | 42 +++++- tasks.py | 169 ++++++++++++++++++++++- 7 files changed, 267 insertions(+), 10 deletions(-) diff --git a/api/api_v1/endpoints/async_pedimentos.py b/api/api_v1/endpoints/async_pedimentos.py index f75ea87..6a716d2 100644 --- a/api/api_v1/endpoints/async_pedimentos.py +++ b/api/api_v1/endpoints/async_pedimentos.py @@ -1,11 +1,11 @@ from fastapi import APIRouter, HTTPException, BackgroundTasks from schemas.pedimentoSchema import PedimentoRequest -from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema +from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema from fastapi.responses import JSONResponse from celery_app import celery_app from tasks import pedimento_completo_task import logging -from typing import Dict, Any +from typing import Dict, Any, List import uuid from datetime import datetime @@ -63,6 +63,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema): status_code=500, detail=f"Error al agendar la tarea: {str(e)}" ) + +@router.post("/async/services/pedimento_completo/multiple") +async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema): + """ + Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos. + + Args: + request: MultiPedimentoCompletoSchema con lista de pedimentos y organización + + Returns: + JSONResponse con task_id para consultar el estado de la tarea + + Raises: + HTTPException: En caso de errores de validación + """ + try: + pedimentos_ids = request.pedimentos + organizacion = request.organizacion + + logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos") + + # Agendar la tarea en Celery + from tasks import multi_pedimento_completo_task + task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion) + + # Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas) + response_data = { + "success": True, + "message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.", + "task_id": task.id, + "total_pedimentos": len(pedimentos_ids), + "pedimentos": pedimentos_ids, + "organizacion": organizacion, + "status": "PENDING", + "timestamp": datetime.utcnow().isoformat(), + "estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos", + "check_status_url": f"/async/task-status/{task.id}" + } + + logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}") + return JSONResponse(content=response_data, status_code=202) + + except Exception as e: + logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al agendar la tarea: {str(e)}" + ) @router.get("/async/task-status/{task_id}") async def get_task_status(task_id: str): diff --git a/api/api_v1/endpoints/pedimentos.py b/api/api_v1/endpoints/pedimentos.py index 8507ff0..668b42e 100644 --- a/api/api_v1/endpoints/pedimentos.py +++ b/api/api_v1/endpoints/pedimentos.py @@ -1,6 +1,6 @@ from fastapi import APIRouter, HTTPException from schemas.pedimentoSchema import PedimentoRequest -from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema +from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema import asyncio import logging logger = logging.getLogger("app.api") diff --git a/api/api_v2/modules/pedimentos/routers.py b/api/api_v2/modules/pedimentos/routers.py index cc7e914..f1b5921 100644 --- a/api/api_v2/modules/pedimentos/routers.py +++ b/api/api_v2/modules/pedimentos/routers.py @@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request from .services import put_pedimento_data_vu from api.api_v2.modules.tasks.services import register_task import logging -logger = logging.getLogger("app.api") +from celery import group, chord, chain +logger = logging.getLogger("app.api") router = APIRouter() @router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED) diff --git a/api/api_v2/modules/pedimentos/schemas.py b/api/api_v2/modules/pedimentos/schemas.py index 8d49a3f..4516381 100644 --- a/api/api_v2/modules/pedimentos/schemas.py +++ b/api/api_v2/modules/pedimentos/schemas.py @@ -1,4 +1,4 @@ -from typing import Optional, Union, Dict, Any +from typing import Optional, Union, Dict, Any, List from uuid import UUID from datetime import datetime # CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione diff --git a/controllers/RESTController.py b/controllers/RESTController.py index 991c5c5..39ff5ef 100644 --- a/controllers/RESTController.py +++ b/controllers/RESTController.py @@ -242,7 +242,10 @@ class APIController: """ Método para obtener la lista de servicios desde la API. """ - return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}') + # return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}') + # eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error + # lo elimine para poder ejecutar la operacion siempre que sea necesario + return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}') async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]: """ @@ -497,11 +500,13 @@ class APIController: try: async with httpx.AsyncClient(timeout=self.timeout) as client: logger.info(f"Haciendo petición {method} a {url}") + print(f"Haciendo petición {method} a {url}") if method.upper() == 'GET': response = await client.get(url, headers=self.headers) elif method.upper() == 'POST': response = await client.post(url, json=data, headers=self.headers) + print(f"response >>>> {response}") elif method.upper() == 'PUT': response = await client.put(url, json=data, headers=self.headers) elif method.upper() == 'DELETE': diff --git a/schemas/serviceSchema.py b/schemas/serviceSchema.py index 4bbacba..479b120 100644 --- a/schemas/serviceSchema.py +++ b/schemas/serviceSchema.py @@ -1,6 +1,5 @@ from pydantic import BaseModel, Field, field_validator -from typing import Optional - +from typing import Optional, List, Union class ServiceBaseSchema(BaseModel): """Esquema base para servicios con campos comunes""" @@ -43,4 +42,41 @@ class ServiceRemesaSchema(BaseModel): def validate_string_fields(cls, v): if not v or not v.strip(): raise ValueError('Los campos de texto no pueden estar vacíos') - return v.strip() \ No newline at end of file + return v.strip() + +class MultiPedimentoCompletoSchema(BaseModel): + """Esquema para procesar múltiples pedimentos de forma asíncrona""" + organizacion: str = Field(..., description="ID de la organización") + pedimentos: List[str] = Field( + ..., + description="Lista de IDs de pedimentos a procesar", + min_length=1, + max_length=200 + ) + + @field_validator('organizacion') + def validate_organizacion(cls, v): + if not v or not v.strip(): + raise ValueError('La organización no puede estar vacía') + return v.strip() + + @field_validator('pedimentos') + def validate_pedimentos(cls, v): + if not v: + raise ValueError('Debe proporcionar al menos un pedimento') + if len(v) > 200: + raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}') + # Eliminar duplicados y vacíos + pedimentos_limpios = [p.strip() for p in v if p and p.strip()] + if not pedimentos_limpios: + raise ValueError('La lista de pedimentos no puede estar vacía') + # Eliminar duplicados + return list(set(pedimentos_limpios)) + + class Config: + json_schema_extra = { + "example": { + "organizacion": "1", + "pedimentos": ["123", "456", "789"] + } + } \ No newline at end of file diff --git a/tasks.py b/tasks.py index 6ad5e63..ab219d3 100644 --- a/tasks.py +++ b/tasks.py @@ -2,7 +2,7 @@ from celery import Celery from celery_app import celery_app import asyncio import logging -from typing import Dict, Any +from typing import Dict, Any, List from contextlib import asynccontextmanager from controllers.RESTController import rest_controller from controllers.SOAPController import soap_controller @@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]): return run_async_task(_execute_pedimento_completo) +@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task') +def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str): + """ + Tarea asíncrona para procesar MÚLTIPLES pedimentos completos. + + Args: + pedimentos: Lista de IDs de pedimentos a procesar + organizacion: ID de la organización + """ + import time + from datetime import datetime + + start_time = time.time() + + results = { + "total": len(pedimentos), + "successful": [], + "failed": [], + "started_at": datetime.utcnow().isoformat() + } + + total = len(pedimentos) + + for idx, pedimento_id in enumerate(pedimentos, 1): + try: + # Actualizar progreso (igual que en tus otras tareas) + self.update_state( + state='PROGRESS', + meta={ + 'status': f'Procesando pedimento {idx}/{total}', + 'current': idx, + 'total': total, + 'current_pedimento': pedimento_id, + 'percentage': round((idx / total) * 100, 2) + } + ) + + logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}") + + # Preparar datos exactamente como lo espera la tarea individual + request_data = { + "pedimento": pedimento_id, + "organizacion": organizacion + } + + # Reutilizar la lógica de la tarea individual + # Esto ejecuta el mismo código que tu endpoint individual + async def _execute(): + return await _execute_pedimento_completo_logic(request_data) + + result = run_async_task(_execute) + + results["successful"].append({ + "pedimento_id": pedimento_id, + "result": result + }) + + logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente") + + except Exception as e: + logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}") + results["failed"].append({ + "pedimento_id": pedimento_id, + "error": str(e) + }) + + elapsed_time = time.time() - start_time + + results["completed_at"] = datetime.utcnow().isoformat() + results["elapsed_seconds"] = round(elapsed_time, 2) + results["success_count"] = len(results["successful"]) + results["failed_count"] = len(results["failed"]) + + return results + +async def _execute_pedimento_completo_logic(request_data: dict) -> dict: + """ + Lógica compartida para procesar un pedimento completo. + Esta es la misma lógica que usa tu endpoint individual. + """ + operation_name = "pedimento_completo" + service_data = None + + try: + logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}") + + # Validar datos de entrada + await _validate_request_data(request_data) + + # Obtener servicio + service_data = await _get_pedimento_service( + pedimento_id=request_data['pedimento'], + service_type=3, + operation_name=operation_name + ) + + # Actualizar estado a "En proceso" + update_success = await _update_service_status( + service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name + ) + + if not update_success: + raise Exception("Error al actualizar estado del servicio") + + # Obtener credenciales VUCEM + contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '') + if not contribuyente_id: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + raise Exception("ID de contribuyente no encontrado") + + credentials = await _get_vucem_credentials(contribuyente_id, operation_name) + + # Procesar petición SOAP + soap_response = await get_soap_pedimento_completo( + credenciales=credentials, + response_service=service_data, + soap_controller=soap_controller + ) + + if not soap_response: + raise Exception("Error en la petición SOAP") + + # Actualizar datos del pedimento + xml_content = soap_response.get('xml_content', {}) + if xml_content: + update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'} + update_content['existe_expediente'] = True + await rest_controller.put_pedimento( + service_data['pedimento']['id'], + update_content + ) + + # Procesar COVEs + coves = xml_content.get('coves', []) + if coves: + await _post_coves( + response_service=service_data, + coves=coves + ) + + # Procesar documentos digitalizados + identificadores_ed = xml_content.get('identificadores_ed', []) + if identificadores_ed: + await _post_edocuments( + response_service=service_data, + identificadores_ed=identificadores_ed + ) + + # Finalizar servicio + await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name) + + return { + "success": True, + "pedimento_id": request_data['pedimento'], + "message": "Pedimento procesado exitosamente" + } + + except Exception as e: + logger.error(f"Error en pedimento {request_data['pedimento']}: {e}") + if service_data: + await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name) + return { + "success": False, + "pedimento_id": request_data['pedimento'], + "error": str(e) + } + @celery_app.task(bind=True) def partidas_task(self, **kwargs): """Tarea asíncrona para obtener partidas"""