逆向解析 ColorOS 离线号码库 (三):携号转网的二分查找与高并发全量导出
0. 前言
在前两节中,我们成功剥离了 ColorOS 底层的 12-Bit 归属地压缩算法与多级指针寻址的运营商识别模型,并修复了历史遗留的区号重定向问题。
然而,基于“前 7 位号段”的查询逻辑存在一个无法逾越的物理死角——携号转网(MNP)。当一个原本属于“北京移动”的用户带着号码转入“中国电信”时,传统的号段映射将彻底失效。本节,我们将揭开最后一个特权数据库 PortabilityNumberData.dat 的面纱,并利用 Python 的多进程并发技术,编写一个秒级导出百万号段的全量查询脚本。
1. 应对携号转网:特权白名单机制
由于携号转网的号码没有任何规律可言,无法像普通号码那样进行前缀压缩。OPPO 工程师在此引入了最高优先级的特权数据库:PortabilityNumberData.dat。
系统在处理任何来电时,其真实的宏观调度逻辑如下:
- 查询转网特权库:先在
PortabilityNumberData.dat中精确查找 11 位完整号码。若命中,直接提取新的归属地和运营商,查询终止。 - 查询常规号段库:若未命中,说明是未转网的常规号码。系统截取前 7 位,走我们前两节逆向出的普通库(
PhoneNumberData+CarrierData)进行解析。
2. 解析 PortabilityNumberData 数据结构
既然没有任何规律,最简单粗暴的方法就是把 11 位完整的手机号存下来。但如果我们把数以百万计的转网号码全部塞进文件,每次来电都从头遍历,必然会导致严重的性能灾难。
通过反编译 PortabilityNumbersUtil.kt,我们看到了堪称教科书级别的空间优化与二分查找 (Binary Search) 算法应用。

