1002 lines
48 KiB
Python
1002 lines
48 KiB
Python
from celery import Celery
|
|
from celery_app import celery_app
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
from controllers.RESTController import rest_controller
|
|
from controllers.SOAPController import soap_controller
|
|
from utils.peticiones import (
|
|
get_soap_acuseCOVE, get_soap_cove, get_soap_pedimento_completo,
|
|
get_soap_remesas, get_soap_partidas, get_soap_acuse, get_soap_edocument
|
|
)
|
|
from utils.servicios import (
|
|
_validate_request_data,
|
|
_get_pedimento_service,
|
|
_update_service_status,
|
|
_get_vucem_credentials,
|
|
_create_response,
|
|
_post_edocuments,
|
|
_schedule_follow_up_services,
|
|
_post_coves
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Estados del servicio
|
|
ESTADO_CREADO = 1
|
|
ESTADO_EN_PROCESO = 2
|
|
ESTADO_FINALIZADO = 3
|
|
ESTADO_ERROR = 4
|
|
|
|
def run_async_task(async_func, *args, **kwargs):
|
|
"""Helper function to run async functions in Celery tasks"""
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
return loop.run_until_complete(async_func(*args, **kwargs))
|
|
finally:
|
|
loop.close()
|
|
|
|
@celery_app.task(bind=True, name='tasks.pedimento_completo_task')
|
|
def pedimento_completo_task(self, request_data: Dict[str, Any]):
|
|
"""
|
|
Tarea asíncrona para obtener pedimento completo
|
|
"""
|
|
async def _execute_pedimento_completo():
|
|
operation_name = "pedimento_completo"
|
|
service_data = None
|
|
|
|
try:
|
|
logger.info(f"[TASK] Iniciando consulta de pedimento completo - Pedimento: {request_data['pedimento']}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=3,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo credenciales VUCEM'})
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
|
logger.info("[TASK] Realizando petición SOAP para pedimento completo...")
|
|
|
|
soap_response = await get_soap_pedimento_completo(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller
|
|
)
|
|
|
|
if not soap_response:
|
|
raise Exception("Error en la petición SOAP para pedimento completo")
|
|
|
|
|
|
logger.info("[TASK] Petición SOAP para pedimento completo completada exitosamente")
|
|
|
|
# Actualizar datos del pedimento con información del XML (igual que en el endpoint)
|
|
try:
|
|
xml_content = soap_response.get('xml_content', {})
|
|
if xml_content:
|
|
update_content = {k: v for k, v in xml_content.items() if k != 'identificadores_ed'}
|
|
update_content['existe_expediente'] = True
|
|
pedimento_response = await rest_controller.put_pedimento(
|
|
service_data['pedimento']['id'],
|
|
update_content
|
|
)
|
|
logger.info("Pedimento actualizado exitosamente (TASK)")
|
|
else:
|
|
logger.warning("No se recibió contenido XML para actualizar el pedimento (TASK)")
|
|
except Exception as e:
|
|
logger.warning(f"No se pudo actualizar el pedimento (TASK, continuando proceso): {e}")
|
|
|
|
|
|
# Procesar y guardar COVEs (igual que en el endpoint)
|
|
try:
|
|
coves = xml_content.get('coves', []) if 'xml_content' in locals() else []
|
|
logger.warning(f"COVEs encontrados: {coves}")
|
|
for cove in coves:
|
|
logger.warning(f"Procesando COVE: {cove}")
|
|
cove_result = await _post_coves(
|
|
response_service=service_data,
|
|
coves=coves
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar COVEs: {e}")
|
|
cove_result = None
|
|
|
|
# Subir documento de pedimento completo si la petición fue exitosa
|
|
try:
|
|
upload_result = await _post_edocuments(
|
|
response_service=service_data,
|
|
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
|
)
|
|
logger.info(f"Documento de pedimento completo subido exitosamente: {upload_result}")
|
|
except Exception as upload_err:
|
|
logger.error(f"Error al subir documento de pedimento completo: {upload_err}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"pedimento_completo": soap_response,
|
|
"documento": soap_response.get('documento', {}),
|
|
"xml_content": soap_response.get('xml_content', {})
|
|
},
|
|
success_message="Pedimento completo obtenido exitosamente"
|
|
)
|
|
|
|
logger.info(f"[TASK] Consulta de pedimento completo completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en pedimento_completo: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_pedimento_completo)
|
|
|
|
@celery_app.task(bind=True)
|
|
def partidas_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener partidas"""
|
|
async def _execute_partidas():
|
|
operation_name = "PARTIDAS_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de partidas - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=4,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
# Obtener número de partidas
|
|
numero_partidas = service_data['pedimento'].get('numero_partidas', 0)
|
|
partidas_procesadas = []
|
|
logger.info(f"Procesando {numero_partidas} partidas...")
|
|
|
|
if numero_partidas <= 0:
|
|
logger.warning("El pedimento no tiene partidas para procesar")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se encontraron partidas para el pedimento")
|
|
|
|
from utils.peticiones import get_soap_partidas
|
|
|
|
for partida_num in range(1, numero_partidas + 1):
|
|
try:
|
|
logger.info(f"Procesando partida {partida_num}/{numero_partidas}")
|
|
soap_response = await get_soap_partidas(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller,
|
|
partida=str(partida_num)
|
|
)
|
|
if soap_response:
|
|
partidas_procesadas.append({
|
|
"numero": partida_num,
|
|
"procesada": True,
|
|
"documento": soap_response.get('documento', {})
|
|
})
|
|
logger.info(f"Partida {partida_num} procesada exitosamente")
|
|
else:
|
|
logger.warning(f"No se pudo procesar la partida {partida_num}")
|
|
partidas_procesadas.append({
|
|
"numero": partida_num,
|
|
"procesada": False,
|
|
"error": "Error en petición SOAP"
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar partida {partida_num}: {e}")
|
|
partidas_procesadas.append({
|
|
"numero": partida_num,
|
|
"procesada": False,
|
|
"error": str(e)
|
|
})
|
|
continue
|
|
|
|
partidas_exitosas = [p for p in partidas_procesadas if p.get('procesada', False)]
|
|
|
|
if not partidas_exitosas:
|
|
logger.error("No se pudo procesar ninguna partida")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se pudo procesar ninguna partida")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"partidas": partidas_procesadas,
|
|
"total_partidas": numero_partidas,
|
|
"partidas_exitosas": len(partidas_exitosas),
|
|
"partidas_fallidas": len(partidas_procesadas) - len(partidas_exitosas)
|
|
},
|
|
success_message=f"Se procesaron {len(partidas_exitosas)}/{numero_partidas} partidas exitosamente"
|
|
)
|
|
|
|
if len(partidas_exitosas) < numero_partidas:
|
|
response_data["warnings"] = [
|
|
f"Se procesaron solo {len(partidas_exitosas)} de {numero_partidas} partidas"
|
|
]
|
|
|
|
logger.info(f"Procesamiento de partidas completado - Exitosas: {len(partidas_exitosas)}/{numero_partidas}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en partidas: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_partidas)
|
|
|
|
@celery_app.task(bind=True)
|
|
def remesas_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener remesas"""
|
|
async def _execute_remesas():
|
|
operation_name = "REMESAS_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de remesas - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=5,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
|
logger.info("[TASK] Realizando petición SOAP para remesas...")
|
|
|
|
from utils.peticiones import get_soap_remesas
|
|
|
|
soap_response = await get_soap_remesas(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller
|
|
)
|
|
|
|
if not soap_response:
|
|
raise Exception("Error en la petición SOAP para remesas")
|
|
|
|
logger.info("[TASK] Petición SOAP para remesas completada exitosamente")
|
|
# Subir documento de remesas si la petición fue exitosa
|
|
try:
|
|
upload_result = await _post_edocuments(
|
|
response_service=service_data,
|
|
identificadores_ed=[soap_response.get('documento', {}).get('numero_edocument')]
|
|
)
|
|
logger.info(f"Documento de remesas subido exitosamente: {upload_result}")
|
|
except Exception as upload_err:
|
|
logger.error(f"Error al subir documento de remesas: {upload_err}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"remesas": soap_response,
|
|
"documento": soap_response.get('documento', {}),
|
|
"xml_content": soap_response.get('xml_content', {})
|
|
},
|
|
success_message="Remesas obtenidas exitosamente"
|
|
)
|
|
|
|
logger.info(f"[TASK] Consulta de remesas completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en remesas: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_remesas)
|
|
|
|
@celery_app.task(bind=True)
|
|
def acuse_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener acuse"""
|
|
async def _execute_acuse():
|
|
operation_name = "ACUSE_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de acuse - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=6,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
# Obtener documentos digitalizados (e-documents)
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'})
|
|
logger.info("[TASK] Obteniendo documentos digitalizados...")
|
|
|
|
try:
|
|
edocs = await rest_controller.get_edocs(service_data['pedimento']['id'])
|
|
|
|
if not edocs:
|
|
logger.warning("No se encontraron documentos digitalizados para el pedimento")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se encontraron documentos digitalizados para el pedimento")
|
|
|
|
logger.info(f"Se encontraron {len(edocs)} documentos digitalizados")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al obtener documentos digitalizados: {e}")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
# Procesar acuses de documentos digitalizados
|
|
self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de documentos'})
|
|
documentos_procesados = []
|
|
documentos_exitosos = 0
|
|
|
|
logger.info(f"[TASK] Procesando acuses para {len(edocs)} documentos...")
|
|
|
|
from utils.peticiones import get_soap_acuse
|
|
|
|
for idx, edoc in enumerate(edocs):
|
|
documento_info = {
|
|
"clave": edoc.get('clave', 'N/A'),
|
|
"descripcion": edoc.get('descripcion', 'N/A'),
|
|
"numero_edocument": edoc.get('numero_edocument', 'N/A'),
|
|
"procesado": False,
|
|
"error": None
|
|
}
|
|
|
|
# Verificar que el documento tenga número de e-document
|
|
if not edoc.get('numero_edocument'):
|
|
logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...")
|
|
documento_info["error"] = "Sin número de e-document"
|
|
documentos_procesados.append(documento_info)
|
|
continue
|
|
|
|
try:
|
|
logger.info(f"Procesando acuse para documento {idx + 1}: {edoc['numero_edocument']}")
|
|
|
|
soap_response = await get_soap_acuse(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller,
|
|
edocument=edoc,
|
|
idx=idx + 1
|
|
)
|
|
|
|
if soap_response:
|
|
documento_info["procesado"] = True
|
|
documento_info["documento"] = soap_response.get('documento', {})
|
|
documentos_exitosos += 1
|
|
logger.info(f"Acuse del documento {idx + 1} procesado exitosamente")
|
|
else:
|
|
documento_info["error"] = "Error en petición SOAP"
|
|
logger.warning(f"No se pudo procesar el acuse del documento {idx + 1}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar acuse del documento {idx + 1}: {e}")
|
|
documento_info["error"] = str(e)
|
|
# Continuar con los siguientes documentos
|
|
|
|
documentos_procesados.append(documento_info)
|
|
|
|
# Verificar si se procesó al menos un documento
|
|
if documentos_exitosos == 0:
|
|
logger.error("No se pudo procesar ningún acuse de documento digitalizado")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se pudo procesar ningún acuse de documento digitalizado")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"acuses": documentos_procesados,
|
|
"documentos_procesados": len(documentos_procesados),
|
|
"documentos_exitosos": documentos_exitosos
|
|
},
|
|
success_message=f"Acuses procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
|
)
|
|
|
|
logger.info(f"[TASK] Consulta de acuse completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en acuse: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_acuse)
|
|
|
|
# Tarea asíncrona para consultar el estado de un pedimento
|
|
@celery_app.task(bind=True, name='tasks.estado_pedimento_task')
|
|
def estado_pedimento_task(self, request_data: Dict[str, Any]):
|
|
"""
|
|
Tarea asíncrona para consultar el estado de un pedimento
|
|
"""
|
|
async def _execute_estado_pedimento():
|
|
operation_name = "estado_pedimento"
|
|
service_data = None
|
|
try:
|
|
logger.info(f"[TASK] Iniciando consulta de estado de pedimento - Pedimento: {request_data['pedimento']}")
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=1,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición SOAP'})
|
|
logger.info("[TASK] Realizando petición SOAP para estado de pedimento...")
|
|
from utils.peticiones import get_soap_pedimento_estado
|
|
soap_response = await get_soap_pedimento_estado(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller
|
|
)
|
|
if not soap_response:
|
|
raise Exception("Error en la petición SOAP para estado de pedimento")
|
|
|
|
logger.info("[TASK] Petición SOAP para estado de pedimento completada exitosamente")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"estado_pedimento": soap_response
|
|
},
|
|
success_message="Estado de pedimento obtenido exitosamente"
|
|
)
|
|
logger.info(f"[TASK] Consulta de estado de pedimento completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en estado de pedimento: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
return run_async_task(_execute_estado_pedimento)
|
|
|
|
@celery_app.task(bind=True)
|
|
def edocument_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener edocument"""
|
|
async def _execute_edocument():
|
|
operation_name = "EDOCUMENT_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de edocument - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=7,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
# Obtener documentos digitalizados (e-documents)
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo documentos digitalizados'})
|
|
logger.info("[TASK] Obteniendo documentos digitalizados...")
|
|
|
|
try:
|
|
edocs = await rest_controller.get_edocs(service_data['pedimento']['id'])
|
|
|
|
if not edocs:
|
|
logger.warning("No se encontraron documentos digitalizados para el pedimento")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se encontraron documentos digitalizados para el pedimento")
|
|
|
|
logger.info(f"Se encontraron {len(edocs)} documentos digitalizados")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al obtener documentos digitalizados: {e}")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
# Procesar edocuments
|
|
self.update_state(state='PROGRESS', meta={'status': 'Procesando documentos electrónicos'})
|
|
documentos_procesados = []
|
|
documentos_exitosos = 0
|
|
|
|
logger.info(f"[TASK] Procesando edocuments para {len(edocs)} documentos...")
|
|
|
|
from utils.peticiones import get_soap_edocument
|
|
|
|
for idx, edoc in enumerate(edocs):
|
|
documento_info = {
|
|
"clave": edoc.get('clave', 'N/A'),
|
|
"descripcion": edoc.get('descripcion', 'N/A'),
|
|
"numero_edocument": edoc.get('numero_edocument', 'N/A'),
|
|
"procesado": False,
|
|
"error": None
|
|
}
|
|
|
|
# Verificar que el documento tenga número de e-document
|
|
if not edoc.get('numero_edocument'):
|
|
logger.warning(f"Documento {idx + 1} no tiene numero_edocument, saltando...")
|
|
documento_info["error"] = "Sin número de e-document"
|
|
documentos_procesados.append(documento_info)
|
|
continue
|
|
|
|
try:
|
|
logger.info(f"Procesando e-document {idx + 1}: {edoc['numero_edocument']}")
|
|
|
|
soap_response = await get_soap_edocument(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller,
|
|
edocument=edoc,
|
|
idx=idx + 1
|
|
)
|
|
|
|
if soap_response:
|
|
documento_info["procesado"] = True
|
|
documento_info["documento"] = soap_response.get('documento', {})
|
|
documentos_exitosos += 1
|
|
logger.info(f"E-document {idx + 1} procesado exitosamente")
|
|
# Subir el documento si la petición fue exitosa
|
|
try:
|
|
upload_result = await _post_edocuments(
|
|
response_service=service_data,
|
|
identificadores_ed=[edoc.get('numero_edocument')]
|
|
)
|
|
documento_info["upload_result"] = upload_result
|
|
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
|
except Exception as upload_err:
|
|
documento_info["upload_error"] = str(upload_err)
|
|
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
|
# Subir el documento si la petición fue exitosa
|
|
try:
|
|
upload_result = await _post_edocuments(
|
|
response_service=service_data,
|
|
identificadores_ed=[edoc.get('numero_edocument')]
|
|
)
|
|
documento_info["upload_result"] = upload_result
|
|
logger.info(f"Documento {edoc.get('numero_edocument')} subido exitosamente")
|
|
except Exception as upload_err:
|
|
documento_info["upload_error"] = str(upload_err)
|
|
logger.error(f"Error al subir documento {edoc.get('numero_edocument')}: {upload_err}")
|
|
else:
|
|
documento_info["error"] = "Error en petición SOAP"
|
|
logger.warning(f"No se pudo procesar el e-document {idx + 1}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar e-document {idx + 1}: {e}")
|
|
documento_info["error"] = str(e)
|
|
# Continuar con los siguientes documentos
|
|
|
|
documentos_procesados.append(documento_info)
|
|
|
|
# Verificar si se procesó al menos un documento
|
|
if documentos_exitosos == 0:
|
|
logger.error("No se pudo procesar ningún e-document")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se pudo procesar ningún e-document")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"edocuments": documentos_procesados,
|
|
"documentos_procesados": len(documentos_procesados),
|
|
"documentos_exitosos": documentos_exitosos
|
|
},
|
|
success_message=f"E-documents procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
|
)
|
|
|
|
logger.info(f"[TASK] Consulta de edocument completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en edocument: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_edocument)
|
|
|
|
@celery_app.task(bind=True)
|
|
def coves_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener coves"""
|
|
async def _execute_coves():
|
|
operation_name = "COVES_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de coves - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=8,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
# Para COVES, se usa el RESTController en lugar de SOAP
|
|
self.update_state(state='PROGRESS', meta={'status': 'Realizando petición REST'})
|
|
logger.info("[TASK] Realizando petición REST para coves...")
|
|
|
|
coves_response = await rest_controller.get_coves(request_data['pedimento'])
|
|
|
|
if not coves_response:
|
|
raise Exception("Error en la petición REST para coves")
|
|
|
|
|
|
logger.info("[TASK] Petición REST para coves completada exitosamente")
|
|
# Procesar acuses de documentos digitalizados
|
|
documentos_procesados = []
|
|
documentos_exitosos = 0
|
|
|
|
coves = coves_response if isinstance(coves_response, list) else []
|
|
logger.info(f"Procesando COVE para {len(coves)} documentos...")
|
|
|
|
# Obtener credenciales VUCEM para la firma
|
|
credentials = await _get_vucem_credentials(service_data.get('pedimento', {}).get('contribuyente', ''), operation_name)
|
|
from controllers.SOAPController import soap_controller
|
|
|
|
for idx, cove in enumerate(coves):
|
|
documento_info = {
|
|
"numero_cove": cove.get('numero_cove', 'N/A'),
|
|
"procesado": False,
|
|
"error": None
|
|
}
|
|
|
|
if not cove.get('numero_cove'):
|
|
logger.warning(f"Documento {idx + 1} no tiene numero_cove, saltando...")
|
|
documento_info["error"] = "Sin número de cove"
|
|
documentos_procesados.append(documento_info)
|
|
continue
|
|
|
|
try:
|
|
logger.info(f"Procesando cove para documento {idx + 1}: {cove['numero_cove']}")
|
|
soap_response = await get_soap_cove(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller,
|
|
cove=cove,
|
|
idx=idx + 1
|
|
)
|
|
|
|
if soap_response:
|
|
documento_info["procesado"] = True
|
|
documento_info["documento"] = soap_response.get('documento', {})
|
|
documentos_exitosos += 1
|
|
logger.info(f"cove del documento {idx + 1} procesado exitosamente")
|
|
else:
|
|
documento_info["error"] = "Error en petición SOAP"
|
|
logger.warning(f"No se pudo procesar el cove del documento {idx + 1}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar cove del documento {idx + 1}: {e}")
|
|
documento_info["error"] = str(e)
|
|
|
|
documentos_procesados.append(documento_info)
|
|
|
|
if documentos_exitosos == 0:
|
|
logger.error("No se pudo procesar ningún cove de documento digitalizado")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se pudo procesar ningún acuse cove de documento digitalizado")
|
|
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"covesDocs": documentos_procesados,
|
|
"total_documentos": len(coves),
|
|
"documentos_exitosos": documentos_exitosos,
|
|
"documentos_fallidos": len(coves) - documentos_exitosos
|
|
},
|
|
success_message=f"Se procesaron {documentos_exitosos}/{len(coves)} cove de documentos exitosamente"
|
|
)
|
|
|
|
if documentos_exitosos < len(coves):
|
|
response_data["warnings"] = [
|
|
f"Se procesaron solo {documentos_exitosos} de {len(coves)} coves"
|
|
]
|
|
|
|
logger.info(f"Procesamiento de acuses cove completado - Exitosos: {documentos_exitosos}/{len(coves)}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en coves: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_coves)
|
|
|
|
@celery_app.task(bind=True)
|
|
def acuse_cove_task(self, **kwargs):
|
|
"""Tarea asíncrona para obtener acuse de COVE"""
|
|
async def _execute_acuse_cove():
|
|
operation_name = "ACUSE_COVE_ASYNC"
|
|
service_data = None
|
|
|
|
try:
|
|
request_data = kwargs
|
|
logger.info(f"[TASK] Iniciando consulta de acuse COVE - Pedimento: {request_data.get('pedimento')}")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Validando datos de entrada'})
|
|
await _validate_request_data(request_data)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo servicio de pedimento'})
|
|
service_data = await _get_pedimento_service(
|
|
pedimento_id=request_data['pedimento'],
|
|
service_type=9,
|
|
operation_name=operation_name
|
|
)
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Actualizando estado del servicio'})
|
|
update_success = await _update_service_status(
|
|
service_data['id'], ESTADO_EN_PROCESO, service_data, operation_name
|
|
)
|
|
|
|
if not update_success:
|
|
raise Exception("Error al actualizar estado del servicio a 'En proceso'")
|
|
|
|
contribuyente_id = service_data.get('pedimento', {}).get('contribuyente', '')
|
|
if not contribuyente_id:
|
|
logger.error("No se encontró ID de contribuyente en los datos del servicio")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("ID de contribuyente no encontrado")
|
|
|
|
credentials = await _get_vucem_credentials(contribuyente_id, operation_name)
|
|
|
|
# Obtener COVES
|
|
self.update_state(state='PROGRESS', meta={'status': 'Obteniendo COVES'})
|
|
logger.info("[TASK] Obteniendo COVES...")
|
|
|
|
try:
|
|
coves = await rest_controller.get_coves(service_data['pedimento']['id'])
|
|
|
|
if not coves:
|
|
logger.warning("No se encontraron COVES para el pedimento")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se encontraron COVES para el pedimento")
|
|
|
|
logger.info(f"Se encontraron {len(coves)} COVES")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al obtener COVES: {e}")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
# Procesar acuses de COVES
|
|
self.update_state(state='PROGRESS', meta={'status': 'Procesando acuses de COVES'})
|
|
documentos_procesados = []
|
|
documentos_exitosos = 0
|
|
|
|
logger.info(f"[TASK] Procesando acuses para {len(coves)} COVES...")
|
|
|
|
from utils.peticiones import get_soap_acuseCOVE
|
|
|
|
for idx, cove in enumerate(coves):
|
|
documento_info = {
|
|
"clave": cove.get('clave', 'N/A'),
|
|
"descripcion": cove.get('descripcion', 'N/A'),
|
|
"numero_cove": cove.get('numero_cove', 'N/A'),
|
|
"procesado": False,
|
|
"error": None
|
|
}
|
|
|
|
# Verificar que el documento tenga número de COVE
|
|
if not cove.get('numero_cove'):
|
|
logger.warning(f"COVE {idx + 1} no tiene numero_cove, saltando...")
|
|
documento_info["error"] = "Sin número de COVE"
|
|
documentos_procesados.append(documento_info)
|
|
continue
|
|
|
|
try:
|
|
logger.info(f"Procesando acuse para COVE {idx + 1}: {cove['numero_cove']}")
|
|
|
|
soap_response = await get_soap_acuseCOVE(
|
|
credenciales=credentials,
|
|
response_service=service_data,
|
|
soap_controller=soap_controller,
|
|
cove=cove,
|
|
idx=idx + 1
|
|
)
|
|
|
|
if soap_response:
|
|
documento_info["procesado"] = True
|
|
documento_info["documento"] = soap_response.get('documento', {})
|
|
documentos_exitosos += 1
|
|
logger.info(f"Acuse de COVE {idx + 1} procesado exitosamente")
|
|
# Subir el documento de COVE si la petición fue exitosa
|
|
try:
|
|
upload_result = await _post_coves(
|
|
response_service=service_data,
|
|
identificadores_cove=[cove.get('numero_cove')]
|
|
)
|
|
documento_info["upload_result"] = upload_result
|
|
logger.info(f"Documento COVE {cove.get('numero_cove')} subido exitosamente")
|
|
except Exception as upload_err:
|
|
documento_info["upload_error"] = str(upload_err)
|
|
logger.error(f"Error al subir documento COVE {cove.get('numero_cove')}: {upload_err}")
|
|
else:
|
|
documento_info["error"] = "Error en petición SOAP"
|
|
logger.warning(f"No se pudo procesar el acuse de COVE {idx + 1}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error al procesar acuse de COVE {idx + 1}: {e}")
|
|
documento_info["error"] = str(e)
|
|
# Continuar con los siguientes documentos
|
|
|
|
documentos_procesados.append(documento_info)
|
|
|
|
# Verificar si se procesó al menos un documento
|
|
if documentos_exitosos == 0:
|
|
logger.error("No se pudo procesar ningún acuse de COVE")
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise Exception("No se pudo procesar ningún acuse de COVE")
|
|
|
|
self.update_state(state='PROGRESS', meta={'status': 'Finalizando proceso'})
|
|
await _update_service_status(service_data['id'], ESTADO_FINALIZADO, service_data, operation_name)
|
|
|
|
response_data = await _create_response(
|
|
service_data=service_data,
|
|
additional_data={
|
|
"acuses_cove": documentos_procesados,
|
|
"documentos_procesados": len(documentos_procesados),
|
|
"documentos_exitosos": documentos_exitosos
|
|
},
|
|
success_message=f"Acuses de COVE procesados exitosamente: {documentos_exitosos} de {len(documentos_procesados)} documentos"
|
|
)
|
|
|
|
logger.info(f"[TASK] Consulta de acuse COVE completada exitosamente - Servicio: {service_data['id']}")
|
|
return response_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"[TASK] Error en acuse COVE: {e}")
|
|
if service_data:
|
|
await _update_service_status(service_data['id'], ESTADO_ERROR, service_data, operation_name)
|
|
raise e
|
|
|
|
return run_async_task(_execute_acuse_cove)
|