233 lines
7.2 KiB
Python
233 lines
7.2 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from datetime import datetime, timedelta
|
|
from typing import List
|
|
from app.database import get_db
|
|
from app.models import Usuario
|
|
from app.schemas import (
|
|
UsuarioCreate, UsuarioUpdate, UsuarioResponse,
|
|
Token, LoginRequest, MessageResponse
|
|
)
|
|
from app.auth import verify_password, get_password_hash, create_access_token
|
|
from app.dependencies import get_current_user, get_current_active_superuser
|
|
from app.config import settings
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/register", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED)
|
|
def register(usuario: UsuarioCreate, db: Session = Depends(get_db)):
|
|
"""
|
|
Registrar un nuevo usuario (público - primer usuario será superuser)
|
|
"""
|
|
# Verificar si el username ya existe
|
|
if db.query(Usuario).filter(Usuario.username == usuario.username).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="El username ya está registrado"
|
|
)
|
|
|
|
# Verificar si el email ya existe
|
|
if db.query(Usuario).filter(Usuario.email == usuario.email).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="El email ya está registrado"
|
|
)
|
|
|
|
# Si es el primer usuario, hacerlo superuser
|
|
total_usuarios = db.query(Usuario).count()
|
|
is_superuser = (total_usuarios == 0)
|
|
|
|
# Crear usuario
|
|
db_usuario = Usuario(
|
|
username=usuario.username,
|
|
email=usuario.email,
|
|
nombre_completo=usuario.nombre_completo,
|
|
hashed_password=get_password_hash(usuario.password),
|
|
is_superuser=is_superuser
|
|
)
|
|
|
|
db.add(db_usuario)
|
|
db.commit()
|
|
db.refresh(db_usuario)
|
|
|
|
return db_usuario
|
|
|
|
|
|
@router.post("/login", response_model=Token)
|
|
def login(login_data: LoginRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Iniciar sesión y obtener token JWT
|
|
"""
|
|
user = db.query(Usuario).filter(Usuario.username == login_data.username).first()
|
|
|
|
if not user or not verify_password(login_data.password, user.hashed_password):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Usuario o contraseña incorrectos",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Usuario inactivo"
|
|
)
|
|
|
|
# Actualizar último login
|
|
user.last_login = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Crear token
|
|
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
access_token = create_access_token(
|
|
data={"sub": user.username}, expires_delta=access_token_expires
|
|
)
|
|
|
|
return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
@router.get("/me", response_model=UsuarioResponse)
|
|
def read_users_me(current_user: Usuario = Depends(get_current_user)):
|
|
"""
|
|
Obtener información del usuario actual
|
|
"""
|
|
return current_user
|
|
|
|
|
|
@router.put("/me", response_model=UsuarioResponse)
|
|
def update_user_me(
|
|
usuario_update: UsuarioUpdate,
|
|
current_user: Usuario = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Actualizar información del usuario actual
|
|
"""
|
|
update_data = usuario_update.model_dump(exclude_unset=True)
|
|
|
|
# Verificar email único si se actualiza
|
|
if "email" in update_data and update_data["email"] != current_user.email:
|
|
if db.query(Usuario).filter(Usuario.email == update_data["email"]).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="El email ya está registrado"
|
|
)
|
|
|
|
# Si hay password, hashear
|
|
if "password" in update_data:
|
|
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
|
|
|
|
for field, value in update_data.items():
|
|
setattr(current_user, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(current_user)
|
|
|
|
return current_user
|
|
|
|
|
|
@router.get("/", response_model=List[UsuarioResponse])
|
|
def list_users(
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
current_user: Usuario = Depends(get_current_active_superuser),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Listar todos los usuarios (solo superuser)
|
|
"""
|
|
usuarios = db.query(Usuario).offset(skip).limit(limit).all()
|
|
return usuarios
|
|
|
|
|
|
@router.get("/{usuario_id}", response_model=UsuarioResponse)
|
|
def get_user(
|
|
usuario_id: int,
|
|
current_user: Usuario = Depends(get_current_active_superuser),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Obtener un usuario por ID (solo superuser)
|
|
"""
|
|
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
|
|
|
|
if not usuario:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Usuario no encontrado"
|
|
)
|
|
|
|
return usuario
|
|
|
|
|
|
@router.put("/{usuario_id}", response_model=UsuarioResponse)
|
|
def update_user(
|
|
usuario_id: int,
|
|
usuario_update: UsuarioUpdate,
|
|
current_user: Usuario = Depends(get_current_active_superuser),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Actualizar un usuario (solo superuser)
|
|
"""
|
|
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
|
|
|
|
if not usuario:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Usuario no encontrado"
|
|
)
|
|
|
|
update_data = usuario_update.model_dump(exclude_unset=True)
|
|
|
|
# Verificar email único si se actualiza
|
|
if "email" in update_data and update_data["email"] != usuario.email:
|
|
if db.query(Usuario).filter(Usuario.email == update_data["email"]).first():
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="El email ya está registrado"
|
|
)
|
|
|
|
# Si hay password, hashear
|
|
if "password" in update_data:
|
|
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
|
|
|
|
for field, value in update_data.items():
|
|
setattr(usuario, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(usuario)
|
|
|
|
return usuario
|
|
|
|
|
|
@router.delete("/{usuario_id}", response_model=MessageResponse)
|
|
def delete_user(
|
|
usuario_id: int,
|
|
current_user: Usuario = Depends(get_current_active_superuser),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Eliminar un usuario (solo superuser)
|
|
"""
|
|
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
|
|
|
|
if not usuario:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Usuario no encontrado"
|
|
)
|
|
|
|
# No permitir eliminar el propio usuario
|
|
if usuario.id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="No puedes eliminar tu propio usuario"
|
|
)
|
|
|
|
db.delete(usuario)
|
|
db.commit()
|
|
|
|
return {"message": f"Usuario {usuario.username} eliminado exitosamente"}
|