2.1 极致的 11 字节存储模型
在反编译的 a(String str, long j3) 方法中,我们发现每条转网记录在底层仅占用 11 个字节:
- 8 字节 (
long):存储 11 位完整的手机号码。由于 11 位数字(如 13812345678)超出了 32 位int的最大值(21亿),必须采用 64 位长整型。 - 2 字节 (
short):新的城市 ID。 - 1 字节 (
byte):新的运营商 ID。
这意味着,即使全国有 100 万个携号转网用户,该文件的主体体积也仅有约 10.4 MB。
2.2 毫秒级寻址的二分查找
为了在 100 万个记录中实现极速查询,整个二进制文件内部是严格按号码大小升序排列的。代码中实现了一个底层的文件流二分查找:
int left = 0;// right 是总记录数while (left <= right) { int mid = (right + left) / 2;
15 collapsed lines
// 核心:根据 mid 算出绝对字节偏移,直接移动文件指针 randomAccessFile.seek((mid * 11) + baseOffset);
// 读取 8 字节的长整型号码 long currentNumber = randomAccessFile.readLong();
if (targetNumber > currentNumber) { left = mid + 1; // 去右半边找 } else if (targetNumber == currentNumber) { // 命中!紧接着读取 2 字节城市 ID 和 1 字节运营商 ID return new Result(randomAccessFile.readShort(), randomAccessFile.readByte()); } else { right = mid - 1; // 去左半边找 }}通过二分查找,在 100 万条数据中定位特定号码最多只需查找 次。磁盘指针仅需跳转 20 次,耗时在 1 毫秒以内,完美兼顾了存储空间与查询性能。
3. 终极实战:高并发全量导出脚本
作为本次逆向工程的最终目标,我们需要将 1300000 到 1999999 整整 70 万个基础号段(抛开携号转网的特例)的全息数据,极速导出为 .csv 文件。
为了满足 “高并发、极速、有序” 的要求,我们利用 Python 的 multiprocessing 多进程模块。核心优化点如下:
- 消除 IPC 瓶颈:利用进程池的
initializer参数,让每个 CPU 核心在启动时,独立将几个 MB 的数据库加载到自己的内存中,避免了昂贵的进程间数据传递。 - 严格保序:使用
pool.map替代普通异步回调,它会自动阻塞并按照任务分配的顺序重组结果列表,确保导出的 CSV 文件从 130 严格递增至 199。
完整的导出脚本 (export_all_phones.py):
import osimport structimport csvimport timefrom multiprocessing import Pool, cpu_count248 collapsed lines
# ==========================================# 全局变量,用于子进程共享内存数据# ==========================================loc_data = Nonecar_data = None
# ==========================================# 模块 1:归属地数据库解析# ==========================================def load_location_db(file_path, patch_file): with open(file_path, 'rb') as f: data = f.read()
tail_data = data[-12002:] area_codes_raw = tail_data[2:2002] city_names_raw = tail_data[2002:10002] extend_prefix_raw = tail_data[10002:12002]
cities = [] for i in range(400): area = area_codes_raw[i*5 : i*5+5].decode('ascii', errors='ignore').strip() name_bytes = city_names_raw[i*20 : i*20+20].split(b'\x00')[0] name = name_bytes.decode('gbk', errors='ignore').strip() cities.append([area, name])
# 加载官方补丁 治愈 9999 if os.path.exists(patch_file): with open(patch_file, "r", encoding="utf-8") as f: lines = f.readlines() for line in lines[1:]: parts = line.strip().split('\t') if len(parts) == 4: origin_id = int(parts[0]) equal_id = int(parts[2]) if equal_id - 1 < len(cities) and origin_id - 1 < len(cities): target_area, target_name = cities[equal_id - 1] cities[origin_id - 1][0] = target_area # cities[origin_id - 1][1] = target_name
prefix_map = {} base_idx = 0 for p in range(130, 140): prefix_map[str(p)] = base_idx; base_idx += 10000 for p in range(150, 160): prefix_map[str(p)] = base_idx; base_idx += 10000 prefix_map["188"] = base_idx; base_idx += 10000 prefix_map["189"] = base_idx; base_idx += 10000
for i in range(1000): b1 = extend_prefix_raw[i*2] b2 = extend_prefix_raw[i*2 + 1] prefix_val = (b1 << 8) | b2 if prefix_val == 0: break prefix_map[str(prefix_val)] = base_idx base_idx += 10000
binary_array = data[16 : -12002] return prefix_map, cities, binary_array
# ==========================================# 模块 2:运营商数据库解析# ==========================================def load_carrier_db(file_path): with open(file_path, 'rb') as f: data = f.read()
offset = 16 carrier_names = {} num_carriers = struct.unpack_from(">H", data, offset)[0] offset += 2 for _ in range(num_carriers): carrier_id = struct.unpack_from(">H", data, offset)[0] offset += 2 names = [] for _ in range(4): str_len = struct.unpack_from(">H", data, offset)[0] offset += 2 if str_len > 0: name = data[offset : offset + str_len].decode('utf-8', errors='ignore') offset += str_len else: name = "" names.append(name) carrier_names[carrier_id] = names
general_prefixes = {} num_gen_prefixes = struct.unpack_from(">H", data, offset)[0] offset += 2 for _ in range(num_gen_prefixes): prefix_val = struct.unpack_from(">i", data, offset)[0] offset += 4 cid = struct.unpack_from(">H", data, offset)[0] offset += 2 general_prefixes[str(prefix_val)] = cid
detailed_prefixes = {} num_det_prefixes = struct.unpack_from(">H", data, offset)[0] offset += 2 for _ in range(num_det_prefixes): prefix_val = struct.unpack_from(">i", data, offset)[0] offset += 4 file_offset = struct.unpack_from(">i", data, offset)[0] offset += 4 detailed_prefixes[str(prefix_val)] = file_offset
return data, carrier_names, general_prefixes, detailed_prefixes
# ==========================================# 模块 3:子进程初始化与工作函数# ==========================================def init_worker(loc_file, car_file, patch_file): """ 初始化每个 CPU 核心的工作进程,把数据库装载到子进程的内存中。 这样不需要在进程间传递几兆的数据,极大提升性能。 """ global loc_data, car_data loc_data = load_location_db(loc_file, patch_file) car_data = load_carrier_db(car_file)
def process_chunk(args): """ 处理分配到的号码区间,例如 1300000 到 1309999 """ start_num, end_num = args results = [] prefix_map, cities, binary_array = loc_data car_raw_data, carrier_names, general_prefixes, detailed_prefixes = car_data
for phone_int in range(start_num, end_num + 1): phone_7 = str(phone_int)
# 运营商的底层查询需要 4 位后缀,为了凑齐 8 位进行偏移运算,安全补齐到 11 位 eval_phone = phone_7 + "0000"
# --- A. 归属地极速查询 --- prefix = phone_7[:3] suffix = phone_7[3:] loc_name = ""
if prefix in prefix_map: absolute_index = prefix_map[prefix] + int(suffix) byte_offset = (absolute_index // 2) * 3 if byte_offset + 2 < len(binary_array): b1, b2, b3 = binary_array[byte_offset:byte_offset+3] city_id = (b1 << 4) | (b2 >> 4) if absolute_index % 2 == 0 else ((b2 & 0x0F) << 8) | b3 if city_id != 0 and city_id < len(cities): _, loc_name = cities[city_id]
if not loc_name: loc_name = "未知"
# --- B. 运营商极速查询 --- carrier_name = "" found_carrier = False
# 1. 查精细号段偏移(如虚拟运营商) for length in range(4, 2, -1): p_prefix = eval_phone[:length] if p_prefix in detailed_prefixes: p_suffix = eval_phone[length : length+4] if len(p_suffix) == 4: target_offset = detailed_prefixes[p_prefix] + (int(p_suffix) * 2) if target_offset + 2 <= len(car_raw_data): cid = struct.unpack_from(">H", car_raw_data, target_offset)[0] if cid in carrier_names: carrier_name = carrier_names[cid][0] or carrier_names[cid][1] found_carrier = True break
# 2. 查全局号段 (如移动/联通基础号段) if not found_carrier: for length in range(4, 2, -1): p_prefix = eval_phone[:length] if p_prefix in general_prefixes: cid = general_prefixes[p_prefix] if cid in carrier_names: carrier_name = carrier_names[cid][0] or carrier_names[cid][1] break
if not carrier_name: carrier_name = "未知"
# 如果这个号段在两大数据库里全都是“未知”,说明这是一个彻头彻尾的空号段,直接跳过不写入 if loc_name == "未知" and carrier_name == "未知": continue
results.append(f"{phone_7},{loc_name},{carrier_name}\n")
return results
# ==========================================# 主程序:分发任务并整合写入# ==========================================def main(): loc_file = "E:/APKS/PhoneNumberData_3_1_0.dat" car_file = "E:/APKS/CarrierData_1_0.dat" patch_file = "E:/APKS/Multi_Areano_Table.txt" out_file = "phonenumber-fin.csv"
if not all(os.path.exists(f) for f in [loc_file, car_file, patch_file]): print("❌ 缺少必要的文件,请检查目录!") return
start_num = 1300000 end_num = 1999999 chunk_size = 20000 # 每个 CPU 分配两万个号码处理
# 计算区块分配范围 chunks = [] for i in range(start_num, end_num + 1, chunk_size): chunk_end = min(i + chunk_size - 1, end_num) chunks.append((i, chunk_end))
total_chunks = len(chunks) cores = cpu_count() print("=" * 50) print(f"🚀 准备火力全开进行高并发全息扫描...") print(f"🎯 扫描区间: {start_num} - {end_num} (共 {end_num - start_num + 1} 个号段)") print(f"⚙️ 调用核心: {cores} 个 CPU 核心") print("=" * 50)
start_time = time.time()
# 创建进程池,使用 map 确保按顺序返回 with Pool(processes=cores, initializer=init_worker, initargs=(loc_file, car_file, patch_file)) as pool: # pool.map 会阻塞并收集全部结果,并且严格保证结果列表顺序等于 chunks 顺序 all_results = pool.map(process_chunk, chunks)
print(f"✅ 扫描计算完毕,耗时: {time.time() - start_time:.2f} 秒。正在合并且保存...")
# 将结果按照顺序写入 CSV # CSV 只会保存真正有归属地或运营商记录的真实号段,抛弃了无用垃圾号段 written_count = 0 with open(out_file, 'w', encoding='utf-8-sig') as f: f.write("号段前缀,归属城市,网络运营商\n") # 写入表头 for chunk_result in all_results: if chunk_result: f.writelines(chunk_result) written_count += len(chunk_result)
print("=" * 50) print(f"🎉 导出大功告成!") print(f"📁 导出文件: {out_file}") print(f"📊 有效号段: {written_count} 条记录") print(f"⏱️ 总计耗时: {time.time() - start_time:.2f} 秒") print("=" * 50)
if __name__ == '__main__': main()利用多核 CPU 的并行计算,这 70 万个号段的数据提取在现代处理器上通常只需 1~2 秒即可跑完,真正做到了工业级的性能表现。
4. 全系列总结
在这个过程中,我们看到了一家成熟的商业手机厂商,为了在极其有限的内存和苛刻的延迟要求下提供优秀的用户体验,所做出的教科书级别的架构设计:
- 用 12-Bit 位移运算 榨干最后一丝存储空间;
- 用 墓碑机制与外挂补丁 优雅解决行政区划合并的更新阵痛;
- 用 多级指针与二分查找 应对携号转网带来的规则崩塌。
在如今 API 满天飞、算力过剩的时代,这种“锱铢必较”的底层工程美学,或许才是逆向分析带给我们的最大乐趣与收获。