work_script/CP02/powerz_pd_decode.py
Ching L 8651878be8 feat(cp02): add PD decode tools and enhance build GUI
- Add charging_viewer.py for reading PowerZ .sqlite/.db charging data
  - Add powerz_pd_decode.py for decoding USB PD protocol messages
  - Add sdkconfig file path display to build_gui.html
  - Add signing reminder to split_and_merge.sh
2026-04-03 10:50:23 +08:00

308 lines
8.8 KiB
Python

#!/usr/bin/env python3
import argparse
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Any
CTRL_MESSAGES = {
1: "GoodCRC",
2: "GotoMin",
3: "Accept",
4: "Reject",
5: "Ping",
6: "PS_RDY",
7: "Get_Source_Cap",
8: "Get_Sink_Cap",
9: "DR_Swap",
10: "PR_Swap",
11: "VCONN_Swap",
12: "Wait",
13: "Soft_Reset",
16: "Not_Supported",
17: "Get_Source_Cap_Ext",
18: "Get_Status",
19: "FR_Swap",
20: "Get_PPS_Status",
21: "Get_Country_Codes",
22: "Get_Sink_Cap_Ext",
23: "Get_Source_Info",
24: "Get_Revision",
}
DATA_MESSAGES = {
1: "Source_Capabilities",
2: "Request",
3: "BIST",
4: "Sink_Capabilities",
5: "Battery_Status",
6: "Alert",
7: "Get_Country_Info",
8: "Enter_USB",
9: "EPR_Request",
10: "EPR_Mode",
11: "Source_Info",
12: "Revision",
15: "Vendor_Defined",
}
EXT_MESSAGES = {
1: "Source_Cap_Ext",
2: "Status",
3: "Get_Battery_Cap",
4: "Get_Battery_Status",
5: "Battery_Capabilities",
6: "Get_Mfr_Info",
7: "Mfr_Info",
8: "Security_Request",
9: "Security_Response",
10: "FW_Update_Request",
11: "FW_Update_Response",
12: "PPS_Status",
13: "Country_Info",
14: "Country_Codes",
15: "Sink_Cap_Ext",
16: "Extended_Control",
17: "EPR_Source_Cap",
18: "EPR_Sink_Cap",
30: "Vendor_Defined_Ext",
}
@dataclass
class Message:
time: float
vbus: float
ibus: float
raw: bytes
pd: bytes
header: dict[str, int]
cls: str
name: str
decoded: Any = None
def parse_header(halfword: int) -> dict[str, int]:
return {
"type": halfword & 0x1F,
"rev": (halfword >> 6) & 0x3,
"role": (halfword >> 8) & 0x1,
"msgid": (halfword >> 9) & 0x7,
"nobj": (halfword >> 12) & 0x7,
"ext": (halfword >> 15) & 0x1,
}
def classify_message(header: dict[str, int]) -> tuple[str, str]:
if header["ext"]:
return "ext", EXT_MESSAGES.get(header["type"], f"EXT_{header['type']}")
if header["nobj"] == 0:
return "ctrl", CTRL_MESSAGES.get(header["type"], f"CTRL_{header['type']}")
return "data", DATA_MESSAGES.get(header["type"], f"DATA_{header['type']}")
def parse_pdo(word: int) -> dict[str, Any]:
supply_type = (word >> 30) & 0x3
if supply_type == 0:
return {
"kind": "fixed",
"mv": ((word >> 10) & 0x3FF) * 50,
"ma": (word & 0x3FF) * 10,
}
if supply_type == 3:
subtype = (word >> 28) & 0xF
if subtype == 0xC:
return {
"kind": "pps",
"min_mv": ((word >> 8) & 0xFF) * 100,
"max_mv": ((word >> 17) & 0xFF) * 100,
"ma": (word & 0x7F) * 50,
}
if subtype in (0xD, 0xE):
return {
"kind": "avs",
"min_mv": ((word >> 8) & 0xFF) * 100,
"max_mv": ((word >> 17) & 0x1FF) * 100,
"pdp_w": word & 0x7F,
}
return {"kind": "other", "raw": f"0x{word:08X}"}
def parse_fixed_rdo(word: int) -> dict[str, Any]:
return {
"kind": "fixed",
"obj_pos": (word >> 28) & 0xF,
"op_ma": (word & 0x3FF) * 10,
"max_ma": ((word >> 10) & 0x3FF) * 10,
"mismatch": (word >> 26) & 0x1,
"epr": (word >> 22) & 0x1,
}
def parse_pps_rdo(word: int) -> dict[str, Any]:
return {
"kind": "pps",
"obj_pos": (word >> 28) & 0xF,
"op_ma": (word & 0x7F) * 50,
"out_mv": ((word >> 9) & 0xFFF) * 20,
"epr": (word >> 22) & 0x1,
}
def decode_message(time: float, vbus: float, ibus: float, raw: bytes) -> Message | None:
if len(raw) < 8:
return None
# PowerZ 的 Raw 前 6 字节是记录头,后面是标准 PD message header + payload。
pd = raw[6:]
if len(pd) < 2:
return None
header = parse_header(int.from_bytes(pd[:2], "little"))
cls, name = classify_message(header)
return Message(time, vbus, ibus, raw, pd, header, cls, name)
def load_messages(db_path: Path) -> list[Message]:
conn = sqlite3.connect(str(db_path))
rows = conn.execute(
"select Time, Vbus, Ibus, Raw from pd_table "
"where Raw is not null and length(Raw) > 0 order by Time"
).fetchall()
conn.close()
messages: list[Message] = []
for time, vbus, ibus, raw in rows:
msg = decode_message(time, vbus, ibus, raw if isinstance(raw, bytes) else bytes(raw))
if msg is not None:
messages.append(msg)
return messages
def decode_source_caps(messages: list[Message]) -> list[Message]:
caps: list[Message] = []
for msg in messages:
if msg.name != "Source_Capabilities":
continue
objs = []
for idx in range(msg.header["nobj"]):
offset = 2 + idx * 4
if offset + 4 > len(msg.pd):
break
objs.append(parse_pdo(int.from_bytes(msg.pd[offset : offset + 4], "little")))
msg.decoded = objs
caps.append(msg)
return caps
def decode_requests(messages: list[Message], source_caps: list[Message]) -> list[Message]:
requests: list[Message] = []
for msg in messages:
if msg.name not in ("Request", "EPR_Request") or len(msg.pd) < 6:
continue
word = int.from_bytes(msg.pd[2:6], "little")
obj_pos = (word >> 28) & 0xF
selected = None
for cap_msg in reversed(source_caps):
if cap_msg.time <= msg.time and 0 < obj_pos <= len(cap_msg.decoded):
selected = cap_msg.decoded[obj_pos - 1]
break
if selected and selected.get("kind") in ("pps", "avs"):
decoded = parse_pps_rdo(word)
else:
decoded = parse_fixed_rdo(word)
decoded["selected"] = selected
msg.decoded = decoded
requests.append(msg)
return requests
def format_pdo(pdo: dict[str, Any]) -> str:
if pdo["kind"] == "fixed":
watts = pdo["mv"] * pdo["ma"] / 1_000_000
return f"fixed {pdo['mv'] / 1000:.1f}V {pdo['ma'] / 1000:.3f}A ({watts:.1f}W)"
if pdo["kind"] == "pps":
return (
f"PPS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V "
f"{pdo['ma'] / 1000:.3f}A"
)
if pdo["kind"] == "avs":
return (
f"AVS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V "
f"{pdo['pdp_w']}W"
)
return str(pdo)
def format_request(req: dict[str, Any]) -> str:
selected = req.get("selected")
suffix = f", src={format_pdo(selected)}" if selected else ""
if req["kind"] == "pps":
return (
f"obj={req['obj_pos']} req={req['out_mv'] / 1000:.3f}V "
f"{req['op_ma'] / 1000:.3f}A{suffix}"
)
return (
f"obj={req['obj_pos']} op={req['op_ma'] / 1000:.3f}A "
f"max={req['max_ma'] / 1000:.3f}A{suffix}"
)
def print_summary(messages: list[Message], source_caps: list[Message], requests: list[Message]) -> None:
print(f"Decoded PD messages: {len(messages)}")
print()
print("Source Capabilities:")
for msg in source_caps:
print(f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A")
for idx, pdo in enumerate(msg.decoded, start=1):
print(f" PDO{idx}: {format_pdo(pdo)}")
print()
print("Requests:")
for msg in requests:
print(
f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A "
f"{msg.name}: {format_request(msg.decoded)}"
)
def print_window(messages: list[Message], start: float, end: float) -> None:
print()
print(f"Messages in {start:.3f}s - {end:.3f}s:")
for msg in messages:
if start <= msg.time <= end and msg.name != "GoodCRC":
print(
f"{msg.time:8.3f}s {msg.name:18} "
f"VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A "
f"nobj={msg.header['nobj']} pd={msg.pd.hex()}"
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Decode PD messages from a PowerZ sqlite capture."
)
parser.add_argument("sqlite", type=Path, help="Path to PowerZ .sqlite file")
parser.add_argument(
"--window",
nargs=2,
type=float,
metavar=("START", "END"),
help="Print decoded messages in a time window",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
messages = load_messages(args.sqlite)
source_caps = decode_source_caps(messages)
requests = decode_requests(messages, source_caps)
print_summary(messages, source_caps, requests)
if args.window:
print_window(messages, args.window[0], args.window[1])
return 0
if __name__ == "__main__":
raise SystemExit(main())