diff --git a/api/api_v2/modules/acuses/routers.py b/api/api_v2/modules/acuses/routers.py index 8f3ee7a..83a00b0 100644 --- a/api/api_v2/modules/acuses/routers.py +++ b/api/api_v2/modules/acuses/routers.py @@ -19,16 +19,6 @@ async def obtener_acuse(acuse_request: AcuseSchema): # Ejecuta la tarea de Celery de forma asíncrona task = process_acuse_request.delay(acuse_dict) - # Registra la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando acuse para pedimento {acuse_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=acuse_dict.get('pedimento', {}).get('id'), - organizacion_id=acuse_dict.get('pedimento', {}).get('organizacion'), - servicio=6 # 6 corresponde a "Acuse" - ) - return {"task_id": task.id, "status": "submitted"} @@ -49,14 +39,4 @@ async def obtener_acuses(acuse_request: AcuseMasivoSchema): task = process_acuse_request.delay(acuse_dict) task_ids.append(task.id) - # Registra cada tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando acuse masivo para pedimento {acuse_request_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=acuse_request_dict.get('pedimento', {}).get('id'), - organizacion_id=acuse_request_dict.get('pedimento', {}).get('organizacion'), - servicio=6 # 6 corresponde a "Acuse" - ) - return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} diff --git a/api/api_v2/modules/acuses/tasks.py b/api/api_v2/modules/acuses/tasks.py index ae47aca..9559166 100644 --- a/api/api_v2/modules/acuses/tasks.py +++ b/api/api_v2/modules/acuses/tasks.py @@ -1,4 +1,4 @@ -from celery import Celery +from celery import Celery, current_task from celery_app import celery_app import asyncio import logging @@ -6,22 +6,82 @@ from typing import Dict, Any from contextlib import asynccontextmanager from .services import obtener_acuse -from api.api_v2.modules.tasks.tasks import run_async_task +from api.api_v2.modules.tasks.services import register_task, update_task -@celery_app.task -def process_acuse_request(acuse_request: Dict[str, Any]) -> Dict[str, Any]: +@celery_app.task(bind=True) +def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]: """ - Tarea de Celery para procesar la solicitud de acuse. + Tarea para procesar solicitudes de acuse. + """ + task_id = self.request.id + pedimento_info = acuse_request.get('pedimento', {}) + pedimento_id = pedimento_info.get('id') + organizacion_id = pedimento_info.get('organizacion') + pedimento_app = pedimento_info.get('pedimento_app', 'N/A') - Args: - acuse_request: Diccionario con los datos de la solicitud de acuse. + try: + # Registrar el inicio de la tarea + loop = asyncio.get_event_loop() + logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de acuse para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=6 # 6 corresponde a "Acuse" + ) + ) + + # Esperar un momento breve para asegurar que el registro se complete + import time + time.sleep(1) - Returns: - Diccionario con la respuesta del acuse. - """ - loop = asyncio.get_event_loop() - acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request)) - - return {"status": "processed", "data": acuse_response} + # Actualizar estado: procesando + logging.info(f"[ACUSE] Actualizando estado a processing para tarea {task_id}") + loop.run_until_complete( + update_task( + task_id=task_id, + status="processing", + message=f"Procesando acuse para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=6 + ) + ) + + # Obtener el acuse + acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request)) + + # Actualizar estado: completado + loop.run_until_complete( + update_task( + task_id=task_id, + status="completed", + message=f"Acuse obtenido exitosamente para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=6 + ) + ) + + return {"status": "processed", "data": acuse_response} + + except Exception as e: + # En caso de error, actualizar estado + error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}" + logging.error(error_message) + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=6 + ) + ) + raise diff --git a/api/api_v2/modules/coves/routers.py b/api/api_v2/modules/coves/routers.py index 56f664a..c0220fb 100644 --- a/api/api_v2/modules/coves/routers.py +++ b/api/api_v2/modules/coves/routers.py @@ -2,10 +2,13 @@ from fastapi import APIRouter, HTTPException from .schemas import CoveListSchema, CoveRequestSchema from typing import List, Dict, Any from uuid import UUID +import logging from .tasks import process_cove_request, process_acuse_cove_request from ..tasks.services import register_task +logger = logging.getLogger(__name__) + router = APIRouter() # Aquí puedes definir tus endpoints relacionados con COVES usando el esquema CoveBaseSchema @@ -17,16 +20,6 @@ async def get_cove(cove: CoveRequestSchema): cove_dict = cove.model_dump() task = process_cove_request.delay(cove_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando COVE para pedimento {cove_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=cove_dict.get('pedimento', {}).get('id'), - organizacion_id=cove_dict.get('pedimento', {}).get('organizacion'), - servicio=8 # 8 corresponde a "Cove" - ) - return {"task_id": task.id, "status": "submitted"} @@ -46,16 +39,6 @@ async def get_coves(coves_request: CoveListSchema): task = process_cove_request.delay(cove_dict) task_ids.append(task.id) - # Registrar cada tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando COVE masivo para pedimento {pedimento.get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=pedimento.get('id'), - organizacion_id=pedimento.get('organizacion'), - servicio=8 # 8 corresponde a "Cove" - ) - return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} @@ -65,16 +48,6 @@ async def get_acuse_cove(cove: CoveRequestSchema): cove_dict = cove.model_dump() task = process_acuse_cove_request.delay(cove_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando acuse de COVE para pedimento {cove_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=cove_dict.get('pedimento', {}).get('id'), - organizacion_id=cove_dict.get('pedimento', {}).get('organizacion'), - servicio=9 # 9 corresponde a "Acuse Cove" - ) - return {"task_id": task.id, "status": "submitted"} @router.post("/services/all/acuse/cove/", response_model=Dict[str, Any]) @@ -93,14 +66,4 @@ async def get_acuses_cove(coves_request: CoveListSchema): task = process_acuse_cove_request.delay(acuse_dict) task_ids.append(task.id) - # Registrar cada tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando acuse masivo de COVE para pedimento {pedimento.get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=pedimento.get('id'), - organizacion_id=pedimento.get('organizacion'), - servicio=9 # 9 corresponde a "Acuse Cove" - ) - return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/coves/tasks.py b/api/api_v2/modules/coves/tasks.py index c34b9b7..f1d8201 100644 --- a/api/api_v2/modules/coves/tasks.py +++ b/api/api_v2/modules/coves/tasks.py @@ -1,11 +1,12 @@ import asyncio import logging +import time from celery import Celery from celery_app import celery_app from typing import Dict, Any from .services import consume_ws_get_cove, consume_ws_get_acuse_cove -from api.api_v2.modules.tasks.tasks import run_async_task +from api.api_v2.modules.tasks.services import update_task, register_task # Logger para el módulo logger = logging.getLogger(__name__) @@ -13,16 +14,156 @@ logger = logging.getLogger(__name__) @celery_app.task(bind=True) def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: - loop = asyncio.get_event_loop() - cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request)) + """ + Tarea para procesar solicitudes de COVE. + """ + task_id = self.request.id + pedimento_info = cove_request.get('pedimento', {}) + pedimento_id = pedimento_info.get('id') + organizacion_id = pedimento_info.get('organizacion') + pedimento_app = pedimento_info.get('pedimento_app', 'N/A') + cove_info = cove_request.get('cove', {}) + cove_number = cove_info.get('numero_cove', 'N/A') + + try: + # Registrar el inicio de la tarea + loop = asyncio.get_event_loop() + logger.info(f"[COVE] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 # 8 corresponde a "Cove" + ) + ) - return {"status": "processed", "data": cove_response} + # Esperar un momento breve para asegurar que el registro se complete + time.sleep(1) + + # Actualizar estado: procesando + logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}") + loop.run_until_complete( + update_task( + task_id=task_id, + status="processing", + message=f"Procesando COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 + ) + ) + + # Obtener el COVE + cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request)) + + # Actualizar estado: completado + loop.run_until_complete( + update_task( + task_id=task_id, + status="completed", + message=f"COVE {cove_number} obtenido exitosamente para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 + ) + ) + + return {"status": "processed", "data": cove_response} + + except Exception as e: + # En caso de error, actualizar estado + error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" + logger.error(error_message) + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=8 + ) + ) + raise @celery_app.task(bind=True) def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: - loop = asyncio.get_event_loop() - acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request)) + """ + Tarea para procesar solicitudes de acuse de COVE. + """ + task_id = self.request.id + pedimento_info = cove_request.get('pedimento', {}) + pedimento_id = pedimento_info.get('id') + organizacion_id = pedimento_info.get('organizacion') + pedimento_app = pedimento_info.get('pedimento_app', 'N/A') + cove_info = cove_request.get('cove', {}) + cove_number = cove_info.get('numero_cove', 'N/A') + + try: + # Registrar el inicio de la tarea + loop = asyncio.get_event_loop() + logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 # 9 corresponde a "Acuse Cove" + ) + ) - return {"status": "processed", "data": acuse_response} + # Esperar un momento breve para asegurar que el registro se complete + time.sleep(1) + + # Actualizar estado: procesando + logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}") + loop.run_until_complete( + update_task( + task_id=task_id, + status="processing", + message=f"Procesando acuse de COVE {cove_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 + ) + ) + + # Obtener el acuse del COVE + acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request)) + + # Actualizar estado: completado + loop.run_until_complete( + update_task( + task_id=task_id, + status="completed", + message=f"Acuse de COVE {cove_number} obtenido exitosamente para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 + ) + ) + + return {"status": "processed", "data": acuse_response} + + except Exception as e: + # En caso de error, actualizar estado + error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}" + logger.error(error_message) + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=9 + ) + ) + raise diff --git a/api/api_v2/modules/edocs/routers.py b/api/api_v2/modules/edocs/routers.py index b98cf40..86beac5 100644 --- a/api/api_v2/modules/edocs/routers.py +++ b/api/api_v2/modules/edocs/routers.py @@ -18,15 +18,6 @@ async def download_edoc(edoc_request: EdocumentsSchema): edoc_dict = edoc_request.model_dump() # Ejecuta la tarea de Celery de forma asíncrona task = process_edoc_download_request.delay(edoc_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando descarga de E-Document {edoc_dict.get('edoc', {}).get('numero_edocument', 'N/A')}", - status="submitted", - pedimento_id=edoc_dict.get('pedimento', {}).get('id'), - organizacion_id=edoc_dict.get('pedimento', {}).get('organizacion'), - servicio=7 # 7 corresponde a "EDocument" - ) # Devuelve el ID de la tarea return {"task_id": task.id, "status": "submitted"} @@ -47,14 +38,4 @@ async def download_edocs_masivo(edoc_request: EdocumentsMasivoSchema): } task = process_edoc_download_request.delay(edoc_dict) task_ids.append(task.id) - # Registrar cada tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando descarga masiva de E-Document {edoc.get('numero_edocument', 'N/A')}", - status="submitted", - pedimento_id=edoc_dict.get('pedimento', {}).get('id'), - organizacion_id=edoc_dict.get('pedimento', {}).get('organizacion'), - servicio=7 # 7 corresponde a "EDocument" - ) - return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/edocs/tasks.py b/api/api_v2/modules/edocs/tasks.py index 41479d1..d6fad28 100644 --- a/api/api_v2/modules/edocs/tasks.py +++ b/api/api_v2/modules/edocs/tasks.py @@ -1,25 +1,89 @@ +import asyncio +import logging +import time from celery_app import celery_app +from typing import Dict from .services import obtener_edoc -import asyncio # Necesario para ejecutar funciones async dentro de Celery +from api.api_v2.modules.tasks.services import register_task, update_task + +# Logger para el módulo +logger = logging.getLogger(__name__) @celery_app.task(bind=True) -def process_edoc_download_request(self, edoc_data: dict): +def process_edoc_download_request(self, edoc_data: Dict) -> Dict: """ Tarea de Celery para procesar la descarga de un solo documento edoc. """ + task_id = self.request.id + pedimento_info = edoc_data.get('pedimento', {}) + pedimento_id = pedimento_info.get('id') + organizacion_id = pedimento_info.get('organizacion') + pedimento_app = pedimento_info.get('pedimento_app', 'N/A') + edoc_info = edoc_data.get('edoc', {}) + edoc_number = edoc_info.get('numero_edoc', 'N/A') + try: - # Ejecutar la función asíncrona dentro del hilo síncrono de Celery + # Registrar el inicio de la tarea loop = asyncio.get_event_loop() + logger.info(f"[EDOC] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=3 # 3 corresponde a "E-document" + ) + ) + + # Esperar un momento breve para asegurar que el registro se complete + time.sleep(1) + + # Actualizar estado: procesando + logger.info(f"[EDOC] Actualizando estado a processing para tarea {task_id}") + loop.run_until_complete( + update_task( + task_id=task_id, + status="processing", + message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=3 + ) + ) + + # Obtener el E-document result = loop.run_until_complete(obtener_edoc(**edoc_data)) - return {"status": "success", "result": result} - except Exception as e: - # Manejo de errores - self.update_state( - state='FAILURE', - meta={'exc_type': type(e).__name__, 'exc_message': str(e)} + # Actualizar estado: completado + loop.run_until_complete( + update_task( + task_id=task_id, + status="completed", + message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=3 + ) ) - # Es crucial volver a lanzar la excepción para que Celery la marque como fallida - raise e + + return {"status": "success", "result": result} + + except Exception as e: + # En caso de error, actualizar estado + error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}" + logger.error(error_message) + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=3 + ) + ) + raise diff --git a/api/api_v2/modules/partidas/routers.py b/api/api_v2/modules/partidas/routers.py index 967016b..4426c61 100644 --- a/api/api_v2/modules/partidas/routers.py +++ b/api/api_v2/modules/partidas/routers.py @@ -20,16 +20,7 @@ async def obtener_partida(partida_request: PartidaRequestSchema): acuse_dict = partida_request.model_dump() # Ejecuta la tarea de Celery de forma asíncrona task = process_partida_request.delay(acuse_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando partida {acuse_dict.get('partida', {}).get('numero', 'N/A')} para pedimento {acuse_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=acuse_dict.get('pedimento', {}).get('id'), - organizacion_id=acuse_dict.get('pedimento', {}).get('organizacion'), - servicio=4 # 4 corresponde a "Pedimento Partidas" - ) - # Puedes devolver el ID de la tarea para consultar el estado después + # Devolver el ID de la tarea para consultar el estado después return {"task_id": task.id, "status": "submitted"} @router.post("/services/all/partidas/", response_model=Dict[str, Any]) @@ -49,14 +40,4 @@ async def obtener_partidas(partidas_request: PartidaListSchema): } task = process_partida_request.delay(partida_dict) task_ids.append(task.id) - # Registrar cada tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando partida masiva {partida.get('numero', 'N/A')} para pedimento {partida_request_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}", - status="submitted", - pedimento_id=partida_request_dict.get('pedimento', {}).get('id'), - organizacion_id=partida_request_dict.get('pedimento', {}).get('organizacion'), - servicio=4 # 4 corresponde a "Pedimento Partidas" - ) - return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/partidas/tasks.py b/api/api_v2/modules/partidas/tasks.py index 8e10579..9c53513 100644 --- a/api/api_v2/modules/partidas/tasks.py +++ b/api/api_v2/modules/partidas/tasks.py @@ -1,26 +1,90 @@ -from celery import Celery -from celery_app import celery_app import asyncio import logging +import time +from celery import Celery +from celery_app import celery_app from typing import Dict, Any -from contextlib import asynccontextmanager from .services import consume_ws_get_partida +from api.api_v2.modules.tasks.services import register_task, update_task +# Logger para el módulo +logger = logging.getLogger(__name__) -@celery_app.task -def process_partida_request(partida_request: Dict[str, Any]) -> Dict[str, Any]: +@celery_app.task(bind=True) +def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, Any]: """ Tarea de Celery para procesar la solicitud de partida. - - Args: - partida_request: Diccionario con los datos de la solicitud de partida. - - Returns: - Diccionario con la respuesta de la partida. """ - loop = asyncio.get_event_loop() - partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request)) + task_id = self.request.id + pedimento_info = partida_request.get('pedimento', {}) + pedimento_id = pedimento_info.get('id') + organizacion_id = pedimento_info.get('organizacion') + pedimento_app = pedimento_info.get('pedimento_app', 'N/A') + partida_info = partida_request.get('partida', {}) + partida_numero = partida_info.get('numero', 'N/A') - return {"status": "processed", "data": partida_response} + try: + # Registrar el inicio de la tarea + loop = asyncio.get_event_loop() + logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}") + loop.run_until_complete( + register_task( + task_id=task_id, + status="submitted", + message=f"Iniciando proceso de partida {partida_numero} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=4 # 4 corresponde a "Pedimento Partidas" + ) + ) + + # Esperar un momento breve para asegurar que el registro se complete + time.sleep(1) + + # Actualizar estado: procesando + logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}") + loop.run_until_complete( + update_task( + task_id=task_id, + status="processing", + message=f"Procesando partida {partida_numero} para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=4 + ) + ) + + # Obtener la partida + partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request)) + + # Actualizar estado: completado + loop.run_until_complete( + update_task( + task_id=task_id, + status="completed", + message=f"Partida {partida_numero} procesada exitosamente para pedimento {pedimento_app}", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=4 + ) + ) + + return {"status": "processed", "data": partida_response} + + except Exception as e: + # En caso de error, actualizar estado + error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}" + logger.error(error_message) + loop.run_until_complete( + update_task( + task_id=task_id, + status="failed", + message=error_message, + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=4 + ) + ) + raise diff --git a/api/api_v2/modules/pedimentos/routers.py b/api/api_v2/modules/pedimentos/routers.py index 2dae985..1f9f792 100644 --- a/api/api_v2/modules/pedimentos/routers.py +++ b/api/api_v2/modules/pedimentos/routers.py @@ -17,14 +17,8 @@ async def download_pedimento_completo(Pedimento: PedimentoCompletoRequestSchema) # Ejecuta la tarea de Celery de forma asíncrona task = process_pedimento_completo_request.delay(pedimento_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando descarga de pedimento completo {pedimento_dict.get('pedimento', 'N/A')}", - status="submitted", - pedimento_id=pedimento_dict.get('id'), - organizacion_id=pedimento_dict.get('organizacion'), - servicio=3 # 3 corresponde a "Pedimento Completo" - ) - # Puedes devolver el ID de la tarea para consultar el estado después - return {"status": "submitted", "detail": "La solicitud de descarga del pedimento completo ha sido enviada.", "task_id": task.id} + return { + "status": "submitted", + "detail": "La solicitud de descarga del pedimento completo ha sido enviada.", + "task_id": task.id + } diff --git a/api/api_v2/modules/pedimentos/tasks.py b/api/api_v2/modules/pedimentos/tasks.py index 5516838..27b0c46 100644 --- a/api/api_v2/modules/pedimentos/tasks.py +++ b/api/api_v2/modules/pedimentos/tasks.py @@ -1,26 +1,87 @@ from celery_app import celery_app - from .services import put_pedimento_data -import asyncio # Necesario para ejecutar funciones async dentro de Celery - +import asyncio +import logging +from ..tasks.services import register_task, update_task +logger = logging.getLogger(__name__) @celery_app.task(bind=True) def process_pedimento_completo_request(self, pedimento_data: dict): """ - Tarea de Celery para procesar la descarga de un solo documento edoc. + Tarea de Celery para procesar la descarga de un pedimento completo. + + Args: + pedimento_data (dict): Datos del pedimento a procesar + + Returns: + dict: Resultado del procesamiento con estado y detalles """ + loop = asyncio.get_event_loop() + task_id = self.request.id + servicio = 3 # Código para Pedimento Completo + pedimento_id = pedimento_data.get('pedimento', {}).get('id') + organizacion_id = pedimento_data.get('pedimento', {}).get('organizacion') + try: - # Ejecutar la función asíncrona dentro del hilo síncrono de Celery - loop = asyncio.get_event_loop() + # Registrar la tarea primero + loop.run_until_complete( + register_task( + task_id=task_id, + message=f"Iniciando procesamiento del pedimento completo", + status="submitted", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) + ) + + # Luego actualizar estado a processing + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Iniciando procesamiento del pedimento completo", + status="processing", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) + ) + + # Procesar pedimento result = loop.run_until_complete(put_pedimento_data(**pedimento_data)) - return {"status": "success", "result": result} - except Exception as e: - # Manejo de errores - self.update_state( - state='FAILURE', - meta={'exc_type': type(e).__name__, 'exc_message': str(e)} + # Actualizar estado a completed + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Pedimento completo procesado exitosamente", + status="completed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) ) - # Es crucial volver a lanzar la excepción para que Celery la marque como fallida - raise e \ No newline at end of file + + return {"status": "success", "result": result} + + except Exception as e: + logger.error(f"Error procesando pedimento completo: {str(e)}", exc_info=True) + + # Actualizar estado a failed + try: + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Error al procesar pedimento completo: {str(e)}", + status="failed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) + ) + except Exception as update_error: + logger.error(f"Error actualizando estado de tarea: {update_error}") + + # Re-lanzar la excepción para que Celery la marque como fallida + raise \ No newline at end of file diff --git a/api/api_v2/modules/remesas/routers.py b/api/api_v2/modules/remesas/routers.py index 9bebabf..4f17b5c 100644 --- a/api/api_v2/modules/remesas/routers.py +++ b/api/api_v2/modules/remesas/routers.py @@ -17,14 +17,8 @@ async def download_remesa(remesa_request: RemesaBaseSchema): # Ejecuta la tarea de Celery de forma asíncrona task = process_remesa_request.delay(remesa_dict) - # Registrar la tarea en el servicio de seguimiento - await register_task( - task_id=task.id, - message=f"Procesando descarga de remesa {remesa_dict.get('remesa', 'N/A')}", - status="submitted", - pedimento_id=remesa_dict.get('pedimento', {}).get('id'), - organizacion_id=remesa_dict.get('pedimento', {}).get('organizacion'), - servicio=5 # 5 corresponde a "Pedimento Remesas" - ) - # Puedes devolver el ID de la tarea para consultar el estado después - return {"status": "submitted", "detail": "La solicitud de descarga de la remesa ha sido enviada.", "task_id": task.id} + return { + "status": "submitted", + "detail": "La solicitud de descarga de la remesa ha sido enviada.", + "task_id": task.id + } diff --git a/api/api_v2/modules/remesas/services.py b/api/api_v2/modules/remesas/services.py index 9814a5a..3508c48 100644 --- a/api/api_v2/modules/remesas/services.py +++ b/api/api_v2/modules/remesas/services.py @@ -90,7 +90,7 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]: organizacion=pedimento_data.get('organizacion'), pedimento=pedimento_data.get('id'), file_name=file_name, - document_type=3, + document_type=5, ) except Exception as e: diff --git a/api/api_v2/modules/remesas/tasks.py b/api/api_v2/modules/remesas/tasks.py index 9f2754d..1c1717e 100644 --- a/api/api_v2/modules/remesas/tasks.py +++ b/api/api_v2/modules/remesas/tasks.py @@ -1,27 +1,79 @@ -from celery import Celery from celery_app import celery_app +from .services import post_remesa_data import asyncio import logging -from typing import Dict, Any -from contextlib import asynccontextmanager +from ..tasks.services import register_task, update_task -from .services import post_remesa_data -from api.api_v2.modules.tasks.tasks import run_async_task +logger = logging.getLogger(__name__) - -@celery_app.task -def process_remesa_request(remesa_request: Dict[str, Any]) -> Dict[str, Any]: +@celery_app.task(bind=True) +def process_remesa_request(self, remesa_request: dict) -> dict: """ - Tarea de Celery para procesar la solicitud de acuse. + Tarea de Celery para procesar la solicitud de remesa. Args: - acuse_request: Diccionario con los datos de la solicitud de acuse. + remesa_request (dict): Datos de la remesa a procesar Returns: - Diccionario con la respuesta del acuse. + dict: Resultado del procesamiento con estado y detalles """ loop = asyncio.get_event_loop() - remesa_response = loop.run_until_complete(post_remesa_data(**remesa_request)) + task_id = self.request.id + servicio = 5 # Código para Pedimento Remesas + pedimento_id = remesa_request.get('pedimento', {}).get('id') + organizacion_id = remesa_request.get('pedimento', {}).get('organizacion') + remesa_num = remesa_request.get('remesa', 'N/A') - return {"status": "processed", "data": remesa_response} + try: + # Actualizar estado a processing + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Iniciando procesamiento de la remesa {remesa_num}", + status="processing", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio + ) + ) + + # Procesar remesa + result = loop.run_until_complete(post_remesa_data(**remesa_request)) + + # Actualizar estado a completed + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Remesa {remesa_num} procesada exitosamente", + status="completed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio, + result=result + ) + ) + + return {"status": "success", "result": result} + + except Exception as e: + logger.error(f"Error procesando remesa {remesa_num}: {str(e)}", exc_info=True) + + # Actualizar estado a failed + try: + loop.run_until_complete( + update_task( + task_id=task_id, + message=f"Error al procesar remesa {remesa_num}: {str(e)}", + status="failed", + pedimento_id=pedimento_id, + organizacion_id=organizacion_id, + servicio=servicio, + error=str(e) + ) + ) + except Exception as update_error: + logger.error(f"Error actualizando estado de tarea: {update_error}") + + # Re-lanzar la excepción para que Celery la marque como fallida + raise diff --git a/api/api_v2/modules/tasks/routers.py b/api/api_v2/modules/tasks/routers.py index eb5f062..f79ed5b 100644 --- a/api/api_v2/modules/tasks/routers.py +++ b/api/api_v2/modules/tasks/routers.py @@ -1,17 +1,19 @@ -from fastapi import FastAPI +from fastapi import FastAPI, Depends from fastapi.routing import APIRouter from celery_app import celery_app from fastapi import APIRouter, HTTPException, BackgroundTasks from datetime import datetime from fastapi.responses import JSONResponse +from typing import Dict, Any, List, Optional +from api.api_v2.modules.authentication.services import get_current_user import logging logger = logging.getLogger(__name__) router = APIRouter() -@router.get("/async/task-status/{task_id}") -async def get_task_status(task_id: str): +@router.get("/async/task-status/{task_id}", response_model=Dict[str, Any]) +async def get_task_status(task_id: str, current_user: Dict = Depends(get_current_user)): """ Consulta el estado de una tarea agendada. @@ -84,37 +86,9 @@ async def get_task_status(task_id: str): detail=f"Error al consultar el estado de la tarea: {str(e)}" ) -@router.get("/async/tasks/active") -async def get_active_tasks(): - """ - Lista todas las tareas activas en el sistema. - - Returns: - JSONResponse con la lista de tareas activas - """ - try: - # Obtener tareas activas desde Celery - inspect = celery_app.control.inspect() - active_tasks = inspect.active() - scheduled_tasks = inspect.scheduled() - - response_data = { - "active_tasks": active_tasks or {}, - "scheduled_tasks": scheduled_tasks or {}, - "timestamp": datetime.utcnow().isoformat() - } - - return JSONResponse(content=response_data, status_code=200) - - except Exception as e: - logger.error(f"Error al obtener tareas activas: {e}") - raise HTTPException( - status_code=500, - detail=f"Error al obtener tareas activas: {str(e)}" - ) -@router.delete("/async/task/{task_id}") -async def cancel_task(task_id: str): +@router.delete("/async/task/{task_id}", response_model=Dict[str, Any]) +async def cancel_task(task_id: str, current_user: Dict = Depends(get_current_user)): """ Cancela una tarea agendada. diff --git a/api/api_v2/modules/tasks/services.py b/api/api_v2/modules/tasks/services.py index 2edaea8..24a079b 100644 --- a/api/api_v2/modules/tasks/services.py +++ b/api/api_v2/modules/tasks/services.py @@ -6,6 +6,81 @@ from ..common import create_error_response from core.config import settings logger = logging.getLogger(__name__) +async def update_task( + task_id: str, + message: str, + status: str, + pedimento_id: str, + organizacion_id: str, + servicio: int +) -> Dict[str, Any]: + """ + Actualiza el estado de una tarea existente en el servicio de seguimiento. + + Args: + task_id: ID de la tarea de Celery + message: Mensaje descriptivo del nuevo estado de la tarea + status: Nuevo estado de la tarea + pedimento_id: ID del pedimento asociado + organizacion_id: ID de la organización + servicio: ID del tipo de servicio (1-9) + + Returns: + Dict con la respuesta del servicio + """ + try: + headers = { + "Authorization": f"Token {settings.API_TOKEN}" + } + + # Construir el cuerpo de la petición + update_data = { + "task_id": task_id, + "message": message, + "status": status, + "pedimento": pedimento_id, + "organizacion": organizacion_id, + "servicio": servicio + } + + logger.info(f"Actualizando tarea {task_id} con datos: {update_data}") + + # Django requiere el slash final + url = f"{settings.API_URL}/tasks/tasks/{task_id}/" + logger.info(f"Actualizando tarea en: {url}") + + async with httpx.AsyncClient() as client: + response = await client.put( + url, + json=update_data, + headers=headers + ) + response.raise_for_status() + return response.json() + except httpx.HTTPError as e: + logger.error(f"Error al actualizar tarea {task_id}: {str(e)}") + raise HTTPException( + status_code=500, + detail=create_error_response( + message="Error al actualizar la tarea", + errors=[str(e)], + metadata={ + "task_id": task_id, + "status": status + } + ) + ) + except Exception as e: + logger.error(f"Error inesperado al actualizar tarea {task_id}: {str(e)}") + raise HTTPException( + status_code=500, + detail=create_error_response( + message="Error inesperado al actualizar la tarea", + errors=[str(e)], + metadata={"task_id": task_id} + ) + ) + async def register_task( task_id: str, message: str, diff --git a/celery_app.py b/celery_app.py index 13b2592..61dc453 100644 --- a/celery_app.py +++ b/celery_app.py @@ -12,15 +12,24 @@ celery_app = Celery( # Configuración adicional celery_app.conf.update( + # Configuración básica task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, - task_track_started=True, + + # Configuración de workers task_time_limit=3600, # 1 hour timeout worker_prefetch_multiplier=1, worker_max_tasks_per_child=1000, + + # Configuración de resultados persistentes + task_track_started=True, # Guarda cuando la tarea inicia + task_ignore_result=False, # Asegura que se guarden los resultados + result_expires=60 * 60 * 24 * 7, # Mantiene resultados por 7 días + task_store_errors_even_if_ignored=True, # Guarda errores aunque la tarea sea ignorada + task_save_success_on_complete=True # Guarda los resultados exitosos ) # Autodiscovery of tasks diff --git a/utils/helpers.py b/utils/helpers.py index 8fef0ab..4fb34f8 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -24,6 +24,24 @@ def soap_error(soap_response): # Testeado return True if "No hay información para la búsqueda solicitada" in soap_response.text: return True + if "El RFC no tiene relación con el eDocument. " in soap_response.text: + return True + if "Firma Electrónica : El RFC del usuario es distinto al del certificado." in soap_response.text: + return True + if "Firma Electrónica : Firma inválida" in soap_response.text: + return True + if "Firma Electrónica : El certificado ha expirado." in soap_response.text: + return True + if "Firma Electrónica : El certificado ha sido revocado." in soap_response.text: + return True + if "Firma Electrónica : El certificado no es válido." in soap_response.text: + return True + if "Firma Electrónica : No se encontró el certificado." in soap_response.text: + return True + if "Firma Electrónica : Error al procesar la firma electrónica." in soap_response.text: + return True + if "unexpected XML tag. expected: {http://tempuri.org/}ConsultaPartidaResponse but found: {http://tempuri.org/}consultaPedimentoResponse" in soap_response.text: + return True # Aquí podrías agregar más lógica para verificar errores específicos en el XML return False \ No newline at end of file