135 lines
4.9 KiB
Python
135 lines
4.9 KiB
Python
from controllers.RESTController import APIRESTController
|
|
from controllers.SOAPController import VUCEMController
|
|
from typing import List, Dict, Any
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict
|
|
|
|
class RemesaController(APIRESTController):
    """REST controller used to submit COVE records to the API."""

    def __init__(self):
        super().__init__()

    async def post_cove(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a COVE number to the API.

        Issues a POST to the ``customs/coves/`` endpoint through the
        ``_make_request_async`` helper (not defined in this class; presumably
        inherited from the base controller).

        Args:
            data: Dictionary with the COVE data to send.

        Returns:
            The awaited result of ``_make_request_async``.
        """
        endpoint = 'customs/coves/'
        response = await self._make_request_async('POST', endpoint, data=data)
        return response
|
|
|
|
|
|
class RemesaVUController(VUCEMController):
    """SOAP controller that builds the "consultar remesas" request envelope."""

    def __init__(self):
        super().__init__()  # Specific implementation for Coves VU

    def generate_remesas_template(self, username: str, password: str, aduana: str, patente: str, numero_operacion: str, pedimento: str) -> str:
        """
        Build the SOAP envelope used to query remesas.

        Every supplied value is XML-escaped before being placed in the
        envelope, so characters such as ``&``, ``<`` and ``>`` (common in
        passwords) cannot break the document or inject extra elements.

        Args:
            username: VUCEM user, placed in the WS-Security UsernameToken.
            password: VUCEM password, sent as PasswordText.
            aduana: Customs office code.
            patente: Patent number.
            numero_operacion: Operation number for the request.
            pedimento: Pedimento number to query.

        Returns:
            str: Complete SOAP XML envelope.
        """
        # Function-scope import keeps this block self-contained; the module's
        # import section does not need to change.
        from xml.sax.saxutils import escape

        def _xml_text(value: Any) -> str:
            # str() preserves the old f-string tolerance for non-string values
            # (e.g. an int pedimento) before escaping the markup characters.
            return escape(str(value))

        username = _xml_text(username)
        password = _xml_text(password)
        aduana = _xml_text(aduana)
        patente = _xml_text(patente)
        numero_operacion = _xml_text(numero_operacion)
        pedimento = _xml_text(pedimento)

        soap_template = f'''
        <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                          xmlns:con="http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarremesas"
                          xmlns:com="http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/comunes">
            <soapenv:Header>
                <wsse:Security soapenv:mustUnderstand="1"
                               xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
                    <wsse:UsernameToken>
                        <wsse:Username>{username}</wsse:Username>
                        <wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">{password}</wsse:Password>
                    </wsse:UsernameToken>
                </wsse:Security>
            </soapenv:Header>
            <soapenv:Body>
                <con:consultarRemesasPeticion>
                    <con:numeroOperacion>{numero_operacion}</con:numeroOperacion>
                    <con:peticion>
                        <com:aduana>{aduana}</com:aduana>
                        <com:patente>{patente}</com:patente>
                        <com:pedimento>{pedimento}</com:pedimento>
                    </con:peticion>
                </con:consultarRemesasPeticion>
            </soapenv:Body>
        </soapenv:Envelope>'''
        return soap_template
|
|
|
|
|
|
# Pedimento Completo
|
|
class RemesaXMLScraper:
    """
    Scraper for the XML returned by the "consultar remesas" query.

    Extracts every comprobanteVE together with its remesaAgente and remesaSA.
    """

    # Prefix -> namespace URI map used for all ElementTree lookups below.
    namespaces = {
        "S": "http://schemas.xmlsoap.org/soap/envelope/",
        "ns2": "http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta",
        "ns3": "http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarremesas",
    }

    def extract_remesas(self, xml_content: str) -> List[Dict[str, str]]:
        """
        Extract every ``ns3:remesas`` entry from a remesas XML document.

        Args:
            xml_content: XML content as a string.

        Returns:
            One dict per ``ns3:remesas`` element with the keys
            ``comprobanteVE``, ``remesaAgente`` and ``remesaSA``. A value is
            ``None`` when the child element is missing. An empty list is
            returned when the XML cannot be parsed or any other error occurs;
            in both cases a message is printed.
        """
        # Child tags to read, in the order the result keys are emitted.
        field_names = ("comprobanteVE", "remesaAgente", "remesaSA")
        try:
            document = ET.fromstring(xml_content)

            records = []
            for entry in document.findall(".//ns3:remesas", self.namespaces):
                record = {}
                for field in field_names:
                    child = entry.find(f"ns3:{field}", self.namespaces)
                    record[field] = None if child is None else child.text
                records.append(record)

            return records

        except ET.ParseError as e:
            print(f"Error al parsear XML: {e}")
            return []
        except Exception as e:
            # Best-effort contract: any other failure also yields an empty list.
            print(f"Error inesperado: {e}")
            return []

    def extract_data(self, xml_content: str) -> Dict[str, Any]:
        """
        Compatibility wrapper around ``extract_remesas`` that returns a dict.

        Args:
            xml_content: XML content as a string.

        Returns:
            Dict with ``coves`` (the list from ``extract_remesas``) and
            ``total_remesas`` (the length of that list).
        """
        extracted = self.extract_remesas(xml_content)
        return dict(coves=extracted, total_remesas=len(extracted))
|
|
|
|
|
|
# Module-level shared instances, created once at import time.
# Creation order is kept as-is in case the constructors have side effects.
remesa_rest_controller = RemesaController()
remesa_vu_controller = RemesaVUController()
remesa_xml_scraper = RemesaXMLScraper()
|
|
|
|
|