11 Commits

Author SHA1 Message Date
ed00651a8b Merge pull request 'feature/lectura de credenciales de vucem desde archivos de minIO' (#7) from feature/minio-read-credentials into main
Reviewed-on: #7
2026-04-22 17:38:06 +00:00
Dulce
2e779e83f8 feature/lectura de credenciales de vucem desde archivos de minIO 2026-04-22 11:11:34 -06:00
3cadcbd86f Merge pull request 'fix/forzar el procesamiento de un pedimento cargado por datastage' (#6) from fix/procesar-pedimento into main
Reviewed-on: #6
2026-04-16 13:24:43 +00:00
Dulce
d29cfcb00c fix/forzar el procesamiento de un pedimento cargado por datastage 2026-04-16 07:14:56 -06:00
2aebef8b26 Merge pull request 'solucion tarea acuse' (#5) from asuses-update into main
Reviewed-on: #5
2026-03-27 14:30:29 +00:00
Dulce
f9139c980a solucion tarea acuse 2026-03-27 08:25:43 -06:00
5c55e93d86 Merge pull request 'eliminar fallos de asyncio' (#4) from tareas-segundo-plano into main
Reviewed-on: #4
2026-03-26 18:00:35 +00:00
Dulce
b0cc715eb3 eliminar fallos de asyncio 2026-03-26 11:41:52 -06:00
5f41132f80 Merge pull request 'fix: se modifican las claves del tipo de error de pedimento completo, partidas, coves. Para identidcar el tipo de error correspondiente.' (#3) from T2025-09-004 into main
Reviewed-on: #3
2026-01-29 17:53:12 +00:00
d49747f288 fix: se modifican las claves del tipo de error de pedimento completo, partidas, coves. Para identidcar el tipo de error correspondiente. 2026-01-29 08:04:59 -07:00
47c8bf51c7 Merge pull request 'Fix--Auditor-Microservicio' (#2) from Fix--Auditor-Microservicio into main
Reviewed-on: #2
2026-01-22 22:04:25 +00:00
19 changed files with 632 additions and 140 deletions

View File

@@ -1,11 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi import APIRouter, HTTPException, BackgroundTasks
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from celery_app import celery_app from celery_app import celery_app
from tasks import pedimento_completo_task from tasks import pedimento_completo_task
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
import uuid import uuid
from datetime import datetime from datetime import datetime
@@ -63,6 +63,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
status_code=500, status_code=500,
detail=f"Error al agendar la tarea: {str(e)}" detail=f"Error al agendar la tarea: {str(e)}"
) )
@router.post("/async/services/pedimento_completo/multiple")
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
"""
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
Args:
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
Returns:
JSONResponse con task_id para consultar el estado de la tarea
Raises:
HTTPException: En caso de errores de validación
"""
try:
pedimentos_ids = request.pedimentos
organizacion = request.organizacion
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
# Agendar la tarea en Celery
from tasks import multi_pedimento_completo_task
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
response_data = {
"success": True,
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
"task_id": task.id,
"total_pedimentos": len(pedimentos_ids),
"pedimentos": pedimentos_ids,
"organizacion": organizacion,
"status": "PENDING",
"timestamp": datetime.utcnow().isoformat(),
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
"check_status_url": f"/async/task-status/{task.id}"
}
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
return JSONResponse(content=response_data, status_code=202)
except Exception as e:
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.get("/async/task-status/{task_id}") @router.get("/async/task-status/{task_id}")
async def get_task_status(task_id: str): async def get_task_status(task_id: str):

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from schemas.pedimentoSchema import PedimentoRequest from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
import asyncio import asyncio
import logging import logging
logger = logging.getLogger("app.api") logger = logging.getLogger("app.api")

View File

@@ -20,9 +20,11 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
organizacion_id = pedimento_info.get('organizacion') organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A') pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -73,15 +75,19 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
logging.error(error_message) logging.error(error_message)
loop.run_until_complete( try:
update_task( loop.run_until_complete(
task_id=task_id, update_task(
status="failed", task_id=task_id,
message=error_message, status="failed",
pedimento_id=pedimento_id, message=error_message,
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=6 organizacion_id=organizacion_id,
servicio=6
)
) )
) except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise raise
finally:
loop.close()

View File

@@ -122,7 +122,7 @@ async def consume_ws_get_cove(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml", file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
document_type=10, document_type=20,
) )
raise Exception("Error en la respuesta del servicio SOAP") raise Exception("Error en la respuesta del servicio SOAP")
@@ -256,7 +256,7 @@ async def consume_ws_get_acuse_cove(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name, file_name=error_file_name,
document_type=10, document_type=24,
) )
except Exception as e: except Exception as e:
logger.error(f"Error al guardar respuesta SOAP errónea: {e}") logger.error(f"Error al guardar respuesta SOAP errónea: {e}")

View File

@@ -1,6 +1,5 @@
import asyncio import asyncio
import logging import logging
import time
from celery import Celery from celery import Celery
from celery_app import celery_app from celery_app import celery_app
from typing import Dict, Any from typing import Dict, Any
@@ -27,9 +26,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea {task_id}") logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
loop.run_until_complete( asyncio.run(
register_task( register_task(
task_id=task_id, task_id=task_id,
status="submitted", status="submitted",
@@ -40,12 +38,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
loop.run_until_complete( asyncio.run(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="processing", status="processing",
@@ -56,11 +50,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
) )
) )
# Obtener el COVE cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
# Actualizar estado: completado asyncio.run(
loop.run_until_complete(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="completed", status="completed",
@@ -74,19 +66,23 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
return {"status": "processed", "data": cove_response} return {"status": "processed", "data": cove_response}
except Exception as e: except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete(
update_task( try:
task_id=task_id, asyncio.run(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=8 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=8
)
) )
) except Exception as update_error:
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise raise
@@ -105,9 +101,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
loop.run_until_complete( asyncio.run(
register_task( register_task(
task_id=task_id, task_id=task_id,
status="submitted", status="submitted",
@@ -118,12 +113,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
loop.run_until_complete( asyncio.run(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="processing", status="processing",
@@ -134,11 +125,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
) )
) )
# Obtener el acuse del COVE acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
# Actualizar estado: completado asyncio.run(
loop.run_until_complete(
update_task( update_task(
task_id=task_id, task_id=task_id,
status="completed", status="completed",
@@ -152,18 +141,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
return {"status": "processed", "data": acuse_response} return {"status": "processed", "data": acuse_response}
except Exception as e: except Exception as e:
# En caso de error, actualizar estado
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete(
update_task( try:
task_id=task_id, asyncio.run(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=9 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=9
)
) )
) except Exception as update_error:
raise logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
raise

View File

@@ -23,9 +23,11 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
edoc_info = edoc_data.get('edoc', {}) edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A') edoc_number = edoc_info.get('numero_edoc', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -74,16 +76,23 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
except Exception as e: except Exception as e:
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message, exc_info=True)
loop.run_until_complete(
update_task( try:
task_id=task_id, loop.run_until_complete(
status="failed", update_task(
message=error_message, task_id=task_id,
pedimento_id=pedimento_id, status="failed",
organizacion_id=organizacion_id, message=error_message,
servicio=3 pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
)
) )
) except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}")
raise raise
finally:
# Cerrar el loop para liberar recursos
loop.close()

View File

@@ -102,7 +102,7 @@ async def consume_ws_get_partida(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name, file_name=error_file_name,
document_type=10, document_type=18,
) )
except Exception as e: except Exception as e:
logger.error(f"Error al guardar la respuesta de error: {e}") logger.error(f"Error al guardar la respuesta de error: {e}")

View File

@@ -24,9 +24,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
partida_info = partida_request.get('partida', {}) partida_info = partida_request.get('partida', {})
partida_numero = partida_info.get('numero', 'N/A') partida_numero = partida_info.get('numero', 'N/A')
# Crear un nuevo event loop para esta tarea
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Registrar el inicio de la tarea # Registrar el inicio de la tarea
loop = asyncio.get_event_loop()
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}") logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
loop.run_until_complete( loop.run_until_complete(
register_task( register_task(
@@ -38,9 +41,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
servicio=4 # 4 corresponde a "Pedimento Partidas" servicio=4 # 4 corresponde a "Pedimento Partidas"
) )
) )
# Esperar un momento breve para asegurar que el registro se complete
time.sleep(1)
# Actualizar estado: procesando # Actualizar estado: procesando
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}") logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
@@ -76,15 +76,20 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
# En caso de error, actualizar estado # En caso de error, actualizar estado
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}" error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
logger.error(error_message) logger.error(error_message)
loop.run_until_complete( try:
update_task( loop.run_until_complete(
task_id=task_id, update_task(
status="failed", task_id=task_id,
message=error_message, status="failed",
pedimento_id=pedimento_id, message=error_message,
organizacion_id=organizacion_id, pedimento_id=pedimento_id,
servicio=4 organizacion_id=organizacion_id,
servicio=4
)
) )
) except Exception as update_error:
logger.error(f"Error al actualizar estado de tarea: {update_error}")
raise raise
finally:
# Limpiar el event loop
loop.close()

View File

@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
from .services import put_pedimento_data_vu from .services import put_pedimento_data_vu
from api.api_v2.modules.tasks.services import register_task from api.api_v2.modules.tasks.services import register_task
import logging import logging
logger = logging.getLogger("app.api") from celery import group, chord, chain
logger = logging.getLogger("app.api")
router = APIRouter() router = APIRouter()
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED) @router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, Dict, Any from typing import Optional, Union, Dict, Any, List
from uuid import UUID from uuid import UUID
from datetime import datetime from datetime import datetime
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione # CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione

View File

@@ -95,7 +95,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
organizacion=pedimento_data.get('organizacion'), organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'), pedimento=pedimento_data.get('id'),
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml", file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
document_type=10, document_type=14,
) )
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")

