8 Commits

18 changed files with 528 additions and 122 deletions

View File

@@ -1,11 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks
from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
from fastapi.responses import JSONResponse
from celery_app import celery_app
from tasks import pedimento_completo_task
import logging
from typing import Dict, Any
from typing import Dict, Any, List
import uuid
from datetime import datetime
@@ -64,6 +64,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.post("/async/services/pedimento_completo/multiple")
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
"""
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
Args:
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
Returns:
JSONResponse con task_id para consultar el estado de la tarea
Raises:
HTTPException: En caso de errores de validación
"""
try:
pedimentos_ids = request.pedimentos
organizacion = request.organizacion
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
# Agendar la tarea en Celery
from tasks import multi_pedimento_completo_task
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
response_data = {
"success": True,
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
"task_id": task.id,
"total_pedimentos": len(pedimentos_ids),
"pedimentos": pedimentos_ids,
"organizacion": organizacion,
"status": "PENDING",
"timestamp": datetime.utcnow().isoformat(),
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
"check_status_url": f"/async/task-status/{task.id}"
}
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
return JSONResponse(content=response_data, status_code=202)
except Exception as e:
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.get("/async/task-status/{task_id}")
async def get_task_status(task_id: str):
"""

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException
from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
import asyncio
import logging
logger = logging.getLogger("app.api")

View File

@@ -20,9 +20,11 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
register_task(
@@ -73,15 +75,19 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=6
try:
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=6
)
)
)
except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise
finally:
loop.close()

View File

@@ -122,7 +122,7 @@ async def consume_ws_get_cove(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
document_type=10,
document_type=20,
)
raise Exception("Error en la respuesta del servicio SOAP")
@@ -256,7 +256,7 @@ async def consume_ws_get_acuse_cove(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name,
document_type=10,
document_type=24,
)
except Exception as e:
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")

View File

@@ -1,6 +1,5 @@
import asyncio
import logging
import time
from celery import Celery
from celery_app import celery_app
from typing import Dict, Any
@@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
asyncio.run(
register_task(
task_id=task_id,
status="submitted",
@@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
)
)
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="processing",
@@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
)
)
# Obtener el COVE
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
# Actualizar estado: completado
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="completed",
@@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "processed", "data": cove_response}
except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
try:
asyncio.run(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
)
)
)
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise
@@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
loop.run_until_complete(
asyncio.run(
register_task(
task_id=task_id,
status="submitted",
@@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
)
)
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="processing",
@@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
)
)
# Obtener el acuse del COVE
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
# Actualizar estado: completado
loop.run_until_complete(
asyncio.run(
update_task(
task_id=task_id,
status="completed",
@@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
return {"status": "processed", "data": acuse_response}
except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
)
)
raise
try:
asyncio.run(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
)
)
except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise

View File

@@ -23,9 +23,11 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
register_task(
@@ -74,16 +76,23 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
)
)
raise
logger.error(error_message, exc_info=True)
try:
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
)
)
except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}")
raise
finally:
# Cerrar el loop para liberar recursos
loop.close()

View File

@@ -102,7 +102,7 @@ async def consume_ws_get_partida(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name,
document_type=10,
document_type=18,
)
except Exception as e:
logger.error(f"Error al guardar la respuesta de error: {e}")

View File

@@ -24,9 +24,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
partida_info = partida_request.get('partida', {})
partida_numero = partida_info.get('numero', 'N/A')
# Crear un nuevo event loop para esta tarea
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
loop.run_until_complete(
register_task(
@@ -39,9 +42,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
)
)
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
loop.run_until_complete(
@@ -76,15 +76,20 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
# En caso de error, actualizar estado
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message)
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=4
try:
loop.run_until_complete(
update_task(
task_id=task_id,
status="failed",
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=4
)
)
)
except Exception as update_error:
logger.error(f"Error al actualizar estado de tarea: {update_error}")
raise
finally:
# Limpiar el event loop
loop.close()

View File

@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
from .services import put_pedimento_data_vu
from api.api_v2.modules.tasks.services import register_task
import logging
logger = logging.getLogger("app.api")
from celery import group, chord, chain
logger = logging.getLogger("app.api")
router = APIRouter()
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, Dict, Any
from typing import Optional, Union, Dict, Any, List
from uuid import UUID
from datetime import datetime
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione

View File

@@ -95,7 +95,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
document_type=10,
document_type=14,
)
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")

View File

@@ -17,7 +17,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
Returns:
dict: Resultado del procesamiento con estado y detalles
"""
loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task_id = self.request.id
servicio = 3 # Código para Pedimento Completo
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
@@ -85,3 +87,5 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
# Re-lanzar la excepción para que Celery la marque como fallida
raise
finally:
loop.close()

View File

