6 Commits

15 changed files with 524 additions and 118 deletions

View File

@@ -1,11 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi import APIRouter, HTTPException, BackgroundTasks
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from celery_app import celery_app from celery_app import celery_app
from tasks import pedimento_completo_task from tasks import pedimento_completo_task
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -63,6 +63,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
status_code=500, status_code=500,
detail=f"Error al agendar la tarea: {str(e)}" detail=f"Error al agendar la tarea: {str(e)}"
) )
@router.post("/async/services/pedimento_completo/multiple")
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
"""
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
Args:
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
Returns:
JSONResponse con task_id para consultar el estado de la tarea
Raises:
HTTPException: En caso de errores de validación
"""
try:
pedimentos_ids = request.pedimentos
organizacion = request.organizacion
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
# Agendar la tarea en Celery
from tasks import multi_pedimento_completo_task
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
response_data = {
"success": True,
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
"task_id": task.id,
"total_pedimentos": len(pedimentos_ids),
"pedimentos": pedimentos_ids,
"organizacion": organizacion,
"status": "PENDING",
"timestamp": datetime.utcnow().isoformat(),
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
"check_status_url": f"/async/task-status/{task.id}"
}
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
return JSONResponse(content=response_data, status_code=202)
except Exception as e:
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.get("/async/task-status/{task_id}") @router.get("/async/task-status/{task_id}")
async def get_task_status(task_id: str): async def get_task_status(task_id: str):

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
import asyncio import asyncio
import logging import logging
logger = logging.getLogger("app.api") logger = logging.getLogger("app.api")

View File

@@ -20,9 +20,11 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -73,15 +75,19 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message) logging.error(error_message)
loop.run_until_complete( try:
update_task( loop.run_until_complete(
task_id=task_id, update_task(
status="failed", task_id=task_id,
message=error_message, status="failed",
pedimento_id=pedimento_id, message=error_message,
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=6 organizacion_id=organizacion_id,
servicio=6
)
) )
) except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise raise
finally:
loop.close()

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import logging import logging
import time
from celery import Celery from celery import Celery
from celery_app import celery_app from celery_app import celery_app
from typing import Dict, Any from typing import Dict, Any
@@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea {task_id}") logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
loop.run_until_complete( asyncio.run(
register_task( register_task(
task_id=task_id, task_id=task_id,
status="submitted", status="submitted",
@@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
loop.run_until_complete( asyncio.run(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="processing", status="processing",
@@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
) )
) )
# Obtener el COVE cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
# Actualizar estado: completado asyncio.run(
loop.run_until_complete(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="completed", status="completed",
@@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "processed", "data": cove_response} return {"status": "processed", "data": cove_response}
except Exception as e: except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete(
update_task( try:
task_id=task_id, asyncio.run(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=8 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
)
) )
) except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise raise
@@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
loop.run_until_complete( asyncio.run(
register_task( register_task(
task_id=task_id, task_id=task_id,
status="submitted", status="submitted",
@@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
loop.run_until_complete( asyncio.run(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="processing", status="processing",
@@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
) )
) )
# Obtener el acuse del COVE acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
# Actualizar estado: completado asyncio.run(
loop.run_until_complete(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="completed", status="completed",
@@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
return {"status": "processed", "data": acuse_response} return {"status": "processed", "data": acuse_response}
except Exception as e: except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete(
update_task( try:
task_id=task_id, asyncio.run(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=9 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
)
) )
) except Exception as update_error:
raise logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise

View File

@@ -23,9 +23,11 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
edoc_info = edoc_data.get('edoc', {}) edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A') edoc_number = edoc_info.get('numero_edoc', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -74,16 +76,23 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
except Exception as e: except Exception as e:
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message, exc_info=True)
loop.run_until_complete(
update_task( try:
task_id=task_id, loop.run_until_complete(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=3 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
)
) )
) except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}")
raise raise
finally:
# Cerrar el loop para liberar recursos
loop.close()

View File

@@ -24,9 +24,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
partida_info = partida_request.get('partida', {}) partida_info = partida_request.get('partida', {})
partida_numero = partida_info.get('numero', 'N/A') partida_numero = partida_info.get('numero', 'N/A')
# Crear un nuevo event loop para esta tarea
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}") logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -38,9 +41,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
servicio=4 # 4 corresponde a "Pedimento Partidas" servicio=4 # 4 corresponde a "Pedimento Partidas"
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando # Actualizar estado: procesando
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}") logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
@@ -76,15 +76,20 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete( try:
update_task( loop.run_until_complete(
task_id=task_id, update_task(
status="failed", task_id=task_id,
message=error_message, status="failed",
pedimento_id=pedimento_id, message=error_message,
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=4 organizacion_id=organizacion_id,
servicio=4
)
) )
) except Exception as update_error:
logger.error(f"Error al actualizar estado de tarea: {update_error}")
raise raise
finally:
# Limpiar el event loop
loop.close()

