import base64 import os import logging import re import tempfile from typing import Any, Dict, List, Optional import xml.etree.ElementTree as ET from fastapi import HTTPException from controllers.RESTController import rest_controller from controllers.SOAPController import soap_controller from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.serialization import load_der_private_key from utils.helpers import soap_error from .controllers import coves_vu_controller, coves_rest_controller from ..common import create_service_response, create_error_response # Logger para el módulo logger = logging.getLogger(__name__) # Logica de negocio para consumir el servicio SOAP de VUCEM y procesar la respuesta async def consume_ws_get_cove(**kwargs): """ Consume el servicio SOAP para obtener un COVE y procesar la respuesta. Args: **kwargs: Debe contener 'credencial', 'pedimento' y 'cove' Returns: Dict serializable con 'documento' y 'cove_put_response' Raises: Exception: Si hay errores en el procesamiento """ try: logger.info("Iniciando procesamiento de COVE") credenciales = kwargs.get('credencial') username = credenciales.get('user') pedimento_app = kwargs.get('pedimento', {}).get('pedimento_app', 'N/A') cove = kwargs['cove'].get('cove', None) if not credenciales or not username or not cove: missing = [] if not credenciales: missing.append("credenciales") if not username: missing.append("nombre de usuario") if not cove: missing.append("número de COVE") raise HTTPException( status_code=400, detail=create_error_response( message="Datos incompletos para procesar COVE", errors=[f"Falta: {', '.join(missing)}"], metadata={"provided_data": { "has_credentials": bool(credenciales), "has_username": bool(username), "has_cove": bool(cove) }} ) ) logger.info(f"Procesando COVE: {cove} para usuario: {username}") # Generar cadena original y obtener firma/certificado cadena_original = f"|{credenciales.get('user')}|{cove}|" firma, certificado, tmp_key_path = await fetch_sign_and_cer(cadena_original, username, credenciales) # Limpiar archivo temporal inmediatamente try: os.remove(tmp_key_path) logger.debug("Archivo temporal de llave eliminado") except Exception as e: logger.warning(f"Error al eliminar archivo temporal: {e}") # Generar template SOAP soap_xml = coves_vu_controller.generate_cove_template( username=username, password=credenciales['password'], certificado=certificado, firma=firma, cove=cove, ) # Enviar documento de request a EFC try: file_name_request = f"vu_COVE_{pedimento_app}_{cove}_REQUEST.xml" document_response = await coves_rest_controller.post_document( soap_response=soap_xml, organizacion=kwargs.get('pedimento').get('organizacion'), pedimento=kwargs.get('pedimento').get('id'), file_name=file_name_request, document_type=19, ) except Exception as e: logger.error(f"Error al enviar documento request: {e}") soap_headers = { 'Content-Type': 'text/xml; charset=utf-8', 'SOAPAction': '', } logger.info("Enviando petición SOAP a VUCEM") soap_response = await coves_vu_controller.make_request_async( "ventanilla/ConsultarEdocumentService?wsdl", data=soap_xml, headers=soap_headers ) if not soap_response: raise Exception("No se recibió respuesta del servicio SOAP") if soap_error(soap_response): document_response = await coves_rest_controller.post_document( soap_response=soap_response, organizacion=kwargs.get('pedimento').get('organizacion'), pedimento=kwargs.get('pedimento').get('id'), file_name=f"vu_COVE_{pedimento_app}_{cove}_ERROR.xml", document_type=20, ) raise Exception("Error en la respuesta del servicio SOAP") logger.info("Respuesta SOAP exitosa, enviando documento") # Enviar documento _file_name = f"vu_COVE_{pedimento_app}_{cove}.xml" try: document_response = await coves_rest_controller.post_document( soap_response=soap_response, organizacion=kwargs.get('pedimento').get('organizacion'), pedimento=kwargs.get('pedimento').get('id'), file_name=_file_name, document_type=8, ) except Exception as e: logger.error(f"Error detectado en la respuesta SOAP: {str(e)}") raise Exception(f"Error en la respuesta SOAP: {str(e)}") logger.info("Documento enviado, actualizando status de COVE") # Actualizar status del COVE cove_status_response = await change_cove_status( cove=kwargs.get('cove'), status=True, pedimento=kwargs.get('pedimento') ) logger.info(f"COVE {cove} procesado exitosamente") return create_service_response( message=f"COVE {cove} procesado exitosamente", data={ "documento": document_response if document_response else None, "cove_update_response": cove_status_response if cove_status_response else None }, metadata={ "cove_number": cove, "file_name": _file_name, "document_type": 8, "pedimento_app": pedimento_app, "username": username, "organizacion": kwargs.get('pedimento', {}).get('organizacion') } ) except HTTPException as he: raise he except Exception as e: logger.error(f"Error procesando COVE: {str(e)}", exc_info=True) raise HTTPException( status_code=500, detail=create_error_response( message="Error interno al procesar COVE", errors=[str(e)], metadata={ "cove_number": cove, "username": username, "pedimento_app": pedimento_app } ) ) async def consume_ws_get_acuse_cove(**kwargs): credenciales = kwargs.get('credencial') soap_headers = { 'Content-Type': 'text/xml; charset=utf-8', 'SOAPAction': 'http://www.ventanillaunica.gob.mx/ventanilla/ConsultaAcusesService/consultarAcuseCove', 'Accept-Encoding': 'gzip,deflate', } soap_xml = coves_vu_controller.generate_acuse_template( username=credenciales.get('user'), password=credenciales.get('password'), cove=kwargs['cove'].get('cove', None), ) # Enviar documento de request a EFC try: file_name_request = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_REQUEST.xml" document_response = await coves_rest_controller.post_document( soap_response=soap_xml, organizacion=kwargs.get('pedimento').get('organizacion'), pedimento=kwargs.get('pedimento').get('id'), file_name=file_name_request, document_type=23, ) except Exception as e: logger.error(f"Error al enviar documento request de acuse cove: {e}") response = await coves_vu_controller.make_request_async( "ventanilla-acuses-HA/ConsultaAcusesServiceWS?wsdl", data=soap_xml, headers=soap_headers ) if response is None: raise HTTPException( status_code=500, detail=create_error_response( message="Error al contactar el servicio SOAP", errors=["No se obtuvo respuesta del servicio SOAP"], metadata={ "cove_number": kwargs['cove'].get('cove'), "pedimento_app": kwargs.get('pedimento', {}).get('pedimento_app') } ) ) if response.status_code != 200: raise HTTPException( status_code=response.status_code, detail=create_error_response( message="Error en la solicitud SOAP", errors=[f"Código de estado: {response.status_code}"], data={"soap_response": response.text[:500]}, metadata={ "status_code": response.status_code, "cove_number": kwargs['cove'].get('cove') } ) ) if soap_error(response): error_file_name = f"vu_AC_COVE_{kwargs.get('pedimento', {}).get('pedimento_app', 'N/A')}_{kwargs['cove'].get('cove', 'N/A')}_ERROR.xml" try: rest_response = await coves_rest_controller.post_document( soap_response=response, organizacion=kwargs.get('pedimento').get('organizacion'), pedimento=kwargs.get('pedimento').get('id'), file_name=error_file_name, document_type=24, ) except Exception as e: logger.error(f"Error al guardar respuesta SOAP errónea: {e}") raise HTTPException( status_code=500, detail=create_error_response( message="Error en la respuesta del servicio SOAP", errors=["Se detectó un error en la respuesta SOAP"], data={"error_file": error_file_name} if 'rest_response' in locals() else None, metadata={ "cove_number": kwargs['cove'].get('cove'), "document_type": 10 } ) ) if (response) and (not soap_error(response)): logger.debug(f"Respuesta SOAP recibida, extrayendo acuse...") acuse_base64 = _extract_acuse_data(response.text) if acuse_base64 is None: logger.error( "No se encontró elemento acuseDocumento en la respuesta") logger.debug( f"Contenido de respuesta (primeros 1000 chars): {response.text}") else: logger.error("Error en respuesta SOAP o soap_error detectado") logger.debug( f"Contenido de respuesta con error: {response.text if response else 'No response'}") if acuse_base64 is None: # Log de la respuesta SOAP para debugging logger.error(f"Contenido de respuesta SOAP: {response.text}") raise HTTPException( status_code=500, detail="No se pudo extraer el acuse del documento de la respuesta SOAP. Verifique el log para más detalles." ) pdf_bytes = _decode_acuse_base64_content(acuse_base64) if not pdf_bytes: raise HTTPException( status_code=500, detail="No se pudo decodificar el documento del acuse") # Validar que el PDF sea válido if not pdf_bytes.startswith(b'%PDF'): logger.warning("El contenido decodificado no parece ser un PDF válido") # Mejorar el nombre del archivo usando todos los datos relevantes pedimento = kwargs.get('pedimento', {}) pedimento_num = pedimento.get('pedimento', '') _file_name = _get_file_name(**kwargs) # Validar que organización y pedimento no sean None organizacion = pedimento.get("organizacion", None) pedimento_id = pedimento.get("id", None) rest_response = await coves_rest_controller.post_document( binary_content=pdf_bytes, organizacion=organizacion, pedimento=pedimento_id, file_name=_file_name, document_type=7 ) acuse_status = await change_acuse_status( cove=kwargs.get('cove'), status=True, pedimento=kwargs.get('pedimento') ) return create_service_response( message="Acuse de COVE procesado exitosamente", data={ "document_response": rest_response, "file_name": _file_name, "pedimento": pedimento_num, "acuse_update": acuse_status }, metadata={ "document_type": 7, "pedimento_app": pedimento.get('pedimento_app'), "organizacion": organizacion, "cove_number": kwargs['cove'].get('cove'), "content_type": "application/pdf" } ) def _decode_acuse_base64_content(base64_content): # Testeado """ Decodifica el contenido Base64 del acuse y limpia caracteres especiales. Args: base64_content (str): Contenido codificado en Base64 Returns: bytes: Contenido decodificado o None si hay error """ try: # Limpiar el contenido Base64 de manera exhaustiva cleaned_content = base64_content # Remover entidades HTML/XML como , , etc. cleaned_content = re.sub(r'&#x[0-9a-fA-F]+;', '', cleaned_content) cleaned_content = re.sub(r'&#[0-9]+;', '', cleaned_content) # Remover espacios en blanco, saltos de línea, etc. cleaned_content = re.sub(r'[\s\n\r\t]', '', cleaned_content) # Remover caracteres no válidos para Base64 cleaned_content = re.sub(r'[^A-Za-z0-9+/=]', '', cleaned_content) # Agregar padding si es necesario missing_padding = len(cleaned_content) % 4 if missing_padding: cleaned_content += '=' * (4 - missing_padding) # Decodificar Base64 decoded_content = base64.b64decode(cleaned_content) return decoded_content except Exception as e: # Intentar con validación estricta deshabilitada try: decoded_content = base64.b64decode(cleaned_content, validate=False) return decoded_content except Exception as e2: return None def _extract_acuse_data(soap_response_text: str) -> str: """ Extrae el contenido base64 del acuse desde la respuesta SOAP. Args: soap_response_text: Texto completo de la respuesta SOAP Returns: str: Contenido base64 del acuse o None si no se encuentra """ try: logger.debug("Iniciando extracción de datos del acuse") # Primero, extraer la parte XML del contenido multipart xml_start = soap_response_text.find(' dict: pedimento = kwargs.get('pedimento', {}) pedimento_app = pedimento.get('pedimento_app', 'N/A') cove = kwargs['cove'].get('cove', 'N/A') _file_name = f"vu_AC_COVE_{pedimento_app}_{cove}.pdf" return _file_name async def change_cove_status(cove: dict, status: bool, pedimento: dict): data = { "id": cove.get("id"), "cove_descargado": status, "numero_cove": cove.get("cove"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), } response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) return response async def change_acuse_status(cove: dict, status: bool, pedimento: dict): data = { "id": cove.get("id"), "acuse_cove_descargado": status, "numero_cove": cove.get("cove"), "pedimento": pedimento.get("id"), "organizacion": pedimento.get("organizacion"), } print(data) response = await coves_rest_controller.put_cove_data(cove_id=cove.get("id"), data=data) return response async def fetch_sign_and_cer(cadena_original: str, username: str, credenciales: dict): """ Obtiene certificado y llave, genera la firma para la cadena original. Args: cadena_original: Cadena a firmar username: Usuario de VUCEM credenciales: Diccionario con credenciales Returns: tuple: (firma_base64, certificado_base64, ruta_archivo_temporal) Raises: Exception: Si no se pueden obtener los certificados o generar la firma """ try: logger.debug("Obteniendo certificado desde API") # Obtener certificado como bytes cer = await coves_rest_controller.get_cer(credenciales['id']) if cer is None: raise Exception( "No se pudo obtener el certificado para firmar el COVE") # Convertir certificado a base64 string certificado = base64.b64encode(cer).decode('utf-8') logger.debug("Certificado obtenido y codificado exitosamente") # Obtener llave privada como bytes logger.debug("Obteniendo llave privada desde API") key_bytes = await coves_rest_controller.get_key(credenciales['id']) if key_bytes is None: raise Exception( "No se pudo obtener la llave privada para firmar el COVE") # Crear archivo temporal para la llave (requerido por cryptography) with tempfile.NamedTemporaryFile(delete=False, mode='wb') as tmp_key_file: tmp_key_file.write(key_bytes) tmp_key_path = tmp_key_file.name logger.debug( f"Llave privada guardada temporalmente en: {tmp_key_path}") # Generar firma usando el archivo temporal firma = sign_chain_original( tmp_key_path, credenciales['efirma'], cadena_original) logger.debug("Firma generada exitosamente") return firma, certificado, tmp_key_path except Exception as e: logger.error(f"Error obteniendo certificado/llave o generando firma: {e}") # Limpiar archivo temporal si existe if 'tmp_key_path' in locals() and os.path.exists(tmp_key_path): try: os.remove(tmp_key_path) except Exception as cleanup_error: logger.warning(f"Error al limpiar archivo temporal: {cleanup_error}") raise HTTPException( status_code=500, detail=create_error_response( message="Error al procesar certificado y firma", errors=[str(e)], metadata={ "username": username, "has_key": bool(key_bytes) if 'key_bytes' in locals() else False, "has_cert": bool(cer) if 'cer' in locals() else False } ) ) def sign_chain_original(key_path: str, password: str, cadena_original: str) -> str: """ Firma una cadena original usando una llave privada. Args: key_path: Ruta al archivo de la llave privada password: Password de la llave privada cadena_original: Cadena a firmar Returns: str: Firma en base64 Raises: Exception: Si hay errores en el proceso de firma """ try: logger.debug(f"Firmando cadena original: {cadena_original}") with open(key_path, 'rb') as key_file: private_key = load_der_private_key( key_file.read(), password=password.encode() if password else None ) signature = private_key.sign( cadena_original.encode('utf-8'), padding.PKCS1v15(), hashes.SHA256() ) firma_b64 = base64.b64encode(signature).decode('utf-8') logger.debug("Cadena firmada exitosamente") return firma_b64 except Exception as e: logger.error(f"Error firmando cadena original: {e}") raise Exception(f"Error en sign_chain_original: {str(e)}")