# Source listing metadata: 560 lines, 20 KiB, Python
from typing import Optional, List, Dict, Any
|
||
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
||
from sqlalchemy.orm import Session
|
||
from pydantic import BaseModel, Field
|
||
from datetime import datetime
|
||
from decimal import Decimal
|
||
import json
|
||
|
||
from app.database import get_db
|
||
from app.models import User, ProductCategory, OperationLog
|
||
from app.routers.auth import get_current_user, get_current_admin
|
||
|
||
# Router for the category endpoints defined in this module; every route is
# mounted under the "/categories" prefix and grouped under one OpenAPI tag.
router = APIRouter(prefix="/categories", tags=["产品分类管理"])
|
||
|
||
|
||
# Pydantic模型
|
||
class CategoryBase(BaseModel):
    """Fields shared by the category request schemas.

    The rate defaults are built from strings so that they are exact
    ``Decimal`` values. A float literal used as a default is stored as-is
    (pydantic does not validate default values unless told to), which would
    leave a binary ``float`` in a field annotated as ``Decimal``.
    """

    # Display name of the category (required).
    name: str
    # Optional business code; the create endpoint rejects duplicates.
    code: Optional[str] = None
    # Id of the parent category; None means no parent.
    parent_id: Optional[int] = None
    # Commission rate as a fraction (default 0.03).
    commission_rate: Decimal = Field(default=Decimal("0.03"))
    # Monthly rebate as a fraction (default 0.10).
    monthly_rebate: Decimal = Field(default=Decimal("0.10"))
    # Quarterly rebate; None when not configured.
    quarterly_rebate: Optional[Decimal] = None
    # Integer flag, default 0 (presumably 1 marks a main product - confirm).
    is_main_product: int = Field(default=0)
    # Ordering key used when listing categories.
    sort_order: int = Field(default=0)
|
||
|
||
|
||
class CategoryCreate(CategoryBase):
    """Request body for creating a category; adds nothing to CategoryBase."""
|
||
|
||
|
||
class CategoryUpdate(BaseModel):
    """Request body for a partial category update.

    Every field is optional and defaults to None. The update endpoint treats
    None as "leave unchanged", so a field cannot be reset to NULL through
    this schema.
    """

    # --- identity / hierarchy ---
    name: Optional[str] = None
    code: Optional[str] = None
    parent_id: Optional[int] = None

    # --- rates ---
    commission_rate: Optional[Decimal] = None
    monthly_rebate: Optional[Decimal] = None
    quarterly_rebate: Optional[Decimal] = None

    # --- flags / ordering ---
    is_main_product: Optional[int] = None
    sort_order: Optional[int] = None
    status: Optional[int] = None
|
||
|
||
|
||
class CategoryImportItem(BaseModel):
    """One row of a category import payload.

    Rows are matched against existing categories by ``code``, and the parent
    is referenced by ``parent_code`` rather than by database id. The rate
    defaults are exact ``Decimal`` values instead of float literals, because
    pydantic stores unvalidated defaults as-is.
    """

    # Display name of the category (required).
    name: str
    # Business code used as the import key (required).
    code: str
    # Code of the parent category; None means no parent link is set.
    parent_code: Optional[str] = None
    # Commission rate as a fraction (default 0.03).
    commission_rate: Decimal = Field(default=Decimal("0.03"))
    # Monthly rebate as a fraction (default 0.10).
    monthly_rebate: Decimal = Field(default=Decimal("0.10"))
    # Quarterly rebate; None when not configured.
    quarterly_rebate: Optional[Decimal] = None
    # Integer flag, default 0 (presumably 1 marks a main product - confirm).
    is_main_product: int = Field(default=0)
|
||
|
||
|
||
class CategoryImportData(BaseModel):
    """Request body for the import preview endpoint."""

    # Rows to import, in the order supplied by the client.
    items: List[CategoryImportItem]
    # Name of the file the rows came from; copied onto each imported category.
    source_file: str