View File

@@ -17,7 +17,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
Returns: Returns:
dict: Resultado del procesamiento con estado y detalles dict: Resultado del procesamiento con estado y detalles
""" """
loop = asyncio.get_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task_id = self.request.id task_id = self.request.id
servicio = 3 # Código para Pedimento Completo servicio = 3 # Código para Pedimento Completo
pedimento_id = pedimento_data.get('pedimento', {}).get('id') pedimento_id = pedimento_data.get('pedimento', {}).get('id')
@@ -84,4 +86,6 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
raise raise
finally:
loop.close()

View File

@@ -88,15 +88,49 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Generar nombre de archivo # Generar nombre de archivo
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml" file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
if soap_error(soap_response): if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document( file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
soap_response=soap_response, document_response = await remesa_rest_controller.post_document(
organizacion=pedimento_data.get('organizacion'), soap_response=soap_response,
pedimento=pedimento_data.get('id'), organizacion=pedimento_data.get('organizacion'),
file_name=file_name, pedimento=pedimento_data.get('id'),
document_type=10, file_name=file_name,
) document_type=16,
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") )
# Aquí necesitamos extraer el mensaje de error real
error_message = "Error en la respuesta del servicio SOAP"
# Intentar extraer mensaje de error del XML de respuesta
if hasattr(soap_response, 'text') and soap_response.text:
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(soap_response.text)
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
# Esto puede variar según el servicio, pero comúnmente buscan:
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
faultcode = fault.find('.//faultcode')
faultstring = fault.find('.//faultstring')
if faultstring is not None and faultstring.text:
error_message = faultstring.text
break
# También podría estar en una estructura de error específica de VUCEM
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
if msg is not None and msg.text:
error_message = msg.text
break
except Exception as parse_error:
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
# Lanzar excepción con el mensaje de error real
raise HTTPException(
status_code=500,
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
)
# Enviar documento # Enviar documento
try: try:

View File

@@ -17,13 +17,16 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
Returns: Returns:
dict: Resultado del procesamiento con estado y detalles dict: Resultado del procesamiento con estado y detalles
""" """
loop = asyncio.get_event_loop()
task_id = self.request.id task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas servicio = 5 # Código para Pedimento Remesas
pedimento_id = remesa_request.get('pedimento', {}).get('id') pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion') organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A') remesa_num = remesa_request.get('remesa', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Actualizar estado a processing # Actualizar estado a processing
loop.run_until_complete( loop.run_until_complete(
@@ -49,7 +52,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
pedimento_id=pedimento_id, pedimento_id=pedimento_id,
organizacion_id=organizacion_id, organizacion_id=organizacion_id,
servicio=servicio, servicio=servicio,
result=result
) )
) )
@@ -60,20 +62,43 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
# Actualizar estado a failed # Actualizar estado a failed
try: try:
loop.run_until_complete( # Verificar si el loop aún está abierto
update_task( if not loop.is_closed():
task_id=task_id, loop.run_until_complete(
message=f"Error al procesar remesa {remesa_num}: {str(e)}", update_task(
status="failed", task_id=task_id,
pedimento_id=pedimento_id, message=f"Error al procesar remesa {remesa_num}: {str(e)}",
organizacion_id=organizacion_id, status="failed",
servicio=servicio, pedimento_id=pedimento_id,
error=str(e) organizacion_id=organizacion_id,
servicio=servicio,
)
) )
) else:
# Si el loop está cerrado, crear uno nuevo temporal
logger.warning(f"Loop cerrado, creando loop temporal para actualizar error")
temp_loop = asyncio.new_event_loop()
asyncio.set_event_loop(temp_loop)
try:
temp_loop.run_until_complete(
update_task(
task_id=task_id,
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
status="failed",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=servicio
)
)
finally:
temp_loop.close()
except Exception as update_error: except Exception as update_error:
logger.error(f"Error actualizando estado de tarea: {update_error}") logger.error(f"Error actualizando estado de tarea: {update_error}")
# Re-lanzar la excepción para que Celery la marque como fallida # Re-lanzar la excepción para que Celery la marque como fallida
raise raise
finally:
# Limpiar el event loop
if not loop.is_closed():
loop.close()

