106 lines
4.2 KiB
Python
106 lines
4.2 KiB
Python
import base64
|
|
from http.client import HTTPException
|
|
import os
|
|
from controllers.RESTController import rest_controller
|
|
from controllers.SOAPController import soap_controller
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.serialization import load_der_private_key
|
|
import tempfile
|
|
|
|
|
|
def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str:
|
|
with open(key_path, 'rb') as key_file:
|
|
private_key = load_der_private_key(
|
|
key_file.read(),
|
|
password=password.encode() if password else None
|
|
)
|
|
|
|
signature = private_key.sign(
|
|
cadena_original.encode(),
|
|
padding.PKCS1v15(),
|
|
hashes.SHA256()
|
|
)
|
|
|
|
return base64.b64encode(signature).decode()
|
|
|
|
async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict, **kwargs):
|
|
cer = await rest_controller.get_cer(credenciales['id'])
|
|
if cer is None:
|
|
raise HTTPException(status_code=500, detail="No se pudo obtener el certificado para firmar el COVE")
|
|
certificado = base64.b64encode(cer).decode('utf-8')
|
|
|
|
# Obtener la key como binario y guardarla en un archivo temporal
|
|
import tempfile
|
|
key_bytes = await rest_controller.get_key(credenciales['id'])
|
|
if key_bytes is None:
|
|
raise HTTPException(status_code=500, detail="No se pudo obtener la llave privada para firmar el COVE")
|
|
with tempfile.NamedTemporaryFile(delete=False) as tmp_key_file:
|
|
tmp_key_file.write(key_bytes)
|
|
tmp_key_path = tmp_key_file.name
|
|
|
|
# Usar la ruta temporal para firmar
|
|
firma = sign_chain_original(tmp_key_path, credenciales['efirma'], cadena_original)
|
|
return firma, certificado, tmp_key_path
|
|
|
|
async def consume_ws_get_cove(**kwargs):
|
|
|
|
# valdiar kwargs
|
|
|
|
# Cadena original que vas a firmar
|
|
|
|
try:
|
|
cadena_original = f"|{username}|{cove['numero_cove']}|"
|
|
firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales, **kwargs)
|
|
os.remove(tmp_key_path) # Eliminar el archivo temporal después de usarlo
|
|
|
|
soap_xml = soap_controller.generate_cove_template(
|
|
username=username,
|
|
password=credenciales['password'],
|
|
certificado=certificado,
|
|
firma=firma,
|
|
cove=cove,
|
|
)
|
|
|
|
soap_headers = {
|
|
'Content-Type': 'text/xml; charset=utf-8',
|
|
'SOAPAction': '',
|
|
#'Accept-Encoding': 'gzip,deflate',
|
|
}
|
|
|
|
soap_response = await soap_controller.make_request_async(
|
|
"ventanilla/ConsultarEdocumentService?wsdl",
|
|
data=soap_xml,
|
|
headers=soap_headers
|
|
)
|
|
|
|
if (soap_response) and (not soap_error(soap_response)):
|
|
remesas = 1 if response_service['pedimento'].get('remesas', 0) else 0
|
|
patente = response_service['pedimento'].get('patente', 'N/A')
|
|
aduana = response_service['pedimento'].get('aduana', 'N/A')
|
|
no_partidas = response_service['pedimento'].get('numero_partidas', 0)
|
|
tipo_operacion = response_service['pedimento'].get('tipo_operacion', 'N/A')
|
|
pedimento = response_service['pedimento'].get('pedimento', 'N/A')
|
|
_file_name = f"vu_COVE_{remesas}{no_partidas}{tipo_operacion}_{aduana}_{patente}_{pedimento}_{cove['numero_cove']}.xml"
|
|
|
|
document_response = await rest_controller.post_document(
|
|
soap_response=soap_response,
|
|
organizacion=response_service['organizacion'],
|
|
pedimento=response_service['pedimento']['id'],
|
|
file_name=_file_name,
|
|
document_type=8,
|
|
)
|
|
|
|
return {
|
|
"servicio": response_service,
|
|
"documento": document_response
|
|
}
|
|
else:
|
|
raise HTTPException(status_code=500, detail="Error en la petición SOAP al servicio VUCEM")
|
|
except HTTPException:
|
|
# Re-lanzar HTTPExceptions sin modificar
|
|
raise
|
|
except Exception as e:
|
|
import traceback
|
|
raise HTTPException(status_code=500, detail=f"Error interno al procesar acuse cove: {str(e)}")
|