- Add charging_viewer.py for reading PowerZ .sqlite/.db charging data - Add powerz_pd_decode.py for decoding USB PD protocol messages - Add sdkconfig file path display to build_gui.html - Add signing reminder to split_and_merge.sh
308 lines
8.8 KiB
Python
308 lines
8.8 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import sqlite3
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# USB PD control message names keyed by the header "type" field. Used for
# messages that are not extended and carry no data objects. Types without an
# entry here are reported with a generic "CTRL_<n>" name by classify_message.
CTRL_MESSAGES = {
    1: "GoodCRC",
    2: "GotoMin",
    3: "Accept",
    4: "Reject",
    5: "Ping",
    6: "PS_RDY",
    7: "Get_Source_Cap",
    8: "Get_Sink_Cap",
    9: "DR_Swap",
    10: "PR_Swap",
    11: "VCONN_Swap",
    12: "Wait",
    13: "Soft_Reset",
    # 14 and 15 were missing, so Data_Reset traffic showed up as CTRL_14/CTRL_15.
    14: "Data_Reset",
    15: "Data_Reset_Complete",
    16: "Not_Supported",
    17: "Get_Source_Cap_Ext",
    18: "Get_Status",
    19: "FR_Swap",
    20: "Get_PPS_Status",
    21: "Get_Country_Codes",
    22: "Get_Sink_Cap_Ext",
    23: "Get_Source_Info",
    24: "Get_Revision",
}
|
|
|
|
# USB PD data message names keyed by the header "type" field. Used for
# non-extended messages that carry at least one data object.
DATA_MESSAGES = {
    0x01: "Source_Capabilities",
    0x02: "Request",
    0x03: "BIST",
    0x04: "Sink_Capabilities",
    0x05: "Battery_Status",
    0x06: "Alert",
    0x07: "Get_Country_Info",
    0x08: "Enter_USB",
    0x09: "EPR_Request",
    0x0A: "EPR_Mode",
    0x0B: "Source_Info",
    0x0C: "Revision",
    0x0F: "Vendor_Defined",
}
|
|
|
|
# USB PD extended message names keyed by the header "type" field. Used when
# the header's "ext" bit is set.
EXT_MESSAGES = {
    0x01: "Source_Cap_Ext",
    0x02: "Status",
    0x03: "Get_Battery_Cap",
    0x04: "Get_Battery_Status",
    0x05: "Battery_Capabilities",
    0x06: "Get_Mfr_Info",
    0x07: "Mfr_Info",
    0x08: "Security_Request",
    0x09: "Security_Response",
    0x0A: "FW_Update_Request",
    0x0B: "FW_Update_Response",
    0x0C: "PPS_Status",
    0x0D: "Country_Info",
    0x0E: "Country_Codes",
    0x0F: "Sink_Cap_Ext",
    0x10: "Extended_Control",
    0x11: "EPR_Source_Cap",
    0x12: "EPR_Sink_Cap",
    0x1E: "Vendor_Defined_Ext",
}
|
|
|
|
|
|
@dataclass
class Message:
    """A single PD message taken from one capture row.

    Instances are mutable on purpose: the decode passes fill in ``decoded``
    after construction.
    """

    # Row values as read from the capture (printed elsewhere as s / V / A).
    time: float
    vbus: float
    ibus: float
    # Complete Raw blob from the capture row.
    raw: bytes
    # PD message bytes: the Raw blob without its leading record header.
    pd: bytes
    # Bit fields of the 16-bit PD message header, keyed by field name.
    header: dict[str, int]
    # Message class ("ctrl", "data" or "ext") and human-readable message name.
    cls: str
    name: str
    # Payload interpretation attached by a later decode pass; None until then.
    decoded: Any = None
|
|
|
|
|
|
def parse_header(halfword: int) -> dict[str, int]:
    """Split a 16-bit PD message header into its bit fields.

    Args:
        halfword: The header value as an integer.

    Returns:
        A dict with the keys ``type``, ``rev``, ``role``, ``msgid``, ``nobj``
        and ``ext``, each holding the extracted field value.
    """
    # (field name, bit offset of the field's LSB, mask applied after shifting)
    layout = (
        ("type", 0, 0x1F),
        ("rev", 6, 0x3),
        ("role", 8, 0x1),
        ("msgid", 9, 0x7),
        ("nobj", 12, 0x7),
        ("ext", 15, 0x1),
    )
    return {field: (halfword >> shift) & mask for field, shift, mask in layout}
