#!/usr/bin/env python3 import argparse import sqlite3 from dataclasses import dataclass from pathlib import Path from typing import Any CTRL_MESSAGES = { 1: "GoodCRC", 2: "GotoMin", 3: "Accept", 4: "Reject", 5: "Ping", 6: "PS_RDY", 7: "Get_Source_Cap", 8: "Get_Sink_Cap", 9: "DR_Swap", 10: "PR_Swap", 11: "VCONN_Swap", 12: "Wait", 13: "Soft_Reset", 16: "Not_Supported", 17: "Get_Source_Cap_Ext", 18: "Get_Status", 19: "FR_Swap", 20: "Get_PPS_Status", 21: "Get_Country_Codes", 22: "Get_Sink_Cap_Ext", 23: "Get_Source_Info", 24: "Get_Revision", } DATA_MESSAGES = { 1: "Source_Capabilities", 2: "Request", 3: "BIST", 4: "Sink_Capabilities", 5: "Battery_Status", 6: "Alert", 7: "Get_Country_Info", 8: "Enter_USB", 9: "EPR_Request", 10: "EPR_Mode", 11: "Source_Info", 12: "Revision", 15: "Vendor_Defined", } EXT_MESSAGES = { 1: "Source_Cap_Ext", 2: "Status", 3: "Get_Battery_Cap", 4: "Get_Battery_Status", 5: "Battery_Capabilities", 6: "Get_Mfr_Info", 7: "Mfr_Info", 8: "Security_Request", 9: "Security_Response", 10: "FW_Update_Request", 11: "FW_Update_Response", 12: "PPS_Status", 13: "Country_Info", 14: "Country_Codes", 15: "Sink_Cap_Ext", 16: "Extended_Control", 17: "EPR_Source_Cap", 18: "EPR_Sink_Cap", 30: "Vendor_Defined_Ext", } @dataclass class Message: time: float vbus: float ibus: float raw: bytes pd: bytes header: dict[str, int] cls: str name: str decoded: Any = None def parse_header(halfword: int) -> dict[str, int]: return { "type": halfword & 0x1F, "rev": (halfword >> 6) & 0x3, "role": (halfword >> 8) & 0x1, "msgid": (halfword >> 9) & 0x7, "nobj": (halfword >> 12) & 0x7, "ext": (halfword >> 15) & 0x1, } def classify_message(header: dict[str, int]) -> tuple[str, str]: if header["ext"]: return "ext", EXT_MESSAGES.get(header["type"], f"EXT_{header['type']}") if header["nobj"] == 0: return "ctrl", CTRL_MESSAGES.get(header["type"], f"CTRL_{header['type']}") return "data", DATA_MESSAGES.get(header["type"], f"DATA_{header['type']}") def parse_pdo(word: int) -> dict[str, Any]: supply_type = (word >> 30) & 0x3 if supply_type == 0: return { "kind": "fixed", "mv": ((word >> 10) & 0x3FF) * 50, "ma": (word & 0x3FF) * 10, } if supply_type == 3: subtype = (word >> 28) & 0xF if subtype == 0xC: return { "kind": "pps", "min_mv": ((word >> 8) & 0xFF) * 100, "max_mv": ((word >> 17) & 0xFF) * 100, "ma": (word & 0x7F) * 50, } if subtype in (0xD, 0xE): return { "kind": "avs", "min_mv": ((word >> 8) & 0xFF) * 100, "max_mv": ((word >> 17) & 0x1FF) * 100, "pdp_w": word & 0x7F, } return {"kind": "other", "raw": f"0x{word:08X}"} def parse_fixed_rdo(word: int) -> dict[str, Any]: return { "kind": "fixed", "obj_pos": (word >> 28) & 0xF, "op_ma": (word & 0x3FF) * 10, "max_ma": ((word >> 10) & 0x3FF) * 10, "mismatch": (word >> 26) & 0x1, "epr": (word >> 22) & 0x1, } def parse_pps_rdo(word: int) -> dict[str, Any]: return { "kind": "pps", "obj_pos": (word >> 28) & 0xF, "op_ma": (word & 0x7F) * 50, "out_mv": ((word >> 9) & 0xFFF) * 20, "epr": (word >> 22) & 0x1, } def decode_message(time: float, vbus: float, ibus: float, raw: bytes) -> Message | None: if len(raw) < 8: return None # PowerZ 的 Raw 前 6 字节是记录头,后面是标准 PD message header + payload。 pd = raw[6:] if len(pd) < 2: return None header = parse_header(int.from_bytes(pd[:2], "little")) cls, name = classify_message(header) return Message(time, vbus, ibus, raw, pd, header, cls, name) def load_messages(db_path: Path) -> list[Message]: conn = sqlite3.connect(str(db_path)) rows = conn.execute( "select Time, Vbus, Ibus, Raw from pd_table " "where Raw is not null and length(Raw) > 0 order by Time" ).fetchall() conn.close() messages: list[Message] = [] for time, vbus, ibus, raw in rows: msg = decode_message(time, vbus, ibus, raw if isinstance(raw, bytes) else bytes(raw)) if msg is not None: messages.append(msg) return messages def decode_source_caps(messages: list[Message]) -> list[Message]: caps: list[Message] = [] for msg in messages: if msg.name != "Source_Capabilities": continue objs = [] for idx in range(msg.header["nobj"]): offset = 2 + idx * 4 if offset + 4 > len(msg.pd): break objs.append(parse_pdo(int.from_bytes(msg.pd[offset : offset + 4], "little"))) msg.decoded = objs caps.append(msg) return caps def decode_requests(messages: list[Message], source_caps: list[Message]) -> list[Message]: requests: list[Message] = [] for msg in messages: if msg.name not in ("Request", "EPR_Request") or len(msg.pd) < 6: continue word = int.from_bytes(msg.pd[2:6], "little") obj_pos = (word >> 28) & 0xF selected = None for cap_msg in reversed(source_caps): if cap_msg.time <= msg.time and 0 < obj_pos <= len(cap_msg.decoded): selected = cap_msg.decoded[obj_pos - 1] break if selected and selected.get("kind") in ("pps", "avs"): decoded = parse_pps_rdo(word) else: decoded = parse_fixed_rdo(word) decoded["selected"] = selected msg.decoded = decoded requests.append(msg) return requests def format_pdo(pdo: dict[str, Any]) -> str: if pdo["kind"] == "fixed": watts = pdo["mv"] * pdo["ma"] / 1_000_000 return f"fixed {pdo['mv'] / 1000:.1f}V {pdo['ma'] / 1000:.3f}A ({watts:.1f}W)" if pdo["kind"] == "pps": return ( f"PPS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V " f"{pdo['ma'] / 1000:.3f}A" ) if pdo["kind"] == "avs": return ( f"AVS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V " f"{pdo['pdp_w']}W" ) return str(pdo) def format_request(req: dict[str, Any]) -> str: selected = req.get("selected") suffix = f", src={format_pdo(selected)}" if selected else "" if req["kind"] == "pps": return ( f"obj={req['obj_pos']} req={req['out_mv'] / 1000:.3f}V " f"{req['op_ma'] / 1000:.3f}A{suffix}" ) return ( f"obj={req['obj_pos']} op={req['op_ma'] / 1000:.3f}A " f"max={req['max_ma'] / 1000:.3f}A{suffix}" ) def print_summary(messages: list[Message], source_caps: list[Message], requests: list[Message]) -> None: print(f"Decoded PD messages: {len(messages)}") print() print("Source Capabilities:") for msg in source_caps: print(f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A") for idx, pdo in enumerate(msg.decoded, start=1): print(f" PDO{idx}: {format_pdo(pdo)}") print() print("Requests:") for msg in requests: print( f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A " f"{msg.name}: {format_request(msg.decoded)}" ) def print_window(messages: list[Message], start: float, end: float) -> None: print() print(f"Messages in {start:.3f}s - {end:.3f}s:") for msg in messages: if start <= msg.time <= end and msg.name != "GoodCRC": print( f"{msg.time:8.3f}s {msg.name:18} " f"VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A " f"nobj={msg.header['nobj']} pd={msg.pd.hex()}" ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Decode PD messages from a PowerZ sqlite capture." ) parser.add_argument("sqlite", type=Path, help="Path to PowerZ .sqlite file") parser.add_argument( "--window", nargs=2, type=float, metavar=("START", "END"), help="Print decoded messages in a time window", ) return parser.parse_args() def main() -> int: args = parse_args() messages = load_messages(args.sqlite) source_caps = decode_source_caps(messages) requests = decode_requests(messages, source_caps) print_summary(messages, source_caps, requests) if args.window: print_window(messages, args.window[0], args.window[1]) return 0 if __name__ == "__main__": raise SystemExit(main())