View File

@@ -55,8 +55,16 @@ async def update_task(
json=update_data, json=update_data,
headers=headers headers=headers
) )
if response.status_code == 404:
logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...")
return await _create_and_update_task(
task_id, message, status, pedimento_id, organizacion_id, servicio
)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except httpx.HTTPError as e: except httpx.HTTPError as e:
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}") logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
raise HTTPException( raise HTTPException(
@@ -81,6 +89,72 @@ async def update_task(
) )
) )
async def _create_and_update_task(
task_id: str,
message: str,
status: str,
pedimento_id: str,
organizacion_id: str,
servicio: int
) -> Dict[str, Any]:
"""
Función interna para crear una tarea y luego actualizarla.
"""
try:
# Primero crear la tarea
logger.info(f"Creando tarea {task_id} antes de actualizar")
headers = {
"Authorization": f"Token {settings.API_TOKEN}"
}
create_data = {
"task_id": task_id,
"message": message,
"status": status,
"pedimento": pedimento_id,
"organizacion": organizacion_id,
"servicio": servicio
}
async with httpx.AsyncClient() as client:
# Crear la tarea
create_response = await client.post(
f"{settings.API_URL}/tasks/tasks/",
json=create_data,
headers=headers
)
create_response.raise_for_status()
logger.info(f"Tarea {task_id} creada exitosamente")
# Actualizar la tarea recién creada
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
update_response = await client.put(
url,
json=create_data,
headers=headers
)
update_response.raise_for_status()
logger.info(f"Tarea {task_id} actualizada exitosamente después de crear")
return update_response.json()
except httpx.HTTPError as e:
logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error al crear la tarea",
errors=[str(e)],
metadata={
"task_id": task_id,
"status": status
}
)
)
async def register_task( async def register_task(
task_id: str, task_id: str,
message: str, message: str,
@@ -154,4 +228,4 @@ async def register_task(
errors=[str(e)], errors=[str(e)],
metadata={"task_id": task_id} metadata={"task_id": task_id}
) )
) )

