Compare commits
7 Commits
feature/T2
...
042d02e240
| Author | SHA1 | Date | |
|---|---|---|---|
| 042d02e240 | |||
| 288a96bc59 | |||
| 9f59ac0d00 | |||
| fe8e7dc10f | |||
| a6ce91d8af | |||
| e174df0af3 | |||
| 3c10653d6a |
@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
|
||||
from api.api_v2.modules.partidas.routers import router as partidas_router
|
||||
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
||||
from api.api_v2.modules.remesas.routers import router as remesas_router
|
||||
from api.api_v2.modules.stream.routers import router as stream_router
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
|
||||
api_router.include_router(partidas_router, tags=["Partidas"])
|
||||
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
||||
api_router.include_router(remesas_router, tags=["Remesas"])
|
||||
api_router.include_router(stream_router, tags=["Stream"])
|
||||
|
||||
@@ -145,12 +145,13 @@ async def obtener_acuse(**kwargs):
|
||||
organizacion = pedimento.get("organizacion", None)
|
||||
pedimento_id = pedimento.get("id", None)
|
||||
|
||||
rest_response = await acuse_rest_controller.post_document(
|
||||
rest_response = await acuse_rest_controller.post_or_update_document(
|
||||
binary_content=pdf_bytes,
|
||||
organizacion=organizacion,
|
||||
pedimento=pedimento_id,
|
||||
file_name=_file_name,
|
||||
document_type=4
|
||||
document_type=4,
|
||||
identifier=idEdocument_efc,
|
||||
)
|
||||
|
||||
if rest_response is None:
|
||||
|
||||
@@ -134,12 +134,13 @@ async def consume_ws_get_cove(**kwargs):
|
||||
# Enviar documento
|
||||
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
|
||||
try:
|
||||
document_response = await coves_rest_controller.post_document(
|
||||
document_response = await coves_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||
pedimento=kwargs.get('pedimento').get('id'),
|
||||
file_name=_file_name,
|
||||
document_type=8,
|
||||
identifier=cove,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
||||
@@ -318,12 +319,14 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
||||
organizacion = pedimento.get("organizacion", None)
|
||||
pedimento_id = pedimento.get("id", None)
|
||||
|
||||
rest_response = await coves_rest_controller.post_document(
|
||||
cove_identifier = kwargs['cove'].get('cove', '')
|
||||
rest_response = await coves_rest_controller.post_or_update_document(
|
||||
binary_content=pdf_bytes,
|
||||
organizacion=organizacion,
|
||||
pedimento=pedimento_id,
|
||||
file_name=_file_name,
|
||||
document_type=7
|
||||
document_type=7,
|
||||
identifier=cove_identifier,
|
||||
)
|
||||
|
||||
acuse_status = await change_acuse_status(
|
||||
|
||||
@@ -3,6 +3,7 @@ import logging
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
from typing import Dict, Any
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
|
||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
||||
from api.api_v2.modules.tasks.services import update_task, register_task
|
||||
@@ -83,7 +84,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||
|
||||
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
|
||||
|
||||
@@ -159,5 +162,7 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||
|
||||
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
@@ -203,12 +203,13 @@ async def obtener_edoc(**kwargs):
|
||||
# No guardaremos el archivo localmente por seguridad
|
||||
logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}")
|
||||
|
||||
rest_response = await edocs_rest_controller.post_document(
|
||||
rest_response = await edocs_rest_controller.post_or_update_document(
|
||||
binary_content=pdf_bytes,
|
||||
organizacion=organizacion,
|
||||
pedimento=pedimento_id,
|
||||
file_name=_file_name,
|
||||
document_type=5
|
||||
document_type=5,
|
||||
identifier=numero_documento,
|
||||
)
|
||||
|
||||
print(f"rest_response >>>> {rest_response}")
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from celery_app import celery_app
|
||||
from celery_app import celery_app
|
||||
from typing import Dict
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
|
||||
from .services import obtener_edoc
|
||||
from .services import obtener_edoc
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
|
||||
# Logger para el módulo
|
||||
@@ -92,7 +93,9 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||
|
||||
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
finally:
|
||||
# Cerrar el loop para liberar recursos
|
||||
|
||||
@@ -59,12 +59,13 @@ async def consume_ws_get_partida(**kwargs):
|
||||
|
||||
file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml"
|
||||
|
||||
document_response = await partida_rest_controller.post_document(
|
||||
document_response = await partida_rest_controller.post_or_update_document(
|
||||
soap_response=soap_xml,
|
||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||
pedimento=kwargs.get('pedimento').get('id'),
|
||||
file_name=file_name_request,
|
||||
document_type=17, # Tipo de documento para petición de partidas
|
||||
document_type=17,
|
||||
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error al enviar documento request: {e}")
|
||||
@@ -97,12 +98,13 @@ async def consume_ws_get_partida(**kwargs):
|
||||
if soap_error(soap_response):
|
||||
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
||||
try:
|
||||
document_response = await partida_rest_controller.post_document(
|
||||
document_response = await partida_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||
pedimento=kwargs.get('pedimento').get('id'),
|
||||
file_name=error_file_name,
|
||||
document_type=18,
|
||||
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error al guardar la respuesta de error: {e}")
|
||||
@@ -127,12 +129,13 @@ async def consume_ws_get_partida(**kwargs):
|
||||
# Enviar documento
|
||||
_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml"
|
||||
try:
|
||||
document_response = await partida_rest_controller.post_document(
|
||||
document_response = await partida_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||
pedimento=kwargs.get('pedimento').get('id'),
|
||||
file_name=_file_name,
|
||||
document_type=1,
|
||||
document_type=1,
|
||||
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error al enviar documento: {e}")
|
||||
|
||||
@@ -4,6 +4,7 @@ import time
|
||||
from celery import Celery
|
||||
from celery_app import celery_app
|
||||
from typing import Dict, Any
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
|
||||
from .services import consume_ws_get_partida
|
||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||
@@ -89,6 +90,8 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Error al actualizar estado de tarea: {update_error}")
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
finally:
|
||||
# Limpiar el event loop
|
||||
|
||||
@@ -64,12 +64,12 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
||||
|
||||
file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
|
||||
|
||||
document_response = await pedimento_rest_controller.post_document(
|
||||
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||
soap_response=soap_xml,
|
||||
organizacion=pedimento_data.get('organizacion'),
|
||||
pedimento=pedimento_data.get('id'),
|
||||
file_name=file_name_request,
|
||||
document_type=13,
|
||||
document_type=13,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error al enviar documento request: {e}")
|
||||
@@ -90,12 +90,12 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
||||
|
||||
if soap_error(soap_response):
|
||||
logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}")
|
||||
document_response = await pedimento_rest_controller.post_document(
|
||||
soap_response=None,
|
||||
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
organizacion=pedimento_data.get('organizacion'),
|
||||
pedimento=pedimento_data.get('id'),
|
||||
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
|
||||
document_type=14,
|
||||
document_type=14,
|
||||
)
|
||||
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
||||
|
||||
@@ -111,12 +111,12 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
||||
|
||||
# Enviar documento
|
||||
try:
|
||||
document_response = await pedimento_rest_controller.post_document(
|
||||
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
organizacion=pedimento_data.get('organizacion'),
|
||||
pedimento=pedimento_data.get('id'),
|
||||
file_name=file_name,
|
||||
document_type=2,
|
||||
document_type=2,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error al enviar documento: {e}")
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from celery_app import celery_app
|
||||
from celery_app import celery_app
|
||||
from .services import put_pedimento_data
|
||||
import asyncio
|
||||
import logging
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
from ..tasks.services import register_task, update_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -84,8 +85,10 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||
|
||||
|
||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
finally:
|
||||
loop.close()
|
||||
@@ -87,8 +87,33 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
||||
)
|
||||
# Generar nombre de archivo
|
||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
||||
|
||||
# "No hay información" NO es un error — VUCEM confirma que el pedimento no tiene remesas.
|
||||
# No se crea documento de error; se corrige el flag en el pedimento.
|
||||
_SIN_REMESAS = '<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>'
|
||||
if hasattr(soap_response, 'text') and _SIN_REMESAS in soap_response.text:
|
||||
pedimento_id = pedimento_data.get('id')
|
||||
logger.info(
|
||||
f"Pedimento {pedimento_data.get('pedimento_app')} no tiene remesas "
|
||||
f"(confirmado por VUCEM). Corrigiendo flag remesas=False."
|
||||
)
|
||||
try:
|
||||
await remesa_rest_controller._make_request_async(
|
||||
'PATCH',
|
||||
f'customs/pedimentos/{pedimento_id}/',
|
||||
data={'remesas': False},
|
||||
)
|
||||
except Exception as patch_err:
|
||||
logger.warning(f"No se pudo actualizar remesas=False para {pedimento_id}: {patch_err}")
|
||||
|
||||
return create_service_response(
|
||||
message="Pedimento sin remesas confirmado por VUCEM",
|
||||
data={"remesas": False},
|
||||
metadata={"pedimento_app": pedimento_data.get('pedimento_app')},
|
||||
)
|
||||
|
||||
if soap_error(soap_response):
|
||||
|
||||
|
||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
||||
document_response = await remesa_rest_controller.post_or_update_document(
|
||||
soap_response=soap_response,
|
||||
@@ -97,38 +122,31 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
||||
file_name=file_name,
|
||||
document_type=16,
|
||||
)
|
||||
|
||||
# Aquí necesitamos extraer el mensaje de error real
|
||||
|
||||
error_message = "Error en la respuesta del servicio SOAP"
|
||||
|
||||
# Intentar extraer mensaje de error del XML de respuesta
|
||||
|
||||
if hasattr(soap_response, 'text') and soap_response.text:
|
||||
try:
|
||||
import xml.etree.ElementTree as ET
|
||||
root = ET.fromstring(soap_response.text)
|
||||
|
||||
# Buscar mensajes de error comunes en respuestas SOAP de VUCEM
|
||||
# Esto puede variar según el servicio, pero comúnmente buscan:
|
||||
|
||||
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
|
||||
faultcode = fault.find('.//faultcode')
|
||||
faultstring = fault.find('.//faultstring')
|
||||
if faultstring is not None and faultstring.text:
|
||||
error_message = faultstring.text
|
||||
break
|
||||
|
||||
# También podría estar en una estructura de error específica de VUCEM
|
||||
|
||||
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
|
||||
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
|
||||
if msg is not None and msg.text:
|
||||
error_message = msg.text
|
||||
break
|
||||
|
||||
|
||||
except Exception as parse_error:
|
||||
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
|
||||
|
||||
# Lanzar excepción con el mensaje de error real
|
||||
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
status_code=500,
|
||||
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
|
||||
)
|
||||
# Enviar documento
|
||||
|
||||
@@ -2,6 +2,7 @@ from celery_app import celery_app
|
||||
from .services import post_remesa_data
|
||||
import asyncio
|
||||
import logging
|
||||
from fastapi import HTTPException as _HTTPException
|
||||
from ..tasks.services import register_task, update_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -98,6 +99,8 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||
|
||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||
if isinstance(e, _HTTPException):
|
||||
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||
raise
|
||||
|
||||
finally:
|
||||
|
||||
0
api/api_v2/modules/stream/__init__.py
Normal file
0
api/api_v2/modules/stream/__init__.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
import redis.asyncio as aioredis
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
router = APIRouter()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CHANNEL_PREFIX = "audit_task:"
|
||||
STATE_PREFIX = "audit_task_state:"
|
||||
HEARTBEAT_INTERVAL = 25 # segundos
|
||||
MAX_STREAM_SECONDS = 7200 # 2 horas
|
||||
|
||||
|
||||
def _redis_url() -> str:
|
||||
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
|
||||
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
|
||||
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
|
||||
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
|
||||
return f"redis://{host}:{port}/{db}"
|
||||
|
||||
|
||||
@router.get("/stream/tasks/{task_id}")
|
||||
async def stream_task_events(task_id: str, request: Request):
|
||||
"""
|
||||
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
|
||||
en tiempo real vía Redis Pub/Sub.
|
||||
|
||||
Cabeceras requeridas en Nginx upstream:
|
||||
proxy_read_timeout 7200;
|
||||
proxy_buffering off;
|
||||
"""
|
||||
|
||||
async def event_generator():
|
||||
r = aioredis.from_url(_redis_url(), decode_responses=True)
|
||||
pubsub = r.pubsub()
|
||||
|
||||
try:
|
||||
# Enviar estado actual si ya existe (cliente que llega tarde)
|
||||
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
|
||||
if current_raw:
|
||||
current = json.loads(current_raw)
|
||||
yield f"event: task_update\ndata: {current_raw}\n\n"
|
||||
if current.get("status") in ("completed", "failed"):
|
||||
return
|
||||
|
||||
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||
|
||||
tick = 0
|
||||
while tick < MAX_STREAM_SECONDS:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
|
||||
try:
|
||||
msg = await asyncio.wait_for(
|
||||
pubsub.get_message(ignore_subscribe_messages=True),
|
||||
timeout=1.0,
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
msg = None
|
||||
|
||||
if msg and msg["type"] == "message":
|
||||
data_str = msg["data"]
|
||||
yield f"event: task_update\ndata: {data_str}\n\n"
|
||||
parsed = json.loads(data_str)
|
||||
if parsed.get("status") in ("completed", "failed"):
|
||||
break
|
||||
|
||||
tick += 1
|
||||
if tick % HEARTBEAT_INTERVAL == 0:
|
||||
yield f"event: heartbeat\ndata: {{}}\n\n"
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as exc:
|
||||
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
|
||||
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
|
||||
finally:
|
||||
try:
|
||||
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||
except Exception:
|
||||
pass
|
||||
await r.aclose()
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
|
||||
"Connection": "keep-alive",
|
||||
},
|
||||
)
|
||||
@@ -4,6 +4,8 @@ from typing import Dict, Any
|
||||
from fastapi import HTTPException
|
||||
from ..common import create_error_response
|
||||
from core.config import settings
|
||||
from core.redis_events import publish_task_event
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def update_task(
|
||||
@@ -61,8 +63,9 @@ async def update_task(
|
||||
return await _create_and_update_task(
|
||||
task_id, message, status, pedimento_id, organizacion_id, servicio
|
||||
)
|
||||
|
||||
|
||||
response.raise_for_status()
|
||||
publish_task_event(task_id, status, message)
|
||||
return response.json()
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
|
||||
@@ -58,6 +58,8 @@ class APIRESTController:
|
||||
response = await client.post(url, json=data, headers=self.headers)
|
||||
elif method.upper() == 'PUT':
|
||||
response = await client.put(url, json=data, headers=self.headers)
|
||||
elif method.upper() == 'PATCH':
|
||||
response = await client.patch(url, json=data, headers=self.headers)
|
||||
elif method.upper() == 'DELETE':
|
||||
response = await client.delete(url, headers=self.headers)
|
||||
else:
|
||||
@@ -95,7 +97,7 @@ class APIRESTController:
|
||||
self, soap_response=None, organizacion: str = None,
|
||||
pedimento: str = None, file_name: str = None,
|
||||
document_type: int = None, fuente: int = 2,
|
||||
identifier: str = None,
|
||||
identifier: str = None, binary_content: bytes = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Guarda un documento VU (request o error). Si ya existe uno del mismo
|
||||
@@ -124,6 +126,7 @@ class APIRESTController:
|
||||
|
||||
return await self.post_document(
|
||||
soap_response=soap_response,
|
||||
binary_content=binary_content,
|
||||
organizacion=organizacion,
|
||||
pedimento=pedimento,
|
||||
file_name=file_name,
|
||||
@@ -553,6 +556,8 @@ class APIController:
|
||||
print(f"response >>>> {response}")
|
||||
elif method.upper() == 'PUT':
|
||||
response = await client.put(url, json=data, headers=self.headers)
|
||||
elif method.upper() == 'PATCH':
|
||||
response = await client.patch(url, json=data, headers=self.headers)
|
||||
elif method.upper() == 'DELETE':
|
||||
response = await client.delete(url, headers=self.headers)
|
||||
else:
|
||||
|
||||
44
core/redis_events.py
Normal file
44
core/redis_events.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import json
|
||||
import os
|
||||
import redis
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CHANNEL_PREFIX = "audit_task:"
|
||||
STATE_PREFIX = "audit_task_state:"
|
||||
STATE_TTL = 7200 # 2 horas
|
||||
|
||||
|
||||
def _get_sync_redis():
|
||||
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
|
||||
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
|
||||
return redis.Redis(
|
||||
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
|
||||
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
|
||||
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
|
||||
decode_responses=True,
|
||||
socket_connect_timeout=2,
|
||||
socket_timeout=2,
|
||||
)
|
||||
|
||||
|
||||
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
|
||||
"""
|
||||
Publica un evento de progreso de tarea en Redis Pub/Sub.
|
||||
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
|
||||
"""
|
||||
payload: dict = {"task_id": task_id, "status": status, "message": message}
|
||||
if resultado is not None:
|
||||
payload["resultado"] = resultado
|
||||
if progress is not None:
|
||||
payload["progress"] = progress
|
||||
|
||||
try:
|
||||
client = _get_sync_redis()
|
||||
serialized = json.dumps(payload)
|
||||
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
|
||||
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
|
||||
client.close()
|
||||
except Exception as exc:
|
||||
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")
|
||||
@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
|
||||
Returns:
|
||||
bool: True si no hay errores, False en caso contrario
|
||||
"""
|
||||
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text:
|
||||
return True
|
||||
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
|
||||
# Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
|
||||
if 'tieneerror>true<' in soap_response.text.lower():
|
||||
return True
|
||||
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
||||
return True
|
||||
|
||||
Reference in New Issue
Block a user