from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from datetime import datetime, timedelta from typing import List from app.database import get_db from app.models import Usuario from app.schemas import ( UsuarioCreate, UsuarioUpdate, UsuarioResponse, Token, LoginRequest, MessageResponse ) from app.auth import verify_password, get_password_hash, create_access_token from app.dependencies import get_current_user, get_current_active_superuser from app.config import settings router = APIRouter() @router.post("/register", response_model=UsuarioResponse, status_code=status.HTTP_201_CREATED) def register(usuario: UsuarioCreate, db: Session = Depends(get_db)): """ Registrar un nuevo usuario (público - primer usuario será superuser) """ # Verificar si el username ya existe if db.query(Usuario).filter(Usuario.username == usuario.username).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="El username ya está registrado" ) # Verificar si el email ya existe if db.query(Usuario).filter(Usuario.email == usuario.email).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="El email ya está registrado" ) # Si es el primer usuario, hacerlo superuser total_usuarios = db.query(Usuario).count() is_superuser = (total_usuarios == 0) # Crear usuario db_usuario = Usuario( username=usuario.username, email=usuario.email, nombre_completo=usuario.nombre_completo, hashed_password=get_password_hash(usuario.password), is_superuser=is_superuser ) db.add(db_usuario) db.commit() db.refresh(db_usuario) return db_usuario @router.post("/login", response_model=Token) def login(login_data: LoginRequest, db: Session = Depends(get_db)): """ Iniciar sesión y obtener token JWT """ user = db.query(Usuario).filter(Usuario.username == login_data.username).first() if not user or not verify_password(login_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Usuario o contraseña incorrectos", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Usuario inactivo" ) # Actualizar último login user.last_login = datetime.utcnow() db.commit() # Crear token access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @router.get("/me", response_model=UsuarioResponse) def read_users_me(current_user: Usuario = Depends(get_current_user)): """ Obtener información del usuario actual """ return current_user @router.put("/me", response_model=UsuarioResponse) def update_user_me( usuario_update: UsuarioUpdate, current_user: Usuario = Depends(get_current_user), db: Session = Depends(get_db) ): """ Actualizar información del usuario actual """ update_data = usuario_update.model_dump(exclude_unset=True) # Verificar email único si se actualiza if "email" in update_data and update_data["email"] != current_user.email: if db.query(Usuario).filter(Usuario.email == update_data["email"]).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="El email ya está registrado" ) # Si hay password, hashear if "password" in update_data: update_data["hashed_password"] = get_password_hash(update_data.pop("password")) for field, value in update_data.items(): setattr(current_user, field, value) db.commit() db.refresh(current_user) return current_user @router.get("/", response_model=List[UsuarioResponse]) def list_users( skip: int = 0, limit: int = 100, current_user: Usuario = Depends(get_current_active_superuser), db: Session = Depends(get_db) ): """ Listar todos los usuarios (solo superuser) """ usuarios = db.query(Usuario).offset(skip).limit(limit).all() return usuarios @router.get("/{usuario_id}", response_model=UsuarioResponse) def get_user( usuario_id: int, current_user: Usuario = Depends(get_current_active_superuser), db: Session = Depends(get_db) ): """ Obtener un usuario por ID (solo superuser) """ usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Usuario no encontrado" ) return usuario @router.put("/{usuario_id}", response_model=UsuarioResponse) def update_user( usuario_id: int, usuario_update: UsuarioUpdate, current_user: Usuario = Depends(get_current_active_superuser), db: Session = Depends(get_db) ): """ Actualizar un usuario (solo superuser) """ usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Usuario no encontrado" ) update_data = usuario_update.model_dump(exclude_unset=True) # Verificar email único si se actualiza if "email" in update_data and update_data["email"] != usuario.email: if db.query(Usuario).filter(Usuario.email == update_data["email"]).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="El email ya está registrado" ) # Si hay password, hashear if "password" in update_data: update_data["hashed_password"] = get_password_hash(update_data.pop("password")) for field, value in update_data.items(): setattr(usuario, field, value) db.commit() db.refresh(usuario) return usuario @router.delete("/{usuario_id}", response_model=MessageResponse) def delete_user( usuario_id: int, current_user: Usuario = Depends(get_current_active_superuser), db: Session = Depends(get_db) ): """ Eliminar un usuario (solo superuser) """ usuario = db.query(Usuario).filter(Usuario.id == usuario_id).first() if not usuario: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Usuario no encontrado" ) # No permitir eliminar el propio usuario if usuario.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No puedes eliminar tu propio usuario" ) db.delete(usuario) db.commit() return {"message": f"Usuario {usuario.username} eliminado exitosamente"}