|
||
|
||
|
||
# Import preview cache, keyed by batch id (a real production deployment
# should use Redis or similar instead). Entries are added by the preview
# endpoint and removed only when an import is confirmed, so previews that are
# never confirmed stay in this dict.
_import_preview_cache: Dict[str, Any] = {}
|
||
|
||
|
||
def log_operation(db: Session, user_id: int, action: str, target_type: str, target_id: Optional[int] = None,
                  old_value: Optional[str] = None, new_value: Optional[str] = None):
    """Persist one operation-log row and commit it.

    Args:
        db: Session the log row is added to; it is committed before returning.
        user_id: Id of the user who performed the action.
        action: Action identifier stored on the row.
        target_type: Kind of object the action was applied to.
        target_id: Id of the affected object, if there is a single one.
        old_value: Text describing the state before the action.
        new_value: Text describing the state after the action.
    """
    fields = {
        "user_id": user_id,
        "action": action,
        "target_type": target_type,
        "target_id": target_id,
        "old_value": old_value,
        "new_value": new_value,
    }
    entry = OperationLog(**fields)
    db.add(entry)
    # The commit also flushes anything else still pending on this session.
    db.commit()
|
||
|
||
|
||
def build_category_tree(categories: List[ProductCategory], parent_id: Optional[int] = None) -> List[Dict]:
    """Build a nested list of category dicts from a flat list of rows.

    Args:
        categories: Flat collection of category rows.
        parent_id: Id whose direct children form the top level of the result.
            None selects the rows that have no parent.

    Returns:
        A list of node dicts ordered by ``sort_order`` (rows with equal keys
        keep their input order). Each node carries its descendants under
        ``"children"``. Rows that cannot be reached from ``parent_id`` are
        not included.
    """
    # Group the rows by parent once so that each level is a dict lookup
    # instead of another scan over the whole list.
    children_of: Dict[Optional[int], List[ProductCategory]] = {}
    for cat in categories:
        children_of.setdefault(cat.parent_id, []).append(cat)

    def _iso(value):
        """Format a datetime as ISO-8601, passing missing values through."""
        return value.isoformat() if value else None

    def _subtree(pid: Optional[int], ancestors: frozenset) -> List[Dict]:
        nodes = []
        for cat in children_of.get(pid, ()):
            # A row that is already on the current path means the parent
            # links form a cycle; skip it rather than recurse forever.
            if cat.id in ancestors:
                continue
            nodes.append({
                "id": cat.id,
                "name": cat.name,
                "code": cat.code,
                "parent_id": cat.parent_id,
                "commission_rate": str(cat.commission_rate),
                "monthly_rebate": str(cat.monthly_rebate),
                # Compare with None explicitly: a rebate of 0 is a real value.
                "quarterly_rebate": str(cat.quarterly_rebate) if cat.quarterly_rebate is not None else None,
                "is_main_product": cat.is_main_product,
                "sort_order": cat.sort_order,
                "status": cat.status,
                "source_file": cat.source_file,
                "import_batch": cat.import_batch,
                "last_import_at": _iso(cat.last_import_at),
                "created_at": _iso(cat.created_at),
                "updated_at": _iso(cat.updated_at),
                "children": _subtree(cat.id, ancestors | {cat.id}),
            })
        # A missing sort_order sorts as 0 so that None never meets an int in
        # a comparison.
        return sorted(nodes, key=lambda node: node["sort_order"] or 0)

    start = frozenset() if parent_id is None else frozenset({parent_id})
    return _subtree(parent_id, start)
