diff --git a/.env.dev b/.env.dev deleted file mode 100644 index 27821e3..0000000 --- a/.env.dev +++ /dev/null @@ -1,25 +0,0 @@ -# Configuración de la aplicación -APP_NAME=EFC Microservice -APP_VERSION=1.0.0 -DEBUG=false - -# Configuración del servidor -HOST=0.0.0.0 -PORT=8001 -API_URL=http://host.docker.internal:8000/api/v1 -API_TOKEN=1b5b5a41228cbac6d9c373d739f9c36a918e4dd8 - -# Configuración de API externa -SOAP_SERVICE_URL=https://www.ventanillaunica.gob.mx -EXTERNAL_API_TIMEOUT=5 -MAX_RETRIES=5 -TIMEOUT=5 -WAIT_TIME=0 -VERIFY_SSL=True - -CELERY_BROKER_URL=redis://redis_microservice:6379/0 -CELERY_RESULT_BACKEND=redis://redis_microservice:6379/0 -# Configuración de seguridad -SECRET_KEY=your-super-secret-key-here -ALGORITHM=HS256 -ACCESS_TOKEN_EXPIRE_MINUTES=30 diff --git a/.gitignore b/.gitignore index 41074bc..0e69221 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,6 @@ __pycache__/ # C extensions *.so -api/api_v2 main2.py sample.xml diff --git a/api/api_v2/api.py b/api/api_v2/api.py index 30e3c5f..0d9f0c6 100644 --- a/api/api_v2/api.py +++ b/api/api_v2/api.py @@ -2,17 +2,21 @@ from fastapi import APIRouter # En Python, no se pueden usar llaves {} para importar múltiples módulos. # Debes usar paréntesis () para hacer importaciones multilínea. -# from api.api_v2.modules.acuses import router as acuses_router -from api.api_v2.modules.coves.router import router as coves_router -# from api.api_v2.modules.edocs import router as edocs_router -# from api.api_v2.modules.partidas import router as partidas_router -# from api.api_v2.modules.pedimentos import router as pedimentos_router +from api.api_v2.modules.acuses.routers import router as acuses_router +from api.api_v2.modules.coves.routers import router as coves_router +from api.api_v2.modules.tasks.routers import router as tasks_router +from api.api_v2.modules.edocs.routers import router as edocs_router +from api.api_v2.modules.partidas.routers import router as partidas_router +from api.api_v2.modules.pedimentos.routers import router as pedimentos_router +from api.api_v2.modules.remesas.routers import router as remesas_router api_router = APIRouter() # Incluir routers de endpoints -# api_router.include_router(acuses_router, tags=["acuses"]) -api_router.include_router(coves_router, tags=["coves"]) -# api_router.include_router(edocs_router, tags=["edocs"]) -# api_router.include_router(partidas_router, tags=["partidas"]) -# api_router.include_router(pedimentos_router, tags=["pedimentos"]) +api_router.include_router(acuses_router, tags=["Acuses"]) +api_router.include_router(coves_router, tags=["Coves"]) +api_router.include_router(tasks_router, tags=["Tasks"]) +api_router.include_router(edocs_router, tags=["EDocuments"]) +api_router.include_router(partidas_router, tags=["Partidas"]) +api_router.include_router(pedimentos_router, tags=["Pedimentos"]) +api_router.include_router(remesas_router, tags=["Remesas"]) diff --git a/api/api_v2/modules/edocs/router.py b/api/api_v2/modules/acuses/__init__.py similarity index 100% rename from api/api_v2/modules/edocs/router.py rename to api/api_v2/modules/acuses/__init__.py diff --git a/api/api_v2/modules/acuses/controllers.py b/api/api_v2/modules/acuses/controllers.py new file mode 100644 index 0000000..ff99d49 --- /dev/null +++ b/api/api_v2/modules/acuses/controllers.py @@ -0,0 +1,61 @@ +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController +from typing import List, Dict, Any + + +class AcuseVUController(VUCEMController): + def __init__(self): + super().__init__() + + def generate_acuse_template(self, **kwargs) -> str: + credencial = kwargs.get("credencial", {}) + username = credencial.get("user") + password = credencial.get("password") + idEDocument = kwargs['edoc'].get("numero_edocument", "N/A") + soap_template = f''' + + + + + {username} + {password} + + + + + + {idEDocument} + + + + ''' + return soap_template + +class EDocumentController(APIRESTController): + def __init__(self): + super().__init__() + + async def get_edocs(self, pedimento: str) -> List[Dict[str, Any]]: + """ + Método para obtener los documentos digitalizados de un pedimento. + + Args: + pedimento: UUID del pedimento a consultar + """ + return await self._make_request_async('GET', f'customs/edocuments/?pedimento={pedimento}') + + async def put_edocument(self, edocument_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para actualizar un documento digitalizado en la API. + + Args: + edocument_id: UUID del documento a actualizar + data: Diccionario con los datos a actualizar + """ + return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data) + + +acuse_rest_controller = EDocumentController() +acuse_vu_controller = AcuseVUController() + + diff --git a/api/api_v2/modules/acuses/routers.py b/api/api_v2/modules/acuses/routers.py index ce5096c..95d7612 100644 --- a/api/api_v2/modules/acuses/routers.py +++ b/api/api_v2/modules/acuses/routers.py @@ -1,42 +1,43 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Depends from fastapi.responses import JSONResponse from typing import Dict, Any, List, Optional -import asyncio -import logging -import traceback + +from api.api_v2.modules.authentication.services import get_current_user from .schemas import AcuseSchema, AcuseMasivoSchema -from .services import * +from .tasks import process_acuse_request +router = APIRouter() -router = APIRouter(prefix="/acuses", tags=["Acuses"]) - -@router.post("/service/acuse/individual", response_model=Dict[str, Any]) +@router.post("/services/acuse/", response_model=Dict[str, Any]) async def obtener_acuse(acuse_request: AcuseSchema): """ Endpoint para obtener el acuse de recibo de un documento específico. """ + + acuse_dict = acuse_request.model_dump() + # Ejecuta la tarea de Celery de forma asíncrona + task = process_acuse_request.delay(acuse_dict) + # Puedes devolver el ID de la tarea para consultar el estado después + return {"task_id": task.id, "status": "submitted"} - pass -@router.post("/service/acuse", response_model=Dict[str, Any]) +@router.post("/services/all/acuse/pedimento/", response_model=Dict[str, Any]) async def obtener_acuses(acuse_request: AcuseMasivoSchema): """ Endpoint para obtener acuses de recibo de documentos asociados a un pedimento. """ - pass + # Para cada edoc en la lista, dispara una tarea Celery + task_ids = [] + acuse_request_dict = acuse_request.model_dump() + for edoc in acuse_request_dict.get('edocs', []): + acuse_dict = { + "edoc": edoc, + "pedimento": acuse_request_dict.get('pedimento'), + "credencial": acuse_request_dict.get('credencial') + } + task = process_acuse_request.delay(acuse_dict) + task_ids.append(task.id) -@router.post("/service/acuse_cove", response_model=Dict[str, Any]) -async def obtener_acuses_cove(acuse_request: AcuseMasivoSchema): - """ - Endpoint para obtener acuses de recibo de COVEs asociados a un pedimento. - """ - pass - -@router.post("/service/acuse_cove/individual", response_model=Dict[str, Any]) -async def obtener_acuse_cove(acuse_request: AcuseSchema): - """ - Endpoint para obtener el acuse de recibo de un COVE específico. - """ - pass \ No newline at end of file + return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} diff --git a/api/api_v2/modules/acuses/schemas.py b/api/api_v2/modules/acuses/schemas.py index 7708b1c..73a40c7 100644 --- a/api/api_v2/modules/acuses/schemas.py +++ b/api/api_v2/modules/acuses/schemas.py @@ -1,22 +1,20 @@ -from fastapi import FastAPI -from pydantic import BaseModel -from uuid import UUID +from pydantic import BaseModel, Field, field_validator +from schemas.CredencialSchema import CredencialBaseSchema +from api.api_v2.modules.pedimentos.schemas import PedimentoBaseSchema # Aplica para Acuse, Acuse Cove y Edocuments +class AcuseBaseSchema(BaseModel): + id: int = Field(..., description="ID único del eDocument") + numero_edocument: str =Field(..., description="Número del eDocument") + class AcuseSchema(BaseModel): - pedimento: str - organizacion: str - numero_documento: str - vu_user: str - password: str + edoc: AcuseBaseSchema + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema class AcuseMasivoSchema(BaseModel): - pedimento: str - organizacion: str - numeros_documentos: list[str] - vu_user: str - password: str - - - + edocs: list[AcuseBaseSchema] + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + diff --git a/api/api_v2/modules/acuses/services.py b/api/api_v2/modules/acuses/services.py index e69de29..f0d71bf 100644 --- a/api/api_v2/modules/acuses/services.py +++ b/api/api_v2/modules/acuses/services.py @@ -0,0 +1,188 @@ +from http.client import HTTPException +import base64 +import re +from .controllers import acuse_vu_controller, acuse_rest_controller +from utils.helpers import soap_error + +import xml.etree.ElementTree as ET + +soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8', + 'SOAPAction': 'http://www.ventanillaunica.gob.mx/ventanilla/ConsultaAcusesService/consultarAcuseEdocument',# AcuseCove + 'Accept-Encoding': 'gzip,deflate', +} + + +async def obtener_acuse(**kwargs): + soap_xml = acuse_vu_controller.generate_acuse_template(**kwargs) + + response = await acuse_vu_controller.make_request_async( + "ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl", + data=soap_xml, + headers=soap_headers + ) + + if response is None: + raise Exception("No se obtuvo respuesta del servicio SOAP.") + + if response.status_code != 200: + raise Exception(f"Error en la solicitud SOAP: {response.status}") + + if (response) and (not soap_error(response)): + acuse_base64 = _extract_acuse_data(response.text) + + if acuse_base64 is None: + raise Exception("No se pudo extraer el acuse del documento de la respuesta SOAP.") + + + pdf_bytes = _decode_acuse_base64_content(acuse_base64) + + if not pdf_bytes: + raise HTTPException(status_code=500, detail="No se pudo decodificar el documento del acuse") + + # Validar que el PDF sea válido + if not pdf_bytes.startswith(b'%PDF'): + import logging + logger = logging.getLogger("app.api") + logger.warning("El contenido decodificado no parece ser un PDF válido") + + + # Mejorar el nombre del archivo usando todos los datos relevantes + pedimento = kwargs.get('pedimento', {}) + pedimento_num = pedimento.get('pedimento','') + _file_name = _get_file_name(**kwargs) + + # Validar que organización y pedimento no sean None + organizacion = pedimento.get("organizacion", None) + pedimento_id = pedimento.get("id", None) + + rest_response = await acuse_rest_controller.post_document( + binary_content=pdf_bytes, + organizacion=organizacion, + pedimento=pedimento_id, + file_name=_file_name, + document_type=4 + ) + + if rest_response is None: + raise Exception("No se pudo enviar el acuse a la API interna.") + if rest_response.get("id") is None: + raise Exception("La respuesta de la API interna no contiene un ID válido.") + + acuse_update_response = await change_edocument_status( + edoc=kwargs.get('edoc'), + status=True, + pedimento=pedimento + ) + return { + "document_response": rest_response, + "file_name": _file_name, + "pedimento": pedimento_num, + "acuse_update_response": acuse_update_response + } + +async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): + data = { + "id": edoc.get("id"), + "edocument_descargado": status, + "numero_edocument": edoc.get("numero_edocument"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + + response = await acuse_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + + return response + +def _decode_acuse_base64_content(base64_content): # Testeado + """ + Decodifica el contenido Base64 del acuse y limpia caracteres especiales. + + Args: + base64_content (str): Contenido codificado en Base64 + + Returns: + bytes: Contenido decodificado o None si hay error + """ + try: + # Limpiar el contenido Base64 de manera exhaustiva + cleaned_content = base64_content + + # Remover entidades HTML/XML como , , etc. + cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content) + cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content) + + # Remover espacios en blanco, saltos de línea, etc. + cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content) + + # Remover caracteres no válidos para Base64 + cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content) + + + # Agregar padding si es necesario + missing_padding = len(cleaned_content) % 4 + if missing_padding: + cleaned_content += '=' * (4 - missing_padding) + + # Decodificar Base64 + decoded_content = base64.b64decode(cleaned_content) + + return decoded_content + + except Exception as e: + + # Intentar con validación estricta deshabilitada + try: + decoded_content = base64.b64decode(cleaned_content, validate=False) + return decoded_content + except Exception as e2: + return None + +def _extract_acuse_data(soap_response_text: str) -> dict: + try: + # Primero, extraer la parte XML del contenido multipart + xml_start = soap_response_text.find(' dict: + pedimento = kwargs.get('pedimento', {}) + pedimento_app = pedimento.get('pedimento_app', 'N/A') + idEdocument = kwargs['edoc'].get('numero_edocument', 'N/A') + _file_name = f"vu_AC_{pedimento_app}_{idEdocument}.pdf" + return _file_name + diff --git a/api/api_v2/modules/acuses/tasks.py b/api/api_v2/modules/acuses/tasks.py index e69de29..ae47aca 100644 --- a/api/api_v2/modules/acuses/tasks.py +++ b/api/api_v2/modules/acuses/tasks.py @@ -0,0 +1,27 @@ +from celery import Celery +from celery_app import celery_app +import asyncio +import logging +from typing import Dict, Any +from contextlib import asynccontextmanager + +from .services import obtener_acuse +from api.api_v2.modules.tasks.tasks import run_async_task + + +@celery_app.task +def process_acuse_request(acuse_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Tarea de Celery para procesar la solicitud de acuse. + + Args: + acuse_request: Diccionario con los datos de la solicitud de acuse. + + Returns: + Diccionario con la respuesta del acuse. + """ + loop = asyncio.get_event_loop() + acuse_response = loop.run_until_complete(obtener_acuse(**acuse_request)) + + return {"status": "processed", "data": acuse_response} + diff --git a/api/api_v2/modules/edocs/schema.py b/api/api_v2/modules/authentication/__init__.py similarity index 100% rename from api/api_v2/modules/edocs/schema.py rename to api/api_v2/modules/authentication/__init__.py diff --git a/api/api_v2/modules/authentication/services.py b/api/api_v2/modules/authentication/services.py new file mode 100644 index 0000000..197505a --- /dev/null +++ b/api/api_v2/modules/authentication/services.py @@ -0,0 +1,19 @@ +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from jose import JWTError, jwt +from core.config import settings +# Configuración básica, reemplaza con tu clave secreta real y algoritmo +SECRET_KEY = settings.SECRET_KEY +ALGORITHM = settings.ALGORITHM + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def get_current_user(token: str = Depends(oauth2_scheme)): + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + user_id = payload.get("user_id") + if user_id is None: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Usuario no encontrado") + return user_id + except JWTError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token inválido") diff --git a/api/api_v2/modules/edocs/service.py b/api/api_v2/modules/coves/__init__.py similarity index 100% rename from api/api_v2/modules/edocs/service.py rename to api/api_v2/modules/coves/__init__.py diff --git a/api/api_v2/modules/coves/controllers.py b/api/api_v2/modules/coves/controllers.py new file mode 100644 index 0000000..560c1bf --- /dev/null +++ b/api/api_v2/modules/coves/controllers.py @@ -0,0 +1,103 @@ +from typing import Any, Dict +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController + +class CovesController(APIRESTController): + def __init__(self): + super().__init__() + + async def get_cer(self, id: str) -> bytes: + """ + Método para obtener un certificado específico desde la API (como binario). + Args: + id: UUID del certificado a consultar + Returns: + bytes: Contenido binario del certificado + """ + return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/', return_bytes=True) + + async def get_key(self, id: str) -> bytes: + """ + Método para obtener una llave específica desde la API (como binario). + Args: + id: UUID de la llave a consultar + Returns: + bytes: Contenido binario de la llave + """ + return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/', return_bytes=True) + + async def put_cove_data(self, cove_id, data) -> Dict[str, Any]: + return await self._make_request_async('PUT', f'customs/coves/{cove_id}/', data=data) + + +class CovesVUController(VUCEMController): + def __init__(self): + super().__init__() # Implementación específica para Coves VU + + def generate_cove_template(self, username: str, password: str, certificado: str, firma: str, cove: str) -> str: + """ + Genera el template SOAP para consultar un COVE específico + + Args: + username: Usuario de VUCEM + password: Contraseña de VUCEM + certificado: certificado base 64 + firma: firma a base de cadena original base 64 + cove: COVE + + Returns: + str: Template SOAP XML completo + """ + soap_template = f''' + + + + + {username} + {password} + + + + + + + + {certificado} + |{username}|{cove}| + {firma} + + + {cove} + + + + + + ''' + return soap_template + + def generate_acuse_template(self, username: str, password: str, cove: str) -> str: + soap_template = f''' + + + + + {username} + {password} + + + + + + {cove} + + + + ''' + return soap_template + +coves_rest_controller = CovesController() +coves_vu_controller = CovesVUController() + diff --git a/api/api_v2/modules/coves/router.py b/api/api_v2/modules/coves/router.py deleted file mode 100644 index b43cb76..0000000 --- a/api/api_v2/modules/coves/router.py +++ /dev/null @@ -1,12 +0,0 @@ -from fastapi import APIRouter, HTTPException -from .schema import CoveBaseSchema -from typing import List -from uuid import UUID - -router = APIRouter() -# Aquí puedes definir tus endpoints relacionados con COVES usando el esquema CoveBaseSchema - -@router.post("/cove/", response_model=CoveBaseSchema) -async def create_cove(cove: CoveBaseSchema): - # Lógica para crear un COVE - return cove \ No newline at end of file diff --git a/api/api_v2/modules/coves/routers.py b/api/api_v2/modules/coves/routers.py new file mode 100644 index 0000000..b09b2c6 --- /dev/null +++ b/api/api_v2/modules/coves/routers.py @@ -0,0 +1,58 @@ +from fastapi import APIRouter, HTTPException +from .schemas import CoveListSchema, CoveRequestSchema +from typing import List +from uuid import UUID + +from .tasks import process_cove_request, process_acuse_cove_request + +router = APIRouter() +# Aquí puedes definir tus endpoints relacionados con COVES usando el esquema CoveBaseSchema + + + +@router.post("/services/cove/", response_model=dict) +async def get_cove(cove: CoveRequestSchema): + # Lógica para obtener un COVE + task = process_cove_request.delay(cove.model_dump()) + return {"task_id": task.id, "status": "submitted"} + + +@router.post("/services/all/coves", response_model=dict) +async def get_coves(coves_request: CoveListSchema): + # Lógica para obtener un COVE + task_ids = [] + coves_dict = coves_request.model_dump() + for cove in coves_dict.get('coves', []): + cove_dict = { + "cove": cove, + "pedimento": coves_dict.get('pedimento'), + "credencial": coves_dict.get('credencial') + } + task = process_cove_request.delay(cove_dict) + task_ids.append(task.id) + + + return {"task_id": task.id, "coves_tasks_ids": task_ids, "status": "submitted"} + + +@router.post("/services/acuse/cove/", response_model=dict) +async def get_acuse_cove(cove: CoveRequestSchema): + # Lógica para obtener un COVE + task = process_acuse_cove_request.delay(cove.model_dump()) + return {"task_id": task.id, "status": "submitted"} + +@router.post("/services/all/acuse/cove/") +async def get_acuses_cove(coves_request: CoveListSchema): + # Lógica para obtener un COVE + task_ids = [] + coves_dict = coves_request.model_dump() + for cove in coves_dict.get('coves', []): + acuse_dict = { + "cove": cove, + "pedimento": coves_dict.get('pedimento'), + "credencial": coves_dict.get('credencial') + } + task = process_acuse_cove_request.delay(acuse_dict) + task_ids.append(task.id) + + return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/coves/schema.py b/api/api_v2/modules/coves/schema.py deleted file mode 100644 index 8637266..0000000 --- a/api/api_v2/modules/coves/schema.py +++ /dev/null @@ -1,12 +0,0 @@ -from pydantic import BaseModel, Field, field_validator -from typing import Optional -from uuid import UUID - - -from api.api_v2.modules.pedimentos.schema import PedimentoBaseSchema -from schemas.CredencialSchema import CredencialBaseSchema - -class CoveBaseSchema(BaseModel): - cove: str = Field(..., description="ID del COVE asociado") - pedimento: PedimentoBaseSchema - credenciales: CredencialBaseSchema diff --git a/api/api_v2/modules/coves/schemas.py b/api/api_v2/modules/coves/schemas.py new file mode 100644 index 0000000..63cbecb --- /dev/null +++ b/api/api_v2/modules/coves/schemas.py @@ -0,0 +1,21 @@ +from pydantic import BaseModel, Field, field_validator +from typing import Optional +from uuid import UUID + + +from api.api_v2.modules.pedimentos.schemas import PedimentoBaseSchema +from schemas.CredencialSchema import CredencialBaseSchema + +class CoveBaseSchema(BaseModel): + id: int = Field(..., description="ID único del COVE") + cove: str = Field(..., description="Numero del COVE") + +class CoveRequestSchema(BaseModel): + cove: CoveBaseSchema + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + +class CoveListSchema(BaseModel): + coves: list[CoveBaseSchema] = Field(..., description="Lista de COVEs") + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema diff --git a/api/api_v2/modules/coves/service.py b/api/api_v2/modules/coves/service.py deleted file mode 100644 index 772ed00..0000000 --- a/api/api_v2/modules/coves/service.py +++ /dev/null @@ -1,105 +0,0 @@ -import base64 -from http.client import HTTPException -import os -from controllers.RESTController import rest_controller -from controllers.SOAPController import soap_controller -from cryptography.hazmat.primitives import serialization, hashes -from cryptography.hazmat.primitives.asymmetric import padding -from cryptography.hazmat.primitives.serialization import load_der_private_key -import tempfile - - -def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str: - with open(key_path, 'rb') as key_file: - private_key = load_der_private_key( - key_file.read(), - password=password.encode() if password else None - ) - - signature = private_key.sign( - cadena_original.encode(), - padding.PKCS1v15(), - hashes.SHA256() - ) - - return base64.b64encode(signature).decode() - -async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict, **kwargs): - cer = await rest_controller.get_cer(credenciales['id']) - if cer is None: - raise HTTPException(status_code=500, detail="No se pudo obtener el certificado para firmar el COVE") - certificado = base64.b64encode(cer).decode('utf-8') - - # Obtener la key como binario y guardarla en un archivo temporal - import tempfile - key_bytes = await rest_controller.get_key(credenciales['id']) - if key_bytes is None: - raise HTTPException(status_code=500, detail="No se pudo obtener la llave privada para firmar el COVE") - with tempfile.NamedTemporaryFile(delete=False) as tmp_key_file: - tmp_key_file.write(key_bytes) - tmp_key_path = tmp_key_file.name - - # Usar la ruta temporal para firmar - firma = sign_chain_original(tmp_key_path, credenciales['efirma'], cadena_original) - return firma, certificado, tmp_key_path - -async def consume_ws_get_cove(**kwargs): - - # valdiar kwargs - - # Cadena original que vas a firmar - - try: - cadena_original = f"|{username}|{cove['numero_cove']}|" - firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales, **kwargs) - os.remove(tmp_key_path) # Eliminar el archivo temporal después de usarlo - - soap_xml = soap_controller.generate_cove_template( - username=username, - password=credenciales['password'], - certificado=certificado, - firma=firma, - cove=cove, - ) - - soap_headers = { - 'Content-Type': 'text/xml; charset=utf-8', - 'SOAPAction': '', - #'Accept-Encoding': 'gzip,deflate', - } - - soap_response = await soap_controller.make_request_async( - "ventanilla/ConsultarEdocumentService?wsdl", - data=soap_xml, - headers=soap_headers - ) - - if (soap_response) and (not soap_error(soap_response)): - remesas = 1 if response_service['pedimento'].get('remesas', 0) else 0 - patente = response_service['pedimento'].get('patente', 'N/A') - aduana = response_service['pedimento'].get('aduana', 'N/A') - no_partidas = response_service['pedimento'].get('numero_partidas', 0) - tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A') - pedimento = response_service['pedimento'].get('pedimento', 'N/A') - _file_name = f"vu_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.xml" - - document_response = await rest_controller.post_document( - soap_response=soap_response, - organizacion=response_service['organizacion'], - pedimento=response_service['pedimento']['id'], - file_name=_file_name, - document_type=8, - ) - - return { - "servicio": response_service, - "documento": document_response - } - else: - raise HTTPException(status_code=500, detail="Error en la petición SOAP al servicio VUCEM") - except HTTPException: - # Re-lanzar HTTPExceptions sin modificar - raise - except Exception as e: - import traceback - raise HTTPException(status_code=500, detail=f"Error interno al procesar acuse cove: {str(e)}") diff --git a/api/api_v2/modules/coves/services.py b/api/api_v2/modules/coves/services.py new file mode 100644 index 0000000..7f878de --- /dev/null +++ b/api/api_v2/modules/coves/services.py @@ -0,0 +1,469 @@ +import base64 +import os +import logging +import re +import xml.etree.ElementTree as ET +from fastapi import HTTPException +from controllers.RESTController import rest_controller +from controllers.SOAPController import soap_controller +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.serialization import load_der_private_key +import tempfile + +from utils.helpers import soap_error +from .controllers import coves_vu_controller, coves_rest_controller + +# Logger para el módulo +logger = logging.getLogger(__name__) + +# Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta +async def consume_ws_get_cove(**kwargs): + """ + Consume el servicio SOAP para obtener un COVE y procesar la respuesta. + + Args: + **kwargs: Debe contener 'credencial', 'pedimento' y 'cove' + + Returns: + Dict serializable con 'documento' y 'cove_put_response' + + Raises: + Exception: Si hay errores en el procesamiento + """ + try: + logger.info("Iniciando procesamiento de COVE") + + credenciales = kwargs.get('credencial') + username = credenciales.get('user') + pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A') + cove = kwargs['cove'].get('cove', None) + + if not credenciales or not username or not cove: + raise Exception("Credenciales o COVE no proporcionados correctamente") + + logger.info(f"Procesando COVE: {cove} para usuario: {username}") + + # Generar cadena original y obtener firma/certificado + cadena_original = f"|{credenciales.get('user')}|{cove}|" + firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales) + + # Limpiar archivo temporal inmediatamente + try: + os.remove(tmp_key_path) + logger.debug("Archivo temporal de llave eliminado") + except Exception as e: + logger.warning(f"Error al eliminar archivo temporal: {e}") + + # Generar template SOAP + soap_xml = coves_vu_controller.generate_cove_template( + username=username, + password=credenciales['password'], + certificado=certificado, + firma=firma, + cove=cove, + ) + + soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8', + 'SOAPAction': '', + } + + logger.info("Enviando petición SOAP a VUCEM") + soap_response = await coves_vu_controller.make_request_async( + "ventanilla/ConsultarEdocumentService?wsdl", + data=soap_xml, + headers=soap_headers + ) + + if not soap_response: + raise Exception("No se recibió respuesta del servicio SOAP") + + if soap_error(soap_response): + raise Exception("Error en la respuesta del servicio SOAP") + + logger.info("Respuesta SOAP exitosa, enviando documento") + + # Enviar documento + _file_name = f"vu_COVE_{pedimento_app}_{cove}.xml" + document_response = await coves_rest_controller.post_document( + soap_response=soap_response, + organizacion=kwargs.get('pedimento').get('organizacion'), + pedimento=kwargs.get('pedimento').get('id'), + file_name=_file_name, + document_type=8, + ) + + logger.info("Documento enviado, actualizando status de COVE") + + # Actualizar status del COVE + cove_status_response = await change_cove_status( + cove=kwargs.get('cove'), + status=True, + pedimento=kwargs.get('pedimento') + ) + + logger.info(f"COVE {cove} procesado exitosamente") + + # Asegurar que la respuesta sea serializable + result = { + "documento": document_response if document_response else None, + "cove_update_response": cove_status_response if cove_status_response else None + } + + return result + + except Exception as e: + logger.error(f"Error procesando COVE: {str(e)}", exc_info=True) + # Asegurar que no se retornen datos binarios en el error + raise Exception(f"Error interno al procesar COVE: {str(e)}") + + +async def consume_ws_get_acuse_cove(**kwargs): + credenciales = kwargs.get('credencial') + soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8', + 'SOAPAction': 'http://www.ventanillaunica.gob.mx/ventanilla/ConsultaAcusesService/consultarAcuseCove', + 'Accept-Encoding': 'gzip,deflate', + } + + + soap_xml = coves_vu_controller.generate_acuse_template( + username=credenciales.get('user'), + password=credenciales.get('password'), + cove=kwargs['cove'].get('cove', None), + ) + + + response = await coves_vu_controller.make_request_async( + "ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl", + data=soap_xml, + headers=soap_headers + ) + + + + if response is None: + raise Exception("No se obtuvo respuesta del servicio SOAP.") + + if response.status_code != 200: + raise Exception(f"Error en la solicitud SOAP: {response.status}") + + if (response) and (not soap_error(response)): + logger.debug(f"Respuesta SOAP recibida, extrayendo acuse...") + acuse_base64 = _extract_acuse_data(response.text) + + if acuse_base64 is None: + logger.error("No se encontró elemento acuseDocumento en la respuesta") + logger.debug(f"Contenido de respuesta (primeros 1000 chars): {response.text[:1000]}") + else: + logger.error("Error en respuesta SOAP o soap_error detectado") + logger.debug(f"Contenido de respuesta con error: {response.text[:500] if response else 'No response'}") + + if acuse_base64 is None: + raise Exception("No se pudo extraer el acuse del documento de la respuesta SOAP.") + + + pdf_bytes = _decode_acuse_base64_content(acuse_base64) + + if not pdf_bytes: + raise HTTPException(status_code=500, detail="No se pudo decodificar el documento del acuse") + + # Validar que el PDF sea válido + if not pdf_bytes.startswith(b'%PDF'): + logger.warning("El contenido decodificado no parece ser un PDF válido") + + + # Mejorar el nombre del archivo usando todos los datos relevantes + pedimento = kwargs.get('pedimento', {}) + pedimento_num = pedimento.get('pedimento','') + _file_name = _get_file_name(**kwargs) + + # Validar que organización y pedimento no sean None + organizacion = pedimento.get("organizacion", None) + pedimento_id = pedimento.get("id", None) + + rest_response = await coves_rest_controller.post_document( + binary_content=pdf_bytes, + organizacion=organizacion, + pedimento=pedimento_id, + file_name=_file_name, + document_type=7 + ) + + acuse_status = await change_acuse_status( + cove=kwargs.get('cove'), + status=True, + pedimento=kwargs.get('pedimento') + ) + + return { + "document_response": rest_response, + "file_name": _file_name, + "pedimento": pedimento_num, + "acuse_update": acuse_status + } + +def _decode_acuse_base64_content(base64_content): # Testeado + """ + Decodifica el contenido Base64 del acuse y limpia caracteres especiales. + + Args: + base64_content (str): Contenido codificado en Base64 + + Returns: + bytes: Contenido decodificado o None si hay error + """ + try: + # Limpiar el contenido Base64 de manera exhaustiva + cleaned_content = base64_content + + # Remover entidades HTML/XML como , , etc. + cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content) + cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content) + + # Remover espacios en blanco, saltos de línea, etc. + cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content) + + # Remover caracteres no válidos para Base64 + cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content) + + + # Agregar padding si es necesario + missing_padding = len(cleaned_content) % 4 + if missing_padding: + cleaned_content += '=' * (4 - missing_padding) + + # Decodificar Base64 + decoded_content = base64.b64decode(cleaned_content) + + return decoded_content + + except Exception as e: + + # Intentar con validación estricta deshabilitada + try: + decoded_content = base64.b64decode(cleaned_content, validate=False) + return decoded_content + except Exception as e2: + return None + +def _extract_acuse_data(soap_response_text: str) -> str: + """ + Extrae el contenido base64 del acuse desde la respuesta SOAP. + + Args: + soap_response_text: Texto completo de la respuesta SOAP + + Returns: + str: Contenido base64 del acuse o None si no se encuentra + """ + try: + logger.debug("Iniciando extracción de datos del acuse") + + # Primero, extraer la parte XML del contenido multipart + xml_start = soap_response_text.find(' dict: + pedimento = kwargs.get('pedimento', {}) + pedimento_app = pedimento.get('pedimento_app', 'N/A') + cove = kwargs['cove'].get('cove', 'N/A') + _file_name = f"vu_AC_COVE_{pedimento_app}_{cove}.pdf" + return _file_name + + + +async def change_cove_status(cove: dict, status: bool, pedimento: dict): + data = { + "id": cove.get("id"), + "cove_descargado": status, + "numero_cove": cove.get("cove"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + + response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) + + return response + +async def change_acuse_status(cove: dict, status: bool, pedimento: dict): + data = { + "id": cove.get("id"), + "acuse_cove_descargado": status, + "numero_cove": cove.get("cove"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + + print(data) + response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) + + return response + +async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict): + """ + Obtiene certificado y llave, genera la firma para la cadena original. + + Args: + cadena_original: Cadena a firmar + username: Usuario de VUCEM + credenciales: Diccionario con credenciales + + Returns: + tuple: (firma_base64, certificado_base64, ruta_archivo_temporal) + + Raises: + Exception: Si no se pueden obtener los certificados o generar la firma + """ + try: + logger.debug("Obteniendo certificado desde API") + + # Obtener certificado como bytes + cer = await coves_rest_controller.get_cer(credenciales['id']) + if cer is None: + raise Exception("No se pudo obtener el certificado para firmar el COVE") + + # Convertir certificado a base64 string + certificado = base64.b64encode(cer).decode('utf-8') + logger.debug("Certificado obtenido y codificado exitosamente") + + # Obtener llave privada como bytes + logger.debug("Obteniendo llave privada desde API") + key_bytes = await coves_rest_controller.get_key(credenciales['id']) + if key_bytes is None: + raise Exception("No se pudo obtener la llave privada para firmar el COVE") + + # Crear archivo temporal para la llave (requerido por cryptography) + with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_key_file: + tmp_key_file.write(key_bytes) + tmp_key_path = tmp_key_file.name + + logger.debug(f"Llave privada guardada temporalmente en: {tmp_key_path}") + + # Generar firma usando el archivo temporal + firma = sign_chain_original(tmp_key_path, credenciales['efirma'], cadena_original) + logger.debug("Firma generada exitosamente") + + return firma, certificado, tmp_key_path + + except Exception as e: + logger.error(f"Error obteniendo certificado/llave o generando firma: {e}") + # Limpiar archivo temporal si existe + if 'tmp_key_path' in locals() and os.path.exists(tmp_key_path): + try: + os.remove(tmp_key_path) + except: + pass + raise Exception(f"Error en fetch_sign_and_cer: {str(e)}") + + +def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str: + """ + Firma una cadena original usando una llave privada. + + Args: + key_path: Ruta al archivo de la llave privada + password: Password de la llave privada + cadena_original: Cadena a firmar + + Returns: + str: Firma en base64 + + Raises: + Exception: Si hay errores en el proceso de firma + """ + try: + logger.debug(f"Firmando cadena original: {cadena_original}") + + with open(key_path, 'rb') as key_file: + private_key = load_der_private_key( + key_file.read(), + password=password.encode() if password else None + ) + + signature = private_key.sign( + cadena_original.encode('utf-8'), + padding.PKCS1v15(), + hashes.SHA256() + ) + + firma_b64 = base64.b64encode(signature).decode('utf-8') + logger.debug("Cadena firmada exitosamente") + + return firma_b64 + + except Exception as e: + logger.error(f"Error firmando cadena original: {e}") + raise Exception(f"Error en sign_chain_original: {str(e)}") + diff --git a/api/api_v2/modules/coves/tasks.py b/api/api_v2/modules/coves/tasks.py index e69de29..d0c6837 100644 --- a/api/api_v2/modules/coves/tasks.py +++ b/api/api_v2/modules/coves/tasks.py @@ -0,0 +1,114 @@ +import asyncio +import logging +from celery import Celery +from celery_app import celery_app +from typing import Dict, Any + +from .services import consume_ws_get_cove, consume_ws_get_acuse_cove +from api.api_v2.modules.tasks.tasks import run_async_task + +# Logger para el módulo +logger = logging.getLogger(__name__) + + +@celery_app.task(bind=True) +def process_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Tarea de Celery para procesar la solicitud de descarga de COVE. + + Args: + cove_request: Diccionario con los datos de la solicitud de COVE. + + Returns: + Diccionario con la respuesta del COVE procesado. + """ + try: + logger.info(f"Iniciando procesamiento de COVE - Task ID: {self.request.id}") + + # Actualizar progreso + self.update_state(state='PROGRESS', meta={'current': 10, 'total': 100, 'status': 'Iniciando procesamiento de COVE'}) + + # Usar run_async_task para ejecutar la función asíncrona + result = run_async_task(consume_ws_get_cove, **cove_request) + + # Actualizar progreso + self.update_state(state='SUCCESS', meta={'current': 100, 'total': 100, 'status': 'COVE procesado exitosamente'}) + + logger.info(f"COVE procesado exitosamente - Task ID: {self.request.id}") + + # Asegurar que la respuesta sea serializable para Celery + return { + "status": "processed", + "data": result, + "task_id": self.request.id + } + + except Exception as e: + error_msg = f"Error procesando COVE: {str(e)}" + logger.error(error_msg, exc_info=True) + + # Actualizar estado con error + self.update_state( + state='FAILURE', + meta={ + 'current': 0, + 'total': 100, + 'status': error_msg, + 'error': str(e) + } + ) + + # Re-lanzar excepción para que Celery la registre + raise e + + +@celery_app.task(bind=True) +def process_acuse_cove_request(self, cove_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Tarea de Celery para procesar la solicitud de acuse de COVE. + + Args: + cove_request: Diccionario con los datos de la solicitud de acuse. + + Returns: + Diccionario con la respuesta del acuse procesado. + """ + try: + logger.info(f"Iniciando procesamiento de acuse de COVE - Task ID: {self.request.id}") + + # Actualizar progreso + self.update_state(state='PROGRESS', meta={'current': 10, 'total': 100, 'status': 'Iniciando procesamiento de acuse de COVE'}) + + # Usar run_async_task para ejecutar la función asíncrona + result = run_async_task(consume_ws_get_acuse_cove, **cove_request) + + # Actualizar progreso + self.update_state(state='SUCCESS', meta={'current': 100, 'total': 100, 'status': 'Acuse de COVE procesado exitosamente'}) + + logger.info(f"Acuse de COVE procesado exitosamente - Task ID: {self.request.id}") + + # Asegurar que la respuesta sea serializable para Celery + return { + "status": "processed", + "data": result, + "task_id": self.request.id + } + + except Exception as e: + error_msg = f"Error procesando acuse de COVE: {str(e)}" + logger.error(error_msg, exc_info=True) + + # Actualizar estado con error + self.update_state( + state='FAILURE', + meta={ + 'current': 0, + 'total': 100, + 'status': error_msg, + 'error': str(e) + } + ) + + # Re-lanzar excepción para que Celery la registre + raise e + diff --git a/api/api_v2/modules/partidas/router.py b/api/api_v2/modules/edocs/__init__.py similarity index 100% rename from api/api_v2/modules/partidas/router.py rename to api/api_v2/modules/edocs/__init__.py diff --git a/api/api_v2/modules/edocs/controllers.py b/api/api_v2/modules/edocs/controllers.py new file mode 100644 index 0000000..4271570 --- /dev/null +++ b/api/api_v2/modules/edocs/controllers.py @@ -0,0 +1,63 @@ +# controllers.py +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController +from typing import List, Dict, Any + +class EdocVuController(VUCEMController): + """ + Controlador para interactuar con el servicio SOAP de la Ventanilla Única + para la descarga de documentos electrónicos (edocs). + """ + def __init__(self): + super().__init__() + + def generate_edoc_template(self, **kwargs) -> str: + """ + Genera el template XML de la solicitud SOAP para un edoc. + """ + credencial = kwargs.get("credencial", {}) + username = credencial.get("user") + password = credencial.get("password") + # Aquí usamos `numero_documento` en lugar de `idEDocument` para reflejar el esquema de edocs + numero_documento = kwargs['edoc'].get("numero_edocument", "N/A") + + soap_template = f''' + + + + + {username} + {password} + + + + + + {numero_documento} + + + + ''' + return soap_template + +class EdocRESTController(APIRESTController): + """ + Controlador para interactuar con la API REST interna para + guardar documentos electrónicos. + """ + def __init__(self): + super().__init__() + + async def put_edocument(self, edocument_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para actualizar un documento digitalizado en la API. + + Args: + edocument_id: UUID del documento a actualizar + data: Diccionario con los datos a actualizar + """ + return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data) + +# Instancias de los controladores que serán importadas en services.py +edocs_vu_controller = EdocVuController() +edocs_rest_controller = EdocRESTController() \ No newline at end of file diff --git a/api/api_v2/modules/edocs/routers.py b/api/api_v2/modules/edocs/routers.py new file mode 100644 index 0000000..9528b1b --- /dev/null +++ b/api/api_v2/modules/edocs/routers.py @@ -0,0 +1,41 @@ +from fastapi import APIRouter, HTTPException, Depends +from typing import Dict, Any, Optional +from .schemas import EdocumentsSchema, EdocumentsMasivoSchema +from .tasks import process_edoc_download_request, process_edocs_masivo_download_request +from api.api_v2.modules.authentication.services import get_current_user + +router = APIRouter() + + +# --- Nuevas rutas para la descarga de edocs --- + +@router.post("/services/download/edoc/", response_model=Dict[str, Any]) +async def download_edoc(edoc_request: EdocumentsSchema): + """ + Endpoint para iniciar la descarga de un documento electrónico (edoc). + """ + edoc_dict = edoc_request.model_dump() + # Ejecuta la tarea de Celery de forma asíncrona + task = process_edoc_download_request.delay(edoc_dict) + # Devuelve el ID de la tarea + return {"task_id": task.id, "status": "submitted"} + +@router.post("/services/download/all/edocs/", response_model=Dict[str, Any]) +async def download_edocs_masivo(edoc_request: EdocumentsMasivoSchema): + """ + Endpoint para iniciar la descarga masiva de documentos electrónicos (edocs). + """ + task_ids = [] + edoc_request_dict = edoc_request.model_dump() + # Para cada edoc en la lista, dispara una tarea Celery + for edoc in edoc_request_dict.get('edocs', []): + # Crea un nuevo diccionario de datos para cada tarea + edoc_dict = { + "pedimento": edoc_request_dict.get('pedimento'), + "credencial": edoc_request_dict.get('credencial'), + "edoc": edoc.get('edoc') + } + task = process_edoc_download_request.delay(edoc_dict) + task_ids.append(task.id) + + return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/edocs/schemas.py b/api/api_v2/modules/edocs/schemas.py new file mode 100644 index 0000000..fd32341 --- /dev/null +++ b/api/api_v2/modules/edocs/schemas.py @@ -0,0 +1,22 @@ +from pydantic import BaseModel + +from schemas.CredencialSchema import CredencialBaseSchema +from api.api_v2.modules.pedimentos.schemas import PedimentoBaseSchema + + +class EdocumentBaseSchema(BaseModel): + id: int + numero_edocument: str + +#Aplica para EDocuments +class EdocumentsSchema(BaseModel): + edoc : EdocumentBaseSchema + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + + +class EdocumentsMasivoSchema(BaseModel): + edocs: list[EdocumentBaseSchema] + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + \ No newline at end of file diff --git a/api/api_v2/modules/edocs/services.py b/api/api_v2/modules/edocs/services.py new file mode 100644 index 0000000..3d4060a --- /dev/null +++ b/api/api_v2/modules/edocs/services.py @@ -0,0 +1,188 @@ +from http.client import HTTPException +import base64 +import re +import logging +import xml.etree.ElementTree as ET + +from utils.helpers import soap_error +from .controllers import edocs_rest_controller, edocs_vu_controller + +logger = logging.getLogger("app.api") + +soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8', + 'SOAPAction': 'http://www.ventanillaunica.gob.mx/ventanilla/EdocumentosService/consultarEdocumento', + 'Accept-Encoding': 'gzip,deflate', +} + +# --- FUNCIONES AUXILIARES --- + +def _decode_base64_content(base64_content): + try: + cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', base64_content) + cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content) + cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content) + cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content) + + missing_padding = len(cleaned_content) % 4 + if missing_padding: + cleaned_content += '=' * (4 - missing_padding) + + return base64.b64decode(cleaned_content) + except Exception as e: + logger.error(f"Error al decodificar Base64: {e}") + try: + return base64.b64decode(cleaned_content, validate=False) + except Exception: + return None + + +def _extract_edoc_data(soap_response_text: str) -> str: + try: + xml_start = soap_response_text.find(' str: + pedimento = kwargs.get('pedimento', {}) + pedimento_app = pedimento.get('pedimento_app', 'N/A') + idEDocument = kwargs['edoc'].get('numero_edocument', 'N/A') + return f"vu_ED_{pedimento_app}_{idEDocument}.pdf" + +# --- FUNCIONES DE SERVICIO --- + +async def obtener_edoc(**kwargs): + soap_xml = edocs_vu_controller.generate_edoc_template(**kwargs) + + response = await edocs_vu_controller.make_request_async( + "ventanilla-edocs-HA/EdocumentosServiceWS?wsdl", + data=soap_xml, + headers=soap_headers + ) + + if response is None: + raise Exception("No se obtuvo respuesta del servicio SOAP.") + + if response.status_code != 200: + raise Exception(f"Error en la solicitud SOAP: {response.status_code}") + + if soap_error(response): + raise Exception("Respuesta SOAP contiene error de VUCEM.") + + edoc_base64 = _extract_edoc_data(response.text) + if edoc_base64 is None: + raise Exception("No se pudo extraer el documento de la respuesta SOAP.") + + pdf_bytes = _decode_base64_content(edoc_base64) + if not pdf_bytes: + raise HTTPException(status_code=500, detail="No se pudo decodificar el documento") + + if not pdf_bytes.startswith(b'%PDF'): + logger.warning("El contenido decodificado no parece ser un PDF válido") + + pedimento = kwargs.get('pedimento', {}) + numero_documento = kwargs['edoc'].get('numero_edocument', '') + _file_name = _get_file_name(**kwargs) + + organizacion = pedimento.get("organizacion", None) + pedimento_id = pedimento.get("id", None) + + try: + with open(_file_name, "wb") as f: + f.write(pdf_bytes) + logger.info(f"PDF guardado localmente en {_file_name}") + except Exception as e: + logger.error(f"Error guardando el PDF localmente: {e}") + + rest_response = await edocs_rest_controller.post_document( + binary_content=pdf_bytes, + organizacion=organizacion, + pedimento=pedimento_id, + file_name=_file_name, + document_type=5 + ) + + if rest_response is None: + raise Exception("No se pudo enviar el documento a la API interna.") + if rest_response.get("id") is None: + raise Exception("La respuesta de la API interna no contiene un ID válido.") + + logger.info("Documento enviado, actualizando status de Edoc") + + edoc_status_response = await change_edocument_status( + edoc=kwargs.get('edoc'), + status=True, + pedimento=pedimento + ) + + return { + "document_response": rest_response, + "file_name": _file_name, + "numero_documento": numero_documento, + "edoc_update_response": edoc_status_response if edoc_status_response else None + } + + +async def change_edocument_status(edoc: dict, status: bool, pedimento: dict): + data = { + "id": edoc.get("id"), + "acuse_descargado": status, + "numero_edocument": edoc.get("numero_edocument"), + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + + response = await edocs_rest_controller.put_edocument(edocument_id=edoc.get("id"), data=data) + + return response + +async def obtener_edocs_masivo(**kwargs): + logger.info("Iniciando la orquestación de descarga masiva de Edocs.") + numeros_documentos = kwargs.get("edocs", []) + if not numeros_documentos: + return {"status": "warning", "message": "No se encontraron números de documento para procesar."} + + for edoc in numeros_documentos: + try: + logger.info(f"Procesando Edoc: {edoc.get('numero_edocument', 'N/A')}") + edoc = { + "edoc": edoc, + "pedimento": kwargs.get("pedimento"), + "credencial": kwargs.get("credencial") + } + await obtener_edoc(**edoc) + logger.info(f"Edoc {edoc.get('numero_edocument', 'N/A')} procesado exitosamente.") + except Exception as e: + logger.error(f"Error procesando Edoc {edoc.get('numero_edocument', 'N/A')}: {str(e)}", exc_info=True) + continue # Continuar con el siguiente edoc en caso de error + + return { + "status": "pending", + "total_documentos": len(numeros_documentos), + "message": "La orquestación de descarga masiva ha sido registrada." + } diff --git a/api/api_v2/modules/edocs/tasks.py b/api/api_v2/modules/edocs/tasks.py index e69de29..c5cc89b 100644 --- a/api/api_v2/modules/edocs/tasks.py +++ b/api/api_v2/modules/edocs/tasks.py @@ -0,0 +1,43 @@ +from celery_app import celery_app + +from .services import obtener_edoc, obtener_edocs_masivo +import asyncio # Necesario para ejecutar funciones async dentro de Celery + +@celery_app.task(bind=True) +def process_edoc_download_request(self, edoc_data: dict): + """ + Tarea de Celery para procesar la descarga de un solo documento edoc. + """ + try: + # Ejecutar la función asíncrona dentro del hilo síncrono de Celery + loop = asyncio.get_event_loop() + result = loop.run_until_complete(obtener_edoc(**edoc_data)) + + return {"status": "success", "result": result} + except Exception as e: + # Manejo de errores + self.update_state( + state='FAILURE', + meta={'exc_type': type(e).__name__, 'exc_message': str(e)} + ) + # Es crucial volver a lanzar la excepción para que Celery la marque como fallida + raise e + +@celery_app.task(bind=True) +def process_edocs_masivo_download_request(self, edoc_data: dict): + """ + Tarea de Celery para procesar la descarga de múltiples documentos edoc. + Esta tarea orquesta la ejecución, pero puede delegar en el servicio. + """ + try: + # Ejecutar la función asíncrona dentro del hilo síncrono de Celery + loop = asyncio.get_event_loop() + result = loop.run_until_complete(obtener_edocs_masivo(**edoc_data)) + + return {"status": "success", "result": result} + except Exception as e: + self.update_state( + state='FAILURE', + meta={'exc_type': type(e).__name__, 'exc_message': str(e)} + ) + raise \ No newline at end of file diff --git a/api/api_v2/modules/partidas/schema.py b/api/api_v2/modules/partidas/__init__.py similarity index 100% rename from api/api_v2/modules/partidas/schema.py rename to api/api_v2/modules/partidas/__init__.py diff --git a/api/api_v2/modules/partidas/controllers.py b/api/api_v2/modules/partidas/controllers.py new file mode 100644 index 0000000..56ddf5c --- /dev/null +++ b/api/api_v2/modules/partidas/controllers.py @@ -0,0 +1,69 @@ +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController +from typing import List, Dict, Any +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from typing import List, Dict + +class PartidaRestController(APIRESTController): + def __init__(self): + super().__init__() + + async def put_partida(self, partida_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para actualizar un documento digitalizado en la API. + + Args: + edocument_id: UUID del documento a actualizar + data: Diccionario con los datos a actualizar + """ + return await self._make_request_async('PUT', f'customs/partidas/{partida_id}/', data=data) + +class PartidaVUController(VUCEMController): + def __init__(self): + super().__init__() # Implementación específica para Coves VU + + def generate_partidas_template(self, username: str, password: str, aduana: str, patente: str, pedimento: str, numero_operacion: str, partida: str) -> str: + """ + Genera el template SOAP para consultar partidas de un pedimento + + Args: + username: Usuario de VUCEM + password: Contraseña de VUCEM + aduana: Código de aduana + patente: Número de patente + pedimento: Número de pedimento + + Returns: + str: Template SOAP XML completo + """ + soap_template = f''' + + + + + {username} + {password.strip()} + + + + + + + {aduana} + {patente} + {pedimento} + {numero_operacion} + {partida} + + + + + ''' + + return soap_template + +partida_rest_controller = PartidaRestController() +partida_vu_controller = PartidaVUController() + + diff --git a/api/api_v2/modules/partidas/routers.py b/api/api_v2/modules/partidas/routers.py new file mode 100644 index 0000000..b33e5a5 --- /dev/null +++ b/api/api_v2/modules/partidas/routers.py @@ -0,0 +1,43 @@ +from fastapi import APIRouter, HTTPException, Depends +from fastapi.responses import JSONResponse +from typing import Dict, Any, List, Optional + + + +from api.api_v2.modules.authentication.services import get_current_user +from .schemas import PartidaRequestSchema, PartidaListSchema +from .tasks import process_partida_request + +router = APIRouter() + +@router.post("/services/partida/", response_model=Dict[str, Any]) +async def obtener_partida(partida_request: PartidaRequestSchema): + """ + Endpoint para obtener la información de una partida específica. + """ + + acuse_dict = partida_request.model_dump() + # Ejecuta la tarea de Celery de forma asíncrona + task = process_partida_request.delay(acuse_dict) + # Puedes devolver el ID de la tarea para consultar el estado después + return {"task_id": task.id, "status": "submitted"} + +@router.post("/services/all/partidas/", response_model=Dict[str, Any]) +async def obtener_partidas(partidas_request: PartidaListSchema): + """ + Endpoint para iniciar la descarga masiva de partidas. + """ + task_ids = [] + partida_request_dict = partidas_request.model_dump() + # Para cada partida en la lista, dispara una tarea Celery + for partida in partida_request_dict.get('partidas', []): + # Crea un nuevo diccionario de datos para cada tarea + partida_dict = { + "partida": partida, + "pedimento": partida_request_dict.get('pedimento'), + "credencial": partida_request_dict.get('credencial') + } + task = process_partida_request.delay(partida_dict) + task_ids.append(task.id) + + return {"task_ids": task_ids, "status": "submitted", "total": len(task_ids)} \ No newline at end of file diff --git a/api/api_v2/modules/partidas/schemas.py b/api/api_v2/modules/partidas/schemas.py new file mode 100644 index 0000000..82bb30a --- /dev/null +++ b/api/api_v2/modules/partidas/schemas.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel, Field + +from schemas.CredencialSchema import CredencialBaseSchema +from api.api_v2.modules.pedimentos.schemas import PedimentoBaseSchema + +class PartidaBaseSchema(BaseModel): + id: int + numero: int + +class PartidaRequestSchema(BaseModel): + partida: PartidaBaseSchema + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + +class PartidaListSchema(BaseModel): + partidas: list[PartidaBaseSchema] = Field(..., description="Lista de partidas") + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + + diff --git a/api/api_v2/modules/partidas/services.py b/api/api_v2/modules/partidas/services.py new file mode 100644 index 0000000..d37dab9 --- /dev/null +++ b/api/api_v2/modules/partidas/services.py @@ -0,0 +1,122 @@ +import base64 +import os +import logging +import re +import xml.etree.ElementTree as ET +from fastapi import HTTPException +from controllers.RESTController import rest_controller +from controllers.SOAPController import soap_controller +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.serialization import load_der_private_key +import tempfile + +from utils.helpers import soap_error +from .controllers import partida_rest_controller, partida_vu_controller + +# Logger para el módulo +logger = logging.getLogger(__name__) + +# Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta +async def consume_ws_get_partida(**kwargs): + """ + Consume el servicio SOAP para obtener un partida y procesar la respuesta. + + Args: + **kwargs: Debe contener 'credencial', 'pedimento' y 'partida' + + Returns: + Dict serializable con 'documento' y 'partida_put_response' + Raises: + Exception: Si hay errores en el procesamiento + """ + try: + logger.info("Iniciando procesamiento de partidas") + credenciales = kwargs.get('credencial') + username = credenciales.get('user') + pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A') + partida = kwargs.get('partida', {}) + + if not credenciales or not username or not partida: + raise Exception("Credenciales o Partida no proporcionados correctamente") + + logger.info(f"Procesando Partida: {partida} para usuario: {username}") + + # Generar template SOAP + + soap_xml = partida_vu_controller.generate_partidas_template( + username=username, + password=credenciales.get('password'), + aduana=kwargs.get('pedimento', {}).get('aduana', 'N/A'), + patente=kwargs.get('pedimento', {}).get('patente', 'N/A'), + pedimento=kwargs.get('pedimento', {}).get('pedimento', 'N/A'), + numero_operacion=kwargs.get('pedimento', {}).get('numero_operacion', ''), + partida=partida.get('numero', '') + ) + + soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8' + } + + logger.info("Enviando petición SOAP a VUCEM") + soap_response = await partida_vu_controller.make_request_async( + "/ventanilla-ws-pedimentos/ConsultarPartidaService", + data=soap_xml, + headers=soap_headers + ) + + + if not soap_response: + raise Exception("No se recibió respuesta del servicio SOAP") + + if soap_error(soap_response): + raise Exception("Error en la respuesta del servicio SOAP") + + logger.info("Respuesta SOAP exitosa, enviando documento") + + # Enviar documento + _file_name = f"vu_PT_{pedimento_app}_{partida.get('numero', '')}.xml" + document_response = await partida_rest_controller.post_document( + soap_response=soap_response, + organizacion=kwargs.get('pedimento').get('organizacion'), + pedimento=kwargs.get('pedimento').get('id'), + file_name=_file_name, + document_type=1, + ) + + logger.info("Documento enviado, actualizando status de Partida") + + # Actualizar status del partida + partida_status_response = await change_partida_status( + partida=kwargs.get('partida'), + status=True, + pedimento=kwargs.get('pedimento') + ) + + logger.info(f"Partida {partida.get('numero', '')} procesado exitosamente") + + # Asegurar que la respuesta sea serializable + result = { + "documento": document_response if document_response else None, + "partida_update_response": partida_status_response if partida_status_response else None + } + + return result + + except Exception as e: + logger.error(f"Error procesando la partida: {str(e)}", exc_info=True) + # Asegurar que no se retornen datos binarios en el error + raise Exception(f"Error interno al procesar la partida: {str(e)}") + +async def change_partida_status(partida: dict, status: bool, pedimento: dict): + data = { + "id": partida.get("id"), + "numero_partida": partida.get("numero"), + "descargado": status, + "pedimento": pedimento.get("id"), + "organizacion": pedimento.get("organizacion"), + } + print(data) + response = await partida_rest_controller.put_partida(partida_id=partida.get("id"), data=data) + + return response \ No newline at end of file diff --git a/api/api_v2/modules/partidas/tasks.py b/api/api_v2/modules/partidas/tasks.py index e69de29..8e10579 100644 --- a/api/api_v2/modules/partidas/tasks.py +++ b/api/api_v2/modules/partidas/tasks.py @@ -0,0 +1,26 @@ +from celery import Celery +from celery_app import celery_app +import asyncio +import logging +from typing import Dict, Any +from contextlib import asynccontextmanager + +from .services import consume_ws_get_partida + + +@celery_app.task +def process_partida_request(partida_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Tarea de Celery para procesar la solicitud de partida. + + Args: + partida_request: Diccionario con los datos de la solicitud de partida. + + Returns: + Diccionario con la respuesta de la partida. + """ + loop = asyncio.get_event_loop() + partida_response = loop.run_until_complete(consume_ws_get_partida(**partida_request)) + + return {"status": "processed", "data": partida_response} + diff --git a/api/api_v2/modules/pedimentos/__init__.py b/api/api_v2/modules/pedimentos/__init__.py new file mode 100644 index 0000000..c19c78e --- /dev/null +++ b/api/api_v2/modules/pedimentos/__init__.py @@ -0,0 +1 @@ +from . import tasks \ No newline at end of file diff --git a/api/api_v2/modules/pedimentos/controllers.py b/api/api_v2/modules/pedimentos/controllers.py new file mode 100644 index 0000000..9dbed62 --- /dev/null +++ b/api/api_v2/modules/pedimentos/controllers.py @@ -0,0 +1,379 @@ +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController +from typing import List, Dict, Any +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from typing import List, Dict + +class PedimentoController(APIRESTController): + def __init__(self): + super().__init__() + + async def put_pedimento(self, pedimento_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para actualizar un pedimento en la API. + """ + return await self._make_request_async('PUT', f'customs/pedimentos/{pedimento_id}/', data=data) + + async def post_edocument(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para enviar un documento digitalizado a la API. + + Args: + data: Diccionario con los datos del documento a enviar + """ + return await self._make_request_async('POST', 'customs/edocuments/', data=data) + + async def post_cove(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para enviar un número de COVE a la API. + + Args: + data: Diccionario con los datos del COVE a enviar + """ + return await self._make_request_async('POST', 'customs/coves/', data=data) + + async def put_edocument(self, edocument_id: str, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para actualizar un documento digitalizado en la API. + + Args: + edocument_id: UUID del documento a actualizar + data: Diccionario con los datos a actualizar + """ + return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data) + +class PedimentoVUController(VUCEMController): + def __init__(self): + super().__init__() # Implementación específica para Coves VU + + def generate_remesas_template(self, username: str, password: str, aduana: str, patente: str, numero_operacion: str, pedimento: str) -> str: + """ + Genera el template SOAP para consultar remesas + + Args: + username: Usuario de VUCEM + password: Contraseña de VUCEM + aduana: Código de aduana + patente: Número de patente + + Returns: + str: Template SOAP XML completo + """ + soap_template = f''' + + + + + {username} + {password} + + + + + + {numero_operacion} + + {aduana} + {patente} + {pedimento} + + + + ''' + return soap_template + + def generate_pedimento_completo_template(self, username: str, password: str, aduana: str, patente: str, pedimento: str) -> str: + """ + Genera el template SOAP para consultar pedimento completo + + Args: + username: Usuario de VUCEM + password: Contraseña de VUCEM + aduana: Código de aduana + patente: Número de patente + pedimento: Número de pedimento + + Returns: + str: Template SOAP XML completo + """ + soap_template = f''' + + + + {username} + {password} + + + + + + + {aduana} + {patente} + {pedimento} + + + + ''' + + return soap_template + +# Pedimento Completo +@dataclass +class PedimentoXMLScraper: # Clase me extrae datos de Pedimento + """ + Clase para manejar la extracción de datos de un XML. + """ + + def _get_numero_operacion(self, root: ET.Element) -> str: + """ + Método para obtener el número de operación del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Número de operación como string. + """ + numero_operacion = root.find('.//ns2:numeroOperacion', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + return numero_operacion.text if numero_operacion is not None else None + + def _get_pedimento(self, root: ET.Element) -> str: + """ + Método para obtener el pedimento del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Pedimento como string. + """ + pedimento = root.find('.//ns2:pedimento/ns2:pedimento', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + return pedimento.text if pedimento is not None else None + + def _get_curp_apoderado(self, root: ET.Element) -> str: + """ + Método para obtener el CURP del apoderado del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + CURP del apoderado como string. + """ + curp_apoderado = root.find('.//ns2:curpApoderadomandatario', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + return curp_apoderado.text if curp_apoderado is not None else None + + def _get_agente_aduanal(self, root: ET.Element) -> str: + """ + Método para obtener el RFC del agente aduanal del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + RFC del agente aduanal como string. + """ + agente_aduanal = root.find('.//ns2:rfcAgenteAduanalSocFactura', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + return agente_aduanal.text if agente_aduanal is not None else None + + def _get_partidas(self, root: ET.Element) -> int: + """ + Método para obtener el número máximo de partidas del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Número máximo de partidas como entero. + """ + partidas_elements = root.findall('.//ns2:partidas', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + partidas_values = [] + for elem in partidas_elements: + try: + if elem.text is not None: + partidas_values.append(int(elem.text)) + except ValueError: + continue + + return max(partidas_values) if partidas_values else None + + def _get_identificadores_ed(self, root: ET.Element) -> list: + """ + Método para obtener todos los identificadores con clave 'ED' del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Lista de diccionarios con los datos de identificadores ED. + """ + namespaces = { + 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', + 'ns': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/comunes' + } + identificadores_ed = [] + + # Buscar todos los elementos identificadores + identificadores_elements = root.findall('.//ns2:identificadores/ns2:identificadores', namespaces) + + for identificador in identificadores_elements: + try: + # Extraer la clave del identificador (está dentro de claveIdentificador con namespace) + clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) + clave = clave_elem.text if clave_elem is not None else None + + # Solo procesar si la clave es 'ED' + if clave == 'ED': + # Extraer descripción (con namespace) + descripcion_elem = identificador.find('ns:claveIdentificador/ns:descripcion', namespaces) + descripcion = descripcion_elem.text if descripcion_elem is not None else None + + # Extraer complemento1 (con namespace) + complemento1_elem = identificador.find('ns:complemento1', namespaces) + complemento1 = complemento1_elem.text if complemento1_elem is not None else None + + # Agregar a la lista si tenemos los datos básicos + if clave and complemento1: + identificadores_ed.append({ + 'clave': clave, + 'descripcion': descripcion, + 'complemento1': complemento1 + }) + + except Exception as e: + # Log del error pero continuar procesando otros identificadores + print(f"Error procesando identificador: {e}") + continue + + return identificadores_ed + + def _remesas(self, root: ET.Element) -> bool: + """ + Método para verificar si el pedimento tiene remesas. + Busca identificadores con clave 'RC' (REMESAS DE CONSOLIDADO). + + Args: + root: Elemento raíz del XML. + + Returns: + True si encuentra identificadores con clave 'RC', False en caso contrario. + """ + namespaces = { + 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', + 'ns': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/comunes' + } + + # Buscar todos los elementos identificadores + identificadores_elements = root.findall('.//ns2:identificadores/ns2:identificadores', namespaces) + + for identificador in identificadores_elements: + try: + # Extraer la clave del identificador + clave_elem = identificador.find('ns:claveIdentificador/ns:clave', namespaces) + clave = clave_elem.text if clave_elem is not None else None + + # Si encontramos una clave 'RC', el pedimento tiene remesas + if clave == 'RC': + return True + + except Exception as e: + # Log del error pero continuar procesando otros identificadores + print(f"Error procesando identificador para remesas: {e}") + continue + + print("No se encontraron remesas (sin identificadores RC)") + return False + + def _get_tipo_operacion(self, root: ET.Element) -> str: + """ + Método para obtener el tipo de operación del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Tipo de operación como string. + """ + tipo_operacion = root.find('.//ns2:tipoOperacion/ns2:clave', namespaces={'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto'}) + return tipo_operacion.text if tipo_operacion is not None else None + + def _get_cove(self, root: ET.Element) -> str: + """ + Método para obtener el número de COVE del XML. + + Args: + root: Elemento raíz del XML. + + Returns: + Número de COVE como string. + """ + namespaces = { + 'ns2': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarpedimentocompleto', + 'ns': 'http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/comunes' + } + facturas = root.findall('.//ns2:facturas', namespaces=namespaces) + coves = [] + for factura in facturas: + cove = factura.find('ns2:numero', namespaces) + if cove is not None: + coves.append(cove.text) + else: + print("No se encontró en la factura.") + + return coves if coves else None + + def extract_data(self, xml_content: str) -> dict: + """ + Método para extraer datos específicos del XML. + + Args: + xml_content: Contenido del XML como string. + + Returns: + Diccionario con los datos extraídos. + """ + try: + root = ET.fromstring(xml_content) + + # Extraer datos con manejo de errores individual + data = {} + + data['numero_operacion'] = self._get_numero_operacion(root) + data['pedimento'] = self._get_pedimento(root) + data['curp_apoderado'] = self._get_curp_apoderado(root) + data['agente_aduanal'] = self._get_agente_aduanal(root) + data['numero_partidas'] = self._get_partidas(root) + data['identificadores_ed'] = self._get_identificadores_ed(root) + data['remesas'] = self._remesas(root) + data['tipo_operacion'] = self._get_tipo_operacion(root) + data['coves'] = self._get_cove(root) + + # Verificar que se extrajeron los datos esenciales + if not any([data['numero_operacion'], data['pedimento'], data['curp_apoderado'], data['agente_aduanal'], data['coves']]): + return {} + + return data + + except ET.ParseError as e: + print(f"Error al parsear el XML: {e}") + return {} + except Exception as e: + print(f"Error inesperado al extraer datos del XML: {e}") + return {} + + + +pedimento_rest_controller = PedimentoController() +pedimento_vu_controller = PedimentoVUController() +pedimento_xml_scraper = PedimentoXMLScraper() + + diff --git a/api/api_v2/modules/pedimentos/routers.py b/api/api_v2/modules/pedimentos/routers.py new file mode 100644 index 0000000..a5f609d --- /dev/null +++ b/api/api_v2/modules/pedimentos/routers.py @@ -0,0 +1,21 @@ +from fastapi import APIRouter, BackgroundTasks, status, HTTPException +from fastapi.responses import JSONResponse +from .schemas import PedimentoCompletoRequestSchema +from .tasks import process_pedimento_completo_request +import logging +logger = logging.getLogger("app.api") + +router = APIRouter() + +@router.post("/services/pedimento_completo", status_code=status.HTTP_202_ACCEPTED) +async def download_pedimento_completo(Pedimento: PedimentoCompletoRequestSchema): + """ + Endpoint para iniciar la descarga completa de un pedimento. + """ + pedimento_dict = Pedimento.model_dump() + + + # Ejecuta la tarea de Celery de forma asíncrona + task = process_pedimento_completo_request.delay(pedimento_dict) + # Puedes devolver el ID de la tarea para consultar el estado después + return {"status": "submitted", "detail": "La solicitud de descarga del pedimento completo ha sido enviada.", "task_id": task.id} diff --git a/api/api_v2/modules/pedimentos/schema.py b/api/api_v2/modules/pedimentos/schema.py deleted file mode 100644 index 24e4efc..0000000 --- a/api/api_v2/modules/pedimentos/schema.py +++ /dev/null @@ -1,24 +0,0 @@ - -from typing import Optional -from pydantic import BaseModel - - -class PedimentoBaseSchema(BaseModel): - id: str - pedimento: str - pedimento_app: str - aduana: str - patente: str - regimen: str - organizacion: str - clave_pedimento: str - fecha_pago: Optional[str] - fecha_inicio: Optional[str] - fecha_fin: Optional[str] - alerta: Optional[bool] - agente_aduanal: Optional[str] - curp_apoderado: Optional[str] - importe_total: Optional[float] - saldo_disponible: Optional[float] - importe_pedimento: Optional[float] - existe_expediente: Optional[bool] diff --git a/api/api_v2/modules/pedimentos/schemas.py b/api/api_v2/modules/pedimentos/schemas.py new file mode 100644 index 0000000..8d49a3f --- /dev/null +++ b/api/api_v2/modules/pedimentos/schemas.py @@ -0,0 +1,95 @@ +from typing import Optional, Union, Dict, Any +from uuid import UUID +from datetime import datetime +# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione +from pydantic import BaseModel, Field, validator +from schemas.CredencialSchema import CredencialBaseSchema + + + + +class PedimentoBaseSchema(BaseModel): + id: str = Field(..., description="ID único del pedimento") + pedimento: str = Field(..., description="Número de pedimento") + pedimento_app: str = Field(..., description="Número de pedimento en la aplicación") + aduana: str = Field(..., description="Aduana asociada al pedimento") + patente: str = Field(..., description="Patente asociada al pedimento") + numero_operacion: str = Field(None, description="Número de operación del pedimento") + # Usamos Field(None, ...) para campos Optional + regimen: Optional[str] = Field(None, description="Régimen aduanero del pedimento") + organizacion: str = Field(..., description="Organización asociada al pedimento") + clave_pedimento: Optional[str] = Field(None, description="Clave del pedimento") + + + fecha_pago: Optional[str] = Field(None, description="Fecha de pago del pedimento") + fecha_inicio: Optional[str] = Field(None, description="Fecha de inicio del pedimento") + fecha_fin: Optional[str] = Field(None, description="Fecha de fin del pedimento") + alerta: Optional[bool] = Field(None, description="Indica si hay alerta en el pedimento") + agente_aduanal: Optional[str] = Field(None, description="Agente aduanal asociado al pedimento") + curp_apoderado: Optional[str] = Field(None, description="CURP del apoderado") + + importe_total: Optional[float] = Field(None, description="Importe total del pedimento") + saldo_disponible: Optional[float] = Field(None, description="Saldo disponible del pedimento") + importe_pedimento: Optional[float] = Field(None, description="Importe del pedimento") + existe_expediente: Optional[bool] = Field(None, description="Indica si existe expediente") + + # Validadores de Pydantic v1 (usando @validator) + + @validator('id') + def validate_id(cls, v): + if not v or not isinstance(v, str): + raise ValueError('id must be a non-empty string') + return v + + @validator('pedimento') + def validate_pedimento(cls, v): + if not v or not isinstance(v, str): + raise ValueError('pedimento must be a non-empty string') + return v + + @validator('pedimento_app') + def validate_pedimento_app(cls, v): + if not v or not isinstance(v, str): + raise ValueError('pedimento_app must be a non-empty string') + return v + + @validator('aduana') + def validate_aduana(cls, v): + if not v or not isinstance(v, str): + raise ValueError('aduana must be a non-empty string') + return v + + @validator('patente') + def validate_patente(cls, v): + if not v or not isinstance(v, str): + raise ValueError('patente must be a non-empty string') + return v + + @validator('organizacion') + def validate_organizacion(cls, v): + if not v or not isinstance(v, str): + raise ValueError('organizacion must be a non-empty string') + return v + + # Validadores combinados para campos opcionales + @validator('fecha_pago', 'fecha_inicio', 'fecha_fin', 'agente_aduanal', 'curp_apoderado', 'regimen', 'clave_pedimento', pre=True) + def validate_optional_strings(cls, v): + if v is not None and not isinstance(v, str): + raise ValueError('Campo opcional debe ser string o None') + return v + + @validator('alerta', 'existe_expediente', pre=True) + def validate_optional_bools(cls, v): + if v is not None and not isinstance(v, bool): + raise ValueError('Campo opcional debe ser booleano o None') + return v + + @validator('importe_total', 'saldo_disponible', 'importe_pedimento', pre=True) + def validate_optional_numbers(cls, v): + if v is not None and not isinstance(v, (float, int)): + raise ValueError('Campo opcional debe ser numérico o None') + + +class PedimentoCompletoRequestSchema(BaseModel): + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema \ No newline at end of file diff --git a/api/api_v2/modules/pedimentos/service.py b/api/api_v2/modules/pedimentos/service.py deleted file mode 100644 index e69de29..0000000 diff --git a/api/api_v2/modules/pedimentos/services.py b/api/api_v2/modules/pedimentos/services.py new file mode 100644 index 0000000..3a68dd9 --- /dev/null +++ b/api/api_v2/modules/pedimentos/services.py @@ -0,0 +1,346 @@ +"""Servicios para el manejo de pedimentos completos.""" + +import logging +from typing import Any, Dict, List, Optional + +from fastapi import HTTPException +# Importar controladores (nota: el archivo se llama controllers,py con coma) +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +from .controllers import pedimento_rest_controller, pedimento_vu_controller, pedimento_xml_scraper + + +from utils.helpers import soap_error + +# Logger configurado para el módulo +logger = logging.getLogger("app.api") + + +async def consume_ws_get_pedimento_completo(**kwargs) -> Dict[str, Any]: + """ + Consume el servicio web para obtener pedimento completo. + + Args: + **kwargs: Debe contener 'credencial' y 'pedimento' con sus respectivos campos + + Returns: + Dict con 'documento' y 'xml_content' + + Raises: + HTTPException: Si hay errores en la petición o datos faltantes + """ + # Validar datos de entrada + credencial = kwargs.get('credencial', {}) + pedimento_data = kwargs.get('pedimento', {}) + + if not credencial.get('user') or not credencial.get('password'): + raise HTTPException(status_code=400, detail="Credenciales incompletas") + + required_fields = ['aduana', 'patente', 'pedimento', 'id', 'organizacion'] + missing_fields = [f for f in required_fields if not pedimento_data.get(f)] + if missing_fields: + raise HTTPException( + status_code=400, + detail=f"Datos de pedimento incompletos: {missing_fields}" + ) + + logger.info(f"Iniciando consulta SOAP para pedimento: {pedimento_data.get('pedimento')}") + + try: + # Generar XML SOAP + soap_xml = pedimento_vu_controller.generate_pedimento_completo_template( + username=credencial.get('user'), + password=credencial.get('password'), + aduana=pedimento_data.get('aduana'), + patente=pedimento_data.get('patente'), + pedimento=pedimento_data.get('pedimento') + ) + + soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8' + } + + # Realizar petición SOAP + soap_response = await pedimento_vu_controller.make_request_async( + "ventanilla-ws-pedimentos/ConsultarPedimentoCompletoService?wsdl", + data=soap_xml, + headers=soap_headers + ) + + if not soap_response: + raise HTTPException(status_code=500, detail="No se recibió respuesta del servicio SOAP") + + if soap_error(soap_response): + logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}") + raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") + + # Extraer datos del XML + try: + data = pedimento_xml_scraper.extract_data(soap_response.text) + except Exception as e: + logger.error(f"Error al extraer datos XML: {e}") + raise HTTPException(status_code=500, detail="Error al procesar respuesta XML") + + # Generar nombre de archivo + file_name = f"vu_PC_{pedimento_data.get('pedimento_app', 'unknown')}.xml" + + # Enviar documento + try: + document_response = await pedimento_rest_controller.post_document( + soap_response=soap_response, + organizacion=pedimento_data.get('organizacion'), + pedimento=pedimento_data.get('id'), + file_name=file_name, + document_type=2, + ) + except Exception as e: + logger.error(f"Error al enviar documento: {e}") + raise HTTPException(status_code=500, detail="Error al guardar documento") + + # Enriquecer datos con información del pedimento + data['organizacion'] = pedimento_data.get('organizacion') + data['id'] = pedimento_data.get('id') + + logger.info(f"Pedimento completo procesado exitosamente: {pedimento_data.get('pedimento')}") + + return { + "documento": document_response, + "xml_content": data + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error inesperado en consume_ws_get_pedimento_completo: {e}") + raise HTTPException(status_code=500, detail=f"Error interno: {str(e)}") + +async def put_pedimento_data(**kwargs) -> Dict[str, Any]: + """ + Actualiza la información del pedimento en el sistema REST. + + Args: + **kwargs: Datos de credencial y pedimento + + Returns: + Dict con resultados del procesamiento + + Raises: + HTTPException: Si hay errores críticos en el procesamiento + """ + # Inicializar variables de respuesta + result = { + "documento": None, + "pedimento_actualizado": None, + "coves_procesados": None, + "coves_error": None, + "edocuments_procesados": None, + "edocuments_error": None, + "xml_content": None + + } + + # Obtener datos del servicio web + try: + ws_data = await consume_ws_get_pedimento_completo(**kwargs) + result["documento"] = ws_data.get("documento", None) + xml_content = ws_data.get('xml_content', {}) + result["xml_content"] = xml_content + + if not xml_content: + logger.warning("No se obtuvo contenido XML del servicio web") + return result + + except HTTPException: + raise # Re-lanzar HTTPExceptions + except Exception as e: + logger.error(f"Error inesperado al consumir servicio web: {e}") + raise HTTPException(status_code=500, detail=f"Error al obtener datos del pedimento: {str(e)}") + + # Actualizar información del pedimento (crítico) + try: + result["pedimento_actualizado"] = await _update_pedimento_info(kwargs, xml_content) + except Exception as e: + logger.error(f"Error crítico al actualizar pedimento: {e}") + raise HTTPException(status_code=500, detail=f"Error al actualizar el pedimento: {str(e)}") + + # Procesar COVEs (no crítico) + try: + result["coves_procesados"] = await _process_coves_safely(kwargs, xml_content) + except Exception as e: + logger.warning(f"Error al procesar COVEs: {e}") + result["coves_error"] = str(e) + + # Procesar documentos digitalizados (no crítico) + try: + result["edocuments_procesados"] = await _process_edocuments_safely(kwargs, xml_content) + except Exception as e: + logger.warning(f"Error al procesar documentos digitalizados: {e}") + result["edocuments_error"] = str(e) + + logger.info("Procesamiento de pedimento completo finalizado") + return result + + +async def _update_pedimento_info(kwargs: Dict[str, Any], xml_content: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Actualiza la información del pedimento. + + Args: + kwargs: Datos originales + xml_content: Contenido XML extraído + + Returns: + Respuesta del servicio de actualización + """ + if not xml_content: + logger.info("No hay contenido XML para actualizar el pedimento") + return None + + # Preparar datos para actualización (excluir identificadores_ed) + update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'} + update_content['existe_expediente'] = True + + pedimento_id = kwargs.get('pedimento', {}).get('id') + if not pedimento_id: + raise ValueError("ID de pedimento no encontrado para actualización") + + response = await pedimento_rest_controller.put_pedimento(pedimento_id, update_content) + logger.info(f"Pedimento {pedimento_id} actualizado exitosamente") + return response + + +async def _process_coves_safely(kwargs: Dict[str, Any], xml_content: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]: + """ + Procesa los COVEs de manera segura. + """ + coves = xml_content.get('coves', []) + if not coves: + logger.info("No se encontraron COVEs para procesar") + return None + + logger.info(f"Procesando {len(coves)} COVEs encontrados") + result = await _post_coves(kwargs.get('pedimento', {}), coves) + logger.info(f"Se procesaron exitosamente {len(result)} COVEs") + return result + + +async def _process_edocuments_safely(kwargs: Dict[str, Any], xml_content: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]: + """ + Procesa los documentos digitalizados de manera segura. + """ + identificadores_ed = xml_content.get('identificadores_ed', []) + if not identificadores_ed: + logger.info("No se encontraron documentos digitalizados (identificadores ED)") + return None + + logger.info(f"Procesando {len(identificadores_ed)} documentos digitalizados...") + result = await _post_edocuments(kwargs.get('pedimento', {}), identificadores_ed) + logger.info(f"Se procesaron exitosamente {len(result)} documentos digitalizados") + return result + +async def _post_coves(pedimento_data: Dict[str, Any], coves: List[str]) -> List[Dict[str, Any]]: + """ + Envía COVEs al sistema REST. + + Args: + pedimento_data: Datos del pedimento + coves: Lista de números de COVE + + Returns: + Lista de respuestas exitosas + + Raises: + HTTPException: Si no se pudo procesar ningún COVE + """ + if not coves: + return [] + + responses = [] + errors = [] + + for cove in coves: + document_data = { + 'numero_cove': cove, + 'organizacion': pedimento_data.get('organizacion'), + 'pedimento': pedimento_data.get('id') + } + + try: + response = await pedimento_rest_controller.post_cove(document_data) + if response: + responses.append(response) + logger.debug(f"COVE {cove} procesado exitosamente") + except Exception as e: + error_msg = f"Error al procesar COVE {cove}: {str(e)}" + logger.warning(error_msg) + errors.append(error_msg) + + if not responses and coves: + error_detail = f"No se pudo procesar ningún COVE. Errores: {'; '.join(errors)}" + logger.error(error_detail) + raise HTTPException(status_code=500, detail=error_detail) + + if errors: + logger.warning(f"Se procesaron {len(responses)}/{len(coves)} COVEs. Errores: {len(errors)}") + + return responses + + +async def _post_edocuments(pedimento_data: Dict[str, Any], identificadores_ed: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Envía documentos digitalizados al sistema REST. + + Args: + pedimento_data: Datos del pedimento + identificadores_ed: Lista de identificadores de documentos + + Returns: + Lista de respuestas exitosas + + Raises: + HTTPException: Si no se pudo procesar ningún documento + """ + if not identificadores_ed: + return [] + + responses = [] + errors = [] + + for identificador in identificadores_ed: + try: + # Validar campos requeridos + if not identificador.get('clave') or not identificador.get('complemento1'): + logger.warning(f"Documento con datos incompletos omitido: {identificador}") + continue + + document_data = { + 'clave': identificador.get('clave'), + 'descripcion': identificador.get('descripcion', ''), + 'numero_edocument': identificador.get('complemento1'), + 'organizacion': pedimento_data.get('organizacion'), + 'pedimento': pedimento_data.get('id') + } + + response = await pedimento_rest_controller.post_edocument(document_data) + if response: + responses.append(response) + logger.debug(f"Documento {identificador.get('clave')} procesado exitosamente") + + except Exception as e: + error_msg = f"Error al procesar documento {identificador.get('clave', 'unknown')}: {str(e)}" + logger.warning(error_msg) + errors.append(error_msg) + + if not responses and identificadores_ed: + error_detail = f"No se pudo procesar ningún documento digitalizado. Errores: {'; '.join(errors)}" + logger.error(error_detail) + raise HTTPException(status_code=500, detail=error_detail) + + if errors: + logger.warning(f"Se procesaron {len(responses)}/{len(identificadores_ed)} documentos. Errores: {len(errors)}") + + return responses + + diff --git a/api/api_v2/modules/pedimentos/tasks.py b/api/api_v2/modules/pedimentos/tasks.py index e69de29..5516838 100644 --- a/api/api_v2/modules/pedimentos/tasks.py +++ b/api/api_v2/modules/pedimentos/tasks.py @@ -0,0 +1,26 @@ +from celery_app import celery_app + +from .services import put_pedimento_data +import asyncio # Necesario para ejecutar funciones async dentro de Celery + + + +@celery_app.task(bind=True) +def process_pedimento_completo_request(self, pedimento_data: dict): + """ + Tarea de Celery para procesar la descarga de un solo documento edoc. + """ + try: + # Ejecutar la función asíncrona dentro del hilo síncrono de Celery + loop = asyncio.get_event_loop() + result = loop.run_until_complete(put_pedimento_data(**pedimento_data)) + + return {"status": "success", "result": result} + except Exception as e: + # Manejo de errores + self.update_state( + state='FAILURE', + meta={'exc_type': type(e).__name__, 'exc_message': str(e)} + ) + # Es crucial volver a lanzar la excepción para que Celery la marque como fallida + raise e \ No newline at end of file diff --git a/api/api_v2/modules/remesas/__init__.py b/api/api_v2/modules/remesas/__init__.py new file mode 100644 index 0000000..c19c78e --- /dev/null +++ b/api/api_v2/modules/remesas/__init__.py @@ -0,0 +1 @@ +from . import tasks \ No newline at end of file diff --git a/api/api_v2/modules/remesas/controllers.py b/api/api_v2/modules/remesas/controllers.py new file mode 100644 index 0000000..01aadbb --- /dev/null +++ b/api/api_v2/modules/remesas/controllers.py @@ -0,0 +1,134 @@ +from controllers.RESTController import APIRESTController +from controllers.SOAPController import VUCEMController +from typing import List, Dict, Any +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from typing import List, Dict + +class RemesaController(APIRESTController): + def __init__(self): + super().__init__() + + async def post_cove(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + Método para enviar un número de COVE a la API. + + Args: + data: Diccionario con los datos del COVE a enviar + """ + return await self._make_request_async('POST', 'customs/coves/', data=data) + + +class RemesaVUController(VUCEMController): + def __init__(self): + super().__init__() # Implementación específica para Coves VU + + def generate_remesas_template(self, username: str, password: str, aduana: str, patente: str, numero_operacion: str, pedimento: str) -> str: + """ + Genera el template SOAP para consultar remesas + + Args: + username: Usuario de VUCEM + password: Contraseña de VUCEM + aduana: Código de aduana + patente: Número de patente + + Returns: + str: Template SOAP XML completo + """ + soap_template = f''' + + + + + {username} + {password} + + + + + + {numero_operacion} + + {aduana} + {patente} + {pedimento} + + + + ''' + return soap_template + + +# Pedimento Completo +class RemesaXMLScraper: + """ + Controlador para scrapear XML de consultar remesas. + Extrae todos los comprobantesVE, junto con remesaAgente y remesaSA. + """ + + namespaces = { + "S": "http://schemas.xmlsoap.org/soap/envelope/", + "ns2": "http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta", + "ns3": "http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarremesas", + } + + def extract_remesas(self, xml_content: str) -> List[Dict[str, str]]: + """ + Extrae todos los comprobanteVE de un XML de remesas. + + Args: + xml_content: Contenido del XML en string. + + Returns: + Lista de diccionarios con comprobanteVE, remesaAgente y remesaSA. + """ + try: + root = ET.fromstring(xml_content) + + remesas = [] + for remesa in root.findall(".//ns3:remesas", self.namespaces): + comprobante = remesa.find("ns3:comprobanteVE", self.namespaces) + agente = remesa.find("ns3:remesaAgente", self.namespaces) + sa = remesa.find("ns3:remesaSA", self.namespaces) + + remesas.append({ + "comprobanteVE": comprobante.text if comprobante is not None else None, + "remesaAgente": agente.text if agente is not None else None, + "remesaSA": sa.text if sa is not None else None + }) + + return remesas + + except ET.ParseError as e: + print(f"Error al parsear XML: {e}") + return [] + except Exception as e: + print(f"Error inesperado: {e}") + return [] + + def extract_data(self, xml_content: str) -> Dict[str, Any]: + """ + Método de compatibilidad que llama a extract_remesas y devuelve un dict. + + Args: + xml_content: Contenido del XML en string. + + Returns: + Dict con los datos extraídos del XML. + """ + remesas_data = self.extract_remesas(xml_content) + return { + 'coves': remesas_data, + 'total_remesas': len(remesas_data) + } + + +remesa_rest_controller = RemesaController() +remesa_vu_controller = RemesaVUController() +remesa_xml_scraper = RemesaXMLScraper() + + diff --git a/api/api_v2/modules/remesas/routers.py b/api/api_v2/modules/remesas/routers.py new file mode 100644 index 0000000..e05ee2c --- /dev/null +++ b/api/api_v2/modules/remesas/routers.py @@ -0,0 +1,21 @@ +from fastapi import APIRouter, BackgroundTasks, status, HTTPException +from fastapi.responses import JSONResponse +from .schemas import RemesaBaseSchema +from .tasks import process_remesa_request +import logging +logger = logging.getLogger("app.api") + +router = APIRouter() + +@router.post("/services/remesas/", status_code=status.HTTP_202_ACCEPTED) +async def download_remesa(remesa_request: RemesaBaseSchema): + """ + Endpoint para iniciar la descarga completa de un pedimento. + """ + remesa_dict = remesa_request.model_dump() + + + # Ejecuta la tarea de Celery de forma asíncrona + task = process_remesa_request.delay(remesa_dict) + # Puedes devolver el ID de la tarea para consultar el estado después + return {"status": "submitted", "detail": "La solicitud de descarga de la remesa ha sido enviada.", "task_id": task.id} diff --git a/api/api_v2/modules/remesas/schemas.py b/api/api_v2/modules/remesas/schemas.py new file mode 100644 index 0000000..c3542bb --- /dev/null +++ b/api/api_v2/modules/remesas/schemas.py @@ -0,0 +1,15 @@ +from typing import Optional, Union, Dict, Any +from uuid import UUID +from datetime import datetime +# CORRECCIÓN CLAVE: Se importa el 'validator' para que el decorador funcione +from pydantic import BaseModel, Field, validator +from schemas.CredencialSchema import CredencialBaseSchema +from api.api_v2.modules.pedimentos.schemas import PedimentoBaseSchema + + + + +class RemesaBaseSchema(BaseModel): + pedimento: PedimentoBaseSchema + credencial: CredencialBaseSchema + diff --git a/api/api_v2/modules/remesas/services.py b/api/api_v2/modules/remesas/services.py new file mode 100644 index 0000000..0549280 --- /dev/null +++ b/api/api_v2/modules/remesas/services.py @@ -0,0 +1,242 @@ +"""Servicios para el manejo de pedimentos completos.""" + +import logging +from typing import Any, Dict, List, Optional + +from fastapi import HTTPException +# Importar controladores (nota: el archivo se llama controllers,py con coma) +import sys +import os +sys.path.append(os.path.dirname(__file__)) + +from .controllers import remesa_rest_controller, remesa_vu_controller, remesa_xml_scraper + + +from utils.helpers import soap_error + +# Logger configurado para el módulo +logger = logging.getLogger("app.api") + + +async def obtener_remesa(**kwargs) -> Dict[str, Any]: + """ + Consume el servicio web para obtener pedimento completo. + + Args: + **kwargs: Debe contener 'credencial' y 'pedimento' con sus respectivos campos + + Returns: + Dict con 'documento' y 'xml_content' + + Raises: + HTTPException: Si hay errores en la petición o datos faltantes + """ + # Validar datos de entrada + credencial = kwargs.get('credencial', {}) + pedimento_data = kwargs.get('pedimento', {}) + + if not credencial.get('user') or not credencial.get('password'): + raise HTTPException(status_code=400, detail="Credenciales incompletas") + + required_fields = ['aduana', 'patente', 'pedimento', 'id', 'organizacion'] + missing_fields = [f for f in required_fields if not pedimento_data.get(f)] + if missing_fields: + raise HTTPException( + status_code=400, + detail=f"Datos de pedimento incompletos: {missing_fields}" + ) + + logger.info(f"Iniciando consulta SOAP para pedimento: {pedimento_data.get('pedimento')}") + + try: + # Generar XML SOAP + soap_xml = remesa_vu_controller.generate_remesas_template( + username=credencial.get('user'), + password=credencial.get('password'), + aduana=pedimento_data.get('aduana'), + patente=pedimento_data.get('patente'), + pedimento=pedimento_data.get('pedimento'), + numero_operacion=pedimento_data.get('numero_operacion', '') + ) + + soap_headers = { + 'Content-Type': 'text/xml; charset=utf-8' + } + + # Realizar petición SOAP + soap_response = await remesa_vu_controller.make_request_async( + "ventanilla-ws-pedimentos/ConsultarRemesasService?wsdl", + data=soap_xml, + headers=soap_headers + ) + + if not soap_response: + raise HTTPException(status_code=500, detail="No se recibió respuesta del servicio SOAP") + + if soap_error(soap_response): + logger.error(f"Error en respuesta SOAP: {soap_response.text if hasattr(soap_response, 'text') else 'Sin detalles'}") + raise HTTPException(status_code=500, detail="Error en la respuesta del servicio SOAP") + + + # Extraer datos del XML + try: + remesas_data = remesa_xml_scraper.extract_remesas(soap_response.text) + + except Exception as e: + logger.error(f"Error al extraer datos XML: {e}") + raise HTTPException(status_code=500, detail="Error al procesar respuesta XML") + + # Generar nombre de archivo + file_name = f"vu_RM_{pedimento_data.get('pedimento_app', 'unknown')}.xml" + + # Enviar documento + try: + document_response = await remesa_rest_controller.post_document( + soap_response=soap_response, + organizacion=pedimento_data.get('organizacion'), + pedimento=pedimento_data.get('id'), + file_name=file_name, + document_type=2, + ) + except Exception as e: + logger.error(f"Error al enviar documento: {e}") + raise HTTPException(status_code=500, detail="Error al guardar documento") + + + logger.info(f"Remesa procesada exitosamente: {pedimento_data.get('pedimento')}") + + return { + "documento": document_response, + "xml_content": remesas_data + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error inesperado en consume_ws_get_pedimento_completo: {e}") + raise HTTPException(status_code=500, detail=f"Error interno: {str(e)}") + +async def post_remesa_data(**kwargs) -> Dict[str, Any]: + """ + Actualiza la información del pedimento en el sistema REST. + + Args: + **kwargs: Datos de credencial y pedimento + + Returns: + Dict con resultados del procesamiento + + Raises: + HTTPException: Si hay errores críticos en el procesamiento + """ + # Inicializar variables de respuesta + result = { + "documento": None, + "coves_procesados": None, + "coves_error": None, + "xml_content": None + + } + + # Obtener datos del servicio web + try: + ws_data = await obtener_remesa(**kwargs) + result["documento"] = ws_data.get("documento", None) + xml_content = ws_data.get('xml_content', {}) + result["xml_content"] = xml_content + + if not xml_content: + logger.warning("No se obtuvo contenido XML del servicio web") + return result + + except HTTPException: + raise # Re-lanzar HTTPExceptions + except Exception as e: + logger.error(f"Error inesperado al consumir servicio web: {e}") + raise HTTPException(status_code=500, detail=f"Error al obtener datos del pedimento: {str(e)}") + + + + # Procesar COVEs (crítico) + try: + # print(xml_content.get('coves', [])) + result["coves_procesados"] = await _process_coves_safely(kwargs, xml_content) + except Exception as e: + logger.warning(f"Error al procesar COVEs: {e}") + result["coves_error"] = str(e) + + + logger.info("Procesamiento de pedimento completo finalizado") + return result + + + +async def _process_coves_safely(kwargs: Dict[str, Any], xml_content) -> Optional[List[Dict[str, Any]]]: + """ + Procesa los COVEs de manera segura. + """ + coves = xml_content + if not coves: + logger.info("No se encontraron COVEs para procesar") + return None + + logger.info(f"Procesando {len(coves)} COVEs encontrados") + result = await _post_coves(kwargs.get('pedimento', {}), coves) + logger.info(f"Se procesaron exitosamente {len(result)} COVEs") + return result + + +async def _post_coves(pedimento_data: Dict[str, Any], coves: List[Dict[str, str]]) -> List[Dict[str, Any]]: + """ + Envía COVEs al sistema REST. + + Args: + pedimento_data: Datos del pedimento + coves: Lista de diccionarios con datos de COVE (comprobanteVE, remesaAgente, remesaSA) + + Returns: + Lista de respuestas exitosas + + Raises: + HTTPException: Si no se pudo procesar ningún COVE + """ + if not coves: + return [] + + responses = [] + errors = [] + + for cove in coves: + # Extraer el número de COVE del diccionario + numero_cove = cove.get('comprobanteVE') + if not numero_cove: + logger.warning(f"COVE sin comprobanteVE encontrado: {cove}") + continue + + document_data = { + 'numero_cove': numero_cove, + 'organizacion': pedimento_data.get('organizacion'), + 'pedimento': pedimento_data.get('id') + } + + try: + response = await remesa_rest_controller.post_cove(document_data) + if response: + responses.append(response) + logger.debug(f"COVE {numero_cove} procesado exitosamente") + except Exception as e: + error_msg = f"Error al procesar COVE {numero_cove}: {str(e)}" + logger.warning(error_msg) + errors.append(error_msg) + + if not responses and coves: + error_detail = f"No se pudo procesar ningún COVE. Errores: {'; '.join(errors)}" + logger.error(error_detail) + raise HTTPException(status_code=500, detail=error_detail) + + if errors: + logger.warning(f"Se procesaron {len(responses)}/{len(coves)} COVEs. Errores: {len(errors)}") + + return responses + + diff --git a/api/api_v2/modules/remesas/tasks.py b/api/api_v2/modules/remesas/tasks.py new file mode 100644 index 0000000..9f2754d --- /dev/null +++ b/api/api_v2/modules/remesas/tasks.py @@ -0,0 +1,27 @@ +from celery import Celery +from celery_app import celery_app +import asyncio +import logging +from typing import Dict, Any +from contextlib import asynccontextmanager + +from .services import post_remesa_data +from api.api_v2.modules.tasks.tasks import run_async_task + + +@celery_app.task +def process_remesa_request(remesa_request: Dict[str, Any]) -> Dict[str, Any]: + """ + Tarea de Celery para procesar la solicitud de acuse. + + Args: + acuse_request: Diccionario con los datos de la solicitud de acuse. + + Returns: + Diccionario con la respuesta del acuse. + """ + loop = asyncio.get_event_loop() + remesa_response = loop.run_until_complete(post_remesa_data(**remesa_request)) + + return {"status": "processed", "data": remesa_response} + diff --git a/api/api_v2/modules/partidas/service.py b/api/api_v2/modules/remesas/tests.py similarity index 100% rename from api/api_v2/modules/partidas/service.py rename to api/api_v2/modules/remesas/tests.py diff --git a/api/api_v2/modules/pedimentos/router.py b/api/api_v2/modules/tasks/__init__.py similarity index 100% rename from api/api_v2/modules/pedimentos/router.py rename to api/api_v2/modules/tasks/__init__.py diff --git a/api/api_v2/modules/tasks/routers.py b/api/api_v2/modules/tasks/routers.py index e69de29..eb5f062 100644 --- a/api/api_v2/modules/tasks/routers.py +++ b/api/api_v2/modules/tasks/routers.py @@ -0,0 +1,149 @@ +from fastapi import FastAPI +from fastapi.routing import APIRouter +from celery_app import celery_app +from fastapi import APIRouter, HTTPException, BackgroundTasks +from datetime import datetime +from fastapi.responses import JSONResponse +import logging +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.get("/async/task-status/{task_id}") +async def get_task_status(task_id: str): + """ + Consulta el estado de una tarea agendada. + + Args: + task_id: ID de la tarea a consultar + + Returns: + JSONResponse con el estado actual de la tarea + + Raises: + HTTPException: Si la tarea no existe o hay errores + """ + try: + # Obtener el resultado de la tarea desde Celery + task_result = celery_app.AsyncResult(task_id) + + if not task_result: + raise HTTPException(status_code=404, detail="Tarea no encontrada") + + # Preparar respuesta según el estado + response_data = { + "task_id": task_id, + "status": task_result.status, + "timestamp": datetime.utcnow().isoformat() + } + + if task_result.status == 'PENDING': + response_data.update({ + "message": "La tarea está pendiente de procesamiento", + "progress": 0 + }) + elif task_result.status == 'PROGRESS': + meta = task_result.info + response_data.update({ + "message": f"Procesando: {meta.get('status', 'En progreso')}", + "progress": meta.get('progress', 50), + "current_step": meta.get('status') + }) + elif task_result.status == 'SUCCESS': + response_data.update({ + "message": "Tarea completada exitosamente", + "progress": 100, + "result": task_result.result + }) + elif task_result.status == 'FAILURE': + response_data.update({ + "message": f"Error en la tarea: {str(task_result.info)}", + "progress": 0, + "error": str(task_result.info) + }) + else: + response_data.update({ + "message": f"Estado desconocido: {task_result.status}", + "progress": 0 + }) + + # Determinar código de estado HTTP + status_code = 200 + if task_result.status == 'FAILURE': + status_code = 500 + elif task_result.status in ['PENDING', 'PROGRESS']: + status_code = 202 + + return JSONResponse(content=response_data, status_code=status_code) + + except Exception as e: + logger.error(f"Error al consultar estado de tarea {task_id}: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al consultar el estado de la tarea: {str(e)}" + ) + +@router.get("/async/tasks/active") +async def get_active_tasks(): + """ + Lista todas las tareas activas en el sistema. + + Returns: + JSONResponse con la lista de tareas activas + """ + try: + # Obtener tareas activas desde Celery + inspect = celery_app.control.inspect() + active_tasks = inspect.active() + scheduled_tasks = inspect.scheduled() + + response_data = { + "active_tasks": active_tasks or {}, + "scheduled_tasks": scheduled_tasks or {}, + "timestamp": datetime.utcnow().isoformat() + } + + return JSONResponse(content=response_data, status_code=200) + + except Exception as e: + logger.error(f"Error al obtener tareas activas: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al obtener tareas activas: {str(e)}" + ) + +@router.delete("/async/task/{task_id}") +async def cancel_task(task_id: str): + """ + Cancela una tarea agendada. + + Args: + task_id: ID de la tarea a cancelar + + Returns: + JSONResponse confirmando la cancelación + + Raises: + HTTPException: Si hay errores al cancelar + """ + try: + # Revocar la tarea + celery_app.control.revoke(task_id, terminate=True) + + response_data = { + "success": True, + "message": f"Tarea {task_id} cancelada exitosamente", + "task_id": task_id, + "timestamp": datetime.utcnow().isoformat() + } + + logger.info(f"Tarea cancelada: {task_id}") + return JSONResponse(content=response_data, status_code=200) + + except Exception as e: + logger.error(f"Error al cancelar tarea {task_id}: {e}") + raise HTTPException( + status_code=500, + detail=f"Error al cancelar la tarea: {str(e)}" + ) \ No newline at end of file diff --git a/api/api_v2/modules/tasks/schemas.py b/api/api_v2/modules/tasks/schemas.py index e69de29..d91e96c 100644 --- a/api/api_v2/modules/tasks/schemas.py +++ b/api/api_v2/modules/tasks/schemas.py @@ -0,0 +1,16 @@ +from pydantic import BaseModel + + +class TaskBaseModelSchema(BaseModel): + task_id: str + +class TaskDetailInfoSchema(TaskBaseModelSchema): + status: str + result: str | None = None + +class TaskResultSchema(BaseModel): + active_tasks: list[str] + scheduled_tasks: list[str] + completed_tasks: list[str] + failed_tasks: list[str] + diff --git a/api/api_v2/modules/tasks/tasks.py b/api/api_v2/modules/tasks/tasks.py index e69de29..107a44c 100644 --- a/api/api_v2/modules/tasks/tasks.py +++ b/api/api_v2/modules/tasks/tasks.py @@ -0,0 +1,19 @@ +from celery import Celery +from celery_app import celery_app +import asyncio +import logging +from typing import Dict, Any +from contextlib import asynccontextmanager + + +def run_async_task(async_func, *args, **kwargs): + """Helper function to run async functions in Celery tasks""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(async_func(*args, **kwargs)) + finally: + loop.close() + + + diff --git a/celery_app.py b/celery_app.py index 3e04c76..1eaa713 100644 --- a/celery_app.py +++ b/celery_app.py @@ -24,4 +24,11 @@ celery_app.conf.update( ) # Autodiscovery of tasks -celery_app.autodiscover_tasks() \ No newline at end of file +celery_app.autodiscover_tasks() + +from api.api_v2.modules.acuses.tasks import process_acuse_request +from api.api_v2.modules.coves.tasks import process_cove_request, process_acuse_cove_request +from api.api_v2.modules.edocs.tasks import process_edoc_download_request, process_edocs_masivo_download_request +from api.api_v2.modules.pedimentos.tasks import process_pedimento_completo_request +from api.api_v2.modules.partidas.tasks import process_partida_request +from api.api_v2.modules.remesas.tasks import process_remesa_request \ No newline at end of file diff --git a/controllers/RESTController.py b/controllers/RESTController.py index 0a8ca9c..0f67a6b 100644 --- a/controllers/RESTController.py +++ b/controllers/RESTController.py @@ -6,9 +6,206 @@ from typing import List, Dict, Any import os import httpx from core.config import settings +from dataclasses import dataclass logger = logging.getLogger(__name__) + +# para Api_v2 +class APIRESTController: + def __init__(self): + self.base_url = settings.API_URL # URL base de la API + self.headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Token {settings.API_TOKEN}' # Token de autenticación + } + self.timeout = 5 # Timeout para las peticiones a la API + + def _make_request(self, method, endpoint, data=None): + """ + Método para hacer peticiones a la API. + """ + url = f"{self.base_url}/{endpoint}" + logger.info(f"_make_request: method={method}, url={url}, data={data}") + try: + response = requests.request(method, url, json=data, headers=self.headers, timeout=self.timeout) + logger.info(f"_make_request: response.status_code={response.status_code}") + logger.info(f"_make_request: response.text={response.text}") + response.raise_for_status() # Lanza un error si la respuesta no es 200 + result = response.json() + logger.info(f"_make_request: result={result}") + return result # Retorna el JSON de la respuesta + except requests.RequestException as e: + logger.error(f"_make_request: Exception: {e}") + if hasattr(e, 'response') and e.response is not None: + logger.error(f"_make_request: Status code del error: {e.response.status_code}") + logger.error(f"_make_request: Contenido del error: {e.response.text}") + return None + + async def _make_request_async(self, method: str, endpoint: str, data=None, return_bytes: bool = False): + """ + Método asíncrono para hacer peticiones a la API usando httpx. + """ + url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/') }" + + logger.warning(f"Realizando petición {method} a {url}") + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + logger.info(f"Haciendo petición {method} a {url}") + if method.upper() == 'GET': + response = await client.get(url, headers=self.headers) + elif method.upper() == 'POST': + response = await client.post(url, json=data, headers=self.headers) + elif method.upper() == 'PUT': + response = await client.put(url, json=data, headers=self.headers) + elif method.upper() == 'DELETE': + response = await client.delete(url, headers=self.headers) + else: + raise ValueError(f"Método HTTP no soportado: {method}") + logger.info(f"_make_request_async: response.status_code={response.status_code}") + # No loggear response.text si se esperan datos binarios + if not return_bytes: + logger.info(f"_make_request_async: response.text={response.text}") + else: + logger.info(f"_make_request_async: contenido binario recibido, tamaño: {len(response.content)} bytes") + response.raise_for_status() + logger.info(f"Respuesta exitosa: {response.status_code}") + if return_bytes: + return response.content + else: + result = response.json() if response.content else {} + logger.info(f"_make_request_async: result={result}") + return result + except httpx.TimeoutException as e: + logger.error(f"_make_request_async: TimeoutException: {e}") + logger.error(f"Timeout en petición a {url}: {e}") + return None + except httpx.HTTPStatusError as e: + logger.error(f"_make_request_async: HTTPStatusError: {e}") + logger.error(f"Error HTTP {e.response.status_code} en {url}: {e}") + return None + except Exception as e: + logger.error(f"_make_request_async: Exception: {e}") + logger.error(f"Error inesperado en petición a {url}: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + + async def put_procesamiento(self, service_id: int, data: Dict[str, Any]) -> Dict[str, Any]: + return await self._make_request_async('PUT', f'customs/procesamientopedimentos/{service_id}/', data=data) + + async def post_procesamiento(self, data: Dict[str, Any]) -> Dict[str, Any]: + return await self._make_request_async('POST', 'customs/procesamientopedimentos/', data=data) + + async def post_document(self, soap_response=None, organizacion: str = None, + pedimento: str = None, file_name: str = None, document_type: int = 2, + binary_content: bytes = None, fuente: int = 2) -> Dict[str, Any]: + """ + Método para enviar documentos (XML, PDF, etc.) a la API. + + Args: + soap_response: Respuesta del servicio SOAP (para archivos XML) + organizacion: UUID de la organización (requerido) + pedimento: UUID del pedimento (requerido) + file_name: Nombre del archivo con extensión (requerido) + document_type: Tipo de documento + binary_content: Contenido binario del archivo (para PDFs, etc.) + """ + import datetime + import tempfile + import mimetypes + logger.info(f"post_document: file_name={file_name}, organizacion={organizacion}, pedimento={pedimento}, document_type={document_type}, fuente={fuente}") + if not soap_response and not binary_content: + logger.error("post_document: Debe proporcionar soap_response o binary_content") + return None + if not file_name: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + file_name = f"documento_{timestamp}.bin" + try: + # Extraer extensión del nombre del archivo + file_extension = os.path.splitext(file_name)[1].lstrip('.').lower() + if not file_extension: + file_extension = 'bin' # Extensión por defecto + # Determinar Content-Type basado en la extensión + content_type_map = { + 'xml': 'application/xml', + 'pdf': 'application/pdf', + 'jpg': 'image/jpeg', + 'jpeg': 'image/jpeg', + 'png': 'image/png', + 'zip': 'application/zip', + 'doc': 'application/msword', + 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'xls': 'application/vnd.ms-excel', + 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' + } + content_type = content_type_map.get(file_extension, 'application/octet-stream') + # Determinar modo de archivo y contenido + if binary_content: + file_mode = 'wb' + temp_suffix = f'.{file_extension}' + content = binary_content + is_binary = True + else: + file_mode = 'w' + temp_suffix = f'.{file_extension}' + is_binary = False + if hasattr(soap_response, 'content'): + content = soap_response.content.decode('utf-8') + elif hasattr(soap_response, 'text'): + content = soap_response.text + else: + content = str(soap_response) + encoding = None if is_binary else 'utf-8' + with tempfile.NamedTemporaryFile(mode=file_mode, suffix=temp_suffix, delete=False, encoding=encoding) as temp_file: + temp_file.write(content) + temp_file_path = temp_file.name + headers = { + 'Authorization': f'Token {settings.API_TOKEN}' + } + file_size = os.path.getsize(temp_file_path) + document_data = { + 'organizacion': organizacion, + 'pedimento': pedimento, + 'extension': file_extension, + 'document_type': document_type, + 'size': file_size, + 'fuente': fuente + } + + url = f"{self.base_url}/record/documents/" + import httpx + async with httpx.AsyncClient(timeout=self.timeout) as client: + with open(temp_file_path, 'rb') as file: + files = { + 'archivo': (file_name, file.read(), content_type) + } + logger.info(f"post_document: files={list(files.keys())}") + response = await client.post( + url, + data=document_data, # Datos van como form-data + files=files, # Archivo va como multipart + headers=headers + ) + logger.info(f"post_document: response.status_code={response.status_code}") + os.unlink(temp_file_path) + response.raise_for_status() + result = response.json() + logger.info(f"post_document: result={result}") + logger.info(f"Documento {file_extension.upper()} enviado exitosamente: {file_name} (tamaño: {file_size} bytes)") + return result + except Exception as e: + logger.error(f"post_document: Exception: {e}") + import traceback + logger.error(traceback.format_exc()) + print(f"Error al enviar documento: {document_data}, Error: {e}") + # Limpiar archivo temporal en caso de error + if 'temp_file_path' in locals() and os.path.exists(temp_file_path): + os.unlink(temp_file_path) + return None + + +# Para Api_v1 eliminar cuando la integracion esté completa class APIController: """ Controlador para manejar las peticiones a la API. @@ -174,6 +371,10 @@ class APIController: 'fuente': fuente } + logger.info(f"post_document: url={self.base_url}/record/documents/") + logger.info(f"post_document: headers={headers}") + logger.info(f"post_document: document_data={document_data}") + logger.info(f"post_document: file_name={file_name}, file_size={file_size}, content_type={content_type}") # Subir archivo url = f"{self.base_url}/record/documents/" @@ -184,25 +385,31 @@ class APIController: files = { 'archivo': (file_name, file.read(), content_type) } - + logger.info(f"post_document: files={list(files.keys())}") response = await client.post( url, data=document_data, # Datos van como form-data files=files, # Archivo va como multipart headers=headers ) + logger.info(f"post_document: response.status_code={response.status_code}") + logger.info(f"post_document: response.text={response.text}") # Limpiar archivo temporal os.unlink(temp_file_path) response.raise_for_status() result = response.json() + logger.info(f"post_document: result={result}") - print(f"Documento {file_extension.upper()} enviado exitosamente: {file_name} (tamaño: {file_size} bytes)") + logger.info(f"Documento {file_extension.upper()} enviado exitosamente: {file_name} (tamaño: {file_size} bytes)") return result except Exception as e: - print(f"Error al enviar documento: {document_data}, Error: {e}") + logger.error(f"post_document: Exception: {e}") + import traceback + logger.error(traceback.format_exc()) + logger.error(f"Error al enviar documento {file_name if 'file_name' in locals() else 'unknown'}: {str(e)}") # Limpiar archivo temporal en caso de error if 'temp_file_path' in locals() and os.path.exists(temp_file_path): os.unlink(temp_file_path) @@ -281,6 +488,11 @@ class APIController: url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}" + # No loggear data si return_bytes=True para evitar datos binarios en logs + if return_bytes: + logger.info(f"_make_request_async: method={method}, url={url}, return_bytes={return_bytes}") + else: + logger.info(f"_make_request_async: method={method}, url={url}, data={data}") logger.warning(f"Realizando petición {method} a {url}") try: async with httpx.AsyncClient(timeout=self.timeout) as client: @@ -296,25 +508,33 @@ class APIController: response = await client.delete(url, headers=self.headers) else: raise ValueError(f"Método HTTP no soportado: {method}") - + logger.info(f"_make_request_async: response.status_code={response.status_code}") + # No loggear response.text si se esperan datos binarios + if not return_bytes: + logger.info(f"_make_request_async: response.text={response.text}") + else: + logger.info(f"_make_request_async: contenido binario recibido, tamaño: {len(response.content)} bytes") response.raise_for_status() logger.info(f"Respuesta exitosa: {response.status_code}") if return_bytes: return response.content else: result = response.json() if response.content else {} + logger.info(f"_make_request_async: result={result}") return result - except httpx.TimeoutException as e: + logger.error(f"_make_request_async: TimeoutException: {e}") logger.error(f"Timeout en petición a {url}: {e}") return None except httpx.HTTPStatusError as e: + logger.error(f"_make_request_async: HTTPStatusError: {e}") logger.error(f"Error HTTP {e.response.status_code} en {url}: {e}") - return None except Exception as e: + logger.error(f"_make_request_async: Exception: {e}") logger.error(f"Error inesperado en petición a {url}: {e}") import traceback + logger.error(traceback.format_exc()) return None diff --git a/controllers/SOAPController.py b/controllers/SOAPController.py index d537b1b..0673915 100644 --- a/controllers/SOAPController.py +++ b/controllers/SOAPController.py @@ -5,21 +5,95 @@ import httpx import datetime import time -class SOAPController: - """ - Controlador para manejar las peticiones SOAP. - """ + + +class VUCEMController: + import ssl + # Contexto SSL personalizado para permitir claves DH pequeñas + ssl_context = ssl.create_default_context() + ssl_context.set_ciphers('DEFAULT@SECLEVEL=1') + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE def __init__(self): self.base_url = settings.SOAP_SERVICE_URL self.timeout = settings.TIMEOUT # Timeout por default + + async def make_request(self, endpoint, data=None, headers=None, max_retries=5): + intento = 0 + while intento < settings.MAX_RETRIES: + try: + with httpx.Client(verify=self.ssl_context, timeout=self.timeout) as client: + content = data.encode('utf-8') if data else None + response = client.post( + f"{self.base_url}/{endpoint}", + content=content, + headers=headers + ) + response.raise_for_status() + return response # ✅ éxito + except Exception as e: + intento += 1 + wait_time = 0 + print(f"[{endpoint}] Error intento {intento}: {e}. Reintentando en {settings.WAIT_TIME}s...") + time.sleep(settings.WAIT_TIME) + + print(f"[{endpoint}] Fallo tras {settings.MAX_RETRIES} intentos.") + return None + + async def make_request_async(self, endpoint, data=None, headers=None, max_retries=5): + """ + Método asíncrono para hacer peticiones SOAP sin bloquear el event loop + + Args: + endpoint: El endpoint al que se va a hacer la petición + data: Los datos a enviar en la petición + headers: Los headers HTTP a incluir en la petición + max_retries: Número máximo de reintentos en caso de fallo + + Returns: + La respuesta de la petición, o None si falla tras los reintentos + """ + import asyncio + intento = 0 + while intento < settings.MAX_RETRIES: + try: + async with httpx.AsyncClient(verify=self.ssl_context, timeout=self.timeout) as client: + content = data.encode('utf-8') if data else None + response = await client.post( + f"{self.base_url}/{endpoint}", + content=content, + headers=headers + ) + response.raise_for_status() + return response # ✅ éxito + except Exception as e: + intento += 1 + print(f"[{endpoint}] Error intento {intento}: {e}. Reintentando en {settings.WAIT_TIME}s...") + if intento < settings.MAX_RETRIES: + await asyncio.sleep(settings.WAIT_TIME) # ASYNC SLEEP! + + print(f"[{endpoint}] Fallo tras {settings.MAX_RETRIES} intentos.") + return None + + +class SOAPController: + """ + Controlador para manejar las peticiones SOAP. + """ + import ssl # Contexto SSL personalizado para permitir claves DH pequeñas ssl_context = ssl.create_default_context() ssl_context.set_ciphers('DEFAULT@SECLEVEL=1') ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE + def __init__(self): + self.base_url = settings.SOAP_SERVICE_URL + self.timeout = settings.TIMEOUT # Timeout por default + + async def make_request(self, endpoint, data=None, headers=None, max_retries=5): intento = 0 diff --git a/core/config.py b/core/config.py index 7fb17cf..ba4d939 100644 --- a/core/config.py +++ b/core/config.py @@ -28,12 +28,14 @@ class Settings(BaseSettings): # Configuración del servidor HOST: str = "0.0.0.0" PORT: int = 8001 - + + # Configuración de Celery + CELERY_BROKER_URL: str = "" + CELERY_RESULT_BACKEND: str = "" + # Configuración de seguridad SECRET_KEY: str = "your-secret-key-here" ALGORITHM: str = "HS256" - ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 - model_config = {"env_file": ".env"} diff --git a/main.py b/main.py index d7f042d..5316b1f 100644 --- a/main.py +++ b/main.py @@ -48,6 +48,7 @@ def create_application() -> FastAPI: description="EFC Microservice - Un microservicio profesional por AduanaSoft con soporte para tareas asíncronas", version=settings.APP_VERSION, debug=settings.DEBUG, + swagger_ui_parameters={"theme": "light"} ) # Configuración adicional para loggers específicos diff --git a/requirements.txt b/requirements.txt index cd56ab5..9289f93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,26 +1,45 @@ +amqp==5.3.1 annotated-types==0.7.0 anyio==4.9.0 +billiard==4.2.2 +celery==5.3.4 certifi==2025.6.15 +cffi==2.0.0 charset-normalizer==3.4.2 click==8.2.1 +click-didyoumean==0.3.1 +click-plugins==1.1.1.2 +click-repl==0.3.0 +cryptography==46.0.1 +ecdsa==0.19.1 fastapi==0.116.0 h11==0.16.0 httpcore==1.0.9 httpx==0.28.1 idna==3.10 +kombu==5.5.4 +packaging==25.0 +prompt_toolkit==3.0.52 +pyasn1==0.6.1 +pycparser==2.23 pydantic==2.11.7 pydantic-settings==2.10.1 pydantic_core==2.33.2 +python-dateutil==2.9.0.post0 python-dotenv==1.1.1 +python-jose==3.5.0 +redis==5.0.1 requests==2.32.4 +rsa==4.9.1 +setuptools==80.9.0 +six==1.17.0 sniffio==1.3.1 starlette==0.46.2 +supervisor==4.2.5 typing-inspection==0.4.1 typing_extensions==4.14.1 +tzdata==2025.2 urllib3==2.5.0 uvicorn==0.35.0 -python-dotenv -cryptography -celery==5.3.4 -redis==5.0.1 -supervisor==4.2.5 +vine==5.1.0 +wcwidth==0.2.14 diff --git a/schemas/CredencialSchema.py b/schemas/CredencialSchema.py index 7b79a28..ed10976 100644 --- a/schemas/CredencialSchema.py +++ b/schemas/CredencialSchema.py @@ -1,15 +1,53 @@ -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator from typing import Optional from uuid import UUID -from schemas.importadorSchema import ImportadorBaseSchema class CredencialBaseSchema(BaseModel): - importadores: ImportadorBaseSchema + id: str = Field(..., description="UUID de la credencial") user: str = Field(..., description="Usuario de la credencial") password: str = Field(..., description="Contraseña de la credencial") efirma: str = Field(..., description="E-firma de la credencial") key: str = Field(..., description="Key de la credencial") cer: str = Field(..., description="Cer de la credencial") is_active: bool = Field(..., description="Indica si la credencial está activa") - organizacion: UUID = Field(..., description="ID de la organización asociada") \ No newline at end of file + organizacion: UUID = Field(..., description="ID de la organización asociada") + + @field_validator('id') + def validate_id(cls, v): + if not v or not isinstance(v, str): + raise ValueError('id must be a non-empty string') + return v + + @field_validator('user') + def validate_user(cls, v): + if not v or not isinstance(v, str): + raise ValueError('user must be a non-empty string') + return v + @field_validator('password') + def validate_password(cls, v): + if not v or not isinstance(v, str): + raise ValueError('password must be a non-empty string') + return v + @field_validator('efirma') + def validate_efirma(cls, v): + if not v or not isinstance(v, str): + raise ValueError('efirma must be a non-empty string') + return v + @field_validator('key') + def validate_key(cls, v): + if not v or not isinstance(v, str): + raise ValueError('key must be a non-empty string') + return v + @field_validator('cer') + def validate_cer(cls, v): + if not v or not isinstance(v, str): + raise ValueError('cer must be a non-empty string') + return v + @field_validator('organizacion') + def validate_organizacion(cls, v): + if not v or not isinstance(v, UUID): + raise ValueError('organizacion must be a valid UUID') + return v + + \ No newline at end of file diff --git a/test.xml b/test.xml new file mode 100644 index 0000000..3a936ef --- /dev/null +++ b/test.xml @@ -0,0 +1,73 @@ + + + + + + 2025-10-04T02:03:34Z + 2025-10-04T02:04:34Z + + + + + + false + + COVE2040ICIR4 + 1 + 1 + + + COVE2040OQV98 + 2 + 2 + + + COVE2040OQVJ5 + 3 + 3 + + + COVE2040OQVS2 + 4 + 4 + + + COVE2040OQVT3 + 5 + 5 + + + COVE2040OQW01 + 6 + 6 + + + COVE2040OQW17 + 7 + 7 + + + COVE2040OQW66 + 8 + 8 + + + COVE2040OQWB5 + 9 + 9 + + + + \ No newline at end of file diff --git a/utils/helpers.py b/utils/helpers.py new file mode 100644 index 0000000..7e2c826 --- /dev/null +++ b/utils/helpers.py @@ -0,0 +1,15 @@ +def soap_error(soap_response): # Testeado + """ + Verifica si la respuesta SOAP no contiene errores. + + Args: + soap_response: Respuesta del servicio SOAP + + Returns: + bool: True si no hay errores, False en caso contrario + """ + if 'true' in soap_response.text: + return True + + # Aquí podrías agregar más lógica para verificar errores específicos en el XML + return False \ No newline at end of file