@@ -88,15 +88,49 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Generar nombre de archivo
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name,
document_type=10,
)
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name,
document_type=16,
)
# Aquí necesitamos extraer el mensaje de error real
error_message = "Error en la respuesta del servicio SOAP"
# Intentar extraer mensaje de error del XML de respuesta
if hasattr(soap_response, 'text') and soap_response.text:
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(soap_response.text)
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
# Esto puede variar según el servicio, pero comúnmente buscan:
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
faultcode = fault.find('.//faultcode')
faultstring = fault.find('.//faultstring')
if faultstring is not None and faultstring.text:
error_message = faultstring.text
break
# También podría estar en una estructura de error específica de VUCEM
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
if msg is not None and msg.text:
error_message = msg.text
break
except Exception as parse_error:
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
# Lanzar excepción con el mensaje de error real
raise HTTPException(
status_code=500,
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
)
# Enviar documento
try:

View File

@@ -17,13 +17,16 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
Returns:
dict: Resultado del procesamiento con estado y detalles
"""
loop = asyncio.get_event_loop()
task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas
pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Actualizar estado a processing
loop.run_until_complete(
@@ -49,7 +52,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio,
result=result
)
)
@@ -60,20 +62,43 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
# Actualizar estado a failed
try:
loop.run_until_complete(
update_task(
task_id=task_id,
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
status="failed",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio,
error=str(e)
# Verificar si el loop aún está abierto
if not loop.is_closed():
loop.run_until_complete(
update_task(
task_id=task_id,
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
status="failed",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio,
)
)
)
else:
# Si el loop está cerrado, crear uno nuevo temporal
logger.warning(f"Loop cerrado, creando loop temporal para actualizar error")
temp_loop = asyncio.new_event_loop()
asyncio.set_event_loop(temp_loop)
try:
temp_loop.run_until_complete(
update_task(
task_id=task_id,
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
status="failed",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio
)
)
finally:
temp_loop.close()
except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida
raise
finally:
# Limpiar el event loop
if not loop.is_closed():
loop.close()

View File

@@ -55,8 +55,16 @@ async def update_task(
json=update_data,
headers=headers
)
if response.status_code == 404:
logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...")
return await _create_and_update_task(
task_id, message, status, pedimento_id, organizacion_id, servicio
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
raise HTTPException(
@@ -81,6 +89,72 @@ async def update_task(
)
)
async def _create_and_update_task(
task_id: str,
message: str,
status: str,
pedimento_id: str,
organizacion_id: str,
servicio: int
) -> Dict[str, Any]:
"""
Función interna para crear una tarea y luego actualizarla.
"""
try:
# Primero crear la tarea
logger.info(f"Creando tarea {task_id} antes de actualizar")
headers = {
"Authorization": f"Token {settings.API_TOKEN}"
}
create_data = {
"task_id": task_id,
"message": message,
"status": status,
"pedimento": pedimento_id,
"organizacion": organizacion_id,
"servicio": servicio
}
async with httpx.AsyncClient() as client:
# Crear la tarea
create_response = await client.post(
f"{settings.API_URL}/tasks/tasks/",
json=create_data,
headers=headers
)
create_response.raise_for_status()
logger.info(f"Tarea {task_id} creada exitosamente")
# Actualizar la tarea recién creada
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
update_response = await client.put(
url,
json=create_data,
headers=headers
)
update_response.raise_for_status()
logger.info(f"Tarea {task_id} actualizada exitosamente después de crear")
return update_response.json()
except httpx.HTTPError as e:
logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al crear la tarea",
errors=[str(e)],
metadata={
"task_id": task_id,
"status": status
}
)
)
async def register_task(
task_id: str,
message: str,

View File

@@ -46,7 +46,7 @@ class APIRESTController:
"""
Método asíncrono para hacer peticiones a la API usando httpx.
"""
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }"
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
logger.warning(f"Realizando petición {method} a {url}")
try:
@@ -242,7 +242,10 @@ class APIController:
"""
Método para obtener la lista de servicios desde la API.
"""
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
# lo elimine para poder ejecutar la operacion siempre que sea necesario
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
"""
@@ -497,11 +500,13 @@ class APIController:
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
logger.info(f"Haciendo petición {method} a {url}")
print(f"Haciendo petición {method} a {url}")
if method.upper() == 'GET':
response = await client.get(url, headers=self.headers)
elif method.upper() == 'POST':
response = await client.post(url, json=data, headers=self.headers)
print(f"response >>>> {response}")
elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE':

View File

