7 Commits

19 changed files with 662 additions and 158 deletions

View File

@@ -1,11 +1,11 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks
from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
from fastapi.responses import JSONResponse
from celery_app import celery_app
from tasks import pedimento_completo_task
import logging
from typing import Dict, Any
from typing import Dict, Any, List
import uuid
from datetime import datetime
@@ -63,6 +63,54 @@ async def async_get_pedimento_completo(request: ServiceRemesaSchema):
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.post("/async/services/pedimento_completo/multiple")
async def async_multi_pedimento_completo(request: MultiPedimentoCompletoSchema):
"""
Agenda la tarea de obtener pedimentos completos de forma asíncrona para MÚLTIPLES pedimentos.
Args:
request: MultiPedimentoCompletoSchema con lista de pedimentos y organización
Returns:
JSONResponse con task_id para consultar el estado de la tarea
Raises:
HTTPException: En caso de errores de validación
"""
try:
pedimentos_ids = request.pedimentos
organizacion = request.organizacion
logger.info(f"Agendando tarea para {len(pedimentos_ids)} pedimentos completos")
# Agendar la tarea en Celery
from tasks import multi_pedimento_completo_task
task = multi_pedimento_completo_task.delay(pedimentos_ids, organizacion)
# Crear respuesta inmediata (EXACTAMENTE igual a tus otras respuestas)
response_data = {
"success": True,
"message": f"Tarea agendada exitosamente para {len(pedimentos_ids)} pedimentos. La consulta de pedimentos completos se está procesando en segundo plano.",
"task_id": task.id,
"total_pedimentos": len(pedimentos_ids),
"pedimentos": pedimentos_ids,
"organizacion": organizacion,
"status": "PENDING",
"timestamp": datetime.utcnow().isoformat(),
"estimated_completion": f"{len(pedimentos_ids) * 2}-{len(pedimentos_ids) * 5} minutos",
"check_status_url": f"/async/task-status/{task.id}"
}
logger.info(f"Tarea múltiple agendada exitosamente - Task ID: {task.id}")
return JSONResponse(content=response_data, status_code=202)
except Exception as e:
logger.error(f"Error al agendar tarea múltiple de pedimento completo: {e}")
raise HTTPException(
status_code=500,
detail=f"Error al agendar la tarea: {str(e)}"
)
@router.get("/async/task-status/{task_id}")
async def get_task_status(task_id: str):

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException
from schemas.pedimentoSchema import PedimentoRequest
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema
from schemas.serviceSchema import ServiceBaseSchema, ServiceRemesaSchema, MultiPedimentoCompletoSchema
import asyncio
import logging
logger = logging.getLogger("app.api")

View File

@@ -33,19 +33,20 @@ async def obtener_acuse(**kwargs):
file_name_request = f"vu_AC_{pedimento_app}_{idEdocument_efc}_REQUEST.xml"
document_response = await acuse_rest_controller.post_document(
document_response = await acuse_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_request,
document_type=25, # Tipo de documento para request de acuse VU
document_type=25,
identifier=idEdocument_efc,
)
except Exception as e:
logger.error(f"Error al enviar solicitud SOAP: {e}")
response = await acuse_vu_controller.make_request_async(
"ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl",
"ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl",
data=soap_xml,
headers=soap_headers
)
@@ -72,11 +73,32 @@ async def obtener_acuse(**kwargs):
if soap_error(response):
logger.error("Error SOAP detectado en la respuesta")
pedimento_efc = kwargs.get('pedimento', {})
organizacion_efc = pedimento_efc.get("organizacion", None)
pedimento_id_efc = pedimento_efc.get("id", None)
pedimento_app = pedimento_efc.get('pedimento_app', 'N/A')
idEdocument_efc = kwargs['edoc'].get('numero_edocument', 'N/A')
file_name_response = f"vu_AC_{pedimento_app}_{idEdocument_efc}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR: file={file_name_response}, organizacion={organizacion_efc}, pedimento={pedimento_id_efc}")
doc_result = await acuse_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_response,
document_type=26,
identifier=idEdocument_efc,
)
if doc_result is None:
logger.error(f"post_document retornó None para RESPONSE_ERROR — archivo físico guardado sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={doc_result.get('id')}, document_type={doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
message="Error en la respuesta del servicio SOAP",
data={"soap_response": response.text[:500]}
data={"soap_response": response.text[:3000]}
)
)

View File