|
||
|
||
|
||
@router.get("", response_model=dict)
def get_categories(
    tree: bool = Query(True, description="是否返回树形结构"),
    status: Optional[int] = None,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List product categories, nested by default.

    Args:
        tree: When true, return the categories nested under their parents;
            otherwise return a flat list.
        status: Optional status filter. This query-parameter name shadows the
            imported ``status`` module inside this function only.
        db: Database session.
        current_user: Authenticated user (any role).

    Returns:
        The standard envelope with the category list under ``data``.
    """
    query = db.query(ProductCategory)

    # Status filter
    if status is not None:
        query = query.filter(ProductCategory.status == status)

    categories = query.order_by(ProductCategory.sort_order, ProductCategory.id).all()

    if tree:
        # NOTE: the tree is built from the filtered rows, so a category whose
        # parent was filtered out is not reachable and does not appear.
        data = build_category_tree(categories)
    else:
        data = [
            {
                "id": cat.id,
                "name": cat.name,
                "code": cat.code,
                "parent_id": cat.parent_id,
                "commission_rate": str(cat.commission_rate),
                "monthly_rebate": str(cat.monthly_rebate),
                # Compare with None explicitly: a rebate of 0 is a real value.
                "quarterly_rebate": str(cat.quarterly_rebate) if cat.quarterly_rebate is not None else None,
                "is_main_product": cat.is_main_product,
                "sort_order": cat.sort_order,
                "status": cat.status,
                "source_file": cat.source_file,
                "import_batch": cat.import_batch,
                "last_import_at": cat.last_import_at.isoformat() if cat.last_import_at else None,
                "created_at": cat.created_at.isoformat() if cat.created_at else None,
                "updated_at": cat.updated_at.isoformat() if cat.updated_at else None,
            }
            for cat in categories
        ]

    return {
        "code": 200,
        "message": "success",
        "data": data
    }
|
||
|
||
|
||
@router.get("/{category_id}", response_model=dict)
def get_category(
    category_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return one category with its parent name and direct children.

    Args:
        category_id: Id of the category to fetch.
        db: Database session.
        current_user: Authenticated user (any role).

    Returns:
        The standard envelope with the category details under ``data``.

    Raises:
        HTTPException: 404 when no category has this id.
    """
    category = db.query(ProductCategory).filter(ProductCategory.id == category_id).first()

    if not category:
        raise HTTPException(status_code=404, detail="分类不存在")

    # Parent name; stays None when there is no parent or the parent row is
    # missing.
    parent_name = None
    if category.parent_id:
        parent = db.query(ProductCategory).filter(ProductCategory.id == category.parent_id).first()
        if parent:
            parent_name = parent.name

    # Direct children only, not the whole subtree.
    children = db.query(ProductCategory).filter(ProductCategory.parent_id == category_id).all()
    children_data = [{"id": c.id, "name": c.name, "code": c.code} for c in children]

    return {
        "code": 200,
        "message": "success",
        "data": {
            "id": category.id,
            "name": category.name,
            "code": category.code,
            "parent_id": category.parent_id,
            "parent_name": parent_name,
            "commission_rate": str(category.commission_rate),
            "monthly_rebate": str(category.monthly_rebate),
            # Compare with None explicitly: a rebate of 0 is a real value.
            "quarterly_rebate": str(category.quarterly_rebate) if category.quarterly_rebate is not None else None,
            "is_main_product": category.is_main_product,
            "sort_order": category.sort_order,
            "status": category.status,
            "source_file": category.source_file,
            "import_batch": category.import_batch,
            "last_import_at": category.last_import_at.isoformat() if category.last_import_at else None,
            "children": children_data,
            "created_at": category.created_at.isoformat() if category.created_at else None,
            "updated_at": category.updated_at.isoformat() if category.updated_at else None
        }
    }
|
||
|
||
|
||
@router.post("", response_model=dict)
def create_category(
    data: CategoryCreate,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin)
):
    """Create a category (admin only).

    Args:
        data: Fields of the new category.
        db: Database session.
        current_admin: Authenticated administrator; recorded in the log.

    Returns:
        The standard envelope with the created category under ``data``.

    Raises:
        HTTPException: 400 when the code is already in use or the parent
            category does not exist.
    """
    # Reject a code that is already in use
    if data.code:
        existing = db.query(ProductCategory).filter(ProductCategory.code == data.code).first()
        if existing:
            raise HTTPException(status_code=400, detail="分类编码已存在")

    # Make sure the parent exists
    if data.parent_id:
        parent = db.query(ProductCategory).filter(ProductCategory.id == data.parent_id).first()
        if not parent:
            raise HTTPException(status_code=400, detail="父级分类不存在")

    category = ProductCategory(
        name=data.name,
        code=data.code,
        parent_id=data.parent_id,
        commission_rate=data.commission_rate,
        monthly_rebate=data.monthly_rebate,
        quarterly_rebate=data.quarterly_rebate,
        is_main_product=data.is_main_product,
        sort_order=data.sort_order,
        status=1
    )
    db.add(category)
    db.commit()
    # Reload the row so that database-generated values are available below.
    db.refresh(category)

    # Operation log (written in its own commit, after the category is saved)
    log_operation(db, current_admin.id, "CREATE_CATEGORY", "category", category.id,
                  new_value=f"创建分类: {data.name}, code={data.code}")

    return {
        "code": 200,
        "message": "分类创建成功",
        "data": {
            "id": category.id,
            "name": category.name,
            "code": category.code,
            "parent_id": category.parent_id,
            "commission_rate": str(category.commission_rate),
            "monthly_rebate": str(category.monthly_rebate),
            # Compare with None explicitly: a rebate of 0 is a real value.
            "quarterly_rebate": str(category.quarterly_rebate) if category.quarterly_rebate is not None else None,
            "is_main_product": category.is_main_product,
            "sort_order": category.sort_order,
            "status": category.status,
            "created_at": category.created_at.isoformat() if category.created_at else None
        }
    }
