diff --git a/scripts/test_firmware.py b/scripts/test_firmware.py new file mode 100644 index 0000000..e078822 --- /dev/null +++ b/scripts/test_firmware.py @@ -0,0 +1,213 @@ +""" +Firmware update dry-run test for VR-N76. + +Tests layers 1 and 2 of firmware.py without touching the radio. + +Modes: + --local Use already-downloaded local files (no network needed) + --check Only run the gRPC check, don't download + --full Run gRPC check + download + assemble (default) + +Usage: + cd E:\\G14 Transfer\\Projects\\benlink + pip install grpcio bsdiff4 + python scripts/test_firmware.py --local + python scripts/test_firmware.py --full +""" + +import sys +import os +import hashlib +import argparse + +# Known-good hashes from captured v0.9.3-7 firmware +KNOWN_PATCH_MD5 = "878b35b8e06d3465484ea0ace669de62" +KNOWN_BASE_MD5 = "74b6d097d8d2d9d2d9fac88133198a08" +KNOWN_FIRMWARE_MD5 = "0c0d095da50bebe664822adcb244834a" + +# Local file paths (set to wherever yours are) +LOCAL_BASE_BIN = r"E:\firmware_capture\upgrade_base_v1.bin.zip" +LOCAL_PATCH_BIN = r"E:\firmware_capture\patch_base_to_vr_n76.bin" +LOCAL_OUTPUT_BIN = r"E:\firmware_capture\259_test.firmware" + +# Known working OSS URLs (from bugreport logcat, v0.9.3-7 / OSS v147) +KNOWN_PATCH_URL = ( + "https://pubdatas.oss-cn-shenzhen.aliyuncs.com" + "/firmware/v147/patch_base_to_vr_n76.bin" +) +KNOWN_BASE_URL = ( + "https://pubdatas.oss-cn-shenzhen.aliyuncs.com" + "/upgrade_base_v1.bin.zip" +) + +# Load firmware.py directly — avoids pulling in bleak/BLE dependencies +# through benlink/__init__.py +import importlib.util as _ilu +_fw_path = os.path.join(os.path.dirname(__file__), "..", "src", "benlink", "firmware.py") +_spec = _ilu.spec_from_file_location("benlink.firmware", os.path.abspath(_fw_path)) +_fw = _ilu.module_from_spec(_spec) +sys.modules["benlink.firmware"] = _fw # must be registered before exec so @dataclass can resolve __module__ +_spec.loader.exec_module(_fw) + +check_update = _fw.check_update +download_firmware = _fw.download_firmware +assemble_from_files = _fw.assemble_from_files +UpdateInfo = _fw.UpdateInfo +FirmwareBundle = _fw.FirmwareBundle + + +def _md5_file(path: str) -> str: + h = hashlib.md5() + with open(path, "rb") as f: + while chunk := f.read(65536): + h.update(chunk) + return h.hexdigest() + + +def _progress(stage: str, done: int, total: int): + pct = f"{done * 100 // total:3d}%" if total else "???%" + bar = "#" * (done * 30 // total) if total else "" + print(f"\r {stage:12s} {pct} [{bar:<30s}] {done//1024}KB", end="", flush=True) + if total and done >= total: + print() + + +def run_check(): + print("\n── Layer 1: gRPC check ────────────────────────────────────") + print(f" Host: rpc.benshikj.com:800 (TLS)") + print(f" Method: benshikj.APP/CheckUpdate") + print(f" Model: VR_N7600") + print(f" fw_version: V0.0.0 (triggers update response)") + + try: + info = check_update(did="", fw_version="V0.0.0") + except Exception as e: + print(f"\n ERROR: {e}") + return None + + if info is None: + print("\n Server returned haveUpdate=False or empty response.") + print(" This is expected if the DID is unrecognised by the server.") + print(f" Falling back to known URLs from bugreport logcat.") + return UpdateInfo(patch_url=KNOWN_PATCH_URL, base_url=KNOWN_BASE_URL) + + print(f"\n ✓ haveUpdate=True") + print(f" patch_url : {info.patch_url}") + print(f" base_url : {info.base_url}") + print(f" internal : {info.internal_version}") + return info + + +def run_download(info: UpdateInfo) -> FirmwareBundle: + print("\n── Layer 2: Download + assemble ───────────────────────────") + bundle = download_firmware(info, progress_cb=_progress) + print(f" Assembled : {bundle.size:,} bytes") + print(f" MD5 : {bundle.md5}") + + if bundle.md5 == KNOWN_FIRMWARE_MD5: + print(f" ✓ MD5 matches known-good v0.9.3-7 firmware") + else: + print(f" ⚠ MD5 differs from known v0.9.3-7 ({KNOWN_FIRMWARE_MD5})") + print(f" This may indicate a newer firmware version.") + + print(f" md5_tail : {bundle.md5_tail.hex()} (used in UPDATE_SYNC_REQ)") + bundle.save(LOCAL_OUTPUT_BIN) + print(f" Saved to : {LOCAL_OUTPUT_BIN}") + return bundle + + +def run_local() -> FirmwareBundle: + print("\n── Local assemble (no network) ────────────────────────────") + + # Verify input files + for path, expected_md5, label in [ + (LOCAL_PATCH_BIN, KNOWN_PATCH_MD5, "patch"), + ]: + if not os.path.exists(path): + print(f" ERROR: {path} not found") + sys.exit(1) + actual = _md5_file(path) + status = "✓" if actual == expected_md5 else "⚠ md5 mismatch" + print(f" {label:12s} {actual} {status}") + + if not os.path.exists(LOCAL_BASE_BIN): + print(f" ERROR: {LOCAL_BASE_BIN} not found") + sys.exit(1) + + # Extract base from zip and assemble + import zipfile, io, sys as _sys + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + + try: + import bsdiff4 + except ImportError: + print(" ERROR: bsdiff4 not installed. Run: pip install bsdiff4") + sys.exit(1) + + with open(LOCAL_BASE_BIN, "rb") as f: + zip_bytes = f.read() + with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf: + bin_name = next(n for n in zf.namelist() if n.endswith(".bin")) + base_bytes = zf.read(bin_name) + + base_md5 = hashlib.md5(base_bytes).hexdigest() + status = "✓" if base_md5 == KNOWN_BASE_MD5 else "⚠ md5 mismatch" + print(f" {'base':12s} {base_md5} {status}") + + with open(LOCAL_PATCH_BIN, "rb") as f: + patch_bytes = f.read() + + fw_bytes = bsdiff4.patch(base_bytes, patch_bytes) + fw_md5 = hashlib.md5(fw_bytes).hexdigest() + status = "✓" if fw_md5 == KNOWN_FIRMWARE_MD5 else "⚠ unexpected md5" + print(f"\n Assembled : {len(fw_bytes):,} bytes") + print(f" MD5 : {fw_md5} {status}") + md5_tail = bytes.fromhex(fw_md5)[-4:] + print(f" md5_tail : {md5_tail.hex()} (used in UPDATE_SYNC_REQ)") + + with open(LOCAL_OUTPUT_BIN, "wb") as f: + f.write(fw_bytes) + print(f" Saved to : {LOCAL_OUTPUT_BIN}") + + info = UpdateInfo(patch_url=KNOWN_PATCH_URL, base_url=KNOWN_BASE_URL) + return FirmwareBundle(data=fw_bytes, update_info=info) + + +def main(): + parser = argparse.ArgumentParser(description="VR-N76 firmware update dry-run") + group = parser.add_mutually_exclusive_group() + group.add_argument("--local", action="store_true", + help="Assemble from local files only (no network)") + group.add_argument("--check", action="store_true", + help="Only run gRPC check, don't download") + group.add_argument("--full", action="store_true", default=True, + help="Full check + download + assemble (default)") + args = parser.parse_args() + + print("VR-N76 Firmware Update — Dry Run") + print("=" * 50) + print(f"Radio BT MAC : 38:D2:00:00:F7:F5") + print(f"Model : VR_N7600") + print(f"Target FW : V0.9.3-7 (OSS v147, internal 259)") + + if args.local: + bundle = run_local() + elif args.check: + info = run_check() + if info: + print(f"\n patch_url : {info.patch_url}") + print(f" base_url : {info.base_url}") + return + else: + info = run_check() + if info: + bundle = run_download(info) + + print("\n── Summary ────────────────────────────────────────────────") + print(" Layers 1 + 2 complete. Firmware is ready to flash.") + print(" Next step: FirmwareUpdater (Layer 3) — GAIA BT delivery") + print(f" VM RFCOMM UUID: 00001107-D102-11E1-9B23-00025B00A5A5") + + +if __name__ == "__main__": + main() diff --git a/src/benlink/firmware.py b/src/benlink/firmware.py new file mode 100644 index 0000000..c8fd139 --- /dev/null +++ b/src/benlink/firmware.py @@ -0,0 +1,440 @@ +""" +# VR-N76 Firmware Update Support + +This module provides three layers of firmware update support: + +1. **Check** — query `rpc.benshikj.com:800` (gRPC, TLS) for available updates +2. **Download** — fetch patch + base zip from Alibaba Cloud OSS, assemble via bsdiff4 +3. **Flash** — deliver assembled firmware to the radio over the GAIA VM RFCOMM channel + (UUID `00001107-D102-11E1-9B23-00025B00A5A5`) + +Layer 3 (FirmwareUpdater) is a work-in-progress — layers 1 and 2 are fully implemented. + +## Quick start + +```python +from benlink.firmware import fetch_firmware + +bundle = fetch_firmware(fw_version="V0.9.2-7") +if bundle: + print(f"Update available: {bundle.size} bytes, md5={bundle.md5}") + # bundle.data contains the assembled .firmware bytes ready to flash +``` + +## Notes on the gRPC endpoint + +- Host: `rpc.benshikj.com:800` (TLS on port 800, not 443) +- Service: `benshikj.APP`, method: `CheckUpdate` +- The `did` field is the factory device ID (from radio Status menu S/N). + The server returns URLs regardless of whether `did` is recognized — + it returns `haveUpdate=False` only when the model+version combo has no update. +- Both the patch URL and base zip URL are returned in the response. + +## OSS URL structure (as of v0.9.3-7) + +- Patch: `https://pubdatas.oss-cn-shenzhen.aliyuncs.com/firmware/v{N}/patch_base_to_vr_n76.bin` +- Base: `https://pubdatas.oss-cn-shenzhen.aliyuncs.com/upgrade_base_v{N}.bin.zip` + +The internal version `N` (e.g. 147) differs from the user-facing version string (e.g. V0.9.3-7). +The base image version (v1 as of 2025-04-28) is independent of the firmware version — +the same base is shared across multiple firmware releases. +""" + +from __future__ import annotations + +import hashlib +import io +import zipfile +import typing as t +from dataclasses import dataclass, field + +# ── Optional dependency guards ────────────────────────────────────────────── + +def _require_grpc(): + try: + import grpc + return grpc + except ImportError: + raise ImportError( + "grpcio is required for firmware update checks. " + "Install with: pip install grpcio" + ) + + +def _require_bsdiff4(): + try: + import bsdiff4 + return bsdiff4 + except ImportError: + raise ImportError( + "bsdiff4 is required to assemble firmware. " + "Install with: pip install bsdiff4" + ) + + +# ── gRPC / proto constants ─────────────────────────────────────────────────── + +_GRPC_HOST = "rpc.benshikj.com:800" +_GRPC_METHOD = "/benshikj.APP/CheckUpdate" +_MODEL = "VR_N7600" + + +# ── Minimal proto3 wire-format encoder / decoder ──────────────────────────── +# Avoids grpcio-tools / protoc as a runtime dependency. + +def _varint(n: int) -> bytes: + out = [] + while n > 0x7F: + out.append((n & 0x7F) | 0x80) + n >>= 7 + out.append(n & 0x7F) + return bytes(out) if out else b'\x00' + + +def _encode_string_field(field_num: int, s: str) -> bytes: + b = s.encode("utf-8") + tag = (field_num << 3) | 2 # wire type 2 = length-delimited + return bytes([tag]) + _varint(len(b)) + b + + +def _encode_check_request(did: str, fw_version: str, model: str) -> bytes: + """Encode CheckFirmwareUpdateRequest { did=1, firmwareVersion=2, model=3 }""" + return ( + _encode_string_field(1, did) + + _encode_string_field(2, fw_version) + + _encode_string_field(3, model) + ) + + +def _decode_proto_fields(data: bytes) -> t.List[t.Tuple[int, int, bytes]]: + """ + Decode proto3 wire fields. + Returns list of (field_number, wire_type, raw_value_bytes). + wire_type 0 = varint, wire_type 2 = length-delimited (string/bytes/message). + """ + out: t.List[t.Tuple[int, int, bytes]] = [] + pos = 0 + n = len(data) + + def read_varint() -> int: + nonlocal pos + val = shift = 0 + while pos < n: + b = data[pos]; pos += 1 + val |= (b & 0x7F) << shift + shift += 7 + if not (b & 0x80): + return val + return val + + while pos < n: + tag = read_varint() + field_num = tag >> 3 + wire_type = tag & 0x7 + + if wire_type == 0: # varint + val = read_varint() + out.append((field_num, wire_type, _varint(val))) + elif wire_type == 2: # length-delimited + length = read_varint() + out.append((field_num, wire_type, data[pos: pos + length])) + pos += length + elif wire_type == 5: # 32-bit fixed + out.append((field_num, wire_type, data[pos: pos + 4])) + pos += 4 + elif wire_type == 1: # 64-bit fixed + out.append((field_num, wire_type, data[pos: pos + 8])) + pos += 8 + else: + break # unknown — stop gracefully + + return out + + +def _extract_urls(data: bytes) -> t.List[str]: + """ + Recursively walk proto fields and collect all HTTPS URL strings. + Works regardless of whether the server puts URLs in one or two sub-messages. + """ + urls: t.List[str] = [] + for _, wire_type, value in _decode_proto_fields(data): + if wire_type == 2: + # Try to decode as UTF-8 string first + try: + s = value.decode("utf-8") + if s.startswith("https://"): + urls.append(s) + continue + except UnicodeDecodeError: + pass + # Otherwise recurse — might be a nested message + urls.extend(_extract_urls(value)) + return urls + + +def _decode_have_update(data: bytes) -> bool: + """Extract field 1 (bool haveUpdate) from CheckFirmwareUpdateResult.""" + for field_num, wire_type, value in _decode_proto_fields(data): + if field_num == 1 and wire_type == 0: + return bool(int.from_bytes(value[:1], "little")) + return False + + +# ── Public data types ──────────────────────────────────────────────────────── + +@dataclass +class UpdateInfo: + """URLs returned by the server for an available firmware update.""" + patch_url: str + base_url: str + + @property + def internal_version(self) -> str: + """Best-effort parse of OSS internal version (e.g. 'v147').""" + import re + m = re.search(r"/firmware/(v\d+)/", self.patch_url) + return m.group(1) if m else "unknown" + + +@dataclass +class FirmwareBundle: + """Assembled, ready-to-flash firmware image.""" + data: bytes + update_info: UpdateInfo + base_md5: str = field(init=False) + + def __post_init__(self): + self.base_md5 = hashlib.md5(self.data).hexdigest() + + @property + def md5(self) -> str: + return self.base_md5 + + @property + def size(self) -> int: + return len(self.data) + + @property + def md5_tail(self) -> bytes: + """Last 4 bytes of md5 digest — used in UPDATE_SYNC_REQ.""" + return bytes.fromhex(self.base_md5)[-4:] + + def save(self, path: str) -> None: + """Write assembled firmware to disk.""" + with open(path, "wb") as f: + f.write(self.data) + + +ProgressCallback = t.Callable[[str, int, int], None] +"""progress_cb(stage: str, bytes_done: int, bytes_total: int)""" + + +# ── Layer 1: Check ─────────────────────────────────────────────────────────── + +def check_update( + did: str = "", + fw_version: str = "V0.0.0", +) -> t.Optional[UpdateInfo]: + """ + Query the Benshikj gRPC endpoint for an available firmware update. + + Args: + did: Device ID from radio Status menu (S/N field). Optional — the + server currently returns URLs regardless of DID value. + fw_version: Current firmware version string (e.g. "V0.9.2-7"). + Use "V0.0.0" to always trigger an update response. + + Returns: + UpdateInfo with patch_url and base_url if an update is available, + None if the server says haveUpdate=False or returns an empty response. + + Raises: + ImportError: if grpcio is not installed. + RuntimeError: on gRPC transport errors. + """ + grpc = _require_grpc() + + creds = grpc.ssl_channel_credentials() + channel = grpc.secure_channel(_GRPC_HOST, creds) + + try: + req_bytes = _encode_check_request(did, fw_version, _MODEL) + resp_bytes: bytes = channel.unary_unary( + _GRPC_METHOD, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + )(req_bytes, timeout=10) + except Exception as exc: + grpc_module = _require_grpc() + if isinstance(exc, grpc_module.RpcError): + raise RuntimeError( + f"gRPC error: {exc.code()} — {exc.details()}" + ) from exc + raise + finally: + channel.close() + + if not resp_bytes: + return None + + if not _decode_have_update(resp_bytes): + return None + + urls = _extract_urls(resp_bytes) + if len(urls) < 2: + return None + + patch_url = next((u for u in urls if "patch" in u), urls[0]) + base_url = next((u for u in urls if "base" in u and "patch" not in u), + next((u for u in urls if u != patch_url), None)) + + if not base_url: + return None + + return UpdateInfo(patch_url=patch_url, base_url=base_url) + + +# ── Layer 2: Download + assemble ───────────────────────────────────────────── + +def download_firmware( + update_info: UpdateInfo, + progress_cb: t.Optional[ProgressCallback] = None, +) -> FirmwareBundle: + """ + Download patch + base zip from OSS and assemble the final firmware image. + + Args: + update_info: URLs returned by check_update(). + progress_cb: Optional progress callback (stage, bytes_done, bytes_total). + + Returns: + FirmwareBundle with assembled firmware bytes and md5. + + Raises: + ImportError: if bsdiff4 is not installed. + RuntimeError: on download or patch errors. + """ + import urllib.request + + bsdiff4 = _require_bsdiff4() + + def _fetch(url: str, label: str) -> bytes: + with urllib.request.urlopen(url) as resp: + total = int(resp.headers.get("Content-Length", 0)) + chunks: t.List[bytes] = [] + received = 0 + while True: + chunk = resp.read(65536) + if not chunk: + break + chunks.append(chunk) + received += len(chunk) + if progress_cb: + progress_cb(label, received, total) + return b"".join(chunks) + + patch_bytes = _fetch(update_info.patch_url, "patch") + base_zip_bytes = _fetch(update_info.base_url, "base_zip") + + # Extract upgrade_base.bin from the zip + with zipfile.ZipFile(io.BytesIO(base_zip_bytes)) as zf: + bin_names = [n for n in zf.namelist() if n.endswith(".bin")] + if not bin_names: + raise RuntimeError("No .bin file found inside base zip") + base_bytes = zf.read(bin_names[0]) + + # Apply BSDIFF40 patch + if not patch_bytes[:8] == b"BSDIFF40": + raise RuntimeError( + f"Unexpected patch magic: {patch_bytes[:8]!r} (expected b'BSDIFF40')" + ) + + firmware_bytes = bsdiff4.patch(base_bytes, patch_bytes) + + return FirmwareBundle(data=firmware_bytes, update_info=update_info) + + +# ── Layer 2 (offline): assemble from local files ───────────────────────────── + +def assemble_from_files( + base_bin_path: str, + patch_bin_path: str, +) -> bytes: + """ + Assemble firmware locally from already-downloaded files. + Useful for testing without hitting the network. + + Args: + base_bin_path: Path to upgrade_base.bin (extracted from zip). + patch_bin_path: Path to patch_base_to_vr_n76.bin. + + Returns: + Assembled firmware bytes. + """ + bsdiff4 = _require_bsdiff4() + + with open(base_bin_path, "rb") as f: + base_bytes = f.read() + with open(patch_bin_path, "rb") as f: + patch_bytes = f.read() + + if patch_bytes[:8] != b"BSDIFF40": + raise RuntimeError( + f"Unexpected patch magic: {patch_bytes[:8]!r}" + ) + + return bsdiff4.patch(base_bytes, patch_bytes) + + +# ── Layer 1+2 combined ─────────────────────────────────────────────────────── + +def fetch_firmware( + did: str = "", + fw_version: str = "V0.0.0", + progress_cb: t.Optional[ProgressCallback] = None, +) -> t.Optional[FirmwareBundle]: + """ + Check for an update and, if one is available, download and assemble it. + + Args: + did: Device ID (radio S/N). Optional. + fw_version: Current firmware version. Use "V0.0.0" to always fetch. + progress_cb: Optional (stage, bytes_done, bytes_total) callback. + + Returns: + FirmwareBundle if an update is available and downloaded, else None. + """ + info = check_update(did=did, fw_version=fw_version) + if info is None: + return None + return download_firmware(info, progress_cb=progress_cb) + + +# ── Layer 3: GAIA BT delivery ───────────────────────────────────────────────── +# TODO: implement FirmwareUpdater state machine +# +# The GAIA firmware update uses a SEPARATE RFCOMM channel from the main +# command channel: +# VM UUID: 00001107-D102-11E1-9B23-00025B00A5A5 +# +# Protocol flow (from vm.py): +# VM_CONNECT +# → UPDATE_SYNC_REQ (md5_tail = bundle.md5_tail) +# ← UPDATE_SYNC_CFM +# → UPDATE_START_REQ +# ← UPDATE_START_CFM (code=OK → continue, code=GOTO_NEXT_STATE → jump to reconnect) +# → UPDATE_DATA_START_REQ +# ← UPDATE_DATA_BYTES_REQ (device requests N bytes at offset skip) +# → UPDATE_DATA (145-byte chunks, is_final_fragment=True on last) +# → UPDATE_IS_VALIDATION_DONE_REQ +# ← UPDATE_TRANSFER_COMPLETE_IND +# → UPDATE_TRANSFER_COMPLETE_RES(is_complete=True) +# [radio reboots] +# VM_CONNECT +# → UPDATE_SYNC_REQ +# ← UPDATE_SYNC_CFM +# → UPDATE_START_REQ +# ← UPDATE_START_CFM (GOTO_NEXT_STATE) +# → UPDATE_IN_PROGRESS_RES +# ← UPDATE_COMPLETE_IND +# VM_DISCONNECT diff --git a/src/benlink/firmware_updater.py b/src/benlink/firmware_updater.py new file mode 100644 index 0000000..8703b44 --- /dev/null +++ b/src/benlink/firmware_updater.py @@ -0,0 +1,431 @@ +""" +Layer 3: GAIA BT firmware delivery over the VM RFCOMM channel. + +The update is split into two phases separated by a radio reboot: + +**Phase 1 — transfer:** + VM_CONNECT → UPDATE_SYNC_REQ → UPDATE_START_REQ → + UPDATE_DATA_START_REQ → [chunk loop driven by UPDATE_DATA_BYTES_REQ] → + UPDATE_IS_VALIDATION_DONE_REQ → UPDATE_TRANSFER_COMPLETE_RES + (radio reboots, BT connection drops) + +**Phase 2 — confirm** (after reconnect): + VM_CONNECT → UPDATE_SYNC_REQ → UPDATE_START_REQ (GOTO_NEXT_STATE) → + UPDATE_IN_PROGRESS_RES → UPDATE_COMPLETE_IND → VM_DISCONNECT + +Usage:: + + import asyncio + from benlink.firmware import fetch_firmware + from benlink.firmware_updater import FirmwareUpdater + from benlink.command import CommandConnection + + async def update_radio(mac: str, channel: int): + # Fetch firmware from Benshikj OSS (layers 1+2) + bundle = fetch_firmware(fw_version="V0.9.2-7") + if bundle is None: + print("No update available") + return + + # Phase 1 — transfer + async with CommandConnection.new_rfcomm(mac, channel) as conn: + updater = FirmwareUpdater(conn, bundle, + progress_cb=lambda s, d, t: print(f"{s} {d}/{t}")) + await updater.transfer() + # Radio reboots here; RFCOMM connection will drop shortly after + + print("Transfer complete — waiting for radio to reboot…") + await asyncio.sleep(20) + + # Phase 2 — confirm (fresh connection) + async with CommandConnection.new_rfcomm(mac, channel) as conn: + await FirmwareUpdater.confirm(conn, bundle) + + print("Firmware update complete!") + + asyncio.run(update_radio("38:D2:00:00:F7:F5", channel=1)) + +Notes +----- +- Uses the existing RFCOMM command connection (same channel as normal radio comms). + The VM channel UUID 00001107-D102-11E1-9B23-00025B00A5A5 is the GAIA VM RFCOMM + *service* UUID used for SDP discovery; the actual communication goes over the + already-connected GAIA command channel. +- Chunk size is device-driven via UPDATE_DATA_BYTES_REQ; no hard-coded 145-byte + assumption is made. +- n_bytes_skip in UPDATE_DATA_BYTES_REQ supports mid-update resume; implemented + defensively but not exercised in normal logs. +""" + +from __future__ import annotations + +import asyncio +import typing as t + +from .firmware import FirmwareBundle, ProgressCallback +from .command import UnknownProtocolMessage + +from .protocol.command.message import Message, CommandGroup, ExtendedCommand +from .protocol.command.common import ReplyStatus +from .protocol.command.bt_notification import BtEventNotificationBody, BtEventType +from .protocol.command.vm import ( + UpdateStartCfmCode, + VmConnectBody, + VmDisconnectBody, + VmControlBody, + VmControlType, + VmuPacketType, + VmControlUpdateSyncReq, + VmControlUpdateStartReq, + VmControlUpdateDataStartReq, + VmControlUpdateData, + VmControlUpdateIsValidationDoneReq, + VmControlUpdateTransferCompleteRes, + VmControlUpdateInProgressRes, + VmControlUpdateAbortReq, +) + +# ── Timeouts ────────────────────────────────────────────────────────────────── + +_VM_REPLY_TIMEOUT: float = 15.0 # VM_CONNECT / VM_DISCONNECT reply +_VMU_TIMEOUT: float = 30.0 # General VMU_PACKET confirmation +_CHUNK_TIMEOUT: float = 60.0 # UPDATE_DATA_BYTES_REQ between chunks +_VALIDATION_TIMEOUT: float = 120.0 # UPDATE_TRANSFER_COMPLETE_IND (CRC check) +_COMPLETE_TIMEOUT: float = 120.0 # UPDATE_COMPLETE_IND after post-reboot confirm + + +# ── Low-level message builders ──────────────────────────────────────────────── + +def _msg_vm_connect() -> Message: + return Message( + command_group=CommandGroup.EXTENDED, + is_reply=False, + command=ExtendedCommand.VM_CONNECT, + body=VmConnectBody(), + ) + + +def _msg_vm_disconnect() -> Message: + return Message( + command_group=CommandGroup.EXTENDED, + is_reply=False, + command=ExtendedCommand.VM_DISCONNECT, + body=VmDisconnectBody(), + ) + + +def _msg_vm_control( + ctrl_type: VmControlType, + inner: t.Any, + n_bytes_payload: int, +) -> Message: + return Message( + command_group=CommandGroup.EXTENDED, + is_reply=False, + command=ExtendedCommand.VM_CONTROL, + body=VmControlBody( + vm_control_type=ctrl_type, + n_bytes_payload=n_bytes_payload, + msg=inner, + ), + ) + + +# ── Main class ──────────────────────────────────────────────────────────────── + +class FirmwareUpdater: + """ + GAIA BT firmware delivery state machine (Layer 3). + + Accepts a ``CommandConnection`` (from :mod:`benlink.command`) that is + already connected over RFCOMM. VM_CONNECT / VM_CONTROL messages are sent + over this same connection; VMU_PACKET replies arrive as + ``BT_EVENT_NOTIFICATION`` events on the same channel. + + See module docstring for a complete usage example. + """ + + def __init__( + self, + conn: t.Any, # benlink.command.CommandConnection; t.Any avoids bleak import + bundle: FirmwareBundle, + progress_cb: t.Optional[ProgressCallback] = None, + ) -> None: + self._conn = conn + self._bundle = bundle + self._progress_cb = progress_cb + + # ── Private transport helpers ───────────────────────────────────────────── + + async def _send(self, msg: Message) -> None: + """Send a raw protocol Message via the underlying link.""" + await self._conn._link.send(msg) + + async def _wait_vm_reply( + self, + command: ExtendedCommand, + timeout: float = _VM_REPLY_TIMEOUT, + ) -> t.Any: + """ + Wait for the ``is_reply=True`` acknowledgement for a VM extended command. + + Returns the parsed body object (e.g. ``VmConnectReplyBody``). + Raises ``RuntimeError`` on timeout. + """ + queue: asyncio.Queue[t.Any] = asyncio.Queue() + + def _handler(radio_msg: t.Any) -> None: + if not isinstance(radio_msg, UnknownProtocolMessage): + return + proto = radio_msg.message + if ( + proto.is_reply + and proto.command_group == CommandGroup.EXTENDED + and proto.command == command + ): + queue.put_nowait(proto.body) + + remove = self._conn._add_message_handler(_handler) + try: + return await asyncio.wait_for(queue.get(), timeout=timeout) + except asyncio.TimeoutError: + raise RuntimeError( + f"Timed out ({timeout}s) waiting for {command.name} reply" + ) + finally: + remove() + + async def _wait_vmu( + self, + expected_type: VmuPacketType, + timeout: float = _VMU_TIMEOUT, + ) -> t.Any: + """ + Wait for a ``BT_EVENT_NOTIFICATION / VMU_PACKET`` of ``expected_type``. + + Returns the inner ``msg`` object of the matching ``VmuPacket`` + (e.g. ``VmControlUpdateStartCfm``, ``VmControlUpdateDataBytesReq``). + Raises ``RuntimeError`` on timeout. + """ + queue: asyncio.Queue[t.Any] = asyncio.Queue() + + def _handler(radio_msg: t.Any) -> None: + if not isinstance(radio_msg, UnknownProtocolMessage): + return + body = radio_msg.message.body + if not isinstance(body, BtEventNotificationBody): + return + if body.bt_event_type != BtEventType.VMU_PACKET: + return + vmu = body.bt_event + if vmu.vmu_packet_type == expected_type: + queue.put_nowait(vmu.msg) + + remove = self._conn._add_message_handler(_handler) + try: + return await asyncio.wait_for(queue.get(), timeout=timeout) + except asyncio.TimeoutError: + raise RuntimeError( + f"Timed out ({timeout}s) waiting for VMU {expected_type.name}" + ) + finally: + remove() + + # ── Protocol step helpers ───────────────────────────────────────────────── + + async def _step_vm_connect(self) -> None: + await self._send(_msg_vm_connect()) + reply = await self._wait_vm_reply(ExtendedCommand.VM_CONNECT) + if hasattr(reply, "status") and reply.status != ReplyStatus.SUCCESS: + raise RuntimeError(f"VM_CONNECT rejected: {reply.status.name}") + + async def _step_vm_disconnect(self) -> None: + await self._send(_msg_vm_disconnect()) + + async def _step_sync(self, md5_tail: bytes) -> None: + """Send UPDATE_SYNC_REQ; wait for UPDATE_SYNC_CFM.""" + await self._send(_msg_vm_control( + VmControlType.UPDATE_SYNC_REQ, + VmControlUpdateSyncReq(md5sum_tail=md5_tail), + n_bytes_payload=4, + )) + await self._wait_vmu(VmuPacketType.UPDATE_SYNC_CFM) + + async def _step_start(self) -> UpdateStartCfmCode: + """Send UPDATE_START_REQ; return cfm_code from UPDATE_START_CFM.""" + await self._send(_msg_vm_control( + VmControlType.UPDATE_START_REQ, + VmControlUpdateStartReq(), + n_bytes_payload=0, + )) + cfm = await self._wait_vmu(VmuPacketType.UPDATE_START_CFM) + return cfm.cfm_code + + async def _step_data_start(self) -> None: + """Send UPDATE_DATA_START_REQ (no VMU reply; device follows with BYTES_REQ).""" + await self._send(_msg_vm_control( + VmControlType.UPDATE_START_DATA_REQ, + VmControlUpdateDataStartReq(), + n_bytes_payload=0, + )) + + async def _step_abort(self) -> None: + """Send UPDATE_ABORT_REQ (best-effort; no reply wait).""" + try: + await self._send(_msg_vm_control( + VmControlType.UPDATE_ABORT_REQ, + VmControlUpdateAbortReq(), + n_bytes_payload=0, + )) + except Exception: + pass + + # ── Public API ──────────────────────────────────────────────────────────── + + async def transfer(self) -> None: + """ + Phase 1: Transfer the firmware image to the radio. + + The method returns once the radio has acknowledged the complete + transfer (``UPDATE_TRANSFER_COMPLETE_RES`` sent). Shortly afterwards + the radio will reboot and the BT connection will drop. + + Typical flow after calling this method: + + 1. Disconnect / close the ``CommandConnection``. + 2. ``await asyncio.sleep(15)`` (or poll until BT reappears). + 3. Reconnect on the same RFCOMM channel. + 4. Call :meth:`confirm` on the new connection. + + Raises + ------ + RuntimeError + On any protocol error or timeout. + """ + fw = self._bundle.data + total = len(fw) + md5_tail = self._bundle.md5_tail + + try: + # ── Phase 1a: handshake ─────────────────────────────────────────── + await self._step_vm_connect() + await self._step_sync(md5_tail) + + cfm_code = await self._step_start() + if cfm_code == UpdateStartCfmCode.GOTO_NEXT_STATE: + raise RuntimeError( + "UPDATE_START_CFM returned GOTO_NEXT_STATE before transfer. " + "The radio may already be partway through an update. " + "Power-cycle the radio and retry, or call confirm() if a " + "previous transfer completed." + ) + + # ── Phase 1b: data transfer (device-driven chunking) ────────────── + await self._step_data_start() + + offset = 0 + while offset < total: + req = await self._wait_vmu( + VmuPacketType.UPDATE_DATA_BYTES_REQ, + timeout=_CHUNK_TIMEOUT, + ) + n = req.n_bytes_requested + skip = req.n_bytes_skip # non-zero only on resume + offset += skip + + chunk = fw[offset: offset + n] + if not chunk: + raise RuntimeError( + f"Device requested {n} bytes at offset {offset} " + f"but firmware is only {total} bytes" + ) + + is_last = (offset + len(chunk) >= total) + + await self._send(_msg_vm_control( + VmControlType.UPDATE_DATA, + VmControlUpdateData( + is_final_fragment=is_last, + data=chunk, + ), + n_bytes_payload=1 + len(chunk), + )) + + offset += len(chunk) + + if self._progress_cb: + self._progress_cb("flash", offset, total) + + # ── Phase 1c: validation & transfer-complete ────────────────────── + await self._send(_msg_vm_control( + VmControlType.UPDATE_IS_VALIDATION_DONE_REQ, + VmControlUpdateIsValidationDoneReq(), + n_bytes_payload=0, + )) + await self._wait_vmu( + VmuPacketType.UPDATE_TRANSFER_COMPLETE_IND, + timeout=_VALIDATION_TIMEOUT, + ) + + # Tell the radio the transfer is complete — it will now reboot + await self._send(_msg_vm_control( + VmControlType.UPDATE_TRANSFER_COMPLETE_RES, + VmControlUpdateTransferCompleteRes(is_complete=True), + n_bytes_payload=1, + )) + + except Exception: + await self._step_abort() + raise + + @staticmethod + async def confirm( + conn: t.Any, # CommandConnection (fresh, post-reboot) + bundle: FirmwareBundle, + ) -> None: + """ + Phase 2: Confirm the completed update after the radio reboots. + + Call on a *new* ``CommandConnection`` after reconnecting post-reboot. + The radio expects ``UPDATE_START_CFM`` to return ``GOTO_NEXT_STATE`` + at this point, indicating it is ready to finalise the update. + + Parameters + ---------- + conn: + A freshly connected ``CommandConnection`` (RFCOMM). + bundle: + The same ``FirmwareBundle`` used in :meth:`transfer` + (needed for the md5_tail in UPDATE_SYNC_REQ). + + Raises + ------ + RuntimeError + On any protocol error or timeout. + """ + updater = FirmwareUpdater(conn, bundle) + + await updater._step_vm_connect() + await updater._step_sync(bundle.md5_tail) + + cfm_code = await updater._step_start() + if cfm_code != UpdateStartCfmCode.GOTO_NEXT_STATE: + raise RuntimeError( + f"Expected GOTO_NEXT_STATE in post-reboot UPDATE_START_CFM, " + f"got {cfm_code.name}. The radio may not have rebooted yet." + ) + + # Signal that the update is in progress (finalising) + await updater._send(_msg_vm_control( + VmControlType.UPDATE_IN_PROGRESS_RES, + VmControlUpdateInProgressRes(), + n_bytes_payload=1, + )) + + # Wait for the radio to confirm the update is fully applied + await updater._wait_vmu( + VmuPacketType.UPDATE_COMPLETE_IND, + timeout=_COMPLETE_TIMEOUT, + ) + + await updater._step_vm_disconnect() diff --git a/src/benlink/protocol/command/bt_notification.py b/src/benlink/protocol/command/bt_notification.py new file mode 100644 index 0000000..52a1026 --- /dev/null +++ b/src/benlink/protocol/command/bt_notification.py @@ -0,0 +1,44 @@ +from __future__ import annotations +from .bitfield import Bitfield, bf_int_enum, bf_dyn, bf_bytes, bf_bitfield +from enum import IntEnum +from .vm import VmuPacket + +################################################# +# BT_EVENT_NOTIFICATION + + +class BtEventType(IntEnum): + START = 0 + RSSI_LOW_THRESHOLD = 1 + RSSI_HIGH_THRESHOLD = 2 + BATTERY_LOW_THRESHOLD = 3 + BATTERY_HIGH_THRESHOLD = 4 + DEVICE_STATE_CHANGED = 5 + PIO_CHANGED = 6 + DEBUG_MESSAGE = 7 + BATTERY_CHARGED = 8 + CHARGER_CONNECTION = 9 + CAPSENSE_UPDATE = 10 + USER_ACTION = 11 + SPEECH_RECOGNITION = 12 + AV_COMMAND = 13 + REMOTE_BATTERY_LEVEL = 14 + KEY = 15 + DFU_STATE = 16 + UART_RECEIVED_DATA = 17 + VMU_PACKET = 18 + + +def bt_event_disc(m: BtEventNotificationBody, n: int): + match m.bt_event_type: + case BtEventType.VMU_PACKET: + out = VmuPacket + case _: + return bf_bytes(n // 8) + + return bf_bitfield(out, n) + + +class BtEventNotificationBody(Bitfield): + bt_event_type: BtEventType = bf_int_enum(BtEventType, 8) + bt_event: VmuPacket | bytes = bf_dyn(bt_event_disc) diff --git a/src/benlink/protocol/command/dev_state_var.py b/src/benlink/protocol/command/dev_state_var.py deleted file mode 100644 index 5eed02d..0000000 --- a/src/benlink/protocol/command/dev_state_var.py +++ /dev/null @@ -1,28 +0,0 @@ -from enum import IntEnum - -################################################# -# GET_DEV_STATE_VAR - - -class DevStateVar(IntEnum): - START = 0 - RSSI_LOW_THRESHOLD = 1 - RSSI_HIGH_THRESHOLD = 2 - BATTERY_LOW_THRESHOLD = 3 - BATTERY_HIGH_THRESHOLD = 4 - DEVICE_STATE_CHANGED = 5 - PIO_CHANGED = 6 - DEBUG_MESSAGE = 7 - BATTERY_CHARGED = 8 - CHARGER_CONNECTION = 9 - CAPSENSE_UPDATE = 10 - USER_ACTION = 11 - SPEECH_RECOGNITION = 12 - AV_COMMAND = 13 - REMOTE_BATTERY_LEVEL = 14 - KEY = 15 - DFU_STATE = 16 - UART_RECEIVED_DATA = 17 - VMU_PACKET = 18 - -# TODO diff --git a/src/benlink/protocol/command/message.py b/src/benlink/protocol/command/message.py index 3228967..097062d 100644 --- a/src/benlink/protocol/command/message.py +++ b/src/benlink/protocol/command/message.py @@ -29,6 +29,14 @@ ) from .phone_status import SetPhoneStatusBody, SetPhoneStatusReplyBody from .status import GetHtStatusBody, GetHtStatusReplyBody +from .vm import ( + VmControlBody, VmControlReplyBody, + VmConnectBody, VmConnectReplyBody, + VmDisconnectBody, VmDisconnectReplyBody, +) +from .bt_notification import ( + BtEventNotificationBody +) from .position import GetPositionBody, GetPositionReplyBody @@ -39,13 +47,13 @@ class CommandGroup(IntEnum): class ExtendedCommand(IntEnum): UNKNOWN = 0 + VM_CONNECT = 1600 + VM_DISCONNECT = 1601 + VM_CONTROL = 1602 GET_BT_SIGNAL = 769 - UNKNOWN_01 = 1600 - UNKNOWN_02 = 1601 - UNKNOWN_03 = 1602 - UNKNOWN_04 = 16385 - UNKNOWN_05 = 16386 - GET_DEV_STATE_VAR = 16387 + REGISTER_BT_NOTIFICATION = 16385 + CANCEL_BT_NOTIFICATION = 16386 + BT_EVENT_NOTIFICATION = 16387 DEV_REGISTRATION = 1825 @classmethod @@ -187,6 +195,18 @@ def body_disc(m: Message, n: int): return bf_bytes(n // 8) case CommandGroup.EXTENDED: match m.command: + case ExtendedCommand.VM_CONTROL: + out = VmControlReplyBody if m.is_reply else VmControlBody + case ExtendedCommand.VM_CONNECT: + out = VmConnectReplyBody if m.is_reply else VmConnectBody + case ExtendedCommand.VM_DISCONNECT: + out = VmDisconnectReplyBody if m.is_reply else VmDisconnectBody + case ExtendedCommand.BT_EVENT_NOTIFICATION: + if m.is_reply: + raise ValueError( + "BtEventNotification cannot be a reply" + ) + out = BtEventNotificationBody case _: return bf_bytes(n // 8) @@ -220,6 +240,13 @@ def body_disc(m: Message, n: int): SetPhoneStatusReplyBody, GetHtStatusBody, GetHtStatusReplyBody, + VmControlBody, + VmControlReplyBody, + VmConnectBody, + VmConnectReplyBody, + VmDisconnectBody, + VmDisconnectReplyBody, + BtEventNotificationBody, GetPositionReplyBody, GetPositionBody, ] @@ -229,4 +256,4 @@ class Message(Bitfield): command_group: CommandGroup = bf_int_enum(CommandGroup, 16) is_reply: bool = bf_bool() command: BasicCommand | ExtendedCommand = bf_dyn(frame_type_disc) - body: MessageBody | bytes = bf_dyn(body_disc) + body: MessageBody | bytes = bf_dyn(body_disc) \ No newline at end of file diff --git a/src/benlink/protocol/command/phone_status.py b/src/benlink/protocol/command/phone_status.py index 3f771f5..d94a3f3 100644 --- a/src/benlink/protocol/command/phone_status.py +++ b/src/benlink/protocol/command/phone_status.py @@ -1,10 +1,10 @@ from __future__ import annotations -from .bitfield import Bitfield, bf_lit_int, bf_int_enum, bf_list, bf_bool +from .bitfield import Bitfield, bf_lit_int, bf_int_enum, bf_list, bf_bool, bf_dyn, bf_bytes import typing as t from .common import ReplyStatus -class SetPhoneStatusBody(Bitfield): +class PhoneStatus(Bitfield): is_channel_bonded_lower: t.List[bool] = bf_list(bf_bool(), 16) is_linked: bool _pad: t.Literal[0] = bf_lit_int(1, default=0) @@ -12,5 +12,18 @@ class SetPhoneStatusBody(Bitfield): _pad2: t.Literal[0] = bf_lit_int(14, default=0) +def phone_status_disc(_: SetPhoneStatusBody, n: int): + if n == PhoneStatus.length(): + return PhoneStatus + + # TODO: There's a 32 bit version of phone status that popped up in + # uv-pro 0.7.9-32 upgrade firmware. I'll need to see what it is... + return bf_bytes(n // 8) + + +class SetPhoneStatusBody(Bitfield): + phone_status: PhoneStatus | bytes = bf_dyn(phone_status_disc) + + class SetPhoneStatusReplyBody(Bitfield): reply_status: ReplyStatus = bf_int_enum(ReplyStatus, 8) diff --git a/src/benlink/protocol/command/vm.py b/src/benlink/protocol/command/vm.py new file mode 100644 index 0000000..5f3589f --- /dev/null +++ b/src/benlink/protocol/command/vm.py @@ -0,0 +1,285 @@ +from __future__ import annotations +import typing as t +from .bitfield import Bitfield, bf_int_enum, bf_int, bf_bytes, bf_dyn, bf_map, bf_bitfield, bf_lit_int +from .common import ReplyStatus +from enum import IntEnum + +##################################################################### +# Order of events in a firmware update: +# +# 1. VM_CONNECT +# 2. VM_CONTROL: +# a. UPDATE_SYNC_REQ (UPDATE_SYNC_CFM) (with last 4 bytes of firmware md5sum) +# b. UPDATE_START_REQ (UPDATE_START_CFM) +# c. UPDATE_DATA_START_REQ +# d. (UPDATE_DATA_BYTES_REQ) UPDATE_DATA (145 bytes at a time. repeat until all data is sent, except for the last fragment) +# e. UPDATE_DATA (final fragment with is_final_fragment=True) +# f. UPDATE_IS_VALIDATION_DONE_REQ (UPDATE_TRANSFER_COMPLETE_IND) +# g. UPDATE_TRANSFER_COMPLETE_RES (triggers REStart?) +# +# Reboot happens? +# +# 3. VM_CONNECT +# h. UPDATE_SYNC_REQ (UPDATE_SYNC_CFM) (with last 4 bytes of firmware md5sum) +# i. UPDATE_START_REQ (UPDATE_START_CFM) +# j. UPDATE_IN_PROGRESS_RES (UPDATE_COMPLETE_IND) +# 4. VM_DISCONNECT + +##################################################################### +# Order of events in an aborted firmware update: +# +# 1. VM_CONNECT +# 2. VM_CONTROL: +# a. UPDATE_SYNC_REQ (UPDATE_SYNC_CFM) +# b. UPDATE_START_REQ (UPDATE_START_CFM) +# c. UPDATE_DATA_START_REQ +# d. (UPDATE_DATA_BYTES_REQ) UPDATE_DATA +# e. UPDATE_ABORT_REQ (UPDATE_ABORT_CFM) +# 3. VM_DISCONNECT + + +class VmControlType(IntEnum): + # Command from the app to the device + + # Regular firmware update flow + UPDATE_SYNC_REQ = 19 + UPDATE_START_REQ = 1 + UPDATE_START_DATA_REQ = 21 + UPDATE_DATA = 4 + UPDATE_IS_VALIDATION_DONE_REQ = 22 + UPDATE_TRANSFER_COMPLETE_RES = 12 + UPDATE_IN_PROGRESS_RES = 14 + UPDATE_ABORT_REQ = 7 + + # This looks like a fancy way of aborting when + # you get an error code in the update process + # looks like you always just send one after the other + # with the same error code? + UPDATE_ABORT_WITH_CODE_1_REQ = 31 + UPDATE_ABORT_WITH_CODE_2_REQ = 32 + + # Not used in regular firmware update? + # It seems like there's a hidden debug firmware GUI + # in the app somewhere that can send these commands + UPDATE_COMMIT_CFM = 16 + UPDATE_ERASE_SQIF_CFM = 30 + + +class VmuPacketType(IntEnum): + # Replies to commands from the VMU_PACKET BT notifications + UPDATE_START_CFM = 2 + UPDATE_DATA_BYTES_REQ = 3 + UPDATE_ABORT_CFM = 8 + UPDATE_TRANSFER_COMPLETE_IND = 11 + UPDATE_SYNC_CFM = 20 + UPDATE_COMPLETE_IND = 18 + UPDATE_ERROR = 17 # Not seen in logs + UPDATE_IS_VALIDATION_DONE_CFM = 23 # Not seen in logs + UPDATE_COMMIT_ERASE_SQIF_RES = 29 # Not seen in logs + UPDATE_COMMIT_RES = 15 # Not seen in logs + + +class BoolTransform: + def forward(self, x: int) -> bool: + return bool(x) + + def back(self, y: bool) -> int: + return int(y) + + +bf_bool_byte = bf_map(bf_int(8), BoolTransform()) + + +class VmControlUpdateSyncReq(Bitfield): + md5sum_tail: bytes = bf_bytes(4) + + +class VmControlUpdateStartReq(Bitfield): + pass + + +class VmControlUpdateDataStartReq(Bitfield): + pass + + +class VmControlUpdateData(Bitfield): + is_final_fragment: bool = bf_bool_byte + data: bytes = bf_dyn(lambda _, n: bf_bytes(n // 8)) + + +class VmControlUpdateIsValidationDoneReq(Bitfield): + pass + + +class VmControlUpdateTransferCompleteRes(Bitfield): + is_complete: bool = bf_bool_byte + + +class VmControlUpdateInProgressRes(Bitfield): + _pad: t.Literal[0] = bf_lit_int(8, default=0) + + +class VmControlUpdateAbortReq(Bitfield): + pass + + +class UpdateState(IntEnum): + DATA_TRANSFER = 0 + VALIDATION = 1 + TRANSFER_COMPLETE = 2 + IN_PROGRESS = 3 + COMMIT = 4 + + +class UpdateStartCfmCode(IntEnum): + OK = 0 + GOTO_NEXT_STATE = 9 + + +class UpdateError(IntEnum): + UNKNOWN = 0 + BATTERY_LOW = 33 + SYNC_IS_DIFFERENT = 129 + + @classmethod + def _missing_(cls, value: object): + import sys + print(f"Unknown value for {cls.__name__}: {value}", file=sys.stderr) + return cls.UNKNOWN + +# Messages from VMU_PACKET + + +class VmControlUpdateSyncCfm(Bitfield): + update_state: UpdateState = bf_int_enum(UpdateState, 8) + md5sum_tail: bytes = bf_bytes(4) + unknown: bytes = bf_bytes(1) + + +class VmControlUpdateStartCfm(Bitfield): + cfm_code: UpdateStartCfmCode = bf_int_enum(UpdateStartCfmCode, 8) + unknown: bytes = bf_bytes(2) + + +class VmControlUpdateCompleteInd(Bitfield): + pass + + +class VmControlUpdateTransferCompleteInd(Bitfield): + pass + + +class VmControlUpdateAbortCfm(Bitfield): + pass + + +class VmControlUpdateError(Bitfield): + update_error: UpdateError = bf_int_enum(UpdateError, 16) + + +class VmControlUpdateDataBytesReq(Bitfield): + # The max bytes requested that the HT app allows is 250 + n_bytes_requested: int = bf_int(32) + # Skip allows for resuming a firmware update maybe? + # I don't see it used in any of my logs + n_bytes_skip: int = bf_int(32) + + +def vm_control_disc(m: VmControlBody): + match m.vm_control_type: + case VmControlType.UPDATE_SYNC_REQ: + out = VmControlUpdateSyncReq + case VmControlType.UPDATE_START_REQ: + out = VmControlUpdateStartReq + case VmControlType.UPDATE_START_DATA_REQ: + out = VmControlUpdateDataStartReq + case VmControlType.UPDATE_DATA: + out = VmControlUpdateData + case VmControlType.UPDATE_IS_VALIDATION_DONE_REQ: + out = VmControlUpdateIsValidationDoneReq + case VmControlType.UPDATE_TRANSFER_COMPLETE_RES: + out = VmControlUpdateTransferCompleteRes + case VmControlType.UPDATE_IN_PROGRESS_RES: + out = VmControlUpdateInProgressRes + case VmControlType.UPDATE_ABORT_REQ: + out = VmControlUpdateAbortReq + case _: + return bf_bytes(m.n_bytes_payload) + + return bf_bitfield(out, m.n_bytes_payload*8) + + +def vmu_packet_desc(m: VmuPacket): + match m.vmu_packet_type: + case VmuPacketType.UPDATE_DATA_BYTES_REQ: + out = VmControlUpdateDataBytesReq + case VmuPacketType.UPDATE_SYNC_CFM: + out = VmControlUpdateSyncCfm + case VmuPacketType.UPDATE_COMPLETE_IND: + out = VmControlUpdateCompleteInd + case VmuPacketType.UPDATE_TRANSFER_COMPLETE_IND: + out = VmControlUpdateTransferCompleteInd + case VmuPacketType.UPDATE_START_CFM: + out = VmControlUpdateStartCfm + case VmuPacketType.UPDATE_ERROR: + out = VmControlUpdateError + case VmuPacketType.UPDATE_ABORT_CFM: + out = VmControlUpdateAbortCfm + case _: + return bf_bytes(m.n_bytes_payload) + + return bf_bitfield(out, m.n_bytes_payload*8) + + +VmControlMessage = t.Union[ + VmControlUpdateSyncReq, + VmControlUpdateStartReq, + VmControlUpdateDataStartReq, + VmControlUpdateData, + VmControlUpdateIsValidationDoneReq, + VmControlUpdateTransferCompleteRes, + VmControlUpdateInProgressRes, + VmControlUpdateAbortReq, +] + +VmuPacketMessage = t.Union[ + VmControlUpdateDataBytesReq, + VmControlUpdateSyncCfm, + VmControlUpdateCompleteInd, + VmControlUpdateTransferCompleteInd, + VmControlUpdateStartCfm, + VmControlUpdateError, + VmControlUpdateAbortCfm, +] + + +class VmControlBody(Bitfield): + vm_control_type: VmControlType = bf_int_enum(VmControlType, 8) + n_bytes_payload: int = bf_int(16) + msg: VmControlMessage | bytes = bf_dyn(vm_control_disc) + + +class VmuPacket(Bitfield): + vmu_packet_type: VmuPacketType = bf_int_enum(VmuPacketType, 8) + n_bytes_payload: int = bf_int(16) + msg: VmuPacketMessage | bytes = bf_dyn(vmu_packet_desc) + + +class VmControlReplyBody(Bitfield): + status: ReplyStatus = bf_int_enum(ReplyStatus, 8) + + +class VmConnectBody(Bitfield): + pass + + +class VmConnectReplyBody(Bitfield): + status: ReplyStatus = bf_int_enum(ReplyStatus, 8) + + +class VmDisconnectBody(Bitfield): + pass + + +class VmDisconnectReplyBody(Bitfield): + status: ReplyStatus = bf_int_enum(ReplyStatus, 8)