@@ -1,6 +1,5 @@
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from typing import Optional, List, Union
class ServiceBaseSchema(BaseModel):
"""Esquema base para servicios con campos comunes"""
@@ -44,3 +43,40 @@ class ServiceRemesaSchema(BaseModel):
if not v or not v.strip():
raise ValueError('Los campos de texto no pueden estar vacíos')
return v.strip()
class MultiPedimentoCompletoSchema(BaseModel):
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
organizacion: str = Field(..., description="ID de la organización")
pedimentos: List[str] = Field(
...,
description="Lista de IDs de pedimentos a procesar",
min_length=1,
max_length=200
)
@field_validator('organizacion')
def validate_organizacion(cls, v):
if not v or not v.strip():
raise ValueError('La organización no puede estar vacía')
return v.strip()
@field_validator('pedimentos')
def validate_pedimentos(cls, v):
if not v:
raise ValueError('Debe proporcionar al menos un pedimento')
if len(v) > 200:
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
# Eliminar duplicados y vacíos
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
if not pedimentos_limpios:
raise ValueError('La lista de pedimentos no puede estar vacía')
# Eliminar duplicados
return list(set(pedimentos_limpios))
class Config:
json_schema_extra = {
"example": {
"organizacion": "1",
"pedimentos": ["123", "456", "789"]
}
}

169
tasks.py
View File

@@ -2,7 +2,7 @@ from celery import Celery
from celery_app import celery_app
import asyncio
import logging
from typing import Dict, Any
from typing import Dict, Any, List
from contextlib import asynccontextmanager
from controllers.RESTController import rest_controller
from controllers.SOAPController import soap_controller
@@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
return run_async_task(_execute_pedimento_completo)
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
"""
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
Args:
pedimentos: Lista de IDs de pedimentos a procesar
organizacion: ID de la organización
"""
import time
from datetime import datetime
start_time = time.time()
results = {
"total": len(pedimentos),
"successful": [],
"failed": [],
"started_at": datetime.utcnow().isoformat()
}
total = len(pedimentos)
for idx, pedimento_id in enumerate(pedimentos, 1):
try:
# Actualizar progreso (igual que en tus otras tareas)
self.update_state(
state='PROGRESS',
meta={
'status': f'Procesando pedimento {idx}/{total}',
'current': idx,
'total': total,
'current_pedimento': pedimento_id,
'percentage': round((idx / total) * 100, 2)
}
)
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
# Preparar datos exactamente como lo espera la tarea individual
request_data = {
"pedimento": pedimento_id,
"organizacion": organizacion
}
# Reutilizar la lógica de la tarea individual
# Esto ejecuta el mismo código que tu endpoint individual
async def _execute():
return await _execute_pedimento_completo_logic(request_data)
result = run_async_task(_execute)
results["successful"].append({
"pedimento_id": pedimento_id,
"result": result
})
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
except Exception as e:
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
results["failed"].append({
"pedimento_id": pedimento_id,
"error": str(e)
})
elapsed_time = time.time() - start_time
results["completed_at"] = datetime.utcnow().isoformat()
results["elapsed_seconds"] = round(elapsed_time, 2)
results["success_count"] = len(results["successful"])
results["failed_count"] = len(results["failed"])
return results
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
"""
Lógica compartida para procesar un pedimento completo.
Esta es la misma lógica que usa tu endpoint individual.
"""
operation_name = "pedimento_completo"
service_data = None
try:
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
# Validar datos de entrada
await _validate_request_data(request_data)
# Obtener servicio
service_data = await _get_pedimento_service(
pedimento_id=request_data['pedimento'],
service_type=3,
operation_name=operation_name
)
# Actualizar estado a "En proceso"
update_success = await _update_service_status(
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
)
if not update_success:
raise Exception("Error al actualizar estado del servicio")
# Obtener credenciales VUCEM
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
if not contribuyente_id:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
raise Exception("ID de contribuyente no encontrado")
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
# Procesar petición SOAP
soap_response = await get_soap_pedimento_completo(
credenciales=credentials,
response_service=service_data,
soap_controller=soap_controller
)
if not soap_response:
raise Exception("Error en la petición SOAP")
# Actualizar datos del pedimento
xml_content = soap_response.get('xml_content', {})
if xml_content:
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
update_content['existe_expediente'] = True
await rest_controller.put_pedimento(
service_data['pedimento']['id'],
update_content
)
# Procesar COVEs
coves = xml_content.get('coves', [])
if coves:
await _post_coves(
response_service=service_data,
coves=coves
)
# Procesar documentos digitalizados
identificadores_ed = xml_content.get('identificadores_ed', [])
if identificadores_ed:
await _post_edocuments(
response_service=service_data,
identificadores_ed=identificadores_ed
)
# Finalizar servicio
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
return {
"success": True,
"pedimento_id": request_data['pedimento'],
"message": "Pedimento procesado exitosamente"
}
except Exception as e:
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
if service_data:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
return {
"success": False,
"pedimento_id": request_data['pedimento'],
"error": str(e)
}
@celery_app.task(bind=True)
def partidas_task(self, **kwargs):
"""Tarea asíncrona para obtener partidas"""