This commit is contained in:
2025-10-05 21:01:05 -06:00
parent 3448c723b2
commit a0055de043
2 changed files with 124 additions and 106 deletions

View File

@@ -18,36 +18,39 @@ from .controllers import coves_vu_controller, coves_rest_controller
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta # Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta
async def consume_ws_get_cove(**kwargs): async def consume_ws_get_cove(**kwargs):
""" """
Consume el servicio SOAP para obtener un COVE y procesar la respuesta. Consume el servicio SOAP para obtener un COVE y procesar la respuesta.
Args: Args:
**kwargs: Debe contener 'credencial', 'pedimento' y 'cove' **kwargs: Debe contener 'credencial', 'pedimento' y 'cove'
Returns: Returns:
Dict serializable con 'documento' y 'cove_put_response' Dict serializable con 'documento' y 'cove_put_response'
Raises: Raises:
Exception: Si hay errores en el procesamiento Exception: Si hay errores en el procesamiento
""" """
try: try:
logger.info("Iniciando procesamiento de COVE") logger.info("Iniciando procesamiento de COVE")
credenciales = kwargs.get('credencial') credenciales = kwargs.get('credencial')
username = credenciales.get('user') username = credenciales.get('user')
pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A') pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')
cove = kwargs['cove'].get('cove', None) cove = kwargs['cove'].get('cove', None)
if not credenciales or not username or not cove: if not credenciales or not username or not cove:
raise Exception("Credenciales o COVE no proporcionados correctamente") raise Exception(
"Credenciales o COVE no proporcionados correctamente")
logger.info(f"Procesando COVE: {cove} para usuario: {username}") logger.info(f"Procesando COVE: {cove} para usuario: {username}")
# Generar cadena original y obtener firma/certificado # Generar cadena original y obtener firma/certificado
cadena_original = f"|{credenciales.get('user')}|{cove}|" cadena_original = f"|{credenciales.get('user')}|{cove}|"
firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales) firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales)
# Limpiar archivo temporal inmediatamente # Limpiar archivo temporal inmediatamente
try: try:
os.remove(tmp_key_path) os.remove(tmp_key_path)
@@ -71,7 +74,7 @@ async def consume_ws_get_cove(**kwargs):
logger.info("Enviando petición SOAP a VUCEM") logger.info("Enviando petición SOAP a VUCEM")
soap_response = await coves_vu_controller.make_request_async( soap_response = await coves_vu_controller.make_request_async(
"ventanilla/ConsultarEdocumentService?wsdl", "ventanilla/ConsultarEdocumentService?wsdl",
data=soap_xml, data=soap_xml,
headers=soap_headers headers=soap_headers
) )
@@ -89,9 +92,9 @@ async def consume_ws_get_cove(**kwargs):
) )
raise Exception("Error en la respuesta del servicio SOAP") raise Exception("Error en la respuesta del servicio SOAP")
logger.info("Respuesta SOAP exitosa, enviando documento") logger.info("Respuesta SOAP exitosa, enviando documento")
# Enviar documento # Enviar documento
_file_name = f"vu_COVE_{pedimento_app}_{cove}.xml" _file_name = f"vu_COVE_{pedimento_app}_{cove}.xml"
try: try:
@@ -100,18 +103,18 @@ async def consume_ws_get_cove(**kwargs):
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=_file_name, file_name=_file_name,
document_type=8, document_type=8,
) )
except Exception as e: except Exception as e:
logger.error(f"Error detectado en la respuesta SOAP: {str(e)}") logger.error(f"Error detectado en la respuesta SOAP: {str(e)}")
raise Exception(f"Error en la respuesta SOAP: {str(e)}") raise Exception(f"Error en la respuesta SOAP: {str(e)}")
logger.info("Documento enviado, actualizando status de COVE") logger.info("Documento enviado, actualizando status de COVE")
# Actualizar status del COVE # Actualizar status del COVE
cove_status_response = await change_cove_status( cove_status_response = await change_cove_status(
cove=kwargs.get('cove'), cove=kwargs.get('cove'),
status=True, status=True,
pedimento=kwargs.get('pedimento') pedimento=kwargs.get('pedimento')
) )
@@ -119,12 +122,12 @@ async def consume_ws_get_cove(**kwargs):
# Asegurar que la respuesta sea serializable # Asegurar que la respuesta sea serializable
result = { result = {
"documento": document_response if document_response else None, "documento": document_response if document_response else None,
"cove_update_response": cove_status_response if cove_status_response else None "cove_update_response": cove_status_response if cove_status_response else None
} }
return result return result
except Exception as e: except Exception as e:
logger.error(f"Error procesando COVE: {str(e)}", exc_info=True) logger.error(f"Error procesando COVE: {str(e)}", exc_info=True)
# Asegurar que no se retornen datos binarios en el error # Asegurar que no se retornen datos binarios en el error
@@ -139,22 +142,18 @@ async def consume_ws_get_acuse_cove(**kwargs):
'Accept-Encoding': 'gzip,deflate', 'Accept-Encoding': 'gzip,deflate',
} }
soap_xml = coves_vu_controller.generate_acuse_template( soap_xml = coves_vu_controller.generate_acuse_template(
username=credenciales.get('user'), username=credenciales.get('user'),
password=credenciales.get('password'), password=credenciales.get('password'),
cove=kwargs['cove'].get('cove', None), cove=kwargs['cove'].get('cove', None),
) )
response = await coves_vu_controller.make_request_async( response = await coves_vu_controller.make_request_async(
"ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl", "ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl",
data=soap_xml, data=soap_xml,
headers=soap_headers headers=soap_headers
) )
if response is None: if response is None:
raise Exception("No se obtuvo respuesta del servicio SOAP.") raise Exception("No se obtuvo respuesta del servicio SOAP.")
@@ -163,23 +162,26 @@ async def consume_ws_get_acuse_cove(**kwargs):
if soap_error(response): if soap_error(response):
rest_response = await coves_rest_controller.post_document( rest_response = await coves_rest_controller.post_document(
soap_response=response, soap_response=response,
organizacion=kwargs.get('pedimento').get('organizacion'), organizacion=kwargs.get('pedimento').get('organizacion'),
pedimento=kwargs.get('pedimento').get('id'), pedimento=kwargs.get('pedimento').get('id'),
file_name=f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml", file_name=f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml",
document_type=10, document_type=10,
) )
raise Exception("Error detectado en la respuesta SOAP.") raise Exception("Error detectado en la respuesta SOAP.")
if (response) and (not soap_error(response)): if (response) and (not soap_error(response)):
logger.debug(f"Respuesta SOAP recibida, extrayendo acuse...") logger.debug(f"Respuesta SOAP recibida, extrayendo acuse...")
acuse_base64 = _extract_acuse_data(response.text) acuse_base64 = _extract_acuse_data(response.text)
if acuse_base64 is None: if acuse_base64 is None:
logger.error("No se encontró elemento acuseDocumento en la respuesta") logger.error(
logger.debug(f"Contenido de respuesta (primeros 1000 chars): {response.text}") "No se encontró elemento acuseDocumento en la respuesta")
logger.debug(
f"Contenido de respuesta (primeros 1000 chars): {response.text}")
else: else:
logger.error("Error en respuesta SOAP o soap_error detectado") logger.error("Error en respuesta SOAP o soap_error detectado")
logger.debug(f"Contenido de respuesta con error: {response.text if response else 'No response'}") logger.debug(
f"Contenido de respuesta con error: {response.text if response else 'No response'}")
if acuse_base64 is None: if acuse_base64 is None:
# Log de la respuesta SOAP para debugging # Log de la respuesta SOAP para debugging
@@ -188,21 +190,20 @@ async def consume_ws_get_acuse_cove(**kwargs):
status_code=500, status_code=500,
detail="No se pudo extraer el acuse del documento de la respuesta SOAP. Verifique el log para más detalles." detail="No se pudo extraer el acuse del documento de la respuesta SOAP. Verifique el log para más detalles."
) )
pdf_bytes = _decode_acuse_base64_content(acuse_base64) pdf_bytes = _decode_acuse_base64_content(acuse_base64)
if not pdf_bytes: if not pdf_bytes:
raise HTTPException(status_code=500, detail="No se pudo decodificar el documento del acuse") raise HTTPException(
status_code=500, detail="No se pudo decodificar el documento del acuse")
# Validar que el PDF sea válido # Validar que el PDF sea válido
if not pdf_bytes.startswith(b'%PDF'): if not pdf_bytes.startswith(b'%PDF'):
logger.warning("El contenido decodificado no parece ser un PDF válido") logger.warning("El contenido decodificado no parece ser un PDF válido")
# Mejorar el nombre del archivo usando todos los datos relevantes # Mejorar el nombre del archivo usando todos los datos relevantes
pedimento = kwargs.get('pedimento', {}) pedimento = kwargs.get('pedimento', {})
pedimento_num = pedimento.get('pedimento','') pedimento_num = pedimento.get('pedimento', '')
_file_name = _get_file_name(**kwargs) _file_name = _get_file_name(**kwargs)
# Validar que organización y pedimento no sean None # Validar que organización y pedimento no sean None
@@ -218,11 +219,11 @@ async def consume_ws_get_acuse_cove(**kwargs):
) )
acuse_status = await change_acuse_status( acuse_status = await change_acuse_status(
cove=kwargs.get('cove'), cove=kwargs.get('cove'),
status=True, status=True,
pedimento=kwargs.get('pedimento') pedimento=kwargs.get('pedimento')
) )
return { return {
"document_response": rest_response, "document_response": rest_response,
"file_name": _file_name, "file_name": _file_name,
@@ -230,43 +231,43 @@ async def consume_ws_get_acuse_cove(**kwargs):
"acuse_update": acuse_status "acuse_update": acuse_status
} }
def _decode_acuse_base64_content(base64_content): # Testeado
def _decode_acuse_base64_content(base64_content): # Testeado
""" """
Decodifica el contenido Base64 del acuse y limpia caracteres especiales. Decodifica el contenido Base64 del acuse y limpia caracteres especiales.
Args: Args:
base64_content (str): Contenido codificado en Base64 base64_content (str): Contenido codificado en Base64
Returns: Returns:
bytes: Contenido decodificado o None si hay error bytes: Contenido decodificado o None si hay error
""" """
try: try:
# Limpiar el contenido Base64 de manera exhaustiva # Limpiar el contenido Base64 de manera exhaustiva
cleaned_content = base64_content cleaned_content = base64_content
# Remover entidades HTML/XML como 
, 
, etc. # Remover entidades HTML/XML como 
, 
, etc.
cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content) cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content)
cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content) cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content)
# Remover espacios en blanco, saltos de línea, etc. # Remover espacios en blanco, saltos de línea, etc.
cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content) cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content)
# Remover caracteres no válidos para Base64 # Remover caracteres no válidos para Base64
cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content) cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content)
# Agregar padding si es necesario # Agregar padding si es necesario
missing_padding = len(cleaned_content) % 4 missing_padding = len(cleaned_content) % 4
if missing_padding: if missing_padding:
cleaned_content += '=' * (4 - missing_padding) cleaned_content += '=' * (4 - missing_padding)
# Decodificar Base64 # Decodificar Base64
decoded_content = base64.b64decode(cleaned_content) decoded_content = base64.b64decode(cleaned_content)
return decoded_content return decoded_content
except Exception as e: except Exception as e:
# Intentar con validación estricta deshabilitada # Intentar con validación estricta deshabilitada
try: try:
decoded_content = base64.b64decode(cleaned_content, validate=False) decoded_content = base64.b64decode(cleaned_content, validate=False)
@@ -274,42 +275,44 @@ def _decode_acuse_base64_content(base64_content): # Testeado
except Exception as e2: except Exception as e2:
return None return None
def _extract_acuse_data(soap_response_text: str) -> str: def _extract_acuse_data(soap_response_text: str) -> str:
""" """
Extrae el contenido base64 del acuse desde la respuesta SOAP. Extrae el contenido base64 del acuse desde la respuesta SOAP.
Args: Args:
soap_response_text: Texto completo de la respuesta SOAP soap_response_text: Texto completo de la respuesta SOAP
Returns: Returns:
str: Contenido base64 del acuse o None si no se encuentra str: Contenido base64 del acuse o None si no se encuentra
""" """
try: try:
logger.debug("Iniciando extracción de datos del acuse") logger.debug("Iniciando extracción de datos del acuse")
# Primero, extraer la parte XML del contenido multipart # Primero, extraer la parte XML del contenido multipart
xml_start = soap_response_text.find('<?xml') xml_start = soap_response_text.find('<?xml')
if xml_start == -1: if xml_start == -1:
logger.error("No se encontró inicio de XML en la respuesta") logger.error("No se encontró inicio de XML en la respuesta")
return None return None
# Extraer solo la parte XML # Extraer solo la parte XML
xml_content = soap_response_text[xml_start:] xml_content = soap_response_text[xml_start:]
# Si hay más contenido multipart después, cortarlo # Si hay más contenido multipart después, cortarlo
boundary_end = xml_content.find('--uuid:') boundary_end = xml_content.find('--uuid:')
if boundary_end != -1: if boundary_end != -1:
xml_content = xml_content[:boundary_end] xml_content = xml_content[:boundary_end]
logger.debug("XML extraído, parseando contenido...") logger.debug("XML extraído, parseando contenido...")
# Parsear el XML # Parsear el XML
root = ET.fromstring(xml_content.strip()) root = ET.fromstring(xml_content.strip())
# Log de la estructura XML para debugging # Log de la estructura XML para debugging
logger.debug(f"Elemento raíz: {root.tag}") logger.debug(f"Elemento raíz: {root.tag}")
logger.debug(f"Namespaces encontrados: {root.nsmap if hasattr(root, 'nsmap') else 'No disponible'}") logger.debug(
f"Namespaces encontrados: {root.nsmap if hasattr(root, 'nsmap') else 'No disponible'}")
# Buscar el elemento acuseDocumento con diferentes estrategias # Buscar el elemento acuseDocumento con diferentes estrategias
namespaces = { namespaces = {
'S': 'http://schemas.xmlsoap.org/soap/envelope/', 'S': 'http://schemas.xmlsoap.org/soap/envelope/',
@@ -317,18 +320,20 @@ def _extract_acuse_data(soap_response_text: str) -> str:
'soap': 'http://schemas.xmlsoap.org/soap/envelope/', 'soap': 'http://schemas.xmlsoap.org/soap/envelope/',
'ns1': 'http://www.ventanillaunica.gob.mx/ws/consulta/acuses/' 'ns1': 'http://www.ventanillaunica.gob.mx/ws/consulta/acuses/'
} }
# Estrategia 1: Con namespace ns3 # Estrategia 1: Con namespace ns3
acuse_elemento = root.find('.//ns3:responseConsultaAcuses/acuseDocumento', namespaces) acuse_elemento = root.find(
'.//ns3:responseConsultaAcuses/acuseDocumento', namespaces)
if acuse_elemento is None: if acuse_elemento is None:
# Estrategia 2: Con namespace ns1 # Estrategia 2: Con namespace ns1
acuse_elemento = root.find('.//ns1:responseConsultaAcuses/acuseDocumento', namespaces) acuse_elemento = root.find(
'.//ns1:responseConsultaAcuses/acuseDocumento', namespaces)
if acuse_elemento is None: if acuse_elemento is None:
# Estrategia 3: Sin namespace específico # Estrategia 3: Sin namespace específico
acuse_elemento = root.find('.//acuseDocumento') acuse_elemento = root.find('.//acuseDocumento')
if acuse_elemento is None: if acuse_elemento is None:
# Estrategia 4: Buscar cualquier elemento que contenga "acuse" o "documento" # Estrategia 4: Buscar cualquier elemento que contenga "acuse" o "documento"
for elem in root.iter(): for elem in root.iter():
@@ -338,31 +343,37 @@ def _extract_acuse_data(soap_response_text: str) -> str:
elif elem.tag.endswith('acuseDocumento'): elif elem.tag.endswith('acuseDocumento'):
acuse_elemento = elem acuse_elemento = elem
break break
if acuse_elemento is None: if acuse_elemento is None:
# Log de todos los elementos para debugging # Log de todos los elementos para debugging
logger.error("No se encontró elemento acuseDocumento. Elementos disponibles:") logger.error(
"No se encontró elemento acuseDocumento. Elementos disponibles:")
for elem in root.iter(): for elem in root.iter():
logger.error(f" - {elem.tag}: {elem.text[:50] if elem.text else 'Sin contenido'}...") logger.error(
f" - {elem.tag}: {elem.text[:50] if elem.text else 'Sin contenido'}...")
if 'acuse' in elem.tag.lower(): if 'acuse' in elem.tag.lower():
logger.error(f"Elemento similar encontrado: {elem.tag}") logger.error(f"Elemento similar encontrado: {elem.tag}")
return None return None
if acuse_elemento is not None and acuse_elemento.text: if acuse_elemento is not None and acuse_elemento.text:
logger.debug(f"Acuse encontrado, longitud: {len(acuse_elemento.text)} caracteres") logger.debug(
f"Acuse encontrado, longitud: {len(acuse_elemento.text)} caracteres")
return acuse_elemento.text.strip() return acuse_elemento.text.strip()
else: else:
logger.error("Elemento acuseDocumento encontrado pero sin contenido de texto") logger.error(
"Elemento acuseDocumento encontrado pero sin contenido de texto")
return None return None
except ET.ParseError as e: except ET.ParseError as e:
logger.error(f"Error parseando XML: {e}") logger.error(f"Error parseando XML: {e}")
logger.debug(f"Contenido XML problemático: {xml_content[:500] if 'xml_content' in locals() else 'No disponible'}") logger.debug(
f"Contenido XML problemático: {xml_content[:500] if 'xml_content' in locals() else 'No disponible'}")
return None return None
except Exception as e: except Exception as e:
logger.error(f"Error general extrayendo acuse: {e}") logger.error(f"Error general extrayendo acuse: {e}")
return None return None
def _get_file_name(**kwargs) -> dict: def _get_file_name(**kwargs) -> dict:
pedimento = kwargs.get('pedimento', {}) pedimento = kwargs.get('pedimento', {})
pedimento_app = pedimento.get('pedimento_app', 'N/A') pedimento_app = pedimento.get('pedimento_app', 'N/A')
@@ -371,7 +382,6 @@ def _get_file_name(**kwargs) -> dict:
return _file_name return _file_name
async def change_cove_status(cove: dict, status: bool, pedimento: dict): async def change_cove_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
@@ -385,6 +395,7 @@ async def change_cove_status(cove: dict, status: bool, pedimento: dict):
return response return response
async def change_acuse_status(cove: dict, status: bool, pedimento: dict): async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
data = { data = {
"id": cove.get("id"), "id": cove.get("id"),
@@ -399,29 +410,31 @@ async def change_acuse_status(cove: dict, status: bool, pedimento: dict):
return response return response
async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict): async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict):
""" """
Obtiene certificado y llave, genera la firma para la cadena original. Obtiene certificado y llave, genera la firma para la cadena original.
Args: Args:
cadena_original: Cadena a firmar cadena_original: Cadena a firmar
username: Usuario de VUCEM username: Usuario de VUCEM
credenciales: Diccionario con credenciales credenciales: Diccionario con credenciales
Returns: Returns:
tuple: (firma_base64, certificado_base64, ruta_archivo_temporal) tuple: (firma_base64, certificado_base64, ruta_archivo_temporal)
Raises: Raises:
Exception: Si no se pueden obtener los certificados o generar la firma Exception: Si no se pueden obtener los certificados o generar la firma
""" """
try: try:
logger.debug("Obteniendo certificado desde API") logger.debug("Obteniendo certificado desde API")
# Obtener certificado como bytes # Obtener certificado como bytes
cer = await coves_rest_controller.get_cer(credenciales['id']) cer = await coves_rest_controller.get_cer(credenciales['id'])
if cer is None: if cer is None:
raise Exception("No se pudo obtener el certificado para firmar el COVE") raise Exception(
"No se pudo obtener el certificado para firmar el COVE")
# Convertir certificado a base64 string # Convertir certificado a base64 string
certificado = base64.b64encode(cer).decode('utf-8') certificado = base64.b64encode(cer).decode('utf-8')
logger.debug("Certificado obtenido y codificado exitosamente") logger.debug("Certificado obtenido y codificado exitosamente")
@@ -430,23 +443,27 @@ async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales:
logger.debug("Obteniendo llave privada desde API") logger.debug("Obteniendo llave privada desde API")
key_bytes = await coves_rest_controller.get_key(credenciales['id']) key_bytes = await coves_rest_controller.get_key(credenciales['id'])
if key_bytes is None: if key_bytes is None:
raise Exception("No se pudo obtener la llave privada para firmar el COVE") raise Exception(
"No se pudo obtener la llave privada para firmar el COVE")
# Crear archivo temporal para la llave (requerido por cryptography) # Crear archivo temporal para la llave (requerido por cryptography)
with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_key_file: with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_key_file:
tmp_key_file.write(key_bytes) tmp_key_file.write(key_bytes)
tmp_key_path = tmp_key_file.name tmp_key_path = tmp_key_file.name
logger.debug(f"Llave privada guardada temporalmente en: {tmp_key_path}") logger.debug(
f"Llave privada guardada temporalmente en: {tmp_key_path}")
# Generar firma usando el archivo temporal # Generar firma usando el archivo temporal
firma = sign_chain_original(tmp_key_path, credenciales['efirma'], cadena_original) firma = sign_chain_original(
tmp_key_path, credenciales['efirma'], cadena_original)
logger.debug("Firma generada exitosamente") logger.debug("Firma generada exitosamente")
return firma, certificado, tmp_key_path return firma, certificado, tmp_key_path
except Exception as e: except Exception as e:
logger.error(f"Error obteniendo certificado/llave o generando firma: {e}") logger.error(
f"Error obteniendo certificado/llave o generando firma: {e}")
# Limpiar archivo temporal si existe # Limpiar archivo temporal si existe
if 'tmp_key_path' in locals() and os.path.exists(tmp_key_path): if 'tmp_key_path' in locals() and os.path.exists(tmp_key_path):
try: try:
@@ -459,21 +476,21 @@ async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales:
def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str: def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str:
""" """
Firma una cadena original usando una llave privada. Firma una cadena original usando una llave privada.
Args: Args:
key_path: Ruta al archivo de la llave privada key_path: Ruta al archivo de la llave privada
password: Password de la llave privada password: Password de la llave privada
cadena_original: Cadena a firmar cadena_original: Cadena a firmar
Returns: Returns:
str: Firma en base64 str: Firma en base64
Raises: Raises:
Exception: Si hay errores en el proceso de firma Exception: Si hay errores en el proceso de firma
""" """
try: try:
logger.debug(f"Firmando cadena original: {cadena_original}") logger.debug(f"Firmando cadena original: {cadena_original}")
with open(key_path, 'rb') as key_file: with open(key_path, 'rb') as key_file:
private_key = load_der_private_key( private_key = load_der_private_key(
key_file.read(), key_file.read(),
@@ -485,13 +502,12 @@ def sign_chain_original(key_path: str, password: str, cadena_original: str) -> s
padding.PKCS1v15(), padding.PKCS1v15(),
hashes.SHA256() hashes.SHA256()
) )
firma_b64 = base64.b64encode(signature).decode('utf-8') firma_b64 = base64.b64encode(signature).decode('utf-8')
logger.debug("Cadena firmada exitosamente") logger.debug("Cadena firmada exitosamente")
return firma_b64 return firma_b64
except Exception as e: except Exception as e:
logger.error(f"Error firmando cadena original: {e}") logger.error(f"Error firmando cadena original: {e}")
raise Exception(f"Error en sign_chain_original: {str(e)}") raise Exception(f"Error en sign_chain_original: {str(e)}")

