#!/usr/bin/env python3
"""Charging data viewer.

Reads .sqlite / .db charging-test captures and prints summary statistics.
"""
import sqlite3
import sys
import os
import glob
import io
import subprocess
import shutil

# ── USB PD message types ─────────────────────────────────────
# Message-type code -> name, used when the header carries no data objects.
PD_CTRL_MSG = {
    0x01: 'GoodCRC',
    0x02: 'GotoMin',
    0x03: 'Accept',
    0x04: 'Reject',
    0x05: 'Ping',
    0x06: 'PS_RDY',
    0x07: 'Get_Source_Cap',
    0x08: 'Get_Sink_Cap',
    0x09: 'DR_Swap',
    0x0A: 'PR_Swap',
    0x0B: 'VCONN_Swap',
    0x0C: 'Wait',
    0x0D: 'Soft_Reset',
    0x0E: 'Data_Reset',
    0x0F: 'Data_Reset_Cpl',
    0x10: 'Not_Supported',
    0x11: 'Get_Src_Cap_Ext',
    0x12: 'Get_Status',
    0x13: 'FR_Swap',
    0x14: 'Get_PPS_Status',
    0x15: 'Get_Country_Codes',
    0x16: 'Get_Snk_Cap_Ext',
    0x17: 'Get_Source_Info',
    0x18: 'Get_Revision',
}

# Message-type code -> name, used when the header carries data objects.
PD_DATA_MSG = {
    0x01: 'Source_Cap',
    0x02: 'Request',
    0x03: 'BIST',
    0x04: 'Sink_Cap',
    0x05: 'Battery_Status',
    0x06: 'Alert',
    0x07: 'Get_Country_Info',
    0x08: 'Enter_USB',
    0x09: 'EPR_Request',
    0x0A: 'EPR_Mode',
    0x0B: 'Source_Info',
    0x0C: 'Revision',
    0x0F: 'Vendor_Defined',
}

# Two-bit specification-revision field -> display string.
PD_SPEC_REV = {0: '1.0', 1: '2.0', 2: '3.0', 3: '3.1'}


# ── PD parsing ───────────────────────────────────────────────
def _to_int(val):
    """Best-effort conversion of a database cell to an integer.

    Accepts ints (returned unchanged), floats (truncated), strings
    (decimal, ``0x``-prefixed hex, or bare hex; embedded spaces are
    ignored) and bytes/bytearray (interpreted little-endian).

    Returns:
        The integer value, or ``None`` when the value cannot be converted.
    """
    if isinstance(val, int):
        return val
    if isinstance(val, float):
        try:
            return int(val)
        except (ValueError, OverflowError):
            # NaN and +/-inf have no integer representation.
            return None
    if isinstance(val, str):
        s = val.strip().replace(' ', '')
        if s.startswith(('0x', '0X')):
            try:
                return int(s, 16)
            except ValueError:
                return None
        # Decimal takes precedence; fall back to bare hex ("1A2B").
        for base in (10, 16):
            try:
                return int(s, base)
            except ValueError:
                continue
        return None
    if isinstance(val, (bytes, bytearray)):
        return int.from_bytes(val, 'little')
    return None


def parse_pd_header(val):
    """Decode a 16-bit PD message header.

    Fields are read from these bit positions: message type 4..0, data
    role 5, spec revision 7..6, power role 8, message id 11..9, number of
    data objects 14..12.  Bit 15 is not examined.

    Args:
        val: Any value accepted by ``_to_int``.

    Returns:
        A dict with keys ``raw``, ``name``, ``num_do``, ``msg_id``,
        ``spec``, ``prole`` and ``drole``, or ``None`` if *val* is not an
        integer in ``0..0xFFFF``.
    """
    h = _to_int(val)
    if h is None or not (0 <= h <= 0xFFFF):
        return None
    num_do = (h >> 12) & 7
    msg_type = h & 0x1F
    # A header without data objects is looked up as a control message.
    if num_do == 0:
        name = PD_CTRL_MSG.get(msg_type, f'Ctrl_0x{msg_type:02X}')
    else:
        name = PD_DATA_MSG.get(msg_type, f'Data_0x{msg_type:02X}')
    return {
        'raw': h,
        'name': name,
        'num_do': num_do,
        'msg_id': (h >> 9) & 7,
        'spec': PD_SPEC_REV.get((h >> 6) & 3, '?'),
        'prole': 'SRC' if (h >> 8) & 1 else 'SNK',
        'drole': 'DFP' if (h >> 5) & 1 else 'UFP',
    }