|
|
|
|
|
|
def classify_message(header: dict[str, int]) -> tuple[str, str]:
    """Return the message class and name for a parsed PD header.

    The class is ``"ext"`` when the extended bit is set, ``"ctrl"`` when the
    message has no data objects, and ``"data"`` otherwise. The name comes from
    the matching lookup table; unknown types get a generic
    ``<PREFIX>_<type>`` name.
    """
    if header["ext"]:
        cls, table, prefix = "ext", EXT_MESSAGES, "EXT_"
    elif header["nobj"] == 0:
        cls, table, prefix = "ctrl", CTRL_MESSAGES, "CTRL_"
    else:
        cls, table, prefix = "data", DATA_MESSAGES, "DATA_"

    msg_type = header["type"]
    return cls, table.get(msg_type, f"{prefix}{msg_type}")
|
|
|
|
|
|
def parse_pdo(word: int) -> dict[str, Any]:
    """Decode one 32-bit source Power Data Object.

    Args:
        word: The PDO as an integer.

    Returns:
        A dict whose ``kind`` key selects the shape:

        * ``"fixed"``: ``mv`` and ``ma``.
        * ``"pps"``: ``min_mv``, ``max_mv`` and ``ma``.
        * ``"avs"``: ``min_mv``, ``max_mv`` and ``pdp_w``.
        * ``"other"``: ``raw`` (hex string) for every PDO type that is not
          decoded here (battery, variable, unknown APDO subtypes).
    """
    supply_type = (word >> 30) & 0x3

    if supply_type == 0:
        # Fixed supply: voltage in 50 mV units, maximum current in 10 mA units.
        return {
            "kind": "fixed",
            "mv": ((word >> 10) & 0x3FF) * 50,
            "ma": (word & 0x3FF) * 10,
        }

    if supply_type == 3:
        # Augmented PDO: the top nibble is 0b11 plus the 2-bit APDO subtype.
        subtype = (word >> 28) & 0xF
        if subtype == 0xC:
            # SPR PPS: voltages in 100 mV units, maximum current in 50 mA units.
            return {
                "kind": "pps",
                "min_mv": ((word >> 8) & 0xFF) * 100,
                "max_mv": ((word >> 17) & 0xFF) * 100,
                "ma": (word & 0x7F) * 50,
            }
        if subtype in (0xD, 0xE):
            # EPR AVS: PDP occupies the full low byte (B7..0). Masking with
            # 0x7F truncated every PDP of 128 W or more (e.g. 140 W -> 12 W).
            # NOTE(review): subtype 0xE (SPR AVS) is decoded with the EPR AVS
            # layout, as before; its field layout appears to differ in the
            # spec - confirm before relying on these values for 0xE.
            return {
                "kind": "avs",
                "min_mv": ((word >> 8) & 0xFF) * 100,
                "max_mv": ((word >> 17) & 0x1FF) * 100,
                "pdp_w": word & 0xFF,
            }

    return {"kind": "other", "raw": f"0x{word:08X}"}
|
|
|
|
|
|
def parse_fixed_rdo(word: int) -> dict[str, Any]:
    """Decode a Request Data Object that targets a fixed (or variable) PDO.

    Args:
        word: The 32-bit RDO as an integer.

    Returns:
        A dict with ``kind`` (``"fixed"``), ``obj_pos``, ``op_ma``,
        ``max_ma``, ``mismatch`` and ``epr``.
    """
    # Operating current is B19..10 and maximum operating current is B9..0,
    # both in 10 mA units. These two fields were previously swapped.
    operating_units = (word >> 10) & 0x3FF
    maximum_units = word & 0x3FF
    return {
        "kind": "fixed",
        "obj_pos": (word >> 28) & 0xF,
        "op_ma": operating_units * 10,
        "max_ma": maximum_units * 10,
        "mismatch": (word >> 26) & 0x1,
        "epr": (word >> 22) & 0x1,
    }
|
|
|
|
|
|
def parse_pps_rdo(word: int) -> dict[str, Any]:
    """Decode a Request Data Object that targets a programmable (PPS) PDO.

    Args:
        word: The 32-bit RDO as an integer.

    Returns:
        A dict with ``kind`` (``"pps"``), ``obj_pos``, ``op_ma`` (50 mA
        steps), ``out_mv`` (20 mV steps) and ``epr``.
    """
    position = (word >> 28) & 0xF
    epr_flag = (word >> 22) & 0x1
    voltage_steps = (word >> 9) & 0xFFF
    current_steps = word & 0x7F

    result: dict[str, Any] = {"kind": "pps"}
    result["obj_pos"] = position
    result["op_ma"] = current_steps * 50
    result["out_mv"] = voltage_steps * 20
    result["epr"] = epr_flag
    return result
