"""Income and profit calculations for employees, secondary agents and the company."""
import json
import logging
from datetime import date, datetime
from decimal import Decimal
from typing import Optional, Dict, List, Any

from sqlalchemy import func, and_
from sqlalchemy.orm import Session

from app.models import (
    Employee,
    SecondaryAgent,
    ProductCategory,
    PerformanceRecord,
    CalculationResult,
    User,
)

logger = logging.getLogger(__name__)

_ZERO = Decimal("0")

# Bonus rule: nothing below the threshold, then pro-rata up to the maximum.
_BONUS_THRESHOLD_PERCENT = 50
_MAX_PERFORMANCE_BONUS = 1000

# Share of secondary-agent sales credited to the owning employee.
_AGENT_COMMISSION_RATE = Decimal("0.01")

# Share of an agent's sales paid to the agent when no rate is configured.
_DEFAULT_AGENT_SHARE_RATE = Decimal("0.60")

# Employee attribute holding the sales target for each period type.
_TARGET_ATTR_BY_PERIOD = {
    "monthly": "monthly_target",
    "quarterly": "quarterly_target",
    "half_yearly": "half_year_target",
    "yearly": "yearly_target",
}


def get_period_dates(
    period: str,
    year: int,
    month: Optional[int] = None,
    quarter: Optional[int] = None,
    half: Optional[int] = None,
) -> tuple:
    """Return the date range covered by a calculation period.

    Args:
        period: One of "monthly", "quarterly", "half_yearly" or "yearly".
        year: Calendar year of the period.
        month: Month number; required when ``period`` is "monthly".
        quarter: Quarter number (1-4); required when ``period`` is
            "quarterly".
        half: Half of the year (1 or 2) for "half_yearly". Defaults to the
            first half, which keeps the result callers got before this
            parameter existed.

    Returns:
        tuple: ``(start_date, end_date, period_key)`` where ``end_date`` is
        exclusive (the first day after the period).

    Raises:
        ValueError: If the period type is unsupported, a required argument
            is missing, or ``month``/``quarter``/``half`` is out of range.
    """
    if period == "monthly":
        if month is None:
            raise ValueError("月度周期需要提供month参数")
        # date() itself rejects a month outside 1..12 with ValueError.
        start_date = date(year, month, 1)
        end_date = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        period_key = f"{year}-{month:02d}"
    elif period == "quarterly":
        if quarter is None:
            raise ValueError("季度周期需要提供quarter参数")
        if quarter not in (1, 2, 3, 4):
            # Without this check the failure surfaces as a confusing
            # "month must be in 1..12" error from date().
            raise ValueError(f"quarter参数必须在1-4之间: {quarter}")
        start_month = (quarter - 1) * 3 + 1
        start_date = date(year, start_month, 1)
        end_date = date(year + 1, 1, 1) if quarter == 4 else date(year, start_month + 3, 1)
        period_key = f"{year}-Q{quarter}"
    elif period == "half_yearly":
        if half is None:
            half = 1
        if half not in (1, 2):
            raise ValueError(f"half参数必须为1或2: {half}")
        if half == 1:
            start_date, end_date = date(year, 1, 1), date(year, 7, 1)
        else:
            start_date, end_date = date(year, 7, 1), date(year + 1, 1, 1)
        period_key = f"{year}-H{half}"
    elif period == "yearly":
        start_date = date(year, 1, 1)
        end_date = date(year + 1, 1, 1)
        period_key = f"{year}"
    else:
        raise ValueError(f"不支持的周期类型: {period}")

    return start_date, end_date, period_key


def get_target_by_period(employee: Employee, period: str) -> Decimal:
    """Return the employee's sales target for the given period type.

    An unset (falsy) target and an unknown period type both yield
    ``Decimal("0")``.
    """
    attr_name = _TARGET_ATTR_BY_PERIOD.get(period)
    if attr_name is None:
        return _ZERO
    return getattr(employee, attr_name) or _ZERO