def parse_pdo(val):
    """Render a 32-bit Power Data Object as a short human-readable string.

    The top two bits select the layout: 0 fixed, 1 battery, 2 variable,
    3 augmented (only sub-type 0, labelled PPS, is decoded further).

    Returns:
        The description string; ``str(val)`` when *val* is not convertible
        to an integer.
    """
    pdo = _to_int(val)
    if pdo is None:
        return str(val)
    t = (pdo >> 30) & 3
    if t == 0:  # Fixed supply: 50 mV and 10 mA units
        v = ((pdo >> 10) & 0x3FF) * 0.05
        i_max = (pdo & 0x3FF) * 0.01
        flags = []
        if (pdo >> 29) & 1:
            flags.append('DRP')
        if (pdo >> 27) & 1:
            flags.append('UP')
        if (pdo >> 26) & 1:
            flags.append('USB')
        if (pdo >> 25) & 1:
            flags.append('DRD')
        f = f' [{",".join(flags)}]' if flags else ''
        return f'Fixed {v:.1f}V/{i_max:.2f}A({v*i_max:.0f}W){f}'
    elif t == 1:  # Battery: 50 mV and 250 mW units
        max_v = ((pdo >> 20) & 0x3FF) * 0.05
        min_v = ((pdo >> 10) & 0x3FF) * 0.05
        max_p = (pdo & 0x3FF) * 0.25
        return f'Batt {min_v:.1f}-{max_v:.1f}V/{max_p:.1f}W'
    elif t == 2:  # Variable: 50 mV and 10 mA units
        max_v = ((pdo >> 20) & 0x3FF) * 0.05
        min_v = ((pdo >> 10) & 0x3FF) * 0.05
        max_i = (pdo & 0x3FF) * 0.01
        return f'Var {min_v:.1f}-{max_v:.1f}V/{max_i:.2f}A'
    else:  # Augmented
        sub = (pdo >> 28) & 3
        if sub == 0:  # SPR PPS: 100 mV and 50 mA units
            max_v = ((pdo >> 17) & 0xFF) * 0.1
            min_v = ((pdo >> 8) & 0xFF) * 0.1
            max_i = (pdo & 0x7F) * 0.05
            return f'PPS {min_v:.1f}-{max_v:.1f}V/{max_i:.2f}A'
        # Other augmented sub-types are shown raw.
        return f'APDO 0x{pdo:08X}'


def parse_rdo(val):
    """Render a 32-bit Request Data Object as a short string.

    Reads the object position (bits 30..28), operating current
    (bits 19..10) and maximum current (bits 9..0), both in 10 mA units,
    plus the GiveBack (bit 27) and Mismatch (bit 26) flags.

    Returns:
        The description string; ``str(val)`` when *val* is not convertible
        to an integer.
    """
    rdo = _to_int(val)
    if rdo is None:
        return str(val)
    pos = (rdo >> 28) & 0x7
    op_i = ((rdo >> 10) & 0x3FF) * 0.01
    max_i = (rdo & 0x3FF) * 0.01
    flags = []
    if (rdo >> 27) & 1:
        flags.append('GiveBack')
    if (rdo >> 26) & 1:
        flags.append('Mismatch')
    f = f' [{",".join(flags)}]' if flags else ''
    return f'Pos{pos} Op:{op_i:.2f}A Max:{max_i:.2f}A{f}'


def _parse_data_blob(val):
    """Split a data/payload cell into a list of 32-bit data objects.

    bytes/bytearray are cut into little-endian 4-byte words.  Strings are
    treated as hex text (optional ``0x`` prefix, spaces ignored) and cut
    into 8-digit words.  Trailing partial words are dropped.  Any other
    input, including ``None``, yields an empty list.
    """
    if val is None:
        return []
    if isinstance(val, (bytes, bytearray)):
        return [int.from_bytes(val[i:i + 4], 'little')
                for i in range(0, len(val) - 3, 4)]
    if isinstance(val, str):
        s = val.strip().replace(' ', '')
        if s.startswith(('0x', '0X')):
            s = s[2:]
        if len(s) >= 8 and all(c in '0123456789abcdefABCDEF' for c in s):
            return [int(s[i:i + 8], 16) for i in range(0, len(s) - 7, 8)]
    return []