View File

@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
from .services import put_pedimento_data_vu from .services import put_pedimento_data_vu
from api.api_v2.modules.tasks.services import register_task from api.api_v2.modules.tasks.services import register_task
import logging import logging
logger = logging.getLogger("app.api") from celery import group, chord, chain
logger = logging.getLogger("app.api")
router = APIRouter() router = APIRouter()
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED) @router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, Dict, Any from typing import Optional, Union, Dict, Any, List
from uuid import UUID from uuid import UUID
from datetime import datetime from datetime import datetime
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione # CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione

View File

@@ -17,7 +17,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
Returns: Returns:
dict: Resultado del procesamiento con estado y detalles dict: Resultado del procesamiento con estado y detalles
""" """
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task_id = self.request.id task_id = self.request.id
servicio = 3 # Código para Pedimento Completo servicio = 3 # Código para Pedimento Completo
pedimento_id = pedimento_data.get('pedimento', {}).get('id') pedimento_id = pedimento_data.get('pedimento', {}).get('id')
@@ -84,4 +86,6 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
raise raise
finally:
loop.close()

View File

@@ -88,15 +88,49 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Generar nombre de archivo # Generar nombre de archivo
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml" file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
if soap_error(soap_response): if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document( file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
soap_response=soap_response, document_response = await remesa_rest_controller.post_document(
organizacion=pedimento_data.get('organizacion'), soap_response=soap_response,
pedimento=pedimento_data.get('id'), organizacion=pedimento_data.get('organizacion'),
file_name=file_name, pedimento=pedimento_data.get('id'),
document_type=16, file_name=file_name,
) document_type=16,
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") )
# Aquí necesitamos extraer el mensaje de error real
error_message = "Error en la respuesta del servicio SOAP"
# Intentar extraer mensaje de error del XML de respuesta
if hasattr(soap_response, 'text') and soap_response.text:
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(soap_response.text)
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
# Esto puede variar según el servicio, pero comúnmente buscan:
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
faultcode = fault.find('.//faultcode')
faultstring = fault.find('.//faultstring')
if faultstring is not None and faultstring.text:
error_message = faultstring.text
break
# También podría estar en una estructura de error específica de VUCEM
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
if msg is not None and msg.text:
error_message = msg.text
break
except Exception as parse_error:
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
# Lanzar excepción con el mensaje de error real
raise HTTPException(
status_code=500,
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
)
# Enviar documento # Enviar documento
try: try:

View File

@@ -17,13 +17,16 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
Returns: Returns:
dict: Resultado del procesamiento con estado y detalles dict: Resultado del procesamiento con estado y detalles
""" """
loop = asyncio.get_event_loop()
task_id = self.request.id task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas servicio = 5 # Código para Pedimento Remesas
pedimento_id = remesa_request.get('pedimento', {}).get('id') pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion') organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A') remesa_num = remesa_request.get('remesa', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Actualizar estado a processing # Actualizar estado a processing
loop.run_until_complete( loop.run_until_complete(
@@ -49,7 +52,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
pedimento_id=pedimento_id, pedimento_id=pedimento_id,
organizacion_id=organizacion_id, organizacion_id=organizacion_id,
servicio=servicio, servicio=servicio,
result=result
) )
) )
@@ -60,20 +62,43 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
# Actualizar estado a failed # Actualizar estado a failed
try: try:
loop.run_until_complete( # Verificar si el loop aún está abierto
update_task( if not loop.is_closed():
task_id=task_id, loop.run_until_complete(
message=f"Error al procesar remesa {remesa_num}: {str(e)}", update_task(
status="failed", task_id=task_id,
pedimento_id=pedimento_id, message=f"Error al procesar remesa {remesa_num}: {str(e)}",
organizacion_id=organizacion_id, status="failed",
servicio=servicio, pedimento_id=pedimento_id,
error=str(e) organizacion_id=organizacion_id,
servicio=servicio,
)
) )
) else:
# Si el loop está cerrado, crear uno nuevo temporal
logger.warning(f"Loop cerrado, creando loop temporal para actualizar error")
temp_loop = asyncio.new_event_loop()
asyncio.set_event_loop(temp_loop)
try:
temp_loop.run_until_complete(
update_task(
task_id=task_id,
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
status="failed",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio
)
)
finally:
temp_loop.close()
except Exception as update_error: except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
raise raise
finally:
# Limpiar el event loop
if not loop.is_closed():
loop.close()