|
|
|
|
|
|
def decode_message(time: float, vbus: float, ibus: float, raw: bytes) -> Message | None:
    """Build a Message from one capture row, or return None if it is too short.

    Args:
        time: Row timestamp.
        vbus: Row bus voltage.
        ibus: Row bus current.
        raw: The row's Raw blob.

    Returns:
        The classified message, or ``None`` when ``raw`` is shorter than
        8 bytes (6-byte record header plus the 2-byte PD header).
    """
    # The first 6 bytes of a PowerZ Raw blob are a record header; the standard
    # PD message header and payload follow.
    record_header_len = 6
    pd_header_len = 2
    if len(raw) < record_header_len + pd_header_len:
        return None

    # After the length guard the PD part always has at least 2 bytes, so no
    # second length check is needed.
    pd = raw[record_header_len:]
    header = parse_header(int.from_bytes(pd[:pd_header_len], "little"))
    cls, name = classify_message(header)
    return Message(time, vbus, ibus, raw, pd, header, cls, name)
|
|
|
|
|
|
def load_messages(db_path: Path) -> list[Message]:
    """Read every PD row from a PowerZ sqlite capture and decode it.

    Args:
        db_path: Path to the sqlite file; it must contain a ``pd_table``
            with ``Time``, ``Vbus``, ``Ibus`` and ``Raw`` columns.

    Returns:
        The decoded messages ordered by ``Time``. Rows whose Raw blob is too
        short to decode are skipped.

    Raises:
        sqlite3.Error: If the file cannot be opened or queried.
    """
    conn = sqlite3.connect(str(db_path))
    try:
        rows = conn.execute(
            "select Time, Vbus, Ibus, Raw from pd_table "
            "where Raw is not null and length(Raw) > 0 order by Time"
        ).fetchall()
    finally:
        # Close even when the query fails (e.g. missing table), so the
        # connection and its file handle are not leaked.
        conn.close()

    messages: list[Message] = []
    for time, vbus, ibus, raw in rows:
        # Normalize bytes-like values (e.g. memoryview) to bytes.
        blob = raw if isinstance(raw, bytes) else bytes(raw)
        msg = decode_message(time, vbus, ibus, blob)
        if msg is not None:
            messages.append(msg)
    return messages
|
|
|
|
|
|
def decode_source_caps(messages: list[Message]) -> list[Message]:
    """Decode the PDOs of every Source_Capabilities message.

    Each matching message gets its ``decoded`` attribute set to the list of
    parsed PDOs (modified in place).

    Args:
        messages: All decoded messages of a capture.

    Returns:
        The Source_Capabilities messages, in their original order.
    """
    caps = [msg for msg in messages if msg.name == "Source_Capabilities"]
    for cap in caps:
        payload = cap.pd
        # Only parse objects that are completely present: the header may
        # announce more objects than the captured bytes contain. A negative
        # count (payload shorter than the header) yields an empty range.
        complete = (len(payload) - 2) // 4
        count = min(cap.header["nobj"], complete)
        cap.decoded = [
            parse_pdo(int.from_bytes(payload[2 + 4 * i : 6 + 4 * i], "little"))
            for i in range(count)
        ]
    return caps
|
|
|
|
|
|
def decode_requests(messages: list[Message], source_caps: list[Message]) -> list[Message]:
    """Decode every Request / EPR_Request message.

    The request's object position is resolved against the most recent
    Source_Capabilities message (at or before the request's time) that
    contains that position. The kind of the selected PDO decides how the
    RDO is interpreted. Each matching message gets its ``decoded`` attribute
    set to the parsed RDO dict, including a ``selected`` key holding the
    resolved PDO or ``None`` (modified in place).

    Args:
        messages: All decoded messages of a capture.
        source_caps: Source_Capabilities messages whose ``decoded`` attribute
            already holds their PDO list, ordered by time.

    Returns:
        The request messages, in their original order.
    """
    requests: list[Message] = []
    for msg in messages:
        if msg.name not in ("Request", "EPR_Request") or len(msg.pd) < 6:
            continue

        word = int.from_bytes(msg.pd[2:6], "little")
        obj_pos = (word >> 28) & 0xF

        selected = None
        for cap_msg in reversed(source_caps):
            if cap_msg.time <= msg.time and 0 < obj_pos <= len(cap_msg.decoded):
                selected = cap_msg.decoded[obj_pos - 1]
                break

        # An EPR_Request carries a copy of the requested PDO as its second
        # data object. Use it when the position could not be resolved, e.g.
        # for EPR positions that no Source_Capabilities message lists.
        if selected is None and msg.name == "EPR_Request" and len(msg.pd) >= 10:
            selected = parse_pdo(int.from_bytes(msg.pd[6:10], "little"))

        kind = selected.get("kind") if selected else None
        if kind in ("pps", "avs"):
            decoded = parse_pps_rdo(word)
            if kind == "avs":
                # AVS RDOs share the PPS field layout but encode the output
                # voltage in 25 mV units, not the 20 mV PPS step. "kind" is
                # left as "pps" so the result keeps the same shape.
                decoded["out_mv"] = ((word >> 9) & 0xFFF) * 25
        else:
            decoded = parse_fixed_rdo(word)

        decoded["selected"] = selected
        msg.decoded = decoded
        requests.append(msg)
    return requests