# ── Standard PD voltages ─────────────────────────────────────
PD_STD_VOLTAGES = {5: '5V', 9: '9V', 12: '12V', 15: '15V', 20: '20V'}


def _nearest_pd_voltage(v):
    """Return the standard PD voltage within 1.5 V of *v*, else ``None``."""
    nearest = min(PD_STD_VOLTAGES, key=lambda x: abs(x - v))
    return nearest if abs(nearest - v) < 1.5 else None


# ── Data loading ─────────────────────────────────────────────
def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier (quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _list_tables(cur):
    """Return the names of all tables listed in ``sqlite_master``."""
    return [r[0] for r in cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table'").fetchall()]


def read_pd_chart(db_path):
    """Load the ``pd_chart`` table.

    Returns:
        A dict with ``type``, ``columns`` and ``rows`` (ordered by Time),
        or ``None`` when the table is missing or empty.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        if 'pd_chart' not in _list_tables(cur):
            return None
        rows = cur.execute(
            "SELECT Time, VBUS, IBUS, CC1, CC2 FROM pd_chart ORDER BY Time"
        ).fetchall()
    finally:
        # Close even when a query raises (e.g. a column is missing).
        conn.close()
    if not rows:
        return None
    return {
        'type': 'pd_chart',
        'columns': ['Time(s)', 'VBUS(V)', 'IBUS(A)', 'CC1(V)', 'CC2(V)'],
        'rows': rows,
    }


def read_table_1(db_path):
    """Load the ``table_1`` table and, if present, one ``table_1_param`` row.

    Returns:
        A dict with ``type``, ``columns``, ``rows`` (ordered by Unix) and
        ``params`` (first row of ``table_1_param`` or ``None``), or
        ``None`` when ``table_1`` is missing or empty.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        tables = _list_tables(cur)
        if 'table_1' not in tables:
            return None
        rows = cur.execute(
            "SELECT ElapsedTime, Unix, VBUS, IBUS, DP, DM, CC1, CC2, TEMP, CHARGE, ENERGY "
            "FROM table_1 ORDER BY Unix"
        ).fetchall()
        params = None
        if 'table_1_param' in tables:
            params = cur.execute("SELECT * FROM table_1_param").fetchone() or None
    finally:
        conn.close()
    if not rows:
        return None
    return {
        'type': 'table_1',
        'columns': ['时间', 'Unix(s)', 'VBUS(V)', 'IBUS(A)', 'DP(V)', 'DM(V)',
                    'CC1(V)', 'CC2(V)', 'TEMP(°C)', 'CHARGE(Ah)', 'ENERGY(Wh)'],
        'rows': rows,
        'params': params,
    }


def read_extra_tables(db_path):
    """Load every non-empty table other than the known measurement tables.

    Returns:
        A list of ``{'table', 'columns', 'rows'}`` dicts, or ``None`` when
        no such table has rows.
    """
    known = {'pd_chart', 'table_1', 'table_1_param'}
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        results = []
        for table in _list_tables(cur):
            if table in known or table.startswith('sqlite_'):
                continue
            # Table names come from the file itself, so quote them properly.
            ident = _quote_ident(table)
            cols = [r[1] for r in cur.execute(
                f"PRAGMA table_info({ident})").fetchall()]
            rows = cur.execute(f"SELECT * FROM {ident}").fetchall()
            if rows:
                results.append({'table': table, 'columns': cols, 'rows': rows})
    finally:
        conn.close()
    return results or None


def load_file(path):
    """Load measurement data, preferring ``pd_chart`` over ``table_1``."""
    return read_pd_chart(path) or read_table_1(path)


# ── Display helpers ──────────────────────────────────────────
def fmt_duration(seconds):
    """Format a duration in seconds as ``1h02m03s`` / ``2m03s`` / ``3s``."""
    m, s = divmod(int(seconds), 60)
    h, m = divmod(m, 60)
    if h > 0:
        return f"{h}h{m:02d}m{s:02d}s"
    if m > 0:
        return f"{m}m{s:02d}s"
    return f"{s}s"


def show_stats(data, filename):
    """Print power/voltage/current/time statistics for a loaded dataset.

    Args:
        data: Dict produced by ``read_pd_chart`` or ``read_table_1``.
        filename: Path of the source file; only its basename is printed.
    """
    rows = data['rows']
    if not rows:
        print(" (无数据)")
        return

    print(f"\n{'='*70}")
    print(f" 文件: {os.path.basename(filename)}")
    print(f" 类型: {data['type']} | 数据点: {len(rows):,}")
    print(f"{'='*70}")

    # Column positions differ between the two table layouts.
    if data['type'] == 'pd_chart':
        t_idx, v_idx, i_idx = 0, 1, 2
    else:
        t_idx, v_idx, i_idx = 1, 2, 3

    times = [r[t_idx] for r in rows]
    vbus = [r[v_idx] for r in rows]
    ibus = [r[i_idx] for r in rows]
    power = [v * i for v, i in zip(vbus, ibus)]

    duration = max(times) - min(times)
    max_p = max(power)
    max_p_idx = power.index(max_p)
    avg_p = sum(power) / len(power)
    max_v = max(vbus)
    avg_v = sum(vbus) / len(vbus)
    max_i = max(ibus)
    avg_i = sum(ibus) / len(ibus)

    print("\n ── 功率 ──────────────────────────────────")
    print(f" 最大功率: {max_p:8.2f} W (@ {times[max_p_idx]:.1f}s)")
    print(f" 平均功率: {avg_p:8.2f} W")
    print("\n ── 电压 ──────────────────────────────────")
    print(f" 最大电压: {max_v:8.3f} V")
    print(f" 最小电压: {min(vbus):8.3f} V")
    print(f" 平均电压: {avg_v:8.3f} V")
    print("\n ── 电流 ──────────────────────────────────")
    print(f" 最大电流: {max_i:8.3f} A")
    print(f" 平均电流: {avg_i:8.3f} A")
    print("\n ── 时间 ──────────────────────────────────")
    print(f" 测试时长: {fmt_duration(duration)} ({duration:.1f}s)")

    if data['type'] == 'table_1':
        temps = [r[8] for r in rows]
        # The last row's CHARGE / ENERGY columns are reported as totals.
        charge = rows[-1][9]
        energy = rows[-1][10]
        print("\n ── 温度 ──────────────────────────────────")
        print(f" 最高温度: {max(temps):8.1f} °C")
        print(f" 最低温度: {min(temps):8.1f} °C")
        print(f" 平均温度: {sum(temps)/len(temps):8.1f} °C")
        print("\n ── 充电量 ────────────────────────────────")
        print(f" 总电荷量: {charge*1000:8.2f} mAh")
        print(f" 总能量: {energy*1000:8.2f} mWh")

    p = data.get('params')
    if p:
        print("\n ── 参数 ──────────────────────────────────")
        print(f" 名称: {p[0]}")
        # Guard against a param row with fewer columns than expected.
        if len(p) > 3:
            print(f" 采样率: {p[3]}")

    # Power overview: up to ten equal-sized segments, last one takes the rest.
    print("\n ── 功率变化概览 ──────────────────────────")
    n_segments = min(10, len(rows))
    seg_size = len(rows) // n_segments
    print(f" {'时间段':>10s} {'平均功率':>8s} {'最大功率':>8s} {'柱状图'}")

    segments = []
    max_seg_p = 0
    for i in range(n_segments):
        start = i * seg_size
        end = (i + 1) * seg_size if i < n_segments - 1 else len(rows)
        seg_power = power[start:end]
        seg_avg = sum(seg_power) / len(seg_power)
        seg_max = max(seg_power)
        segments.append((times[start], times[end - 1], seg_avg, seg_max))
        max_seg_p = max(max_seg_p, seg_max)

    bar_width = 30
    for t_start, t_end, seg_avg, seg_max in segments:
        if max_seg_p > 0:
            bar_len = int(seg_avg / max(max_seg_p, 0.01) * bar_width)
            # Negative average power must not widen the bar beyond bar_width.
            bar_len = min(max(bar_len, 0), bar_width)
        else:
            bar_len = 0
        bar = '█' * bar_len + '░' * (bar_width - bar_len)
        label = f"{t_start:6.0f}-{t_end:.0f}s"
        print(f" {label:>12s} {seg_avg:7.1f}W {seg_max:7.1f}W {bar}")
    print()


def show_data_preview(data, n=10):
    """Print the first *n* rows with an extra computed Power(W) column."""
    rows = data['rows']
    cols = data['columns'] + ['Power(W)']
    if data['type'] == 'pd_chart':
        v_idx, i_idx = 1, 2
    else:
        v_idx, i_idx = 2, 3

    widths = [max(len(c), 10) for c in cols]
    header = ' '.join(c.rjust(w) for c, w in zip(cols, widths))
    print(f" {header}")
    print(f" {'─' * len(header)}")

    for row in rows[:n]:
        power = row[v_idx] * row[i_idx]
        vals = [f"{v:.4f}" if isinstance(v, float) else str(v) for v in row]
        vals.append(f"{power:.2f}")
        line = ' '.join(v.rjust(w) for v, w in zip(vals, widths))
        print(f" {line}")
    if len(rows) > n:
        print(f" ... (共 {len(rows):,} 条)")


# ── PD message display ───────────────────────────────────────
def _find_col(cols_lower, *keywords):
    """Return the index of the first column containing a keyword.

    Keywords are tried in order; for each one the columns are scanned
    left to right.  Returns ``None`` when nothing matches.
    """
    for kw in keywords:
        for i, c in enumerate(cols_lower):
            if kw in c:
                return i
    return None


def _fmt_time(tv):
    """Format a time cell: numbers as ``%8.3fs``, anything else truncated."""
    return f'{tv:8.3f}s' if isinstance(tv, (int, float)) else str(tv)[:12]


def show_extra_tables(extra_tables):
    """Print each extra table using the most specific renderer available.

    A table with a ``header`` column is decoded as PD messages, one with a
    ``type`` column as typed messages, and anything else as a plain grid.
    """
    for mt in extra_tables:
        table = mt['table']
        cols = mt['columns']
        rows = mt['rows']
        cl = [c.lower() for c in cols]
        print(f"\n ── {table} ({len(rows)} 行) ── 列: {', '.join(cols)} ──")

        hdr_idx = _find_col(cl, 'header')
        time_idx = _find_col(cl, 'time', 'timestamp')
        sop_idx = _find_col(cl, 'sop')
        type_idx = _find_col(cl, 'type', 'msgtype', 'msg_type')

        if hdr_idx is not None:
            _show_pd_msg_table(rows, cols, cl, time_idx, hdr_idx, sop_idx)
        elif type_idx is not None:
            _show_typed_msg_table(rows, cols, cl, time_idx, type_idx)
        else:
            _show_generic_table(rows, cols)


def _show_pd_msg_table(rows, cols, cl, time_idx, hdr_idx, sop_idx):
    """Print a message table whose rows carry a PD header column.

    Data objects come from a ``data``/``payload`` column when it parses;
    otherwise every remaining non-empty, non-zero column is used.
    """
    skip = {time_idx, hdr_idx, sop_idx} - {None}
    ignore_names = {'id', 'index', 'rowid', '_id', 'no', 'cc', 'dir', 'direction'}
    do_indices = []
    data_idx = None
    for i, c in enumerate(cl):
        if i in skip or c in ignore_names:
            continue
        if c in ('data', 'payload'):
            data_idx = i
            continue
        do_indices.append(i)

    sop_names = {0: 'SOP', 1: "SOP'", 2: 'SOP"', 3: "SOP'D", 4: 'SOP"D'}
    type_counts = {}

    for idx, row in enumerate(rows):
        hdr = parse_pd_header(row[hdr_idx])
        t_str = _fmt_time(row[time_idx]) if time_idx is not None else ''

        sop_str = ''
        if sop_idx is not None:
            sv = _to_int(row[sop_idx])
            raw_sop = str(row[sop_idx])[:6]
            sop_str = sop_names.get(sv, raw_sop) if sv is not None else raw_sop

        if not hdr:
            # Header could not be decoded: show the raw cell.
            print(f' #{idx+1:<4d} {t_str:>10s} {sop_str:<6s}'
                  f' Header={row[hdr_idx]}')
            continue

        type_counts[hdr['name']] = type_counts.get(hdr['name'], 0) + 1
        hdr_hex = f'0x{hdr["raw"]:04X}'
        meta = f'PD{hdr["spec"]} {hdr["prole"]}/{hdr["drole"]}'
        print(f' #{idx+1:<4d} {t_str:>10s} {sop_str:<6s}'
              f' [{hdr["name"]:<20s}] {hdr_hex} {meta}')

        if hdr['num_do'] > 0:
            data_vals = []
            if data_idx is not None and row[data_idx]:
                data_vals = _parse_data_blob(row[data_idx])
            if not data_vals and do_indices:
                data_vals = [row[i] for i in do_indices
                             if row[i] is not None and row[i] != 0 and row[i] != '']
            _print_data_objects(hdr['name'], data_vals)

    _print_type_summary(type_counts)


def _show_typed_msg_table(rows, cols, cl, time_idx, type_idx):
    """Print a message table that has a type column but no PD header."""
    skip = {time_idx, type_idx} - {None}
    data_indices = [i for i in range(len(cols))
                    if i not in skip
                    and cl[i] not in ('id', 'index', 'rowid', '_id', 'no')]
    type_counts = {}

    for idx, row in enumerate(rows):
        t_str = _fmt_time(row[time_idx]) if time_idx is not None else ''
        type_name = str(row[type_idx])
        type_counts[type_name] = type_counts.get(type_name, 0) + 1

        parts = []
        for i in data_indices:
            v = row[i]
            if v is None or v == '' or v == 0:
                continue
            iv = _to_int(v)
            # Values wider than 16 bits are shown as 32-bit hex.
            if iv is not None and isinstance(iv, int) and iv > 0xFFFF:
                parts.append(f'{cols[i]}=0x{iv:08X}')
            else:
                parts.append(f'{cols[i]}={v}')
        extra = ' ' + ', '.join(parts) if parts else ''
        print(f' #{idx+1:<4d} {t_str:>10s} [{type_name:<20s}]{extra}')

    _print_type_summary(type_counts)


def _print_data_objects(msg_name, data_vals):
    """Print the data objects of one message, decoding PDOs and the RDO."""
    for j, dv in enumerate(data_vals):
        iv = _to_int(dv)
        hs = f'0x{iv:08X}' if iv is not None else str(dv)
        if msg_name in ('Source_Cap', 'Sink_Cap'):
            print(f' PDO{j+1}: {hs} -> {parse_pdo(dv)}')
        elif msg_name == 'Request' and j == 0:
            print(f' RDO: {hs} -> {parse_rdo(dv)}')
        else:
            print(f' DO{j+1}: {hs}')


def _print_type_summary(type_counts):
    """Print message-type counts, most frequent first."""
    if not type_counts:
        return
    print('\n ── 消息类型统计 ──────────────────────────')
    for name, cnt in sorted(type_counts.items(), key=lambda x: -x[1]):
        print(f' {name:<24s} {cnt:>5d} 条')


def _fmt_cell(v):
    """Format one cell for the generic table renderer."""
    if isinstance(v, float):
        return f'{v:.4f}'
    if isinstance(v, int) and abs(v) > 0xFFFF:
        return f'0x{v:X}'
    return str(v) if v is not None else ''


def _show_generic_table(rows, cols):
    """Print a table as a left-aligned grid.

    Column widths are measured on the first 100 rows and capped at 24
    characters; longer cells are truncated.
    """
    widths = [len(c) for c in cols]
    for row in rows[:100]:
        for i, v in enumerate(row):
            # Measure with the same formatter that is used for printing.
            widths[i] = max(widths[i], min(len(_fmt_cell(v)), 24))

    header = ' '.join(c.ljust(w) for c, w in zip(cols, widths))
    print(f' {header}')
    print(f' {"─" * len(header)}')
    for row in rows:
        vals = [_fmt_cell(v) for v in row]
        line = ' '.join(s.ljust(w)[:w] for s, w in zip(vals, widths))
        print(f' {line}')


# ── Database schema & PD event analysis ──────────────────────
def show_db_schema(db_path):
    """Print every user table with its row count and column names/types."""
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        tables = cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table'").fetchall()
        print("\n ── 数据库结构 ────────────────────────────")
        for (table,) in tables:
            if table.startswith('sqlite_'):
                continue
            ident = _quote_ident(table)
            cols = cur.execute(f"PRAGMA table_info({ident})").fetchall()
            col_desc = ', '.join(f'{c[1]}({c[2]})' for c in cols)
            count = cur.execute(f"SELECT COUNT(*) FROM {ident}").fetchone()[0]
            print(f' {table}: {count} 行 [{col_desc}]')
    finally:
        conn.close()


def analyze_pd_events(data):
    """Infer PD negotiation events from the VBUS/IBUS/CC waveform.

    A VBUS change of at least 1 V between the smoothed windows before and
    after a sample is reported as an event.  The target voltage and the
    event classification use the settled voltage measured shortly after
    the change.  Datasets with fewer than 10 rows are ignored.
    """
    rows = data['rows']
    if not rows or len(rows) < 10:
        return

    if data['type'] == 'pd_chart':
        t_idx, v_idx, i_idx, cc1_idx, cc2_idx = 0, 1, 2, 3, 4
    else:
        t_idx, v_idx, i_idx = 1, 2, 3
        cc1_idx, cc2_idx = 6, 7

    # Active CC line: whichever has the higher average voltage.
    cc1_avg = sum(r[cc1_idx] for r in rows) / len(rows)
    cc2_avg = sum(r[cc2_idx] for r in rows) / len(rows)
    active_cc = 'CC1' if cc1_avg > cc2_avg else 'CC2'
    active_cc_v = max(cc1_avg, cc2_avg)

    # Sliding-window radius used to smooth out noise.
    win = min(5, max(1, len(rows) // 20))

    def avg_v(center, radius=win):
        lo = max(0, center - radius)
        hi = min(len(rows), center + radius + 1)
        vals = [rows[j][v_idx] for j in range(lo, hi)]
        return sum(vals) / len(vals)

    events = []
    cooldown = 0
    for idx in range(win, len(rows) - win):
        # Skip samples right after an event so one edge is reported once.
        if cooldown > 0:
            cooldown -= 1
            continue
        v_before = avg_v(idx - win)
        v_after = avg_v(idx + win)
        dv = v_after - v_before
        if abs(dv) < 1.0:
            continue

        t = rows[idx][t_idx]
        i_val = rows[idx][i_idx]

        # Settled voltage/current a little after the edge.
        stable_start = min(idx + win * 2, len(rows) - 1)
        stable_end = min(stable_start + win * 4, len(rows))
        if stable_end > stable_start:
            span = stable_end - stable_start
            stable_v = sum(rows[j][v_idx]
                           for j in range(stable_start, stable_end)) / span
            stable_i = sum(rows[j][i_idx]
                           for j in range(stable_start, stable_end)) / span
        else:
            stable_v, stable_i = v_after, i_val

        # v_after is still mid-transition when the edge is first detected,
        # so the destination is taken from the settled voltage instead.
        from_pdv = _nearest_pd_voltage(v_before)
        to_pdv = _nearest_pd_voltage(stable_v)
        from_s = f'{from_pdv}V' if from_pdv else f'{v_before:.1f}V'
        to_s = f'{to_pdv}V' if to_pdv else f'{stable_v:.1f}V'

        if stable_v < 0.5:
            etype = '断开'
            msgs = 'Hard_Reset / 拔出'
        elif dv > 0:
            if from_pdv == 5:
                etype = 'PD 协商成功'
                msgs = 'Source_Cap → Request → Accept → PS_RDY'
            else:
                etype = '电压切换'
                msgs = 'Request → Accept → PS_RDY'
        elif to_pdv == 5 and (from_pdv and from_pdv > 5):
            etype = '重置/重新协商'
            msgs = 'Soft_Reset → Source_Cap → ...'
        else:
            etype = '电压下降'
            msgs = 'Request → Accept → PS_RDY'

        events.append({
            'time': t,
            'type': etype,
            'from': from_s,
            'to': to_s,
            'voltage': stable_v,
            'current': stable_i,
            'power': stable_v * stable_i,
            'msgs': msgs,
        })
        cooldown = win * 4

    # ── Output
    print("\n ── PD 协商事件 ──────────────────────────")
    print(f" 活跃通道: {active_cc} (平均 {active_cc_v:.2f}V)")

    if not events:
        avg_vbus = sum(r[v_idx] for r in rows) / len(rows)
        pdv = _nearest_pd_voltage(avg_vbus)
        v_str = f'{pdv}V' if pdv else f'{avg_vbus:.1f}V'
        avg_ibus = sum(r[i_idx] for r in rows) / len(rows)
        print(f' VBUS 全程稳定在 {v_str} ({avg_ibus:.2f}A, {avg_vbus*avg_ibus:.1f}W)')
        print(' 未检测到 PD 电压协商变化')
        return

    for e in events:
        print(f'\n {e["time"]:8.2f}s [{e["type"]}]')
        print(f' {e["from"]} → {e["to"]} '
              f'({e["voltage"]:.1f}V {e["current"]:.2f}A {e["power"]:.1f}W)')
        print(f' 推测: {e["msgs"]}')

    # Final steady state: average of the last (up to) 50 samples.
    last_50 = rows[-min(50, len(rows)):]
    final_v = sum(r[v_idx] for r in last_50) / len(last_50)
    final_i = sum(r[i_idx] for r in last_50) / len(last_50)
    final_pdv = _nearest_pd_voltage(final_v)
    final_v_str = f'{final_pdv}V' if final_pdv else f'{final_v:.1f}V'
    print(f'\n 最终状态: {final_v_str} {final_i:.2f}A ({final_v*final_i:.1f}W)')


# ── Paged output ─────────────────────────────────────────────
def page_output(text):
    """Write *text* to stdout, piping through ``less`` when it is too tall.

    Falls back to a plain write when the text fits the terminal or when
    ``less`` is not on PATH.  A terminal height of 40 is assumed when the
    size cannot be queried.
    """
    try:
        term_h = os.get_terminal_size().lines
    except OSError:
        term_h = 40

    if text.count('\n') <= term_h - 2:
        sys.stdout.write(text)
        return

    pager = shutil.which('less')
    if not pager:
        sys.stdout.write(text)
        return
    try:
        proc = subprocess.Popen([pager, '-R'], stdin=subprocess.PIPE)
        proc.communicate(input=text.encode('utf-8'))
    except (BrokenPipeError, KeyboardInterrupt):
        # User quit the pager early; nothing left to do.
        pass


# ── Entry point ──────────────────────────────────────────────
def main():
    """Collect database files from the command line and print a report.

    Arguments are files or directories (searched for ``*.sqlite`` and
    ``*.db``); the current directory is used when none are given.
    ``--no-page`` disables the pager.  Exits with status 1 when no file
    is found.
    """
    no_page = '--no-page' in sys.argv
    args = [a for a in sys.argv[1:] if a != '--no-page']
    targets = args if args else ['.']

    files = []
    for target in targets:
        if os.path.isdir(target):
            files.extend(glob.glob(os.path.join(target, '*.sqlite')))
            files.extend(glob.glob(os.path.join(target, '*.db')))
        elif os.path.isfile(target):
            files.append(target)
        else:
            print(f" 跳过: {target} (不存在)")

    if not files:
        print("未找到 .sqlite 或 .db 文件")
        print(f"用法: {sys.argv[0]} [--no-page] [目录或文件路径 ...]")
        sys.exit(1)

    files.sort()

    # Capture everything first so the pager decision can use the full text.
    buf = io.StringIO()
    old_stdout = sys.stdout
    sys.stdout = buf
    try:
        print(f"\n找到 {len(files)} 个数据文件")
        for f in files:
            try:
                data = load_file(f)
                extra = read_extra_tables(f)
                if data:
                    show_stats(data, f)
                    analyze_pd_events(data)
                    show_data_preview(data)
                elif not extra:
                    print(f"\n 跳过: {os.path.basename(f)} (无法识别的表结构)")
                if extra:
                    if not data:
                        print(f"\n{'='*70}")
                        print(f" 文件: {os.path.basename(f)}")
                        print(f"{'='*70}")
                    show_extra_tables(extra)
                # NOTE(review): the collapsed source does not show whether
                # this call sat inside `if extra:`; confirm intended scope.
                show_db_schema(f)
            except sqlite3.Error as exc:
                # One unreadable file must not discard the rest of the report.
                print(f"\n 跳过: {os.path.basename(f)} (读取失败: {exc})")
    finally:
        sys.stdout = old_stdout

    text = buf.getvalue()
    if no_page or not sys.stdout.isatty():
        sys.stdout.write(text)
    else:
        page_output(text)


if __name__ == '__main__':
    main()