View File

@@ -46,7 +46,7 @@ class APIRESTController:
""" """
Método asíncrono para hacer peticiones a la API usando httpx. Método asíncrono para hacer peticiones a la API usando httpx.
""" """
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }" url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
logger.warning(f"Realizando petición {method} a {url}") logger.warning(f"Realizando petición {method} a {url}")
try: try:
@@ -242,7 +242,10 @@ class APIController:
""" """
Método para obtener la lista de servicios desde la API. Método para obtener la lista de servicios desde la API.
""" """
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}') # return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
# lo elimine para poder ejecutar la operacion siempre que sea necesario
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]: async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
""" """
@@ -461,25 +464,26 @@ class APIController:
""" """
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data) return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
async def get_cer(self, id: str) -> bytes:
"""
Método para obtener un certificado específico desde la API (como binario).
Args:
id: UUID del certificado a consultar
Returns:
bytes: Contenido binario del certificado
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
async def get_key(self, id: str) -> bytes: async def get_key(self, id: str) -> bytes:
""" result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
Método para obtener una llave específica desde la API (como binario).
Args: if result is None:
id: UUID de la llave a consultar logger.info(f"get_key retornó None")
Returns: else:
bytes: Contenido binario de la llave logger.info(f"get_key retornó {len(result)} bytes")
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True) return result
async def get_cer(self, id: str) -> bytes:
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
if result is None:
logger.info(f"get_cer retornó None")
else:
logger.info(f"get_cer retornó {len(result)} bytes")
return result
async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False): async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False):
""" """
@@ -497,11 +501,13 @@ class APIController:
try: try:
async with httpx.AsyncClient(timeout=self.timeout) as client: async with httpx.AsyncClient(timeout=self.timeout) as client:
logger.info(f"Haciendo petición {method} a {url}") logger.info(f"Haciendo petición {method} a {url}")
print(f"Haciendo petición {method} a {url}")
if method.upper() == 'GET': if method.upper() == 'GET':
response = await client.get(url, headers=self.headers) response = await client.get(url, headers=self.headers)
elif method.upper() == 'POST': elif method.upper() == 'POST':
response = await client.post(url, json=data, headers=self.headers) response = await client.post(url, json=data, headers=self.headers)
print(f"response >>>> {response}")
elif method.upper() == 'PUT': elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers) response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE': elif method.upper() == 'DELETE':