def get_rebate_rate_by_period(category: ProductCategory, period: str) -> Decimal:
    """Return the company rebate rate of a category for the given period type.

    Monthly periods use ``monthly_rebate``. Quarterly, half-yearly and yearly
    periods all use ``quarterly_rebate`` and fall back to ``monthly_rebate``
    when it is unset or zero. Unknown period types yield ``Decimal("0")``.
    """
    if period == "monthly":
        return category.monthly_rebate or _ZERO
    if period in ("quarterly", "half_yearly", "yearly"):
        return category.quarterly_rebate or category.monthly_rebate or _ZERO
    return _ZERO


def _query_period_records(db: Session, start_date: date, end_date: date, *criteria) -> list:
    """Return performance records dated in ``[start_date, end_date)`` matching ``criteria``."""
    return db.query(PerformanceRecord).filter(
        and_(
            PerformanceRecord.record_date >= start_date,
            PerformanceRecord.record_date < end_date,
            *criteria,
        )
    ).all()


def _load_categories(db: Session, category_ids) -> Dict[Any, ProductCategory]:
    """Fetch the given product categories in one query, keyed by id."""
    ids = [category_id for category_id in category_ids if category_id is not None]
    if not ids:
        return {}
    rows = db.query(ProductCategory).filter(ProductCategory.id.in_(ids)).all()
    return {category.id: category for category in rows}


def _employee_name(employee) -> str:
    """Return the display name of an employee's user, or "" when unavailable."""
    return employee.user.name if employee and employee.user else ""


def _to_float(value) -> Any:
    """Convert a stored numeric value to float, mapping None/zero to 0."""
    return float(value) if value else 0


def _iso_or_none(value) -> Optional[str]:
    """Return ``value.isoformat()`` or None when the value is unset."""
    return value.isoformat() if value else None


def _compute_employee_income(
    db: Session,
    employee: Employee,
    period: str,
    start_date: date,
    end_date: date,
) -> Dict[str, Any]:
    """Compute all income figures of one employee for a date range.

    Every monetary value in the returned dict is a ``Decimal`` so callers can
    aggregate without a lossy round trip through float.
    ``commission_details`` is the JSON-ready per-category breakdown.
    """
    target_amount = get_target_by_period(employee, period)

    # 1. Personal sales, grouped by product category.
    personal_records = _query_period_records(
        db, start_date, end_date,
        PerformanceRecord.employee_id == employee.id,
        PerformanceRecord.record_type == "employee",
    )
    category_performance: Dict[Any, Dict[str, Any]] = {}
    total_personal_performance = _ZERO
    for record in personal_records:
        amount = record.amount or _ZERO
        total_personal_performance += amount
        if record.category_id not in category_performance:
            category_performance[record.category_id] = {
                "amount": _ZERO,
                "category_name": record.category.name if record.category else "未知分类",
            }
        category_performance[record.category_id]["amount"] += amount

    # 2. Sales made by the employee's secondary agents.
    agent_records = _query_period_records(
        db, start_date, end_date,
        PerformanceRecord.employee_id == employee.id,
        PerformanceRecord.record_type == "agent",
    )
    total_agent_performance = sum(
        (record.amount or _ZERO for record in agent_records), _ZERO
    )
    total_performance = total_personal_performance + total_agent_performance

    # 3. Completion rate, expressed as a percentage of the target.
    completion_rate = _ZERO
    if target_amount > 0:
        completion_rate = (total_performance / target_amount) * 100

    # 4. Performance bonus: zero below the threshold, capped at 100%.
    performance_bonus = _ZERO
    if completion_rate >= _BONUS_THRESHOLD_PERCENT:
        bonus_ratio = min(completion_rate / 100, Decimal("1.0"))
        performance_bonus = Decimal(_MAX_PERFORMANCE_BONUS) * bonus_ratio

    # 5. Personal commission and company rebate per category. Categories are
    #    loaded once; sales whose category no longer exists are skipped.
    categories = _load_categories(db, category_performance)
    personal_commission = _ZERO
    company_rebate = _ZERO
    commission_details = []
    for category_id, data in category_performance.items():
        category = categories.get(category_id)
        if category is None:
            continue
        commission_rate = category.commission_rate or _ZERO
        commission = data["amount"] * commission_rate
        personal_commission += commission
        commission_details.append({
            "category_id": category_id,
            "category_name": data["category_name"],
            "amount": float(data["amount"]),
            "commission_rate": float(commission_rate),
            "commission": float(commission),
        })
        company_rebate += data["amount"] * get_rebate_rate_by_period(category, period)

    # 6. Commission earned on agent sales.
    agent_commission = total_agent_performance * _AGENT_COMMISSION_RATE

    # 7. Total income.
    base_salary = employee.base_salary or _ZERO
    total_income = base_salary + performance_bonus + personal_commission + agent_commission

    # Amount paid out to the agents themselves.
    # NOTE(review): `or` means a configured rate of 0 also falls back to the
    # default share - confirm that is intended.
    agent_share_cost = _ZERO
    for record in agent_records:
        if record.agent:
            share_rate = record.agent.profit_share_rate or _DEFAULT_AGENT_SHARE_RATE
            agent_share_cost += (record.amount or _ZERO) * share_rate

    company_cost = total_income + agent_share_cost
    company_profit = company_rebate - company_cost

    return {
        "target_amount": target_amount,
        "total_personal_performance": total_personal_performance,
        "total_agent_performance": total_agent_performance,
        "total_performance": total_performance,
        "completion_rate": completion_rate,
        "base_salary": base_salary,
        "performance_bonus": performance_bonus,
        "personal_commission": personal_commission,
        "agent_commission": agent_commission,
        "total_income": total_income,
        "company_rebate": company_rebate,
        "company_cost": company_cost,
        "company_profit": company_profit,
        "agent_share_cost": agent_share_cost,
        "commission_details": commission_details,
    }


