Files
endpoin/auth.py
2025-12-23 16:54:07 +00:00

233 lines
7.2 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from typing import List
from app.database import get_db
from app.models import Usuario
from app.schemas import (
UsuarioCreate, UsuarioUpdate, UsuarioResponse,
Token, LoginRequest, MessageResponse
)
from app.auth import verify_password, get_password_hash, create_access_token
from app.dependencies import get_current_user, get_current_active_superuser
from app.config import settings
router = APIRouter()
@router.post("/register", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED)
def register(usuario: UsuarioCreate, db: Session = Depends(get_db)):
"""
Registrar un nuevo usuario (público - primer usuario será superuser)
"""
# Verificar si el username ya existe
if db.query(Usuario).filter(Usuario.username == usuario.username).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="El username ya está registrado"
)
# Verificar si el email ya existe
if db.query(Usuario).filter(Usuario.email == usuario.email).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="El email ya está registrado"
)
# Si es el primer usuario, hacerlo superuser
total_usuarios = db.query(Usuario).count()
is_superuser = (total_usuarios == 0)
# Crear usuario
db_usuario = Usuario(
username=usuario.username,
email=usuario.email,
nombre_completo=usuario.nombre_completo,
hashed_password=get_password_hash(usuario.password),
is_superuser=is_superuser
)
db.add(db_usuario)
db.commit()
db.refresh(db_usuario)
return db_usuario
@router.post("/login", response_model=Token)
def login(login_data: LoginRequest, db: Session = Depends(get_db)):
"""
Iniciar sesión y obtener token JWT
"""
user = db.query(Usuario).filter(Usuario.username == login_data.username).first()
if not user or not verify_password(login_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuario o contraseña incorrectos",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Usuario inactivo"
)
# Actualizar último login
user.last_login = datetime.utcnow()
db.commit()
# Crear token
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=UsuarioResponse)
def read_users_me(current_user: Usuario = Depends(get_current_user)):
"""
Obtener información del usuario actual
"""
return current_user
@router.put("/me", response_model=UsuarioResponse)
def update_user_me(
usuario_update: UsuarioUpdate,
current_user: Usuario = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Actualizar información del usuario actual
"""
update_data = usuario_update.model_dump(exclude_unset=True)
# Verificar email único si se actualiza
if "email" in update_data and update_data["email"] != current_user.email:
if db.query(Usuario).filter(Usuario.email == update_data["email"]).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="El email ya está registrado"
)
# Si hay password, hashear
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(current_user, field, value)
db.commit()
db.refresh(current_user)
return current_user
@router.get("/", response_model=List[UsuarioResponse])
def list_users(
skip: int = 0,
limit: int = 100,
current_user: Usuario = Depends(get_current_active_superuser),
db: Session = Depends(get_db)
):
"""
Listar todos los usuarios (solo superuser)
"""
usuarios = db.query(Usuario).offset(skip).limit(limit).all()
return usuarios
@router.get("/{usuario_id}", response_model=UsuarioResponse)
def get_user(
usuario_id: int,
current_user: Usuario = Depends(get_current_active_superuser),
db: Session = Depends(get_db)
):
"""
Obtener un usuario por ID (solo superuser)
"""
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
if not usuario:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Usuario no encontrado"
)
return usuario
@router.put("/{usuario_id}", response_model=UsuarioResponse)
def update_user(
usuario_id: int,
usuario_update: UsuarioUpdate,
current_user: Usuario = Depends(get_current_active_superuser),
db: Session = Depends(get_db)
):
"""
Actualizar un usuario (solo superuser)
"""
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
if not usuario:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Usuario no encontrado"
)
update_data = usuario_update.model_dump(exclude_unset=True)
# Verificar email único si se actualiza
if "email" in update_data and update_data["email"] != usuario.email:
if db.query(Usuario).filter(Usuario.email == update_data["email"]).first():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="El email ya está registrado"
)
# Si hay password, hashear
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(usuario, field, value)
db.commit()
db.refresh(usuario)
return usuario
@router.delete("/{usuario_id}", response_model=MessageResponse)
def delete_user(
usuario_id: int,
current_user: Usuario = Depends(get_current_active_superuser),
db: Session = Depends(get_db)
):
"""
Eliminar un usuario (solo superuser)
"""
usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first()
if not usuario:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Usuario no encontrado"
)
# No permitir eliminar el propio usuario
if usuario.id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No puedes eliminar tu propio usuario"
)
db.delete(usuario)
db.commit()
return {"message": f"Usuario {usuario.username} eliminado exitosamente"}