Files
sales-manager-system/backend/app/services/report_service.py
2026-04-13 14:22:31 +08:00

370 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import io
from datetime import date, datetime
from typing import Optional, Dict, List, Any
from sqlalchemy.orm import Session
from sqlalchemy import func, and_
from app.models import Employee, SecondaryAgent, ProductCategory, PerformanceRecord, User
from app.services.calculate_service import (
calculate_employee_income, calculate_company_profit,
get_period_dates, get_rebate_rate_by_period
)
def export_employee_report(
    db: Session,
    employee_id: int,
    period: str,
    year: int,
    month: Optional[int] = None
) -> tuple:
    """
    Export one employee's income breakdown as an Excel workbook.

    The workbook holds an income summary sheet, a personal-performance
    sheet, an agent-performance sheet and, when the calculation result
    carries per-category data, a commission sheet.

    Args:
        db: Database session used for the record queries.
        employee_id: ID of the employee to report on.
        period: Period identifier, passed through to the calculation helpers.
        year: Year of the period.
        month: Optional month, passed through to the calculation helpers.

    Returns:
        tuple: (excel_bytes, filename)
    """
    # Income figures are computed without being persisted (save_result=False).
    income = calculate_employee_income(db, employee_id, period, year, month, save_result=False)
    start_date, end_date, period_key = get_period_dates(period, year, month)

    def load_records(record_type):
        # Records of one type for this employee with
        # start_date <= record_date < end_date.
        conditions = and_(
            PerformanceRecord.employee_id == employee_id,
            PerformanceRecord.record_type == record_type,
            PerformanceRecord.record_date >= start_date,
            PerformanceRecord.record_date < end_date
        )
        return db.query(PerformanceRecord).filter(conditions).all()

    def personal_row(rec):
        # One line of the personal-performance sheet.
        category = rec.category
        amount = float(rec.amount or 0)
        if category:
            rate = float(category.commission_rate or 0)
            commission = amount * rate
        else:
            rate = 0
            commission = 0
        return {
            '日期': rec.record_date,
            '客户名称': rec.customer_name or '',
            '订单号': rec.order_no or '',
            '产品分类': category.name if category else '',
            '业绩金额': amount,
            '提成比例': rate,
            '提成金额': commission
        }

    def agent_row(rec):
        # One line of the agent-performance sheet; the share rate falls
        # back to 0.6 when there is no agent or the agent has no rate.
        agent = rec.agent
        category = rec.category
        amount = float(rec.amount or 0)
        share_rate = float(agent.profit_share_rate or 0.6) if agent else 0.6
        return {
            '日期': rec.record_date,
            '代理公司': agent.company_name if agent else '',
            '客户名称': rec.customer_name or '',
            '订单号': rec.order_no or '',
            '产品分类': category.name if category else '',
            '业绩金额': amount,
            '分佣比例': share_rate,
            '分佣金额': amount * share_rate
        }

    own_records = load_records("employee")
    delegated_records = load_records("agent")

    # (label, value) pairs for the summary sheet, in display order.
    summary_pairs = [
        ('员工姓名', income['employee_name']),
        ('计算周期', income['period_key']),
        ('业绩总额', income['total_performance']),
        ('目标金额', income['target_amount']),
        ('完成率(%)', f"{income['completion_rate']:.2f}"),
        ('底薪', income['base_salary']),
        ('绩效奖金', income['performance_bonus']),
        ('个人提成', income['personal_commission']),
        ('代理提成', income['agent_commission']),
        ('总收入', income['total_income']),
        ('公司返点', income['company_rebate']),
        ('公司成本', income['company_cost']),
        ('公司利润', income['company_profit']),
    ]

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        # Sheet 1: income summary
        summary_frame = pd.DataFrame({
            '项目': [label for label, _ in summary_pairs],
            '金额': [value for _, value in summary_pairs]
        })
        summary_frame.to_excel(writer, sheet_name='收益汇总', index=False)

        # Sheet 2: personal performance details (or a notice row)
        if own_records:
            personal_frame = pd.DataFrame([personal_row(rec) for rec in own_records])
        else:
            personal_frame = pd.DataFrame({'提示': ['该周期内无个人业绩记录']})
        personal_frame.to_excel(writer, sheet_name='个人业绩明细', index=False)

        # Sheet 3: agent performance details (or a notice row)
        if delegated_records:
            agent_frame = pd.DataFrame([agent_row(rec) for rec in delegated_records])
        else:
            agent_frame = pd.DataFrame({'提示': ['该周期内无代理业绩记录']})
        agent_frame.to_excel(writer, sheet_name='代理业绩明细', index=False)

        # Sheet 4: commission per category, written only when data exists
        detail = income.get('detail')
        if detail and detail.get('personal_performance'):
            commission_rows = [
                {
                    '产品分类': entry['category_name'],
                    '业绩金额': entry['amount'],
                    '提成比例': entry['commission_rate'],
                    '提成金额': entry['commission']
                }
                for entry in detail['personal_performance'].get('by_category', [])
            ]
            if commission_rows:
                pd.DataFrame(commission_rows).to_excel(writer, sheet_name='提成明细', index=False)

    output.seek(0)
    filename = f"employee_report_{employee_id}_{period_key}.xlsx"
    return output.getvalue(), filename