def calculate_employee_income(
    db: Session,
    employee_id: int,
    period: str,
    year: int,
    month: Optional[int] = None,
    quarter: Optional[int] = None,
    save_result: bool = True,
    half: Optional[int] = None,
) -> Dict[str, Any]:
    """Calculate an employee's income for the given period.

    Calculation steps:
        1. Load the employee's performance records inside the period.
        2. Completion rate = total performance / target (as a percentage).
        3. Performance bonus = 1000 * min(completion rate, 100%) when the
           completion rate is at least 50%, otherwise 0.
        4. Personal commission = per-category sales * category commission rate.
        5. Agent commission = total secondary-agent sales * 0.01.
        6. Total income = base salary + bonus + personal commission
           + agent commission.

    Args:
        db: Database session.
        employee_id: Id of the employee.
        period: Period type (monthly/quarterly/half_yearly/yearly).
        year: Calendar year.
        month: Month, required for monthly periods.
        quarter: Quarter, required for quarterly periods.
        save_result: Whether to persist a ``CalculationResult`` row.
        half: Half of the year (1 or 2) for half-yearly periods; defaults to
            the first half.

    Returns:
        Dict with the calculated figures as floats. Contains
        ``calculation_id`` when the result was saved.

    Raises:
        ValueError: If the employee does not exist or the period arguments
            are invalid.
    """
    employee = db.query(Employee).filter(Employee.id == employee_id).first()
    if not employee:
        raise ValueError(f"员工不存在: {employee_id}")

    start_date, end_date, period_key = get_period_dates(period, year, month, quarter, half)
    figures = _compute_employee_income(db, employee, period, start_date, end_date)

    detail_json = {
        "personal_performance": {
            "total": float(figures["total_personal_performance"]),
            "by_category": figures["commission_details"],
        },
        "agent_performance": {
            "total": float(figures["total_agent_performance"]),
            "commission_rate": float(_AGENT_COMMISSION_RATE),
            "commission": float(figures["agent_commission"]),
        },
        "target": {
            "amount": float(figures["target_amount"]),
            "completion_rate": float(figures["completion_rate"]),
        },
        "bonus_calculation": {
            "threshold": _BONUS_THRESHOLD_PERCENT,
            "max_bonus": _MAX_PERFORMANCE_BONUS,
            "actual_bonus": float(figures["performance_bonus"]),
        },
    }

    result = {
        "employee_id": employee_id,
        "employee_name": _employee_name(employee),
        "period": period,
        "year": year,
        "month": month,
        "quarter": quarter,
        "period_key": period_key,
        "period_start_date": start_date.isoformat(),
        "period_end_date": end_date.isoformat(),
        "total_performance": float(figures["total_performance"]),
        "target_amount": float(figures["target_amount"]),
        "completion_rate": float(figures["completion_rate"]),
        "base_salary": float(figures["base_salary"]),
        "performance_bonus": float(figures["performance_bonus"]),
        "personal_commission": float(figures["personal_commission"]),
        "agent_commission": float(figures["agent_commission"]),
        "total_income": float(figures["total_income"]),
        "company_rebate": float(figures["company_rebate"]),
        "company_cost": float(figures["company_cost"]),
        "company_profit": float(figures["company_profit"]),
        "agent_performance": float(figures["total_agent_performance"]),
        "agent_share_amount": float(figures["agent_share_cost"]),
        "detail": detail_json,
    }

    if save_result:
        calc_result = CalculationResult(
            employee_id=employee_id,
            calc_period=period,
            calc_year=year,
            calc_month=month,
            calc_quarter=quarter,
            period_start_date=start_date,
            period_end_date=end_date,
            total_performance=figures["total_performance"],
            target_amount=figures["target_amount"],
            completion_rate=figures["completion_rate"],
            base_salary=figures["base_salary"],
            performance_bonus=figures["performance_bonus"],
            personal_commission=figures["personal_commission"],
            agent_commission=figures["agent_commission"],
            total_income=figures["total_income"],
            company_rebate=figures["company_rebate"],
            company_cost=figures["company_cost"],
            company_profit=figures["company_profit"],
            agent_performance=figures["total_agent_performance"],
            agent_share_amount=figures["agent_share_cost"],
            detail_json=json.dumps(detail_json, ensure_ascii=False),
        )
        db.add(calc_result)
        try:
            db.commit()
        except Exception:
            # Leave the session usable for the caller after a failed write.
            db.rollback()
            raise
        db.refresh(calc_result)
        result["calculation_id"] = calc_result.id

    return result


