639 lines
23 KiB
Python
639 lines
23 KiB
Python
import base64
|
|
import os
|
|
import logging
|
|
import re
|
|
import tempfile
|
|
from typing import Any, Dict, List, Optional
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from fastapi import HTTPException
|
|
from controllers.RESTController import rest_controller
|
|
from controllers.SOAPController import soap_controller
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.serialization import load_der_private_key
|
|
|
|
from utils.helpers import soap_error
|
|
from .controllers import coves_vu_controller, coves_rest_controller
|
|
from ..common import create_service_response, create_error_response
|
|
|
|
# Logger para el módulo
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta
|
|
|
|
|
|
async def consume_ws_get_cove(**kwargs):
|
|
"""
|
|
Consume el servicio SOAP para obtener un COVE y procesar la respuesta.
|
|
|
|
Args:
|
|
**kwargs: Debe contener 'credencial', 'pedimento' y 'cove'
|
|
|
|
Returns:
|
|
Dict serializable con 'documento' y 'cove_put_response'
|
|
|
|
Raises:
|
|
Exception: Si hay errores en el procesamiento
|
|
"""
|
|
try:
|
|
logger.info("Iniciando procesamiento de COVE")
|
|
|
|
credenciales = kwargs.get('credencial')
|
|
username = credenciales.get('user')
|
|
pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')
|
|
cove = kwargs['cove'].get('cove', None)
|
|
|
|
if not credenciales or not username or not cove:
|
|
missing = []
|
|
if not credenciales:
|
|
missing.append("credenciales")
|
|
if not username:
|
|
missing.append("nombre de usuario")
|
|
if not cove:
|
|
missing.append("número de COVE")
|
|
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=create_error_response(
|
|
message="Datos incompletos para procesar COVE",
|
|
errors=[f"Falta: {', '.join(missing)}"],
|
|
metadata={"provided_data": {
|
|
"has_credentials": bool(credenciales),
|
|
"has_username": bool(username),
|
|
"has_cove": bool(cove)
|
|
}}
|
|
)
|
|
)
|
|
|
|
logger.info(f"Procesando COVE: {cove} para usuario: {username}")
|
|
|
|
# Generar cadena original y obtener firma/certificado
|
|
cadena_original = f"|{credenciales.get('user')}|{cove}|"
|
|
firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales)
|
|
|
|
# Limpiar archivo temporal inmediatamente
|
|
try:
|
|
os.remove(tmp_key_path)
|
|
logger.debug("Archivo temporal de llave eliminado")
|
|
except Exception as e:
|
|
logger.warning(f"Error al eliminar archivo temporal: {e}")
|
|
|
|
# Generar template SOAP
|
|
soap_xml = coves_vu_controller.generate_cove_template(
|
|
username=username,
|
|
password=credenciales['password'],
|
|
certificado=certificado,
|
|
firma=firma,
|
|
cove=cove,
|
|
)
|
|
|
|
# Enviar documento de request a EFC
|
|
try:
|
|
file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml"
|
|
document_response = await coves_rest_controller.post_document(
|
|
soap_response=soap_xml,
|
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
|
pedimento=kwargs.get('pedimento').get('id'),
|
|
file_name=file_name_request,
|
|
document_type=19,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error al enviar documento request: {e}")
|
|
|
|
soap_headers = {
|
|
'Content-Type': 'text/xml; charset=utf-8',
|
|
'SOAPAction': '',
|
|
}
|
|
|
|
logger.info("Enviando petición SOAP a VUCEM")
|
|
soap_response = await coves_vu_controller.make_request_async(
|
|
"ventanilla/ConsultarEdocumentService?wsdl",
|
|
data=soap_xml,
|
|
headers=soap_headers
|
|
)
|
|
|
|
if not soap_response:
|
|
raise Exception("No se recibió respuesta del servicio SOAP")
|
|
|
|
if soap_error(soap_response):
|
|
document_response = await coves_rest_controller.post_document(
|
|
soap_response=soap_response,
|
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
|
pedimento=kwargs.get('pedimento').get('id'),
|
|
file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml",
|
|
document_type=20,
|
|
)
|
|
|
|
raise Exception("Error en la respuesta del servicio SOAP")
|
|
|
|
logger.info("Respuesta SOAP exitosa, enviando documento")
|
|
|
|
# Enviar documento
|
|
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
|
|
try:
|
|
document_response = await coves_rest_controller.post_document(
|
|
soap_response=soap_response,
|
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
|
pedimento=kwargs.get('pedimento').get('id'),
|
|
file_name=_file_name,
|
|
document_type=8,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
|
|
raise Exception(f"Error en la respuesta SOAP: {str(e)}")
|
|
|
|
logger.info("Documento enviado, actualizando status de COVE")
|
|
|
|
# Actualizar status del COVE
|
|
cove_status_response = await change_cove_status(
|
|
cove=kwargs.get('cove'),
|
|
status=True,
|
|
pedimento=kwargs.get('pedimento')
|
|
)
|
|
|
|
logger.info(f"COVE {cove} procesado exitosamente")
|
|
|
|
return create_service_response(
|
|
message=f"COVE {cove} procesado exitosamente",
|
|
data={
|
|
"documento": document_response if document_response else None,
|
|
"cove_update_response": cove_status_response if cove_status_response else None
|
|
},
|
|
metadata={
|
|
"cove_number": cove,
|
|
"file_name": _file_name,
|
|
"document_type": 8,
|
|
"pedimento_app": pedimento_app,
|
|
"username": username,
|
|
"organizacion": kwargs.get('pedimento', {}).get('organizacion')
|
|
}
|
|
)
|
|
|
|
except HTTPException as he:
|
|
raise he
|
|
except Exception as e:
|
|
logger.error(f"Error procesando COVE: {str(e)}", exc_info=True)
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=create_error_response(
|
|
message="Error interno al procesar COVE",
|
|
errors=[str(e)],
|
|
metadata={
|
|
"cove_number": cove,
|
|
"username": username,
|
|
"pedimento_app": pedimento_app
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
async def consume_ws_get_acuse_cove(**kwargs):
|
|
credenciales = kwargs.get('credencial')
|
|
soap_headers = {
|
|
'Content-Type': 'text/xml; charset=utf-8',
|
|
'SOAPAction': 'http://www.ventanillaunica.gob.mx/ventanilla/ConsultaAcusesService/consultarAcuseCove',
|
|
'Accept-Encoding': 'gzip,deflate',
|
|
}
|
|
|
|
soap_xml = coves_vu_controller.generate_acuse_template(
|
|
username=credenciales.get('user'),
|
|
password=credenciales.get('password'),
|
|
cove=kwargs['cove'].get('cove', None),
|
|
)
|
|
|
|
# Enviar documento de request a EFC
|
|
try:
|
|
file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml"
|
|
document_response = await coves_rest_controller.post_document(
|
|
soap_response=soap_xml,
|
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
|
pedimento=kwargs.get('pedimento').get('id'),
|
|
file_name=file_name_request,
|
|
document_type=23,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error al enviar documento request de acuse cove: {e}")
|
|
|
|
response = await coves_vu_controller.make_request_async(
|
|
"ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl",
|
|
data=soap_xml,
|
|
headers=soap_headers
|
|
)
|
|
|
|
if response is None:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=create_error_response(
|
|
message="Error al contactar el servicio SOAP",
|
|
errors=["No se obtuvo respuesta del servicio SOAP"],
|
|
metadata={
|
|
"cove_number": kwargs['cove'].get('cove'),
|
|
"pedimento_app": kwargs.get('pedimento', {}).get('pedimento_app')
|
|
}
|
|
)
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
raise HTTPException(
|
|
status_code=response.status_code,
|
|
detail=create_error_response(
|
|
message="Error en la solicitud SOAP",
|
|
errors=[f"Código de estado: {response.status_code}"],
|
|
data={"soap_response": response.text[:500]},
|
|
metadata={
|
|
"status_code": response.status_code,
|
|
"cove_number": kwargs['cove'].get('cove')
|
|
}
|
|
)
|
|
)
|
|
|
|
if soap_error(response):
|
|
error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml"
|
|
try:
|
|
rest_response = await coves_rest_controller.post_document(
|
|
soap_response=response,
|
|
organizacion=kwargs.get('pedimento').get('organizacion'),
|
|
pedimento=kwargs.get('pedimento').get('id'),
|
|
file_name=error_file_name,
|
|
document_type=24,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error al guardar respuesta SOAP errónea: {e}")
|
|
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=create_error_response(
|
|
message="Error en la respuesta del servicio SOAP",
|
|
errors=["Se detectó un error en la respuesta SOAP"],
|
|
data={"error_file": error_file_name} if 'rest_response' in locals() else None,
|
|
metadata={
|
|
"cove_number": kwargs['cove'].get('cove'),
|
|
"document_type": 10
|
|
}
|
|
)
|
|
)
|
|
if (response) and (not soap_error(response)):
|
|
logger.debug(f"Respuesta SOAP recibida, extrayendo acuse...")
|
|
acuse_base64 = _extract_acuse_data(response.text)
|
|
|
|
if acuse_base64 is None:
|
|
logger.error(
|
|
"No se encontró elemento acuseDocumento en la respuesta")
|
|
logger.debug(
|
|
f"Contenido de respuesta (primeros 1000 chars): {response.text}")
|
|
else:
|
|
logger.error("Error en respuesta SOAP o soap_error detectado")
|
|
logger.debug(
|
|
f"Contenido de respuesta con error: {response.text if response else 'No response'}")
|
|
|
|
if acuse_base64 is None:
|
|
# Log de la respuesta SOAP para debugging
|
|
logger.error(f"Contenido de respuesta SOAP: {response.text}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="No se pudo extraer el acuse del documento de la respuesta SOAP. Verifique el log para más detalles."
|
|
)
|
|
|
|
pdf_bytes = _decode_acuse_base64_content(acuse_base64)
|
|
|
|
if not pdf_bytes:
|
|
raise HTTPException(
|
|
status_code=500, detail="No se pudo decodificar el documento del acuse")
|
|
|
|
# Validar que el PDF sea válido
|
|
if not pdf_bytes.startswith(b'%PDF'):
|
|
logger.warning("El contenido decodificado no parece ser un PDF válido")
|
|
|
|
# Mejorar el nombre del archivo usando todos los datos relevantes
|
|
pedimento = kwargs.get('pedimento', {})
|
|
pedimento_num = pedimento.get('pedimento', '')
|
|
_file_name = _get_file_name(**kwargs)
|
|
|
|
# Validar que organización y pedimento no sean None
|
|
organizacion = pedimento.get("organizacion", None)
|
|
pedimento_id = pedimento.get("id", None)
|
|
|
|
rest_response = await coves_rest_controller.post_document(
|
|
binary_content=pdf_bytes,
|
|
organizacion=organizacion,
|
|
pedimento=pedimento_id,
|
|
file_name=_file_name,
|
|
document_type=7
|
|
)
|
|
|
|
acuse_status = await change_acuse_status(
|
|
cove=kwargs.get('cove'),
|
|
status=True,
|
|
pedimento=kwargs.get('pedimento')
|
|
)
|
|
|
|
return create_service_response(
|
|
message="Acuse de COVE procesado exitosamente",
|
|
data={
|
|
"document_response": rest_response,
|
|
"file_name": _file_name,
|
|
"pedimento": pedimento_num,
|
|
"acuse_update": acuse_status
|
|
},
|
|
metadata={
|
|
"document_type": 7,
|
|
"pedimento_app": pedimento.get('pedimento_app'),
|
|
"organizacion": organizacion,
|
|
"cove_number": kwargs['cove'].get('cove'),
|
|
"content_type": "application/pdf"
|
|
}
|
|
)
|
|
|
|
|
|
def _decode_acuse_base64_content(base64_content): # Testeado
|
|
"""
|
|
Decodifica el contenido Base64 del acuse y limpia caracteres especiales.
|
|
|
|
Args:
|
|
base64_content (str): Contenido codificado en Base64
|
|
|
|
Returns:
|
|
bytes: Contenido decodificado o None si hay error
|
|
"""
|
|
try:
|
|
# Limpiar el contenido Base64 de manera exhaustiva
|
|
cleaned_content = base64_content
|
|
|
|
# Remover entidades HTML/XML como 
, 
, etc.
|
|
cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content)
|
|
cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content)
|
|
|
|
# Remover espacios en blanco, saltos de línea, etc.
|
|
cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content)
|
|
|
|
# Remover caracteres no válidos para Base64
|
|
cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content)
|
|
|
|
# Agregar padding si es necesario
|
|
missing_padding = len(cleaned_content) % 4
|
|
if missing_padding:
|
|
cleaned_content += '=' * (4 - missing_padding)
|
|
|
|
# Decodificar Base64
|
|
decoded_content = base64.b64decode(cleaned_content)
|
|
|
|
return decoded_content
|
|
|
|
except Exception as e:
|
|
|
|
# Intentar con validación estricta deshabilitada
|
|
try:
|
|
decoded_content = base64.b64decode(cleaned_content, validate=False)
|
|
return decoded_content
|
|
except Exception as e2:
|
|
return None
|
|
|
|
|
|
def _extract_acuse_data(soap_response_text: str) -> str:
|
|
"""
|
|
Extrae el contenido base64 del acuse desde la respuesta SOAP.
|
|
|
|
Args:
|
|
soap_response_text: Texto completo de la respuesta SOAP
|
|
|
|
Returns:
|
|
str: Contenido base64 del acuse o None si no se encuentra
|
|
"""
|
|
try:
|
|
logger.debug("Iniciando extracción de datos del acuse")
|
|
|
|
# Primero, extraer la parte XML del contenido multipart
|
|
xml_start = soap_response_text.find('<?xml')
|
|
if xml_start == -1:
|
|
logger.error("No se encontró inicio de XML en la respuesta")
|
|
return None
|
|
|
|
# Extraer solo la parte XML
|
|
xml_content = soap_response_text[xml_start:]
|
|
|
|
# Si hay más contenido multipart después, cortarlo
|
|
boundary_end = xml_content.find('--uuid:')
|
|
if boundary_end != -1:
|
|
xml_content = xml_content[:boundary_end]
|
|
|
|
logger.debug("XML extraído, parseando contenido...")
|
|
|
|
# Parsear el XML
|
|
root = ET.fromstring(xml_content.strip())
|
|
|
|
# Log de la estructura XML para debugging
|
|
logger.debug(f"Elemento raíz: {root.tag}")
|
|
logger.debug(
|
|
f"Namespaces encontrados: {root.nsmap if hasattr(root, 'nsmap') else 'No disponible'}")
|
|
|
|
# Buscar el elemento acuseDocumento con diferentes estrategias
|
|
namespaces = {
|
|
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
|
|
'ns3': 'http://www.ventanillaunica.gob.mx/ws/consulta/acuses/',
|
|
'soap': 'http://schemas.xmlsoap.org/soap/envelope/',
|
|
'ns1': 'http://www.ventanillaunica.gob.mx/ws/consulta/acuses/'
|
|
}
|
|
|
|
# Estrategia 1: Con namespace ns3
|
|
acuse_elemento = root.find(
|
|
'.//ns3:responseConsultaAcuses/acuseDocumento', namespaces)
|
|
|
|
if acuse_elemento is None:
|
|
# Estrategia 2: Con namespace ns1
|
|
acuse_elemento = root.find(
|
|
'.//ns1:responseConsultaAcuses/acuseDocumento', namespaces)
|
|
|
|
if acuse_elemento is None:
|
|
# Estrategia 3: Sin namespace específico
|
|
acuse_elemento = root.find('.//acuseDocumento')
|
|
|
|
if acuse_elemento is None:
|
|
# Estrategia 4: Buscar cualquier elemento que contenga "acuse" o "documento"
|
|
for elem in root.iter():
|
|
if 'acuse' in elem.tag.lower() and 'documento' in elem.tag.lower():
|
|
acuse_elemento = elem
|
|
break
|
|
elif elem.tag.endswith('acuseDocumento'):
|
|
acuse_elemento = elem
|
|
break
|
|
|
|
if acuse_elemento is None:
|
|
# Log de todos los elementos para debugging
|
|
logger.error(
|
|
"No se encontró elemento acuseDocumento. Elementos disponibles:")
|
|
for elem in root.iter():
|
|
logger.error(
|
|
f" - {elem.tag}: {elem.text[:50] if elem.text else 'Sin contenido'}...")
|
|
if 'acuse' in elem.tag.lower():
|
|
logger.error(f"Elemento similar encontrado: {elem.tag}")
|
|
return None
|
|
|
|
if acuse_elemento is not None and acuse_elemento.text:
|
|
logger.debug(
|
|
f"Acuse encontrado, longitud: {len(acuse_elemento.text)} caracteres")
|
|
return acuse_elemento.text.strip()
|
|
else:
|
|
logger.error(
|
|
"Elemento acuseDocumento encontrado pero sin contenido de texto")
|
|
return None
|
|
|
|
except ET.ParseError as e:
|
|
logger.error(f"Error parseando XML: {e}")
|
|
logger.debug(
|
|
f"Contenido XML problemático: {xml_content[:500] if 'xml_content' in locals() else 'No disponible'}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error general extrayendo acuse: {e}")
|
|
return None
|
|
|
|
|
|
def _get_file_name(**kwargs) -> dict:
|
|
pedimento = kwargs.get('pedimento', {})
|
|
pedimento_app = pedimento.get('pedimento_app', 'N/A')
|
|
cove = kwargs['cove'].get('cove', 'N/A')
|
|
_file_name = f"vu_AC_COVE_{pedimento_app}_{cove}.pdf"
|
|
return _file_name
|
|
|
|
|
|
async def change_cove_status(cove: dict, status: bool, pedimento: dict):
|
|
data = {
|
|
"id": cove.get("id"),
|
|
"cove_descargado": status,
|
|
"numero_cove": cove.get("cove"),
|
|
"pedimento": pedimento.get("id"),
|
|
"organizacion": pedimento.get("organizacion"),
|
|
}
|
|
|
|
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
|
|
|
return response
|
|
|
|
|
|
async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
|
|
data = {
|
|
"id": cove.get("id"),
|
|
"acuse_cove_descargado": status,
|
|
"numero_cove": cove.get("cove"),
|
|
"pedimento": pedimento.get("id"),
|
|
"organizacion": pedimento.get("organizacion"),
|
|
}
|
|
|
|
print(data)
|
|
response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data)
|
|
|
|
return response
|
|
|
|
|
|
async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict):
|
|
"""
|
|
Obtiene certificado y llave, genera la firma para la cadena original.
|
|
|
|
Args:
|
|
cadena_original: Cadena a firmar
|
|
username: Usuario de VUCEM
|
|
credenciales: Diccionario con credenciales
|
|
|
|
Returns:
|
|
tuple: (firma_base64, certificado_base64, ruta_archivo_temporal)
|
|
|
|
Raises:
|
|
Exception: Si no se pueden obtener los certificados o generar la firma
|
|
"""
|
|
try:
|
|
logger.debug("Obteniendo certificado desde API")
|
|
|
|
# Obtener certificado como bytes
|
|
cer = await coves_rest_controller.get_cer(credenciales['id'])
|
|
if cer is None:
|
|
raise Exception(
|
|
"No se pudo obtener el certificado para firmar el COVE")
|
|
|
|
# Convertir certificado a base64 string
|
|
certificado = base64.b64encode(cer).decode('utf-8')
|
|
logger.debug("Certificado obtenido y codificado exitosamente")
|
|
|
|
# Obtener llave privada como bytes
|
|
logger.debug("Obteniendo llave privada desde API")
|
|
key_bytes = await coves_rest_controller.get_key(credenciales['id'])
|
|
if key_bytes is None:
|
|
raise Exception(
|
|
"No se pudo obtener la llave privada para firmar el COVE")
|
|
|
|
# Crear archivo temporal para la llave (requerido por cryptography)
|
|
with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_key_file:
|
|
tmp_key_file.write(key_bytes)
|
|
tmp_key_path = tmp_key_file.name
|
|
|
|
logger.debug(
|
|
f"Llave privada guardada temporalmente en: {tmp_key_path}")
|
|
|
|
# Generar firma usando el archivo temporal
|
|
firma = sign_chain_original(
|
|
tmp_key_path, credenciales['efirma'], cadena_original)
|
|
logger.debug("Firma generada exitosamente")
|
|
|
|
return firma, certificado, tmp_key_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error obteniendo certificado/llave o generando firma: {e}")
|
|
# Limpiar archivo temporal si existe
|
|
if 'tmp_key_path' in locals() and os.path.exists(tmp_key_path):
|
|
try:
|
|
os.remove(tmp_key_path)
|
|
except Exception as cleanup_error:
|
|
logger.warning(f"Error al limpiar archivo temporal: {cleanup_error}")
|
|
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=create_error_response(
|
|
message="Error al procesar certificado y firma",
|
|
errors=[str(e)],
|
|
metadata={
|
|
"username": username,
|
|
"has_key": bool(key_bytes) if 'key_bytes' in locals() else False,
|
|
"has_cert": bool(cer) if 'cer' in locals() else False
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str:
|
|
"""
|
|
Firma una cadena original usando una llave privada.
|
|
|
|
Args:
|
|
key_path: Ruta al archivo de la llave privada
|
|
password: Password de la llave privada
|
|
cadena_original: Cadena a firmar
|
|
|
|
Returns:
|
|
str: Firma en base64
|
|
|
|
Raises:
|
|
Exception: Si hay errores en el proceso de firma
|
|
"""
|
|
try:
|
|
logger.debug(f"Firmando cadena original: {cadena_original}")
|
|
|
|
with open(key_path, 'rb') as key_file:
|
|
private_key = load_der_private_key(
|
|
key_file.read(),
|
|
password=password.encode() if password else None
|
|
)
|
|
|
|
signature = private_key.sign(
|
|
cadena_original.encode('utf-8'),
|
|
padding.PKCS1v15(),
|
|
hashes.SHA256()
|
|
)
|
|
|
|
firma_b64 = base64.b64encode(signature).decode('utf-8')
|
|
logger.debug("Cadena firmada exitosamente")
|
|
|
|
return firma_b64
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error firmando cadena original: {e}")
|
|
raise Exception(f"Error en sign_chain_original: {str(e)}")
|