|
|
|
|
|
|
def format_pdo(pdo: dict[str, Any]) -> str:
|
|
if pdo["kind"] == "fixed":
|
|
watts = pdo["mv"] * pdo["ma"] / 1_000_000
|
|
return f"fixed {pdo['mv'] / 1000:.1f}V {pdo['ma'] / 1000:.3f}A ({watts:.1f}W)"
|
|
if pdo["kind"] == "pps":
|
|
return (
|
|
f"PPS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V "
|
|
f"{pdo['ma'] / 1000:.3f}A"
|
|
)
|
|
if pdo["kind"] == "avs":
|
|
return (
|
|
f"AVS {pdo['min_mv'] / 1000:.1f}-{pdo['max_mv'] / 1000:.1f}V "
|
|
f"{pdo['pdp_w']}W"
|
|
)
|
|
return str(pdo)
|
|
|
|
|
|
def format_request(req: dict[str, Any]) -> str:
    """Render a parsed RDO dict as a short human-readable string.

    PPS-style requests show the requested voltage and current; all other
    requests show operating and maximum current. When the request carries a
    resolved ``selected`` PDO, its description is appended as ``src=...``.
    """
    source_pdo = req.get("selected")
    if source_pdo:
        suffix = f", src={format_pdo(source_pdo)}"
    else:
        suffix = ""

    position = req["obj_pos"]
    operating_a = req["op_ma"] / 1000
    if req["kind"] == "pps":
        volts = req["out_mv"] / 1000
        return f"obj={position} req={volts:.3f}V {operating_a:.3f}A{suffix}"

    maximum_a = req["max_ma"] / 1000
    return f"obj={position} op={operating_a:.3f}A max={maximum_a:.3f}A{suffix}"
|
|
|
|
|
|
def print_summary(messages: list[Message], source_caps: list[Message], requests: list[Message]) -> None:
    """Print the message count, all source capabilities and all requests.

    Args:
        messages: All decoded messages (only their count is printed).
        source_caps: Source_Capabilities messages with decoded PDO lists.
        requests: Request messages with decoded RDO dicts.
    """

    def bus_state(msg: Message) -> str:
        # Timestamp and bus measurements shared by both listings.
        return f"{msg.time:8.3f}s VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A"

    print(f"Decoded PD messages: {len(messages)}")
    print()

    print("Source Capabilities:")
    for cap in source_caps:
        print(bus_state(cap))
        for position, pdo in enumerate(cap.decoded, start=1):
            print(f" PDO{position}: {format_pdo(pdo)}")
    print()

    print("Requests:")
    for request in requests:
        description = format_request(request.decoded)
        print(f"{bus_state(request)} {request.name}: {description}")
|
|
|
|
|
|
def print_window(messages: list[Message], start: float, end: float) -> None:
    """Print every message inside a time window, leaving out GoodCRC.

    Args:
        messages: All decoded messages.
        start: Window start time (inclusive).
        end: Window end time (inclusive).
    """
    print()
    print(f"Messages in {start:.3f}s - {end:.3f}s:")
    for msg in messages:
        outside = not (start <= msg.time <= end)
        if outside or msg.name == "GoodCRC":
            continue
        label = f"{msg.time:8.3f}s {msg.name:18} "
        bus = f"VBUS={msg.vbus:6.3f}V IBUS={msg.ibus:5.3f}A "
        payload = f"nobj={msg.header['nobj']} pd={msg.pd.hex()}"
        print(label + bus + payload)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        A namespace with ``sqlite`` (Path to the capture file) and ``window``
        (a list of two floats, or ``None`` when ``--window`` is not given).
    """
    cli = argparse.ArgumentParser(description="Decode PD messages from a PowerZ sqlite capture.")
    cli.add_argument("sqlite", type=Path, help="Path to PowerZ .sqlite file")

    window_options = {
        "nargs": 2,
        "type": float,
        "metavar": ("START", "END"),
        "help": "Print decoded messages in a time window",
    }
    cli.add_argument("--window", **window_options)

    return cli.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the decoder: load the capture, decode it and print the report.

    Returns:
        The process exit code (always 0).
    """
    options = parse_args()

    decoded = load_messages(options.sqlite)
    capabilities = decode_source_caps(decoded)
    print_summary(decoded, capabilities, decode_requests(decoded, capabilities))

    # The detailed listing is only printed when --window was given.
    if options.window:
        window_start, window_end = options.window
        print_window(decoded, window_start, window_end)

    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|