123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943 |
- import os
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- import matplotlib.dates as mdates
- from matplotlib.ticker import MaxNLocator, MultipleLocator
- from pathlib import Path
- import datetime
- import re
- from collections import defaultdict
- import time
- import seaborn as sns
- import matplotlib.font_manager as fm
- # 设置中文字体支持
- def setup_chinese_font():
- """配置matplotlib支持中文字体"""
- # 尝试常见的中文字体
- chinese_fonts = ['SimHei', 'Microsoft YaHei', 'SimSun', 'NSimSun', 'FangSong', 'KaiTi']
- # 查找系统中已安装的字体
- font_found = False
- for font_name in chinese_fonts:
- # 在Windows上查找字体
- font_path = None
- for font in fm.findSystemFonts(fontpaths=None, fontext='ttf'):
- if font_name.lower() in os.path.basename(font).lower():
- font_path = font
- break
- if font_path:
- # 如果找到字体,设置为默认字体
- plt.rcParams['font.family'] = ['sans-serif']
- plt.rcParams['font.sans-serif'] = [font_name, 'DejaVu Sans']
- plt.rcParams['axes.unicode_minus'] = False # 正确显示负号
- print(f"使用中文字体: {font_name}")
- font_found = True
- break
- if not font_found:
- # 如果没找到合适的中文字体,使用一个备选方案
- print("警告: 未找到中文字体,图表中的中文可能无法正确显示")
- # 尝试使用不需要特定字体的解决方案
- plt.rcParams['font.family'] = ['sans-serif']
- plt.rcParams['axes.unicode_minus'] = False
- class PrefixTrie:
- """前缀树结构,用于高效匹配手机号前缀"""
- def __init__(self):
- self.root = {}
- def insert(self, prefix, province, carrier):
- node = self.root
- for digit in prefix:
- if digit not in node:
- node[digit] = {}
- node = node[digit]
- node['province'] = province
- node['carrier'] = carrier
- def longest_prefix_match(self, phone):
- """查找最长匹配前缀"""
- if not phone:
- return "未知", "未知"
- node = self.root
- last_match = None
- for i, digit in enumerate(phone):
- if digit not in node:
- break
- node = node[digit]
- if 'province' in node:
- last_match = (node['province'], node['carrier'])
- return last_match if last_match else ("未知", "未知")
- def load_phone_region_data(csv_file):
- """使用前缀树加载手机地区表数据 - 修改以支持新的运营商格式"""
- start = time.time()
- prefix_trie = PrefixTrie()
- try:
- # 读取CSV文件
- df = pd.read_csv(csv_file, header=None, encoding='utf-8')
- if df.shape[1] < 4: # 尝试其他分隔符
- df = pd.read_csv(csv_file, header=None, sep='|', encoding='utf-8')
- # 遍历数据行
- for _, row in df.iterrows():
- prefix = str(row[0]) # 手机前缀
- # 不同格式的CSV处理
- if len(row) >= 4:
- # 标准格式: 前缀,区号,省份,运营商
- province = str(row[2])
- carrier = str(row[3])
- else:
- # 如果格式不匹配,尝试解析
- province = "未知"
- carrier = "未知"
- if len(row) >= 3:
- province = str(row[2])
- # 清理数据
- prefix = ''.join(filter(str.isdigit, prefix))
- if prefix:
- prefix_trie.insert(prefix, province, carrier)
- print(f"手机地区表加载完成,耗时: {time.time() - start:.2f}秒")
- except Exception as e:
- print(f"加载手机地区表出错: {e}")
- import traceback
- traceback.print_exc()
- return prefix_trie
- def get_time_bucket(timestamp, interval_minutes=10):
- """将时间戳转换为指定分钟间隔的时间段"""
- if pd.isna(timestamp):
- return None
- minute = timestamp.minute
- bucket_minute = (minute // interval_minutes) * interval_minutes
- return timestamp.replace(minute=bucket_minute, second=0, microsecond=0)
- def get_half_hour_bucket(timestamp):
- """将时间戳转换为半小时间隔"""
- if pd.isna(timestamp):
- return None
- minute = 0 if timestamp.minute < 30 else 30
- return timestamp.replace(minute=minute, second=0, microsecond=0)
- def process_called_number(number):
- """处理呼入被叫号码,去掉2前缀,取前7位"""
- if pd.isna(number):
- return ""
- # 转换为字符串
- number_str = str(number)
- # 提取数字
- digits = ''.join(filter(str.isdigit, number_str))
- # 如果以2开头,去掉前缀2
- if digits.startswith('2'):
- digits = digits[1:]
- # 取前7位
- digits = digits[:7]
- return digits
- def get_gateway_selection(cdr_file):
- """获取用户选择的被叫经由网关"""
- # 读取Excel文件中的所有表的第一页,只提取被叫经由网关列
- xl = pd.ExcelFile(cdr_file)
- first_sheet = xl.sheet_names[0]
- df_sample = pd.read_excel(cdr_file, sheet_name=first_sheet, nrows=100)
- # 确保列名正确
- if len(df_sample.columns) >= 6:
- df_sample.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫']
- # 查找所有表中的唯一网关
- all_gateways = set()
- for sheet_name in xl.sheet_names:
- # 只读取网关列以节省内存
- df = pd.read_excel(cdr_file, sheet_name=sheet_name, usecols=[3])
- # 确保列名正确
- if len(df.columns) >= 1:
- df.columns = ['被叫经由网关']
- # 获取唯一网关值并添加到集合中
- gateways = df['被叫经由网关'].dropna().unique()
- all_gateways.update(gateways)
- # 将集合转换为排序列表
- gateway_list = sorted(list(all_gateways))
- print("\n可用的被叫经由网关列表:")
- for i, gateway in enumerate(gateway_list, 1):
- print(f"{i}. {gateway}")
- # 收集多次选择的网关组合
- gateway_selections = []
- selection_index = 1
- while True:
- print(f"\n第 {selection_index} 次选择网关 (可多选,用逗号分隔,输入0结束选择):")
- selection = input("请输入网关编号: ")
- if selection == "0":
- break
- try:
- # 解析用户输入的多个选择
- selected_indices = [int(idx.strip()) for idx in selection.split(",")]
- selected_gateways = [gateway_list[idx - 1] for idx in selected_indices if 1 <= idx <= len(gateway_list)]
- if selected_gateways:
- gateway_selections.append({
- 'index': selection_index,
- 'name': f"选择{selection_index}",
- 'gateways': selected_gateways
- })
- print(f"已添加第 {selection_index} 次选择: {', '.join(selected_gateways)}")
- selection_index += 1
- else:
- print("没有选择有效的网关,请重试")
- except (ValueError, IndexError):
- print("输入格式错误,请输入有效的网关编号")
- if not gateway_selections:
- print("未选择任何网关,将分析所有数据")
- gateway_selections.append({
- 'index': 1,
- 'name': "所有网关",
- 'gateways': gateway_list
- })
- return gateway_selections
- def analyze_gateway_selection(cdr_file, prefix_trie, gateway_selection):
- """分析指定网关选择的数据"""
- start_time = time.time()
- selected_gateways = gateway_selection['gateways']
- selection_name = gateway_selection['name']
- print(f"\n开始分析 {selection_name} 的数据: {', '.join(selected_gateways)}")
- # 预定义错误类型的正则模式
- temp_unavailable_pattern = re.compile(r'Temporarily Unavailable.*480')
- req_terminated_pattern = re.compile(r'Request Terminated.*487')
- hangup_pattern = re.compile(r'主叫挂断')
- # 运营商错误统计
- carrier_stats = defaultdict(lambda: {
- 'total_calls': 0,
- 'temp_unavailable': 0,
- 'hangup': 0,
- 'req_terminated': 0
- })
- total_records = 0
- filtered_records = 0
- # 获取Excel文件中的所有表名
- xl = pd.ExcelFile(cdr_file)
- # 遍历每个表进行处理
- for sheet_name in xl.sheet_names:
- print(f" 正在处理表: {sheet_name}")
- # 读取Excel表
- df = pd.read_excel(cdr_file, sheet_name=sheet_name)
- # 确保列名正确
- if len(df.columns) >= 6:
- df.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫']
- # 过滤满足指定网关的记录
- if len(selected_gateways) < len(set(df['被叫经由网关'].dropna().unique())):
- df = df[df['被叫经由网关'].isin(selected_gateways)]
- # 确保通话时长为数字
- df['通话时长'] = pd.to_numeric(df['通话时长'], errors='coerce').fillna(0)
- # 更新总记录数
- total_records += len(df)
- filtered_records += len(df)
- # 分批处理记录以减少内存占用
- batch_size = 5000
- num_batches = (len(df) + batch_size - 1) // batch_size
- for batch_idx in range(num_batches):
- start_idx = batch_idx * batch_size
- end_idx = min((batch_idx + 1) * batch_size, len(df))
- if batch_idx % 5 == 0 and num_batches > 1:
- print(f" 正在处理批次 {batch_idx + 1}/{num_batches}...")
- # 获取当前批次数据
- batch = df.iloc[start_idx:end_idx]
- # 处理每条记录
- for _, row in batch.iterrows():
- # 处理电话号码 - 优先使用呼入被叫号码
- caller_phone = ""
- if pd.notna(row['呼入被叫']):
- caller_phone = process_called_number(row['呼入被叫'])
- elif pd.notna(row['呼出被叫']):
- caller_phone = process_called_number(row['呼出被叫'])
- # 使用前缀树快速查找省份和运营商
- _, carrier = prefix_trie.longest_prefix_match(caller_phone)
- # 更新运营商呼叫统计
- carrier_stats[carrier]['total_calls'] += 1
- # 判断错误类型
- call_duration = row['通话时长']
- termination_reason = str(row['终止原因']) if pd.notna(row['终止原因']) else ""
- if temp_unavailable_pattern.search(termination_reason):
- carrier_stats[carrier]['temp_unavailable'] += 1
- elif hangup_pattern.search(termination_reason) and call_duration == 0:
- carrier_stats[carrier]['hangup'] += 1
- elif req_terminated_pattern.search(termination_reason):
- carrier_stats[carrier]['req_terminated'] += 1
- # 计算错误率并转换为DataFrame
- carrier_error_rates = []
- for carrier, stats in carrier_stats.items():
- total_calls = stats['total_calls']
- if total_calls > 0:
- carrier_error_rates.append({
- '运营商': carrier,
- '总呼叫量': total_calls,
- 'Temporarily Unavailable(480)占比(%)': (stats['temp_unavailable'] / total_calls * 100),
- '主叫挂断(通话时长为0)占比(%)': (stats['hangup'] / total_calls * 100),
- 'Request Terminated(487)占比(%)': (stats['req_terminated'] / total_calls * 100),
- '错误总占比(%)': (
- (stats['temp_unavailable'] + stats['hangup'] + stats['req_terminated']) / total_calls * 100)
- })
- error_df = pd.DataFrame(carrier_error_rates)
- # 只保留呼叫量大于阈值的运营商数据
- min_calls = 10
- if not error_df.empty:
- error_df = error_df[error_df['总呼叫量'] >= min_calls].sort_values(by='错误总占比(%)', ascending=False)
- print(f" 分析完成! 总记录数: {total_records}, 符合网关条件的记录: {filtered_records}")
- print(f" 发现 {len(error_df)} 个运营商数据")
- print(f" 分析耗时: {time.time() - start_time:.2f}秒")
- return {
- 'selection_name': selection_name,
- 'error_df': error_df,
- 'total_records': total_records,
- 'filtered_records': filtered_records
- }
- def compare_gateway_selections(all_results, output_dir):
- """比较不同网关选择的结果"""
- if not all_results:
- print("没有有效的网关选择分析结果")
- return []
- # 创建输出目录
- os.makedirs(output_dir, exist_ok=True)
- output_files = []
- # 1. 为每个选择创建汇总表
- for result in all_results:
- selection_name = result['selection_name']
- error_df = result['error_df']
- if error_df.empty:
- continue
- # 保存错误率表格
- file_name = f"{selection_name}_运营商错误率.xlsx"
- file_path = os.path.join(output_dir, file_name)
- error_df.to_excel(file_path, index=False)
- output_files.append(file_path)
- # 2. 创建选择之间的比较表
- if len(all_results) >= 2:
- # 获取所有结果中的所有运营商
- all_carriers = set()
- for result in all_results:
- if not result['error_df'].empty:
- all_carriers.update(result['error_df']['运营商'].tolist())
- # 准备比较数据
- comparison_data = []
- for carrier in sorted(all_carriers):
- carrier_row = {'运营商': carrier}
- # 添加每个选择的错误率数据
- for error_type in ['总呼叫量', 'Temporarily Unavailable(480)占比(%)',
- '主叫挂断(通话时长为0)占比(%)', 'Request Terminated(487)占比(%)',
- '错误总占比(%)']:
- for result in all_results:
- selection_name = result['selection_name']
- error_df = result['error_df']
- # 查找当前运营商在当前选择中的数据
- if not error_df.empty and carrier in error_df['运营商'].values:
- value = error_df.loc[error_df['运营商'] == carrier, error_type].iloc[0]
- carrier_row[f"{selection_name}_{error_type}"] = value
- else:
- carrier_row[f"{selection_name}_{error_type}"] = None
- comparison_data.append(carrier_row)
- # 创建比较DataFrame
- comparison_df = pd.DataFrame(comparison_data)
- # 保存比较表格
- comparison_file = os.path.join(output_dir, "网关选择比较_运营商错误率.xlsx")
- comparison_df.to_excel(comparison_file, index=False)
- output_files.append(comparison_file)
- # 3. 生成多选择对比图
- try:
- # 选择顶部运营商(错误率最高的前6个)进行可视化
- top_carriers = []
- for result in all_results:
- if not result['error_df'].empty:
- top_in_selection = result['error_df'].nlargest(3, '错误总占比(%)')['运营商'].tolist()
- top_carriers.extend(top_in_selection)
- # 去重并限制数量
- top_carriers = list(dict.fromkeys(top_carriers))[:6]
- if top_carriers:
- # 为选定的运营商创建错误率比较柱状图
- plt.figure(figsize=(15, 10))
- # 设置位置和宽度
- width = 0.2
- x = np.arange(len(top_carriers))
- # 绘制每个选择的错误率
- for i, result in enumerate(all_results):
- selection_name = result['selection_name']
- error_df = result['error_df']
- if error_df.empty:
- continue
- # 提取数据
- error_rates = []
- for carrier in top_carriers:
- if carrier in error_df['运营商'].values:
- rate = error_df.loc[error_df['运营商'] == carrier, '错误总占比(%)'].iloc[0]
- error_rates.append(rate)
- else:
- error_rates.append(0)
- # 绘制柱状图
- plt.bar(x + i * width - width * (len(all_results) - 1) / 2,
- error_rates,
- width=width,
- label=selection_name)
- # 添加数值标签
- for j, value in enumerate(error_rates):
- if value > 0:
- plt.text(x[j] + i * width - width * (len(all_results) - 1) / 2,
- value + 0.5,
- f'{value:.1f}%',
- ha='center',
- va='bottom',
- fontsize=8)
- # 设置图表标签和标题
- plt.xlabel('运营商', fontsize=12)
- plt.ylabel('错误总占比 (%)', fontsize=12)
- plt.title('不同网关选择下运营商错误率对比', fontsize=14)
- plt.xticks(x, top_carriers, rotation=45, ha='right')
- plt.legend()
- plt.tight_layout()
- plt.grid(axis='y', linestyle='--', alpha=0.7)
- # 保存图表
- chart_file = os.path.join(output_dir, "网关选择比较_错误率柱状图.png")
- plt.savefig(chart_file, dpi=120)
- plt.close()
- output_files.append(chart_file)
- except Exception as e:
- print(f"生成对比图表时出错: {e}")
- import traceback
- traceback.print_exc()
- # 4. 创建运营商分布饼图
- try:
- for result in all_results:
- selection_name = result['selection_name']
- error_df = result['error_df']
- if error_df.empty:
- continue
- # 选择前8个运营商,其他归为"其他"
- top_carriers = error_df.nlargest(8, '总呼叫量')
- other_calls = error_df['总呼叫量'].sum() - top_carriers['总呼叫量'].sum()
- # 准备饼图数据
- labels = top_carriers['运营商'].tolist()
- if other_calls > 0:
- labels.append('其他')
- sizes = top_carriers['总呼叫量'].tolist()
- if other_calls > 0:
- sizes.append(other_calls)
- # 创建饼图
- plt.figure(figsize=(12, 9))
- plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, shadow=True)
- plt.axis('equal') # 保持饼图为圆形
- plt.title(f'{selection_name} - 运营商呼叫量分布', fontsize=14)
- plt.tight_layout()
- # 保存饼图
- pie_file = os.path.join(output_dir, f"{selection_name}_运营商分布饼图.png")
- plt.savefig(pie_file, dpi=120)
- plt.close()
- output_files.append(pie_file)
- except Exception as e:
- print(f"生成饼图时出错: {e}")
- import traceback
- traceback.print_exc()
- return output_files
- def plot_ten_min_stats(answer_rate, call_volume, output_dir):
- """绘制10分钟间隔的应答率和呼叫量双轴图 - 修改y轴上限为35%,添加数值标签"""
- if not answer_rate or not call_volume:
- print("警告: 没有足够的10分钟间隔数据用于绘图")
- return None
- # 创建主坐标轴
- fig, ax1 = plt.subplots(figsize=(20, 10))
- # 确保时间序列是排序的
- times = sorted(answer_rate.keys())
- rates = [answer_rate[t] for t in times]
- volumes = [call_volume[t] for t in times]
- # 绘制应答率 (%) - 主坐标轴,y轴上限设为35%
- color1 = 'tab:blue'
- ax1.set_xlabel('时间', fontsize=12)
- ax1.set_ylabel('应答率 (%)', color=color1, fontsize=12)
- line1 = ax1.plot(times, rates, 'o-', color=color1, linewidth=2, label='应答率')
- ax1.tick_params(axis='y', labelcolor=color1, labelsize=10)
- ax1.set_ylim(0, 35) # 将y轴上限设为35%
- # 在每个数据点添加应答率标签
- for x, y in zip(times, rates):
- ax1.annotate(f'{y:.1f}%',
- xy=(x, y),
- xytext=(0, 5), # 文本偏移
- textcoords='offset points',
- ha='center',
- va='bottom',
- fontsize=8,
- color=color1)
- # 设置x轴格式
- hours_formatter = mdates.DateFormatter('%H:%M')
- half_hour_locator = mdates.MinuteLocator(byminute=[0, 30])
- ten_min_locator = mdates.MinuteLocator(byminute=[0, 10, 20, 30, 40, 50])
- ax1.xaxis.set_major_formatter(hours_formatter)
- ax1.xaxis.set_major_locator(half_hour_locator) # 主刻度为半小时
- ax1.xaxis.set_minor_locator(ten_min_locator) # 次刻度为10分钟
- # 日期标签垂直显示以节省空间
- fig.autofmt_xdate()
- # 添加主要时间点的网格线和标签
- ax1.grid(True, which='major', linestyle='-', alpha=0.7)
- ax1.grid(True, which='minor', linestyle=':', alpha=0.4)
- # 创建次坐标轴用于呼叫量
- ax2 = ax1.twinx()
- color2 = 'tab:red'
- ax2.set_ylabel('呼叫数量', color=color2, fontsize=12)
- line2 = ax2.plot(times, volumes, 's-', color=color2, linewidth=2, label='呼叫数量')
- ax2.tick_params(axis='y', labelcolor=color2, labelsize=10)
- ax2.yaxis.set_major_locator(MaxNLocator(integer=True)) # 确保y轴仅显示整数
- # 添加图例 - 需要单独处理以包含两个轴的内容
- lines = line1 + line2
- labels = [l.get_label() for l in lines]
- ax1.legend(lines, labels, loc='upper right', fontsize=12)
- plt.title('10分钟间隔应答率和呼叫量统计', fontsize=14)
- plt.tight_layout()
- # 保存图表
- output_file = os.path.join(output_dir, '10分钟间隔_应答率和呼叫量.png')
- plt.savefig(output_file, dpi=120)
- plt.close()
- return output_file
- def plot_error_time_series(carrier_error_df, output_dir):
- """绘制错误类型随时间变化图 - 改为统计错误码占比而非数量"""
- if carrier_error_df.empty:
- print("警告: 没有错误数据用于绘图")
- return None
- try:
- # 创建按时间和错误类型的透视表
- # 先统计每个时间段的错误总数
- time_total_errors = carrier_error_df.groupby('时间段')['数量'].sum().to_dict()
- # 基于时间和错误类型的透视表
- pivot_df = carrier_error_df.pivot_table(
- index='时间段',
- columns='错误类型',
- values='数量',
- aggfunc='sum',
- fill_value=0
- )
- # 将数量转换为占比
- for time_period in pivot_df.index:
- total = time_total_errors.get(time_period, 0)
- if total > 0:
- pivot_df.loc[time_period] = pivot_df.loc[time_period] / total * 100
- # 绘制时间序列图
- plt.figure(figsize=(20, 10))
- # 获取错误类型列表
- error_types = pivot_df.columns.tolist()
- # 不同错误类型使用不同颜色和标记
- colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']
- markers = ['o', 's', '^', 'D', 'v']
- # 绘制每种错误类型的折线
- for i, error_type in enumerate(error_types):
- color = colors[i % len(colors)]
- marker = markers[i % len(markers)]
- values = pivot_df[error_type].values
- plt.plot(
- range(len(pivot_df.index)),
- values,
- marker=marker,
- color=color,
- linewidth=2,
- markersize=8,
- label=error_type
- )
- # 在每个点上标注百分比
- for j, v in enumerate(values):
- if v > 1: # 只标注大于1%的值
- plt.annotate(
- f'{v:.1f}%',
- xy=(j, v),
- xytext=(0, 5),
- textcoords='offset points',
- ha='center',
- fontsize=8,
- color=color
- )
- # 设置x轴标签 - 使用时间段字符串
- plt.xticks(
- range(len(pivot_df.index)),
- pivot_df.index,
- rotation=45,
- ha='right'
- )
- # 设置半小时间隔的垂直网格线
- plt.grid(True, axis='x', linestyle='--', alpha=0.7)
- plt.grid(True, axis='y', linestyle='--', alpha=0.7)
- # 添加标签和图例
- plt.title('错误类型随时间变化占比', fontsize=14)
- plt.xlabel('时间', fontsize=12)
- plt.ylabel('错误占比 (%)', fontsize=12)
- plt.legend(fontsize=12)
- # 确保所有内容都显示
- plt.tight_layout()
- # 保存图表
- output_file = os.path.join(output_dir, '错误类型随时间变化.png')
- plt.savefig(output_file, dpi=120)
- plt.close()
- return output_file
- except Exception as e:
- print(f"绘制错误类型时间序列图出错: {e}")
- import traceback
- traceback.print_exc()
- return None
- def analyze_cdr_data(cdr_file, prefix_trie):
- """分析CDR数据文件 - 简化版"""
- start_time = time.time()
- # 获取Excel文件中的所有表名
- xl = pd.ExcelFile(cdr_file)
- sheet_names = xl.sheet_names
- print(f"文件包含 {len(sheet_names)} 张表: {', '.join(sheet_names)}")
- # 预定义错误类型的正则模式,避免重复编译
- temp_unavailable_pattern = re.compile(r'Temporarily Unavailable.*480')
- req_terminated_pattern = re.compile(r'Request Terminated.*487')
- hangup_pattern = re.compile(r'主叫挂断')
- # 结果累加器 - 使用精确时间作为键
- ten_min_call_stats = defaultdict(lambda: {'total': 0, 'answered': 0})
- # 按半小时统计的运营商错误数据
- half_hour_carrier_error_stats = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
- total_records = 0
- # 遍历每个表进行处理
- for sheet_name in sheet_names:
- print(f"正在处理表: {sheet_name}")
- # 一次性读取整个表
- df = pd.read_excel(cdr_file, sheet_name=sheet_name)
- print(f" 表 {sheet_name} 包含 {len(df)} 条记录")
- # 确保列名正确
- if len(df.columns) >= 6:
- df.columns = ['终止时间', '通话时长', '终止原因', '被叫经由网关', '呼入被叫', '呼出被叫']
- # 转换时间列为datetime类型
- df['终止时间'] = pd.to_datetime(df['终止时间'], errors='coerce')
- # 添加10分钟间隔和半小时间隔列
- df['10分钟间隔'] = df['终止时间'].apply(lambda x: get_time_bucket(x, 10))
- df['半小时间隔'] = df['终止时间'].apply(get_half_hour_bucket)
- # 确保通话时长为数字
- df['通话时长'] = pd.to_numeric(df['通话时长'], errors='coerce').fillna(0)
- # 更新记录计数
- total_records += len(df)
- # 更新10分钟间隔的通话统计
- for _, row in df.iterrows():
- ten_min_bucket = row['10分钟间隔']
- half_hour_bucket = row['半小时间隔']
- if pd.isna(ten_min_bucket) or pd.isna(half_hour_bucket):
- continue
- call_duration = row['通话时长']
- # 更新10分钟间隔的通话统计
- ten_min_call_stats[ten_min_bucket]['total'] += 1
- if call_duration > 0:
- ten_min_call_stats[ten_min_bucket]['answered'] += 1
- # 处理电话号码 - 优先使用呼入被叫号码
- caller_phone = ""
- if pd.notna(row['呼入被叫']):
- caller_phone = process_called_number(row['呼入被叫'])
- elif pd.notna(row['呼出被叫']):
- caller_phone = process_called_number(row['呼出被叫'])
- # 使用前缀树快速查找省份和运营商
- _, carrier = prefix_trie.longest_prefix_match(caller_phone)
- # 判断错误类型
- termination_reason = str(row['终止原因']) if pd.notna(row['终止原因']) else ""
- error_type = None
- if temp_unavailable_pattern.search(termination_reason):
- error_type = 'Temporarily Unavailable(480)'
- elif hangup_pattern.search(termination_reason) and call_duration == 0:
- error_type = '主叫挂断(通话时长为0)'
- elif req_terminated_pattern.search(termination_reason):
- error_type = 'Request Terminated(487)'
- # 如果是错误记录,更新相应统计
- if error_type:
- # 更新半小时间隔的运营商错误统计
- half_hour_carrier_error_stats[half_hour_bucket][carrier][error_type] += 1
- # 计算10分钟间隔的应答率
- ten_min_answer_rate = {}
- ten_min_call_volume = {}
- for time_bucket, stats in sorted(ten_min_call_stats.items()):
- total = stats['total']
- answered = stats['answered']
- ten_min_answer_rate[time_bucket] = (answered / total * 100) if total > 0 else 0
- ten_min_call_volume[time_bucket] = total
- # 准备半小时运营商错误统计表
- half_hour_carrier_error_data = []
- for time_bucket, carrier_data in sorted(half_hour_carrier_error_stats.items()):
- # 将时间转换为可读字符串格式
- time_str = time_bucket.strftime('%Y-%m-%d %H:%M')
- for carrier, error_types in carrier_data.items():
- for error_type, count in error_types.items():
- half_hour_carrier_error_data.append({
- '时间段': time_str,
- '运营商': carrier,
- '错误类型': error_type,
- '数量': count
- })
- half_hour_carrier_error_df = pd.DataFrame(half_hour_carrier_error_data)
- print(f"数据分析完成,共处理 {total_records} 条记录")
- print(f"总耗时: {time.time() - start_time:.2f}秒")
- return {
- 'ten_min_answer_rate': pd.Series(ten_min_answer_rate),
- 'ten_min_call_volume': pd.Series(ten_min_call_volume),
- 'half_hour_carrier_error': half_hour_carrier_error_df,
- 'ten_min_times': sorted(ten_min_call_stats.keys())
- }
- def save_results(stats, output_dir):
- """保存分析结果 - 简化版"""
- os.makedirs(output_dir, exist_ok=True)
- chart_files = []
- # 1. 10分钟应答率和呼叫量图表(设置y轴上限为35%)
- ten_min_chart = plot_ten_min_stats(
- stats['ten_min_answer_rate'].to_dict(),
- stats['ten_min_call_volume'].to_dict(),
- output_dir
- )
- if ten_min_chart:
- chart_files.append(ten_min_chart)
- # 2. 错误类型随时间变化图 - 错误码占比(而非数量)
- error_time_chart = plot_error_time_series(
- stats['half_hour_carrier_error'],
- output_dir
- )
- if error_time_chart:
- chart_files.append(error_time_chart)
- return output_dir, chart_files
- def main():
- print("CDR通话记录分析工具 - 网关选择比较版")
- print("=" * 50)
- # 设置中文字体
- setup_chinese_font()
- start_time = time.time()
- # 获取当前脚本的上级目录
- parent_dir = Path(__file__).parent.parent
- # 指定cdr目录在上级目录下
- cdr_dir = parent_dir / "cdr"
- if not cdr_dir.exists():
- print(f"错误: 目录 '{cdr_dir}' 不存在! 正在创建...")
- cdr_dir.mkdir(parents=True, exist_ok=True)
- print(f"请将CDR文件放入 '{cdr_dir}' 目录,然后重新运行程序")
- return
- # 请求用户输入CDR文件名,并明确提示位置
- cdr_file_name = input(f"请输入要分析的CDR文件名称(在{cdr_dir}目录中): ")
- cdr_file_path = cdr_dir / cdr_file_name # 确保使用cdr_dir构建路径
- print(f"尝试从以下路径读取文件: {cdr_file_path}") # 添加调试信息
- if not cdr_file_path.exists():
- print(f"错误: 文件 '{cdr_file_path}' 不存在!")
- return
- # 手机地区表应该在当前脚本目录中
- phone_region_file = Path(__file__).parent / "手机地区表.csv"
- if not phone_region_file.exists():
- print(f"错误: 手机地区表文件 '{phone_region_file}' 不存在!")
- return
- # 加载地区数据
- print("正在加载手机地区数据...")
- prefix_trie = load_phone_region_data(phone_region_file)
- # 获取用户选择的被叫经由网关
- gateway_selections = get_gateway_selection(cdr_file_path)
- # 创建带时间戳的输出目录
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
- output_dir = parent_dir / "分析结果" / "isp分析结果" / f"CDR分析结果_{timestamp}"
- # 对每个网关选择分别分析
- all_results = []
- for selection in gateway_selections:
- result = analyze_gateway_selection(cdr_file_path, prefix_trie, selection)
- all_results.append(result)
- # 比较不同网关选择的结果
- print("\n正在生成网关选择比较分析...")
- gateway_output_dir = output_dir / "网关选择分析"
- comparison_files = compare_gateway_selections(all_results, gateway_output_dir)
- # 执行标准分析以生成时间相关图表
- print("\n正在进行标准CDR分析来生成时间相关图表...")
- stats = analyze_cdr_data(cdr_file_path, prefix_trie)
- # 保存标准分析结果
- print("保存分析结果...")
- standard_output_dir = output_dir / "标准分析"
- _, chart_files = save_results(stats, standard_output_dir)
- # 添加标准分析的文件到结果列表
- all_files = comparison_files + chart_files
- total_time = time.time() - start_time
- print(f"\n分析完成! 结果已保存到: {output_dir}")
- print(f"生成的图表和表格文件:")
- for file in all_files:
- if file:
- print(f"- {file}")
- print(f"总执行时间: {total_time:.2f}秒")
- if __name__ == "__main__":
- main()
|