Se adapto el codigo para que funcione con los elementos de credentials
This commit is contained in:
@@ -253,6 +253,24 @@ class APIController:
|
||||
"""
|
||||
return await self._make_request_async('PUT', f'customs/edocuments/{edocument_id}/', data=data)
|
||||
|
||||
async def get_cer(self, id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Método para obtener un certificado específico desde la API.
|
||||
|
||||
Args:
|
||||
id: UUID del certificado a consultar
|
||||
"""
|
||||
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_cer/')
|
||||
|
||||
async def get_key(self, id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Método para obtener una llave específica desde la API.
|
||||
|
||||
Args:
|
||||
id: UUID de la llave a consultar
|
||||
"""
|
||||
return await self._make_request_async('GET', f'vucem/vucem/{id}/download_key/')
|
||||
|
||||
async def _make_request_async(self, method: str, endpoint: str, data=None):
|
||||
"""
|
||||
Método asíncrono para hacer peticiones a la API usando httpx.
|
||||
|
||||
@@ -34,10 +34,6 @@ class Settings(BaseSettings):
|
||||
ALGORITHM: str = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
|
||||
|
||||
# cer key y contrasena uso temporal
|
||||
KEY_PASSWORD: str = ""
|
||||
CERT_PATH: str = ""
|
||||
KEY_PATH: str = ""
|
||||
|
||||
model_config = {"env_file": ".env"}
|
||||
|
||||
|
||||
@@ -24,10 +24,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
from controllers.XMLController import xml_controller
|
||||
|
||||
def load_cert_base64(cert_path: str) -> str:
|
||||
with open(cert_path, 'rb') as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
return base64.b64encode(cert_data).decode()
|
||||
|
||||
def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str:
|
||||
with open(key_path, 'rb') as key_file:
|
||||
@@ -44,7 +40,6 @@ def sign_chain_original(key_path: str, password: str, cadena_original: str) -> s
|
||||
|
||||
return base64.b64encode(signature).decode()
|
||||
|
||||
|
||||
def validate_pedimento_data(response_service: Dict[str, Any], credenciales: Dict[str, Any]) -> tuple: # Testeado
|
||||
"""
|
||||
Valida y extrae los datos necesarios para la petición SOAP.
|
||||
@@ -934,8 +929,21 @@ async def get_soap_cove(credenciales, response_service, soap_controller, cove, i
|
||||
cadena_original = f"|{username}|{cove['numero_cove']}|"
|
||||
|
||||
# Obtener certificado base64 y firma
|
||||
certificado = load_cert_base64(settings.CERT_PATH)
|
||||
firma = sign_chain_original(settings.KEY_PATH, settings.KEY_PASSWORD, cadena_original)
|
||||
cer = rest_controller.get_cer(credenciales['id'])
|
||||
certificado = base64.b64encode(cer).decode('utf-8')
|
||||
|
||||
# Obtener la key como binario y guardarla en un archivo temporal
|
||||
import tempfile
|
||||
key_bytes = rest_controller.get_key(credenciales['id'])
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp_key_file:
|
||||
tmp_key_file.write(key_bytes)
|
||||
tmp_key_path = tmp_key_file.name
|
||||
|
||||
# Usar la ruta temporal para firmar
|
||||
firma = sign_chain_original(tmp_key_path, credenciales['efirma'], cadena_original)
|
||||
|
||||
# Eliminar el archivo temporal después de firmar
|
||||
os.remove(tmp_key_path)
|
||||
|
||||
logger.info(f"Datos para SOAP - Usuario: {username}, Aduana: {aduana}, Patente: {patente}, Pedimento: {pedimento}, Numero Operacion: {numero_operacion}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user