import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates from matplotlib.ticker import MaxNLocator, MultipleLocator from pathlib import Path import datetime import re from collections import defaultdict import time import seaborn as sns import matplotlib.font_manager as fm # 设置中文字体支持 def setup_chinese_font(): """配置matplotlib支持中文字体""" # 尝试常见的中文字体 chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'NSimSun', 'FangSong', 'KaiTi'] # 查找系统中已安装的字体 font_found = False for font_name in chinese_fonts: # 在Windows上查找字体 font_path = None for font in fm.findSystemFonts(fontpaths=None, fontext='ttf'): if font_name.lower() in os.path.basename(font).lower(): font_path = font break if font_path: # 如果找到字体,设置为默认字体 plt.rcParams['font.family'] = ['sans-serif'] plt.rcParams['font.sans-serif'] = [font_name, 'DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False # 正确显示负号 print(f"使用中文字体: {font_name}") font_found = True break if not font_found: # 如果没找到合适的中文字体,使用一个备选方案 print("警告: 未找到中文字体,图表中的中文可能无法正确显示") # 尝试使用不需要特定字体的解决方案 plt.rcParams['font.family'] = ['sans-serif'] plt.rcParams['axes.unicode_minus'] = False class PrefixTrie: """前缀树结构,用于高效匹配手机号前缀""" def __init__(self): self.root = {} def insert(self, prefix, province, carrier): node = self.root for digit in prefix: if digit not in node: node[digit] = {} node = node[digit] node['province'] = province node['carrier'] = carrier def longest_prefix_match(self, phone): """查找最长匹配前缀""" if not phone: return "未知", "未知" node = self.root last_match = None for i, digit in enumerate(phone): if digit not in node: break node = node[digit] if 'province' in node: last_match = (node['province'], node['carrier']) return last_match if last_match else ("未知", "未知") def load_phone_region_data(csv_file): """使用前缀树加载手机地区表数据 - 修改以支持新的运营商格式""" start = time.time() prefix_trie = PrefixTrie() try: # 读取CSV文件 df = pd.read_csv(csv_file, header=None, encoding='utf-8') if df.shape[1] < 4: # 尝试其他分隔符 df = pd.read_csv(csv_file, header=None, sep='|', encoding='utf-8') # 遍历数据行 for _, row in df.iterrows(): prefix = str(row[0]) # 手机前缀 # 不同格式的CSV处理 if len(row) >= 4: # 标准格式: 前缀,区号,省份,运营商 province = str(row[2]) carrier = str(row[3]) else: # 如果格式不匹配,尝试解析 province = "未知" carrier = "未知" if len(row) >= 3: province = str(row[2]) # 清理数据 prefix = ''.join(filter(str.isdigit, prefix)) if prefix: prefix_trie.insert(prefix, province, carrier) print(f"手机地区表加载完成,耗时: {time.time() - start:.2f}秒") except Exception as e: print(f"加载手机地区表出错: {e}") import traceback traceback.print_exc() return prefix_trie def get_time_bucket(timestamp, interval_minutes=10): """将时间戳转换为指定分钟间隔的时间段""" if pd.isna(timestamp): return None minute = timestamp.minute bucket_minute = (minute // interval_minutes) * interval_minutes return timestamp.replace(minute=bucket_minute, second=0, microsecond=0) def get_half_hour_bucket(timestamp): """将时间戳转换为半小时间隔""" if pd.isna(timestamp): return None minute = 0 if timestamp.minute < 30 else 30 return timestamp.replace(minute=minute, second=0, microsecond=0) def process_called_number(number): """处理呼入被叫号码,去掉2前缀,取前7位""" if pd.isna(number): return "" # 转换为字符串 number_str = str(number) # 提取数字 digits = ''.join(filter(str.isdigit, number_str)) # 如果以2开头,去掉前缀2 if digits.startswith('2'): digits = digits[1:] # 取前7位 digits = digits[:7] return digits def get_gateway_selection(cdr_file): """获取用户选择的被叫经由网关""" # 读取Excel文件中的所有表的第一页,只提取被叫经由网关列 xl = pd.ExcelFile(cdr_file) first_sheet = xl.sheet_names[0] df_sample = pd.read_excel(cdr_file, sheet_name=first_sheet, nrows=100) # 确保列名正确 if len(df_sample.columns) >= 6: df_sample.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫'] # 查找所有表中的唯一网关 all_gateways = set() for sheet_name in xl.sheet_names: # 只读取网关列以节省内存 df = pd.read_excel(cdr_file, sheet_name=sheet_name, usecols=[3]) # 确保列名正确 if len(df.columns) >= 1: df.columns = ['被叫经由网关'] # 获取唯一网关值并添加到集合中 gateways = df['被叫经由网关'].dropna().unique() all_gateways.update(gateways) # 将集合转换为排序列表 gateway_list = sorted(list(all_gateways)) print("\n可用的被叫经由网关列表:") for i, gateway in enumerate(gateway_list, 1): print(f"{i}. {gateway}") # 收集多次选择的网关组合 gateway_selections = [] selection_index = 1 while True: print(f"\n第 {selection_index} 次选择网关 (可多选,用逗号分隔,输入0结束选择):") selection = input("请输入网关编号: ") if selection == "0": break try: # 解析用户输入的多个选择 selected_indices = [int(idx.strip()) for idx in selection.split(",")] selected_gateways = [gateway_list[idx - 1] for idx in selected_indices if 1 <= idx <= len(gateway_list)] if selected_gateways: gateway_selections.append({ 'index': selection_index, 'name': f"选择{selection_index}", 'gateways': selected_gateways }) print(f"已添加第 {selection_index} 次选择: {', '.join(selected_gateways)}") selection_index += 1 else: print("没有选择有效的网关,请重试") except (ValueError, IndexError): print("输入格式错误,请输入有效的网关编号") if not gateway_selections: print("未选择任何网关,将分析所有数据") gateway_selections.append({ 'index': 1, 'name': "所有网关", 'gateways': gateway_list }) return gateway_selections def analyze_gateway_selection(cdr_file, prefix_trie, gateway_selection): """分析指定网关选择的数据""" start_time = time.time() selected_gateways = gateway_selection['gateways'] selection_name = gateway_selection['name'] print(f"\n开始分析 {selection_name} 的数据: {', '.join(selected_gateways)}") # 预定义错误类型的正则模式 temp_unavailable_pattern = re.compile(r'Temporarily Unavailable.*480') req_terminated_pattern = re.compile(r'Request Terminated.*487') hangup_pattern = re.compile(r'主叫挂断') # 运营商错误统计 carrier_stats = defaultdict(lambda: { 'total_calls': 0, 'temp_unavailable': 0, 'hangup': 0, 'req_terminated': 0 }) total_records = 0 filtered_records = 0 # 获取Excel文件中的所有表名 xl = pd.ExcelFile(cdr_file) # 遍历每个表进行处理 for sheet_name in xl.sheet_names: print(f" 正在处理表: {sheet_name}") # 读取Excel表 df = pd.read_excel(cdr_file, sheet_name=sheet_name) # 确保列名正确 if len(df.columns) >= 6: df.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫'] # 过滤满足指定网关的记录 if len(selected_gateways) < len(set(df['被叫经由网关'].dropna().unique())): df = df[df['被叫经由网关'].isin(selected_gateways)] # 确保通话时长为数字 df['通话时长'] = pd.to_numeric(df['通话时长'], errors='coerce').fillna(0) # 更新总记录数 total_records += len(df) filtered_records += len(df) # 分批处理记录以减少内存占用 batch_size = 5000 num_batches = (len(df) + batch_size - 1) // batch_size for batch_idx in range(num_batches): start_idx = batch_idx * batch_size end_idx = min((batch_idx + 1) * batch_size, len(df)) if batch_idx % 5 == 0 and num_batches > 1: print(f" 正在处理批次 {batch_idx + 1}/{num_batches}...") # 获取当前批次数据 batch = df.iloc[start_idx:end_idx] # 处理每条记录 for _, row in batch.iterrows(): # 处理电话号码 - 优先使用呼入被叫号码 caller_phone = "" if pd.notna(row['呼入被叫']): caller_phone = process_called_number(row['呼入被叫']) elif pd.notna(row['呼出被叫']): caller_phone = process_called_number(row['呼出被叫']) # 使用前缀树快速查找省份和运营商 _, carrier = prefix_trie.longest_prefix_match(caller_phone) # 更新运营商呼叫统计 carrier_stats[carrier]['total_calls'] += 1 # 判断错误类型 call_duration = row['通话时长'] termination_reason = str(row['终止原因']) if pd.notna(row['终止原因']) else "" if temp_unavailable_pattern.search(termination_reason): carrier_stats[carrier]['temp_unavailable'] += 1 elif hangup_pattern.search(termination_reason) and call_duration == 0: carrier_stats[carrier]['hangup'] += 1 elif req_terminated_pattern.search(termination_reason): carrier_stats[carrier]['req_terminated'] += 1 # 计算错误率并转换为DataFrame carrier_error_rates = [] for carrier, stats in carrier_stats.items(): total_calls = stats['total_calls'] if total_calls > 0: carrier_error_rates.append({ '运营商': carrier, '总呼叫量': total_calls, 'Temporarily Unavailable(480)占比(%)': (stats['temp_unavailable'] / total_calls * 100), '主叫挂断(通话时长为0)占比(%)': (stats['hangup'] / total_calls * 100), 'Request Terminated(487)占比(%)': (stats['req_terminated'] / total_calls * 100), '错误总占比(%)': ( (stats['temp_unavailable'] + stats['hangup'] + stats['req_terminated']) / total_calls * 100) }) error_df = pd.DataFrame(carrier_error_rates) # 只保留呼叫量大于阈值的运营商数据 min_calls = 10 if not error_df.empty: error_df = error_df[error_df['总呼叫量'] >= min_calls].sort_values(by='错误总占比(%)', ascending=False) print(f" 分析完成! 总记录数: {total_records}, 符合网关条件的记录: {filtered_records}") print(f" 发现 {len(error_df)} 个运营商数据") print(f" 分析耗时: {time.time() - start_time:.2f}秒") return { 'selection_name': selection_name, 'error_df': error_df, 'total_records': total_records, 'filtered_records': filtered_records } def compare_gateway_selections(all_results, output_dir): """比较不同网关选择的结果""" if not all_results: print("没有有效的网关选择分析结果") return [] # 创建输出目录 os.makedirs(output_dir, exist_ok=True) output_files = [] # 1. 为每个选择创建汇总表 for result in all_results: selection_name = result['selection_name'] error_df = result['error_df'] if error_df.empty: continue # 保存错误率表格 file_name = f"{selection_name}_运营商错误率.xlsx" file_path = os.path.join(output_dir, file_name) error_df.to_excel(file_path, index=False) output_files.append(file_path) # 2. 创建选择之间的比较表 if len(all_results) >= 2: # 获取所有结果中的所有运营商 all_carriers = set() for result in all_results: if not result['error_df'].empty: all_carriers.update(result['error_df']['运营商'].tolist()) # 准备比较数据 comparison_data = [] for carrier in sorted(all_carriers): carrier_row = {'运营商': carrier} # 添加每个选择的错误率数据 for error_type in ['总呼叫量', 'Temporarily Unavailable(480)占比(%)', '主叫挂断(通话时长为0)占比(%)', 'Request Terminated(487)占比(%)', '错误总占比(%)']: for result in all_results: selection_name = result['selection_name'] error_df = result['error_df'] # 查找当前运营商在当前选择中的数据 if not error_df.empty and carrier in error_df['运营商'].values: value = error_df.loc[error_df['运营商'] == carrier, error_type].iloc[0] carrier_row[f"{selection_name}_{error_type}"] = value else: carrier_row[f"{selection_name}_{error_type}"] = None comparison_data.append(carrier_row) # 创建比较DataFrame comparison_df = pd.DataFrame(comparison_data) # 保存比较表格 comparison_file = os.path.join(output_dir, "网关选择比较_运营商错误率.xlsx") comparison_df.to_excel(comparison_file, index=False) output_files.append(comparison_file) # 3. 生成多选择对比图 try: # 选择顶部运营商(错误率最高的前6个)进行可视化 top_carriers = [] for result in all_results: if not result['error_df'].empty: top_in_selection = result['error_df'].nlargest(3, '错误总占比(%)')['运营商'].tolist() top_carriers.extend(top_in_selection) # 去重并限制数量 top_carriers = list(dict.fromkeys(top_carriers))[:6] if top_carriers: # 为选定的运营商创建错误率比较柱状图 plt.figure(figsize=(15, 10)) # 设置位置和宽度 width = 0.2 x = np.arange(len(top_carriers)) # 绘制每个选择的错误率 for i, result in enumerate(all_results): selection_name = result['selection_name'] error_df = result['error_df'] if error_df.empty: continue # 提取数据 error_rates = [] for carrier in top_carriers: if carrier in error_df['运营商'].values: rate = error_df.loc[error_df['运营商'] == carrier, '错误总占比(%)'].iloc[0] error_rates.append(rate) else: error_rates.append(0) # 绘制柱状图 plt.bar(x + i * width - width * (len(all_results) - 1) / 2, error_rates, width=width, label=selection_name) # 添加数值标签 for j, value in enumerate(error_rates): if value > 0: plt.text(x[j] + i * width - width * (len(all_results) - 1) / 2, value + 0.5, f'{value:.1f}%', ha='center', va='bottom', fontsize=8) # 设置图表标签和标题 plt.xlabel('运营商', fontsize=12) plt.ylabel('错误总占比 (%)', fontsize=12) plt.title('不同网关选择下运营商错误率对比', fontsize=14) plt.xticks(x, top_carriers, rotation=45, ha='right') plt.legend() plt.tight_layout() plt.grid(axis='y', linestyle='--', alpha=0.7) # 保存图表 chart_file = os.path.join(output_dir, "网关选择比较_错误率柱状图.png") plt.savefig(chart_file, dpi=120) plt.close() output_files.append(chart_file) except Exception as e: print(f"生成对比图表时出错: {e}") import traceback traceback.print_exc() # 4. 创建运营商分布饼图 try: for result in all_results: selection_name = result['selection_name'] error_df = result['error_df'] if error_df.empty: continue # 选择前8个运营商,其他归为"其他" top_carriers = error_df.nlargest(8, '总呼叫量') other_calls = error_df['总呼叫量'].sum() - top_carriers['总呼叫量'].sum() # 准备饼图数据 labels = top_carriers['运营商'].tolist() if other_calls > 0: labels.append('其他') sizes = top_carriers['总呼叫量'].tolist() if other_calls > 0: sizes.append(other_calls) # 创建饼图 plt.figure(figsize=(12, 9)) plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, shadow=True) plt.axis('equal') # 保持饼图为圆形 plt.title(f'{selection_name} - 运营商呼叫量分布', fontsize=14) plt.tight_layout() # 保存饼图 pie_file = os.path.join(output_dir, f"{selection_name}_运营商分布饼图.png") plt.savefig(pie_file, dpi=120) plt.close() output_files.append(pie_file) except Exception as e: print(f"生成饼图时出错: {e}") import traceback traceback.print_exc() return output_files def plot_ten_min_stats(answer_rate, call_volume, output_dir): """绘制10分钟间隔的应答率和呼叫量双轴图 - 修改y轴上限为35%,添加数值标签""" if not answer_rate or not call_volume: print("警告: 没有足够的10分钟间隔数据用于绘图") return None # 创建主坐标轴 fig, ax1 = plt.subplots(figsize=(20, 10)) # 确保时间序列是排序的 times = sorted(answer_rate.keys()) rates = [answer_rate[t] for t in times] volumes = [call_volume[t] for t in times] # 绘制应答率 (%) - 主坐标轴,y轴上限设为35% color1 = 'tab:blue' ax1.set_xlabel('时间', fontsize=12) ax1.set_ylabel('应答率 (%)', color=color1, fontsize=12) line1 = ax1.plot(times, rates, 'o-', color=color1, linewidth=2, label='应答率') ax1.tick_params(axis='y', labelcolor=color1, labelsize=10) ax1.set_ylim(0, 35) # 将y轴上限设为35% # 在每个数据点添加应答率标签 for x, y in zip(times, rates): ax1.annotate(f'{y:.1f}%', xy=(x, y), xytext=(0, 5), # 文本偏移 textcoords='offset points', ha='center', va='bottom', fontsize=8, color=color1) # 设置x轴格式 hours_formatter = mdates.DateFormatter('%H:%M') half_hour_locator = mdates.MinuteLocator(byminute=[0, 30]) ten_min_locator = mdates.MinuteLocator(byminute=[0, 10, 20, 30, 40, 50]) ax1.xaxis.set_major_formatter(hours_formatter) ax1.xaxis.set_major_locator(half_hour_locator) # 主刻度为半小时 ax1.xaxis.set_minor_locator(ten_min_locator) # 次刻度为10分钟 # 日期标签垂直显示以节省空间 fig.autofmt_xdate() # 添加主要时间点的网格线和标签 ax1.grid(True, which='major', linestyle='-', alpha=0.7) ax1.grid(True, which='minor', linestyle=':', alpha=0.4) # 创建次坐标轴用于呼叫量 ax2 = ax1.twinx() color2 = 'tab:red' ax2.set_ylabel('呼叫数量', color=color2, fontsize=12) line2 = ax2.plot(times, volumes, 's-', color=color2, linewidth=2, label='呼叫数量') ax2.tick_params(axis='y', labelcolor=color2, labelsize=10) ax2.yaxis.set_major_locator(MaxNLocator(integer=True)) # 确保y轴仅显示整数 # 添加图例 - 需要单独处理以包含两个轴的内容 lines = line1 + line2 labels = [l.get_label() for l in lines] ax1.legend(lines, labels, loc='upper right', fontsize=12) plt.title('10分钟间隔应答率和呼叫量统计', fontsize=14) plt.tight_layout() # 保存图表 output_file = os.path.join(output_dir, '10分钟间隔_应答率和呼叫量.png') plt.savefig(output_file, dpi=120) plt.close() return output_file def plot_error_time_series(carrier_error_df, output_dir): """绘制错误类型随时间变化图 - 改为统计错误码占比而非数量""" if carrier_error_df.empty: print("警告: 没有错误数据用于绘图") return None try: # 创建按时间和错误类型的透视表 # 先统计每个时间段的错误总数 time_total_errors = carrier_error_df.groupby('时间段')['数量'].sum().to_dict() # 基于时间和错误类型的透视表 pivot_df = carrier_error_df.pivot_table( index='时间段', columns='错误类型', values='数量', aggfunc='sum', fill_value=0 ) # 将数量转换为占比 for time_period in pivot_df.index: total = time_total_errors.get(time_period, 0) if total > 0: pivot_df.loc[time_period] = pivot_df.loc[time_period] / total * 100 # 绘制时间序列图 plt.figure(figsize=(20, 10)) # 获取错误类型列表 error_types = pivot_df.columns.tolist() # 不同错误类型使用不同颜色和标记 colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd'] markers = ['o', 's', '^', 'D', 'v'] # 绘制每种错误类型的折线 for i, error_type in enumerate(error_types): color = colors[i % len(colors)] marker = markers[i % len(markers)] values = pivot_df[error_type].values plt.plot( range(len(pivot_df.index)), values, marker=marker, color=color, linewidth=2, markersize=8, label=error_type ) # 在每个点上标注百分比 for j, v in enumerate(values): if v > 1: # 只标注大于1%的值 plt.annotate( f'{v:.1f}%', xy=(j, v), xytext=(0, 5), textcoords='offset points', ha='center', fontsize=8, color=color ) # 设置x轴标签 - 使用时间段字符串 plt.xticks( range(len(pivot_df.index)), pivot_df.index, rotation=45, ha='right' ) # 设置半小时间隔的垂直网格线 plt.grid(True, axis='x', linestyle='--', alpha=0.7) plt.grid(True, axis='y', linestyle='--', alpha=0.7) # 添加标签和图例 plt.title('错误类型随时间变化占比', fontsize=14) plt.xlabel('时间', fontsize=12) plt.ylabel('错误占比 (%)', fontsize=12) plt.legend(fontsize=12) # 确保所有内容都显示 plt.tight_layout() # 保存图表 output_file = os.path.join(output_dir, '错误类型随时间变化.png') plt.savefig(output_file, dpi=120) plt.close() return output_file except Exception as e: print(f"绘制错误类型时间序列图出错: {e}") import traceback traceback.print_exc() return None def analyze_cdr_data(cdr_file, prefix_trie): """分析CDR数据文件 - 简化版""" start_time = time.time() # 获取Excel文件中的所有表名 xl = pd.ExcelFile(cdr_file) sheet_names = xl.sheet_names print(f"文件包含 {len(sheet_names)} 张表: {', '.join(sheet_names)}") # 预定义错误类型的正则模式,避免重复编译 temp_unavailable_pattern = re.compile(r'Temporarily Unavailable.*480') req_terminated_pattern = re.compile(r'Request Terminated.*487') hangup_pattern = re.compile(r'主叫挂断') # 结果累加器 - 使用精确时间作为键 ten_min_call_stats = defaultdict(lambda: {'total': 0, 'answered': 0}) # 按半小时统计的运营商错误数据 half_hour_carrier_error_stats = defaultdict(lambda: defaultdict(lambda: defaultdict(int))) total_records = 0 # 遍历每个表进行处理 for sheet_name in sheet_names: print(f"正在处理表: {sheet_name}") # 一次性读取整个表 df = pd.read_excel(cdr_file, sheet_name=sheet_name) print(f" 表 {sheet_name} 包含 {len(df)} 条记录") # 确保列名正确 if len(df.columns) >= 6: df.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫'] # 转换时间列为datetime类型 df['终止时间'] = pd.to_datetime(df['终止时间'], errors='coerce') # 添加10分钟间隔和半小时间隔列 df['10分钟间隔'] = df['终止时间'].apply(lambda x: get_time_bucket(x, 10)) df['半小时间隔'] = df['终止时间'].apply(get_half_hour_bucket) # 确保通话时长为数字 df['通话时长'] = pd.to_numeric(df['通话时长'], errors='coerce').fillna(0) # 更新记录计数 total_records += len(df) # 更新10分钟间隔的通话统计 for _, row in df.iterrows(): ten_min_bucket = row['10分钟间隔'] half_hour_bucket = row['半小时间隔'] if pd.isna(ten_min_bucket) or pd.isna(half_hour_bucket): continue call_duration = row['通话时长'] # 更新10分钟间隔的通话统计 ten_min_call_stats[ten_min_bucket]['total'] += 1 if call_duration > 0: ten_min_call_stats[ten_min_bucket]['answered'] += 1 # 处理电话号码 - 优先使用呼入被叫号码 caller_phone = "" if pd.notna(row['呼入被叫']): caller_phone = process_called_number(row['呼入被叫']) elif pd.notna(row['呼出被叫']): caller_phone = process_called_number(row['呼出被叫']) # 使用前缀树快速查找省份和运营商 _, carrier = prefix_trie.longest_prefix_match(caller_phone) # 判断错误类型 termination_reason = str(row['终止原因']) if pd.notna(row['终止原因']) else "" error_type = None if temp_unavailable_pattern.search(termination_reason): error_type = 'Temporarily Unavailable(480)' elif hangup_pattern.search(termination_reason) and call_duration == 0: error_type = '主叫挂断(通话时长为0)' elif req_terminated_pattern.search(termination_reason): error_type = 'Request Terminated(487)' # 如果是错误记录,更新相应统计 if error_type: # 更新半小时间隔的运营商错误统计 half_hour_carrier_error_stats[half_hour_bucket][carrier][error_type] += 1 # 计算10分钟间隔的应答率 ten_min_answer_rate = {} ten_min_call_volume = {} for time_bucket, stats in sorted(ten_min_call_stats.items()): total = stats['total'] answered = stats['answered'] ten_min_answer_rate[time_bucket] = (answered / total * 100) if total > 0 else 0 ten_min_call_volume[time_bucket] = total # 准备半小时运营商错误统计表 half_hour_carrier_error_data = [] for time_bucket, carrier_data in sorted(half_hour_carrier_error_stats.items()): # 将时间转换为可读字符串格式 time_str = time_bucket.strftime('%Y-%m-%d %H:%M') for carrier, error_types in carrier_data.items(): for error_type, count in error_types.items(): half_hour_carrier_error_data.append({ '时间段': time_str, '运营商': carrier, '错误类型': error_type, '数量': count }) half_hour_carrier_error_df = pd.DataFrame(half_hour_carrier_error_data) print(f"数据分析完成,共处理 {total_records} 条记录") print(f"总耗时: {time.time() - start_time:.2f}秒") return { 'ten_min_answer_rate': pd.Series(ten_min_answer_rate), 'ten_min_call_volume': pd.Series(ten_min_call_volume), 'half_hour_carrier_error': half_hour_carrier_error_df, 'ten_min_times': sorted(ten_min_call_stats.keys()) } def save_results(stats, output_dir): """保存分析结果 - 简化版""" os.makedirs(output_dir, exist_ok=True) chart_files = [] # 1. 10分钟应答率和呼叫量图表(设置y轴上限为35%) ten_min_chart = plot_ten_min_stats( stats['ten_min_answer_rate'].to_dict(), stats['ten_min_call_volume'].to_dict(), output_dir ) if ten_min_chart: chart_files.append(ten_min_chart) # 2. 错误类型随时间变化图 - 错误码占比(而非数量) error_time_chart = plot_error_time_series( stats['half_hour_carrier_error'], output_dir ) if error_time_chart: chart_files.append(error_time_chart) return output_dir, chart_files def main(): print("CDR通话记录分析工具 - 网关选择比较版") print("=" * 50) # 设置中文字体 setup_chinese_font() start_time = time.time() # 获取当前脚本的上级目录 parent_dir = Path(__file__).parent.parent # 指定cdr目录在上级目录下 cdr_dir = parent_dir / "cdr" if not cdr_dir.exists(): print(f"错误: 目录 '{cdr_dir}' 不存在! 正在创建...") cdr_dir.mkdir(parents=True, exist_ok=True) print(f"请将CDR文件放入 '{cdr_dir}' 目录,然后重新运行程序") return # 请求用户输入CDR文件名,并明确提示位置 cdr_file_name = input(f"请输入要分析的CDR文件名称(在{cdr_dir}目录中): ") cdr_file_path = cdr_dir / cdr_file_name # 确保使用cdr_dir构建路径 print(f"尝试从以下路径读取文件: {cdr_file_path}") # 添加调试信息 if not cdr_file_path.exists(): print(f"错误: 文件 '{cdr_file_path}' 不存在!") return # 手机地区表应该在当前脚本目录中 phone_region_file = Path(__file__).parent / "手机地区表.csv" if not phone_region_file.exists(): print(f"错误: 手机地区表文件 '{phone_region_file}' 不存在!") return # 加载地区数据 print("正在加载手机地区数据...") prefix_trie = load_phone_region_data(phone_region_file) # 获取用户选择的被叫经由网关 gateway_selections = get_gateway_selection(cdr_file_path) # 创建带时间戳的输出目录 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") output_dir = parent_dir / "分析结果" / "isp分析结果" / f"CDR分析结果_{timestamp}" # 对每个网关选择分别分析 all_results = [] for selection in gateway_selections: result = analyze_gateway_selection(cdr_file_path, prefix_trie, selection) all_results.append(result) # 比较不同网关选择的结果 print("\n正在生成网关选择比较分析...") gateway_output_dir = output_dir / "网关选择分析" comparison_files = compare_gateway_selections(all_results, gateway_output_dir) # 执行标准分析以生成时间相关图表 print("\n正在进行标准CDR分析来生成时间相关图表...") stats = analyze_cdr_data(cdr_file_path, prefix_trie) # 保存标准分析结果 print("保存分析结果...") standard_output_dir = output_dir / "标准分析" _, chart_files = save_results(stats, standard_output_dir) # 添加标准分析的文件到结果列表 all_files = comparison_files + chart_files total_time = time.time() - start_time print(f"\n分析完成! 结果已保存到: {output_dir}") print(f"生成的图表和表格文件:") for file in all_files: if file: print(f"- {file}") print(f"总执行时间: {total_time:.2f}秒") if __name__ == "__main__": main()