2565 字
13 分钟
逆向解析 ColorOS 离线号码库 (三):携号转网的二分查找与高并发全量导出

逆向解析 ColorOS 离线号码库 (三):携号转网的二分查找与高并发全量导出#

0. 前言#

在前两节中,我们成功剥离了 ColorOS 底层的 12-Bit 归属地压缩算法与多级指针寻址的运营商识别模型,并修复了历史遗留的区号重定向问题。

然而,基于“前 7 位号段”的查询逻辑存在一个无法逾越的物理死角——携号转网(MNP)。当一个原本属于“北京移动”的用户带着号码转入“中国电信”时,传统的号段映射将彻底失效。本节,我们将揭开最后一个特权数据库 PortabilityNumberData.dat 的面纱,并利用 Python 的多进程并发技术,编写一个秒级导出百万号段的全量查询脚本。

1. 应对携号转网:特权白名单机制#

由于携号转网的号码没有任何规律可言,无法像普通号码那样进行前缀压缩。OPPO 工程师在此引入了最高优先级的特权数据库:PortabilityNumberData.dat

系统在处理任何来电时,其真实的宏观调度逻辑如下:

  1. 查询转网特权库:先在 PortabilityNumberData.dat 中精确查找 11 位完整号码。若命中,直接提取新的归属地和运营商,查询终止。
  2. 查询常规号段库:若未命中,说明是未转网的常规号码。系统截取前 7 位,走我们前两节逆向出的普通库(PhoneNumberData + CarrierData)进行解析。

2. 解析 PortabilityNumberData 数据结构#

既然没有任何规律,最简单粗暴的方法就是把 11 位完整的手机号存下来。但如果我们把数以百万计的转网号码全部塞进文件,每次来电都从头遍历,必然会导致严重的性能灾难。

通过反编译 PortabilityNumbersUtil.kt,我们看到了堪称教科书级别的空间优化与二分查找 (Binary Search) 算法应用。

image.jpg

2.1 极致的 11 字节存储模型#

在反编译的 a(String str, long j3) 方法中,我们发现每条转网记录在底层仅占用 11 个字节

  • 8 字节 (long):存储 11 位完整的手机号码。由于 11 位数字(如 13812345678)超出了 32 位 int 的最大值(21亿),必须采用 64 位长整型。
  • 2 字节 (short):新的城市 ID。
  • 1 字节 (byte):新的运营商 ID。

这意味着,即使全国有 100 万个携号转网用户,该文件的主体体积也仅有约 10.4 MB

2.2 毫秒级寻址的二分查找#

为了在 100 万个记录中实现极速查询,整个二进制文件内部是严格按号码大小升序排列的。代码中实现了一个底层的文件流二分查找:

int left = 0;
// right 是总记录数
while (left <= right) {
int mid = (right + left) / 2;
15 collapsed lines
// 核心:根据 mid 算出绝对字节偏移,直接移动文件指针
randomAccessFile.seek((mid * 11) + baseOffset);
// 读取 8 字节的长整型号码
long currentNumber = randomAccessFile.readLong();
if (targetNumber > currentNumber) {
left = mid + 1; // 去右半边找
} else if (targetNumber == currentNumber) {
// 命中!紧接着读取 2 字节城市 ID 和 1 字节运营商 ID
return new Result(randomAccessFile.readShort(), randomAccessFile.readByte());
} else {
right = mid - 1; // 去左半边找
}
}

通过二分查找,在 100 万条数据中定位特定号码最多只需查找 log2(1000000)20\log_2(1000000) \approx 20 次。磁盘指针仅需跳转 20 次,耗时在 1 毫秒以内,完美兼顾了存储空间与查询性能。

3. 终极实战:高并发全量导出脚本#

作为本次逆向工程的最终目标,我们需要将 13000001999999 整整 70 万个基础号段(抛开携号转网的特例)的全息数据,极速导出为 .csv 文件。

为了满足 “高并发、极速、有序” 的要求,我们利用 Python 的 multiprocessing 多进程模块。核心优化点如下:

  1. 消除 IPC 瓶颈:利用进程池的 initializer 参数,让每个 CPU 核心在启动时,独立将几个 MB 的数据库加载到自己的内存中,避免了昂贵的进程间数据传递。
  2. 严格保序:使用 pool.map 替代普通异步回调,它会自动阻塞并按照任务分配的顺序重组结果列表,确保导出的 CSV 文件从 130 严格递增至 199。

完整的导出脚本 (export_all_phones.py):

