"""Authentication router: login, logout, and current-user dependencies."""
from datetime import timedelta
from typing import Any, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.auth import verify_password, create_access_token, get_password_hash
from app.models import User
from app.config import get_settings

router = APIRouter(prefix="/auth", tags=["认证"])
settings = get_settings()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


# Pydantic models
class LoginRequest(BaseModel):
    """Username/password pair.

    Not referenced by the routes visible in this module; ``login`` reads its
    credentials from ``OAuth2PasswordRequestForm`` instead.
    """

    username: str
    password: str


class LoginResponse(BaseModel):
    """Token payload; fields match the keys of ``data`` returned by ``login``."""

    access_token: str
    token_type: str
    expires_in: int
    user: dict


class UserInfo(BaseModel):
    """User profile; fields match the keys of ``data`` returned by ``get_me``."""

    id: int
    username: str
    name: str
    role: str
    # These default to None, so the annotation must admit None as well;
    # a plain ``str`` annotation rejects an explicit None under pydantic v2.
    phone: Optional[str] = None
    email: Optional[str] = None
    last_login_at: Optional[str] = None


# Dependencies
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the bearer token to a ``User`` row whose ``status`` is 1.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The matching ``User``.

    Raises:
        HTTPException: 401 when ``decode_token`` returns ``None``, when the
            payload has no ``sub`` claim or the claim is not an integer, or
            when no user with that id and ``status == 1`` exists.
    """
    from app.auth import decode_token

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    try:
        user_pk = int(subject)
    except (TypeError, ValueError):
        # A malformed subject is an invalid credential, not a server error:
        # answer 401 rather than letting the conversion error surface as 500.
        raise credentials_exception from None

    # NOTE(review): this coroutine runs a synchronous Session query; confirm
    # that blocking here is acceptable for the deployment.
    user = db.query(User).filter(User.id == user_pk, User.status == 1).first()
    if user is None:
        raise credentials_exception
    return user


async def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
    """Require that the authenticated user has the ``admin`` role.

    Returns:
        ``current_user`` unchanged when its ``role`` equals ``"admin"``.

    Raises:
        HTTPException: 403 when the role is anything else.
    """
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足,需要管理员权限",
        )
    return current_user


# Routes
@router.post("/login", response_model=dict)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Authenticate a user and issue an access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session supplied by ``get_db``.

    Returns:
        A ``{"code", "message", "data"}`` envelope whose ``data`` holds the
        access token, its type, its lifetime in seconds, and a user summary.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify, and 401 when the account's ``status`` is not 1.
    """
    # Look up the user by username.
    user = db.query(User).filter(User.username == form_data.username).first()
    if not user or not verify_password(form_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if user.status != 1:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="账号已被禁用",
        )

    # Record the last login time using a SQL now() expression.
    from sqlalchemy import func

    user.last_login_at = func.now()
    db.commit()

    # Create the access token; the subject is the user id as a string.
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id), "role": user.role},
        expires_delta=access_token_expires,
    )

    return {
        "code": 200,
        "message": "登录成功",
        "data": {
            "access_token": access_token,
            "token_type": "bearer",
            # Reported in seconds; the setting is expressed in minutes.
            "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            "user": {
                "id": user.id,
                "username": user.username,
                "name": user.name,
                "role": user.role,
            },
        },
    }


@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
    """Log the user out (the front end only needs to discard its token).

    No server-side state is changed; the route only requires a valid token
    and returns a fixed success envelope.
    """
    return {
        "code": 200,
        "message": "退出成功",
        "data": None,
    }


@router.get("/me", response_model=dict)
def get_me(current_user: User = Depends(get_current_user)):
    """Return the profile of the currently authenticated user.

    Returns:
        A ``{"code", "message", "data"}`` envelope. ``last_login_at`` is an
        ISO-8601 string, or ``None`` when the user has no recorded login.
    """
    last_login = current_user.last_login_at
    return {
        "code": 200,
        "message": "success",
        "data": {
            "id": current_user.id,
            "username": current_user.username,
            "name": current_user.name,
            "role": current_user.role,
            "phone": current_user.phone,
            "email": current_user.email,
            "last_login_at": last_login.isoformat() if last_login else None,
        },
    }