def export_company_report(
    db: Session,
    period: str,
    year: int,
    month: Optional[int] = None
) -> tuple:
    """
    Export the company profit summary as an Excel workbook.

    The workbook always has a summary sheet. A rebate sheet and an
    employee-cost sheet are added when the calculation result contains
    'rebate_details' / 'employee_details' entries.

    Args:
        db: Database session handed to the profit calculation.
        period: Period identifier, passed through to the calculation helpers.
        year: Year of the period.
        month: Optional month, passed through to the calculation helpers.

    Returns:
        tuple: (excel_bytes, filename)
    """
    profit = calculate_company_profit(db, period, year, month)
    # Only the period key is needed here; it goes into the file name.
    _, _, period_key = get_period_dates(period, year, month)

    # (label, value) pairs for the summary sheet, in display order.
    summary_pairs = [
        ('计算周期', profit['period_key']),
        ('员工数量', profit['employee_count']),
        ('总返点收入', profit['total_rebate']),
        ('员工成本', profit['total_employee_cost']),
        ('代理分成', profit['total_agent_share']),
        ('总成本', profit['total_cost']),
        ('公司利润', profit['company_profit']),
    ]

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        # Sheet 1: company profit summary
        summary_frame = pd.DataFrame({
            '项目': [label for label, _ in summary_pairs],
            '金额': [value for _, value in summary_pairs]
        })
        summary_frame.to_excel(writer, sheet_name='公司收益汇总', index=False)

        # Sheet 2: rebate details
        rebate_items = profit.get('rebate_details')
        if rebate_items:
            rebate_frame = pd.DataFrame([
                {
                    '产品分类': entry['category_name'],
                    '业绩金额': entry['amount'],
                    '返点比例': entry['rebate_rate'],
                    '返点金额': entry['rebate']
                }
                for entry in rebate_items
            ])
            rebate_frame.to_excel(writer, sheet_name='返点明细', index=False)

        # Sheet 3: employee cost details
        employee_items = profit.get('employee_details')
        if employee_items:
            employee_frame = pd.DataFrame([
                {
                    '员工ID': entry['employee_id'],
                    '员工姓名': entry['employee_name'],
                    '员工收入': entry['total_income'],
                    '代理分成': entry['agent_share']
                }
                for entry in employee_items
            ])
            employee_frame.to_excel(writer, sheet_name='员工成本明细', index=False)

    output.seek(0)
    filename = f"company_report_{period_key}.xlsx"
    return output.getvalue(), filename