def calculate_company_profit(
    db: Session,
    period: str,
    year: int,
    month: Optional[int] = None,
    quarter: Optional[int] = None,
    save_result: bool = False,
    half: Optional[int] = None,
) -> Dict[str, Any]:
    """Calculate the company's profit for the given period.

    Calculation steps:
        1. Sum all "employee" performance records per category.
        2. Company rebate = per-category sales * category rebate rate.
        3. Company cost = income of every active employee (base salary,
           bonus, commissions) + agent profit share.
        4. Company profit = rebate - cost.

    Args:
        db: Database session.
        period: Period type.
        year: Calendar year.
        month: Month, required for monthly periods.
        quarter: Quarter, required for quarterly periods.
        save_result: Accepted for interface compatibility; this function
            does not persist anything.
        half: Half of the year (1 or 2) for half-yearly periods; defaults to
            the first half.

    Returns:
        Dict with the aggregated figures as floats plus per-category and
        per-employee breakdowns.

    Raises:
        ValueError: If the period arguments are invalid.
    """
    start_date, end_date, period_key = get_period_dates(period, year, month, quarter, half)

    # Only employees whose user account is active (status == 1).
    employees = db.query(Employee).join(User).filter(User.status == 1).all()

    # Per-category totals over every employee-type record in the period.
    employee_records = _query_period_records(
        db, start_date, end_date,
        PerformanceRecord.record_type == "employee",
    )
    category_totals: Dict[Any, Decimal] = {}
    for record in employee_records:
        category_totals[record.category_id] = (
            category_totals.get(record.category_id, _ZERO) + (record.amount or _ZERO)
        )

    # Company rebate per category; categories that no longer exist are skipped.
    categories = _load_categories(db, category_totals)
    total_company_rebate = _ZERO
    rebate_details = []
    for cat_id, amount in category_totals.items():
        category = categories.get(cat_id)
        if category is None:
            continue
        rebate_rate = get_rebate_rate_by_period(category, period)
        rebate = amount * rebate_rate
        total_company_rebate += rebate
        rebate_details.append({
            "category_id": cat_id,
            "category_name": category.name,
            "amount": float(amount),
            "rebate_rate": float(rebate_rate),
            "rebate": float(rebate),
        })

    # Income and agent share of each employee. A failure for one employee is
    # logged and that employee is left out instead of aborting the report.
    total_company_cost = _ZERO
    total_agent_share = _ZERO
    employee_results = []
    for employee in employees:
        current_id = None
        try:
            current_id = employee.id
            figures = _compute_employee_income(db, employee, period, start_date, end_date)
            name = _employee_name(employee)
        except Exception:
            logger.exception(
                "Skipping employee %s: income calculation failed", current_id
            )
            continue
        total_company_cost += figures["total_income"]
        total_agent_share += figures["agent_share_cost"]
        employee_results.append({
            "employee_id": current_id,
            "employee_name": name,
            "total_income": float(figures["total_income"]),
            "agent_share": float(figures["agent_share_cost"]),
        })

    total_cost = total_company_cost + total_agent_share
    company_profit = total_company_rebate - total_cost

    return {
        "period": period,
        "year": year,
        "month": month,
        "quarter": quarter,
        "period_key": period_key,
        "period_start_date": start_date.isoformat(),
        "period_end_date": end_date.isoformat(),
        "total_rebate": float(total_company_rebate),
        "total_employee_cost": float(total_company_cost),
        "total_agent_share": float(total_agent_share),
        "total_cost": float(total_cost),
        "company_profit": float(company_profit),
        "employee_count": len(employee_results),
        "rebate_details": rebate_details,
        "employee_details": employee_results,
    }


