import io
from datetime import date, datetime
from typing import Optional, Dict, List, Any

import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy import func, and_

from app.models import Employee, SecondaryAgent, ProductCategory, PerformanceRecord, User
from app.services.calculate_service import (
    calculate_employee_income,
    calculate_company_profit,
    get_period_dates,
    get_rebate_rate_by_period
)


def _write_sheet(writer: pd.ExcelWriter, sheet_name: str, data: Any) -> None:
    """Write ``data`` to ``sheet_name`` as a DataFrame without the index column.

    ``data`` is anything ``pd.DataFrame`` accepts; this module passes either a
    dict of equal-length lists or a list of row dicts.
    """
    pd.DataFrame(data).to_excel(writer, sheet_name=sheet_name, index=False)


def _query_period_records(db: Session, employee_id: int, record_type: str,
                          start_date: Any, end_date: Any) -> list:
    """Return one employee's records of ``record_type`` in ``[start_date, end_date)``.

    The upper bound is exclusive (``record_date < end_date``).
    """
    return db.query(PerformanceRecord).filter(
        and_(
            PerformanceRecord.employee_id == employee_id,
            PerformanceRecord.record_type == record_type,
            PerformanceRecord.record_date >= start_date,
            PerformanceRecord.record_date < end_date
        )
    ).all()


def _employee_name(record: Any, default: str) -> str:
    """Return the name of the record's employee user, or ``default`` if either link is missing."""
    if record.employee and record.employee.user:
        return record.employee.user.name
    return default


def _grouped_rows(records: list, key_of: Any, label: str) -> List[Dict[str, Any]]:
    """Aggregate record count and amount per group and return one row dict per group.

    Groups appear in order of first occurrence in ``records``. ``key_of`` maps a
    record to its group name; ``label`` is the column header used for that name.
    """
    totals: Dict[Any, Dict[str, Any]] = {}
    for record in records:
        bucket = totals.setdefault(key_of(record), {'count': 0, 'amount': 0})
        bucket['count'] += 1
        bucket['amount'] += float(record.amount or 0)
    return [
        {label: name, '业绩笔数': stats['count'], '业绩总额': stats['amount']}
        for name, stats in totals.items()
    ]


def export_employee_report(
    db: Session,
    employee_id: int,
    period: str,
    year: int,
    month: Optional[int] = None
) -> tuple:
    """
    Export one employee's income breakdown as an Excel workbook.

    The workbook contains an income summary sheet, a personal performance
    sheet, an agent performance sheet and, when the calculation result carries
    per-category data, a commission sheet. The two performance sheets hold a
    single hint row when the period has no matching records.

    Args:
        db: database session used for the calculation and the record queries
        employee_id: ID of the employee to report on
        period: period type, passed through to the calculation service
        year: year of the period
        month: month of the period (optional)

    Returns:
        tuple: (excel_bytes, filename)
    """
    # Compute the income figures without persisting the result.
    result = calculate_employee_income(db, employee_id, period, year, month, save_result=False)

    # Resolve the date range of the period.
    start_date, end_date, period_key = get_period_dates(period, year, month)

    # Fetch the detail records behind the summary figures.
    personal_records = _query_period_records(db, employee_id, "employee", start_date, end_date)
    agent_records = _query_period_records(db, employee_id, "agent", start_date, end_date)

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        # Sheet 1: income summary
        _write_sheet(writer, '收益汇总', {
            '项目': [
                '员工姓名', '计算周期', '业绩总额', '目标金额', '完成率(%)',
                '底薪', '绩效奖金', '个人提成', '代理提成', '总收入',
                '公司返点', '公司成本', '公司利润'
            ],
            '金额': [
                result['employee_name'],
                result['period_key'],
                result['total_performance'],
                result['target_amount'],
                f"{result['completion_rate']:.2f}",
                result['base_salary'],
                result['performance_bonus'],
                result['personal_commission'],
                result['agent_commission'],
                result['total_income'],
                result['company_rebate'],
                result['company_cost'],
                result['company_profit']
            ]
        })

        # Sheet 2: personal performance details
        if personal_records:
            personal_data = []
            for record in personal_records:
                category = record.category
                amount = float(record.amount or 0)
                # Records without a category get a zero rate and zero commission.
                rate = float(category.commission_rate or 0) if category else 0
                personal_data.append({
                    '日期': record.record_date,
                    '客户名称': record.customer_name or '',
                    '订单号': record.order_no or '',
                    '产品分类': category.name if category else '',
                    '业绩金额': amount,
                    '提成比例': rate,
                    '提成金额': amount * rate if category else 0
                })
            _write_sheet(writer, '个人业绩明细', personal_data)
        else:
            _write_sheet(writer, '个人业绩明细', {'提示': ['该周期内无个人业绩记录']})

        # Sheet 3: agent performance details
        if agent_records:
            agent_data = []
            for record in agent_records:
                agent = record.agent
                amount = float(record.amount or 0)
                # A missing agent or an unset/zero share rate falls back to 0.6.
                share_rate = float(agent.profit_share_rate or 0.6) if agent else 0.6
                agent_data.append({
                    '日期': record.record_date,
                    '代理公司': agent.company_name if agent else '',
                    '客户名称': record.customer_name or '',
                    '订单号': record.order_no or '',
                    '产品分类': record.category.name if record.category else '',
                    '业绩金额': amount,
                    '分佣比例': share_rate,
                    '分佣金额': amount * share_rate
                })
            _write_sheet(writer, '代理业绩明细', agent_data)
        else:
            _write_sheet(writer, '代理业绩明细', {'提示': ['该周期内无代理业绩记录']})

        # Sheet 4: commission per category, written only when there are rows
        detail = result.get('detail')
        if detail and detail.get('personal_performance'):
            commission_data = [
                {
                    '产品分类': item['category_name'],
                    '业绩金额': item['amount'],
                    '提成比例': item['commission_rate'],
                    '提成金额': item['commission']
                }
                for item in detail['personal_performance'].get('by_category', [])
            ]
            if commission_data:
                _write_sheet(writer, '提成明细', commission_data)

    filename = f"employee_report_{employee_id}_{period_key}.xlsx"
    return output.getvalue(), filename