def export_performance_report(
    db: Session,
    filters: Dict[str, Any]
) -> tuple:
    """
    Export filtered performance records as an Excel workbook.

    When records match, the workbook has four sheets: record details,
    overall statistics, totals per employee and totals per category.
    When nothing matches, it has a single sheet with a notice row.

    Args:
        db: Database session used to query performance records.
        filters: Filter conditions; missing or falsy entries are ignored.
            - start_date: earliest record date (inclusive)
            - end_date: latest record date (inclusive)
            - employee_id: employee ID (optional)
            - agent_id: agent ID (optional)
            - category_id: category ID (optional)
            - record_type: record type, "employee" or "agent"

    Returns:
        tuple: (excel_bytes, filename)
    """
    query = db.query(PerformanceRecord)

    # Date bounds are range comparisons; the rest are equality matches on
    # the column that has the same name as the filter key.
    if filters.get('start_date'):
        query = query.filter(PerformanceRecord.record_date >= filters['start_date'])
    if filters.get('end_date'):
        query = query.filter(PerformanceRecord.record_date <= filters['end_date'])
    for field in ('employee_id', 'agent_id', 'category_id', 'record_type'):
        wanted = filters.get(field)
        if wanted:
            query = query.filter(getattr(PerformanceRecord, field) == wanted)

    records = query.order_by(PerformanceRecord.record_date.desc()).all()

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        if not records:
            pd.DataFrame({'提示': ['无业绩记录']}).to_excel(writer, sheet_name='业绩明细', index=False)
        else:
            # One pass over the records feeds all four sheets.
            detail_rows = []
            total_amount = 0
            employee_type_count = 0
            agent_type_count = 0
            # Keyed by employee_id rather than display name, so that two
            # different employees who share a name are not merged.
            per_employee = {}
            per_category = {}

            for record in records:
                amount = float(record.amount or 0)
                total_amount += amount

                if record.record_type == 'employee':
                    employee_type_count += 1
                elif record.record_type == 'agent':
                    agent_type_count += 1

                has_user = bool(record.employee and record.employee.user)
                category = record.category

                detail_rows.append({
                    'ID': record.id,
                    '记录类型': '员工业绩' if record.record_type == 'employee' else '代理业绩',
                    '日期': record.record_date,
                    '员工': record.employee.user.name if has_user else "",
                    '代理': record.agent.company_name if record.agent else "",
                    '产品分类': category.name if category else "",
                    '客户名称': record.customer_name or '',
                    '订单号': record.order_no or '',
                    '业绩金额': amount,
                    '备注': record.remark or ''
                })

                employee_stats = per_employee.setdefault(
                    record.employee_id,
                    {
                        'name': record.employee.user.name if has_user else '未知',
                        'count': 0,
                        'amount': 0
                    }
                )
                employee_stats['count'] += 1
                employee_stats['amount'] += amount

                category_stats = per_category.setdefault(
                    category.name if category else '未知',
                    {'count': 0, 'amount': 0}
                )
                category_stats['count'] += 1
                category_stats['amount'] += amount

            # Sheet 1: record details
            pd.DataFrame(detail_rows).to_excel(writer, sheet_name='业绩明细', index=False)

            # Sheet 2: overall statistics
            stats_frame = pd.DataFrame({
                '统计项': [
                    '记录总数',
                    '业绩总额',
                    '平均单笔业绩',
                    '员工业绩笔数',
                    '代理业绩笔数'
                ],
                '数值': [
                    len(records),
                    total_amount,
                    round(total_amount / len(records), 2),
                    employee_type_count,
                    agent_type_count
                ]
            })
            stats_frame.to_excel(writer, sheet_name='汇总统计', index=False)

            # Sheet 3: totals per employee. The ID column is appended last so
            # the existing columns keep their positions.
            employee_frame = pd.DataFrame([
                {
                    '员工': stats['name'],
                    '业绩笔数': stats['count'],
                    '业绩总额': stats['amount'],
                    '员工ID': emp_id
                }
                for emp_id, stats in per_employee.items()
            ])
            employee_frame.to_excel(writer, sheet_name='按员工汇总', index=False)

            # Sheet 4: totals per category
            category_frame = pd.DataFrame([
                {
                    '产品分类': name,
                    '业绩笔数': stats['count'],
                    '业绩总额': stats['amount']
                }
                for name, stats in per_category.items()
            ])
            category_frame.to_excel(writer, sheet_name='按分类汇总', index=False)

    # The file name carries the local date on which the export was generated.
    date_str = datetime.now().strftime('%Y%m%d')
    filename = f"performance_report_{date_str}.xlsx"
    return output.getvalue(), filename