Se agregaron estados y update a cada una de las tareas
This commit is contained in:
@@ -19,16 +19,6 @@ async def obtener_acuse(acuse_request: AcuseSchema):
|
||||
# Ejecuta la tarea de Celery de forma asíncrona
|
||||
task = process_acuse_request.delay(acuse_dict)
|
||||
|
||||
# Registra la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando acuse para pedimento {acuse_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=acuse_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=acuse_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=6 # 6 corresponde a "Acuse"
|
||||
)
|
||||
|
||||
return {"task_id": task.id, "status": "submitted"}
|
||||
|
||||
|
||||
@@ -49,14 +39,4 @@ async def obtener_acuses(acuse_request: AcuseMasivoSchema):
|
||||
task = process_acuse_request.delay(acuse_dict)
|
||||
task_ids.append(task.id)
|
||||
|
||||
# Registra cada tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando acuse masivo para pedimento {acuse_request_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=acuse_request_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=acuse_request_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=6 # 6 corresponde a "Acuse"
|
||||
)
|
||||
|
||||
return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from celery import Celery
|
||||
from celery import Celery, current_task
|
||||
from celery_app import celery_app
|
||||
import asyncio
|
||||
import logging
|
||||
@@ -6,22 +6,82 @@ from typing import Dict, Any
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from .services import obtener_acuse
|
||||
from api.api_v2.modules.tasks.tasks import run_async_task
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
|
||||
@celery_app.task
|
||||
def process_acuse_request(acuse_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
@celery_app.task(bind=True)
|
||||
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Tarea de Celery para procesar la solicitud de acuse.
|
||||
Tarea para procesar solicitudes de acuse.
|
||||
"""
|
||||
task_id = self.request.id
|
||||
pedimento_info = acuse_request.get('pedimento', {})
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
|
||||
Args:
|
||||
acuse_request: Diccionario con los datos de la solicitud de acuse.
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
loop = asyncio.get_event_loop()
|
||||
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6 # 6 corresponde a "Acuse"
|
||||
)
|
||||
)
|
||||
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
import time
|
||||
time.sleep(1)
|
||||
|
||||
Returns:
|
||||
Diccionario con la respuesta del acuse.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request))
|
||||
|
||||
return {"status": "processed", "data": acuse_response}
|
||||
# Actualizar estado: procesando
|
||||
logging.info(f"[ACUSE] Actualizando estado a processing para tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="processing",
|
||||
message=f"Procesando acuse para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6
|
||||
)
|
||||
)
|
||||
|
||||
# Obtener el acuse
|
||||
acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request))
|
||||
|
||||
# Actualizar estado: completado
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="completed",
|
||||
message=f"Acuse obtenido exitosamente para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": acuse_response}
|
||||
|
||||
except Exception as e:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
||||
logging.error(error_message)
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="failed",
|
||||
message=error_message,
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=6
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@@ -2,10 +2,13 @@ from fastapi import APIRouter, HTTPException
|
||||
from .schemas import CoveListSchema, CoveRequestSchema
|
||||
from typing import List, Dict, Any
|
||||
from uuid import UUID
|
||||
import logging
|
||||
|
||||
from .tasks import process_cove_request, process_acuse_cove_request
|
||||
from ..tasks.services import register_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
# Aquí puedes definir tus endpoints relacionados con COVES usando el esquema CoveBaseSchema
|
||||
|
||||
@@ -17,16 +20,6 @@ async def get_cove(cove: CoveRequestSchema):
|
||||
cove_dict = cove.model_dump()
|
||||
task = process_cove_request.delay(cove_dict)
|
||||
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando COVE para pedimento {cove_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=cove_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=cove_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=8 # 8 corresponde a "Cove"
|
||||
)
|
||||
|
||||
return {"task_id": task.id, "status": "submitted"}
|
||||
|
||||
|
||||
@@ -46,16 +39,6 @@ async def get_coves(coves_request: CoveListSchema):
|
||||
task = process_cove_request.delay(cove_dict)
|
||||
task_ids.append(task.id)
|
||||
|
||||
# Registrar cada tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando COVE masivo para pedimento {pedimento.get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=pedimento.get('id'),
|
||||
organizacion_id=pedimento.get('organizacion'),
|
||||
servicio=8 # 8 corresponde a "Cove"
|
||||
)
|
||||
|
||||
return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)}
|
||||
|
||||
|
||||
@@ -65,16 +48,6 @@ async def get_acuse_cove(cove: CoveRequestSchema):
|
||||
cove_dict = cove.model_dump()
|
||||
task = process_acuse_cove_request.delay(cove_dict)
|
||||
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando acuse de COVE para pedimento {cove_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=cove_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=cove_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=9 # 9 corresponde a "Acuse Cove"
|
||||
)
|
||||
|
||||
return {"task_id": task.id, "status": "submitted"}
|
||||
|
||||
@router.post("/services/all/acuse/cove/", response_model=Dict[str, Any])
|
||||
@@ -93,14 +66,4 @@ async def get_acuses_cove(coves_request: CoveListSchema):
|
||||
task = process_acuse_cove_request.delay(acuse_dict)
|
||||
task_ids.append(task.id)
|
||||
|
||||
# Registrar cada tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando acuse masivo de COVE para pedimento {pedimento.get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=pedimento.get('id'),
|
||||
organizacion_id=pedimento.get('organizacion'),
|
||||
servicio=9 # 9 corresponde a "Acuse Cove"
|
||||
)
|
||||
|
||||
return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)}
|
||||
@@ -1,11 +1,12 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
from typing import Dict, Any
|
||||
|
||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
||||
from api.api_v2.modules.tasks.tasks import run_async_task
|
||||
from api.api_v2.modules.tasks.services import update_task, register_task
|
||||
|
||||
# Logger para el módulo
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -13,16 +14,156 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
loop = asyncio.get_event_loop()
|
||||
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
|
||||
"""
|
||||
Tarea para procesar solicitudes de COVE.
|
||||
"""
|
||||
task_id = self.request.id
|
||||
pedimento_info = cove_request.get('pedimento', {})
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
cove_info = cove_request.get('cove', {})
|
||||
cove_number = cove_info.get('numero_cove', 'N/A')
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
loop = asyncio.get_event_loop()
|
||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8 # 8 corresponde a "Cove"
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": cove_response}
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
time.sleep(1)
|
||||
|
||||
# Actualizar estado: procesando
|
||||
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="processing",
|
||||
message=f"Procesando COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8
|
||||
)
|
||||
)
|
||||
|
||||
# Obtener el COVE
|
||||
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
|
||||
|
||||
# Actualizar estado: completado
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="completed",
|
||||
message=f"COVE {cove_number} obtenido exitosamente para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": cove_response}
|
||||
|
||||
except Exception as e:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="failed",
|
||||
message=error_message,
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=8
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
loop = asyncio.get_event_loop()
|
||||
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
|
||||
"""
|
||||
Tarea para procesar solicitudes de acuse de COVE.
|
||||
"""
|
||||
task_id = self.request.id
|
||||
pedimento_info = cove_request.get('pedimento', {})
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
cove_info = cove_request.get('cove', {})
|
||||
cove_number = cove_info.get('numero_cove', 'N/A')
|
||||
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
loop = asyncio.get_event_loop()
|
||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de acuse de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9 # 9 corresponde a "Acuse Cove"
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": acuse_response}
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
time.sleep(1)
|
||||
|
||||
# Actualizar estado: procesando
|
||||
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="processing",
|
||||
message=f"Procesando acuse de COVE {cove_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9
|
||||
)
|
||||
)
|
||||
|
||||
# Obtener el acuse del COVE
|
||||
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
|
||||
|
||||
# Actualizar estado: completado
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="completed",
|
||||
message=f"Acuse de COVE {cove_number} obtenido exitosamente para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": acuse_response}
|
||||
|
||||
except Exception as e:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="failed",
|
||||
message=error_message,
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=9
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@@ -18,15 +18,6 @@ async def download_edoc(edoc_request: EdocumentsSchema):
|
||||
edoc_dict = edoc_request.model_dump()
|
||||
# Ejecuta la tarea de Celery de forma asíncrona
|
||||
task = process_edoc_download_request.delay(edoc_dict)
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando descarga de E-Document {edoc_dict.get('edoc', {}).get('numero_edocument', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=edoc_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=edoc_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=7 # 7 corresponde a "EDocument"
|
||||
)
|
||||
# Devuelve el ID de la tarea
|
||||
return {"task_id": task.id, "status": "submitted"}
|
||||
|
||||
@@ -47,14 +38,4 @@ async def download_edocs_masivo(edoc_request: EdocumentsMasivoSchema):
|
||||
}
|
||||
task = process_edoc_download_request.delay(edoc_dict)
|
||||
task_ids.append(task.id)
|
||||
# Registrar cada tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando descarga masiva de E-Document {edoc.get('numero_edocument', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=edoc_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=edoc_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=7 # 7 corresponde a "EDocument"
|
||||
)
|
||||
|
||||
return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)}
|
||||
@@ -1,25 +1,89 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from celery_app import celery_app
|
||||
from typing import Dict
|
||||
|
||||
from .services import obtener_edoc
|
||||
import asyncio # Necesario para ejecutar funciones async dentro de Celery
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
# Logger para el módulo
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_edoc_download_request(self, edoc_data: dict):
|
||||
def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||
"""
|
||||
Tarea de Celery para procesar la descarga de un solo documento edoc.
|
||||
"""
|
||||
task_id = self.request.id
|
||||
pedimento_info = edoc_data.get('pedimento', {})
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
edoc_info = edoc_data.get('edoc', {})
|
||||
edoc_number = edoc_info.get('numero_edoc', 'N/A')
|
||||
|
||||
try:
|
||||
# Ejecutar la función asíncrona dentro del hilo síncrono de Celery
|
||||
# Registrar el inicio de la tarea
|
||||
loop = asyncio.get_event_loop()
|
||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=3 # 3 corresponde a "E-document"
|
||||
)
|
||||
)
|
||||
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
time.sleep(1)
|
||||
|
||||
# Actualizar estado: procesando
|
||||
logger.info(f"[EDOC] Actualizando estado a processing para tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="processing",
|
||||
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=3
|
||||
)
|
||||
)
|
||||
|
||||
# Obtener el E-document
|
||||
result = loop.run_until_complete(obtener_edoc(**edoc_data))
|
||||
|
||||
return {"status": "success", "result": result}
|
||||
except Exception as e:
|
||||
# Manejo de errores
|
||||
self.update_state(
|
||||
state='FAILURE',
|
||||
meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
|
||||
# Actualizar estado: completado
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="completed",
|
||||
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=3
|
||||
)
|
||||
)
|
||||
# Es crucial volver a lanzar la excepción para que Celery la marque como fallida
|
||||
raise e
|
||||
|
||||
return {"status": "success", "result": result}
|
||||
|
||||
except Exception as e:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="failed",
|
||||
message=error_message,
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=3
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@@ -20,16 +20,7 @@ async def obtener_partida(partida_request: PartidaRequestSchema):
|
||||
acuse_dict = partida_request.model_dump()
|
||||
# Ejecuta la tarea de Celery de forma asíncrona
|
||||
task = process_partida_request.delay(acuse_dict)
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando partida {acuse_dict.get('partida', {}).get('numero', 'N/A')} para pedimento {acuse_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=acuse_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=acuse_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=4 # 4 corresponde a "Pedimento Partidas"
|
||||
)
|
||||
# Puedes devolver el ID de la tarea para consultar el estado después
|
||||
# Devolver el ID de la tarea para consultar el estado después
|
||||
return {"task_id": task.id, "status": "submitted"}
|
||||
|
||||
@router.post("/services/all/partidas/", response_model=Dict[str, Any])
|
||||
@@ -49,14 +40,4 @@ async def obtener_partidas(partidas_request: PartidaListSchema):
|
||||
}
|
||||
task = process_partida_request.delay(partida_dict)
|
||||
task_ids.append(task.id)
|
||||
# Registrar cada tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando partida masiva {partida.get('numero', 'N/A')} para pedimento {partida_request_dict.get('pedimento', {}).get('pedimento_app', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=partida_request_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=partida_request_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=4 # 4 corresponde a "Pedimento Partidas"
|
||||
)
|
||||
|
||||
return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)}
|
||||
@@ -1,26 +1,90 @@
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
from typing import Dict, Any
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from .services import consume_ws_get_partida
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
# Logger para el módulo
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@celery_app.task
|
||||
def process_partida_request(partida_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
@celery_app.task(bind=True)
|
||||
def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Tarea de Celery para procesar la solicitud de partida.
|
||||
|
||||
Args:
|
||||
partida_request: Diccionario con los datos de la solicitud de partida.
|
||||
|
||||
Returns:
|
||||
Diccionario con la respuesta de la partida.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request))
|
||||
task_id = self.request.id
|
||||
pedimento_info = partida_request.get('pedimento', {})
|
||||
pedimento_id = pedimento_info.get('id')
|
||||
organizacion_id = pedimento_info.get('organizacion')
|
||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||
partida_info = partida_request.get('partida', {})
|
||||
partida_numero = partida_info.get('numero', 'N/A')
|
||||
|
||||
return {"status": "processed", "data": partida_response}
|
||||
try:
|
||||
# Registrar el inicio de la tarea
|
||||
loop = asyncio.get_event_loop()
|
||||
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
status="submitted",
|
||||
message=f"Iniciando proceso de partida {partida_numero} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=4 # 4 corresponde a "Pedimento Partidas"
|
||||
)
|
||||
)
|
||||
|
||||
# Esperar un momento breve para asegurar que el registro se complete
|
||||
time.sleep(1)
|
||||
|
||||
# Actualizar estado: procesando
|
||||
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="processing",
|
||||
message=f"Procesando partida {partida_numero} para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=4
|
||||
)
|
||||
)
|
||||
|
||||
# Obtener la partida
|
||||
partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request))
|
||||
|
||||
# Actualizar estado: completado
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="completed",
|
||||
message=f"Partida {partida_numero} procesada exitosamente para pedimento {pedimento_app}",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=4
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "processed", "data": partida_response}
|
||||
|
||||
except Exception as e:
|
||||
# En caso de error, actualizar estado
|
||||
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
status="failed",
|
||||
message=error_message,
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=4
|
||||
)
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@@ -17,14 +17,8 @@ async def download_pedimento_completo(Pedimento: PedimentoCompletoRequestSchema)
|
||||
|
||||
# Ejecuta la tarea de Celery de forma asíncrona
|
||||
task = process_pedimento_completo_request.delay(pedimento_dict)
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando descarga de pedimento completo {pedimento_dict.get('pedimento', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=pedimento_dict.get('id'),
|
||||
organizacion_id=pedimento_dict.get('organizacion'),
|
||||
servicio=3 # 3 corresponde a "Pedimento Completo"
|
||||
)
|
||||
# Puedes devolver el ID de la tarea para consultar el estado después
|
||||
return {"status": "submitted", "detail": "La solicitud de descarga del pedimento completo ha sido enviada.", "task_id": task.id}
|
||||
return {
|
||||
"status": "submitted",
|
||||
"detail": "La solicitud de descarga del pedimento completo ha sido enviada.",
|
||||
"task_id": task.id
|
||||
}
|
||||
|
||||
@@ -1,26 +1,87 @@
|
||||
from celery_app import celery_app
|
||||
|
||||
from .services import put_pedimento_data
|
||||
import asyncio # Necesario para ejecutar funciones async dentro de Celery
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from ..tasks.services import register_task, update_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@celery_app.task(bind=True)
|
||||
def process_pedimento_completo_request(self, pedimento_data: dict):
|
||||
"""
|
||||
Tarea de Celery para procesar la descarga de un solo documento edoc.
|
||||
Tarea de Celery para procesar la descarga de un pedimento completo.
|
||||
|
||||
Args:
|
||||
pedimento_data (dict): Datos del pedimento a procesar
|
||||
|
||||
Returns:
|
||||
dict: Resultado del procesamiento con estado y detalles
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
task_id = self.request.id
|
||||
servicio = 3 # Código para Pedimento Completo
|
||||
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
|
||||
organizacion_id = pedimento_data.get('pedimento', {}).get('organizacion')
|
||||
|
||||
try:
|
||||
# Ejecutar la función asíncrona dentro del hilo síncrono de Celery
|
||||
loop = asyncio.get_event_loop()
|
||||
# Registrar la tarea primero
|
||||
loop.run_until_complete(
|
||||
register_task(
|
||||
task_id=task_id,
|
||||
message=f"Iniciando procesamiento del pedimento completo",
|
||||
status="submitted",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio
|
||||
)
|
||||
)
|
||||
|
||||
# Luego actualizar estado a processing
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Iniciando procesamiento del pedimento completo",
|
||||
status="processing",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio
|
||||
)
|
||||
)
|
||||
|
||||
# Procesar pedimento
|
||||
result = loop.run_until_complete(put_pedimento_data(**pedimento_data))
|
||||
|
||||
return {"status": "success", "result": result}
|
||||
except Exception as e:
|
||||
# Manejo de errores
|
||||
self.update_state(
|
||||
state='FAILURE',
|
||||
meta={'exc_type': type(e).__name__, 'exc_message': str(e)}
|
||||
# Actualizar estado a completed
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Pedimento completo procesado exitosamente",
|
||||
status="completed",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio
|
||||
)
|
||||
)
|
||||
# Es crucial volver a lanzar la excepción para que Celery la marque como fallida
|
||||
raise e
|
||||
|
||||
return {"status": "success", "result": result}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando pedimento completo: {str(e)}", exc_info=True)
|
||||
|
||||
# Actualizar estado a failed
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Error al procesar pedimento completo: {str(e)}",
|
||||
status="failed",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio
|
||||
)
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||
|
||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||
raise
|
||||
@@ -17,14 +17,8 @@ async def download_remesa(remesa_request: RemesaBaseSchema):
|
||||
|
||||
# Ejecuta la tarea de Celery de forma asíncrona
|
||||
task = process_remesa_request.delay(remesa_dict)
|
||||
# Registrar la tarea en el servicio de seguimiento
|
||||
await register_task(
|
||||
task_id=task.id,
|
||||
message=f"Procesando descarga de remesa {remesa_dict.get('remesa', 'N/A')}",
|
||||
status="submitted",
|
||||
pedimento_id=remesa_dict.get('pedimento', {}).get('id'),
|
||||
organizacion_id=remesa_dict.get('pedimento', {}).get('organizacion'),
|
||||
servicio=5 # 5 corresponde a "Pedimento Remesas"
|
||||
)
|
||||
# Puedes devolver el ID de la tarea para consultar el estado después
|
||||
return {"status": "submitted", "detail": "La solicitud de descarga de la remesa ha sido enviada.", "task_id": task.id}
|
||||
return {
|
||||
"status": "submitted",
|
||||
"detail": "La solicitud de descarga de la remesa ha sido enviada.",
|
||||
"task_id": task.id
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
||||
organizacion=pedimento_data.get('organizacion'),
|
||||
pedimento=pedimento_data.get('id'),
|
||||
file_name=file_name,
|
||||
document_type=3,
|
||||
document_type=5,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -1,27 +1,79 @@
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
from .services import post_remesa_data
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
from contextlib import asynccontextmanager
|
||||
from ..tasks.services import register_task, update_task
|
||||
|
||||
from .services import post_remesa_data
|
||||
from api.api_v2.modules.tasks.tasks import run_async_task
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@celery_app.task
|
||||
def process_remesa_request(remesa_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
@celery_app.task(bind=True)
|
||||
def process_remesa_request(self, remesa_request: dict) -> dict:
|
||||
"""
|
||||
Tarea de Celery para procesar la solicitud de acuse.
|
||||
Tarea de Celery para procesar la solicitud de remesa.
|
||||
|
||||
Args:
|
||||
acuse_request: Diccionario con los datos de la solicitud de acuse.
|
||||
remesa_request (dict): Datos de la remesa a procesar
|
||||
|
||||
Returns:
|
||||
Diccionario con la respuesta del acuse.
|
||||
dict: Resultado del procesamiento con estado y detalles
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
remesa_response = loop.run_until_complete(post_remesa_data(**remesa_request))
|
||||
task_id = self.request.id
|
||||
servicio = 5 # Código para Pedimento Remesas
|
||||
pedimento_id = remesa_request.get('pedimento', {}).get('id')
|
||||
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
|
||||
remesa_num = remesa_request.get('remesa', 'N/A')
|
||||
|
||||
return {"status": "processed", "data": remesa_response}
|
||||
try:
|
||||
# Actualizar estado a processing
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Iniciando procesamiento de la remesa {remesa_num}",
|
||||
status="processing",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio
|
||||
)
|
||||
)
|
||||
|
||||
# Procesar remesa
|
||||
result = loop.run_until_complete(post_remesa_data(**remesa_request))
|
||||
|
||||
# Actualizar estado a completed
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Remesa {remesa_num} procesada exitosamente",
|
||||
status="completed",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio,
|
||||
result=result
|
||||
)
|
||||
)
|
||||
|
||||
return {"status": "success", "result": result}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error procesando remesa {remesa_num}: {str(e)}", exc_info=True)
|
||||
|
||||
# Actualizar estado a failed
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
update_task(
|
||||
task_id=task_id,
|
||||
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
||||
status="failed",
|
||||
pedimento_id=pedimento_id,
|
||||
organizacion_id=organizacion_id,
|
||||
servicio=servicio,
|
||||
error=str(e)
|
||||
)
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||
|
||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||
raise
|
||||
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, Depends
|
||||
from fastapi.routing import APIRouter
|
||||
from celery_app import celery_app
|
||||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||||
from datetime import datetime
|
||||
from fastapi.responses import JSONResponse
|
||||
from typing import Dict, Any, List, Optional
|
||||
from api.api_v2.modules.authentication.services import get_current_user
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.get("/async/task-status/{task_id}")
|
||||
async def get_task_status(task_id: str):
|
||||
@router.get("/async/task-status/{task_id}", response_model=Dict[str, Any])
|
||||
async def get_task_status(task_id: str, current_user: Dict = Depends(get_current_user)):
|
||||
"""
|
||||
Consulta el estado de una tarea agendada.
|
||||
|
||||
@@ -84,37 +86,9 @@ async def get_task_status(task_id: str):
|
||||
detail=f"Error al consultar el estado de la tarea: {str(e)}"
|
||||
)
|
||||
|
||||
@router.get("/async/tasks/active")
|
||||
async def get_active_tasks():
|
||||
"""
|
||||
Lista todas las tareas activas en el sistema.
|
||||
|
||||
Returns:
|
||||
JSONResponse con la lista de tareas activas
|
||||
"""
|
||||
try:
|
||||
# Obtener tareas activas desde Celery
|
||||
inspect = celery_app.control.inspect()
|
||||
active_tasks = inspect.active()
|
||||
scheduled_tasks = inspect.scheduled()
|
||||
|
||||
response_data = {
|
||||
"active_tasks": active_tasks or {},
|
||||
"scheduled_tasks": scheduled_tasks or {},
|
||||
"timestamp": datetime.utcnow().isoformat()
|
||||
}
|
||||
|
||||
return JSONResponse(content=response_data, status_code=200)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener tareas activas: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error al obtener tareas activas: {str(e)}"
|
||||
)
|
||||
|
||||
@router.delete("/async/task/{task_id}")
|
||||
async def cancel_task(task_id: str):
|
||||
@router.delete("/async/task/{task_id}", response_model=Dict[str, Any])
|
||||
async def cancel_task(task_id: str, current_user: Dict = Depends(get_current_user)):
|
||||
"""
|
||||
Cancela una tarea agendada.
|
||||
|
||||
|
||||
@@ -6,6 +6,81 @@ from ..common import create_error_response
|
||||
from core.config import settings
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def update_task(
|
||||
task_id: str,
|
||||
message: str,
|
||||
status: str,
|
||||
pedimento_id: str,
|
||||
organizacion_id: str,
|
||||
servicio: int
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Actualiza el estado de una tarea existente en el servicio de seguimiento.
|
||||
|
||||
Args:
|
||||
task_id: ID de la tarea de Celery
|
||||
message: Mensaje descriptivo del nuevo estado de la tarea
|
||||
status: Nuevo estado de la tarea
|
||||
pedimento_id: ID del pedimento asociado
|
||||
organizacion_id: ID de la organización
|
||||
servicio: ID del tipo de servicio (1-9)
|
||||
|
||||
Returns:
|
||||
Dict con la respuesta del servicio
|
||||
"""
|
||||
try:
|
||||
headers = {
|
||||
"Authorization": f"Token {settings.API_TOKEN}"
|
||||
}
|
||||
|
||||
# Construir el cuerpo de la petición
|
||||
update_data = {
|
||||
"task_id": task_id,
|
||||
"message": message,
|
||||
"status": status,
|
||||
"pedimento": pedimento_id,
|
||||
"organizacion": organizacion_id,
|
||||
"servicio": servicio
|
||||
}
|
||||
|
||||
logger.info(f"Actualizando tarea {task_id} con datos: {update_data}")
|
||||
|
||||
# Django requiere el slash final
|
||||
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
|
||||
logger.info(f"Actualizando tarea en: {url}")
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.put(
|
||||
url,
|
||||
json=update_data,
|
||||
headers=headers
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=create_error_response(
|
||||
message="Error al actualizar la tarea",
|
||||
errors=[str(e)],
|
||||
metadata={
|
||||
"task_id": task_id,
|
||||
"status": status
|
||||
}
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error inesperado al actualizar tarea {task_id}: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=create_error_response(
|
||||
message="Error inesperado al actualizar la tarea",
|
||||
errors=[str(e)],
|
||||
metadata={"task_id": task_id}
|
||||
)
|
||||
)
|
||||
|
||||
async def register_task(
|
||||
task_id: str,
|
||||
message: str,
|
||||
|
||||
@@ -12,15 +12,24 @@ celery_app = Celery(
|
||||
|
||||
# Configuración adicional
|
||||
celery_app.conf.update(
|
||||
# Configuración básica
|
||||
task_serializer='json',
|
||||
accept_content=['json'],
|
||||
result_serializer='json',
|
||||
timezone='UTC',
|
||||
enable_utc=True,
|
||||
task_track_started=True,
|
||||
|
||||
# Configuración de workers
|
||||
task_time_limit=3600, # 1 hour timeout
|
||||
worker_prefetch_multiplier=1,
|
||||
worker_max_tasks_per_child=1000,
|
||||
|
||||
# Configuración de resultados persistentes
|
||||
task_track_started=True, # Guarda cuando la tarea inicia
|
||||
task_ignore_result=False, # Asegura que se guarden los resultados
|
||||
result_expires=60 * 60 * 24 * 7, # Mantiene resultados por 7 días
|
||||
task_store_errors_even_if_ignored=True, # Guarda errores aunque la tarea sea ignorada
|
||||
task_save_success_on_complete=True # Guarda los resultados exitosos
|
||||
)
|
||||
|
||||
# Autodiscovery of tasks
|
||||
|
||||
@@ -24,6 +24,24 @@ def soap_error(soap_response): # Testeado
|
||||
return True
|
||||
if "<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>" in soap_response.text:
|
||||
return True
|
||||
if "<descripcion>El RFC no tiene relación con el eDocument. </descripcion>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : El RFC del usuario es distinto al del certificado.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : Firma inválida</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : El certificado ha expirado.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : El certificado ha sido revocado.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : El certificado no es válido.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : No se encontró el certificado.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<error>Firma Electrónica : Error al procesar la firma electrónica.</error>" in soap_response.text:
|
||||
return True
|
||||
if "<mensaje>unexpected XML tag. expected: {http://tempuri.org/}ConsultaPartidaResponse but found: {http://tempuri.org/}consultaPedimentoResponse</mensaje>" in soap_response.text:
|
||||
return True
|
||||
|
||||
# Aquí podrías agregar más lógica para verificar errores específicos en el XML
|
||||
return False
|
||||
Reference in New Issue
Block a user