@@ -4,6 +4,7 @@ import asyncio
import logging
from typing import Dict, Any
from contextlib import asynccontextmanager
from fastapi import HTTPException
from .services import obtener_acuse
from api.api_v2.modules.tasks.services import register_task, update_task
@@ -88,6 +89,6 @@ def process_acuse_request(self, acuse_request: Dict[str, Any]) -> Dict[str, Any]
)
except Exception as update_error:
logging.error(f"Error al actualizar estado de tarea: {update_error}")
raise
return {"status": "failed", "message": error_message}
finally:
loop.close()

View File

@@ -91,12 +91,13 @@ async def consume_ws_get_cove(**kwargs):
# Enviar documento de request a EFC
try:
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request,
document_type=19,
identifier=cove,
)
except Exception as e:
logger.error(f"Error al enviar documento request: {e}")
@@ -117,12 +118,13 @@ async def consume_ws_get_cove(**kwargs):
raise Exception("No se recibió respuesta del servicio SOAP")
if soap_error(soap_response):
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
document_type=20,
identifier=cove,
)
raise Exception("Error en la respuesta del servicio SOAP")
@@ -205,12 +207,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
# Enviar documento de request a EFC
try:
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
document_response = await coves_rest_controller.post_document(
document_response = await coves_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=file_name_request,
document_type=23,
identifier=kwargs['cove'].get('cove', 'N/A'),
)
except Exception as e:
logger.error(f"Error al enviar documento request de acuse cove: {e}")
@@ -251,12 +254,13 @@ async def consume_ws_get_acuse_cove(**kwargs):
if soap_error(response):
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
try:
rest_response = await coves_rest_controller.post_document(
rest_response = await coves_rest_controller.post_or_update_document(
soap_response=response,
organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'),
file_name=error_file_name,
document_type=24,
identifier=kwargs['cove'].get('cove', 'N/A'),
)
except Exception as e:
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")

View File

@@ -22,7 +22,8 @@ def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]:
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A')
# aqui se cambio de cove_number a 'cove' porque no esta asi en el mapeo
cove_number = cove_info.get('cove', 'N/A')
try:
# Registrar el inicio de la tarea
@@ -97,7 +98,8 @@ def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str,
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
cove_info = cove_request.get('cove', {})
cove_number = cove_info.get('numero_cove', 'N/A')
# se cambio el cove_number, ya que no existe directamente en el mapeo, se conoce solo como 'cove'
cove_number = cove_info.get('cove', 'N/A')
try:
# Registrar el inicio de la tarea

View File

