Se agregaron los moduloes de api_v2
This commit is contained in:
134
api/api_v2/modules/remesas/controllers.py
Normal file
134
api/api_v2/modules/remesas/controllers.py
Normal file
@@ -0,0 +1,134 @@
|
||||
from controllers.RESTController import APIRESTController
|
||||
from controllers.SOAPController import VUCEMController
|
||||
from typing import List, Dict, Any
|
||||
import xml.etree.ElementTree as ET
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict
|
||||
|
||||
class RemesaController(APIRESTController):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
async def post_cove(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Método para enviar un número de COVE a la API.
|
||||
|
||||
Args:
|
||||
data: Diccionario con los datos del COVE a enviar
|
||||
"""
|
||||
return await self._make_request_async('POST', 'customs/coves/', data=data)
|
||||
|
||||
|
||||
class RemesaVUController(VUCEMController):
|
||||
def __init__(self):
|
||||
super().__init__() # Implementación específica para Coves VU
|
||||
|
||||
def generate_remesas_template(self, username: str, password: str, aduana: str, patente: str, numero_operacion: str, pedimento: str) -> str:
|
||||
"""
|
||||
Genera el template SOAP para consultar remesas
|
||||
|
||||
Args:
|
||||
username: Usuario de VUCEM
|
||||
password: Contraseña de VUCEM
|
||||
aduana: Código de aduana
|
||||
patente: Número de patente
|
||||
|
||||
Returns:
|
||||
str: Template SOAP XML completo
|
||||
"""
|
||||
soap_template = f'''
|
||||
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
|
||||
xmlns:con="http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarremesas"
|
||||
xmlns:com="http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/comunes">
|
||||
<soapenv:Header>
|
||||
<wsse:Security soapenv:mustUnderstand="1"
|
||||
xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
|
||||
<wsse:UsernameToken>
|
||||
<wsse:Username>{username}</wsse:Username>
|
||||
<wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">{password}</wsse:Password>
|
||||
</wsse:UsernameToken>
|
||||
</wsse:Security>
|
||||
</soapenv:Header>
|
||||
<soapenv:Body>
|
||||
<con:consultarRemesasPeticion>
|
||||
<con:numeroOperacion>{numero_operacion}</con:numeroOperacion>
|
||||
<con:peticion>
|
||||
<com:aduana>{aduana}</com:aduana>
|
||||
<com:patente>{patente}</com:patente>
|
||||
<com:pedimento>{pedimento}</com:pedimento>
|
||||
</con:peticion>
|
||||
</con:consultarRemesasPeticion>
|
||||
</soapenv:Body>
|
||||
</soapenv:Envelope>'''
|
||||
return soap_template
|
||||
|
||||
|
||||
# Pedimento Completo
|
||||
class RemesaXMLScraper:
|
||||
"""
|
||||
Controlador para scrapear XML de consultar remesas.
|
||||
Extrae todos los comprobantesVE, junto con remesaAgente y remesaSA.
|
||||
"""
|
||||
|
||||
namespaces = {
|
||||
"S": "http://schemas.xmlsoap.org/soap/envelope/",
|
||||
"ns2": "http://www.ventanillaunica.gob.mx/common/ws/oxml/respuesta",
|
||||
"ns3": "http://www.ventanillaunica.gob.mx/pedimentos/ws/oxml/consultarremesas",
|
||||
}
|
||||
|
||||
def extract_remesas(self, xml_content: str) -> List[Dict[str, str]]:
|
||||
"""
|
||||
Extrae todos los comprobanteVE de un XML de remesas.
|
||||
|
||||
Args:
|
||||
xml_content: Contenido del XML en string.
|
||||
|
||||
Returns:
|
||||
Lista de diccionarios con comprobanteVE, remesaAgente y remesaSA.
|
||||
"""
|
||||
try:
|
||||
root = ET.fromstring(xml_content)
|
||||
|
||||
remesas = []
|
||||
for remesa in root.findall(".//ns3:remesas", self.namespaces):
|
||||
comprobante = remesa.find("ns3:comprobanteVE", self.namespaces)
|
||||
agente = remesa.find("ns3:remesaAgente", self.namespaces)
|
||||
sa = remesa.find("ns3:remesaSA", self.namespaces)
|
||||
|
||||
remesas.append({
|
||||
"comprobanteVE": comprobante.text if comprobante is not None else None,
|
||||
"remesaAgente": agente.text if agente is not None else None,
|
||||
"remesaSA": sa.text if sa is not None else None
|
||||
})
|
||||
|
||||
return remesas
|
||||
|
||||
except ET.ParseError as e:
|
||||
print(f"Error al parsear XML: {e}")
|
||||
return []
|
||||
except Exception as e:
|
||||
print(f"Error inesperado: {e}")
|
||||
return []
|
||||
|
||||
def extract_data(self, xml_content: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Método de compatibilidad que llama a extract_remesas y devuelve un dict.
|
||||
|
||||
Args:
|
||||
xml_content: Contenido del XML en string.
|
||||
|
||||
Returns:
|
||||
Dict con los datos extraídos del XML.
|
||||
"""
|
||||
remesas_data = self.extract_remesas(xml_content)
|
||||
return {
|
||||
'coves': remesas_data,
|
||||
'total_remesas': len(remesas_data)
|
||||
}
|
||||
|
||||
|
||||
remesa_rest_controller = RemesaController()
|
||||
remesa_vu_controller = RemesaVUController()
|
||||
remesa_xml_scraper = RemesaXMLScraper()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user