from typing import Optional, List from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from pydantic import BaseModel, Field from datetime import date from decimal import Decimal from app.database import get_db from app.models import User, Employee, OperationLog from app.routers.auth import get_current_user, get_current_admin router = APIRouter(prefix="/employees", tags=["员工管理"]) # Pydantic模型 class EmployeeTarget(BaseModel): monthly_target: Decimal = Field(default=0) quarterly_target: Decimal = Field(default=0) half_year_target: Decimal = Field(default=0) yearly_target: Decimal = Field(default=0) class EmployeeBase(BaseModel): name: str phone: Optional[str] = None email: Optional[str] = None department: Optional[str] = None position: Optional[str] = None base_salary: Decimal = Field(default=4000) hire_date: Optional[date] = None class EmployeeCreate(EmployeeBase): username: str password: str targets: Optional[EmployeeTarget] = None class EmployeeUpdate(BaseModel): name: Optional[str] = None phone: Optional[str] = None email: Optional[str] = None department: Optional[str] = None position: Optional[str] = None base_salary: Optional[Decimal] = None hire_date: Optional[date] = None status: Optional[int] = None class EmployeeResponse(BaseModel): id: int user_id: int username: str name: str phone: Optional[str] email: Optional[str] department: Optional[str] position: Optional[str] base_salary: Decimal monthly_target: Decimal quarterly_target: Decimal half_year_target: Decimal yearly_target: Decimal hire_date: Optional[date] status: int created_at: str updated_at: str class Config: from_attributes = True class EmployeeListResponse(BaseModel): items: List[EmployeeResponse] total: int page: int page_size: int def log_operation(db: Session, user_id: int, action: str, target_type: str, target_id: int, old_value: Optional[str] = None, new_value: Optional[str] = None): """记录操作日志""" log = OperationLog( user_id=user_id, action=action, target_type=target_type, target_id=target_id, old_value=old_value, new_value=new_value ) db.add(log) db.commit() @router.get("", response_model=dict) def get_employees( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=1000), search: Optional[str] = None, department: Optional[str] = None, status: Optional[int] = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取员工列表(支持分页、搜索)""" query = db.query(Employee).join(User) # 搜索条件 if search: query = query.filter( (User.name.contains(search)) | (User.username.contains(search)) | (User.phone.contains(search)) ) # 部门筛选 if department: query = query.filter(Employee.department == department) # 状态筛选 if status is not None: query = query.filter(User.status == status) # 计算总数 total = query.count() # 分页 employees = query.offset((page - 1) * page_size).limit(page_size).all() # 构建响应数据 items = [] for emp in employees: items.append({ "id": emp.id, "user_id": emp.user_id, "username": emp.user.username, "name": emp.user.name, "phone": emp.user.phone, "email": emp.user.email, "department": emp.department, "position": emp.position, "base_salary": str(emp.base_salary), "monthly_target": str(emp.monthly_target), "quarterly_target": str(emp.quarterly_target), "half_year_target": str(emp.half_year_target), "yearly_target": str(emp.yearly_target), "hire_date": emp.hire_date.isoformat() if emp.hire_date else None, "status": emp.user.status, "created_at": emp.created_at.isoformat() if emp.created_at else None, "updated_at": emp.updated_at.isoformat() if emp.updated_at else None }) return { "code": 200, "message": "success", "data": { "items": items, "total": total, "page": page, "page_size": page_size } } @router.get("/{employee_id}", response_model=dict) def get_employee( employee_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """获取员工详情""" employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException(status_code=404, detail="员工不存在") return { "code": 200, "message": "success", "data": { "id": employee.id, "user_id": employee.user_id, "username": employee.user.username, "name": employee.user.name, "phone": employee.user.phone, "email": employee.user.email, "department": employee.department, "position": employee.position, "base_salary": str(employee.base_salary), "monthly_target": str(employee.monthly_target), "quarterly_target": str(employee.quarterly_target), "half_year_target": str(employee.half_year_target), "yearly_target": str(employee.yearly_target), "hire_date": employee.hire_date.isoformat() if employee.hire_date else None, "status": employee.user.status, "created_at": employee.created_at.isoformat() if employee.created_at else None, "updated_at": employee.updated_at.isoformat() if employee.updated_at else None } } @router.post("", response_model=dict) def create_employee( data: EmployeeCreate, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin) ): """创建员工(同时创建用户账号)""" # 检查用户名是否已存在 existing_user = db.query(User).filter(User.username == data.username).first() if existing_user: raise HTTPException(status_code=400, detail="用户名已存在") # 检查手机号是否已存在 if data.phone: existing_phone = db.query(User).filter(User.phone == data.phone).first() if existing_phone: raise HTTPException(status_code=400, detail="手机号已存在") # 创建用户 from app.auth import get_password_hash user = User( username=data.username, password_hash=get_password_hash(data.password), role="employee", name=data.name, phone=data.phone, email=data.email, status=1 ) db.add(user) db.flush() # 获取user.id # 创建员工记录 targets = data.targets or EmployeeTarget() employee = Employee( user_id=user.id, base_salary=data.base_salary, monthly_target=targets.monthly_target, quarterly_target=targets.quarterly_target, half_year_target=targets.half_year_target, yearly_target=targets.yearly_target, hire_date=data.hire_date, department=data.department, position=data.position ) db.add(employee) db.commit() db.refresh(employee) # 记录操作日志 log_operation(db, current_admin.id, "CREATE_EMPLOYEE", "employee", employee.id, new_value=f"创建员工: {data.name}, 用户名: {data.username}") return { "code": 200, "message": "员工创建成功", "data": { "id": employee.id, "user_id": employee.user_id, "username": user.username, "name": user.name, "phone": user.phone, "email": user.email, "department": employee.department, "position": employee.position, "base_salary": str(employee.base_salary), "monthly_target": str(employee.monthly_target), "quarterly_target": str(employee.quarterly_target), "half_year_target": str(employee.half_year_target), "yearly_target": str(employee.yearly_target), "hire_date": employee.hire_date.isoformat() if employee.hire_date else None, "status": user.status, "created_at": employee.created_at.isoformat() if employee.created_at else None } } @router.put("/{employee_id}", response_model=dict) def update_employee( employee_id: int, data: EmployeeUpdate, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin) ): """更新员工信息""" employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException(status_code=404, detail="员工不存在") user = employee.user # 记录旧值 old_value = f"name={user.name}, phone={user.phone}, department={employee.department}, position={employee.position}" # 更新用户信息 if data.name is not None: user.name = data.name if data.phone is not None: user.phone = data.phone if data.email is not None: user.email = data.email if data.status is not None: user.status = data.status # 更新员工信息 if data.department is not None: employee.department = data.department if data.position is not None: employee.position = data.position if data.base_salary is not None: employee.base_salary = data.base_salary if data.hire_date is not None: employee.hire_date = data.hire_date db.commit() db.refresh(employee) # 记录操作日志 new_value = f"name={user.name}, phone={user.phone}, department={employee.department}, position={employee.position}" log_operation(db, current_admin.id, "UPDATE_EMPLOYEE", "employee", employee.id, old_value=old_value, new_value=new_value) return { "code": 200, "message": "员工信息更新成功", "data": { "id": employee.id, "user_id": employee.user_id, "username": user.username, "name": user.name, "phone": user.phone, "email": user.email, "department": employee.department, "position": employee.position, "base_salary": str(employee.base_salary), "monthly_target": str(employee.monthly_target), "quarterly_target": str(employee.quarterly_target), "half_year_target": str(employee.half_year_target), "yearly_target": str(employee.yearly_target), "hire_date": employee.hire_date.isoformat() if employee.hire_date else None, "status": user.status, "updated_at": employee.updated_at.isoformat() if employee.updated_at else None } } @router.delete("/{employee_id}", response_model=dict) def delete_employee( employee_id: int, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin) ): """删除员工""" employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException(status_code=404, detail="员工不存在") user_id = employee.user_id user_name = employee.user.name # 删除员工(级联删除用户) db.delete(employee) db.commit() # 记录操作日志 log_operation(db, current_admin.id, "DELETE_EMPLOYEE", "employee", employee_id, old_value=f"删除员工: {user_name}, user_id={user_id}") return { "code": 200, "message": "员工删除成功", "data": None } @router.put("/{employee_id}/targets", response_model=dict) def update_employee_targets( employee_id: int, data: EmployeeTarget, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin) ): """更新员工目标""" employee = db.query(Employee).filter(Employee.id == employee_id).first() if not employee: raise HTTPException(status_code=404, detail="员工不存在") # 记录旧值 old_value = f"monthly={employee.monthly_target}, quarterly={employee.quarterly_target}, half_year={employee.half_year_target}, yearly={employee.yearly_target}" # 更新目标 employee.monthly_target = data.monthly_target employee.quarterly_target = data.quarterly_target employee.half_year_target = data.half_year_target employee.yearly_target = data.yearly_target db.commit() db.refresh(employee) # 记录操作日志 new_value = f"monthly={data.monthly_target}, quarterly={data.quarterly_target}, half_year={data.half_year_target}, yearly={data.yearly_target}" log_operation(db, current_admin.id, "UPDATE_EMPLOYEE_TARGETS", "employee", employee_id, old_value=old_value, new_value=new_value) return { "code": 200, "message": "员工目标更新成功", "data": { "id": employee.id, "name": employee.user.name, "monthly_target": str(employee.monthly_target), "quarterly_target": str(employee.quarterly_target), "half_year_target": str(employee.half_year_target), "yearly_target": str(employee.yearly_target), "updated_at": employee.updated_at.isoformat() if employee.updated_at else None } }