diff --git a/CP02/README.md b/CP02/README.md index addbda1..95e882b 100644 --- a/CP02/README.md +++ b/CP02/README.md @@ -9,6 +9,7 @@ - 将输入固件文件切分为前面部分和最后256字节 - 调用合并脚本生成最终固件 - 自动删除最后1字节并检查文件大小限制 + - 合并完成后提示签名命令 ### merge_2323_firmware.sh - **功能**: 合并 PA768 updater、2323固件和IUM数据为单一固件文件 @@ -109,6 +110,27 @@ - `--check` 模式仅显示中文注释,不做修改 - 交互式确认后才执行删除 +### charging_viewer.py +- **功能**: 充电数据查看器,读取 PowerZ 等工具导出的 .sqlite / .db 充电测试数据 +- **用法**: `python3 charging_viewer.py [--no-page] [目录或文件路径 ...]` +- **描述**: + - 自动检测 pd_chart 和 table_1 表格式 + - 显示功率、电压、电流、温度、充电量等统计信息 + - 解析 PD 消息头、PDO(Fixed/Battery/Variable/PPS)、RDO + - 从 VBUS/CC 波形分析 PD 协商事件(电压跳变检测) + - 支持翻页输出(自动使用 less) + - 无参数时扫描当前目录下所有 .sqlite/.db 文件 + +### powerz_pd_decode.py +- **功能**: 解码 PowerZ 抓取的 USB PD 协议消息 +- **用法**: `python3 powerz_pd_decode.py [--window START END]` +- **描述**: + - 从 PowerZ sqlite 文件的 pd_table 读取原始 PD 数据 + - 解码 Source_Capabilities(Fixed/PPS/AVS PDO) + - 解码 Request/EPR_Request(Fixed RDO 和 PPS RDO) + - 关联 Request 与对应的 Source_Cap 显示选中的 PDO + - `--window` 参数可查看指定时间窗口内的所有消息 + ### build_gui.html - **功能**: IonBridge 构建工具的可视化命令生成器 - **用法**: 在浏览器中打开 `build_gui.html` @@ -117,6 +139,7 @@ - 可配置 build 选项(production、coredump)和 flash 选项(littlefs、protected_data、nvs 等) - 一键生成 Build、Flash、Monitor、Clean 等命令并复制 - 自动保存配置和命令历史到 localStorage + - 显示当前 variant 和模式对应的 sdkconfig 文件路径 ### flash_guide.md - **功能**: IonBridge 烧录操作参考文档 @@ -175,6 +198,15 @@ python3 txt_to_bin_converter.py ium11.txt ium11.bin # 文本转二 # 合成加密固件 python3 merge_encrypted_firmware.py base.bin encrypted.bin ium.bin -o programmer.bin +# 查看充电测试数据 +python3 charging_viewer.py . # 扫描当前目录 +python3 charging_viewer.py data.sqlite # 指定文件 +python3 charging_viewer.py --no-page /path/to/captures/ # 不翻页输出 + +# 解码 PowerZ PD 消息 +python3 powerz_pd_decode.py capture.sqlite # 显示摘要 +python3 powerz_pd_decode.py capture.sqlite --window 1.0 5.0 # 查看时间窗口 + # 复制 .o 文件 ./copy_o_files.sh /path/to/build /path/to/output diff --git a/CP02/build_gui.html b/CP02/build_gui.html index 6341564..89065e0 100644 --- a/CP02/build_gui.html +++ b/CP02/build_gui.html @@ -242,6 +242,15 @@ select:focus, input:focus { border-color: var(--accent); } border-color: var(--accent); color: var(--text); } +.sdkconfig-info { + font-size: 11px; + color: var(--text-dim); + background: var(--bg); + padding: 5px 8px; + border-radius: 4px; + margin-bottom: 10px; + font-family: 'SF Mono', monospace; +} @@ -263,6 +272,7 @@ select:focus, input:focus { border-color: var(--accent); } +
@@ -390,6 +400,27 @@ document.addEventListener('change', (e) => { if (e.target.matches('select')) saveState(); }); +// Develop: sdkconfig.{variant_config}, Production: sdkconfig-{variant}.production (fallback: sdkconfig.production) +const SDKCONFIG_MAP = { + cp02: { develop: 'sdkconfig.develop', production: 'sdkconfig.production' }, + cp02s: { develop: 'sdkconfig.cp02s.develop', production: 'sdkconfig-cp02s.production' }, + '3536': { develop: 'sdkconfig.3536.develop', production: 'sdkconfig.production' }, + gpu_demo: { develop: 'sdkconfig.develop', production: 'sdkconfig.production' }, + bird: { develop: 'sdkconfig.bird', production: 'sdkconfig.production' }, +}; + +function updateSdkconfigInfo() { + const v = val('variant'); + const mode = checked('opt-production') ? 'production' : 'develop'; + const cfg = SDKCONFIG_MAP[v]; + const file = cfg ? cfg[mode] : '?'; + document.getElementById('sdkconfigInfo').textContent = 'sdkconfig \u2190 configs/' + file; +} + +// Update on variant or production change +document.getElementById('variant').addEventListener('change', updateSdkconfigInfo); +document.getElementById('opt-production').addEventListener('change', updateSdkconfigInfo); + function buildCmd() { let cmd = 'make build variant=' + val('variant'); if (checked('opt-production')) cmd += ' production=1'; @@ -541,6 +572,7 @@ async function copyFromHistory(el, text) { } loadState(); +updateSdkconfigInfo(); diff --git a/CP02/charging_viewer.py b/CP02/charging_viewer.py new file mode 100644 index 0000000..a8b7a48 --- /dev/null +++ b/CP02/charging_viewer.py @@ -0,0 +1,775 @@ +#!/usr/bin/env python3 +"""充电数据查看器 - 读取 .sqlite / .db 充电测试数据并显示统计信息""" + +import sqlite3 +import sys +import os +import glob +import io +import subprocess +import shutil + + +# ── USB PD 消息类型 ────────────────────────────────────────── + +PD_CTRL_MSG = { + 0x01: 'GoodCRC', 0x02: 'GotoMin', 0x03: 'Accept', 0x04: 'Reject', + 0x05: 'Ping', 0x06: 'PS_RDY', 0x07: 'Get_Source_Cap', + 0x08: 'Get_Sink_Cap', 0x09: 'DR_Swap', 0x0A: 'PR_Swap', + 0x0B: 'VCONN_Swap', 0x0C: 'Wait', 0x0D: 'Soft_Reset', + 0x0E: 'Data_Reset', 0x0F: 'Data_Reset_Cpl', + 0x10: 'Not_Supported', 0x11: 'Get_Src_Cap_Ext', + 0x12: 'Get_Status', 0x13: 'FR_Swap', 0x14: 'Get_PPS_Status', + 0x15: 'Get_Country_Codes', 0x16: 'Get_Snk_Cap_Ext', + 0x17: 'Get_Source_Info', 0x18: 'Get_Revision', +} + +PD_DATA_MSG = { + 0x01: 'Source_Cap', 0x02: 'Request', 0x03: 'BIST', + 0x04: 'Sink_Cap', 0x05: 'Battery_Status', 0x06: 'Alert', + 0x07: 'Get_Country_Info', 0x08: 'Enter_USB', 0x09: 'EPR_Request', + 0x0A: 'EPR_Mode', 0x0B: 'Source_Info', 0x0C: 'Revision', + 0x0F: 'Vendor_Defined', +} + +PD_SPEC_REV = {0: '1.0', 1: '2.0', 2: '3.0', 3: '3.1'} + + +# ── PD 解析 ────────────────────────────────────────────────── + +def _to_int(val): + """尝试将值转为整数""" + if isinstance(val, int): + return val + if isinstance(val, float): + return int(val) + if isinstance(val, str): + s = val.strip().replace(' ', '') + if s.startswith(('0x', '0X')): + try: + return int(s, 16) + except ValueError: + return None + try: + return int(s) + except ValueError: + try: + return int(s, 16) + except ValueError: + return None + if isinstance(val, (bytes, bytearray)): + return int.from_bytes(val, 'little') + return None + + +def parse_pd_header(val): + """解析 PD 消息头 (16-bit)""" + h = _to_int(val) + if h is None or not (0 <= h <= 0xFFFF): + return None + num_do = (h >> 12) & 7 + msg_type = h & 0x1F + if num_do == 0: + name = PD_CTRL_MSG.get(msg_type, f'Ctrl_0x{msg_type:02X}') + else: + name = PD_DATA_MSG.get(msg_type, f'Data_0x{msg_type:02X}') + return { + 'raw': h, 'name': name, 'num_do': num_do, + 'msg_id': (h >> 9) & 7, + 'spec': PD_SPEC_REV.get((h >> 6) & 3, '?'), + 'prole': 'SRC' if (h >> 8) & 1 else 'SNK', + 'drole': 'DFP' if (h >> 5) & 1 else 'UFP', + } + + +def parse_pdo(val): + """解析 Power Data Object""" + pdo = _to_int(val) + if pdo is None: + return str(val) + t = (pdo >> 30) & 3 + if t == 0: # Fixed + v = ((pdo >> 10) & 0x3FF) * 0.05 + i_max = (pdo & 0x3FF) * 0.01 + flags = [] + if (pdo >> 29) & 1: flags.append('DRP') + if (pdo >> 27) & 1: flags.append('UP') + if (pdo >> 26) & 1: flags.append('USB') + if (pdo >> 25) & 1: flags.append('DRD') + f = f' [{",".join(flags)}]' if flags else '' + return f'Fixed {v:.1f}V/{i_max:.2f}A({v*i_max:.0f}W){f}' + elif t == 1: # Battery + max_v = ((pdo >> 20) & 0x3FF) * 0.05 + min_v = ((pdo >> 10) & 0x3FF) * 0.05 + max_p = (pdo & 0x3FF) * 0.25 + return f'Batt {min_v:.1f}-{max_v:.1f}V/{max_p:.1f}W' + elif t == 2: # Variable + max_v = ((pdo >> 20) & 0x3FF) * 0.05 + min_v = ((pdo >> 10) & 0x3FF) * 0.05 + max_i = (pdo & 0x3FF) * 0.01 + return f'Var {min_v:.1f}-{max_v:.1f}V/{max_i:.2f}A' + else: # Augmented + sub = (pdo >> 28) & 3 + if sub == 0: # SPR PPS + max_v = ((pdo >> 17) & 0xFF) * 0.1 + min_v = ((pdo >> 8) & 0xFF) * 0.1 + max_i = (pdo & 0x7F) * 0.05 + return f'PPS {min_v:.1f}-{max_v:.1f}V/{max_i:.2f}A' + return f'APDO 0x{pdo:08X}' + + +def parse_rdo(val): + """解析 Request Data Object""" + rdo = _to_int(val) + if rdo is None: + return str(val) + pos = (rdo >> 28) & 0x7 + op_i = ((rdo >> 10) & 0x3FF) * 0.01 + max_i = (rdo & 0x3FF) * 0.01 + flags = [] + if (rdo >> 27) & 1: flags.append('GiveBack') + if (rdo >> 26) & 1: flags.append('Mismatch') + f = f' [{",".join(flags)}]' if flags else '' + return f'Pos{pos} Op:{op_i:.2f}A Max:{max_i:.2f}A{f}' + + +def _parse_data_blob(val): + """尝试将 data/payload 拆分为多个 32-bit 数据对象""" + if val is None: + return [] + if isinstance(val, (bytes, bytearray)): + return [int.from_bytes(val[i:i+4], 'little') + for i in range(0, len(val) - 3, 4)] + if isinstance(val, str): + s = val.strip().replace(' ', '') + if s.startswith(('0x', '0X')): + s = s[2:] + if len(s) >= 8 and all(c in '0123456789abcdefABCDEF' for c in s): + return [int(s[i:i+8], 16) for i in range(0, len(s) - 7, 8)] + return [] + + +# ── PD 标准电压 ────────────────────────────────────────────── + +PD_STD_VOLTAGES = {5: '5V', 9: '9V', 12: '12V', 15: '15V', 20: '20V'} + + +def _nearest_pd_voltage(v): + """找到最近的标准 PD 电压""" + nearest = min(PD_STD_VOLTAGES, key=lambda x: abs(x - v)) + return nearest if abs(nearest - v) < 1.5 else None + + +# ── 数据读取 ───────────────────────────────────────────────── + +def read_pd_chart(db_path): + """读取 pd_chart 表""" + conn = sqlite3.connect(db_path) + cur = conn.cursor() + tables = [r[0] for r in cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'").fetchall()] + if 'pd_chart' not in tables: + conn.close() + return None + rows = cur.execute( + "SELECT Time, VBUS, IBUS, CC1, CC2 FROM pd_chart ORDER BY Time" + ).fetchall() + conn.close() + return { + 'type': 'pd_chart', + 'columns': ['Time(s)', 'VBUS(V)', 'IBUS(A)', 'CC1(V)', 'CC2(V)'], + 'rows': rows, + } if rows else None + + +def read_table_1(db_path): + """读取 table_1 表""" + conn = sqlite3.connect(db_path) + cur = conn.cursor() + tables = [r[0] for r in cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'").fetchall()] + if 'table_1' not in tables: + conn.close() + return None + rows = cur.execute( + "SELECT ElapsedTime, Unix, VBUS, IBUS, DP, DM, CC1, CC2, TEMP, CHARGE, ENERGY " + "FROM table_1 ORDER BY Unix" + ).fetchall() + params = None + if 'table_1_param' in tables: + p = cur.execute("SELECT * FROM table_1_param").fetchone() + if p: + params = p + conn.close() + if not rows: + return None + return { + 'type': 'table_1', + 'columns': ['时间', 'Unix(s)', 'VBUS(V)', 'IBUS(A)', 'DP(V)', 'DM(V)', + 'CC1(V)', 'CC2(V)', 'TEMP(°C)', 'CHARGE(Ah)', 'ENERGY(Wh)'], + 'rows': rows, + 'params': params, + } + + +def read_extra_tables(db_path): + """读取已知表以外的所有表 (PD 消息等)""" + known = {'pd_chart', 'table_1', 'table_1_param'} + conn = sqlite3.connect(db_path) + cur = conn.cursor() + tables = [r[0] for r in cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'").fetchall()] + results = [] + for table in tables: + if table in known or table.startswith('sqlite_'): + continue + cols = [r[1] for r in cur.execute( + f"PRAGMA table_info([{table}])").fetchall()] + rows = cur.execute(f"SELECT * FROM [{table}]").fetchall() + if rows: + results.append({'table': table, 'columns': cols, 'rows': rows}) + conn.close() + return results or None + + +def load_file(path): + """自动检测文件类型并加载数据""" + data = read_pd_chart(path) + if data: + return data + data = read_table_1(path) + if data: + return data + return None + + +# ── 显示函数 ───────────────────────────────────────────────── + +def fmt_duration(seconds): + m, s = divmod(int(seconds), 60) + h, m = divmod(m, 60) + if h > 0: + return f"{h}h{m:02d}m{s:02d}s" + if m > 0: + return f"{m}m{s:02d}s" + return f"{s}s" + + +def show_stats(data, filename): + """显示数据统计信息""" + rows = data['rows'] + if not rows: + print(" (无数据)") + return + + print(f"\n{'='*70}") + print(f" 文件: {os.path.basename(filename)}") + print(f" 类型: {data['type']} | 数据点: {len(rows):,}") + print(f"{'='*70}") + + if data['type'] == 'pd_chart': + t_idx, v_idx, i_idx = 0, 1, 2 + else: + t_idx, v_idx, i_idx = 1, 2, 3 + + times = [r[t_idx] for r in rows] + vbus = [r[v_idx] for r in rows] + ibus = [r[i_idx] for r in rows] + power = [v * i for v, i in zip(vbus, ibus)] + + duration = max(times) - min(times) + max_p = max(power) + max_p_idx = power.index(max_p) + avg_p = sum(power) / len(power) + max_v = max(vbus) + avg_v = sum(vbus) / len(vbus) + max_i = max(ibus) + avg_i = sum(ibus) / len(ibus) + + print(f"\n ── 功率 ──────────────────────────────────") + print(f" 最大功率: {max_p:8.2f} W (@ {times[max_p_idx]:.1f}s)") + print(f" 平均功率: {avg_p:8.2f} W") + + print(f"\n ── 电压 ──────────────────────────────────") + print(f" 最大电压: {max_v:8.3f} V") + print(f" 最小电压: {min(vbus):8.3f} V") + print(f" 平均电压: {avg_v:8.3f} V") + + print(f"\n ── 电流 ──────────────────────────────────") + print(f" 最大电流: {max_i:8.3f} A") + print(f" 平均电流: {avg_i:8.3f} A") + + print(f"\n ── 时间 ──────────────────────────────────") + print(f" 测试时长: {fmt_duration(duration)} ({duration:.1f}s)") + + if data['type'] == 'table_1': + temps = [r[8] for r in rows] + charge = rows[-1][9] + energy = rows[-1][10] + print(f"\n ── 温度 ──────────────────────────────────") + print(f" 最高温度: {max(temps):8.1f} °C") + print(f" 最低温度: {min(temps):8.1f} °C") + print(f" 平均温度: {sum(temps)/len(temps):8.1f} °C") + print(f"\n ── 充电量 ────────────────────────────────") + print(f" 总电荷量: {charge*1000:8.2f} mAh") + print(f" 总能量: {energy*1000:8.2f} mWh") + + if data.get('params'): + p = data['params'] + print(f"\n ── 参数 ──────────────────────────────────") + print(f" 名称: {p[0]}") + print(f" 采样率: {p[3]}") + + # 功率分段概览 + print(f"\n ── 功率变化概览 ──────────────────────────") + n_segments = min(10, len(rows)) + seg_size = len(rows) // n_segments + print(f" {'时间段':>10s} {'平均功率':>8s} {'最大功率':>8s} {'柱状图'}") + max_seg_p = 0 + segments = [] + for i in range(n_segments): + start = i * seg_size + end = (i + 1) * seg_size if i < n_segments - 1 else len(rows) + seg_power = power[start:end] + seg_avg = sum(seg_power) / len(seg_power) + seg_max = max(seg_power) + t_start = times[start] + t_end = times[end - 1] + segments.append((t_start, t_end, seg_avg, seg_max)) + if seg_max > max_seg_p: + max_seg_p = seg_max + + bar_width = 30 + for t_start, t_end, seg_avg, seg_max in segments: + bar_len = int(seg_avg / max(max_seg_p, 0.01) * bar_width) if max_seg_p > 0 else 0 + bar = '█' * bar_len + '░' * (bar_width - bar_len) + label = f"{t_start:6.0f}-{t_end:.0f}s" + print(f" {label:>12s} {seg_avg:7.1f}W {seg_max:7.1f}W {bar}") + + print() + + +def show_data_preview(data, n=10): + """显示前 N 条数据预览""" + rows = data['rows'] + cols = data['columns'] + ['Power(W)'] + + if data['type'] == 'pd_chart': + v_idx, i_idx = 1, 2 + else: + v_idx, i_idx = 2, 3 + + widths = [max(len(c), 10) for c in cols] + + header = ' '.join(c.rjust(w) for c, w in zip(cols, widths)) + print(f" {header}") + print(f" {'─' * len(header)}") + + display = rows[:n] + for row in display: + power = row[v_idx] * row[i_idx] + vals = [] + for v in row: + if isinstance(v, float): + vals.append(f"{v:.4f}") + else: + vals.append(str(v)) + vals.append(f"{power:.2f}") + line = ' '.join(v.rjust(w) for v, w in zip(vals, widths)) + print(f" {line}") + + if len(rows) > n: + print(f" ... (共 {len(rows):,} 条)") + + +# ── PD 消息显示 ────────────────────────────────────────────── + +def _find_col(cols_lower, *keywords): + """查找包含关键词的列索引""" + for kw in keywords: + for i, c in enumerate(cols_lower): + if kw in c: + return i + return None + + +def show_extra_tables(extra_tables): + """显示额外的表 (PD 消息等)""" + for mt in extra_tables: + table = mt['table'] + cols = mt['columns'] + rows = mt['rows'] + cl = [c.lower() for c in cols] + + print(f"\n ── {table} ({len(rows)} 行) ── 列: {', '.join(cols)} ──") + + hdr_idx = _find_col(cl, 'header') + time_idx = _find_col(cl, 'time', 'timestamp') + sop_idx = _find_col(cl, 'sop') + type_idx = _find_col(cl, 'type', 'msgtype', 'msg_type') + + if hdr_idx is not None: + _show_pd_msg_table(rows, cols, cl, time_idx, hdr_idx, sop_idx) + elif type_idx is not None: + _show_typed_msg_table(rows, cols, cl, time_idx, type_idx) + else: + _show_generic_table(rows, cols) + + +def _show_pd_msg_table(rows, cols, cl, time_idx, hdr_idx, sop_idx): + """显示含 PD header 的消息表""" + skip = {time_idx, hdr_idx, sop_idx} - {None} + ignore_names = {'id', 'index', 'rowid', '_id', 'no', 'cc', 'dir', 'direction'} + do_indices = [] + data_idx = None + + for i, c in enumerate(cl): + if i in skip or c in ignore_names: + continue + if c in ('data', 'payload'): + data_idx = i + continue + do_indices.append(i) + + sop_names = {0: 'SOP', 1: "SOP'", 2: 'SOP"', 3: "SOP'D", 4: 'SOP"D'} + type_counts = {} + + for idx, row in enumerate(rows): + hdr = parse_pd_header(row[hdr_idx]) + + t_str = '' + if time_idx is not None: + tv = row[time_idx] + t_str = f'{tv:8.3f}s' if isinstance(tv, (int, float)) else str(tv)[:12] + + sop_str = '' + if sop_idx is not None: + sv = _to_int(row[sop_idx]) + if sv is not None: + sop_str = sop_names.get(sv, str(row[sop_idx])[:6]) + else: + sop_str = str(row[sop_idx])[:6] + + if hdr: + type_counts[hdr['name']] = type_counts.get(hdr['name'], 0) + 1 + hdr_hex = f'0x{hdr["raw"]:04X}' + meta = f'PD{hdr["spec"]} {hdr["prole"]}/{hdr["drole"]}' + print(f' #{idx+1:<4d} {t_str:>10s} {sop_str:<6s}' + f' [{hdr["name"]:<20s}] {hdr_hex} {meta}') + + if hdr['num_do'] > 0: + data_vals = [] + if data_idx is not None and row[data_idx]: + data_vals = _parse_data_blob(row[data_idx]) + if not data_vals and do_indices: + data_vals = [row[i] for i in do_indices + if row[i] is not None and row[i] != 0 and row[i] != ''] + _print_data_objects(hdr['name'], data_vals) + else: + print(f' #{idx+1:<4d} {t_str:>10s} {sop_str:<6s}' + f' Header={row[hdr_idx]}') + + _print_type_summary(type_counts) + + +def _show_typed_msg_table(rows, cols, cl, time_idx, type_idx): + """显示含 type 列的消息表""" + skip = {time_idx, type_idx} - {None} + data_indices = [i for i in range(len(cols)) if i not in skip + and cl[i] not in ('id', 'index', 'rowid', '_id', 'no')] + + type_counts = {} + for idx, row in enumerate(rows): + t_str = '' + if time_idx is not None: + tv = row[time_idx] + t_str = f'{tv:8.3f}s' if isinstance(tv, (int, float)) else str(tv)[:12] + + type_name = str(row[type_idx]) + type_counts[type_name] = type_counts.get(type_name, 0) + 1 + + extra = '' + if data_indices: + parts = [] + for i in data_indices: + v = row[i] + if v is not None and v != '' and v != 0: + iv = _to_int(v) + if iv is not None and isinstance(iv, int) and iv > 0xFFFF: + parts.append(f'{cols[i]}=0x{iv:08X}') + else: + parts.append(f'{cols[i]}={v}') + if parts: + extra = ' ' + ', '.join(parts) + + print(f' #{idx+1:<4d} {t_str:>10s} [{type_name:<20s}]{extra}') + + _print_type_summary(type_counts) + + +def _print_data_objects(msg_name, data_vals): + """打印 PD 数据对象""" + for j, dv in enumerate(data_vals): + iv = _to_int(dv) + hs = f'0x{iv:08X}' if iv is not None else str(dv) + if msg_name in ('Source_Cap', 'Sink_Cap'): + print(f' PDO{j+1}: {hs} -> {parse_pdo(dv)}') + elif msg_name == 'Request' and j == 0: + print(f' RDO: {hs} -> {parse_rdo(dv)}') + else: + print(f' DO{j+1}: {hs}') + + +def _print_type_summary(type_counts): + """打印消息类型统计""" + if type_counts: + print(f'\n ── 消息类型统计 ──────────────────────────') + for name, cnt in sorted(type_counts.items(), key=lambda x: -x[1]): + print(f' {name:<24s} {cnt:>5d} 条') + + +def _show_generic_table(rows, cols): + """通用表格显示""" + widths = [len(c) for c in cols] + for row in rows[:100]: + for i, v in enumerate(row): + s = f'{v:.4f}' if isinstance(v, float) else str(v) + widths[i] = max(widths[i], min(len(s), 24)) + + header = ' '.join(c.ljust(w) for c, w in zip(cols, widths)) + print(f' {header}') + print(f' {"─" * len(header)}') + + for row in rows: + vals = [] + for v in row: + if isinstance(v, float): + vals.append(f'{v:.4f}') + elif isinstance(v, int) and abs(v) > 0xFFFF: + vals.append(f'0x{v:X}') + else: + vals.append(str(v) if v is not None else '') + line = ' '.join(s.ljust(w)[:w] for s, w in zip(vals, widths)) + print(f' {line}') + + +# ── 数据库结构 & PD 事件分析 ────────────────────────────────── + +def show_db_schema(db_path): + """显示数据库所有表和列信息""" + conn = sqlite3.connect(db_path) + cur = conn.cursor() + tables = cur.execute( + "SELECT name FROM sqlite_master WHERE type='table'").fetchall() + print(f"\n ── 数据库结构 ────────────────────────────") + for (table,) in tables: + if table.startswith('sqlite_'): + continue + cols = cur.execute(f"PRAGMA table_info([{table}])").fetchall() + col_desc = ', '.join(f'{c[1]}({c[2]})' for c in cols) + count = cur.execute(f"SELECT COUNT(*) FROM [{table}]").fetchone()[0] + print(f' {table}: {count} 行 [{col_desc}]') + conn.close() + + +def analyze_pd_events(data): + """从 VBUS/IBUS/CC 波形数据分析 PD 协商事件""" + rows = data['rows'] + if not rows or len(rows) < 10: + return + + if data['type'] == 'pd_chart': + t_idx, v_idx, i_idx, cc1_idx, cc2_idx = 0, 1, 2, 3, 4 + else: + t_idx, v_idx, i_idx = 1, 2, 3 + cc1_idx, cc2_idx = 6, 7 + + # ── 活跃 CC 通道检测 + cc1_avg = sum(r[cc1_idx] for r in rows) / len(rows) + cc2_avg = sum(r[cc2_idx] for r in rows) / len(rows) + active_cc = 'CC1' if cc1_avg > cc2_avg else 'CC2' + active_cc_v = max(cc1_avg, cc2_avg) + + # ── 检测 VBUS 电压跳变 (PD 协商事件) + # 用滑动窗口平滑噪声 + win = min(5, max(1, len(rows) // 20)) + + def avg_v(center, radius=win): + lo = max(0, center - radius) + hi = min(len(rows), center + radius + 1) + vals = [rows[j][v_idx] for j in range(lo, hi)] + return sum(vals) / len(vals) + + events = [] + cooldown = 0 + + for idx in range(win, len(rows) - win): + if cooldown > 0: + cooldown -= 1 + continue + + v_before = avg_v(idx - win) + v_after = avg_v(idx + win) + dv = v_after - v_before + + if abs(dv) < 1.0: + continue + + t = rows[idx][t_idx] + i_val = rows[idx][i_idx] + from_pdv = _nearest_pd_voltage(v_before) + to_pdv = _nearest_pd_voltage(v_after) + from_s = f'{from_pdv}V' if from_pdv else f'{v_before:.1f}V' + to_s = f'{to_pdv}V' if to_pdv else f'{v_after:.1f}V' + + if v_after < 0.5: + etype = '断开' + msgs = 'Hard_Reset / 拔出' + elif dv > 0: + if from_pdv == 5: + etype = 'PD 协商成功' + msgs = 'Source_Cap → Request → Accept → PS_RDY' + else: + etype = '电压切换' + msgs = 'Request → Accept → PS_RDY' + else: + if to_pdv == 5 and (from_pdv and from_pdv > 5): + etype = '重置/重新协商' + msgs = 'Soft_Reset → Source_Cap → ...' + elif v_after < 0.5: + etype = '断开' + msgs = 'Hard_Reset' + else: + etype = '电压下降' + msgs = 'Request → Accept → PS_RDY' + + # 计算跳变后稳定功率 + stable_start = min(idx + win * 2, len(rows) - 1) + stable_end = min(stable_start + win * 4, len(rows)) + if stable_end > stable_start: + stable_v = sum(rows[j][v_idx] for j in range(stable_start, stable_end)) / (stable_end - stable_start) + stable_i = sum(rows[j][i_idx] for j in range(stable_start, stable_end)) / (stable_end - stable_start) + else: + stable_v, stable_i = v_after, i_val + + events.append({ + 'time': t, 'type': etype, + 'from': from_s, 'to': to_s, + 'voltage': stable_v, 'current': stable_i, + 'power': stable_v * stable_i, 'msgs': msgs, + }) + cooldown = win * 4 + + # ── 输出 + print(f"\n ── PD 协商事件 ──────────────────────────") + print(f" 活跃通道: {active_cc} (平均 {active_cc_v:.2f}V)") + + if not events: + avg_vbus = sum(r[v_idx] for r in rows) / len(rows) + pdv = _nearest_pd_voltage(avg_vbus) + v_str = f'{pdv}V' if pdv else f'{avg_vbus:.1f}V' + avg_ibus = sum(r[i_idx] for r in rows) / len(rows) + print(f' VBUS 全程稳定在 {v_str} ({avg_ibus:.2f}A, {avg_vbus*avg_ibus:.1f}W)') + print(f' 未检测到 PD 电压协商变化') + return + + for e in events: + print(f'\n {e["time"]:8.2f}s [{e["type"]}]') + print(f' {e["from"]} → {e["to"]} ' + f'({e["voltage"]:.1f}V {e["current"]:.2f}A {e["power"]:.1f}W)') + print(f' 推测: {e["msgs"]}') + + # 最终稳定状态 + last_50 = rows[-min(50, len(rows)):] + final_v = sum(r[v_idx] for r in last_50) / len(last_50) + final_i = sum(r[i_idx] for r in last_50) / len(last_50) + final_pdv = _nearest_pd_voltage(final_v) + final_v_str = f'{final_pdv}V' if final_pdv else f'{final_v:.1f}V' + print(f'\n 最终状态: {final_v_str} {final_i:.2f}A ({final_v*final_i:.1f}W)') + + +# ── 翻页输出 ───────────────────────────────────────────────── + +def page_output(text): + """内容超过终端高度时使用 less 翻页""" + try: + term_h = os.get_terminal_size().lines + except OSError: + term_h = 40 + + if text.count('\n') <= term_h - 2: + sys.stdout.write(text) + return + + pager = shutil.which('less') + if pager: + try: + proc = subprocess.Popen([pager, '-R'], stdin=subprocess.PIPE) + proc.communicate(input=text.encode('utf-8')) + except (BrokenPipeError, KeyboardInterrupt): + pass + else: + sys.stdout.write(text) + + +# ── 主程序 ─────────────────────────────────────────────────── + +def main(): + no_page = '--no-page' in sys.argv + args = [a for a in sys.argv[1:] if a != '--no-page'] + targets = args if args else ['.'] + + files = [] + for target in targets: + if os.path.isdir(target): + files.extend(glob.glob(os.path.join(target, '*.sqlite'))) + files.extend(glob.glob(os.path.join(target, '*.db'))) + elif os.path.isfile(target): + files.append(target) + else: + print(f" 跳过: {target} (不存在)") + + if not files: + print("未找到 .sqlite 或 .db 文件") + print(f"用法: {sys.argv[0]} [--no-page] [目录或文件路径 ...]") + sys.exit(1) + + files.sort() + + buf = io.StringIO() + old_stdout = sys.stdout + sys.stdout = buf + try: + print(f"\n找到 {len(files)} 个数据文件") + + for f in files: + data = load_file(f) + extra = read_extra_tables(f) + + if data: + show_stats(data, f) + analyze_pd_events(data) + show_data_preview(data) + elif not extra: + print(f"\n 跳过: {os.path.basename(f)} (无法识别的表结构)") + + if extra: + if not data: + print(f"\n{'='*70}") + print(f" 文件: {os.path.basename(f)}") + print(f"{'='*70}") + show_extra_tables(extra) + + show_db_schema(f) + finally: + sys.stdout = old_stdout + + text = buf.getvalue() + if no_page or not sys.stdout.isatty(): + sys.stdout.write(text) + else: + page_output(text) + + +if __name__ == '__main__': + main() diff --git a/CP02/powerz_pd_decode.py b/CP02/powerz_pd_decode.py new file mode 100644 index 0000000..b25540d --- /dev/null +++ b/CP02/powerz_pd_decode.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +import argparse +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Any + + +CTRL_MESSAGES = { + 1: "GoodCRC", + 2: "GotoMin", + 3: "Accept", + 4: "Reject", + 5: "Ping", + 6: "PS_RDY", + 7: "Get_Source_Cap", + 8: "Get_Sink_Cap", + 9: "DR_Swap", + 10: "PR_Swap", + 11: "VCONN_Swap", + 12: "Wait", + 13: "Soft_Reset", + 16: "Not_Supported", + 17: "Get_Source_Cap_Ext", + 18: "Get_Status", + 19: "FR_Swap", + 20: "Get_PPS_Status", + 21: "Get_Country_Codes", + 22: "Get_Sink_Cap_Ext", + 23: "Get_Source_Info", + 24: "Get_Revision", +} + +DATA_MESSAGES = { + 1: "Source_Capabilities", + 2: "Request", + 3: "BIST", + 4: "Sink_Capabilities", + 5: "Battery_Status", + 6: "Alert", + 7: "Get_Country_Info", + 8: "Enter_USB", + 9: "EPR_Request", + 10: "EPR_Mode", + 11: "Source_Info", + 12: "Revision", + 15: "Vendor_Defined", +} + +EXT_MESSAGES = { + 1: "Source_Cap_Ext", + 2: "Status", + 3: "Get_Battery_Cap", + 4: "Get_Battery_Status", + 5: "Battery_Capabilities", + 6: "Get_Mfr_Info", + 7: "Mfr_Info", + 8: "Security_Request", + 9: "Security_Response", + 10: "FW_Update_Request", + 11: "FW_Update_Response", + 12: "PPS_Status", + 13: "Country_Info", + 14: "Country_Codes", + 15: "Sink_Cap_Ext", + 16: "Extended_Control", + 17: "EPR_Source_Cap", + 18: "EPR_Sink_Cap", + 30: "Vendor_Defined_Ext", +} + + +@dataclass +class Message: + time: float + vbus: float + ibus: float + raw: bytes + pd: bytes + header: dict[str, int] + cls: str + name: str + decoded: Any = None + + +def parse_header(halfword: int) -> dict[str, int]: + return { + "type": halfword & 0x1F, + "rev": (halfword >> 6) & 0x3, + "role": (halfword >> 8) & 0x1, + "msgid": (halfword >> 9) & 0x7, + "nobj": (halfword >> 12) & 0x7, + "ext": (halfword >> 15) & 0x1, + } + + +def classify_message(header: dict[str, int]) -> tuple[str, str]: + if header["ext"]: + return "ext", EXT_MESSAGES.get(header["type"], f"EXT_{header['type']}") + if header["nobj"] == 0: + return "ctrl", CTRL_MESSAGES.get(header["type"], f"CTRL_{header['type']}") + return "data", DATA_MESSAGES.get(header["type"], f"DATA_{header['type']}") + + +def parse_pdo(word: int) -> dict[str, Any]: + supply_type = (word >> 30) & 0x3 + if supply_type == 0: + return { + "kind": "fixed", + "mv": ((word >> 10) & 0x3FF) * 50, + "ma": (word & 0x3FF) * 10, + } + if supply_type == 3: + subtype = (word >> 28) & 0xF + if subtype == 0xC: + return { + "kind": "pps", + "min_mv": ((word >> 8) & 0xFF) * 100, + "max_mv": ((word >> 17) & 0xFF) * 100, + "ma": (word & 0x7F) * 50, + } + if subtype in (0xD, 0xE): + return { + "kind": "avs", + "min_mv": ((word >> 8) & 0xFF) * 100, + "max_mv": ((word >> 17) & 0x1FF) * 100, + "pdp_w": word & 0x7F, + } + return {"kind": "other", "raw": f"0x{word:08X}"} + + +def parse_fixed_rdo(word: int) -> dict[str, Any]: + return { + "kind": "fixed", + "obj_pos": (word >> 28) & 0xF, + "op_ma": (word & 0x3FF) * 10, + "max_ma": ((word >> 10) & 0x3FF) * 10, + "mismatch": (word >> 26) & 0x1, + "epr": (word >> 22) & 0x1, + } + + +def parse_pps_rdo(word: int) -> dict[str, Any]: + return { + "kind": "pps", + "obj_pos": (word >> 28) & 0xF, + "op_ma": (word & 0x7F) * 50, + "out_mv": ((word >> 9) & 0xFFF) * 20, + "epr": (word >> 22) & 0x1, + } + + +def decode_message(time: float, vbus: float, ibus: float, raw: bytes) -> Message | None: + if len(raw) < 8: + return None + # PowerZ 的 Raw 前 6 字节是记录头,后面是标准 PD message header + payload。 + pd = raw[6:] + if len(pd) < 2: + return None + header = parse_header(int.from_bytes(pd[:2], "little")) + cls, name = classify_message(header) + return Message(time, vbus, ibus, raw, pd, header, cls, name) + + +def load_messages(db_path: Path) -> list[Message]: + conn = sqlite3.connect(str(db_path)) + rows = conn.execute( + "select Time, Vbus, Ibus, Raw from pd_table " + "where Raw is not null and length(Raw) > 0 order by Time" + ).fetchall() + conn.close() + messages: list[Message] = [] + for time, vbus, ibus, raw in rows: + msg = decode_message(time, vbus, ibus, raw if isinstance(raw, bytes) else bytes(raw)) + if msg is not None: + messages.append(msg) + return messages + + +def decode_source_caps(messages: list[Message]) -> list[Message]: + caps: list[Message] = [] + for msg in messages: + if msg.name != "Source_Capabilities": + continue + objs = [] + for idx in range(msg.header["nobj"]): + offset = 2 + idx * 4 + if offset + 4 > len(msg.pd): + break + objs.append(parse_pdo(int.from_bytes(msg.pd[offset : offset + 4], "little"))) + msg.decoded = objs + caps.append(msg) + return caps + + +def decode_requests(messages: list[Message], source_caps: list[Message]) -> list[Message]: + requests: list[Message] = [] + for msg in messages: + if msg.name not in ("Request", "EPR_Request") or len(msg.pd) < 6: + continue + word = int.from_bytes(msg.pd[2:6], "little") + obj_pos = (word >> 28) & 0xF + selected = None + for cap_msg in reversed(source_caps): + if cap_msg.time <= msg.time and 0 < obj_pos <= len(cap_msg.decoded): + selected = cap_msg.decoded[obj_pos - 1] + break + if selected and selected.get("kind") in ("pps", "avs"): + decoded = parse_pps_rdo(word) + else: + decoded = parse_fixed_rdo(word) + decoded["selected"] = selected + msg.decoded = decoded + requests.append(msg) + return requests + + +def format_pdo(pdo: dict[str, Any]) -> str: + if pdo["kind"] == "fixed": + watts = pdo["mv"] * pdo["ma"] / 1_000_000 + return f"fixed {pdo['mv'] / 1000:.1f}V {pdo['ma'] / 1000:.3f}A ({watts:.1f}W)" + if pdo["kind"] == "pps": + return ( + f"PPS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V " + f"{pdo['ma'] / 1000:.3f}A" + ) + if pdo["kind"] == "avs": + return ( + f"AVS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V " + f"{pdo['pdp_w']}W" + ) + return str(pdo) + + +def format_request(req: dict[str, Any]) -> str: + selected = req.get("selected") + suffix = f", src={format_pdo(selected)}" if selected else "" + if req["kind"] == "pps": + return ( + f"obj={req['obj_pos']} req={req['out_mv'] / 1000:.3f}V " + f"{req['op_ma'] / 1000:.3f}A{suffix}" + ) + return ( + f"obj={req['obj_pos']} op={req['op_ma'] / 1000:.3f}A " + f"max={req['max_ma'] / 1000:.3f}A{suffix}" + ) + + +def print_summary(messages: list[Message], source_caps: list[Message], requests: list[Message]) -> None: + print(f"Decoded PD messages: {len(messages)}") + print() + + print("Source Capabilities:") + for msg in source_caps: + print(f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A") + for idx, pdo in enumerate(msg.decoded, start=1): + print(f" PDO{idx}: {format_pdo(pdo)}") + print() + + print("Requests:") + for msg in requests: + print( + f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A " + f"{msg.name}: {format_request(msg.decoded)}" + ) + + +def print_window(messages: list[Message], start: float, end: float) -> None: + print() + print(f"Messages in {start:.3f}s - {end:.3f}s:") + for msg in messages: + if start <= msg.time <= end and msg.name != "GoodCRC": + print( + f"{msg.time:8.3f}s {msg.name:18} " + f"VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A " + f"nobj={msg.header['nobj']} pd={msg.pd.hex()}" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Decode PD messages from a PowerZ sqlite capture." + ) + parser.add_argument("sqlite", type=Path, help="Path to PowerZ .sqlite file") + parser.add_argument( + "--window", + nargs=2, + type=float, + metavar=("START", "END"), + help="Print decoded messages in a time window", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + messages = load_messages(args.sqlite) + source_caps = decode_source_caps(messages) + requests = decode_requests(messages, source_caps) + print_summary(messages, source_caps, requests) + if args.window: + print_window(messages, args.window[0], args.window[1]) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/CP02/split_and_merge.sh b/CP02/split_and_merge.sh index e91343c..5421688 100755 --- a/CP02/split_and_merge.sh +++ b/CP02/split_and_merge.sh @@ -79,6 +79,7 @@ if ./merge_2323_firmware.sh -u "$updater" -f "$head_file" -i "$tail_file" -o "$o # 清理临时文件 rm -f "$head_file" "$tail_file" echo "⚠️⚠️⚠️ Remember signing output file! ⚠️⚠️⚠️" + echo "python signer.py --output_dir output --firmware /tmp/40_xiaomi_0403.bin" else echo "合并失败,保留临时文件以便排查" fi