def calculate_agent_profit(
    db: Session,
    agent_id: int,
    period: str,
    year: int,
    month: Optional[int] = None,
    quarter: Optional[int] = None,
    half: Optional[int] = None,
) -> Dict[str, Any]:
    """Calculate a secondary agent's profit share for the given period.

    Profit share = agent sales * the agent's profit share rate (60% when the
    agent has no rate configured).

    Args:
        db: Database session.
        agent_id: Id of the secondary agent.
        period: Period type.
        year: Calendar year.
        month: Month, required for monthly periods.
        quarter: Quarter, required for quarterly periods.
        half: Half of the year (1 or 2) for half-yearly periods; defaults to
            the first half.

    Returns:
        Dict with the totals as floats and one entry per performance record.

    Raises:
        ValueError: If the agent does not exist or the period arguments are
            invalid.
    """
    agent = db.query(SecondaryAgent).filter(SecondaryAgent.id == agent_id).first()
    if not agent:
        raise ValueError(f"代理不存在: {agent_id}")

    start_date, end_date, period_key = get_period_dates(period, year, month, quarter, half)

    agent_records = _query_period_records(
        db, start_date, end_date,
        PerformanceRecord.agent_id == agent_id,
    )

    total_performance = _ZERO
    performance_details = []
    for record in agent_records:
        amount = record.amount or _ZERO
        total_performance += amount
        performance_details.append({
            "record_id": record.id,
            "date": _iso_or_none(record.record_date),
            "amount": float(amount),
            "customer_name": record.customer_name,
            "order_no": record.order_no,
        })

    profit_share_rate = agent.profit_share_rate or _DEFAULT_AGENT_SHARE_RATE
    profit_share_amount = total_performance * profit_share_rate

    return {
        "agent_id": agent_id,
        "agent_name": agent.company_name,
        "contact_name": agent.contact_name,
        "employee_id": agent.employee_id,
        "employee_name": _employee_name(agent.employee),
        "period": period,
        "year": year,
        "month": month,
        "quarter": quarter,
        "period_key": period_key,
        "period_start_date": start_date.isoformat(),
        "period_end_date": end_date.isoformat(),
        "total_performance": float(total_performance),
        "profit_share_rate": float(profit_share_rate),
        "profit_share_amount": float(profit_share_amount),
        "performance_count": len(agent_records),
        "performance_details": performance_details,
    }