View File

@@ -55,8 +55,16 @@ async def update_task(
json=update_data, json=update_data,
headers=headers headers=headers
) )
if response.status_code == 404:
logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...")
return await _create_and_update_task(
task_id, message, status, pedimento_id, organizacion_id, servicio
)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except httpx.HTTPError as e: except httpx.HTTPError as e:
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}") logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
raise HTTPException( raise HTTPException(
@@ -81,6 +89,72 @@ async def update_task(
) )
) )
async def _create_and_update_task(
task_id: str,
message: str,
status: str,
pedimento_id: str,
organizacion_id: str,
servicio: int
) -> Dict[str, Any]:
"""
Función interna para crear una tarea y luego actualizarla.
"""
try:
# Primero crear la tarea
logger.info(f"Creando tarea {task_id} antes de actualizar")
headers = {
"Authorization": f"Token {settings.API_TOKEN}"
}
create_data = {
"task_id": task_id,
"message": message,
"status": status,
"pedimento": pedimento_id,
"organizacion": organizacion_id,
"servicio": servicio
}
async with httpx.AsyncClient() as client:
# Crear la tarea
create_response = await client.post(
f"{settings.API_URL}/tasks/tasks/",
json=create_data,
headers=headers
)
create_response.raise_for_status()
logger.info(f"Tarea {task_id} creada exitosamente")
# Actualizar la tarea recién creada
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
update_response = await client.put(
url,
json=create_data,
headers=headers
)
update_response.raise_for_status()
logger.info(f"Tarea {task_id} actualizada exitosamente después de crear")
return update_response.json()
except httpx.HTTPError as e:
logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al crear la tarea",
errors=[str(e)],
metadata={
"task_id": task_id,
"status": status
}
)
)
async def register_task( async def register_task(
task_id: str, task_id: str,
message: str, message: str,
@@ -154,4 +228,4 @@ async def register_task(
errors=[str(e)], errors=[str(e)],
metadata={"task_id": task_id} metadata={"task_id": task_id}
) )
) )

View File

@@ -46,7 +46,7 @@ class APIRESTController:
""" """
Método asíncrono para hacer peticiones a la API usando httpx. Método asíncrono para hacer peticiones a la API usando httpx.
""" """
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }" url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
logger.warning(f"Realizando petición {method} a {url}") logger.warning(f"Realizando petición {method} a {url}")
try: try:
@@ -242,7 +242,10 @@ class APIController:
""" """
Método para obtener la lista de servicios desde la API. Método para obtener la lista de servicios desde la API.
""" """
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}') # return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
# lo elimine para poder ejecutar la operacion siempre que sea necesario
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]: async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
""" """
@@ -497,11 +500,13 @@ class APIController:
try: try:
async with httpx.AsyncClient(timeout=self.timeout) as client: async with httpx.AsyncClient(timeout=self.timeout) as client:
logger.info(f"Haciendo petición {method} a {url}") logger.info(f"Haciendo petición {method} a {url}")
print(f"Haciendo petición {method} a {url}")
if method.upper() == 'GET': if method.upper() == 'GET':
response = await client.get(url, headers=self.headers) response = await client.get(url, headers=self.headers)
elif method.upper() == 'POST': elif method.upper() == 'POST':
response = await client.post(url, json=data, headers=self.headers) response = await client.post(url, json=data, headers=self.headers)
print(f"response >>>> {response}")
elif method.upper() == 'PUT': elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers) response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE': elif method.upper() == 'DELETE':

View File

@@ -1,6 +1,5 @@
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from typing import Optional from typing import Optional, List, Union
class ServiceBaseSchema(BaseModel): class ServiceBaseSchema(BaseModel):
"""Esquema base para servicios con campos comunes""" """Esquema base para servicios con campos comunes"""
@@ -43,4 +42,41 @@ class ServiceRemesaSchema(BaseModel):
def validate_string_fields(cls, v): def validate_string_fields(cls, v):
if not v or not v.strip(): if not v or not v.strip():
raise ValueError('Los campos de texto no pueden estar vacíos') raise ValueError('Los campos de texto no pueden estar vacíos')
return v.strip() return v.strip()
class MultiPedimentoCompletoSchema(BaseModel):
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
organizacion: str = Field(..., description="ID de la organización")
pedimentos: List[str] = Field(
...,
description="Lista de IDs de pedimentos a procesar",
min_length=1,
max_length=200
)
@field_validator('organizacion')
def validate_organizacion(cls, v):
if not v or not v.strip():
raise ValueError('La organización no puede estar vacía')
return v.strip()
@field_validator('pedimentos')
def validate_pedimentos(cls, v):
if not v:
raise ValueError('Debe proporcionar al menos un pedimento')
if len(v) > 200:
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
# Eliminar duplicados y vacíos
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
if not pedimentos_limpios:
raise ValueError('La lista de pedimentos no puede estar vacía')
# Eliminar duplicados
return list(set(pedimentos_limpios))
class Config:
json_schema_extra = {
"example": {
"organizacion": "1",
"pedimentos": ["123", "456", "789"]
}
}

