Compare commits
7 Commits
feature/rb
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 1e06d1a2bf | |||
| b9c6ab89c3 | |||
| 042d02e240 | |||
| 288a96bc59 | |||
| 9f59ac0d00 | |||
| fe8e7dc10f | |||
| a6ce91d8af |
@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
|
|||||||
from api.api_v2.modules.partidas.routers import router as partidas_router
|
from api.api_v2.modules.partidas.routers import router as partidas_router
|
||||||
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
||||||
from api.api_v2.modules.remesas.routers import router as remesas_router
|
from api.api_v2.modules.remesas.routers import router as remesas_router
|
||||||
|
from api.api_v2.modules.stream.routers import router as stream_router
|
||||||
|
|
||||||
api_router = APIRouter()
|
api_router = APIRouter()
|
||||||
|
|
||||||
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
|
|||||||
api_router.include_router(partidas_router, tags=["Partidas"])
|
api_router.include_router(partidas_router, tags=["Partidas"])
|
||||||
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
||||||
api_router.include_router(remesas_router, tags=["Remesas"])
|
api_router.include_router(remesas_router, tags=["Remesas"])
|
||||||
|
api_router.include_router(stream_router, tags=["Stream"])
|
||||||
|
|||||||
@@ -203,6 +203,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
|||||||
data = {
|
data = {
|
||||||
"id": edoc.get("id"),
|
"id": edoc.get("id"),
|
||||||
"acuse_descargado": status,
|
"acuse_descargado": status,
|
||||||
|
"acuse_estado": "descargado" if status else "pendiente",
|
||||||
"numero_edocument": edoc.get("numero_edocument"),
|
"numero_edocument": edoc.get("numero_edocument"),
|
||||||
"pedimento": pedimento.get("id"),
|
"pedimento": pedimento.get("id"),
|
||||||
"organizacion": pedimento.get("organizacion"),
|
"organizacion": pedimento.get("organizacion"),
|
||||||
@@ -210,6 +211,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
|||||||
|
|
||||||
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||||
|
|
||||||
|
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')} en la API")
|
||||||
|
raise Exception(f"Fallo al actualizar el estatus del acuse del EDocument {edoc.get('numero_edocument')}")
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
async def marcar_error_acuse(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
|
||||||
|
"""
|
||||||
|
Reporta un fallo de descarga del acuse al registro de negocio (T2026-05-027).
|
||||||
|
|
||||||
|
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||||
|
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||||
|
la transición a 'error'.
|
||||||
|
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
|
||||||
|
queda fuera del ciclo automático, solo reproceso manual o reset.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
"id": edoc.get("id"),
|
||||||
|
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||||
|
"numero_edocument": edoc.get("numero_edocument"),
|
||||||
|
"pedimento": pedimento.get("id"),
|
||||||
|
"organizacion": pedimento.get("organizacion"),
|
||||||
|
}
|
||||||
|
if definitivo:
|
||||||
|
data["acuse_estado"] = "error"
|
||||||
|
response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo registrar el error del acuse del EDocument {edoc.get('numero_edocument')} en la API")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def _decode_acuse_base64_content(base64_content): # Testeado
|
def _decode_acuse_base64_content(base64_content): # Testeado
|
||||||
|
|||||||
@@ -6,9 +6,13 @@ from typing import Dict, Any
|
|||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
|
|
||||||
from .services import obtener_acuse
|
from .services import obtener_acuse, marcar_error_acuse
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
|
|
||||||
|
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||||
|
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||||
|
WORKER_MAX_RETRIES = 2
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True)
|
||||||
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
|
def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
@@ -20,12 +24,14 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
|||||||
pedimento_id = pedimento_info.get('id')
|
pedimento_id = pedimento_info.get('id')
|
||||||
organizacion_id = pedimento_info.get('organizacion')
|
organizacion_id = pedimento_info.get('organizacion')
|
||||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||||
|
edoc_info = acuse_request.get('edoc', {})
|
||||||
|
|
||||||
loop = asyncio.new_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||||
|
if self.request.retries == 0:
|
||||||
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -76,6 +82,22 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
|||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
||||||
logging.error(error_message)
|
logging.error(error_message)
|
||||||
|
|
||||||
|
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||||
|
# mismo intento orquestado, NO incrementa el contador del backend
|
||||||
|
if not isinstance(e, HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||||
|
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||||
|
|
||||||
|
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||||
|
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||||
|
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(
|
||||||
|
marcar_error_acuse(edoc_info, pedimento_info, error_message, definitivo=False)
|
||||||
|
)
|
||||||
|
except Exception as report_error:
|
||||||
|
logging.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
update_task(
|
update_task(
|
||||||
|
|||||||
@@ -146,6 +146,13 @@ async def consume_ws_get_cove(**kwargs):
|
|||||||
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
||||||
raise Exception(f"Error en la respuesta SOAP: {str(e)}")
|
raise Exception(f"Error en la respuesta SOAP: {str(e)}")
|
||||||
|
|
||||||
|
# Validar que el documento quedó persistido en la API antes de marcar el
|
||||||
|
# COVE como descargado (T2026-05-027): post_or_update_document retorna
|
||||||
|
# None en error sin lanzar excepción
|
||||||
|
if not document_response or document_response.get("id") is None:
|
||||||
|
logger.error(f"No se pudo persistir el COVE {cove} en la API; el estatus no se actualiza")
|
||||||
|
raise Exception(f"No se pudo persistir el COVE {cove} en la API")
|
||||||
|
|
||||||
logger.info("Documento enviado, actualizando status de COVE")
|
logger.info("Documento enviado, actualizando status de COVE")
|
||||||
|
|
||||||
# Actualizar status del COVE
|
# Actualizar status del COVE
|
||||||
@@ -329,6 +336,15 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
|||||||
identifier=cove_identifier,
|
identifier=cove_identifier,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Validar la subida antes de marcar el acuse como descargado (T2026-05-027):
|
||||||
|
# post_or_update_document retorna None en error sin lanzar excepción
|
||||||
|
if not rest_response or rest_response.get("id") is None:
|
||||||
|
logger.error("Error al enviar el acuse de COVE a la API interna; el estatus no se actualiza")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail="No se pudo persistir el acuse del COVE en la API; el estatus no se actualiza"
|
||||||
|
)
|
||||||
|
|
||||||
acuse_status = await change_acuse_status(
|
acuse_status = await change_acuse_status(
|
||||||
cove=kwargs.get('cove'),
|
cove=kwargs.get('cove'),
|
||||||
status=True,
|
status=True,
|
||||||
@@ -507,6 +523,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
|
|||||||
data = {
|
data = {
|
||||||
"id": cove.get("id"),
|
"id": cove.get("id"),
|
||||||
"cove_descargado": status,
|
"cove_descargado": status,
|
||||||
|
"cove_estado": "descargado" if status else "pendiente",
|
||||||
"numero_cove": cove.get("cove"),
|
"numero_cove": cove.get("cove"),
|
||||||
"pedimento": pedimento.get("id"),
|
"pedimento": pedimento.get("id"),
|
||||||
"organizacion": pedimento.get("organizacion"),
|
"organizacion": pedimento.get("organizacion"),
|
||||||
@@ -514,6 +531,11 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
|
|||||||
|
|
||||||
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||||
|
|
||||||
|
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo actualizar el estatus del COVE {cove.get('cove')} en la API")
|
||||||
|
raise Exception(f"Fallo al actualizar el estatus del COVE {cove.get('cove')}")
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
@@ -521,14 +543,46 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
|
|||||||
data = {
|
data = {
|
||||||
"id": cove.get("id"),
|
"id": cove.get("id"),
|
||||||
"acuse_cove_descargado": status,
|
"acuse_cove_descargado": status,
|
||||||
|
"acuse_cove_estado": "descargado" if status else "pendiente",
|
||||||
"numero_cove": cove.get("cove"),
|
"numero_cove": cove.get("cove"),
|
||||||
"pedimento": pedimento.get("id"),
|
"pedimento": pedimento.get("id"),
|
||||||
"organizacion": pedimento.get("organizacion"),
|
"organizacion": pedimento.get("organizacion"),
|
||||||
}
|
}
|
||||||
|
|
||||||
print(data)
|
|
||||||
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||||
|
|
||||||
|
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo actualizar el estatus del acuse del COVE {cove.get('cove')} en la API")
|
||||||
|
raise Exception(f"Fallo al actualizar el estatus del acuse del COVE {cove.get('cove')}")
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
async def marcar_error_cove(cove: dict, pedimento: dict, mensaje: str, acuse: bool = False, definitivo: bool = False):
|
||||||
|
"""
|
||||||
|
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
|
||||||
|
|
||||||
|
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||||
|
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||||
|
la transición a 'error'.
|
||||||
|
- definitivo=True (fallo permanente: VUCEM responde que no existe, credencial
|
||||||
|
rechazada): transiciona de inmediato a 'error'; queda fuera del ciclo
|
||||||
|
automático y solo el reproceso manual o el reset lo reincorporan.
|
||||||
|
"""
|
||||||
|
campo = "acuse_cove_estado" if acuse else "cove_estado"
|
||||||
|
data = {
|
||||||
|
"id": cove.get("id"),
|
||||||
|
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||||
|
"numero_cove": cove.get("cove"),
|
||||||
|
"pedimento": pedimento.get("id"),
|
||||||
|
"organizacion": pedimento.get("organizacion"),
|
||||||
|
}
|
||||||
|
if definitivo:
|
||||||
|
data[campo] = "error"
|
||||||
|
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo registrar el error del COVE {cove.get('cove')} en la API")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3,13 +3,18 @@ import logging
|
|||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove, marcar_error_cove
|
||||||
from api.api_v2.modules.tasks.services import update_task, register_task
|
from api.api_v2.modules.tasks.services import update_task, register_task
|
||||||
|
|
||||||
# Logger para el módulo
|
# Logger para el módulo
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||||
|
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||||
|
WORKER_MAX_RETRIES = 2
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True)
|
||||||
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
@@ -26,7 +31,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
cove_number = cove_info.get('cove', 'N/A')
|
cove_number = cove_info.get('cove', 'N/A')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||||
|
if self.request.retries == 0:
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -70,6 +76,21 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
|
|
||||||
|
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||||
|
# mismo intento orquestado, NO incrementa el contador del backend
|
||||||
|
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||||
|
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||||
|
|
||||||
|
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||||
|
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||||
|
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||||
|
try:
|
||||||
|
asyncio.run(
|
||||||
|
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=False, definitivo=False)
|
||||||
|
)
|
||||||
|
except Exception as report_error:
|
||||||
|
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
@@ -84,6 +105,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -102,7 +125,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
cove_number = cove_info.get('cove', 'N/A')
|
cove_number = cove_info.get('cove', 'N/A')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||||
|
if self.request.retries == 0:
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -146,6 +170,21 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
|
|
||||||
|
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||||
|
# mismo intento orquestado, NO incrementa el contador del backend
|
||||||
|
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||||
|
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||||
|
|
||||||
|
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||||
|
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||||
|
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||||
|
try:
|
||||||
|
asyncio.run(
|
||||||
|
marcar_error_cove(cove_info, pedimento_info, error_message, acuse=True, definitivo=False)
|
||||||
|
)
|
||||||
|
except Exception as report_error:
|
||||||
|
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
@@ -160,4 +199,6 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
@@ -277,6 +277,7 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
|||||||
data = {
|
data = {
|
||||||
"id": edoc.get("id"),
|
"id": edoc.get("id"),
|
||||||
"edocument_descargado": status,
|
"edocument_descargado": status,
|
||||||
|
"edocument_estado": "descargado" if status else "pendiente",
|
||||||
"numero_edocument": edoc.get("numero_edocument"),
|
"numero_edocument": edoc.get("numero_edocument"),
|
||||||
"pedimento": pedimento.get("id"),
|
"pedimento": pedimento.get("id"),
|
||||||
"organizacion": pedimento.get("organizacion"),
|
"organizacion": pedimento.get("organizacion"),
|
||||||
@@ -284,6 +285,36 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
|||||||
|
|
||||||
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||||
|
|
||||||
|
# Nunca reportar éxito si el estatus no quedó persistido (T2026-05-027)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo actualizar el estatus del EDocument {edoc.get('numero_edocument')} en la API")
|
||||||
|
raise Exception(f"Fallo al actualizar el estatus del EDocument {edoc.get('numero_edocument')}")
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
async def marcar_error_edocument(edoc: dict, pedimento: dict, mensaje: str, definitivo: bool = False):
|
||||||
|
"""
|
||||||
|
Reporta un fallo de descarga al registro de negocio (T2026-05-027).
|
||||||
|
|
||||||
|
- definitivo=False (fallo transitorio): solo registra ultimo_error; el registro
|
||||||
|
permanece 'pendiente' y el tope de intentos automáticos del backend gobierna
|
||||||
|
la transición a 'error'.
|
||||||
|
- definitivo=True (fallo permanente): transiciona de inmediato a 'error';
|
||||||
|
queda fuera del ciclo automático, solo reproceso manual o reset.
|
||||||
|
"""
|
||||||
|
data = {
|
||||||
|
"id": edoc.get("id"),
|
||||||
|
"ultimo_error": (mensaje or "Error de descarga en VUCEM")[:2000],
|
||||||
|
"numero_edocument": edoc.get("numero_edocument"),
|
||||||
|
"pedimento": pedimento.get("id"),
|
||||||
|
"organizacion": pedimento.get("organizacion"),
|
||||||
|
}
|
||||||
|
if definitivo:
|
||||||
|
data["edocument_estado"] = "error"
|
||||||
|
response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data)
|
||||||
|
if response is None:
|
||||||
|
logger.error(f"No se pudo registrar el error del EDocument {edoc.get('numero_edocument')} en la API")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3,13 +3,18 @@ import logging
|
|||||||
import time
|
import time
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import obtener_edoc
|
from .services import obtener_edoc, marcar_error_edocument
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
|
|
||||||
# Logger para el módulo
|
# Logger para el módulo
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Reintentos internos del worker: pertenecen al MISMO intento orquestado y no
|
||||||
|
# incrementan el contador de intentos del backend (T2026-05-027)
|
||||||
|
WORKER_MAX_RETRIES = 2
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True)
|
||||||
def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||||
"""
|
"""
|
||||||
@@ -28,7 +33,8 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea (solo en la primera ejecución, no en reintentos)
|
||||||
|
if self.request.retries == 0:
|
||||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -79,6 +85,21 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message, exc_info=True)
|
logger.error(error_message, exc_info=True)
|
||||||
|
|
||||||
|
# Reintento interno del worker para fallos transitorios (red, timeout):
|
||||||
|
# mismo intento orquestado, NO incrementa el contador del backend
|
||||||
|
if not isinstance(e, _HTTPException) and self.request.retries < WORKER_MAX_RETRIES:
|
||||||
|
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
|
||||||
|
|
||||||
|
# Fallo definitivo de este intento: registrar el detalle en el registro de
|
||||||
|
# negocio. Permanece 'pendiente'; el tope de intentos automáticos del
|
||||||
|
# backend (MAX_INTENTOS_AUTO) gobierna la transición a 'error'.
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(
|
||||||
|
marcar_error_edocument(edoc_info, pedimento_info, error_message, definitivo=False)
|
||||||
|
)
|
||||||
|
except Exception as report_error:
|
||||||
|
logger.error(f"No se pudo registrar el error en el registro de negocio: {report_error}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
update_task(
|
update_task(
|
||||||
@@ -93,6 +114,8 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
# Cerrar el loop para liberar recursos
|
# Cerrar el loop para liberar recursos
|
||||||
|
|||||||
@@ -95,7 +95,17 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Una partida solo cuenta como descargada si la respuesta es una
|
||||||
|
# consultarPartidaRespuesta real sin <tieneError>true</tieneError>.
|
||||||
|
# Cualquier otro contenido (eco de la petición, fault sin tieneError,
|
||||||
|
# HTML de proxy, etc.) se guarda como ERROR y no marca descargado.
|
||||||
|
motivo_error = None
|
||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
|
motivo_error = "La respuesta contiene un error de VUCEM"
|
||||||
|
elif "consultarpartidarespuesta" not in (soap_response.text or "").lower():
|
||||||
|
motivo_error = "La respuesta no contiene el nodo consultarPartidaRespuesta"
|
||||||
|
|
||||||
|
if motivo_error:
|
||||||
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
||||||
try:
|
try:
|
||||||
document_response = await partida_rest_controller.post_or_update_document(
|
document_response = await partida_rest_controller.post_or_update_document(
|
||||||
@@ -110,12 +120,12 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
logger.error(f"Error al guardar la respuesta de error: {e}")
|
logger.error(f"Error al guardar la respuesta de error: {e}")
|
||||||
# Continuamos con el error original
|
# Continuamos con el error original
|
||||||
|
|
||||||
logger.error("Error en la respuesta del servicio SOAP")
|
logger.error(f"Error en la respuesta del servicio SOAP: {motivo_error}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=create_error_response(
|
detail=create_error_response(
|
||||||
message="Error en la respuesta del servicio SOAP",
|
message="Error en la respuesta del servicio SOAP",
|
||||||
errors=["La respuesta contiene un error de VUCEM"],
|
errors=[motivo_error],
|
||||||
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
|
data={"soap_response": soap_response.text[:500] if hasattr(soap_response, 'text') else None},
|
||||||
metadata={
|
metadata={
|
||||||
"partida_numero": partida.get('numero'),
|
"partida_numero": partida.get('numero'),
|
||||||
@@ -151,6 +161,22 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# post_or_update_document retorna None en error (no lanza excepción);
|
||||||
|
# sin documento guardado la partida NO debe marcarse como descargada.
|
||||||
|
if not document_response:
|
||||||
|
logger.error(f"El documento de la partida no se guardó, no se marca descargado: {_file_name}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=create_error_response(
|
||||||
|
message="Error al guardar el documento de la partida",
|
||||||
|
errors=["El API no confirmó el guardado del documento"],
|
||||||
|
metadata={
|
||||||
|
"file_name": _file_name,
|
||||||
|
"partida_numero": partida.get('numero')
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("Documento enviado, actualizando status de Partida")
|
logger.info("Documento enviado, actualizando status de Partida")
|
||||||
|
|
||||||
# Actualizar status de la partida
|
# Actualizar status de la partida
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import time
|
|||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import consume_ws_get_partida
|
from .services import consume_ws_get_partida
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
@@ -89,6 +90,8 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"Error al actualizar estado de tarea: {update_error}")
|
logger.error(f"Error al actualizar estado de tarea: {update_error}")
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
# Limpiar el event loop
|
# Limpiar el event loop
|
||||||
|
|||||||
@@ -259,13 +259,14 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
def _remesas(self, root: ET.Element) -> bool:
|
def _remesas(self, root: ET.Element) -> bool:
|
||||||
"""
|
"""
|
||||||
Método para verificar si el pedimento tiene remesas.
|
Método para verificar si el pedimento tiene remesas.
|
||||||
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
|
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
|
||||||
|
o 'RC' (REMESAS DE CONSOLIDADO).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
root: Elemento raíz del XML.
|
root: Elemento raíz del XML.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True si encuentra identificadores con clave 'RC', False en caso contrario.
|
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
|
||||||
"""
|
"""
|
||||||
namespaces = {
|
namespaces = {
|
||||||
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
||||||
@@ -281,8 +282,8 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
||||||
clave = clave_elem.text if clave_elem is not None else None
|
clave = clave_elem.text if clave_elem is not None else None
|
||||||
|
|
||||||
# Si encontramos una clave 'RC', el pedimento tiene remesas
|
# PC (consolidado) o RC (remesas de consolidado) indican remesas
|
||||||
if clave == 'RC':
|
if clave in ('PC', 'RC'):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -290,7 +291,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
print(f"Error procesando identificador para remesas: {e}")
|
print(f"Error procesando identificador para remesas: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
print("No se encontraron remesas (sin identificadores RC)")
|
print("No se encontraron remesas (sin identificadores PC/RC)")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from celery_app import celery_app
|
|||||||
from .services import put_pedimento_data
|
from .services import put_pedimento_data
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
from ..tasks.services import register_task, update_task
|
from ..tasks.services import register_task, update_task
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -86,6 +87,8 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
|||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
@@ -87,6 +87,31 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
# Generar nombre de archivo
|
# Generar nombre de archivo
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
||||||
|
|
||||||
|
# "No hay información" NO es un error — VUCEM confirma que el pedimento no tiene remesas.
|
||||||
|
# No se crea documento de error; se corrige el flag en el pedimento.
|
||||||
|
_SIN_REMESAS = '<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>'
|
||||||
|
if hasattr(soap_response, 'text') and _SIN_REMESAS in soap_response.text:
|
||||||
|
pedimento_id = pedimento_data.get('id')
|
||||||
|
logger.info(
|
||||||
|
f"Pedimento {pedimento_data.get('pedimento_app')} no tiene remesas "
|
||||||
|
f"(confirmado por VUCEM). Corrigiendo flag remesas=False."
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await remesa_rest_controller._make_request_async(
|
||||||
|
'PATCH',
|
||||||
|
f'customs/pedimentos/{pedimento_id}/',
|
||||||
|
data={'remesas': False},
|
||||||
|
)
|
||||||
|
except Exception as patch_err:
|
||||||
|
logger.warning(f"No se pudo actualizar remesas=False para {pedimento_id}: {patch_err}")
|
||||||
|
|
||||||
|
return create_service_response(
|
||||||
|
message="Pedimento sin remesas confirmado por VUCEM",
|
||||||
|
data={"remesas": False},
|
||||||
|
metadata={"pedimento_app": pedimento_data.get('pedimento_app')},
|
||||||
|
)
|
||||||
|
|
||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
|
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
||||||
@@ -98,25 +123,19 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
document_type=16,
|
document_type=16,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Aquí necesitamos extraer el mensaje de error real
|
|
||||||
error_message = "Error en la respuesta del servicio SOAP"
|
error_message = "Error en la respuesta del servicio SOAP"
|
||||||
|
|
||||||
# Intentar extraer mensaje de error del XML de respuesta
|
|
||||||
if hasattr(soap_response, 'text') and soap_response.text:
|
if hasattr(soap_response, 'text') and soap_response.text:
|
||||||
try:
|
try:
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
root = ET.fromstring(soap_response.text)
|
root = ET.fromstring(soap_response.text)
|
||||||
|
|
||||||
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
|
|
||||||
# Esto puede variar según el servicio, pero comúnmente buscan:
|
|
||||||
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
|
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
|
||||||
faultcode = fault.find('.//faultcode')
|
|
||||||
faultstring = fault.find('.//faultstring')
|
faultstring = fault.find('.//faultstring')
|
||||||
if faultstring is not None and faultstring.text:
|
if faultstring is not None and faultstring.text:
|
||||||
error_message = faultstring.text
|
error_message = faultstring.text
|
||||||
break
|
break
|
||||||
|
|
||||||
# También podría estar en una estructura de error específica de VUCEM
|
|
||||||
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
|
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
|
||||||
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
|
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
|
||||||
if msg is not None and msg.text:
|
if msg is not None and msg.text:
|
||||||
@@ -126,7 +145,6 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
except Exception as parse_error:
|
except Exception as parse_error:
|
||||||
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
|
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
|
||||||
|
|
||||||
# Lanzar excepción con el mensaje de error real
|
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
|
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from celery_app import celery_app
|
|||||||
from .services import post_remesa_data
|
from .services import post_remesa_data
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
from ..tasks.services import register_task, update_task
|
from ..tasks.services import register_task, update_task
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -98,6 +99,8 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
0
api/api_v2/modules/stream/__init__.py
Normal file
0
api/api_v2/modules/stream/__init__.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
from fastapi import APIRouter, Request
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
HEARTBEAT_INTERVAL = 25 # segundos
|
||||||
|
MAX_STREAM_SECONDS = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _redis_url() -> str:
|
||||||
|
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
|
||||||
|
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
|
||||||
|
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
|
||||||
|
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
|
||||||
|
return f"redis://{host}:{port}/{db}"
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/stream/tasks/{task_id}")
|
||||||
|
async def stream_task_events(task_id: str, request: Request):
|
||||||
|
"""
|
||||||
|
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
|
||||||
|
en tiempo real vía Redis Pub/Sub.
|
||||||
|
|
||||||
|
Cabeceras requeridas en Nginx upstream:
|
||||||
|
proxy_read_timeout 7200;
|
||||||
|
proxy_buffering off;
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def event_generator():
|
||||||
|
r = aioredis.from_url(_redis_url(), decode_responses=True)
|
||||||
|
pubsub = r.pubsub()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Enviar estado actual si ya existe (cliente que llega tarde)
|
||||||
|
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
|
||||||
|
if current_raw:
|
||||||
|
current = json.loads(current_raw)
|
||||||
|
yield f"event: task_update\ndata: {current_raw}\n\n"
|
||||||
|
if current.get("status") in ("completed", "failed"):
|
||||||
|
return
|
||||||
|
|
||||||
|
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
|
||||||
|
tick = 0
|
||||||
|
while tick < MAX_STREAM_SECONDS:
|
||||||
|
if await request.is_disconnected():
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(
|
||||||
|
pubsub.get_message(ignore_subscribe_messages=True),
|
||||||
|
timeout=1.0,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
msg = None
|
||||||
|
|
||||||
|
if msg and msg["type"] == "message":
|
||||||
|
data_str = msg["data"]
|
||||||
|
yield f"event: task_update\ndata: {data_str}\n\n"
|
||||||
|
parsed = json.loads(data_str)
|
||||||
|
if parsed.get("status") in ("completed", "failed"):
|
||||||
|
break
|
||||||
|
|
||||||
|
tick += 1
|
||||||
|
if tick % HEARTBEAT_INTERVAL == 0:
|
||||||
|
yield f"event: heartbeat\ndata: {{}}\n\n"
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
|
||||||
|
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await r.aclose()
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
@@ -4,6 +4,8 @@ from typing import Dict, Any
|
|||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from ..common import create_error_response
|
from ..common import create_error_response
|
||||||
from core.config import settings
|
from core.config import settings
|
||||||
|
from core.redis_events import publish_task_event
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
async def update_task(
|
async def update_task(
|
||||||
@@ -63,6 +65,7 @@ async def update_task(
|
|||||||
)
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
publish_task_event(task_id, status, message)
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
except httpx.HTTPError as e:
|
except httpx.HTTPError as e:
|
||||||
|
|||||||
@@ -58,6 +58,8 @@ class APIRESTController:
|
|||||||
response = await client.post(url, json=data, headers=self.headers)
|
response = await client.post(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'PUT':
|
elif method.upper() == 'PUT':
|
||||||
response = await client.put(url, json=data, headers=self.headers)
|
response = await client.put(url, json=data, headers=self.headers)
|
||||||
|
elif method.upper() == 'PATCH':
|
||||||
|
response = await client.patch(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'DELETE':
|
elif method.upper() == 'DELETE':
|
||||||
response = await client.delete(url, headers=self.headers)
|
response = await client.delete(url, headers=self.headers)
|
||||||
else:
|
else:
|
||||||
@@ -98,31 +100,18 @@ class APIRESTController:
|
|||||||
identifier: str = None, binary_content: bytes = None,
|
identifier: str = None, binary_content: bytes = None,
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Guarda un documento VU (request o error). Si ya existe uno del mismo
|
Guarda un documento VU (request, error o respuesta). Crea PRIMERO el
|
||||||
tipo/pedimento/identificador, elimina el anterior y crea el nuevo,
|
documento nuevo y, solo si se guardó con éxito, elimina los previos del
|
||||||
evitando duplicados en ejecuciones recurrentes (tarea programada diaria).
|
mismo tipo/pedimento/identificador. Así se evitan duplicados en
|
||||||
|
ejecuciones recurrentes sin perder el documento anterior cuando el
|
||||||
|
guardado del nuevo falla (antes se borraba el previo antes del POST).
|
||||||
|
|
||||||
identifier: cadena única del documento dentro del pedimento
|
identifier: cadena única del documento dentro del pedimento
|
||||||
(ej: número de COVE, número de e-document). Se usa como
|
(ej: número de COVE, número de e-document). Se usa como
|
||||||
filtro archivo__icontains para distinguir documentos del
|
filtro archivo__icontains para distinguir documentos del
|
||||||
mismo tipo pertenecientes a distintos COVEs o edocs.
|
mismo tipo pertenecientes a distintos COVEs o edocs.
|
||||||
"""
|
"""
|
||||||
try:
|
new_document = await self.post_document(
|
||||||
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
|
|
||||||
if identifier:
|
|
||||||
query += f'&archivo__icontains={identifier}'
|
|
||||||
existing = await self._make_request_async('GET', query)
|
|
||||||
if existing:
|
|
||||||
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
|
||||||
if isinstance(results, list) and results:
|
|
||||||
existing_id = results[0].get('id')
|
|
||||||
if existing_id:
|
|
||||||
await self._make_request_async('DELETE', f'record/documents/{existing_id}/')
|
|
||||||
logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
|
|
||||||
|
|
||||||
return await self.post_document(
|
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
binary_content=binary_content,
|
binary_content=binary_content,
|
||||||
organizacion=organizacion,
|
organizacion=organizacion,
|
||||||
@@ -131,6 +120,33 @@ class APIRESTController:
|
|||||||
document_type=document_type,
|
document_type=document_type,
|
||||||
fuente=fuente,
|
fuente=fuente,
|
||||||
)
|
)
|
||||||
|
if not new_document:
|
||||||
|
# POST falló: conservar los documentos previos intactos
|
||||||
|
logger.error(f"post_or_update_document: no se guardó el documento nuevo (document_type={document_type}, identifier={identifier}); se conservan los previos")
|
||||||
|
return new_document
|
||||||
|
|
||||||
|
new_id = new_document.get('id') if isinstance(new_document, dict) else None
|
||||||
|
if not new_id:
|
||||||
|
logger.warning(f"post_or_update_document: respuesta sin id, se omite limpieza de previos (document_type={document_type}, identifier={identifier})")
|
||||||
|
return new_document
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
|
||||||
|
if identifier:
|
||||||
|
query += f'&archivo__icontains={identifier}'
|
||||||
|
existing = await self._make_request_async('GET', query)
|
||||||
|
if existing:
|
||||||
|
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
||||||
|
if isinstance(results, list):
|
||||||
|
for previous in results:
|
||||||
|
previous_id = previous.get('id')
|
||||||
|
if previous_id and previous_id != new_id:
|
||||||
|
await self._make_request_async('DELETE', f'record/documents/{previous_id}/')
|
||||||
|
logger.info(f"Documento VU previo eliminado: id={previous_id}, document_type={document_type}, identifier={identifier}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
|
||||||
|
|
||||||
|
return new_document
|
||||||
|
|
||||||
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
|
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
|
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
|
||||||
@@ -554,6 +570,8 @@ class APIController:
|
|||||||
print(f"response >>>> {response}")
|
print(f"response >>>> {response}")
|
||||||
elif method.upper() == 'PUT':
|
elif method.upper() == 'PUT':
|
||||||
response = await client.put(url, json=data, headers=self.headers)
|
response = await client.put(url, json=data, headers=self.headers)
|
||||||
|
elif method.upper() == 'PATCH':
|
||||||
|
response = await client.patch(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'DELETE':
|
elif method.upper() == 'DELETE':
|
||||||
response = await client.delete(url, headers=self.headers)
|
response = await client.delete(url, headers=self.headers)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -135,13 +135,14 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
def _remesas(self, root: ET.Element) -> bool:
|
def _remesas(self, root: ET.Element) -> bool:
|
||||||
"""
|
"""
|
||||||
Método para verificar si el pedimento tiene remesas.
|
Método para verificar si el pedimento tiene remesas.
|
||||||
Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO).
|
Busca identificadores con clave 'PC' (PEDIMENTO CONSOLIDADO)
|
||||||
|
o 'RC' (REMESAS DE CONSOLIDADO).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
root: Elemento raíz del XML.
|
root: Elemento raíz del XML.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
True si encuentra identificadores con clave 'RC', False en caso contrario.
|
True si encuentra identificadores con clave 'PC' o 'RC', False en caso contrario.
|
||||||
"""
|
"""
|
||||||
namespaces = {
|
namespaces = {
|
||||||
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto',
|
||||||
@@ -157,8 +158,8 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces)
|
||||||
clave = clave_elem.text if clave_elem is not None else None
|
clave = clave_elem.text if clave_elem is not None else None
|
||||||
|
|
||||||
# Si encontramos una clave 'RC', el pedimento tiene remesas
|
# PC (consolidado) o RC (remesas de consolidado) indican remesas
|
||||||
if clave == 'RC':
|
if clave in ('PC', 'RC'):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -166,7 +167,7 @@ class XMLScraper: # Clase me extrae datos de Pedimento
|
|||||||
print(f"Error procesando identificador para remesas: {e}")
|
print(f"Error procesando identificador para remesas: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
print("No se encontraron remesas (sin identificadores RC)")
|
print("No se encontraron remesas (sin identificadores PC/RC)")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
def _get_tipo_operacion(self, root: ET.Element) -> str:
|
||||||
|
|||||||
44
core/redis_events.py
Normal file
44
core/redis_events.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import redis
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
STATE_TTL = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _get_sync_redis():
|
||||||
|
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
|
||||||
|
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
|
||||||
|
return redis.Redis(
|
||||||
|
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
|
||||||
|
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
|
||||||
|
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
|
||||||
|
decode_responses=True,
|
||||||
|
socket_connect_timeout=2,
|
||||||
|
socket_timeout=2,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
|
||||||
|
"""
|
||||||
|
Publica un evento de progreso de tarea en Redis Pub/Sub.
|
||||||
|
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
|
||||||
|
"""
|
||||||
|
payload: dict = {"task_id": task_id, "status": status, "message": message}
|
||||||
|
if resultado is not None:
|
||||||
|
payload["resultado"] = resultado
|
||||||
|
if progress is not None:
|
||||||
|
payload["progress"] = progress
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = _get_sync_redis()
|
||||||
|
serialized = json.dumps(payload)
|
||||||
|
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
|
||||||
|
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
|
||||||
|
client.close()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")
|
||||||
@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
|
|||||||
Returns:
|
Returns:
|
||||||
bool: True si no hay errores, False en caso contrario
|
bool: True si no hay errores, False en caso contrario
|
||||||
"""
|
"""
|
||||||
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text:
|
# Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
|
||||||
return True
|
if 'tieneerror>true<' in soap_response.text.lower():
|
||||||
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
|
|
||||||
return True
|
return True
|
||||||
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
||||||
return True
|
return True
|
||||||
|
|||||||
Reference in New Issue
Block a user