|
||
|
||
|
||
@router.put("/{category_id}", response_model=dict)
def update_category(
    category_id: int,
    data: CategoryUpdate,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin)
):
    """Partially update a category (admin only).

    Fields left as None in ``data`` are not modified.

    Args:
        category_id: Id of the category to update.
        data: New values; None means "keep the current value".
        db: Database session.
        current_admin: Authenticated administrator; recorded in the log.

    Returns:
        The standard envelope with the updated category under ``data``.

    Raises:
        HTTPException: 404 when the category does not exist; 400 when the new
            parent is the category itself, does not exist, or lies inside the
            category's own subtree, or when the new code is already in use.
    """
    category = db.query(ProductCategory).filter(ProductCategory.id == category_id).first()

    if not category:
        raise HTTPException(status_code=404, detail="分类不存在")

    # When the parent changes, it must exist and must not create a cycle
    if data.parent_id is not None and data.parent_id != category.parent_id:
        if data.parent_id == category_id:
            raise HTTPException(status_code=400, detail="不能将自己设为父级")
        parent = db.query(ProductCategory).filter(ProductCategory.id == data.parent_id).first()
        if not parent:
            raise HTTPException(status_code=400, detail="父级分类不存在")

        # The new parent lies inside this category's subtree exactly when
        # this category is one of the new parent's ancestors. Walking up the
        # parent chain costs one query per level instead of one per
        # descendant, and the visited set stops the walk if the stored data
        # already contains a loop.
        visited = set()
        ancestor = parent
        while ancestor is not None and ancestor.id not in visited:
            if ancestor.id == category_id:
                raise HTTPException(status_code=400, detail="不能将分类设为其子分类的子级")
            visited.add(ancestor.id)
            if ancestor.parent_id is None:
                break
            ancestor = db.query(ProductCategory).filter(ProductCategory.id == ancestor.parent_id).first()

    # Reject a code that is already in use
    if data.code and data.code != category.code:
        existing = db.query(ProductCategory).filter(ProductCategory.code == data.code).first()
        if existing:
            raise HTTPException(status_code=400, detail="分类编码已存在")

    # Snapshot for the operation log
    old_value = f"name={category.name}, code={category.code}, commission_rate={category.commission_rate}"

    # Apply only the fields that were supplied
    if data.name is not None:
        category.name = data.name
    if data.code is not None:
        category.code = data.code
    if data.parent_id is not None:
        category.parent_id = data.parent_id
    if data.commission_rate is not None:
        category.commission_rate = data.commission_rate
    if data.monthly_rebate is not None:
        category.monthly_rebate = data.monthly_rebate
    if data.quarterly_rebate is not None:
        category.quarterly_rebate = data.quarterly_rebate
    if data.is_main_product is not None:
        category.is_main_product = data.is_main_product
    if data.sort_order is not None:
        category.sort_order = data.sort_order
    if data.status is not None:
        category.status = data.status

    db.commit()
    db.refresh(category)

    # Operation log
    new_value = f"name={category.name}, code={category.code}, commission_rate={category.commission_rate}"
    log_operation(db, current_admin.id, "UPDATE_CATEGORY", "category", category_id,
                  old_value=old_value, new_value=new_value)

    return {
        "code": 200,
        "message": "分类更新成功",
        "data": {
            "id": category.id,
            "name": category.name,
            "code": category.code,
            "parent_id": category.parent_id,
            "commission_rate": str(category.commission_rate),
            "monthly_rebate": str(category.monthly_rebate),
            # Compare with None explicitly: a rebate of 0 is a real value.
            "quarterly_rebate": str(category.quarterly_rebate) if category.quarterly_rebate is not None else None,
            "is_main_product": category.is_main_product,
            "sort_order": category.sort_order,
            "status": category.status,
            "updated_at": category.updated_at.isoformat() if category.updated_at else None
        }
    }