View File

@@ -165,12 +165,7 @@ async def put_pedimento_data(**kwargs) -> Dict[str, Any]:
logger.error(f"Error inesperado al consumir servicio web: {e}") logger.error(f"Error inesperado al consumir servicio web: {e}")
raise HTTPException(status_code=500, detail=f"Error al obtener datos del pedimento: {str(e)}") raise HTTPException(status_code=500, detail=f"Error al obtener datos del pedimento: {str(e)}")
# Actualizar información del pedimento (crítico)
try:
result["pedimento_actualizado"] = await _update_pedimento_info(kwargs, xml_content)
except Exception as e:
logger.error(f"Error crítico al actualizar pedimento: {e}")
raise HTTPException(status_code=500, detail=f"Error al actualizar el pedimento: {str(e)}")
# Procesar COVEs (no crítico) # Procesar COVEs (no crítico)
try: try:
@@ -185,6 +180,13 @@ async def put_pedimento_data(**kwargs) -> Dict[str, Any]:
except Exception as e: except Exception as e:
logger.warning(f"Error al procesar documentos digitalizados: {e}") logger.warning(f"Error al procesar documentos digitalizados: {e}")
result["edocuments_error"] = str(e) result["edocuments_error"] = str(e)
# Actualizar información del pedimento (crítico)
try:
result["pedimento_actualizado"] = await _update_pedimento_info(kwargs, xml_content)
except Exception as e:
logger.error(f"Error crítico al actualizar pedimento: {e}")
raise HTTPException(status_code=500, detail=f"Error al actualizar el pedimento: {str(e)}")
logger.info("Procesamiento de pedimento completo finalizado") logger.info("Procesamiento de pedimento completo finalizado")
return result return result