@@ -48,12 +48,13 @@ async def obtener_edoc(**kwargs):
file_name_request = f"VU_ED_{pedimento_app}_{numero_documento}_REQUEST.xml"
document_response = await edocs_rest_controller.post_document(
document_response = await edocs_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=organizacion_efc,
pedimento=pedimento_id_efc,
file_name=file_name_request,
document_type=21 # Tipo de documento para request de e-document,
document_type=21,
identifier=numero_documento,
)
except Exception as e:
@@ -97,6 +98,21 @@ async def obtener_edoc(**kwargs):
if soap_error(response):
logger.error("Respuesta SOAP contiene error de VUCEM")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22: file={_file_name_error}, organizacion={_pedimento_efc.get('organizacion')}, pedimento={_pedimento_efc.get('id')}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_or_update_document retornó None para RESPONSE_ERROR doc_type=22 — archivo físico sin registro en BD")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
@@ -109,8 +125,23 @@ async def obtener_edoc(**kwargs):
try:
edoc_base64 = extract_pdf_bytes_from_xml_content(response.text)
except ValueError as ve:
except Exception as ve:
logger.error(f"Error extrayendo contenido del XML: {ve}")
_pedimento_efc = kwargs.get('pedimento', {})
_file_name_error = f"VU_ED_{_pedimento_efc.get('pedimento_app', 'N/A')}_{numero_documento}_RESPONSE_ERROR.xml"
logger.info(f"Guardando RESPONSE_ERROR doc_type=22 (parse error): file={_file_name_error}")
_doc_result = await edocs_rest_controller.post_or_update_document(
soap_response=response.text,
organizacion=_pedimento_efc.get('organizacion'),
pedimento=_pedimento_efc.get('id'),
file_name=_file_name_error,
document_type=22,
identifier=numero_documento,
)
if _doc_result is None:
logger.error("post_document retornó None para RESPONSE_ERROR doc_type=22 (parse error)")
else:
logger.info(f"RESPONSE_ERROR registrado en BD: id={_doc_result.get('id')}, document_type={_doc_result.get('document_type')}")
raise HTTPException(
status_code=500,
detail=create_error_response(
@@ -180,6 +211,8 @@ async def obtener_edoc(**kwargs):
document_type=5
)
print(f"rest_response >>>> {rest_response}")
if rest_response is None:
logger.error("Error al enviar el documento a la API interna")
raise HTTPException(
@@ -239,9 +272,10 @@ async def obtener_edoc(**kwargs):
async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
# estaba acualizando mal el status de descarga, actualizaba otro campo que no le correspondia
data = {
"id": edoc.get("id"),
"acuse_descargado": status,
"edocument_descargado": status,
"numero_edocument": edoc.get("numero_edocument"),
"pedimento": pedimento.get("id"),
"organizacion": pedimento.get("organizacion"),
@@ -252,42 +286,42 @@ async def change_edocument_status(edoc: dict, status: bool, pedimento: dict):
return response
NS = {
's': 'http://schemas.xmlsoap.org/soap/envelope/',
't': 'http://tempuri.org/',
'i': 'http://www.w3.org/2001/XMLSchema-instance'
}
# mejorar la extraccion de seccion File
def extract_pdf_bytes_from_xml_content(xml_content: str):
"""
Extrae el PDF y metadatos desde un string XML.
"""
root = ET.fromstring(xml_content)
file_elem = root.find('.//File')
if file_elem is None:
for elem in root.iter():
if elem.tag.endswith('File') and elem.text:
file_elem = elem
break
if file_elem is not None and file_elem.text and file_elem.text.strip():
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
pdf_bytes = base64.b64decode(base64_data)
cadena_original = None
sello_digital = None
cadena_elem = root.find('.//CadenaOriginal')
if cadena_elem is None:
for elem in root.iter():
if elem.tag.endswith('CadenaOriginal') and elem.text:
cadena_elem = elem
break
if cadena_elem is not None and cadena_elem.text:
cadena_original = cadena_elem.text.strip()
sello_elem = root.find('.//SelloDigital')
if sello_elem is None:
for elem in root.iter():
if elem.tag.endswith('SelloDigital') and elem.text:
sello_elem = elem
break
if sello_elem is not None and sello_elem.text:
sello_digital = sello_elem.text.strip()
return {
"pdf_bytes": pdf_bytes,
"cadena_original": cadena_original,
"sello_digital": sello_digital
}
else:
raise ValueError("No se encontró el tag <File> con contenido válido. Verifique que el XML contiene el tag <File> con datos base64.")
errores = root.find('.//t:Errores', NS)
tiene_error = root.find('.//t:TieneError', NS)
if tiene_error is not None and tiene_error.text == 'true':
err_msg = errores.text if errores is not None else 'Error desconocido'
raise Exception(f"VUCEM informa error: {err_msg}")
file_elem = root.find('.//t:File', NS)
if file_elem is None or file_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') == 'true' or not file_elem.text:
raise ValueError("No se encontró el tag <File> con contenido válido.")
base64_data = file_elem.text.strip().replace('\n', '').replace('\r', '')
pdf_bytes = base64.b64decode(base64_data)
# Extraer CadenaOriginal y SelloDigital con namespaces
cadena_original = None
sello_digital = None
cadena_elem = root.find('.//t:CadenaOriginal', NS)
if cadena_elem is not None and cadena_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
cadena_original = cadena_elem.text.strip() if cadena_elem.text else None
sello_elem = root.find('.//t:SelloDigital', NS)
if sello_elem is not None and sello_elem.get('{http://www.w3.org/2001/XMLSchema-instance}nil') != 'true':
sello_digital = sello_elem.text.strip() if sello_elem.text else None
return {
"pdf_bytes": pdf_bytes,
"cadena_original": cadena_original,
"sello_digital": sello_digital
}

View File

@@ -21,7 +21,8 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
organizacion_id = pedimento_info.get('organizacion')
pedimento_app = pedimento_info.get('pedimento_app', 'N/A')
edoc_info = edoc_data.get('edoc', {})
edoc_number = edoc_info.get('numero_edoc', 'N/A')
# el mapeo de numero de documento no estaba correcto, se corrigio
edoc_number = edoc_info.get('numero_edocument', 'N/A')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@@ -36,7 +37,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"Iniciando proceso de descarga de E-document {edoc_number} para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3 # 3 corresponde a "E-document"
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -52,7 +53,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"Descargando E-document {edoc_number} para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -67,7 +68,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=f"E-document {edoc_number} descargado exitosamente para pedimento {pedimento_app}",
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
@@ -86,7 +87,7 @@ def process_edoc_download_request(self, edoc_data: Dict) -> Dict:
message=error_message,
pedimento_id=pedimento_id,
organizacion_id=organizacion_id,
servicio=3
servicio=7 # cambiamos el servicio, el 7 le corresponde a los e documents
)
)
except Exception as update_error:

View File

@@ -240,7 +240,7 @@ class PedimentoXMLScraper: # Clase me extrae datos de Pedimento
# Extraer complemento1 (con namespace)
complemento1_elem = identificador.find('ns:complemento1', namespaces)
complemento1 = complemento1_elem.text if complemento1_elem is not None else None
# Agregar a la lista si tenemos los datos básicos
if clave and complemento1:
identificadores_ed.append({

View File

@@ -6,8 +6,9 @@ from .tasks import process_pedimento_completo_request
from .services import put_pedimento_data_vu
from api.api_v2.modules.tasks.services import register_task
import logging
logger = logging.getLogger("app.api")
from celery import group, chord, chain
logger = logging.getLogger("app.api")
router = APIRouter()
@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED)

View File

@@ -1,4 +1,4 @@
from typing import Optional, Union, Dict, Any
from typing import Optional, Union, Dict, Any, List
from uuid import UUID
from datetime import datetime
# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione

View File

@@ -64,12 +64,12 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
try:
file_name_request = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_REQUEST.xml"
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_xml,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name_request,
document_type=15,
document_type=15,
)
except Exception as e:
@@ -90,12 +90,12 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
if soap_error(soap_response):
file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}_ERROR.xml"
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name,
document_type=16,
document_type=16,
)
# Aquí necesitamos extraer el mensaje de error real
@@ -134,12 +134,12 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Enviar documento
try:
document_response = await remesa_rest_controller.post_document(
document_response = await remesa_rest_controller.post_or_update_document(
soap_response=soap_response,
organizacion=pedimento_data.get('organizacion'),
pedimento=pedimento_data.get('id'),
file_name=file_name,
document_type=5,
document_type=3,
)
except Exception as e:
@@ -149,12 +149,14 @@ async def obtener_remesa(**kwargs) -> Dict[str, Any]:
# Extraer datos del XML
try:
remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text)
logger.info(
f"Remesas extraídas para pedimento {pedimento_data.get('pedimento_app')}: "
f"{len(remesas_data)} COVEs encontrados → {remesas_data}"
)
except Exception as e:
logger.error(f"Error al extraer datos XML: {e}")
raise HTTPException(status_code=500, detail="Error al procesar respuesta XML")
logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}")
return create_service_response(
@@ -202,14 +204,16 @@ async def post_remesa_data(**kwargs) -> Dict[str, Any]:
# Obtener datos del servicio web
try:
ws_data = await obtener_remesa(**kwargs)
result["documento"] = ws_data.get("documento", None)
xml_content = ws_data.get('xml_content', {})
# obtener_remesa devuelve create_service_response → los datos están bajo 'data'
ws_data_content = ws_data.get('data', {})
result["documento"] = ws_data_content.get("documento", None)
xml_content = ws_data_content.get('xml_content', [])
result["xml_content"] = xml_content
if not xml_content:
logger.warning("No se obtuvo contenido XML del servicio web")
logger.warning("No se obtuvo contenido XML del servicio web (lista de remesas vacía)")
return result
except HTTPException:
raise # Re-lanzar HTTPExceptions
except Exception as e:
@@ -259,55 +263,61 @@ async def _process_coves_safely(kwargs: Dict[str, Any], xml_content) -> Optional
async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""
Envía COVEs al sistema REST.
Args:
pedimento_data: Datos del pedimento
coves: Lista de diccionarios con datos de COVE (comprobanteVE, remesaAgente, remesaSA)
Returns:
Lista de respuestas exitosas
Raises:
HTTPException: Si no se pudo procesar ningún COVE
Crea COVEs en el sistema REST usando patrón get-or-create para evitar duplicados.
Si el COVE ya existe (creado por pedimento completo u otra remesa), lo reutiliza.
"""
if not coves:
return []
responses = []
errors = []
for cove in coves:
# Extraer el número de COVE del diccionario
numero_cove = cove.get('comprobanteVE')
if not numero_cove:
logger.warning(f"COVE sin comprobanteVE encontrado: {cove}")
continue
document_data = {
'numero_cove': numero_cove,
'organizacion': pedimento_data.get('organizacion'),
'pedimento': pedimento_data.get('id')
}
try:
# Verificar si el COVE ya existe antes de intentar crearlo
existing = await remesa_rest_controller._make_request_async(
'GET', f'customs/coves/?numero_cove={numero_cove}'
)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list) and results:
logger.info(f"COVE {numero_cove} ya existe (id={results[0].get('id')}), omitiendo creación")
responses.append(results[0])
continue
# No existe → crear
document_data = {
'numero_cove': numero_cove,
'organizacion': pedimento_data.get('organizacion'),
'pedimento': pedimento_data.get('id'),
}
response = await remesa_rest_controller.post_cove(document_data)
if response:
responses.append(response)
logger.debug(f"COVE {numero_cove} procesado exitosamente")
logger.info(f"COVE {numero_cove} creado exitosamente")
else:
error_msg = f"POST de COVE {numero_cove} devolvió respuesta vacía"
logger.warning(error_msg)
errors.append(error_msg)
except Exception as e:
error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}"
logger.warning(error_msg)
errors.append(error_msg)
if not responses and coves:
error_detail = f"No se pudo procesar ningún COVE. Errores: {'; '.join(errors)}"
logger.error(error_detail)
raise HTTPException(status_code=500, detail=error_detail)
if errors:
logger.warning(f"Se procesaron {len(responses)}/{len(coves)} COVEs. Errores: {len(errors)}")
return responses

View File

@@ -19,9 +19,11 @@ def process_remesa_request(self, remesa_request: dict) -> dict:
"""
task_id = self.request.id
servicio = 5 # Código para Pedimento Remesas
print(f"remesa_request >>>> {remesa_request}")
logger.info(f"remesa_request >>>> {remesa_request}")
pedimento_id = remesa_request.get('pedimento', {}).get('id')
organizacion_id = remesa_request.get('pedimento', {}).get('organizacion')
remesa_num = remesa_request.get('remesa', 'N/A')
remesa_num = remesa_request.get('pedimento', {}).get('pedimento_app', 'N/A')
# Crear un NUEVO event loop para esta tarea (evita problemas de loop cerrado)
loop = asyncio.new_event_loop()

View File

@@ -1,6 +1,8 @@
from celery import Celery
from core.config import settings
import os
from celery.schedules import crontab
from datetime import timedelta
# Configuración de Celery
celery_app = Celery(
@@ -32,6 +34,15 @@ celery_app.conf.update(
task_save_success_on_complete=True # Guarda los resultados exitosos
)
# desde aqui se pueden programar tareas pero al estar desde microservicios no tengo acceso a la informacion de organizacion
# por este motivo es mejor programar las tareas desde la aplicacion de django
celery_app.conf.beat_schedule = {
# 'prueba-4': {
# 'task': 'tasks.prueba_hola',
# 'schedule': timedelta(seconds=60), # 12:00 = 6:03 AM
# },
}
# Autodiscovery of tasks
celery_app.autodiscover_tasks()

View File

@@ -91,6 +91,46 @@ class APIRESTController:
logger.error(traceback.format_exc())
return None
async def post_or_update_document(
self, soap_response=None, organizacion: str = None,
pedimento: str = None, file_name: str = None,
document_type: int = None, fuente: int = 2,
identifier: str = None,
) -> Dict[str, Any]:
"""
Guarda un documento VU (request o error). Si ya existe uno del mismo
tipo/pedimento/identificador, elimina el anterior y crea el nuevo,
evitando duplicados en ejecuciones recurrentes (tarea programada diaria).
identifier: cadena única del documento dentro del pedimento
(ej: número de COVE, número de e-document). Se usa como
filtro archivo__icontains para distinguir documentos del
mismo tipo pertenecientes a distintos COVEs o edocs.
"""
try:
query = f'record/documents/?pedimento={pedimento}&document_type={document_type}'
if identifier:
query += f'&archivo__icontains={identifier}'
existing = await self._make_request_async('GET', query)
if existing:
results = existing.get('results', existing) if isinstance(existing, dict) else existing
if isinstance(results, list) and results:
existing_id = results[0].get('id')
if existing_id:
await self._make_request_async('DELETE', f'record/documents/{existing_id}/')
logger.info(f"Documento VU previo eliminado: id={existing_id}, document_type={document_type}, identifier={identifier}")
except Exception as e:
logger.warning(f"No se pudo verificar/eliminar documento VU existente (document_type={document_type}, identifier={identifier}): {e}")
return await self.post_document(
soap_response=soap_response,
organizacion=organizacion,
pedimento=pedimento,
file_name=file_name,
document_type=document_type,
fuente=fuente,
)
async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data)
@@ -242,7 +282,10 @@ class APIController:
"""
Método para obtener la lista de servicios desde la API.
"""
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&estado=1&servicio={service_type}')
# eliminar filtro de estado, estado tiene 4 tipos 1: En espera, 2: Procesando, 3: Finalizado y 4: Error
# lo elimine para poder ejecutar la operacion siempre que sea necesario
return await self._make_request_async('GET', f'customs/procesamientopedimentos/?pedimento={pedimento}&servicio={service_type}')
async def get_pedimento(self, pedimento_id: str) -> Dict[str, Any]:
"""
@@ -454,32 +497,36 @@ class APIController:
async def put_edocument(self, edocument_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Método para actualizar un documento digitalizado en la API.
Args:
edocument_id: UUID del documento a actualizar
data: Diccionario con los datos a actualizar
"""
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
async def get_cer(self, id: str) -> bytes:
"""
Método para obtener un certificado específico desde la API (como binario).
Args:
id: UUID del certificado a consultar
Returns:
bytes: Contenido binario del certificado
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
async def put_cove(self, cove_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
return await self._make_request_async('PUT', f'customs/coves/{cove_id}/', data=data)
async def get_key(self, id: str) -> bytes:
"""
Método para obtener una llave específica desde la API (como binario).
Args:
id: UUID de la llave a consultar
Returns:
bytes: Contenido binario de la llave
"""
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True)
if result is None:
logger.info(f"get_key retornó None")
else:
logger.info(f"get_key retornó {len(result)} bytes")
return result
async def get_cer(self, id: str) -> bytes:
result = await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True)
if result is None:
logger.info(f"get_cer retornó None")
else:
logger.info(f"get_cer retornó {len(result)} bytes")
return result
async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False):
"""
@@ -497,11 +544,13 @@ class APIController:
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
logger.info(f"Haciendo petición {method} a {url}")
print(f"Haciendo petición {method} a {url}")
if method.upper() == 'GET':
response = await client.get(url, headers=self.headers)
elif method.upper() == 'POST':
response = await client.post(url, json=data, headers=self.headers)
print(f"response >>>> {response}")
elif method.upper() == 'PUT':
response = await client.put(url, json=data, headers=self.headers)
elif method.upper() == 'DELETE':

View File

@@ -1,6 +1,5 @@
from pydantic import BaseModel, Field, field_validator
from typing import Optional
from typing import Optional, List, Union
class ServiceBaseSchema(BaseModel):
"""Esquema base para servicios con campos comunes"""
@@ -43,4 +42,41 @@ class ServiceRemesaSchema(BaseModel):
def validate_string_fields(cls, v):
if not v or not v.strip():
raise ValueError('Los campos de texto no pueden estar vacíos')
return v.strip()
return v.strip()
class MultiPedimentoCompletoSchema(BaseModel):
"""Esquema para procesar múltiples pedimentos de forma asíncrona"""
organizacion: str = Field(..., description="ID de la organización")
pedimentos: List[str] = Field(
...,
description="Lista de IDs de pedimentos a procesar",
min_length=1,
max_length=200
)
@field_validator('organizacion')
def validate_organizacion(cls, v):
if not v or not v.strip():
raise ValueError('La organización no puede estar vacía')
return v.strip()
@field_validator('pedimentos')
def validate_pedimentos(cls, v):
if not v:
raise ValueError('Debe proporcionar al menos un pedimento')
if len(v) > 200:
raise ValueError(f'Máximo 200 pedimentos por solicitud. Recibidos: {len(v)}')
# Eliminar duplicados y vacíos
pedimentos_limpios = [p.strip() for p in v if p and p.strip()]
if not pedimentos_limpios:
raise ValueError('La lista de pedimentos no puede estar vacía')
# Eliminar duplicados
return list(set(pedimentos_limpios))
class Config:
json_schema_extra = {
"example": {
"organizacion": "1",
"pedimentos": ["123", "456", "789"]
}
}

260
tasks.py
View File

@@ -2,7 +2,7 @@ from celery import Celery
from celery_app import celery_app
import asyncio
import logging
from typing import Dict, Any
from typing import Dict, Any, List
from contextlib import asynccontextmanager
from controllers.RESTController import rest_controller
from controllers.SOAPController import soap_controller
@@ -125,9 +125,12 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
# Subir documento de pedimento completo si la petición fue exitosa
try:
# solucion al error de descarga de un e-document, el mapeo de identificador no llegaba y ni siquiera insertaba registros en
# la tabla
identificadores_ed_response = xml_content.get('identificadores_ed', []) if 'xml_content' in locals() else []
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
identificadores_ed=identificadores_ed_response
)
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
except Exception as upload_err:
@@ -158,6 +161,173 @@ def pedimento_completo_task(self, request_data: Dict[str, Any]):
return run_async_task(_execute_pedimento_completo)
@celery_app.task(bind=True, name='tasks.multi_pedimento_completo_task')
def multi_pedimento_completo_task(self, pedimentos: List[str], organizacion: str):
"""
Tarea asíncrona para procesar MÚLTIPLES pedimentos completos.
Args:
pedimentos: Lista de IDs de pedimentos a procesar
organizacion: ID de la organización
"""
import time
from datetime import datetime
start_time = time.time()
results = {
"total": len(pedimentos),
"successful": [],
"failed": [],
"started_at": datetime.utcnow().isoformat()
}
total = len(pedimentos)
for idx, pedimento_id in enumerate(pedimentos, 1):
try:
# Actualizar progreso (igual que en tus otras tareas)
self.update_state(
state='PROGRESS',
meta={
'status': f'Procesando pedimento {idx}/{total}',
'current': idx,
'total': total,
'current_pedimento': pedimento_id,
'percentage': round((idx / total) * 100, 2)
}
)
logger.info(f"[MULTI] Procesando pedimento {idx}/{total}: {pedimento_id}")
# Preparar datos exactamente como lo espera la tarea individual
request_data = {
"pedimento": pedimento_id,
"organizacion": organizacion
}
# Reutilizar la lógica de la tarea individual
# Esto ejecuta el mismo código que tu endpoint individual
async def _execute():
return await _execute_pedimento_completo_logic(request_data)
result = run_async_task(_execute)
results["successful"].append({
"pedimento_id": pedimento_id,
"result": result
})
logger.info(f"[MULTI] Pedimento {pedimento_id} procesado exitosamente")
except Exception as e:
logger.error(f"[MULTI] Error procesando pedimento {pedimento_id}: {e}")
results["failed"].append({
"pedimento_id": pedimento_id,
"error": str(e)
})
elapsed_time = time.time() - start_time
results["completed_at"] = datetime.utcnow().isoformat()
results["elapsed_seconds"] = round(elapsed_time, 2)
results["success_count"] = len(results["successful"])
results["failed_count"] = len(results["failed"])
return results
async def _execute_pedimento_completo_logic(request_data: dict) -> dict:
"""
Lógica compartida para procesar un pedimento completo.
Esta es la misma lógica que usa tu endpoint individual.
"""
operation_name = "pedimento_completo"
service_data = None
try:
logger.info(f"Procesando pedimento completo - Pedimento: {request_data['pedimento']}")
# Validar datos de entrada
await _validate_request_data(request_data)
# Obtener servicio
service_data = await _get_pedimento_service(
pedimento_id=request_data['pedimento'],
service_type=3,
operation_name=operation_name
)
# Actualizar estado a "En proceso"
update_success = await _update_service_status(
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
)
if not update_success:
raise Exception("Error al actualizar estado del servicio")
# Obtener credenciales VUCEM
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
if not contribuyente_id:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
raise Exception("ID de contribuyente no encontrado")
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
# Procesar petición SOAP
soap_response = await get_soap_pedimento_completo(
credenciales=credentials,
response_service=service_data,
soap_controller=soap_controller
)
if not soap_response:
raise Exception("Error en la petición SOAP")
# Actualizar datos del pedimento
xml_content = soap_response.get('xml_content', {})
if xml_content:
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
update_content['existe_expediente'] = True
await rest_controller.put_pedimento(
service_data['pedimento']['id'],
update_content
)
# Procesar COVEs
coves = xml_content.get('coves', [])
if coves:
await _post_coves(
response_service=service_data,
coves=coves
)
# Procesar documentos digitalizados
identificadores_ed = xml_content.get('identificadores_ed', [])
if identificadores_ed:
await _post_edocuments(
response_service=service_data,
identificadores_ed=identificadores_ed
)
# Finalizar servicio
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
return {
"success": True,
"pedimento_id": request_data['pedimento'],
"message": "Pedimento procesado exitosamente"
}
except Exception as e:
logger.error(f"Error en pedimento {request_data['pedimento']}: {e}")
if service_data:
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
return {
"success": False,
"pedimento_id": request_data['pedimento'],
"error": str(e)
}
@celery_app.task(bind=True)
def partidas_task(self, **kwargs):
"""Tarea asíncrona para obtener partidas"""
@@ -459,6 +629,19 @@ def acuse_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_edocument(
edocument_id=edoc['id'],
data={
"id": edoc['id'],
"acuse_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando acuse_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
@@ -665,28 +848,19 @@ def edocument_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"E-document {idx + 1} procesado exitosamente")
# Subir el documento si la petición fue exitosa
try:
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[edoc.get('numero_edocument')]
await rest_controller.put_edocument(
edocument_id=edoc['id'],
data={
"id": edoc['id'],
"edocument_descargado": True,
"numero_edocument": edoc.get('numero_edocument'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
# Subir el documento si la petición fue exitosa
try:
upload_result = await _post_edocuments(
response_service=service_data,
identificadores_ed=[edoc.get('numero_edocument')]
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
except Exception as status_err:
logger.warning(f"Error actualizando edocument_descargado para edoc {edoc.get('numero_edocument')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
@@ -807,6 +981,19 @@ def coves_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"cove del documento {idx + 1} procesado exitosamente")
try:
await rest_controller.put_cove(
cove_id=cove['id'],
data={
"id": cove['id'],
"cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
except Exception as status_err:
logger.warning(f"Error actualizando cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
@@ -948,17 +1135,19 @@ def acuse_cove_task(self, **kwargs):
documento_info["documento"] = soap_response.get('documento', {})
documentos_exitosos += 1
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
# Subir el documento de COVE si la petición fue exitosa
try:
upload_result = await _post_coves(
response_service=service_data,
identificadores_cove=[cove.get('numero_cove')]
await rest_controller.put_cove(
cove_id=cove['id'],
data={
"id": cove['id'],
"acuse_cove_descargado": True,
"numero_cove": cove.get('numero_cove'),
"pedimento": service_data['pedimento']['id'],
"organizacion": service_data['organizacion'],
}
)
documento_info["upload_result"] = upload_result
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente")
except Exception as upload_err:
documento_info["upload_error"] = str(upload_err)
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
except Exception as status_err:
logger.warning(f"Error actualizando acuse_cove_descargado para cove {cove.get('numero_cove')}: {status_err}")
else:
documento_info["error"] = "Error en petición SOAP"
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
@@ -999,3 +1188,12 @@ def acuse_cove_task(self, **kwargs):
raise e
return run_async_task(_execute_acuse_cove)
# @celery_app.task(name='tasks.prueba_hola')
# def prueba_hola():
# """
# Tarea de prueba: solo imprime un saludo para verificar la programación.
# """
# logger.info("¡Hola! Tarea programada ejecutada correctamente.")
# print("¡Hola desde la tarea programada!")
# return "OK"

85
utils/minio_client.py Normal file
View File

@@ -0,0 +1,85 @@
# microservice/utils/minio_client.py
import os
from minio import Minio
from minio.error import S3Error
from datetime import timedelta
from typing import Optional, BinaryIO
import logging
logger = logging.getLogger(__name__)
class MinIOClient:
"""Cliente MinIO para FastAPI"""
def __init__(self):
self.client = None
self.bucket_name = None
self._initialize()
def _initialize(self):
"""Inicializa el cliente MinIO"""
endpoint = os.environ.get('MINIO_ENDPOINT', 'minio:9000')
access_key = os.environ.get('MINIO_ACCESS_KEY')
secret_key = os.environ.get('MINIO_SECRET_KEY')
secure = os.environ.get('MINIO_SECURE', 'false').lower() == 'true'
self.bucket_name = os.environ.get('MINIO_BUCKET_NAME', 'efc-microservice-dev')
self.client = Minio(
endpoint=endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
# Asegurar bucket
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
logger.info(f"Bucket '{self.bucket_name}' creado")
def upload_file(
self,
object_name: str,
file_path: str = None,
file_data: bytes = None,
content_type: str = None
) -> bool:
"""Sube archivo a MinIO (síncrono para Celery)"""
try:
if file_path:
self.client.fput_object(
self.bucket_name,
object_name,
file_path,
content_type=content_type
)
elif file_data:
import io
data_stream = io.BytesIO(file_data)
self.client.put_object(
self.bucket_name,
object_name,
data_stream,
len(file_data),
content_type=content_type
)
return True
except Exception as e:
logger.error(f"Error subiendo archivo: {e}")
return False
def get_presigned_url(self, object_name: str, expires: int = 3600) -> Optional[str]:
"""Genera URL firmada"""
try:
return self.client.presigned_get_object(
self.bucket_name,
object_name,
expires=timedelta(seconds=expires)
)
except Exception as e:
logger.error(f"Error generando URL: {e}")
return None
# Singleton
minio_client = MinIOClient()

View File

@@ -603,7 +603,7 @@ async def get_soap_acuse(credenciales, response_service, soap_controller, edocum
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_AC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -710,7 +710,7 @@ async def get_soap_acuseCOVE(credenciales, response_service, soap_controller, co
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_AC_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")
@@ -884,7 +884,7 @@ async def get_soap_edocument(credenciales, response_service, soap_controller, ed
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{idx}.pdf"
_file_name = f"vu_EDC_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{edocument['numero_edocument']}.pdf"
# Enviar el documento PDF usando binary_content
logger.info(f"Enviando documento PDF: {_file_name} ({len(pdf_bytes)} bytes)")