View File

@@ -1,6 +1,5 @@
from pydantic import BaseModel, Field, field_validator from pydantic import BaseModel, Field, field_validator
from typing import Optional from typing import Optional, List, Union
class ServiceBaseSchema(BaseModel): class ServiceBaseSchema(BaseModel):
"""Esquema base para servicios con campos comunes""" """Esquema base para servicios con campos comunes"""
@@ -43,4 +42,41 @@ class ServiceRemesaSchema(BaseModel):
def validate_string_fields(cls, v): def validate_string_fields(cls, v):
if not v or not v.strip(): if not v or not v.strip():
raise ValueError('Los campos de texto no pueden estar vacíos') raise ValueError('Los campos de texto no pueden estar vacíos')
return v.strip() return v.strip()
class MultiPedimentoCompletoSchema(BaseModel):
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
organizacion: str = Field(..., description="ID de la organización")
pedimentos: List[str] = Field(
...,
description="Lista de IDs de pedimentos a procesar",
min_length=1,
max_length=200
)
@field_validator('organizacion')
def validate_organizacion(cls, v):
if not v or not v.strip():
raise ValueError('La organización no puede estar vacía')
return v.strip()
@field_validator('pedimentos')
def validate_pedimentos(cls, v):
if not v:
raise ValueError('Debe proporcionar al menos un pedimento')
if len(v) > 200:
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
# Eliminar duplicados y vacíos
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
if not pedimentos_limpios:
raise ValueError('La lista de pedimentos no puede estar vacía')
# Eliminar duplicados
return list(set(pedimentos_limpios))
class Config:
json_schema_extra = {
"example": {
"organizacion": "1",
"pedimentos": ["123", "456", "789"]
}
}

169
tasks.py
View File