|
||
|
||
|
||
@router.delete("/{category_id}", response_model=dict)
def delete_category(
    category_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin)
):
    """Delete a category that has no children and no performance records.

    Args:
        category_id: Id of the category to delete.
        db: Database session.
        current_admin: Authenticated administrator; recorded in the log.

    Returns:
        The standard envelope with ``data`` set to None.

    Raises:
        HTTPException: 404 when the category does not exist; 400 when it
            still has child categories or linked performance records.
    """
    target = (
        db.query(ProductCategory)
        .filter(ProductCategory.id == category_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="分类不存在")

    # Refuse while any child category still points at this one.
    first_child = (
        db.query(ProductCategory)
        .filter(ProductCategory.parent_id == category_id)
        .first()
    )
    if first_child:
        raise HTTPException(status_code=400, detail="该分类下有子分类,请先删除子分类")

    # Refuse while any performance record references this category.
    from app.models import PerformanceRecord
    first_record = (
        db.query(PerformanceRecord)
        .filter(PerformanceRecord.category_id == category_id)
        .first()
    )
    if first_record:
        raise HTTPException(status_code=400, detail="该分类已关联业绩记录,无法删除")

    # Keep the name for the log entry; the row is gone after the commit.
    deleted_name = target.name

    db.delete(target)
    db.commit()

    log_operation(
        db,
        current_admin.id,
        "DELETE_CATEGORY",
        "category",
        category_id,
        old_value=f"删除分类: {deleted_name}",
    )

    return {"code": 200, "message": "分类删除成功", "data": None}
|
||
|
||
|
||
@router.post("/import", response_model=dict)
def preview_import_categories(
    data: CategoryImportData,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin)
):
    """Validate a Volcano Engine list import and return a preview.

    Nothing is written to the database. The payload and its validation result
    are stored in the in-process preview cache under a new batch id, which
    the confirm endpoint uses to apply the import.

    Args:
        data: Rows to import and the name of the source file.
        db: Database session, used only to read the existing categories.
        current_admin: Authenticated administrator.

    Returns:
        The standard envelope whose ``data`` holds the batch id, the
        create/update/error counts, the per-row preview and the error list.
    """
    import uuid

    batch_id = str(uuid.uuid4())[:8]
    preview_items = []
    errors = []

    # Existing categories by code (rows without a code cannot be matched)
    existing_codes = {c.code: c for c in db.query(ProductCategory).all() if c.code}

    # Codes accepted so far in this payload -> row index, used for duplicate
    # detection and for resolving parents defined in the same payload
    new_code_to_index = {}

    for idx, item in enumerate(data.items):
        if not item.code:
            errors.append({"index": idx, "message": "分类编码不能为空"})
            continue

        if not item.name:
            errors.append({"index": idx, "message": "分类名称不能为空"})
            continue

        # Duplicate code inside the payload
        if item.code in new_code_to_index:
            errors.append({"index": idx, "message": f"编码 '{item.code}' 在导入数据中重复"})
            continue

        new_code_to_index[item.code] = idx

        # Existing code -> update, otherwise create
        action = "update" if item.code in existing_codes else "create"
        existing = existing_codes.get(item.code)

        preview_items.append({
            "index": idx,
            "code": item.code,
            "name": item.name,
            "parent_code": item.parent_code,
            "commission_rate": str(item.commission_rate),
            "monthly_rebate": str(item.monthly_rebate),
            # Compare with None explicitly: a rebate of 0 is a real value.
            "quarterly_rebate": str(item.quarterly_rebate) if item.quarterly_rebate is not None else None,
            "is_main_product": item.is_main_product,
            "action": action,
            "existing_id": existing.id if existing else None,
            "existing_name": existing.name if existing else None
        })

    # Every parent code must resolve to an existing or an imported category
    for entry in preview_items:
        parent_code = entry["parent_code"]
        if not parent_code:
            continue
        if parent_code == entry["code"]:
            # A row naming itself as parent would be stored as its own
            # parent and could then never be reached from the tree root.
            errors.append({"index": entry["index"], "message": f"父级编码 '{parent_code}' 不能与自身编码相同"})
        elif parent_code not in existing_codes and parent_code not in new_code_to_index:
            errors.append({"index": entry["index"], "message": f"父级编码 '{parent_code}' 不存在"})

    # Store the preview for the confirm step
    preview_data = {
        "batch_id": batch_id,
        "source_file": data.source_file,
        "items": data.items,
        "preview": preview_items,
        "errors": errors,
        "created_at": datetime.now().isoformat()
    }
    _import_preview_cache[batch_id] = preview_data

    return {
        "code": 200,
        "message": "预览生成成功",
        "data": {
            "batch_id": batch_id,
            "total": len(data.items),
            "create_count": sum(1 for p in preview_items if p["action"] == "create"),
            "update_count": sum(1 for p in preview_items if p["action"] == "update"),
            "error_count": len(errors),
            "preview": preview_items,
            "errors": errors
        }
    }