def get_calculation_history(
    db: Session,
    employee_id: Optional[int] = None,
    period: Optional[str] = None,
    year: Optional[int] = None,
    page: int = 1,
    page_size: int = 20,
) -> Dict[str, Any]:
    """Return one page of stored calculation results, newest first.

    Args:
        db: Database session.
        employee_id: Restrict to this employee when truthy.
        period: Restrict to this period type when truthy.
        year: Restrict to this calculation year when truthy.
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        Dict with ``items``, ``total`` (rows matching the filters) and the
        effective ``page`` and ``page_size``.
    """
    # A page below 1 would otherwise produce a negative OFFSET.
    page = max(page, 1)
    page_size = max(page_size, 1)

    query = db.query(CalculationResult)
    if employee_id:
        query = query.filter(CalculationResult.employee_id == employee_id)
    if period:
        query = query.filter(CalculationResult.calc_period == period)
    if year:
        query = query.filter(CalculationResult.calc_year == year)

    total = query.count()

    rows = query.order_by(
        CalculationResult.calc_year.desc(),
        CalculationResult.created_at.desc(),
        # Tiebreak so rows with equal timestamps keep a stable page position.
        CalculationResult.id.desc(),
    ).offset((page - 1) * page_size).limit(page_size).all()

    items = [
        {
            "id": row.id,
            "employee_id": row.employee_id,
            "employee_name": _employee_name(row.employee),
            "calc_period": row.calc_period,
            "calc_year": row.calc_year,
            "calc_month": row.calc_month,
            "calc_quarter": row.calc_quarter,
            "period_start_date": _iso_or_none(row.period_start_date),
            "period_end_date": _iso_or_none(row.period_end_date),
            "total_performance": _to_float(row.total_performance),
            "target_amount": _to_float(row.target_amount),
            "completion_rate": _to_float(row.completion_rate),
            "total_income": _to_float(row.total_income),
            "company_profit": _to_float(row.company_profit),
            "created_at": _iso_or_none(row.created_at),
        }
        for row in rows
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "page_size": page_size,
    }


def get_calculation_detail(db: Session, calculation_id: int) -> Optional[Dict[str, Any]]:
    """Return the full stored calculation result, or None when it does not exist.

    ``detail`` holds the decoded ``detail_json`` payload; it is None when the
    payload is missing or cannot be decoded.
    """
    result = db.query(CalculationResult).filter(CalculationResult.id == calculation_id).first()
    if not result:
        return None

    detail = None
    if result.detail_json:
        try:
            detail = json.loads(result.detail_json)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; a corrupt payload should
            # not hide the rest of the record.
            logger.warning(
                "Calculation %s has an undecodable detail_json payload", calculation_id
            )

    return {
        "id": result.id,
        "employee_id": result.employee_id,
        "employee_name": _employee_name(result.employee),
        "calc_period": result.calc_period,
        "calc_year": result.calc_year,
        "calc_month": result.calc_month,
        "calc_quarter": result.calc_quarter,
        "period_start_date": _iso_or_none(result.period_start_date),
        "period_end_date": _iso_or_none(result.period_end_date),
        "total_performance": _to_float(result.total_performance),
        "target_amount": _to_float(result.target_amount),
        "completion_rate": _to_float(result.completion_rate),
        "base_salary": _to_float(result.base_salary),
        "performance_bonus": _to_float(result.performance_bonus),
        "personal_commission": _to_float(result.personal_commission),
        "agent_commission": _to_float(result.agent_commission),
        "total_income": _to_float(result.total_income),
        "company_rebate": _to_float(result.company_rebate),
        "company_cost": _to_float(result.company_cost),
        "company_profit": _to_float(result.company_profit),
        "agent_performance": _to_float(result.agent_performance),
        "agent_share_amount": _to_float(result.agent_share_amount),
        "detail": detail,
        "created_at": _iso_or_none(result.created_at),
    }