Compare commits
17 Commits
Fix--Audit
...
feature/co
| Author | SHA1 | Date | |
|---|---|---|---|
| 288a96bc59 | |||
| fe8e7dc10f | |||
| a6ce91d8af | |||
| e174df0af3 | |||
| 3c10653d6a | |||
|
|
f96e5a227b | ||
| ed00651a8b | |||
|
|
2e779e83f8 | ||
| 3cadcbd86f | |||
|
|
d29cfcb00c | ||
| 2aebef8b26 | |||
|
|
f9139c980a | ||
| 5c55e93d86 | |||
|
|
b0cc715eb3 | ||
| 5f41132f80 | |||
| d49747f288 | |||
| 47c8bf51c7 |
@@ -1,11 +1,11 @@
|
|||||||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||||||
from schemas.pedimentoSchema import PedimentoRequest
|
from schemas.pedimentoSchema import PedimentoRequest
|
||||||
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
|
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from tasks import pedimento_completo_task
|
from tasks import pedimento_completo_task
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any, List
|
||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
@@ -64,6 +64,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
|
|||||||
detail=f"Error al agendar la tarea: {str(e)}"
|
detail=f"Error al agendar la tarea: {str(e)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@router.post("/async/services/pedimento_completo/multiple")
|
||||||
|
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
|
||||||
|
"""
|
||||||
|
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSONResponse con task_id para consultar el estado de la tarea
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: En caso de errores de validación
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
pedimentos_ids = request.pedimentos
|
||||||
|
organizacion = request.organizacion
|
||||||
|
|
||||||
|
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
|
||||||
|
|
||||||
|
# Agendar la tarea en Celery
|
||||||
|
from tasks import multi_pedimento_completo_task
|
||||||
|
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
|
||||||
|
|
||||||
|
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
|
||||||
|
response_data = {
|
||||||
|
"success": True,
|
||||||
|
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
|
||||||
|
"task_id": task.id,
|
||||||
|
"total_pedimentos": len(pedimentos_ids),
|
||||||
|
"pedimentos": pedimentos_ids,
|
||||||
|
"organizacion": organizacion,
|
||||||
|
"status": "PENDING",
|
||||||
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
|
||||||
|
"check_status_url": f"/async/task-status/{task.id}"
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
|
||||||
|
return JSONResponse(content=response_data, status_code=202)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"Error al agendar la tarea: {str(e)}"
|
||||||
|
)
|
||||||
|
|
||||||
@router.get("/async/task-status/{task_id}")
|
@router.get("/async/task-status/{task_id}")
|
||||||
async def get_task_status(task_id: str):
|
async def get_task_status(task_id: str):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
from schemas.pedimentoSchema import PedimentoRequest
|
from schemas.pedimentoSchema import PedimentoRequest
|
||||||
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
|
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger("app.api")
|
logger = logging.getLogger("app.api")
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from api.api_v2.modules.edocs.routers import router as edocs_router
|
|||||||
from api.api_v2.modules.partidas.routers import router as partidas_router
|
from api.api_v2.modules.partidas.routers import router as partidas_router
|
||||||
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
from api.api_v2.modules.pedimentos.routers import router as pedimentos_router
|
||||||
from api.api_v2.modules.remesas.routers import router as remesas_router
|
from api.api_v2.modules.remesas.routers import router as remesas_router
|
||||||
|
from api.api_v2.modules.stream.routers import router as stream_router
|
||||||
|
|
||||||
api_router = APIRouter()
|
api_router = APIRouter()
|
||||||
|
|
||||||
@@ -20,3 +21,4 @@ api_router.include_router(edocs_router, tags=["EDocuments"])
|
|||||||
api_router.include_router(partidas_router, tags=["Partidas"])
|
api_router.include_router(partidas_router, tags=["Partidas"])
|
||||||
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
api_router.include_router(pedimentos_router, tags=["Pedimentos"])
|
||||||
api_router.include_router(remesas_router, tags=["Remesas"])
|
api_router.include_router(remesas_router, tags=["Remesas"])
|
||||||
|
api_router.include_router(stream_router, tags=["Stream"])
|
||||||
|
|||||||
@@ -33,12 +33,13 @@ async def obtener_acuse(**kwargs):
|
|||||||
|
|
||||||
file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml"
|
file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml"
|
||||||
|
|
||||||
document_response = await acuse_rest_controller.post_document(
|
document_response = await acuse_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=organizacion_efc,
|
organizacion=organizacion_efc,
|
||||||
pedimento=pedimento_id_efc,
|
pedimento=pedimento_id_efc,
|
||||||
file_name=file_name_request,
|
file_name=file_name_request,
|
||||||
document_type=25, # Tipo de documento para request de acuse VU
|
document_type=25,
|
||||||
|
identifier=idEdocument_efc,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -72,11 +73,32 @@ async def obtener_acuse(**kwargs):
|
|||||||
|
|
||||||
if soap_error(response):
|
if soap_error(response):
|
||||||
logger.error("Error SOAP detectado en la respuesta")
|
logger.error("Error SOAP detectado en la respuesta")
|
||||||
|
pedimento_efc = kwargs.get('pedimento', {})
|
||||||
|
organizacion_efc = pedimento_efc.get("organizacion", None)
|
||||||
|
pedimento_id_efc = pedimento_efc.get("id", None)
|
||||||
|
pedimento_app = pedimento_efc.get('pedimento_app', 'N/A')
|
||||||
|
idEdocument_efc = kwargs['edoc'].get('numero_edocument', 'N/A')
|
||||||
|
file_name_response = f"vu_AC_{pedimento_app}_{idEdocument_efc}_RESPONSE_ERROR.xml"
|
||||||
|
|
||||||
|
logger.info(f"Guardando RESPONSE_ERROR: file={file_name_response}, organizacion={organizacion_efc}, pedimento={pedimento_id_efc}")
|
||||||
|
doc_result = await acuse_rest_controller.post_or_update_document(
|
||||||
|
soap_response=response.text,
|
||||||
|
organizacion=organizacion_efc,
|
||||||
|
pedimento=pedimento_id_efc,
|
||||||
|
file_name=file_name_response,
|
||||||
|
document_type=26,
|
||||||
|
identifier=idEdocument_efc,
|
||||||
|
)
|
||||||
|
if doc_result is None:
|
||||||
|
logger.error(f"post_document retornó None para RESPONSE_ERROR — archivo físico guardado sin registro en BD")
|
||||||
|
else:
|
||||||
|
logger.info(f"RESPONSE_ERROR registrado en BD: id={doc_result.get('id')}, document_type={doc_result.get('document_type')}")
|
||||||
|
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=create_error_response(
|
detail=create_error_response(
|
||||||
message="Error en la respuesta del servicio SOAP",
|
message="Error en la respuesta del servicio SOAP",
|
||||||
data={"soap_response": response.text[:500]}
|
data={"soap_response": response.text[:3000]}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -123,12 +145,13 @@ async def obtener_acuse(**kwargs):
|
|||||||
organizacion = pedimento.get("organizacion", None)
|
organizacion = pedimento.get("organizacion", None)
|
||||||
pedimento_id = pedimento.get("id", None)
|
pedimento_id = pedimento.get("id", None)
|
||||||
|
|
||||||
rest_response = await acuse_rest_controller.post_document(
|
rest_response = await acuse_rest_controller.post_or_update_document(
|
||||||
binary_content=pdf_bytes,
|
binary_content=pdf_bytes,
|
||||||
organizacion=organizacion,
|
organizacion=organizacion,
|
||||||
pedimento=pedimento_id,
|
pedimento=pedimento_id,
|
||||||
file_name=_file_name,
|
file_name=_file_name,
|
||||||
document_type=4
|
document_type=4,
|
||||||
|
identifier=idEdocument_efc,
|
||||||
)
|
)
|
||||||
|
|
||||||
if rest_response is None:
|
if rest_response is None:
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
from .services import obtener_acuse
|
from .services import obtener_acuse
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
@@ -20,9 +21,11 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
|||||||
organizacion_id = pedimento_info.get('organizacion')
|
organizacion_id = pedimento_info.get('organizacion')
|
||||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
logging.info(f"[ACUSE] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -73,15 +76,19 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
|
|||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar acuse para pedimento {pedimento_app}: {str(e)}"
|
||||||
logging.error(error_message)
|
logging.error(error_message)
|
||||||
loop.run_until_complete(
|
try:
|
||||||
update_task(
|
loop.run_until_complete(
|
||||||
task_id=task_id,
|
update_task(
|
||||||
status="failed",
|
task_id=task_id,
|
||||||
message=error_message,
|
status="failed",
|
||||||
pedimento_id=pedimento_id,
|
message=error_message,
|
||||||
organizacion_id=organizacion_id,
|
pedimento_id=pedimento_id,
|
||||||
servicio=6
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=6
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
raise
|
logging.error(f"Error al actualizar estado de tarea: {update_error}")
|
||||||
|
return {"status": "failed", "message": error_message}
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
@@ -91,12 +91,13 @@ async def consume_ws_get_cove(**kwargs):
|
|||||||
# Enviar documento de request a EFC
|
# Enviar documento de request a EFC
|
||||||
try:
|
try:
|
||||||
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
|
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
|
||||||
document_response = await coves_rest_controller.post_document(
|
document_response = await coves_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=file_name_request,
|
file_name=file_name_request,
|
||||||
document_type=19,
|
document_type=19,
|
||||||
|
identifier=cove,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al enviar documento request: {e}")
|
logger.error(f"Error al enviar documento request: {e}")
|
||||||
@@ -117,12 +118,13 @@ async def consume_ws_get_cove(**kwargs):
|
|||||||
raise Exception("No se recibió respuesta del servicio SOAP")
|
raise Exception("No se recibió respuesta del servicio SOAP")
|
||||||
|
|
||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
document_response = await coves_rest_controller.post_document(
|
document_response = await coves_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
|
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
|
||||||
document_type=10,
|
document_type=20,
|
||||||
|
identifier=cove,
|
||||||
)
|
)
|
||||||
|
|
||||||
raise Exception("Error en la respuesta del servicio SOAP")
|
raise Exception("Error en la respuesta del servicio SOAP")
|
||||||
@@ -132,12 +134,13 @@ async def consume_ws_get_cove(**kwargs):
|
|||||||
# Enviar documento
|
# Enviar documento
|
||||||
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
|
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
|
||||||
try:
|
try:
|
||||||
document_response = await coves_rest_controller.post_document(
|
document_response = await coves_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=_file_name,
|
file_name=_file_name,
|
||||||
document_type=8,
|
document_type=8,
|
||||||
|
identifier=cove,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
||||||
@@ -205,12 +208,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
|||||||
# Enviar documento de request a EFC
|
# Enviar documento de request a EFC
|
||||||
try:
|
try:
|
||||||
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
|
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
|
||||||
document_response = await coves_rest_controller.post_document(
|
document_response = await coves_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=file_name_request,
|
file_name=file_name_request,
|
||||||
document_type=23,
|
document_type=23,
|
||||||
|
identifier=kwargs['cove'].get('cove', 'N/A'),
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al enviar documento request de acuse cove: {e}")
|
logger.error(f"Error al enviar documento request de acuse cove: {e}")
|
||||||
@@ -251,12 +255,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
|||||||
if soap_error(response):
|
if soap_error(response):
|
||||||
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
|
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
|
||||||
try:
|
try:
|
||||||
rest_response = await coves_rest_controller.post_document(
|
rest_response = await coves_rest_controller.post_or_update_document(
|
||||||
soap_response=response,
|
soap_response=response,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=error_file_name,
|
file_name=error_file_name,
|
||||||
document_type=10,
|
document_type=24,
|
||||||
|
identifier=kwargs['cove'].get('cove', 'N/A'),
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
|
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
|
||||||
@@ -314,12 +319,14 @@ async def consume_ws_get_acuse_cove(**kwargs):
|
|||||||
organizacion = pedimento.get("organizacion", None)
|
organizacion = pedimento.get("organizacion", None)
|
||||||
pedimento_id = pedimento.get("id", None)
|
pedimento_id = pedimento.get("id", None)
|
||||||
|
|
||||||
rest_response = await coves_rest_controller.post_document(
|
cove_identifier = kwargs['cove'].get('cove', '')
|
||||||
|
rest_response = await coves_rest_controller.post_or_update_document(
|
||||||
binary_content=pdf_bytes,
|
binary_content=pdf_bytes,
|
||||||
organizacion=organizacion,
|
organizacion=organizacion,
|
||||||
pedimento=pedimento_id,
|
pedimento=pedimento_id,
|
||||||
file_name=_file_name,
|
file_name=_file_name,
|
||||||
document_type=7
|
document_type=7,
|
||||||
|
identifier=cove_identifier,
|
||||||
)
|
)
|
||||||
|
|
||||||
acuse_status = await change_acuse_status(
|
acuse_status = await change_acuse_status(
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
from .services import consume_ws_get_cove, consume_ws_get_acuse_cove
|
||||||
from api.api_v2.modules.tasks.services import update_task, register_task
|
from api.api_v2.modules.tasks.services import update_task, register_task
|
||||||
@@ -23,13 +23,13 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
organizacion_id = pedimento_info.get('organizacion')
|
organizacion_id = pedimento_info.get('organizacion')
|
||||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||||
cove_info = cove_request.get('cove', {})
|
cove_info = cove_request.get('cove', {})
|
||||||
cove_number = cove_info.get('numero_cove', 'N/A')
|
# aqui se cambio de cove_number a 'cove' porque no esta asi en el mapeo
|
||||||
|
cove_number = cove_info.get('cove', 'N/A')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="submitted",
|
status="submitted",
|
||||||
@@ -40,12 +40,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
|
||||||
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
logger.info(f"[COVE] Actualizando estado a processing para tarea {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="processing",
|
status="processing",
|
||||||
@@ -56,11 +52,9 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Obtener el COVE
|
cove_response = asyncio.run(consume_ws_get_cove(**cove_request))
|
||||||
cove_response = loop.run_until_complete(consume_ws_get_cove(**cove_request))
|
|
||||||
|
|
||||||
# Actualizar estado: completado
|
asyncio.run(
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="completed",
|
status="completed",
|
||||||
@@ -74,19 +68,25 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
|
|||||||
return {"status": "processed", "data": cove_response}
|
return {"status": "processed", "data": cove_response}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
|
||||||
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
try:
|
||||||
task_id=task_id,
|
asyncio.run(
|
||||||
status="failed",
|
update_task(
|
||||||
message=error_message,
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
status="failed",
|
||||||
organizacion_id=organizacion_id,
|
message=error_message,
|
||||||
servicio=8
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=8
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -101,13 +101,13 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
organizacion_id = pedimento_info.get('organizacion')
|
organizacion_id = pedimento_info.get('organizacion')
|
||||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||||
cove_info = cove_request.get('cove', {})
|
cove_info = cove_request.get('cove', {})
|
||||||
cove_number = cove_info.get('numero_cove', 'N/A')
|
# se cambio el cove_number, ya que no existe directamente en el mapeo, se conoce solo como 'cove'
|
||||||
|
cove_number = cove_info.get('cove', 'N/A')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
logger.info(f"[COVE] Registrando inicio de tarea de acuse {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
register_task(
|
register_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="submitted",
|
status="submitted",
|
||||||
@@ -118,12 +118,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
|
||||||
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
logger.info(f"[COVE] Actualizando estado a processing para tarea de acuse {task_id}")
|
||||||
loop.run_until_complete(
|
asyncio.run(
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="processing",
|
status="processing",
|
||||||
@@ -134,11 +130,9 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Obtener el acuse del COVE
|
acuse_response = asyncio.run(consume_ws_get_acuse_cove(**cove_request))
|
||||||
acuse_response = loop.run_until_complete(consume_ws_get_acuse_cove(**cove_request))
|
|
||||||
|
|
||||||
# Actualizar estado: completado
|
asyncio.run(
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
update_task(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
status="completed",
|
status="completed",
|
||||||
@@ -152,18 +146,23 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
|
|||||||
return {"status": "processed", "data": acuse_response}
|
return {"status": "processed", "data": acuse_response}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
|
||||||
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar acuse de COVE {cove_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
|
||||||
task_id=task_id,
|
|
||||||
status="failed",
|
|
||||||
message=error_message,
|
|
||||||
pedimento_id=pedimento_id,
|
|
||||||
organizacion_id=organizacion_id,
|
|
||||||
servicio=9
|
|
||||||
)
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(
|
||||||
|
update_task(
|
||||||
|
task_id=task_id,
|
||||||
|
status="failed",
|
||||||
|
message=error_message,
|
||||||
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=9
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as update_error:
|
||||||
|
logger.error(f"No se pudo actualizar el estado de la tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
|
raise
|
||||||
@@ -48,12 +48,13 @@ async def obtener_edoc(**kwargs):
|
|||||||
|
|
||||||
file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml"
|
file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml"
|
||||||
|
|
||||||
document_response = await edocs_rest_controller.post_document(
|
document_response = await edocs_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=organizacion_efc,
|
organizacion=organizacion_efc,
|
||||||
pedimento=pedimento_id_efc,
|
pedimento=pedimento_id_efc,
|
||||||
file_name=file_name_request,
|
file_name=file_name_request,
|
||||||
document_type=21 # Tipo de documento para request de e-document,
|
document_type=21,
|
||||||
|
identifier=numero_documento,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -97,6 +98,21 @@ async def obtener_edoc(**kwargs):
|
|||||||
|
|
||||||
if soap_error(response):
|
if soap_error(response):
|
||||||
logger.error("Respuesta SOAP contiene error de VUCEM")
|
logger.error("Respuesta SOAP contiene error de VUCEM")
|
||||||
|
_pedimento_efc = kwargs.get('pedimento', {})
|
||||||
|
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
|
||||||
|
logger.info(f"Guardando RESPONSE_ERROR doc_type=22: file={_file_name_error}, organizacion={_pedimento_efc.get('organizacion')}, pedimento={_pedimento_efc.get('id')}")
|
||||||
|
_doc_result = await edocs_rest_controller.post_or_update_document(
|
||||||
|
soap_response=response.text,
|
||||||
|
organizacion=_pedimento_efc.get('organizacion'),
|
||||||
|
pedimento=_pedimento_efc.get('id'),
|
||||||
|
file_name=_file_name_error,
|
||||||
|
document_type=22,
|
||||||
|
identifier=numero_documento,
|
||||||
|
)
|
||||||
|
if _doc_result is None:
|
||||||
|
logger.error("post_or_update_document retornó None para RESPONSE_ERROR doc_type=22 — archivo físico sin registro en BD")
|
||||||
|
else:
|
||||||
|
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=create_error_response(
|
detail=create_error_response(
|
||||||
@@ -109,8 +125,23 @@ async def obtener_edoc(**kwargs):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
edoc_base64 = extract_pdf_bytes_from_xml_content(response.text)
|
edoc_base64 = extract_pdf_bytes_from_xml_content(response.text)
|
||||||
except ValueError as ve:
|
except Exception as ve:
|
||||||
logger.error(f"Error extrayendo contenido del XML: {ve}")
|
logger.error(f"Error extrayendo contenido del XML: {ve}")
|
||||||
|
_pedimento_efc = kwargs.get('pedimento', {})
|
||||||
|
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
|
||||||
|
logger.info(f"Guardando RESPONSE_ERROR doc_type=22 (parse error): file={_file_name_error}")
|
||||||
|
_doc_result = await edocs_rest_controller.post_or_update_document(
|
||||||
|
soap_response=response.text,
|
||||||
|
organizacion=_pedimento_efc.get('organizacion'),
|
||||||
|
pedimento=_pedimento_efc.get('id'),
|
||||||
|
file_name=_file_name_error,
|
||||||
|
document_type=22,
|
||||||
|
identifier=numero_documento,
|
||||||
|
)
|
||||||
|
if _doc_result is None:
|
||||||
|
logger.error("post_document retornó None para RESPONSE_ERROR doc_type=22 (parse error)")
|
||||||
|
else:
|
||||||
|
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
detail=create_error_response(
|
detail=create_error_response(
|
||||||
@@ -172,14 +203,17 @@ async def obtener_edoc(**kwargs):
|
|||||||
# No guardaremos el archivo localmente por seguridad
|
# No guardaremos el archivo localmente por seguridad
|
||||||
logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}")
|
logger.debug(f"Procesando documento {numero_documento} para pedimento {pedimento_id}")
|
||||||
|
|
||||||
rest_response = await edocs_rest_controller.post_document(
|
rest_response = await edocs_rest_controller.post_or_update_document(
|
||||||
binary_content=pdf_bytes,
|
binary_content=pdf_bytes,
|
||||||
organizacion=organizacion,
|
organizacion=organizacion,
|
||||||
pedimento=pedimento_id,
|
pedimento=pedimento_id,
|
||||||
file_name=_file_name,
|
file_name=_file_name,
|
||||||
document_type=5
|
document_type=5,
|
||||||
|
identifier=numero_documento,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
print(f"rest_response >>>> {rest_response}")
|
||||||
|
|
||||||
if rest_response is None:
|
if rest_response is None:
|
||||||
logger.error("Error al enviar el documento a la API interna")
|
logger.error("Error al enviar el documento a la API interna")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
@@ -239,9 +273,10 @@ async def obtener_edoc(**kwargs):
|
|||||||
|
|
||||||
|
|
||||||
async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
||||||
|
# estaba acualizando mal el status de descarga, actualizaba otro campo que no le correspondia
|
||||||
data = {
|
data = {
|
||||||
"id": edoc.get("id"),
|
"id": edoc.get("id"),
|
||||||
"acuse_descargado": status,
|
"edocument_descargado": status,
|
||||||
"numero_edocument": edoc.get("numero_edocument"),
|
"numero_edocument": edoc.get("numero_edocument"),
|
||||||
"pedimento": pedimento.get("id"),
|
"pedimento": pedimento.get("id"),
|
||||||
"organizacion": pedimento.get("organizacion"),
|
"organizacion": pedimento.get("organizacion"),
|
||||||
@@ -252,42 +287,42 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
NS = {
|
||||||
|
's': 'http://schemas.xmlsoap.org/soap/envelope/',
|
||||||
|
't': 'http://tempuri.org/',
|
||||||
|
'i': 'http://www.w3.org/2001/XMLSchema-instance'
|
||||||
|
}
|
||||||
|
|
||||||
|
# mejorar la extraccion de seccion File
|
||||||
def extract_pdf_bytes_from_xml_content(xml_content: str):
|
def extract_pdf_bytes_from_xml_content(xml_content: str):
|
||||||
"""
|
|
||||||
Extrae el PDF y metadatos desde un string XML.
|
|
||||||
"""
|
|
||||||
root = ET.fromstring(xml_content)
|
root = ET.fromstring(xml_content)
|
||||||
file_elem = root.find('.//File')
|
|
||||||
if file_elem is None:
|
errores = root.find('.//t:Errores', NS)
|
||||||
for elem in root.iter():
|
tiene_error = root.find('.//t:TieneError', NS)
|
||||||
if elem.tag.endswith('File') and elem.text:
|
|
||||||
file_elem = elem
|
if tiene_error is not None and tiene_error.text == 'true':
|
||||||
break
|
err_msg = errores.text if errores is not None else 'Error desconocido'
|
||||||
if file_elem is not None and file_elem.text and file_elem.text.strip():
|
raise Exception(f"VUCEM informa error: {err_msg}")
|
||||||
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
|
|
||||||
pdf_bytes = base64.b64decode(base64_data)
|
file_elem = root.find('.//t:File', NS)
|
||||||
cadena_original = None
|
if file_elem is None or file_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') == 'true' or not file_elem.text:
|
||||||
sello_digital = None
|
raise ValueError("No se encontró el tag <File> con contenido válido.")
|
||||||
cadena_elem = root.find('.//CadenaOriginal')
|
|
||||||
if cadena_elem is None:
|
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
|
||||||
for elem in root.iter():
|
pdf_bytes = base64.b64decode(base64_data)
|
||||||
if elem.tag.endswith('CadenaOriginal') and elem.text:
|
|
||||||
cadena_elem = elem
|
# Extraer CadenaOriginal y SelloDigital con namespaces
|
||||||
break
|
cadena_original = None
|
||||||
if cadena_elem is not None and cadena_elem.text:
|
sello_digital = None
|
||||||
cadena_original = cadena_elem.text.strip()
|
cadena_elem = root.find('.//t:CadenaOriginal', NS)
|
||||||
sello_elem = root.find('.//SelloDigital')
|
if cadena_elem is not None and cadena_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
|
||||||
if sello_elem is None:
|
cadena_original = cadena_elem.text.strip() if cadena_elem.text else None
|
||||||
for elem in root.iter():
|
sello_elem = root.find('.//t:SelloDigital', NS)
|
||||||
if elem.tag.endswith('SelloDigital') and elem.text:
|
if sello_elem is not None and sello_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
|
||||||
sello_elem = elem
|
sello_digital = sello_elem.text.strip() if sello_elem.text else None
|
||||||
break
|
|
||||||
if sello_elem is not None and sello_elem.text:
|
return {
|
||||||
sello_digital = sello_elem.text.strip()
|
"pdf_bytes": pdf_bytes,
|
||||||
return {
|
"cadena_original": cadena_original,
|
||||||
"pdf_bytes": pdf_bytes,
|
"sello_digital": sello_digital
|
||||||
"cadena_original": cadena_original,
|
}
|
||||||
"sello_digital": sello_digital
|
|
||||||
}
|
|
||||||
else:
|
|
||||||
raise ValueError("No se encontró el tag <File> con contenido válido. Verifique que el XML contiene el tag <File> con datos base64.")
|
|
||||||
@@ -3,6 +3,7 @@ import logging
|
|||||||
import time
|
import time
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import obtener_edoc
|
from .services import obtener_edoc
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
@@ -21,11 +22,14 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
organizacion_id = pedimento_info.get('organizacion')
|
organizacion_id = pedimento_info.get('organizacion')
|
||||||
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
|
||||||
edoc_info = edoc_data.get('edoc', {})
|
edoc_info = edoc_data.get('edoc', {})
|
||||||
edoc_number = edoc_info.get('numero_edoc', 'N/A')
|
# el mapeo de numero de documento no estaba correcto, se corrigio
|
||||||
|
edoc_number = edoc_info.get('numero_edocument', 'N/A')
|
||||||
|
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
logger.info(f"[EDOC] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -34,7 +38,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
|
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
|
||||||
pedimento_id=pedimento_id,
|
pedimento_id=pedimento_id,
|
||||||
organizacion_id=organizacion_id,
|
organizacion_id=organizacion_id,
|
||||||
servicio=3 # 3 corresponde a "E-document"
|
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -50,7 +54,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
|
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
|
||||||
pedimento_id=pedimento_id,
|
pedimento_id=pedimento_id,
|
||||||
organizacion_id=organizacion_id,
|
organizacion_id=organizacion_id,
|
||||||
servicio=3
|
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -65,7 +69,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
|
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
|
||||||
pedimento_id=pedimento_id,
|
pedimento_id=pedimento_id,
|
||||||
organizacion_id=organizacion_id,
|
organizacion_id=organizacion_id,
|
||||||
servicio=3
|
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -74,16 +78,25 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al descargar E-document {edoc_number} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message, exc_info=True)
|
||||||
loop.run_until_complete(
|
|
||||||
update_task(
|
|
||||||
task_id=task_id,
|
|
||||||
status="failed",
|
|
||||||
message=error_message,
|
|
||||||
pedimento_id=pedimento_id,
|
|
||||||
organizacion_id=organizacion_id,
|
|
||||||
servicio=3
|
|
||||||
)
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(
|
||||||
|
update_task(
|
||||||
|
task_id=task_id,
|
||||||
|
status="failed",
|
||||||
|
message=error_message,
|
||||||
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except Exception as update_error:
|
||||||
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
# Cerrar el loop para liberar recursos
|
||||||
|
loop.close()
|
||||||
@@ -59,12 +59,13 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
|
|
||||||
file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml"
|
file_name_request = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_REQUEST.xml"
|
||||||
|
|
||||||
document_response = await partida_rest_controller.post_document(
|
document_response = await partida_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=file_name_request,
|
file_name=file_name_request,
|
||||||
document_type=17, # Tipo de documento para petición de partidas
|
document_type=17,
|
||||||
|
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al enviar documento request: {e}")
|
logger.error(f"Error al enviar documento request: {e}")
|
||||||
@@ -97,12 +98,13 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
error_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}_ERROR.xml"
|
||||||
try:
|
try:
|
||||||
document_response = await partida_rest_controller.post_document(
|
document_response = await partida_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=error_file_name,
|
file_name=error_file_name,
|
||||||
document_type=10,
|
document_type=18,
|
||||||
|
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al guardar la respuesta de error: {e}")
|
logger.error(f"Error al guardar la respuesta de error: {e}")
|
||||||
@@ -127,12 +129,13 @@ async def consume_ws_get_partida(**kwargs):
|
|||||||
# Enviar documento
|
# Enviar documento
|
||||||
_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml"
|
_file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml"
|
||||||
try:
|
try:
|
||||||
document_response = await partida_rest_controller.post_document(
|
document_response = await partida_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=kwargs.get('pedimento').get('organizacion'),
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
||||||
pedimento=kwargs.get('pedimento').get('id'),
|
pedimento=kwargs.get('pedimento').get('id'),
|
||||||
file_name=_file_name,
|
file_name=_file_name,
|
||||||
document_type=1,
|
document_type=1,
|
||||||
|
identifier=f"_PT_{pedimento_app}_{partida.get('numero', '')}_",
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al enviar documento: {e}")
|
logger.error(f"Error al enviar documento: {e}")
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import time
|
|||||||
from celery import Celery
|
from celery import Celery
|
||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
|
|
||||||
from .services import consume_ws_get_partida
|
from .services import consume_ws_get_partida
|
||||||
from api.api_v2.modules.tasks.services import register_task, update_task
|
from api.api_v2.modules.tasks.services import register_task, update_task
|
||||||
@@ -24,9 +25,12 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
partida_info = partida_request.get('partida', {})
|
partida_info = partida_request.get('partida', {})
|
||||||
partida_numero = partida_info.get('numero', 'N/A')
|
partida_numero = partida_info.get('numero', 'N/A')
|
||||||
|
|
||||||
|
# Crear un nuevo event loop para esta tarea
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Registrar el inicio de la tarea
|
# Registrar el inicio de la tarea
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
|
logger.info(f"[PARTIDA] Registrando inicio de tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
register_task(
|
register_task(
|
||||||
@@ -39,9 +43,6 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Esperar un momento breve para asegurar que el registro se complete
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Actualizar estado: procesando
|
# Actualizar estado: procesando
|
||||||
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
|
logger.info(f"[PARTIDA] Actualizando estado a processing para tarea {task_id}")
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
@@ -76,15 +77,22 @@ def process_partida_request(self, partida_request: Dict[str, Any]) -> Dict[str,
|
|||||||
# En caso de error, actualizar estado
|
# En caso de error, actualizar estado
|
||||||
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
|
error_message = f"Error al procesar partida {partida_numero} para pedimento {pedimento_app}: {str(e)}"
|
||||||
logger.error(error_message)
|
logger.error(error_message)
|
||||||
loop.run_until_complete(
|
try:
|
||||||
update_task(
|
loop.run_until_complete(
|
||||||
task_id=task_id,
|
update_task(
|
||||||
status="failed",
|
task_id=task_id,
|
||||||
message=error_message,
|
status="failed",
|
||||||
pedimento_id=pedimento_id,
|
message=error_message,
|
||||||
organizacion_id=organizacion_id,
|
pedimento_id=pedimento_id,
|
||||||
servicio=4
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=4
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
except Exception as update_error:
|
||||||
|
logger.error(f"Error al actualizar estado de tarea: {update_error}")
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
# Limpiar el event loop
|
||||||
|
loop.close()
|
||||||
@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
|
|||||||
from .services import put_pedimento_data_vu
|
from .services import put_pedimento_data_vu
|
||||||
from api.api_v2.modules.tasks.services import register_task
|
from api.api_v2.modules.tasks.services import register_task
|
||||||
import logging
|
import logging
|
||||||
logger = logging.getLogger("app.api")
|
from celery import group, chord, chain
|
||||||
|
|
||||||
|
logger = logging.getLogger("app.api")
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)
|
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from typing import Optional, Union, Dict, Any
|
from typing import Optional, Union, Dict, Any, List
|
||||||
from uuid import UUID
|
from uuid import UUID
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione
|
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
|||||||
|
|
||||||
file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
|
file_name_request = f"VU_PC_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
|
||||||
|
|
||||||
document_response = await pedimento_rest_controller.post_document(
|
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
@@ -90,12 +90,12 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
|||||||
|
|
||||||
if soap_error(soap_response):
|
if soap_error(soap_response):
|
||||||
logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}")
|
logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}")
|
||||||
document_response = await pedimento_rest_controller.post_document(
|
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||||
soap_response=None,
|
soap_response=soap_response,
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
|
file_name=f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml",
|
||||||
document_type=10,
|
document_type=14,
|
||||||
)
|
)
|
||||||
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
||||||
|
|
||||||
@@ -111,7 +111,7 @@ async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]:
|
|||||||
|
|
||||||
# Enviar documento
|
# Enviar documento
|
||||||
try:
|
try:
|
||||||
document_response = await pedimento_rest_controller.post_document(
|
document_response = await pedimento_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from celery_app import celery_app
|
|||||||
from .services import put_pedimento_data
|
from .services import put_pedimento_data
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
from ..tasks.services import register_task, update_task
|
from ..tasks.services import register_task, update_task
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -17,7 +18,9 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
|||||||
Returns:
|
Returns:
|
||||||
dict: Resultado del procesamiento con estado y detalles
|
dict: Resultado del procesamiento con estado y detalles
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
task_id = self.request.id
|
task_id = self.request.id
|
||||||
servicio = 3 # Código para Pedimento Completo
|
servicio = 3 # Código para Pedimento Completo
|
||||||
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
|
pedimento_id = pedimento_data.get('pedimento', {}).get('id')
|
||||||
@@ -84,4 +87,8 @@ def process_pedimento_completo_request(self, pedimento_data: dict):
|
|||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
@@ -64,7 +64,7 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
try:
|
try:
|
||||||
|
|
||||||
file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
|
file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
|
||||||
document_response = await remesa_rest_controller.post_document(
|
document_response = await remesa_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_xml,
|
soap_response=soap_xml,
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
@@ -87,25 +87,77 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
)
|
)
|
||||||
# Generar nombre de archivo
|
# Generar nombre de archivo
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml"
|
||||||
if soap_error(soap_response):
|
|
||||||
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
|
||||||
document_response = await remesa_rest_controller.post_document(
|
|
||||||
soap_response=soap_response,
|
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
|
||||||
pedimento=pedimento_data.get('id'),
|
|
||||||
file_name=file_name,
|
|
||||||
document_type=10,
|
|
||||||
)
|
|
||||||
raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP")
|
|
||||||
# Enviar documento
|
|
||||||
try:
|
|
||||||
|
|
||||||
document_response = await remesa_rest_controller.post_document(
|
# "No hay información" NO es un error — VUCEM confirma que el pedimento no tiene remesas.
|
||||||
|
# No se crea documento de error; se corrige el flag en el pedimento.
|
||||||
|
_SIN_REMESAS = '<ns2:mensaje>No hay información para la búsqueda solicitada</ns2:mensaje>'
|
||||||
|
if hasattr(soap_response, 'text') and _SIN_REMESAS in soap_response.text:
|
||||||
|
pedimento_id = pedimento_data.get('id')
|
||||||
|
logger.info(
|
||||||
|
f"Pedimento {pedimento_data.get('pedimento_app')} no tiene remesas "
|
||||||
|
f"(confirmado por VUCEM). Corrigiendo flag remesas=False."
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
await remesa_rest_controller._make_request_async(
|
||||||
|
'PATCH',
|
||||||
|
f'customs/pedimentos/{pedimento_id}/',
|
||||||
|
data={'remesas': False},
|
||||||
|
)
|
||||||
|
except Exception as patch_err:
|
||||||
|
logger.warning(f"No se pudo actualizar remesas=False para {pedimento_id}: {patch_err}")
|
||||||
|
|
||||||
|
return create_service_response(
|
||||||
|
message="Pedimento sin remesas confirmado por VUCEM",
|
||||||
|
data={"remesas": False},
|
||||||
|
metadata={"pedimento_app": pedimento_data.get('pedimento_app')},
|
||||||
|
)
|
||||||
|
|
||||||
|
if soap_error(soap_response):
|
||||||
|
|
||||||
|
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
|
||||||
|
document_response = await remesa_rest_controller.post_or_update_document(
|
||||||
soap_response=soap_response,
|
soap_response=soap_response,
|
||||||
organizacion=pedimento_data.get('organizacion'),
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
pedimento=pedimento_data.get('id'),
|
pedimento=pedimento_data.get('id'),
|
||||||
file_name=file_name,
|
file_name=file_name,
|
||||||
document_type=5,
|
document_type=16,
|
||||||
|
)
|
||||||
|
|
||||||
|
error_message = "Error en la respuesta del servicio SOAP"
|
||||||
|
|
||||||
|
if hasattr(soap_response, 'text') and soap_response.text:
|
||||||
|
try:
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
root = ET.fromstring(soap_response.text)
|
||||||
|
|
||||||
|
for fault in root.findall('.//{http://schemas.xmlsoap.org/soap/envelope/}Fault'):
|
||||||
|
faultstring = fault.find('.//faultstring')
|
||||||
|
if faultstring is not None and faultstring.text:
|
||||||
|
error_message = faultstring.text
|
||||||
|
break
|
||||||
|
|
||||||
|
for error in root.findall('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}error'):
|
||||||
|
msg = error.find('.//{http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta}message')
|
||||||
|
if msg is not None and msg.text:
|
||||||
|
error_message = msg.text
|
||||||
|
break
|
||||||
|
|
||||||
|
except Exception as parse_error:
|
||||||
|
logger.error(f"Error al parsear respuesta SOAP para extraer mensaje: {parse_error}")
|
||||||
|
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"Error en la respuesta del servicio SOAP: {error_message}"
|
||||||
|
)
|
||||||
|
# Enviar documento
|
||||||
|
try:
|
||||||
|
|
||||||
|
document_response = await remesa_rest_controller.post_or_update_document(
|
||||||
|
soap_response=soap_response,
|
||||||
|
organizacion=pedimento_data.get('organizacion'),
|
||||||
|
pedimento=pedimento_data.get('id'),
|
||||||
|
file_name=file_name,
|
||||||
|
document_type=3,
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -115,12 +167,14 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
|
|||||||
# Extraer datos del XML
|
# Extraer datos del XML
|
||||||
try:
|
try:
|
||||||
remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text)
|
remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text)
|
||||||
|
logger.info(
|
||||||
|
f"Remesas extraídas para pedimento {pedimento_data.get('pedimento_app')}: "
|
||||||
|
f"{len(remesas_data)} COVEs encontrados → {remesas_data}"
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error al extraer datos XML: {e}")
|
logger.error(f"Error al extraer datos XML: {e}")
|
||||||
raise HTTPException(status_code=500, detail="Error al procesar respuesta XML")
|
raise HTTPException(status_code=500, detail="Error al procesar respuesta XML")
|
||||||
|
|
||||||
|
|
||||||
logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}")
|
logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}")
|
||||||
|
|
||||||
return create_service_response(
|
return create_service_response(
|
||||||
@@ -168,12 +222,14 @@ async def post_remesa_data(**kwargs) -> Dict[str, Any]:
|
|||||||
# Obtener datos del servicio web
|
# Obtener datos del servicio web
|
||||||
try:
|
try:
|
||||||
ws_data = await obtener_remesa(**kwargs)
|
ws_data = await obtener_remesa(**kwargs)
|
||||||
result["documento"] = ws_data.get("documento", None)
|
# obtener_remesa devuelve create_service_response → los datos están bajo 'data'
|
||||||
xml_content = ws_data.get('xml_content', {})
|
ws_data_content = ws_data.get('data', {})
|
||||||
|
result["documento"] = ws_data_content.get("documento", None)
|
||||||
|
xml_content = ws_data_content.get('xml_content', [])
|
||||||
result["xml_content"] = xml_content
|
result["xml_content"] = xml_content
|
||||||
|
|
||||||
if not xml_content:
|
if not xml_content:
|
||||||
logger.warning("No se obtuvo contenido XML del servicio web")
|
logger.warning("No se obtuvo contenido XML del servicio web (lista de remesas vacía)")
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
@@ -225,17 +281,8 @@ async def _process_coves_safely(kwargs: Dict[str, Any], xml_content) -> Optional
|
|||||||
|
|
||||||
async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]:
|
async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Envía COVEs al sistema REST.
|
Crea COVEs en el sistema REST usando patrón get-or-create para evitar duplicados.
|
||||||
|
Si el COVE ya existe (creado por pedimento completo u otra remesa), lo reutiliza.
|
||||||
Args:
|
|
||||||
pedimento_data: Datos del pedimento
|
|
||||||
coves: Lista de diccionarios con datos de COVE (comprobanteVE, remesaAgente, remesaSA)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Lista de respuestas exitosas
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HTTPException: Si no se pudo procesar ningún COVE
|
|
||||||
"""
|
"""
|
||||||
if not coves:
|
if not coves:
|
||||||
return []
|
return []
|
||||||
@@ -244,23 +291,38 @@ async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]
|
|||||||
errors = []
|
errors = []
|
||||||
|
|
||||||
for cove in coves:
|
for cove in coves:
|
||||||
# Extraer el número de COVE del diccionario
|
|
||||||
numero_cove = cove.get('comprobanteVE')
|
numero_cove = cove.get('comprobanteVE')
|
||||||
if not numero_cove:
|
if not numero_cove:
|
||||||
logger.warning(f"COVE sin comprobanteVE encontrado: {cove}")
|
logger.warning(f"COVE sin comprobanteVE encontrado: {cove}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
document_data = {
|
|
||||||
'numero_cove': numero_cove,
|
|
||||||
'organizacion': pedimento_data.get('organizacion'),
|
|
||||||
'pedimento': pedimento_data.get('id')
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Verificar si el COVE ya existe antes de intentar crearlo
|
||||||
|
existing = await remesa_rest_controller._make_request_async(
|
||||||
|
'GET', f'customs/coves/?numero_cove={numero_cove}'
|
||||||
|
)
|
||||||
|
if existing:
|
||||||
|
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
||||||
|
if isinstance(results, list) and results:
|
||||||
|
logger.info(f"COVE {numero_cove} ya existe (id={results[0].get('id')}), omitiendo creación")
|
||||||
|
responses.append(results[0])
|
||||||
|
continue
|
||||||
|
|
||||||
|
# No existe → crear
|
||||||
|
document_data = {
|
||||||
|
'numero_cove': numero_cove,
|
||||||
|
'organizacion': pedimento_data.get('organizacion'),
|
||||||
|
'pedimento': pedimento_data.get('id'),
|
||||||
|
}
|
||||||
response = await remesa_rest_controller.post_cove(document_data)
|
response = await remesa_rest_controller.post_cove(document_data)
|
||||||
if response:
|
if response:
|
||||||
responses.append(response)
|
responses.append(response)
|
||||||
logger.debug(f"COVE {numero_cove} procesado exitosamente")
|
logger.info(f"COVE {numero_cove} creado exitosamente")
|
||||||
|
else:
|
||||||
|
error_msg = f"POST de COVE {numero_cove} devolvió respuesta vacía"
|
||||||
|
logger.warning(error_msg)
|
||||||
|
errors.append(error_msg)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}"
|
error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}"
|
||||||
logger.warning(error_msg)
|
logger.warning(error_msg)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from celery_app import celery_app
|
|||||||
from .services import post_remesa_data
|
from .services import post_remesa_data
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from fastapi import HTTPException as _HTTPException
|
||||||
from ..tasks.services import register_task, update_task
|
from ..tasks.services import register_task, update_task
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -17,12 +18,17 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
Returns:
|
Returns:
|
||||||
dict: Resultado del procesamiento con estado y detalles
|
dict: Resultado del procesamiento con estado y detalles
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
task_id = self.request.id
|
task_id = self.request.id
|
||||||
servicio = 5 # Código para Pedimento Remesas
|
servicio = 5 # Código para Pedimento Remesas
|
||||||
|
print(f"remesa_request >>>> {remesa_request}")
|
||||||
|
logger.info(f"remesa_request >>>> {remesa_request}")
|
||||||
pedimento_id = remesa_request.get('pedimento', {}).get('id')
|
pedimento_id = remesa_request.get('pedimento', {}).get('id')
|
||||||
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
|
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
|
||||||
remesa_num = remesa_request.get('remesa', 'N/A')
|
remesa_num = remesa_request.get('pedimento', {}).get('pedimento_app', 'N/A')
|
||||||
|
|
||||||
|
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Actualizar estado a processing
|
# Actualizar estado a processing
|
||||||
@@ -49,7 +55,6 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
pedimento_id=pedimento_id,
|
pedimento_id=pedimento_id,
|
||||||
organizacion_id=organizacion_id,
|
organizacion_id=organizacion_id,
|
||||||
servicio=servicio,
|
servicio=servicio,
|
||||||
result=result
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -60,20 +65,45 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
|
|||||||
|
|
||||||
# Actualizar estado a failed
|
# Actualizar estado a failed
|
||||||
try:
|
try:
|
||||||
loop.run_until_complete(
|
# Verificar si el loop aún está abierto
|
||||||
update_task(
|
if not loop.is_closed():
|
||||||
task_id=task_id,
|
loop.run_until_complete(
|
||||||
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
update_task(
|
||||||
status="failed",
|
task_id=task_id,
|
||||||
pedimento_id=pedimento_id,
|
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
||||||
organizacion_id=organizacion_id,
|
status="failed",
|
||||||
servicio=servicio,
|
pedimento_id=pedimento_id,
|
||||||
error=str(e)
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=servicio,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
else:
|
||||||
|
# Si el loop está cerrado, crear uno nuevo temporal
|
||||||
|
logger.warning(f"Loop cerrado, creando loop temporal para actualizar error")
|
||||||
|
temp_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(temp_loop)
|
||||||
|
try:
|
||||||
|
temp_loop.run_until_complete(
|
||||||
|
update_task(
|
||||||
|
task_id=task_id,
|
||||||
|
message=f"Error al procesar remesa {remesa_num}: {str(e)}",
|
||||||
|
status="failed",
|
||||||
|
pedimento_id=pedimento_id,
|
||||||
|
organizacion_id=organizacion_id,
|
||||||
|
servicio=servicio
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
temp_loop.close()
|
||||||
except Exception as update_error:
|
except Exception as update_error:
|
||||||
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
logger.error(f"Error actualizando estado de tarea: {update_error}")
|
||||||
|
|
||||||
# Re-lanzar la excepción para que Celery la marque como fallida
|
# Re-lanzar la excepción para que Celery la marque como fallida
|
||||||
|
if isinstance(e, _HTTPException):
|
||||||
|
raise RuntimeError(f"HTTP {e.status_code}: {e.detail}") from None
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Limpiar el event loop
|
||||||
|
if not loop.is_closed():
|
||||||
|
loop.close()
|
||||||
0
api/api_v2/modules/stream/__init__.py
Normal file
0
api/api_v2/modules/stream/__init__.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
97
api/api_v2/modules/stream/routers.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
import redis.asyncio as aioredis
|
||||||
|
from fastapi import APIRouter, Request
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
HEARTBEAT_INTERVAL = 25 # segundos
|
||||||
|
MAX_STREAM_SECONDS = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _redis_url() -> str:
|
||||||
|
# Usa el Redis compartido con Django backend para leer los eventos publicados por sus tasks.
|
||||||
|
host = os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost"))
|
||||||
|
port = os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", "6379"))
|
||||||
|
db = os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", "0"))
|
||||||
|
return f"redis://{host}:{port}/{db}"
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/stream/tasks/{task_id}")
|
||||||
|
async def stream_task_events(task_id: str, request: Request):
|
||||||
|
"""
|
||||||
|
SSE endpoint — el cliente se conecta y recibe eventos de progreso de la tarea
|
||||||
|
en tiempo real vía Redis Pub/Sub.
|
||||||
|
|
||||||
|
Cabeceras requeridas en Nginx upstream:
|
||||||
|
proxy_read_timeout 7200;
|
||||||
|
proxy_buffering off;
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def event_generator():
|
||||||
|
r = aioredis.from_url(_redis_url(), decode_responses=True)
|
||||||
|
pubsub = r.pubsub()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Enviar estado actual si ya existe (cliente que llega tarde)
|
||||||
|
current_raw = await r.get(f"{STATE_PREFIX}{task_id}")
|
||||||
|
if current_raw:
|
||||||
|
current = json.loads(current_raw)
|
||||||
|
yield f"event: task_update\ndata: {current_raw}\n\n"
|
||||||
|
if current.get("status") in ("completed", "failed"):
|
||||||
|
return
|
||||||
|
|
||||||
|
await pubsub.subscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
|
||||||
|
tick = 0
|
||||||
|
while tick < MAX_STREAM_SECONDS:
|
||||||
|
if await request.is_disconnected():
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(
|
||||||
|
pubsub.get_message(ignore_subscribe_messages=True),
|
||||||
|
timeout=1.0,
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
msg = None
|
||||||
|
|
||||||
|
if msg and msg["type"] == "message":
|
||||||
|
data_str = msg["data"]
|
||||||
|
yield f"event: task_update\ndata: {data_str}\n\n"
|
||||||
|
parsed = json.loads(data_str)
|
||||||
|
if parsed.get("status") in ("completed", "failed"):
|
||||||
|
break
|
||||||
|
|
||||||
|
tick += 1
|
||||||
|
if tick % HEARTBEAT_INTERVAL == 0:
|
||||||
|
yield f"event: heartbeat\ndata: {{}}\n\n"
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[stream] Error en SSE para tarea {task_id}: {exc}")
|
||||||
|
yield f"event: error\ndata: {json.dumps({'message': str(exc)})}\n\n"
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await pubsub.unsubscribe(f"{CHANNEL_PREFIX}{task_id}")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await r.aclose()
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
event_generator(),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no", # deshabilita buffering en Nginx
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
@@ -4,6 +4,8 @@ from typing import Dict, Any
|
|||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from ..common import create_error_response
|
from ..common import create_error_response
|
||||||
from core.config import settings
|
from core.config import settings
|
||||||
|
from core.redis_events import publish_task_event
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
async def update_task(
|
async def update_task(
|
||||||
@@ -55,8 +57,17 @@ async def update_task(
|
|||||||
json=update_data,
|
json=update_data,
|
||||||
headers=headers
|
headers=headers
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if response.status_code == 404:
|
||||||
|
logger.warning(f"Tarea {task_id} no encontrada, intentando crearla...")
|
||||||
|
return await _create_and_update_task(
|
||||||
|
task_id, message, status, pedimento_id, organizacion_id, servicio
|
||||||
|
)
|
||||||
|
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
publish_task_event(task_id, status, message)
|
||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
except httpx.HTTPError as e:
|
except httpx.HTTPError as e:
|
||||||
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
|
logger.error(f"Error al actualizar tarea {task_id}: {str(e)}")
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
@@ -81,6 +92,72 @@ async def update_task(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _create_and_update_task(
|
||||||
|
task_id: str,
|
||||||
|
message: str,
|
||||||
|
status: str,
|
||||||
|
pedimento_id: str,
|
||||||
|
organizacion_id: str,
|
||||||
|
servicio: int
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Función interna para crear una tarea y luego actualizarla.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Primero crear la tarea
|
||||||
|
logger.info(f"Creando tarea {task_id} antes de actualizar")
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
"Authorization": f"Token {settings.API_TOKEN}"
|
||||||
|
}
|
||||||
|
|
||||||
|
create_data = {
|
||||||
|
"task_id": task_id,
|
||||||
|
"message": message,
|
||||||
|
"status": status,
|
||||||
|
"pedimento": pedimento_id,
|
||||||
|
"organizacion": organizacion_id,
|
||||||
|
"servicio": servicio
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
# Crear la tarea
|
||||||
|
create_response = await client.post(
|
||||||
|
f"{settings.API_URL}/tasks/tasks/",
|
||||||
|
json=create_data,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
create_response.raise_for_status()
|
||||||
|
logger.info(f"Tarea {task_id} creada exitosamente")
|
||||||
|
|
||||||
|
# Actualizar la tarea recién creada
|
||||||
|
url = f"{settings.API_URL}/tasks/tasks/{task_id}/"
|
||||||
|
update_response = await client.put(
|
||||||
|
url,
|
||||||
|
json=create_data,
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
update_response.raise_for_status()
|
||||||
|
logger.info(f"Tarea {task_id} actualizada exitosamente después de crear")
|
||||||
|
|
||||||
|
return update_response.json()
|
||||||
|
|
||||||
|
except httpx.HTTPError as e:
|
||||||
|
logger.error(f"Error al crear/actualizar tarea {task_id}: {str(e)}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=create_error_response(
|
||||||
|
message="Error al crear la tarea",
|
||||||
|
errors=[str(e)],
|
||||||
|
metadata={
|
||||||
|
"task_id": task_id,
|
||||||
|
"status": status
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def register_task(
|
async def register_task(
|
||||||
task_id: str,
|
task_id: str,
|
||||||
message: str,
|
message: str,
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
from celery import Celery
|
from celery import Celery
|
||||||
from core.config import settings
|
from core.config import settings
|
||||||
import os
|
import os
|
||||||
|
from celery.schedules import crontab
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
# Configuración de Celery
|
# Configuración de Celery
|
||||||
celery_app = Celery(
|
celery_app = Celery(
|
||||||
@@ -32,6 +34,15 @@ celery_app.conf.update(
|
|||||||
task_save_success_on_complete=True # Guarda los resultados exitosos
|
task_save_success_on_complete=True # Guarda los resultados exitosos
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# desde aqui se pueden programar tareas pero al estar desde microservicios no tengo acceso a la informacion de organizacion
|
||||||
|
# por este motivo es mejor programar las tareas desde la aplicacion de django
|
||||||
|
celery_app.conf.beat_schedule = {
|
||||||
|
# 'prueba-4': {
|
||||||
|
# 'task': 'tasks.prueba_hola',
|
||||||
|
# 'schedule': timedelta(seconds=60), # 12:00 = 6:03 AM
|
||||||
|
# },
|
||||||
|
}
|
||||||
|
|
||||||
# Autodiscovery of tasks
|
# Autodiscovery of tasks
|
||||||
celery_app.autodiscover_tasks()
|
celery_app.autodiscover_tasks()
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ class APIRESTController:
|
|||||||
"""
|
"""
|
||||||
Método asíncrono para hacer peticiones a la API usando httpx.
|
Método asíncrono para hacer peticiones a la API usando httpx.
|
||||||
"""
|
"""
|
||||||
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }"
|
url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
|
||||||
|
|
||||||
logger.warning(f"Realizando petición {method} a {url}")
|
logger.warning(f"Realizando petición {method} a {url}")
|
||||||
try:
|
try:
|
||||||
@@ -58,6 +58,8 @@ class APIRESTController:
|
|||||||
response = await client.post(url, json=data, headers=self.headers)
|
response = await client.post(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'PUT':
|
elif method.upper() == 'PUT':
|
||||||
response = await client.put(url, json=data, headers=self.headers)
|
response = await client.put(url, json=data, headers=self.headers)
|
||||||
|
elif method.upper() == 'PATCH':
|
||||||
|
response = await client.patch(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'DELETE':
|
elif method.upper() == 'DELETE':
|
||||||
response = await client.delete(url, headers=self.headers)
|
response = await client.delete(url, headers=self.headers)
|
||||||
else:
|
else:
|
||||||
@@ -91,6 +93,47 @@ class APIRESTController:
|
|||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def post_or_update_document(
|
||||||
|
self, soap_response=None, organizacion: str = None,
|
||||||
|
pedimento: str = None, file_name: str = None,
|
||||||
|
document_type: int = None, fuente: int = 2,
|
||||||
|
identifier: str = None, binary_content: bytes = None,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Guarda un documento VU (request o error). Si ya existe uno del mismo
|
||||||
|
tipo/pedimento/identificador, elimina el anterior y crea el nuevo,
|
||||||
|
evitando duplicados en ejecuciones recurrentes (tarea programada diaria).
|
||||||
|
|
||||||
|
identifier: cadena única del documento dentro del pedimento
|
||||||
|
(ej: número de COVE, número de e-document). Se usa como
|
||||||
|
filtro archivo__icontains para distinguir documentos del
|
||||||
|
mismo tipo pertenecientes a distintos COVEs o edocs.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
|
||||||
|
if identifier:
|
||||||
|
query += f'&archivo__icontains={identifier}'
|
||||||
|
existing = await self._make_request_async('GET', query)
|
||||||
|
if existing:
|
||||||
|
results = existing.get('results', existing) if isinstance(existing, dict) else existing
|
||||||
|
if isinstance(results, list) and results:
|
||||||
|
existing_id = results[0].get('id')
|
||||||
|
if existing_id:
|
||||||
|
await self._make_request_async('DELETE', f'record/documents/{existing_id}/')
|
||||||
|
logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
|
||||||
|
|
||||||
|
return await self.post_document(
|
||||||
|
soap_response=soap_response,
|
||||||
|
binary_content=binary_content,
|
||||||
|
organizacion=organizacion,
|
||||||
|
pedimento=pedimento,
|
||||||
|
file_name=file_name,
|
||||||
|
document_type=document_type,
|
||||||
|
fuente=fuente,
|
||||||
|
)
|
||||||
|
|
||||||
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
|
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
|
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
|
||||||
|
|
||||||
@@ -242,7 +285,10 @@ class APIController:
|
|||||||
"""
|
"""
|
||||||
Método para obtener la lista de servicios desde la API.
|
Método para obtener la lista de servicios desde la API.
|
||||||
"""
|
"""
|
||||||
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
|
# return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
|
||||||
|
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
|
||||||
|
# lo elimine para poder ejecutar la operacion siempre que sea necesario
|
||||||
|
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
|
||||||
|
|
||||||
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
|
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
@@ -461,25 +507,29 @@ class APIController:
|
|||||||
"""
|
"""
|
||||||
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
|
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
|
||||||
|
|
||||||
async def get_cer(self, id: str) -> bytes:
|
async def put_cove(self, cove_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""
|
return await self._make_request_async('PUT', f'customs/coves/{cove_id}/', data=data)
|
||||||
Método para obtener un certificado específico desde la API (como binario).
|
|
||||||
Args:
|
|
||||||
id: UUID del certificado a consultar
|
|
||||||
Returns:
|
|
||||||
bytes: Contenido binario del certificado
|
|
||||||
"""
|
|
||||||
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
|
|
||||||
|
|
||||||
async def get_key(self, id: str) -> bytes:
|
async def get_key(self, id: str) -> bytes:
|
||||||
"""
|
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
|
||||||
Método para obtener una llave específica desde la API (como binario).
|
|
||||||
Args:
|
if result is None:
|
||||||
id: UUID de la llave a consultar
|
logger.info(f"get_key retornó None")
|
||||||
Returns:
|
else:
|
||||||
bytes: Contenido binario de la llave
|
logger.info(f"get_key retornó {len(result)} bytes")
|
||||||
"""
|
|
||||||
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def get_cer(self, id: str) -> bytes:
|
||||||
|
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
|
||||||
|
|
||||||
|
if result is None:
|
||||||
|
logger.info(f"get_cer retornó None")
|
||||||
|
else:
|
||||||
|
logger.info(f"get_cer retornó {len(result)} bytes")
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False):
|
async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False):
|
||||||
"""
|
"""
|
||||||
@@ -497,13 +547,17 @@ class APIController:
|
|||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||||
logger.info(f"Haciendo petición {method} a {url}")
|
logger.info(f"Haciendo petición {method} a {url}")
|
||||||
|
print(f"Haciendo petición {method} a {url}")
|
||||||
|
|
||||||
if method.upper() == 'GET':
|
if method.upper() == 'GET':
|
||||||
response = await client.get(url, headers=self.headers)
|
response = await client.get(url, headers=self.headers)
|
||||||
elif method.upper() == 'POST':
|
elif method.upper() == 'POST':
|
||||||
response = await client.post(url, json=data, headers=self.headers)
|
response = await client.post(url, json=data, headers=self.headers)
|
||||||
|
print(f"response >>>> {response}")
|
||||||
elif method.upper() == 'PUT':
|
elif method.upper() == 'PUT':
|
||||||
response = await client.put(url, json=data, headers=self.headers)
|
response = await client.put(url, json=data, headers=self.headers)
|
||||||
|
elif method.upper() == 'PATCH':
|
||||||
|
response = await client.patch(url, json=data, headers=self.headers)
|
||||||
elif method.upper() == 'DELETE':
|
elif method.upper() == 'DELETE':
|
||||||
response = await client.delete(url, headers=self.headers)
|
response = await client.delete(url, headers=self.headers)
|
||||||
else:
|
else:
|
||||||
|
|||||||
44
core/redis_events.py
Normal file
44
core/redis_events.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import redis
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CHANNEL_PREFIX = "audit_task:"
|
||||||
|
STATE_PREFIX = "audit_task_state:"
|
||||||
|
STATE_TTL = 7200 # 2 horas
|
||||||
|
|
||||||
|
|
||||||
|
def _get_sync_redis():
|
||||||
|
# REDIS_PUBSUB_HOST apunta al Redis compartido con el backend Django.
|
||||||
|
# En dev: redis_backend_dev. Por defecto usa REDIS_HOST (para entornos con una sola instancia).
|
||||||
|
return redis.Redis(
|
||||||
|
host=os.getenv("REDIS_PUBSUB_HOST", os.getenv("REDIS_HOST", "localhost")),
|
||||||
|
port=int(os.getenv("REDIS_PUBSUB_PORT", os.getenv("REDIS_PORT", 6379))),
|
||||||
|
db=int(os.getenv("REDIS_PUBSUB_DB", os.getenv("REDIS_DB", 0))),
|
||||||
|
decode_responses=True,
|
||||||
|
socket_connect_timeout=2,
|
||||||
|
socket_timeout=2,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def publish_task_event(task_id: str, status: str, message: str = "", resultado: dict = None, progress: int = None):
|
||||||
|
"""
|
||||||
|
Publica un evento de progreso de tarea en Redis Pub/Sub.
|
||||||
|
Guarda el último estado en una key con TTL para clientes que se conectan tarde.
|
||||||
|
"""
|
||||||
|
payload: dict = {"task_id": task_id, "status": status, "message": message}
|
||||||
|
if resultado is not None:
|
||||||
|
payload["resultado"] = resultado
|
||||||
|
if progress is not None:
|
||||||
|
payload["progress"] = progress
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = _get_sync_redis()
|
||||||
|
serialized = json.dumps(payload)
|
||||||
|
client.publish(f"{CHANNEL_PREFIX}{task_id}", serialized)
|
||||||
|
client.setex(f"{STATE_PREFIX}{task_id}", STATE_TTL, serialized)
|
||||||
|
client.close()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(f"[redis_events] Error publicando evento para tarea {task_id}: {exc}")
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
from pydantic import BaseModel, Field, field_validator
|
from pydantic import BaseModel, Field, field_validator
|
||||||
from typing import Optional
|
from typing import Optional, List, Union
|
||||||
|
|
||||||
|
|
||||||
class ServiceBaseSchema(BaseModel):
|
class ServiceBaseSchema(BaseModel):
|
||||||
"""Esquema base para servicios con campos comunes"""
|
"""Esquema base para servicios con campos comunes"""
|
||||||
@@ -44,3 +43,40 @@ class ServiceRemesaSchema(BaseModel):
|
|||||||
if not v or not v.strip():
|
if not v or not v.strip():
|
||||||
raise ValueError('Los campos de texto no pueden estar vacíos')
|
raise ValueError('Los campos de texto no pueden estar vacíos')
|
||||||
return v.strip()
|
return v.strip()
|
||||||
|
|
||||||
|
class MultiPedimentoCompletoSchema(BaseModel):
|
||||||
|
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
|
||||||
|
organizacion: str = Field(..., description="ID de la organización")
|
||||||
|
pedimentos: List[str] = Field(
|
||||||
|
...,
|
||||||
|
description="Lista de IDs de pedimentos a procesar",
|
||||||
|
min_length=1,
|
||||||
|
max_length=200
|
||||||
|
)
|
||||||
|
|
||||||
|
@field_validator('organizacion')
|
||||||
|
def validate_organizacion(cls, v):
|
||||||
|
if not v or not v.strip():
|
||||||
|
raise ValueError('La organización no puede estar vacía')
|
||||||
|
return v.strip()
|
||||||
|
|
||||||
|
@field_validator('pedimentos')
|
||||||
|
def validate_pedimentos(cls, v):
|
||||||
|
if not v:
|
||||||
|
raise ValueError('Debe proporcionar al menos un pedimento')
|
||||||
|
if len(v) > 200:
|
||||||
|
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
|
||||||
|
# Eliminar duplicados y vacíos
|
||||||
|
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
|
||||||
|
if not pedimentos_limpios:
|
||||||
|
raise ValueError('La lista de pedimentos no puede estar vacía')
|
||||||
|
# Eliminar duplicados
|
||||||
|
return list(set(pedimentos_limpios))
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
json_schema_extra = {
|
||||||
|
"example": {
|
||||||
|
"organizacion": "1",
|
||||||
|
"pedimentos": ["123", "456", "789"]
|
||||||
|
}
|
||||||
|
}
|
||||||
260
tasks.py
260
tasks.py
@@ -2,7 +2,7 @@ from celery import Celery
|
|||||||
from celery_app import celery_app
|
from celery_app import celery_app
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any, List
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from controllers.RESTController import rest_controller
|
from controllers.RESTController import rest_controller
|
||||||
from controllers.SOAPController import soap_controller
|
from controllers.SOAPController import soap_controller
|
||||||
@@ -125,9 +125,12 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
|
|||||||
|
|
||||||
# Subir documento de pedimento completo si la petición fue exitosa
|
# Subir documento de pedimento completo si la petición fue exitosa
|
||||||
try:
|
try:
|
||||||
|
# solucion al error de descarga de un e-document, el mapeo de identificador no llegaba y ni siquiera insertaba registros en
|
||||||
|
# la tabla
|
||||||
|
identificadores_ed_response = xml_content.get('identificadores_ed', []) if 'xml_content' in locals() else []
|
||||||
upload_result = await _post_edocuments(
|
upload_result = await _post_edocuments(
|
||||||
response_service=service_data,
|
response_service=service_data,
|
||||||
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
identificadores_ed=identificadores_ed_response
|
||||||
)
|
)
|
||||||
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
|
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
|
||||||
except Exception as upload_err:
|
except Exception as upload_err:
|
||||||
@@ -158,6 +161,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
|
|||||||
|
|
||||||
return run_async_task(_execute_pedimento_completo)
|
return run_async_task(_execute_pedimento_completo)
|
||||||
|
|
||||||
|
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
|
||||||
|
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
|
||||||
|
"""
|
||||||
|
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pedimentos: Lista de IDs de pedimentos a procesar
|
||||||
|
organizacion: ID de la organización
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
results = {
|
||||||
|
"total": len(pedimentos),
|
||||||
|
"successful": [],
|
||||||
|
"failed": [],
|
||||||
|
"started_at": datetime.utcnow().isoformat()
|
||||||
|
}
|
||||||
|
|
||||||
|
total = len(pedimentos)
|
||||||
|
|
||||||
|
for idx, pedimento_id in enumerate(pedimentos, 1):
|
||||||
|
try:
|
||||||
|
# Actualizar progreso (igual que en tus otras tareas)
|
||||||
|
self.update_state(
|
||||||
|
state='PROGRESS',
|
||||||
|
meta={
|
||||||
|
'status': f'Procesando pedimento {idx}/{total}',
|
||||||
|
'current': idx,
|
||||||
|
'total': total,
|
||||||
|
'current_pedimento': pedimento_id,
|
||||||
|
'percentage': round((idx / total) * 100, 2)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
|
||||||
|
|
||||||
|
# Preparar datos exactamente como lo espera la tarea individual
|
||||||
|
request_data = {
|
||||||
|
"pedimento": pedimento_id,
|
||||||
|
"organizacion": organizacion
|
||||||
|
}
|
||||||
|
|
||||||
|
# Reutilizar la lógica de la tarea individual
|
||||||
|
# Esto ejecuta el mismo código que tu endpoint individual
|
||||||
|
async def _execute():
|
||||||
|
return await _execute_pedimento_completo_logic(request_data)
|
||||||
|
|
||||||
|
result = run_async_task(_execute)
|
||||||
|
|
||||||
|
results["successful"].append({
|
||||||
|
"pedimento_id": pedimento_id,
|
||||||
|
"result": result
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
|
||||||
|
results["failed"].append({
|
||||||
|
"pedimento_id": pedimento_id,
|
||||||
|
"error": str(e)
|
||||||
|
})
|
||||||
|
|
||||||
|
elapsed_time = time.time() - start_time
|
||||||
|
|
||||||
|
results["completed_at"] = datetime.utcnow().isoformat()
|
||||||
|
results["elapsed_seconds"] = round(elapsed_time, 2)
|
||||||
|
results["success_count"] = len(results["successful"])
|
||||||
|
results["failed_count"] = len(results["failed"])
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
|
||||||
|
"""
|
||||||
|
Lógica compartida para procesar un pedimento completo.
|
||||||
|
Esta es la misma lógica que usa tu endpoint individual.
|
||||||
|
"""
|
||||||
|
operation_name = "pedimento_completo"
|
||||||
|
service_data = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
|
||||||
|
|
||||||
|
# Validar datos de entrada
|
||||||
|
await _validate_request_data(request_data)
|
||||||
|
|
||||||
|
# Obtener servicio
|
||||||
|
service_data = await _get_pedimento_service(
|
||||||
|
pedimento_id=request_data['pedimento'],
|
||||||
|
service_type=3,
|
||||||
|
operation_name=operation_name
|
||||||
|
)
|
||||||
|
|
||||||
|
# Actualizar estado a "En proceso"
|
||||||
|
update_success = await _update_service_status(
|
||||||
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
||||||
|
)
|
||||||
|
|
||||||
|
if not update_success:
|
||||||
|
raise Exception("Error al actualizar estado del servicio")
|
||||||
|
|
||||||
|
# Obtener credenciales VUCEM
|
||||||
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
||||||
|
if not contribuyente_id:
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||||
|
raise Exception("ID de contribuyente no encontrado")
|
||||||
|
|
||||||
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
||||||
|
|
||||||
|
# Procesar petición SOAP
|
||||||
|
soap_response = await get_soap_pedimento_completo(
|
||||||
|
credenciales=credentials,
|
||||||
|
response_service=service_data,
|
||||||
|
soap_controller=soap_controller
|
||||||
|
)
|
||||||
|
|
||||||
|
if not soap_response:
|
||||||
|
raise Exception("Error en la petición SOAP")
|
||||||
|
|
||||||
|
# Actualizar datos del pedimento
|
||||||
|
xml_content = soap_response.get('xml_content', {})
|
||||||
|
if xml_content:
|
||||||
|
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
|
||||||
|
update_content['existe_expediente'] = True
|
||||||
|
await rest_controller.put_pedimento(
|
||||||
|
service_data['pedimento']['id'],
|
||||||
|
update_content
|
||||||
|
)
|
||||||
|
|
||||||
|
# Procesar COVEs
|
||||||
|
coves = xml_content.get('coves', [])
|
||||||
|
if coves:
|
||||||
|
await _post_coves(
|
||||||
|
response_service=service_data,
|
||||||
|
coves=coves
|
||||||
|
)
|
||||||
|
|
||||||
|
# Procesar documentos digitalizados
|
||||||
|
identificadores_ed = xml_content.get('identificadores_ed', [])
|
||||||
|
if identificadores_ed:
|
||||||
|
await _post_edocuments(
|
||||||
|
response_service=service_data,
|
||||||
|
identificadores_ed=identificadores_ed
|
||||||
|
)
|
||||||
|
|
||||||
|
# Finalizar servicio
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"pedimento_id": request_data['pedimento'],
|
||||||
|
"message": "Pedimento procesado exitosamente"
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
|
||||||
|
if service_data:
|
||||||
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"pedimento_id": request_data['pedimento'],
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
@celery_app.task(bind=True)
|
@celery_app.task(bind=True)
|
||||||
def partidas_task(self, **kwargs):
|
def partidas_task(self, **kwargs):
|
||||||
"""Tarea asíncrona para obtener partidas"""
|
"""Tarea asíncrona para obtener partidas"""
|
||||||
@@ -459,6 +629,19 @@ def acuse_task(self, **kwargs):
|
|||||||
documento_info["documento"] = soap_response.get('documento', {})
|
documento_info["documento"] = soap_response.get('documento', {})
|
||||||
documentos_exitosos += 1
|
documentos_exitosos += 1
|
||||||
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
|
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
|
||||||
|
try:
|
||||||
|
await rest_controller.put_edocument(
|
||||||
|
edocument_id=edoc['id'],
|
||||||
|
data={
|
||||||
|
"id": edoc['id'],
|
||||||
|
"acuse_descargado": True,
|
||||||
|
"numero_edocument": edoc.get('numero_edocument'),
|
||||||
|
"pedimento": service_data['pedimento']['id'],
|
||||||
|
"organizacion": service_data['organizacion'],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as status_err:
|
||||||
|
logger.warning(f"Error actualizando acuse_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
|
||||||
else:
|
else:
|
||||||
documento_info["error"] = "Error en petición SOAP"
|
documento_info["error"] = "Error en petición SOAP"
|
||||||
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
|
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
|
||||||
@@ -665,28 +848,19 @@ def edocument_task(self, **kwargs):
|
|||||||
documento_info["documento"] = soap_response.get('documento', {})
|
documento_info["documento"] = soap_response.get('documento', {})
|
||||||
documentos_exitosos += 1
|
documentos_exitosos += 1
|
||||||
logger.info(f"E-document {idx + 1} procesado exitosamente")
|
logger.info(f"E-document {idx + 1} procesado exitosamente")
|
||||||
# Subir el documento si la petición fue exitosa
|
|
||||||
try:
|
try:
|
||||||
upload_result = await _post_edocuments(
|
await rest_controller.put_edocument(
|
||||||
response_service=service_data,
|
edocument_id=edoc['id'],
|
||||||
identificadores_ed=[edoc.get('numero_edocument')]
|
data={
|
||||||
|
"id": edoc['id'],
|
||||||
|
"edocument_descargado": True,
|
||||||
|
"numero_edocument": edoc.get('numero_edocument'),
|
||||||
|
"pedimento": service_data['pedimento']['id'],
|
||||||
|
"organizacion": service_data['organizacion'],
|
||||||
|
}
|
||||||
)
|
)
|
||||||
documento_info["upload_result"] = upload_result
|
except Exception as status_err:
|
||||||
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
logger.warning(f"Error actualizando edocument_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
|
||||||
except Exception as upload_err:
|
|
||||||
documento_info["upload_error"] = str(upload_err)
|
|
||||||
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
|
||||||
# Subir el documento si la petición fue exitosa
|
|
||||||
try:
|
|
||||||
upload_result = await _post_edocuments(
|
|
||||||
response_service=service_data,
|
|
||||||
identificadores_ed=[edoc.get('numero_edocument')]
|
|
||||||
)
|
|
||||||
documento_info["upload_result"] = upload_result
|
|
||||||
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
|
||||||
except Exception as upload_err:
|
|
||||||
documento_info["upload_error"] = str(upload_err)
|
|
||||||
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
|
||||||
else:
|
else:
|
||||||
documento_info["error"] = "Error en petición SOAP"
|
documento_info["error"] = "Error en petición SOAP"
|
||||||
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
|
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
|
||||||
@@ -807,6 +981,19 @@ def coves_task(self, **kwargs):
|
|||||||
documento_info["documento"] = soap_response.get('documento', {})
|
documento_info["documento"] = soap_response.get('documento', {})
|
||||||
documentos_exitosos += 1
|
documentos_exitosos += 1
|
||||||
logger.info(f"cove del documento {idx + 1} procesado exitosamente")
|
logger.info(f"cove del documento {idx + 1} procesado exitosamente")
|
||||||
|
try:
|
||||||
|
await rest_controller.put_cove(
|
||||||
|
cove_id=cove['id'],
|
||||||
|
data={
|
||||||
|
"id": cove['id'],
|
||||||
|
"cove_descargado": True,
|
||||||
|
"numero_cove": cove.get('numero_cove'),
|
||||||
|
"pedimento": service_data['pedimento']['id'],
|
||||||
|
"organizacion": service_data['organizacion'],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
except Exception as status_err:
|
||||||
|
logger.warning(f"Error actualizando cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
|
||||||
else:
|
else:
|
||||||
documento_info["error"] = "Error en petición SOAP"
|
documento_info["error"] = "Error en petición SOAP"
|
||||||
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
|
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
|
||||||
@@ -948,17 +1135,19 @@ def acuse_cove_task(self, **kwargs):
|
|||||||
documento_info["documento"] = soap_response.get('documento', {})
|
documento_info["documento"] = soap_response.get('documento', {})
|
||||||
documentos_exitosos += 1
|
documentos_exitosos += 1
|
||||||
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
|
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
|
||||||
# Subir el documento de COVE si la petición fue exitosa
|
|
||||||
try:
|
try:
|
||||||
upload_result = await _post_coves(
|
await rest_controller.put_cove(
|
||||||
response_service=service_data,
|
cove_id=cove['id'],
|
||||||
identificadores_cove=[cove.get('numero_cove')]
|
data={
|
||||||
|
"id": cove['id'],
|
||||||
|
"acuse_cove_descargado": True,
|
||||||
|
"numero_cove": cove.get('numero_cove'),
|
||||||
|
"pedimento": service_data['pedimento']['id'],
|
||||||
|
"organizacion": service_data['organizacion'],
|
||||||
|
}
|
||||||
)
|
)
|
||||||
documento_info["upload_result"] = upload_result
|
except Exception as status_err:
|
||||||
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente")
|
logger.warning(f"Error actualizando acuse_cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
|
||||||
except Exception as upload_err:
|
|
||||||
documento_info["upload_error"] = str(upload_err)
|
|
||||||
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
|
|
||||||
else:
|
else:
|
||||||
documento_info["error"] = "Error en petición SOAP"
|
documento_info["error"] = "Error en petición SOAP"
|
||||||
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
|
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
|
||||||
@@ -999,3 +1188,12 @@ def acuse_cove_task(self, **kwargs):
|
|||||||
raise e
|
raise e
|
||||||
|
|
||||||
return run_async_task(_execute_acuse_cove)
|
return run_async_task(_execute_acuse_cove)
|
||||||
|
|
||||||
|
# @celery_app.task(name='tasks.prueba_hola')
|
||||||
|
# def prueba_hola():
|
||||||
|
# """
|
||||||
|
# Tarea de prueba: solo imprime un saludo para verificar la programación.
|
||||||
|
# """
|
||||||
|
# logger.info("¡Hola! Tarea programada ejecutada correctamente.")
|
||||||
|
# print("¡Hola desde la tarea programada!")
|
||||||
|
# return "OK"
|
||||||
@@ -8,9 +8,8 @@ def soap_error(soap_response): # Testeado
|
|||||||
Returns:
|
Returns:
|
||||||
bool: True si no hay errores, False en caso contrario
|
bool: True si no hay errores, False en caso contrario
|
||||||
"""
|
"""
|
||||||
if '<ns2:tieneError>true</ns2:tieneError>' in soap_response.text:
|
# Cubre cualquier variante de namespace y capitalización (tieneError, TieneError, ns2:tieneError, etc.)
|
||||||
return True
|
if 'tieneerror>true<' in soap_response.text.lower():
|
||||||
if '<ns3:tieneError>true</ns3:tieneError>' in soap_response.text:
|
|
||||||
return True
|
return True
|
||||||
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
if "<mensaje>El Cove o Adenda no existe, no está firmado o no cuenta con la autorización para consultarlo</mensaje>" in soap_response.text:
|
||||||
return True
|
return True
|
||||||
|
|||||||
85
utils/minio_client.py
Normal file
85
utils/minio_client.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
# microservice/utils/minio_client.py
|
||||||
|
import os
|
||||||
|
from minio import Minio
|
||||||
|
from minio.error import S3Error
|
||||||
|
from datetime import timedelta
|
||||||
|
from typing import Optional, BinaryIO
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class MinIOClient:
|
||||||
|
"""Cliente MinIO para FastAPI"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.client = None
|
||||||
|
self.bucket_name = None
|
||||||
|
self._initialize()
|
||||||
|
|
||||||
|
def _initialize(self):
|
||||||
|
"""Inicializa el cliente MinIO"""
|
||||||
|
endpoint = os.environ.get('MINIO_ENDPOINT', 'minio:9000')
|
||||||
|
access_key = os.environ.get('MINIO_ACCESS_KEY')
|
||||||
|
secret_key = os.environ.get('MINIO_SECRET_KEY')
|
||||||
|
secure = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
|
||||||
|
self.bucket_name = os.environ.get('MINIO_BUCKET_NAME', 'efc-microservice-dev')
|
||||||
|
|
||||||
|
self.client = Minio(
|
||||||
|
endpoint=endpoint,
|
||||||
|
access_key=access_key,
|
||||||
|
secret_key=secret_key,
|
||||||
|
secure=secure
|
||||||
|
)
|
||||||
|
|
||||||
|
# Asegurar bucket
|
||||||
|
if not self.client.bucket_exists(self.bucket_name):
|
||||||
|
self.client.make_bucket(self.bucket_name)
|
||||||
|
logger.info(f"Bucket '{self.bucket_name}' creado")
|
||||||
|
|
||||||
|
def upload_file(
|
||||||
|
self,
|
||||||
|
object_name: str,
|
||||||
|
file_path: str = None,
|
||||||
|
file_data: bytes = None,
|
||||||
|
content_type: str = None
|
||||||
|
) -> bool:
|
||||||
|
"""Sube archivo a MinIO (síncrono para Celery)"""
|
||||||
|
try:
|
||||||
|
if file_path:
|
||||||
|
self.client.fput_object(
|
||||||
|
self.bucket_name,
|
||||||
|
object_name,
|
||||||
|
file_path,
|
||||||
|
content_type=content_type
|
||||||
|
)
|
||||||
|
elif file_data:
|
||||||
|
import io
|
||||||
|
data_stream = io.BytesIO(file_data)
|
||||||
|
self.client.put_object(
|
||||||
|
self.bucket_name,
|
||||||
|
object_name,
|
||||||
|
data_stream,
|
||||||
|
len(file_data),
|
||||||
|
content_type=content_type
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error subiendo archivo: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_presigned_url(self, object_name: str, expires: int = 3600) -> Optional[str]:
|
||||||
|
"""Genera URL firmada"""
|
||||||
|
try:
|
||||||
|
return self.client.presigned_get_object(
|
||||||
|
self.bucket_name,
|
||||||
|
object_name,
|
||||||
|
expires=timedelta(seconds=expires)
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error generando URL: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Singleton
|
||||||
|
minio_client = MinIOClient()
|
||||||
@@ -603,7 +603,7 @@ async def get_soap_acuse(credenciales, response_service, soap_controller, edocum
|
|||||||
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
||||||
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
||||||
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
||||||
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
|
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
|
||||||
|
|
||||||
# Enviar el documento PDF usando binary_content
|
# Enviar el documento PDF usando binary_content
|
||||||
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
||||||
@@ -710,7 +710,7 @@ async def get_soap_acuseCOVE(credenciales, response_service, soap_controller, co
|
|||||||
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
||||||
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
||||||
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
||||||
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
|
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.pdf"
|
||||||
|
|
||||||
# Enviar el documento PDF usando binary_content
|
# Enviar el documento PDF usando binary_content
|
||||||
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
||||||
@@ -884,7 +884,7 @@ async def get_soap_edocument(credenciales, response_service, soap_controller, ed
|
|||||||
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
||||||
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
||||||
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
||||||
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
|
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
|
||||||
|
|
||||||
# Enviar el documento PDF usando binary_content
|
# Enviar el documento PDF usando binary_content
|
||||||
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
|
||||||
|
|||||||
Reference in New Issue
Block a user