import os
import struct
import csv
import time
from multiprocessing import Pool, cpu_count
248 collapsed lines
# ==========================================
# 全局变量,用于子进程共享内存数据
# ==========================================
loc_data = None
car_data = None
# ==========================================
# 模块 1:归属地数据库解析
# ==========================================
def load_location_db(file_path, patch_file):
with open(file_path, 'rb') as f:
data = f.read()
tail_data = data[-12002:]
area_codes_raw = tail_data[2:2002]
city_names_raw = tail_data[2002:10002]
extend_prefix_raw = tail_data[10002:12002]
cities = []
for i in range(400):
area = area_codes_raw[i*5 : i*5+5].decode('ascii', errors='ignore').strip()
name_bytes = city_names_raw[i*20 : i*20+20].split(b'\x00')[0]
name = name_bytes.decode('gbk', errors='ignore').strip()
cities.append([area, name])
# 加载官方补丁 治愈 9999
if os.path.exists(patch_file):
with open(patch_file, "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines[1:]:
parts = line.strip().split('\t')
if len(parts) == 4:
origin_id = int(parts[0])
equal_id = int(parts[2])
if equal_id - 1 < len(cities) and origin_id - 1 < len(cities):
target_area, target_name = cities[equal_id - 1]
cities[origin_id - 1][0] = target_area
# cities[origin_id - 1][1] = target_name
prefix_map = {}
base_idx = 0
for p in range(130, 140): prefix_map[str(p)] = base_idx; base_idx += 10000
for p in range(150, 160): prefix_map[str(p)] = base_idx; base_idx += 10000
prefix_map["188"] = base_idx; base_idx += 10000
prefix_map["189"] = base_idx; base_idx += 10000
for i in range(1000):
b1 = extend_prefix_raw[i*2]
b2 = extend_prefix_raw[i*2 + 1]
prefix_val = (b1 << 8) | b2
if prefix_val == 0: break
prefix_map[str(prefix_val)] = base_idx
base_idx += 10000
binary_array = data[16 : -12002]
return prefix_map, cities, binary_array
# ==========================================
# 模块 2:运营商数据库解析
# ==========================================
def load_carrier_db(file_path):
with open(file_path, 'rb') as f:
data = f.read()
offset = 16
carrier_names = {}
num_carriers = struct.unpack_from(">H", data, offset)[0]
offset += 2
for _ in range(num_carriers):
carrier_id = struct.unpack_from(">H", data, offset)[0]
offset += 2
names = []
for _ in range(4):
str_len = struct.unpack_from(">H", data, offset)[0]
offset += 2
if str_len > 0:
name = data[offset : offset + str_len].decode('utf-8', errors='ignore')
offset += str_len
else:
name = ""
names.append(name)
carrier_names[carrier_id] = names
general_prefixes = {}
num_gen_prefixes = struct.unpack_from(">H", data, offset)[0]
offset += 2
for _ in range(num_gen_prefixes):
prefix_val = struct.unpack_from(">i", data, offset)[0]
offset += 4
cid = struct.unpack_from(">H", data, offset)[0]
offset += 2
general_prefixes[str(prefix_val)] = cid
detailed_prefixes = {}
num_det_prefixes = struct.unpack_from(">H", data, offset)[0]
offset += 2
for _ in range(num_det_prefixes):
prefix_val = struct.unpack_from(">i", data, offset)[0]
offset += 4
file_offset = struct.unpack_from(">i", data, offset)[0]
offset += 4
detailed_prefixes[str(prefix_val)] = file_offset
return data, carrier_names, general_prefixes, detailed_prefixes
# ==========================================
# 模块 3:子进程初始化与工作函数
# ==========================================
def init_worker(loc_file, car_file, patch_file):
"""
初始化每个 CPU 核心的工作进程,把数据库装载到子进程的内存中。
这样不需要在进程间传递几兆的数据,极大提升性能。
"""
global loc_data, car_data
loc_data = load_location_db(loc_file, patch_file)
car_data = load_carrier_db(car_file)
def process_chunk(args):
"""
处理分配到的号码区间,例如 1300000 到 1309999
"""
start_num, end_num = args
results = []
prefix_map, cities, binary_array = loc_data
car_raw_data, carrier_names, general_prefixes, detailed_prefixes = car_data
for phone_int in range(start_num, end_num + 1):
phone_7 = str(phone_int)
# 运营商的底层查询需要 4 位后缀,为了凑齐 8 位进行偏移运算,安全补齐到 11 位
eval_phone = phone_7 + "0000"
# --- A. 归属地极速查询 ---
prefix = phone_7[:3]
suffix = phone_7[3:]
loc_name = ""
if prefix in prefix_map:
absolute_index = prefix_map[prefix] + int(suffix)
byte_offset = (absolute_index // 2) * 3
if byte_offset + 2 < len(binary_array):
b1, b2, b3 = binary_array[byte_offset:byte_offset+3]
city_id = (b1 << 4) | (b2 >> 4) if absolute_index % 2 == 0 else ((b2 & 0x0F) << 8) | b3
if city_id != 0 and city_id < len(cities):
_, loc_name = cities[city_id]
if not loc_name:
loc_name = "未知"
# --- B. 运营商极速查询 ---
carrier_name = ""
found_carrier = False
# 1. 查精细号段偏移(如虚拟运营商)
for length in range(4, 2, -1):
p_prefix = eval_phone[:length]
if p_prefix in detailed_prefixes:
p_suffix = eval_phone[length : length+4]
if len(p_suffix) == 4:
target_offset = detailed_prefixes[p_prefix] + (int(p_suffix) * 2)
if target_offset + 2 <= len(car_raw_data):
cid = struct.unpack_from(">H", car_raw_data, target_offset)[0]
if cid in carrier_names:
carrier_name = carrier_names[cid][0] or carrier_names[cid][1]
found_carrier = True
break
# 2. 查全局号段 (如移动/联通基础号段)
if not found_carrier:
for length in range(4, 2, -1):
p_prefix = eval_phone[:length]
if p_prefix in general_prefixes:
cid = general_prefixes[p_prefix]
if cid in carrier_names:
carrier_name = carrier_names[cid][0] or carrier_names[cid][1]
break
if not carrier_name:
carrier_name = "未知"
# 如果这个号段在两大数据库里全都是“未知”,说明这是一个彻头彻尾的空号段,直接跳过不写入
if loc_name == "未知" and carrier_name == "未知":
continue
results.append(f"{phone_7},{loc_name},{carrier_name}\n")
return results
# ==========================================
# 主程序:分发任务并整合写入
# ==========================================
def main():
loc_file = "E:/APKS/PhoneNumberData_3_1_0.dat"
car_file = "E:/APKS/CarrierData_1_0.dat"
patch_file = "E:/APKS/Multi_Areano_Table.txt"
out_file = "phonenumber-fin.csv"
if not all(os.path.exists(f) for f in [loc_file, car_file, patch_file]):
print("❌ 缺少必要的文件,请检查目录!")
return
start_num = 1300000
end_num = 1999999
chunk_size = 20000 # 每个 CPU 分配两万个号码处理
# 计算区块分配范围
chunks = []
for i in range(start_num, end_num + 1, chunk_size):
chunk_end = min(i + chunk_size - 1, end_num)
chunks.append((i, chunk_end))
total_chunks = len(chunks)
cores = cpu_count()
print("=" * 50)
print(f"🚀 准备火力全开进行高并发全息扫描...")
print(f"🎯 扫描区间: {start_num} - {end_num} (共 {end_num - start_num + 1} 个号段)")
print(f"⚙️ 调用核心: {cores} 个 CPU 核心")
print("=" * 50)
start_time = time.time()
# 创建进程池,使用 map 确保按顺序返回
with Pool(processes=cores, initializer=init_worker, initargs=(loc_file, car_file, patch_file)) as pool:
# pool.map 会阻塞并收集全部结果,并且严格保证结果列表顺序等于 chunks 顺序
all_results = pool.map(process_chunk, chunks)
print(f"✅ 扫描计算完毕,耗时: {time.time() - start_time:.2f} 秒。正在合并且保存...")
# 将结果按照顺序写入 CSV
# CSV 只会保存真正有归属地或运营商记录的真实号段,抛弃了无用垃圾号段
written_count = 0
with open(out_file, 'w', encoding='utf-8-sig') as f:
f.write("号段前缀,归属城市,网络运营商\n") # 写入表头
for chunk_result in all_results:
if chunk_result:
f.writelines(chunk_result)
written_count += len(chunk_result)
print("=" * 50)
print(f"🎉 导出大功告成!")
print(f"📁 导出文件: {out_file}")
print(f"📊 有效号段: {written_count} 条记录")
print(f"⏱️ 总计耗时: {time.time() - start_time:.2f} 秒")
print("=" * 50)
if __name__ == '__main__':
main()

利用多核 CPU 的并行计算,这 70 万个号段的数据提取在现代处理器上通常只需 1~2 秒即可跑完,真正做到了工业级的性能表现。

4. 全系列总结#

在这个过程中,我们看到了一家成熟的商业手机厂商,为了在极其有限的内存和苛刻的延迟要求下提供优秀的用户体验,所做出的教科书级别的架构设计:

  • 12-Bit 位移运算 榨干最后一丝存储空间;
  • 墓碑机制与外挂补丁 优雅解决行政区划合并的更新阵痛;
  • 多级指针与二分查找 应对携号转网带来的规则崩塌。

在如今 API 满天飞、算力过剩的时代,这种“锱铢必较”的底层工程美学,或许才是逆向分析带给我们的最大乐趣与收获。

逆向解析 ColorOS 离线号码库 (三):携号转网的二分查找与高并发全量导出
https://006lp.de/posts/reverse-coloros-offline-number-lookup/binary-search-mnp-export-coloros-3/
作者
Hotaru
发布于
2026-03-14
许可协议
CC BY-NC-ND 4.0