@@ -2,7 +2,7 @@ from celery import Celery
from celery_app import celery_app from celery_app import celery_app
import asyncio import asyncio
import logging import logging
from typing import Dict, Any from typing import Dict, Any, List
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from controllers.RESTController import rest_controller from controllers.RESTController import rest_controller
from controllers.SOAPController import soap_controller from controllers.SOAPController import soap_controller
@@ -158,6 +158,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
return run_async_task(_execute_pedimento_completo) return run_async_task(_execute_pedimento_completo)
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
"""
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
Args:
pedimentos: Lista de IDs de pedimentos a procesar
organizacion: ID de la organización
"""
import time
from datetime import datetime
start_time = time.time()
results = {
"total": len(pedimentos),
"successful": [],
"failed": [],
"started_at": datetime.utcnow().isoformat()
}
total = len(pedimentos)
for idx, pedimento_id in enumerate(pedimentos, 1):
try:
# Actualizar progreso (igual que en tus otras tareas)
self.update_state(
state='PROGRESS',
meta={
'status': f'Procesando pedimento {idx}/{total}',
'current': idx,
'total': total,
'current_pedimento': pedimento_id,
'percentage': round((idx / total) * 100, 2)
}
)
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
# Preparar datos exactamente como lo espera la tarea individual
request_data = {
"pedimento": pedimento_id,
"organizacion": organizacion
}
# Reutilizar la lógica de la tarea individual
# Esto ejecuta el mismo código que tu endpoint individual
async def _execute():
return await _execute_pedimento_completo_logic(request_data)
result = run_async_task(_execute)
results["successful"].append({
"pedimento_id": pedimento_id,
"result": result
})
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
except Exception as e:
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
results["failed"].append({
"pedimento_id": pedimento_id,
"error": str(e)
})
elapsed_time = time.time() - start_time
results["completed_at"] = datetime.utcnow().isoformat()
results["elapsed_seconds"] = round(elapsed_time, 2)
results["success_count"] = len(results["successful"])
results["failed_count"] = len(results["failed"])
return results
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
"""
Lógica compartida para procesar un pedimento completo.
Esta es la misma lógica que usa tu endpoint individual.
"""
operation_name = "pedimento_completo"
service_data = None
try:
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
# Validar datos de entrada
await _validate_request_data(request_data)
# Obtener servicio
service_data = await _get_pedimento_service(
pedimento_id=request_data['pedimento'],
service_type=3,
operation_name=operation_name
)
# Actualizar estado a "En proceso"
update_success = await _update_service_status(
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
)
if not update_success:
raise Exception("Error al actualizar estado del servicio")
# Obtener credenciales VUCEM
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
if not contribuyente_id:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
raise Exception("ID de contribuyente no encontrado")
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
# Procesar petición SOAP
soap_response = await get_soap_pedimento_completo(
credenciales=credentials,
response_service=service_data,
soap_controller=soap_controller
)
if not soap_response:
raise Exception("Error en la petición SOAP")
# Actualizar datos del pedimento
xml_content = soap_response.get('xml_content', {})
if xml_content:
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
update_content['existe_expediente'] = True
await rest_controller.put_pedimento(
service_data['pedimento']['id'],
update_content
)
# Procesar COVEs
coves = xml_content.get('coves', [])
if coves:
await _post_coves(
response_service=service_data,
coves=coves
)
# Procesar documentos digitalizados
identificadores_ed = xml_content.get('identificadores_ed', [])
if identificadores_ed:
await _post_edocuments(
response_service=service_data,
identificadores_ed=identificadores_ed
)
# Finalizar servicio
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
return {
"success": True,
"pedimento_id": request_data['pedimento'],
"message": "Pedimento procesado exitosamente"
}
except Exception as e:
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
if service_data:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
return {
"success": False,
"pedimento_id": request_data['pedimento'],
"error": str(e)
}
@celery_app.task(bind=True) @celery_app.task(bind=True)
def partidas_task(self, **kwargs): def partidas_task(self, **kwargs):
"""Tarea asíncrona para obtener partidas""" """Tarea asíncrona para obtener partidas"""

85
utils/minio_client.py Normal file
View File

@@ -0,0 +1,85 @@
# microservice/utils/minio_client.py
import os
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from typing import Optional, BinaryIO
import logging
logger = logging.getLogger(__name__)
class MinIOClient:
"""Cliente MinIO para FastAPI"""
def __init__(self):
self.client = None
self.bucket_name = None
self._initialize()
def _initialize(self):
"""Inicializa el cliente MinIO"""
endpoint = os.environ.get('MINIO_ENDPOINT', 'minio:9000')
access_key = os.environ.get('MINIO_ACCESS_KEY')
secret_key = os.environ.get('MINIO_SECRET_KEY')
secure = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
self.bucket_name = os.environ.get('MINIO_BUCKET_NAME', 'efc-microservice-dev')
self.client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
# Asegurar bucket
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
logger.info(f"Bucket '{self.bucket_name}' creado")
def upload_file(
self,
object_name: str,
file_path: str = None,
file_data: bytes = None,
content_type: str = None
) -> bool:
"""Sube archivo a MinIO (síncrono para Celery)"""
try:
if file_path:
self.client.fput_object(
self.bucket_name,
object_name,
file_path,
content_type=content_type
)
elif file_data:
import io
data_stream = io.BytesIO(file_data)
self.client.put_object(
self.bucket_name,
object_name,
data_stream,
len(file_data),
content_type=content_type
)
return True
except Exception as e:
logger.error(f"Error subiendo archivo: {e}")
return False
def get_presigned_url(self, object_name: str, expires: int = 3600) -> Optional[str]:
"""Genera URL firmada"""
try:
return self.client.presigned_get_object(
self.bucket_name,
object_name,
expires=timedelta(seconds=expires)
)
except Exception as e:
logger.error(f"Error generando URL: {e}")
return None
# Singleton
minio_client = MinIOClient()