Files
2026-04-13 14:22:31 +08:00

153 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import timedelta
from typing import Any, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.database import get_db
from app.auth import verify_password, create_access_token, get_password_hash
from app.models import User
from app.config import get_settings
# Router for the authentication endpoints; every route below is registered under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["认证"])
# Application settings, fetched once when this module is imported and reused by the handlers.
settings = get_settings()
# Dependency that pulls the bearer token out of incoming requests; tokenUrl is the login URL
# advertised to OpenAPI clients.
# NOTE(review): the "/api/v1" part presumably matches where this router is mounted — confirm.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
# Pydantic models
class LoginRequest(BaseModel):
    """Credentials payload: a username together with its password.

    NOTE(review): the login route visible in this file reads its input from
    ``OAuth2PasswordRequestForm`` rather than from this model — confirm
    whether this schema is still used elsewhere.
    """

    # Account name the caller wants to sign in as.
    username: str
    # Plain-text password supplied by the caller.
    password: str
class LoginResponse(BaseModel):
    """Shape of a token grant: the token, its type, its lifetime and the user.

    NOTE(review): the login route in this file declares ``response_model=dict``
    and nests these same fields under a ``"data"`` key, so this model does not
    appear to be applied to that response — confirm intended usage.
    """

    # Encoded access token string.
    access_token: str
    # Token scheme name (the login route emits "bearer").
    token_type: str
    # Token lifetime; the login route reports this value in seconds.
    expires_in: int
    # Free-form dictionary describing the authenticated user.
    user: dict
class UserInfo(BaseModel):
    """Profile of a user account as exposed through the API.

    The fields mirror the keys returned under ``"data"`` by the ``/me`` route
    in this file. ``phone``, ``email`` and ``last_login_at`` are optional:
    they default to ``None`` and also accept an explicit ``None``. They were
    previously annotated as plain ``str`` with a ``None`` default, which
    rejects an explicit ``None`` under Pydantic v2 and misreports the fields
    as non-nullable in the generated schema.
    """

    # Primary key of the user.
    id: int
    # Login name.
    username: str
    # Display name.
    name: str
    # Role label; the value "admin" is what the admin dependency checks for.
    role: str
    # Contact phone number, if one is recorded.
    phone: Optional[str] = None
    # Contact e-mail address, if one is recorded.
    email: Optional[str] = None
    # Last login timestamp as text (the /me route emits ISO 8601), or None if absent.
    last_login_at: Optional[str] = None
# Dependency functions
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    """Resolve the request's bearer token to the active user it identifies.

    Args:
        token: Bearer token extracted from the request by ``oauth2_scheme``.
        db: Database session provided by ``get_db``.

    Returns:
        The ``User`` row whose id matches the token's ``sub`` claim and whose
        ``status`` equals 1.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, carries no ``sub`` claim, carries a
            ``sub`` that is not an integer, or matches no user with status 1.
    """
    from app.auth import decode_token

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # The login route stores the user id as a string in "sub". A token whose
    # subject is not numeric must be rejected as bad credentials rather than
    # escaping as an unhandled ValueError (which would surface as HTTP 500).
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        raise credentials_exception from None

    # NOTE(review): this query looks synchronous while the dependency is
    # declared async — confirm that blocking the event loop here is acceptable.
    user = db.query(User).filter(User.id == user_id, User.status == 1).first()
    if user is None:
        raise credentials_exception
    return user
async def get_current_admin(current_user: User = Depends(get_current_user)) -> User:
    """Allow the request through only when the current user is an administrator.

    Args:
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same user object, unchanged, when its role is ``"admin"``.

    Raises:
        HTTPException: 403 when the user's role is anything other than
            ``"admin"``.
    """
    if current_user.role == "admin":
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="权限不足,需要管理员权限"
    )
# Routes
@router.post("/login", response_model=dict)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate a user and issue an access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session provided by ``get_db``.

    Returns:
        A ``{"code", "message", "data"}`` envelope whose ``data`` holds the
        access token, the token type, the lifetime in seconds and a short
        summary of the user (id, username, name, role).

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify, and 401 when the account's ``status`` is not 1.
    """
    from sqlalchemy import func

    # Look the account up by username, then check the submitted password.
    account = db.query(User).filter(User.username == form_data.username).first()
    if not (account and verify_password(form_data.password, account.password_hash)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Accounts whose status is not 1 are refused even with a valid password.
    if account.status != 1:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="账号已被禁用"
        )

    # Record the login time using the database's own clock.
    account.last_login_at = func.now()
    db.commit()

    # Issue the access token; its lifetime comes from the settings, in minutes.
    lifetime_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
    token = create_access_token(
        data={"sub": str(account.id), "role": account.role},
        expires_delta=timedelta(minutes=lifetime_minutes),
    )

    user_summary = {
        "id": account.id,
        "username": account.username,
        "name": account.name,
        "role": account.role,
    }
    grant = {
        "access_token": token,
        "token_type": "bearer",
        "expires_in": lifetime_minutes * 60,
        "user": user_summary,
    }
    return {"code": 200, "message": "登录成功", "data": grant}
@router.post("/logout")
def logout(current_user: User = Depends(get_current_user)):
    """Acknowledge a logout request from an authenticated user.

    This handler performs no server-side work on the token; the front end is
    expected to discard its token itself.

    Args:
        current_user: The authenticated user; required only so that
            unauthenticated callers are rejected by the dependency.

    Returns:
        A success envelope whose ``data`` is ``None``.
    """
    acknowledgement = {
        "code": 200,
        "message": "退出成功",
        "data": None,
    }
    return acknowledgement
@router.get("/me", response_model=dict)
def get_me(current_user: User = Depends(get_current_user)):
    """Return the profile of the currently authenticated user.

    Args:
        current_user: The authenticated user resolved by ``get_current_user``.

    Returns:
        A ``{"code", "message", "data"}`` envelope whose ``data`` carries the
        user's id, username, name, role, phone, email and last login time.
        The last login time is rendered with ``isoformat()``, or is ``None``
        when no value is set.
    """
    # Render the timestamp only when one is present.
    last_login = current_user.last_login_at
    if last_login:
        last_login_text = last_login.isoformat()
    else:
        last_login_text = None

    profile = {
        "id": current_user.id,
        "username": current_user.username,
        "name": current_user.name,
        "role": current_user.role,
        "phone": current_user.phone,
        "email": current_user.email,
        "last_login_at": last_login_text,
    }
    return {"code": 200, "message": "success", "data": profile}