|
||
|
||
|
||
@router.post("/import/confirm", response_model=dict)
def confirm_import_categories(
    batch_id: str,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin)
):
    """Apply a previously previewed import batch.

    Existing categories are matched by code and updated; the rest are
    created. Parent links are resolved in a second pass, once every row has
    an id. Rows without a ``parent_code`` keep whatever parent they had.

    Args:
        batch_id: Batch id returned by the preview endpoint.
        db: Database session.
        current_admin: Authenticated administrator; recorded in the log.

    Returns:
        The standard envelope with the created/updated/total counts.

    Raises:
        HTTPException: 404 when the batch is not in the preview cache; 400
            when the preview recorded validation errors.
    """
    # One lookup instead of "in" followed by indexing, so an entry removed by
    # a concurrent confirm yields the 404 rather than a KeyError.
    preview_data = _import_preview_cache.get(batch_id)
    if preview_data is None:
        raise HTTPException(status_code=404, detail="导入批次不存在或已过期")

    if preview_data["errors"]:
        raise HTTPException(status_code=400, detail="导入数据存在错误,无法确认导入")

    items = preview_data["items"]
    source_file = preview_data["source_file"]

    # Existing categories by code, re-read now because the table may have
    # changed since the preview was generated
    existing_codes = {c.code: c for c in db.query(ProductCategory).all() if c.code}

    # One timestamp for the whole batch so every row of an import carries the
    # same last_import_at
    imported_at = datetime.now()

    created_items = {}
    updated_count = 0
    created_count = 0

    try:
        # Step 1: create/update every category (parent links come later)
        for item in items:
            if item.code in existing_codes:
                category = existing_codes[item.code]
                category.name = item.name
                category.commission_rate = item.commission_rate
                category.monthly_rebate = item.monthly_rebate
                category.quarterly_rebate = item.quarterly_rebate
                category.is_main_product = item.is_main_product
                category.source_file = source_file
                category.import_batch = batch_id
                category.last_import_at = imported_at
                updated_count += 1
            else:
                category = ProductCategory(
                    name=item.name,
                    code=item.code,
                    commission_rate=item.commission_rate,
                    monthly_rebate=item.monthly_rebate,
                    quarterly_rebate=item.quarterly_rebate,
                    is_main_product=item.is_main_product,
                    source_file=source_file,
                    import_batch=batch_id,
                    last_import_at=imported_at,
                    status=1
                )
                db.add(category)
                db.flush()  # assigns the id needed for parent links
                created_count += 1

            created_items[item.code] = category

        # Step 2: resolve parent links, preferring rows from this batch and
        # falling back to categories that already existed
        for item in items:
            if not item.parent_code:
                continue
            parent = created_items.get(item.parent_code)
            if parent is None:
                parent = existing_codes.get(item.parent_code)
            if parent is not None:
                created_items[item.code].parent_id = parent.id

        db.commit()
    except Exception:
        # Leave the session clean after a failed flush/commit, then let the
        # original error propagate.
        db.rollback()
        raise

    # Operation log
    log_operation(db, current_admin.id, "IMPORT_CATEGORIES", "category", None,
                  new_value=f"导入分类: 创建{created_count}个, 更新{updated_count}个, 来源:{source_file}")

    # Drop the cached preview; pop() tolerates an entry that is already gone
    _import_preview_cache.pop(batch_id, None)

    return {
        "code": 200,
        "message": "导入成功",
        "data": {
            "batch_id": batch_id,
            "created": created_count,
            "updated": updated_count,
            "total": len(items)
        }
    }
|