def export_company_report(
    db: Session,
    period: str,
    year: int,
    month: Optional[int] = None
) -> tuple:
    """
    Export the company profit summary as an Excel workbook.

    The workbook contains a company summary sheet plus, when the calculation
    result carries them, a rebate detail sheet and an employee cost sheet.

    Args:
        db: database session used for the calculation
        period: period type, passed through to the calculation service
        year: year of the period
        month: month of the period (optional)

    Returns:
        tuple: (excel_bytes, filename)
    """
    # Compute the company-level figures.
    result = calculate_company_profit(db, period, year, month)

    # Only the period key is needed here; it names the output file.
    _, _, period_key = get_period_dates(period, year, month)

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        # Sheet 1: company summary
        _write_sheet(writer, '公司收益汇总', {
            '项目': [
                '计算周期', '员工数量', '总返点收入', '员工成本',
                '代理分成', '总成本', '公司利润'
            ],
            '金额': [
                result['period_key'],
                result['employee_count'],
                result['total_rebate'],
                result['total_employee_cost'],
                result['total_agent_share'],
                result['total_cost'],
                result['company_profit']
            ]
        })

        # Sheet 2: rebate details
        if result.get('rebate_details'):
            _write_sheet(writer, '返点明细', [
                {
                    '产品分类': item['category_name'],
                    '业绩金额': item['amount'],
                    '返点比例': item['rebate_rate'],
                    '返点金额': item['rebate']
                }
                for item in result['rebate_details']
            ])

        # Sheet 3: employee cost details
        if result.get('employee_details'):
            _write_sheet(writer, '员工成本明细', [
                {
                    '员工ID': item['employee_id'],
                    '员工姓名': item['employee_name'],
                    '员工收入': item['total_income'],
                    '代理分成': item['agent_share']
                }
                for item in result['employee_details']
            ])

    filename = f"company_report_{period_key}.xlsx"
    return output.getvalue(), filename


def export_performance_report(
    db: Session,
    filters: Dict[str, Any]
) -> tuple:
    """
    Export performance records as an Excel workbook.

    When records match, the workbook contains a detail sheet, overall
    statistics, a per-employee summary and a per-category summary. When nothing
    matches, it contains a single sheet with a hint row.

    Args:
        db: database session used for the record query
        filters: filter conditions; falsy values are ignored and ``None`` is
            treated as no filters
            - start_date: start date (inclusive)
            - end_date: end date (inclusive)
            - employee_id: employee ID (optional)
            - agent_id: agent ID (optional)
            - category_id: category ID (optional)
            - record_type: record type (employee/agent)

    Returns:
        tuple: (excel_bytes, filename)
    """
    filters = filters or {}

    # Build the query, applying only the filters that were supplied.
    query = db.query(PerformanceRecord)
    if filters.get('start_date'):
        query = query.filter(PerformanceRecord.record_date >= filters['start_date'])
    if filters.get('end_date'):
        query = query.filter(PerformanceRecord.record_date <= filters['end_date'])
    equality_filters = (
        ('employee_id', PerformanceRecord.employee_id),
        ('agent_id', PerformanceRecord.agent_id),
        ('category_id', PerformanceRecord.category_id),
        ('record_type', PerformanceRecord.record_type),
    )
    for key, column in equality_filters:
        if filters.get(key):
            query = query.filter(column == filters[key])

    # Newest records first.
    records = query.order_by(PerformanceRecord.record_date.desc()).all()

    output = io.BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        if records:
            # Sheet 1: performance details
            data = []
            total_amount = 0
            for record in records:
                amount = float(record.amount or 0)
                total_amount += amount
                data.append({
                    'ID': record.id,
                    '记录类型': '员工业绩' if record.record_type == 'employee' else '代理业绩',
                    '日期': record.record_date,
                    '员工': _employee_name(record, ""),
                    '代理': record.agent.company_name if record.agent else "",
                    '产品分类': record.category.name if record.category else "",
                    '客户名称': record.customer_name or '',
                    '订单号': record.order_no or '',
                    '业绩金额': amount,
                    '备注': record.remark or ''
                })
            _write_sheet(writer, '业绩明细', data)

            # Sheet 2: overall statistics
            _write_sheet(writer, '汇总统计', {
                '统计项': [
                    '记录总数', '业绩总额', '平均单笔业绩',
                    '员工业绩笔数', '代理业绩笔数'
                ],
                '数值': [
                    len(records),
                    total_amount,
                    round(total_amount / len(records), 2),
                    sum(1 for r in records if r.record_type == 'employee'),
                    sum(1 for r in records if r.record_type == 'agent')
                ]
            })

            # Sheet 3: summary per employee (grouped by displayed name)
            _write_sheet(writer, '按员工汇总', _grouped_rows(
                records, lambda r: _employee_name(r, '未知'), '员工'
            ))

            # Sheet 4: summary per category
            _write_sheet(writer, '按分类汇总', _grouped_rows(
                records, lambda r: r.category.name if r.category else '未知', '产品分类'
            ))
        else:
            _write_sheet(writer, '业绩明细', {'提示': ['无业绩记录']})

    # The file name carries the export date only.
    date_str = datetime.now().strftime('%Y%m%d')
    filename = f"performance_report_{date_str}.xlsx"
    return output.getvalue(), filename