169
tasks.py
View File

@@ -2,7 +2,7 @@ from celery import Celery
from celery_app import celery_app from celery_app import celery_app
import asyncio import asyncio
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from controllers.RESTController import rest_controller from controllers.RESTController import rest_controller
from controllers.SOAPController import soap_controller from controllers.SOAPController import soap_controller
@@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
return run_async_task(_execute_pedimento_completo) return run_async_task(_execute_pedimento_completo)
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
"""
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
Args:
pedimentos: Lista de IDs de pedimentos a procesar
organizacion: ID de la organización
"""
import time
from datetime import datetime
start_time = time.time()
results = {
"total": len(pedimentos),
"successful": [],
"failed": [],
"started_at": datetime.utcnow().isoformat()
}
total = len(pedimentos)
for idx, pedimento_id in enumerate(pedimentos, 1):
try:
# Actualizar progreso (igual que en tus otras tareas)
self.update_state(
state='PROGRESS',
meta={
'status': f'Procesando pedimento {idx}/{total}',
'current': idx,
'total': total,
'current_pedimento': pedimento_id,
'percentage': round((idx / total) * 100, 2)
}
)
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
# Preparar datos exactamente como lo espera la tarea individual
request_data = {
"pedimento": pedimento_id,
"organizacion": organizacion
}
# Reutilizar la lógica de la tarea individual
# Esto ejecuta el mismo código que tu endpoint individual
async def _execute():
return await _execute_pedimento_completo_logic(request_data)
result = run_async_task(_execute)
results["successful"].append({
"pedimento_id": pedimento_id,
"result": result
})
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
except Exception as e:
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
results["failed"].append({
"pedimento_id": pedimento_id,
"error": str(e)
})
elapsed_time = time.time() - start_time
results["completed_at"] = datetime.utcnow().isoformat()
results["elapsed_seconds"] = round(elapsed_time, 2)
results["success_count"] = len(results["successful"])
results["failed_count"] = len(results["failed"])
return results
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
"""
Lógica compartida para procesar un pedimento completo.
Esta es la misma lógica que usa tu endpoint individual.
"""
operation_name = "pedimento_completo"
service_data = None
try:
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
# Validar datos de entrada
await _validate_request_data(request_data)
# Obtener servicio
service_data = await _get_pedimento_service(
pedimento_id=request_data['pedimento'],
service_type=3,
operation_name=operation_name
)
# Actualizar estado a "En proceso"
update_success = await _update_service_status(
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
)
if not update_success:
raise Exception("Error al actualizar estado del servicio")
# Obtener credenciales VUCEM
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
if not contribuyente_id:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
raise Exception("ID de contribuyente no encontrado")
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
# Procesar petición SOAP
soap_response = await get_soap_pedimento_completo(
credenciales=credentials,
response_service=service_data,
soap_controller=soap_controller
)
if not soap_response:
raise Exception("Error en la petición SOAP")
# Actualizar datos del pedimento
xml_content = soap_response.get('xml_content', {})
if xml_content:
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
update_content['existe_expediente'] = True
await rest_controller.put_pedimento(
service_data['pedimento']['id'],
update_content
)
# Procesar COVEs
coves = xml_content.get('coves', [])
if coves:
await _post_coves(
response_service=service_data,
coves=coves
)
# Procesar documentos digitalizados
identificadores_ed = xml_content.get('identificadores_ed', [])
if identificadores_ed:
await _post_edocuments(
response_service=service_data,
identificadores_ed=identificadores_ed
)
# Finalizar servicio
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
return {
"success": True,
"pedimento_id": request_data['pedimento'],
"message": "Pedimento procesado exitosamente"
}
except Exception as e:
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
if service_data:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
return {
"success": False,
"pedimento_id": request_data['pedimento'],
"error": str(e)
}
@celery_app.task(bind=True) @celery_app.task(bind=True)
def partidas_task(self, **kwargs): def partidas_task(self, **kwargs):
"""Tarea asíncrona para obtener partidas""" """Tarea asíncrona para obtener partidas"""