From 3598e21d9f79f91bc0dde932e5f117128bdabe78 Mon Sep 17 00:00:00 2001 From: SeanAkin <56386301+SeanAkin@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:24:24 +0100 Subject: [PATCH 1/2] Add ESP32 Homestation Support --- packages/homestation/ESP32/.micropico | 3 + .../homestation/ESP32/.vscode/extensions.json | 7 + .../homestation/ESP32/.vscode/settings.json | 16 + packages/homestation/ESP32/SETUP.md | 101 ++ packages/homestation/ESP32/ble_handler.py | 95 ++ packages/homestation/ESP32/config.json | 22 + .../homestation/ESP32/lib/mqtt_as/__init__.py | 952 ++++++++++++++++++ packages/homestation/ESP32/main.py | 55 + packages/homestation/ESP32/mqtt_handler.py | 43 + 9 files changed, 1294 insertions(+) create mode 100644 packages/homestation/ESP32/.micropico create mode 100644 packages/homestation/ESP32/.vscode/extensions.json create mode 100644 packages/homestation/ESP32/.vscode/settings.json create mode 100644 packages/homestation/ESP32/SETUP.md create mode 100644 packages/homestation/ESP32/ble_handler.py create mode 100644 packages/homestation/ESP32/config.json create mode 100644 packages/homestation/ESP32/lib/mqtt_as/__init__.py create mode 100644 packages/homestation/ESP32/main.py create mode 100644 packages/homestation/ESP32/mqtt_handler.py diff --git a/packages/homestation/ESP32/.micropico b/packages/homestation/ESP32/.micropico new file mode 100644 index 0000000..3de3977 --- /dev/null +++ b/packages/homestation/ESP32/.micropico @@ -0,0 +1,3 @@ +{ + "info": "This file is just used to identify a project folder." +} \ No newline at end of file diff --git a/packages/homestation/ESP32/.vscode/extensions.json b/packages/homestation/ESP32/.vscode/extensions.json new file mode 100644 index 0000000..11805ca --- /dev/null +++ b/packages/homestation/ESP32/.vscode/extensions.json @@ -0,0 +1,7 @@ +{ + "recommendations": [ + "ms-python.python", + "ms-python.vscode-pylance", + "paulober.pico-w-go" + ] +} \ No newline at end of file diff --git a/packages/homestation/ESP32/.vscode/settings.json b/packages/homestation/ESP32/.vscode/settings.json new file mode 100644 index 0000000..7f3849e --- /dev/null +++ b/packages/homestation/ESP32/.vscode/settings.json @@ -0,0 +1,16 @@ +{ + "python.languageServer": "Pylance", + "python.analysis.typeCheckingMode": "basic", + "python.analysis.diagnosticSeverityOverrides": { + "reportMissingModuleSource": "none" + }, + "python.terminal.activateEnvironment": false, + "micropico.openOnStart": true, + "python.analysis.typeshedPaths": [ + "~/.micropico-stubs/included" + ], + "python.analysis.extraPaths": [ + "~/.micropico-stubs/included", + "./lib" + ] +} \ No newline at end of file diff --git a/packages/homestation/ESP32/SETUP.md b/packages/homestation/ESP32/SETUP.md new file mode 100644 index 0000000..4b8f640 --- /dev/null +++ b/packages/homestation/ESP32/SETUP.md @@ -0,0 +1,101 @@ +# Homestation ESP32 Setup + +## Requirements + +- ESP32 with MicroPython v1.24.1+ flashed +- VS Code with MicroPico extension + +## 1. Install packages on the ESP32 + +Connect to the board via the MicroPico REPL and run the following. The board must be connected to WiFi first. + +```python +# Connect to WiFi +import network +wlan = network.WLAN(network.STA_IF) +wlan.active(True) +wlan.connect("your-ssid", "your-password") + +# Install packages +import mip +mip.install("aioble") +``` + +## 2. Configure + +Edit `config.json` with your settings before uploading: + +```json +{ + "wifi": { + "ssid": "your-wifi-ssid", + "password": "your-wifi-password" + }, + "mqtt": { + "broker": "your-mqtt-broker-hostname", + "port": 1883, + "topic": "/your-traccar-device-topic", + "client_id": "your-findmycat-device-imei" + }, + "location": { + "latitude": 0.0, + "longitude": 0.0, + "accuracy": 5.00 + }, + "ble": { + "collar_name_prefix": "FMC", ## This MUST correspond to the collars advertising BLE packet on the indoor location engine DEFAULT: FMC + "homestation_name": "FindMyCatHomeStation", ## This MUST correspond to what the indoor location engine is looking for DEFAULT: FindMyCatHomeStation + "presence_timeout_ms": 30000 ## Don't change this if you're not sure what you're doing + } +} +``` + +## 3. Upload files + +Right-click the project root in VS Code and select **Upload project to Pico**. This uploads all files preserving the directory structure, including `lib/mqtt_as/__init__.py`. + +## 4. Verify files on the board + +In the MicroPico REPL, confirm all files landed correctly: + +```python +import os + +# Root — should contain main.py, ble_handler.py, mqtt_handler.py, config.json +os.listdir("/") + +# lib — should contain mqtt_as +os.listdir("/lib") + +# mqtt_as package — should contain __init__.py +os.listdir("/lib/mqtt_as") +``` + +## 5. Run + +Reset the board (press EN/RST or run `machine.reset()` in the REPL). You should see: + +``` +[MQTT] Connected to broker +[BLE] Advertising as: FindMyCatHomeStation +[BLE] Collar not home +``` + +When the collar comes into range: + +``` +[BLE] Collar Found +[BLE] Collar home +[MQTT] Published: {"msg": "..."} +``` + +## Testing without a collar + +To verify the WiFi → MQTT pipeline without a collar nearby, uncomment `periodic_trigger_loop` in `main.py`. This publishes a location update every 60 seconds regardless of BLE state. + +--- + +## Additional Info + +- **mqtt_as** (Peter Hinch) — the MQTT client library bundled in `lib/mqtt_as/`. Handles WiFi and MQTT reconnection automatically: https://github.com/peterhinch/micropython-mqtt +- **aioble** (MicroPython) — official async BLE library, installed via `mip.install("aioble")`: https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble diff --git a/packages/homestation/ESP32/ble_handler.py b/packages/homestation/ESP32/ble_handler.py new file mode 100644 index 0000000..837acdc --- /dev/null +++ b/packages/homestation/ESP32/ble_handler.py @@ -0,0 +1,95 @@ +import aioble # type: ignore — installed on device via mip +import uasyncio as asyncio +import time +from micropython import const + +try: + from typing import Optional +except ImportError: + pass # typing is not available on MicroPython at runtime + +# Advertising interval in microseconds (20ms) +_ADV_INTERVAL_US = const(20_000) + +# BLE scan parameters +_SCAN_DURATION_MS = const(5_000) +_SCAN_INTERVAL_US = const(30_000) +_SCAN_WINDOW_US = const(30_000) + +_STATUS_LOG_INTERVAL_MS = const(10_000) + + +class BLEHandler: + def __init__(self, config: dict) -> None: + self._collar_prefix: str = config["ble"]["collar_name_prefix"] + self._homestation_name: str = config["ble"]["homestation_name"] + self._presence_timeout_ms: int = config["ble"]["presence_timeout_ms"] + self._last_seen_ms: Optional[int] = None + self._last_battery: Optional[str] = None + self._last_publish_ms: Optional[int] = None + self._last_status_log_ms: int = 0 + + async def advertise_loop(self) -> None: + """Broadcast as FindMyCatHomeStation indefinitely so the collar can detect us.""" + print("[BLE] Advertising as:", self._homestation_name) + while True: + try: + await aioble.advertise( + _ADV_INTERVAL_US, + name=self._homestation_name, + connectable=False, + ) + except Exception as e: + print("[BLE] Advertise error:", e) + await asyncio.sleep(1) + + async def scan_loop(self) -> None: + """Continuously scan for the collar and record its battery level.""" + while True: + try: + async with aioble.scan( + _SCAN_DURATION_MS, + interval_us=_SCAN_INTERVAL_US, + window_us=_SCAN_WINDOW_US, + active=True, + ) as scanner: + async for result in scanner: + if result.name() and result.name().startswith(self._collar_prefix): ## TODO: Maybe we should do exact match rather than startswith? + self._last_seen_ms = time.ticks_ms() + mfr = result.manufacturer() + if mfr: + try: + _, data = mfr + self._last_battery = bytes(data).decode("utf-8").strip() ## TODO: Untested, need to test but in theory this should work (I think :D) + except Exception: + pass + except Exception as e: + print("[BLE] Scan error:", e) + + now = time.ticks_ms() + if time.ticks_diff(now, self._last_status_log_ms) >= _STATUS_LOG_INTERVAL_MS: ## Just so we don't spam the console with log events (useful when debugging) + if self.collar_is_home(): + print("[BLE] Collar Found") + else: + print("[BLE] Collar not home") + self._last_status_log_ms = now + + def collar_is_home(self) -> bool: + """Return True if collar was seen within the presence timeout window.""" + if self._last_seen_ms is None: + return False + return time.ticks_diff(time.ticks_ms(), self._last_seen_ms) < self._presence_timeout_ms + + def should_publish(self) -> bool: + """Return True if collar is home and we haven't published in the last 30 seconds.""" + if not self.collar_is_home(): + return False + if self._last_publish_ms is None: + return True + return time.ticks_diff(time.ticks_ms(), self._last_publish_ms) >= self._presence_timeout_ms + + def mark_published(self) -> None: + self._last_publish_ms = time.ticks_ms() + + def get_battery(self) -> Optional[str]: + return self._last_battery diff --git a/packages/homestation/ESP32/config.json b/packages/homestation/ESP32/config.json new file mode 100644 index 0000000..7341b47 --- /dev/null +++ b/packages/homestation/ESP32/config.json @@ -0,0 +1,22 @@ +{ + "wifi": { + "ssid": "your-wifi-ssid", + "password": "your-wifi-password" + }, + "mqtt": { + "broker": "your-mqtt-broker-hostname", + "port": 1883, + "topic": "/your-traccar-device-topic", + "client_id": "your-findmycat-device-imei" + }, + "location": { + "latitude": 0.0, + "longitude": 0.0, + "accuracy": 5.00 + }, + "ble": { + "collar_name_prefix": "FMC", + "homestation_name": "FindMyCatHomeStation", + "presence_timeout_ms": 30000 + } +} diff --git a/packages/homestation/ESP32/lib/mqtt_as/__init__.py b/packages/homestation/ESP32/lib/mqtt_as/__init__.py new file mode 100644 index 0000000..93d196d --- /dev/null +++ b/packages/homestation/ESP32/lib/mqtt_as/__init__.py @@ -0,0 +1,952 @@ +# mqtt_as.py Asynchronous version of umqtt.robust +# (C) Copyright Peter Hinch 2017-2025. +# Released under the MIT licence. + +# Pyboard D support added also RP2/default +# Various improvements contributed by Kevin Köck +# V5 support added by Bob Veringa. +# Also other contributors. + +import gc +import socket +import struct +import time + +gc.collect() +from binascii import hexlify +import asyncio + +gc.collect() +from time import ticks_ms, ticks_diff +from errno import EINPROGRESS, ETIMEDOUT + +gc.collect() +from micropython import const +from machine import unique_id +import network + +gc.collect() +from sys import platform, implementation + +VERSION = (0, 8, 5) +# Default initial size for input messge buffer. Increase this if large messages +# are expected, but rarely, to avoid big runtime allocations +IBUFSIZE = 50 +# By default the callback interface returns and incoming message as bytes. +# For performance reasons with large messages it may return a memoryview. +MSG_BYTES = True + +# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). +ESP32 = platform == "esp32" +RP2 = platform == "rp2" +NINA = RP2 and implementation._machine.startswith("Arduino") # ublox Nina radio +if ESP32: + # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors +elif RP2 and not NINA: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] +else: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] + +ESP8266 = platform == "esp8266" +PYBOARD = platform == "pyboard" + + +# Default "do little" coro for optional user replacement +async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program + await asyncio.sleep_ms(0) + + +class MsgQueue: + def __init__(self, size): + self._q = [0 for _ in range(max(size, 4))] + self._size = size + self._wi = 0 + self._ri = 0 + self._evt = asyncio.Event() + self.discards = 0 + + def put(self, *v): + self._q[self._wi] = v + self._evt.set() + self._wi = (self._wi + 1) % self._size + if self._wi == self._ri: # Would indicate empty + self._ri = (self._ri + 1) % self._size # Discard a message + self.discards += 1 + + def __aiter__(self): + return self + + async def __anext__(self): + if self._ri == self._wi: # Empty + self._evt.clear() + await self._evt.wait() + r = self._q[self._ri] + self._ri = (self._ri + 1) % self._size + return r + + +config = { + "client_id": hexlify(unique_id()), + "server": None, + "port": 0, + "user": "", + "password": "", + "keepalive": 60, + "ping_interval": 0, + "ssl": False, + "ssl_params": {}, + "response_time": 10, + "clean_init": True, + "clean": True, + "max_repubs": 4, + "will": None, + "subs_cb": lambda *_: None, + "wifi_coro": eliza, + "connect_coro": eliza, + "ssid": None, + "wifi_pw": None, + "queue_len": 0, + "gateway": False, + "mqttv5": False, + "mqttv5_con_props": None, +} + + +class MQTTException(Exception): + pass + + +def pid_gen(): + pid = 0 + while True: + pid = pid + 1 if pid < 65535 else 1 + yield pid + + +def qos_check(qos): + if not (qos == 0 or qos == 1): + raise ValueError("Only qos 0 and 1 are supported.") + + +# Populate a byte array with a variable byte integer. Args: buf the bytearray, +# offs: start offset. x the value. Returns the end offset. +# 1-4 bytes allowed, encoding up to 268,435,455 (V3.1.1 table 2.4). No point trapping this. +def vbi(buf: bytearray, offs: int, x: int): + buf[offs] = x & 0x7F + if x := x >> 7: + buf[offs] |= 0x80 + return vbi(buf, offs + 1, x) if x else (offs + 1) + + +encode_properties = None +decode_properties = None + + +class MQTT_base: + REPUB_COUNT = 0 # TEST + DEBUG = False + + def __init__(self, config): + self._events = config["queue_len"] > 0 + # MQTT config + self._client_id = config["client_id"] + self._user = config["user"] + self._pswd = config["password"] + self._keepalive = config["keepalive"] + if self._keepalive >= 65536: + raise ValueError("invalid keepalive time") + self._response_time = config["response_time"] * 1000 # Repub if no PUBACK received (ms). + self._max_repubs = config["max_repubs"] + self._clean_init = config["clean_init"] # clean_session state on first connection + self._clean = config["clean"] # clean_session state on reconnect + will = config["will"] + if will is None: + self._lw_topic = False + else: + self._set_last_will(*will) + # WiFi config + self._ssid = config["ssid"] # Required for ESP32 / Pyboard D. Optional ESP8266 + self._wifi_pw = config["wifi_pw"] + self._ssl = config["ssl"] + self._ssl_params = config["ssl_params"] + # Callbacks and coros + if self._events: + self.up = asyncio.Event() + self.down = asyncio.Event() + self.queue = MsgQueue(config["queue_len"]) + self._cb = self.queue.put + else: # Callbacks + self._cb = config["subs_cb"] + self._wifi_handler = config["wifi_coro"] + self._connect_handler = config["connect_coro"] + # Network + self.port = config["port"] + if self.port == 0: + self.port = 8883 if self._ssl else 1883 + self.server = config["server"] + if self.server is None: + raise ValueError("no server specified.") + self._sock = None + self._sta_if = network.WLAN(network.STA_IF) + self._sta_if.active(True) + if config["gateway"]: # Called from gateway (hence ESP32). + import aioespnow # Set up ESPNOW + + while not (sta := self._sta_if).active(): + time.sleep(0.1) + sta.config(pm=sta.PM_NONE) # No power management + sta.active(True) + self._espnow = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support + self._espnow.active(True) + + self.newpid = pid_gen() + self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response + self.last_rx = ticks_ms() # Time of last communication from broker + self.lock = asyncio.Lock() + self._ibuf = bytearray(IBUFSIZE) + self._mvbuf = memoryview(self._ibuf) + + self.mqttv5 = config.get("mqttv5") + self.mqttv5_con_props = config.get("mqttv5_con_props") + self.topic_alias_maximum = 0 + + if self.mqttv5: + global encode_properties, decode_properties + from .mqtt_v5_properties import encode_properties, decode_properties # noqa + + def _set_last_will(self, topic, msg, retain=False, qos=0): + qos_check(qos) + if not topic: + raise ValueError("Empty topic.") + self._lw_topic = topic + self._lw_msg = msg + self._lw_qos = qos + self._lw_retain = retain + + def dprint(self, msg, *args): + if self.DEBUG: + print(msg % args) + + def _timeout(self, t): + return ticks_diff(ticks_ms(), t) > self._response_time + + async def _as_read(self, n, sock=None): # OSError caught by superclass + if sock is None: + sock = self._sock + # Ensure input buffer is big enough to hold data. It keeps the new size + oflow = n - len(self._ibuf) + if oflow > 0: # Grow the buffer and re-create the memoryview + # Avoid too frequent small allocations by adding some extra bytes + self._ibuf.extend(bytearray(oflow + 50)) + self._mvbuf = memoryview(self._ibuf) + buffer = self._mvbuf + size = 0 + t = ticks_ms() + while size < n: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, "Timeout on socket read") + try: + msg_size = sock.readinto(buffer[size:], n - size) + except OSError as e: # ESP32 issues weird 119 errors here + msg_size = None + if e.args[0] not in BUSY_ERRORS: + raise + if msg_size == 0: # Connection closed by host + raise OSError(-1, "Connection closed by host") + if msg_size is not None: # data received + size += msg_size + t = ticks_ms() + self.last_rx = ticks_ms() + await asyncio.sleep_ms(0) + return buffer[:n] + + async def _as_write(self, bytes_wr, length=0, sock=None): + if sock is None: + sock = self._sock + + # Wrap bytes in memoryview to avoid copying during slicing + bytes_wr = memoryview(bytes_wr) + if length: + bytes_wr = bytes_wr[:length] + t = ticks_ms() + while bytes_wr: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, "Timeout on socket write") + try: + n = sock.write(bytes_wr) + except OSError as e: # ESP32 issues weird 119 errors here + n = 0 + if e.args[0] not in BUSY_ERRORS: + raise + if n: + t = ticks_ms() + bytes_wr = bytes_wr[n:] + await asyncio.sleep_ms(0) + + async def _send_str(self, s): + await self._as_write(struct.pack("!H", len(s))) + await self._as_write(s) + + # Receive a Variable Byte Integer and decode. + async def _recv_len(self, d=0, i=0): + s = (await self._as_read(1))[0] + d |= (s & 0x7F) << (i * 7) + return await self._recv_len(d, i + 1) if (s & 0x80) else (d, i + 1) + + async def _connect(self, clean): + mqttv5 = self.mqttv5 # Cache local + self._sock = socket.socket() + self._sock.setblocking(False) + try: + self._sock.connect(self._addr) + except OSError as e: + if e.args[0] not in BUSY_ERRORS: + raise + await asyncio.sleep_ms(0) + self.dprint("Connecting to broker.") + if self._ssl: + try: + import ssl + except ImportError: + import ussl as ssl + + self._sock = ssl.wrap_socket(self._sock, **self._ssl_params) + premsg = bytearray(b"\x10\0\0\0\0\0") + msg = bytearray(b"\x04MQTT\x00\0\0\0") + msg[5] = 0x05 if mqttv5 else 0x04 + + sz = 10 + 2 + len(self._client_id) + msg[6] = clean << 1 + if self._user: + sz += 2 + len(self._user) + 2 + len(self._pswd) + msg[6] |= 0xC0 + if self._keepalive: + msg[7] |= self._keepalive >> 8 + msg[8] |= self._keepalive & 0x00FF + if self._lw_topic: + sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) + if mqttv5: + # Extra for the will properties + sz += 1 + msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 + msg[6] |= self._lw_retain << 5 + + if mqttv5: + properties = encode_properties(self.mqttv5_con_props) + sz += len(properties) + + i = vbi(premsg, 1, sz) # sz -> Variable Byte Integer + await self._as_write(premsg, i + 1) + await self._as_write(msg) + if mqttv5: + await self._as_write(properties) + + await self._send_str(self._client_id) + if self._lw_topic: + if mqttv5: + # We don't support will properties, so we send 0x00 for properties length + await self._as_write(b"\x00") + await self._send_str(self._lw_topic) + await self._send_str(self._lw_msg) + if self._user: + await self._send_str(self._user) + await self._send_str(self._pswd) + # Await CONNACK + # read causes ECONNABORTED if broker is out; triggers a reconnect. + del premsg, msg + packet_type = await self._as_read(1) + if packet_type[0] != 0x20: + raise OSError(-1, "CONNACK not received") + # The connect packet has changed, so size might be different now. But + # we can still handle it the same for 3.1.1 and v5 + sz, _ = await self._recv_len() + if not mqttv5 and sz != 2: + raise OSError(-1, "Invalid CONNACK packet") + + # Only read the first 2 bytes, as properties have their own length + connack_resp = await self._as_read(2) + + # Connect ack flags + if connack_resp[0] != 0: + raise OSError(-1, "CONNACK flags not 0") + # Reason code + if connack_resp[1] != 0: + # On MQTTv5 Reason codes below 128 may need to be handled + # differently. For now, we just raise an error. Spec is a bit weird + # on this. + raise OSError(-1, "CONNACK reason code 0x%x" % connack_resp[1]) + + del connack_resp + if not mqttv5: + # If we are not on MQTTv5 we can stop here + return + + connack_props_length, _ = await self._recv_len() + if connack_props_length > 0: + connack_props = await self._as_read(connack_props_length) + decoded_props = decode_properties(connack_props, connack_props_length) + self.dprint("CONNACK properties: %s", decoded_props) + self.topic_alias_maximum = decoded_props.get(0x22, 0) + + async def _ping(self): + async with self.lock: + await self._as_write(b"\xc0\0") + + # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8 + async def wan_ok( + self, + packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01", + ): + if not self.isconnected(): # WiFi is down + return False + length = 32 # DNS query and response packet size + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setblocking(False) + s.connect(("8.8.8.8", 53)) + await asyncio.sleep(1) + async with self.lock: + try: + await self._as_write(packet, sock=s) + await asyncio.sleep(2) + res = await self._as_read(length, s) + if len(res) == length: + return True # DNS response size OK + except OSError: # Timeout on read: no connectivity. + return False + finally: + s.close() + return False + + async def broker_up(self): # Test broker connectivity + if not self.isconnected(): + return False + tlast = self.last_rx + if ticks_diff(ticks_ms(), tlast) < 1000: + return True + try: + await self._ping() + except OSError: + return False + t = ticks_ms() + while not self._timeout(t): + await asyncio.sleep_ms(100) + if ticks_diff(self.last_rx, tlast) > 0: # Response received + return True + return False + + async def disconnect(self): + if self._sock is not None: + await self._kill_tasks(False) # Keep socket open + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Close broker connection + await asyncio.sleep_ms(100) + except OSError: + pass + self._close() + self._has_connected = False + + def _close(self): + if self._sock is not None: + self._sock.close() + + def close(self): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60 + self._close() + try: + self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors + except OSError: + self.dprint("Wi-Fi not started, unable to disconnect interface") + self._sta_if.active(False) + + async def _await_pid(self, pid): + t = ticks_ms() + while pid in self.rcv_pids: # local copy + if self._timeout(t) or not self.isconnected(): + break # Must repub or bail out + await asyncio.sleep_ms(100) + else: + return True # PID received. All done. + return False + + # qos == 1: coro blocks until wait_msg gets correct PID. + # If WiFi fails completely subclass re-publishes with new PID. + async def publish(self, topic, msg, retain, qos, properties=None): + pid = next(self.newpid) + if qos: + self.rcv_pids.add(pid) + async with self.lock: + await self._publish(topic, msg, retain, qos, 0, pid, properties) + if qos == 0: + return + + count = 0 + while 1: # Await PUBACK, republish on timeout + if await self._await_pid(pid): + return + # No match + if count >= self._max_repubs or not self.isconnected(): + raise OSError(-1) # Subclass to re-publish with new PID + async with self.lock: + # Add pid + await self._publish(topic, msg, retain, qos, dup=1, pid=pid, properties=properties) + count += 1 + self.REPUB_COUNT += 1 + + async def _publish(self, topic, msg, retain, qos, dup, pid, properties=None): + pkt = bytearray(b"\x30\0\0\0") + pkt[0] |= qos << 1 | retain | dup << 3 + sz = 2 + len(topic) + len(msg) + if qos > 0: + sz += 2 + + if self.mqttv5: + properties = encode_properties(properties) + sz += len(properties) + + await self._as_write(pkt, vbi(pkt, 1, sz)) # Encode size as VBI + await self._send_str(topic) + if qos > 0: + struct.pack_into("!H", pkt, 0, pid) + await self._as_write(pkt, 2) + if self.mqttv5: + await self._as_write(properties) + await self._as_write(msg) + + async def subscribe(self, topic, qos, properties=None): + await self._usub(topic, qos, properties) + + async def unsubscribe(self, topic, properties=None): + await self._usub(topic, None, properties) + + # Subscribe/unsubscribe + # Can raise OSError if WiFi fails. Subclass traps. + async def _usub(self, topic, qos, properties): + sub = qos is not None + pkt = bytearray(7) + pkt[0] = 0x82 if sub else 0xA2 + pid = next(self.newpid) + self.rcv_pids.add(pid) + # 2 bytes of PID + 2 bytes of topic length + len(topic) + sz = 2 + 2 + len(topic) + (1 if sub else 0) + if self.mqttv5: + # Return length as VBI followed by properties or b'\0' + properties = encode_properties(properties) + sz += len(properties) + offs = vbi(pkt, 1, sz) # Store size as variable byte integer + struct.pack_into("!H", pkt, offs, pid) + + async with self.lock: + await self._as_write(pkt, offs + 2) + if self.mqttv5: + await self._as_write(properties) + await self._send_str(topic) + if sub: + # Only QoS is supported other features such as: + # (NL) No Local, (RAP) Retain As Published and Retain Handling. + # Are not supported. + await self._as_write(qos.to_bytes(1, "little")) + + if not await self._await_pid(pid): + raise OSError(-1) + + # Remove a pending pid after a successful receive. + def kill_pid(self, pid, msg): + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1, f"Invalid pid in {msg} packet") + + # Wait for a single incoming MQTT message and process it. + # Subscribed messages are delivered to a callback previously + # set by .setup() method. Other (internal) MQTT + # messages processed internally. + # Immediate return if no data available. Called from ._handle_msg(). + async def wait_msg(self): + mqttv5 = self.mqttv5 # Cache local + try: + res = self._sock.read(1) # Throws OSError on WiFi fail + except OSError as e: + if e.args[0] in BUSY_ERRORS: # Needed by RP2 + await asyncio.sleep_ms(0) + return + raise + + if res is None: + return + if res == b"": + raise OSError(-1, "Empty response") # Can happen on broker fail + + if res == b"\xd0": # PINGRESP + await self._as_read(1) # Update .last_rx time + return + op = res[0] + + if op == 0x40: # PUBACK + sz, _ = await self._recv_len() + if not mqttv5 and sz != 2: + raise OSError(-1, "Invalid PUBACK packet") + rcv_pid = await self._as_read(2) + pid = rcv_pid[0] << 8 | rcv_pid[1] + # For some reason even on MQTTv5 reason code is optional + if sz != 2: + reason_code = await self._as_read(1) + reason_code = reason_code[0] + if reason_code >= 0x80: + raise OSError(-1, "PUBACK reason code 0x%x" % reason_code) + if sz > 3: + puback_props_sz, _ = await self._recv_len() + if puback_props_sz > 0: + puback_props = await self._as_read(puback_props_sz) + decoded_props = decode_properties(puback_props, puback_props_sz) + self.dprint("PUBACK properties %s", decoded_props) + # No exception thrown: PUBACK successfuly received. Remove pending PID + self.kill_pid(pid, "PUBACK") + + if op == 0x90 or op == 0xB0: # [UN]SUBACK + un = "UN" if op == 0xB0 else "" + suback = op == 0x90 + sz, _ = await self._recv_len() + rcv_pid = await self._as_read(2) + pid = rcv_pid[0] << 8 | rcv_pid[1] + sz -= 2 + # Handle properties + if mqttv5: + suback_props_sz, sz_len = await self._recv_len() + sz -= sz_len + sz -= suback_props_sz + if suback_props_sz > 0: + suback_props = await self._as_read(suback_props_sz) + decoded_props = decode_properties(suback_props, suback_props_sz) + self.dprint("[UN] SUBACK properties %s", decoded_props) + + if sz > 1: + raise OSError(-1, "Got too many bytes") + if suback or mqttv5: + reason_code = await self._as_read(sz) + reason_code = reason_code[0] + if reason_code >= 0x80: + raise OSError(-1, f"{un}SUBACK reason code 0x{reason_code:x}") + self.kill_pid(pid, f"{un}SUBACK") + + if op == 0xE0: # DISCONNECT + if mqttv5: + sz, _ = await self._recv_len() + reason_code = await self._as_read(1) + reason_code = reason_code[0] + + sz -= 1 + if sz > 0: + dis_props_sz, dis_len = await self._recv_len() + sz -= dis_len + disconnect_props = await self._as_read(dis_props_sz) + decoded_props = decode_properties(disconnect_props, dis_props_sz) + self.dprint("DISCONNECT properties %s", decoded_props) + + if reason_code >= 0x80: + raise OSError(-1, "DISCONNECT reason code 0x%x" % reason_code) + + if op & 0xF0 != 0x30: + return + + sz, _ = await self._recv_len() + topic_len = await self._as_read(2) + topic_len = (topic_len[0] << 8) | topic_len[1] + topic = await self._as_read(topic_len) + topic = bytes(topic) # Copy before re-using the read buffer + sz -= topic_len + 2 + # MQTT V3.1.1 section 2.3.1 non-normative comment. Get server PID. + if op & 6: # This is distinct from client PIDs. + pid = await self._as_read(2) + pid = pid[0] << 8 | pid[1] + sz -= 2 + + decoded_props = None + if mqttv5: + pub_props_sz, pub_props_sz_len = await self._recv_len() + sz -= pub_props_sz_len + sz -= pub_props_sz + if pub_props_sz > 0: + pub_props = await self._as_read(pub_props_sz) + decoded_props = decode_properties(pub_props, pub_props_sz) + + msg = await self._as_read(sz) + # In event mode we must copy the message otherwise .queue contents will be wrong: + # every entry would contain the same message. + # In callback mode not copying the message is OK so long as the callback is purely + # synchronous. Overruns can't occur because of the lock. + if self._events or MSG_BYTES: + msg = bytes(msg) + retained = op & 0x01 + args = [topic, msg, bool(retained)] + if mqttv5: + args.append(decoded_props) + self._cb(*args) + + if op & 6 == 2: # qos 1 + pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK + struct.pack_into("!H", pkt, 2, pid) + await self._as_write(pkt) + elif op & 6 == 4: # qos 2 not supported + raise OSError(-1, "QoS 2 not supported") + + +# MQTTClient class. Handles issues relating to connectivity. + + +class MQTTClient(MQTT_base): + def __init__(self, config): + super().__init__(config) + self._isconnected = False # Current connection state + keepalive = 1000 * self._keepalive # ms + self._ping_interval = keepalive // 4 if keepalive else 20000 + p_i = config["ping_interval"] * 1000 # Can specify shorter e.g. for subscribe-only + if p_i and p_i < self._ping_interval: + self._ping_interval = p_i + self._in_connect = False + self._has_connected = False # Define 'Clean Session' value to use. + self._tasks = [] + if ESP8266: + import esp + + esp.sleep_type(0) # Improve connection integrity at cost of power consumption. + + async def wifi_connect(self, quick=False): + s = self._sta_if + if ESP8266: + if s.isconnected(): # 1st attempt, already connected. + return + s.active(True) + s.connect() # ESP8266 remembers connection. + for _ in range(60): + # Break out on fail or success. Check once per sec. + if s.status() != network.STAT_CONNECTING: + break + await asyncio.sleep(1) + # might hang forever awaiting dhcp lease renewal or something else + if s.status() == network.STAT_CONNECTING: + s.disconnect() + await asyncio.sleep(1) + if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None: + s.connect(self._ssid, self._wifi_pw) + # Break out on fail or success. Check once per sec. + while s.status() == network.STAT_CONNECTING: + await asyncio.sleep(1) + else: + s.active(True) + if RP2 and not NINA: # Disable auto-sleep. + # https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf + # para 3.6.3 + s.config(pm=0xA11140) + s.connect(self._ssid, self._wifi_pw) + for _ in range(60): # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + # Loop while connecting or no IP + if s.isconnected(): + break + if ESP32: + # Status values >= STAT_IDLE can occur during connect: + # STAT_IDLE 1000, STAT_CONNECTING 1001, STAT_GOT_IP 1010 + # Error statuses are in range 200..204 + if s.status() < network.STAT_IDLE: + # pause as workaround to avoid persistent reconnect failures + # see https://github.com/peterhinch/micropython-mqtt/issues/132 for details + await asyncio.sleep(1) + break + elif PYBOARD: # No symbolic constants in network + if not 1 <= s.status() <= 2: + break + elif RP2 and not NINA: # 1 is STAT_CONNECTING. 2 reported by user (No IP?) + if not 1 <= s.status() <= 2: + break + else: # Timeout: still in connecting state + s.disconnect() + await asyncio.sleep(1) + + if not s.isconnected(): # Timed out + raise OSError("Wi-Fi connect timed out") + if not quick: # Skip on first connection only if power saving + # Ensure connection stays up for a few secs. + self.dprint("Checking WiFi integrity.") + for _ in range(5): + if not s.isconnected(): + raise OSError("Connection Unstable") # in 1st 5 secs + await asyncio.sleep(1) + self.dprint("Got reliable connection") + + async def connect(self, *, quick=False): # Quick initial connect option for battery apps + if not self._has_connected: + await self.wifi_connect(quick) # On 1st call, caller handles error + # Note this blocks if DNS lookup occurs. Do it once to prevent + # blocking during later internet outage: + self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] + self._in_connect = True # Disable low level ._isconnected check + try: + is_clean = self._clean + if not self._has_connected and self._clean_init and not self._clean: + if self.mqttv5: + is_clean = True + else: + # Power up. Clear previous session data but subsequently save it. + # Issue #40 + await self._connect(True) # Connect with clean session + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Force disconnect but keep socket open + except OSError: + pass + self.dprint("Waiting for disconnect") + await asyncio.sleep(2) # Wait for broker to disconnect + self.dprint("About to reconnect with unclean session.") + await self._connect(is_clean) + except Exception: + self._close() + self._in_connect = False # Caller may run .isconnected() + raise + self.rcv_pids.clear() + # If we get here without error broker/LAN must be up. + self._isconnected = True + self._in_connect = False # Low level code can now check connectivity. + if not self._events: + asyncio.create_task(self._wifi_handler(True)) # User handler. + if not self._has_connected: + self._has_connected = True # Use normal clean flag on reconnect. + asyncio.create_task(self._keep_connected()) + # Runs forever unless user issues .disconnect() + + asyncio.create_task(self._handle_msg()) # Task quits on connection fail. + self._tasks.append(asyncio.create_task(self._keep_alive())) + if self.DEBUG: + self._tasks.append(asyncio.create_task(self._memory())) + if self._events: + self.up.set() # Connectivity is up + else: + asyncio.create_task(self._connect_handler(self)) # User handler. + + # Launched by .connect(). Runs until connectivity fails. Checks for and + # handles incoming messages. + async def _handle_msg(self): + try: + while self.isconnected(): + async with self.lock: + await self.wait_msg() # Immediate return if no message + # https://github.com/peterhinch/micropython-mqtt/issues/166 + # A delay > 0 is necessary for webrepl compatibility. + await asyncio.sleep_ms(5) # Let other tasks get lock + + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + # Keep broker alive MQTT spec 3.1.2.10 Keep Alive. + # Runs until ping failure or no response in keepalive period. + async def _keep_alive(self): + while self.isconnected(): + pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval + if pings_due >= 4: + self.dprint("Reconnect: broker fail.") + break + await asyncio.sleep_ms(self._ping_interval) + try: + await self._ping() + except OSError: + break + self._reconnect() # Broker or WiFi fail. + + async def _kill_tasks(self, kill_skt): # Cancel running tasks + for task in self._tasks: + task.cancel() + self._tasks.clear() + await asyncio.sleep_ms(0) # Ensure cancellation complete + if kill_skt: # Close socket + self._close() + + # DEBUG: show RAM messages. + async def _memory(self): + while True: + await asyncio.sleep(20) + gc.collect() + self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) + + def isconnected(self): + if self._in_connect: # Disable low-level check during .connect() + return True + + if self._isconnected and not self._sta_if.isconnected(): # It's going down. + self._reconnect() + return self._isconnected + + def _reconnect(self): # Schedule a reconnection if not underway. + if self._isconnected: + self._isconnected = False + asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket + if self._events: # Signal an outage + self.down.set() + else: + asyncio.create_task(self._wifi_handler(False)) # User handler. + + # Await broker connection. + async def _connection(self): + while not self._isconnected: + await asyncio.sleep(1) + + # Scheduled on 1st successful connection. Runs forever maintaining wifi and + # broker connection. Must handle conditions at edge of WiFi range. + async def _keep_connected(self): + while self._has_connected: + if self.isconnected(): # Pause for 1 second + await asyncio.sleep(1) + gc.collect() + else: # Link is down, socket is closed, tasks are killed + try: + self._sta_if.disconnect() + except OSError: + self.dprint("Wi-Fi not started, unable to disconnect interface") + await asyncio.sleep(1) + try: + await self.wifi_connect() + except OSError: + continue + if not self._has_connected: # User has issued the terminal .disconnect() + self.dprint("Disconnected, exiting _keep_connected") + break + try: + await self.connect() + # Now has set ._isconnected and scheduled _connect_handler(). + self.dprint("Reconnect OK!") + except OSError as e: + self.dprint("Error in reconnect. %s", e) + # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received. + self._close() # Disconnect and try again. + self._in_connect = False + self._isconnected = False + self.dprint("Disconnected, exited _keep_connected") + + async def subscribe(self, topic, qos=0, properties=None): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().subscribe(topic, qos, properties) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def unsubscribe(self, topic, properties=None): + while 1: + await self._connection() + try: + return await super().unsubscribe(topic, properties) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def publish(self, topic, msg, retain=False, qos=0, properties=None): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().publish(topic, msg, retain, qos, properties) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. diff --git a/packages/homestation/ESP32/main.py b/packages/homestation/ESP32/main.py new file mode 100644 index 0000000..a6e501e --- /dev/null +++ b/packages/homestation/ESP32/main.py @@ -0,0 +1,55 @@ +import machine +import uasyncio as asyncio +import ujson +from ble_handler import BLEHandler +from mqtt_handler import MQTTHandler + +def load_config() -> dict: + with open("config.json") as f: + return ujson.load(f) + + +async def publish_loop(ble: BLEHandler, mqtt: MQTTHandler, wdt: machine.WDT) -> None: + """Check every 10 seconds if the collar is home and publish if needed.""" + while True: + wdt.feed() + await asyncio.sleep(10) + + if ble.should_publish(): + try: + await mqtt.publish(ble.get_battery()) + ble.mark_published() + except Exception as e: + print("[MQTT] Publish error:", e) + + +async def periodic_trigger_loop(mqtt: MQTTHandler) -> None: + """Publish a location update every 60 seconds regardless of collar presence (Testing Purposes)""" + while True: + await asyncio.sleep(60) + print("[TRIGGER] Periodic trigger firing") + try: + await mqtt.publish(battery=None) + except Exception as e: + print("[TRIGGER] Publish error:", e) + + +async def main() -> None: + config: dict = load_config() + + mqtt = MQTTHandler(config) + ble = BLEHandler(config) + + await mqtt.connect() + + wdt = machine.WDT(timeout=30000) + + await asyncio.gather( + ble.advertise_loop(), + ble.scan_loop(), + publish_loop(ble, mqtt, wdt), + # periodic_trigger_loop(mqtt), + ) + + +asyncio.run(main()) diff --git a/packages/homestation/ESP32/mqtt_handler.py b/packages/homestation/ESP32/mqtt_handler.py new file mode 100644 index 0000000..a08a532 --- /dev/null +++ b/packages/homestation/ESP32/mqtt_handler.py @@ -0,0 +1,43 @@ +import ujson +from mqtt_as import MQTTClient, config as _base_config # type: ignore — installed on device via mip + +try: + from typing import Optional +except ImportError: + pass # typing is not available on MicroPython at runtime + + +class MQTTHandler: + def __init__(self, config: dict) -> None: + self._topic: str = config["mqtt"]["topic"] + self._location: dict = config["location"] + + # mqtt_as manages both WiFi and MQTT connection internally + mqtt_config = dict(_base_config) + mqtt_config.update({ + "client_id": config["mqtt"]["client_id"], + "server": config["mqtt"]["broker"], + "port": config["mqtt"]["port"], + "ssid": config["wifi"]["ssid"], + "wifi_pw": config["wifi"]["password"], + "clean": True, + }) + + self._client = MQTTClient(mqtt_config) + + async def connect(self) -> None: + """Connect to WiFi and MQTT broker. mqtt_as handles both.""" + await self._client.connect() + print("[MQTT] Connected to broker") + + async def publish(self, battery: Optional[str]) -> None: + payload_str: str = "{},{},{},{},{}".format( + self._location["latitude"], + self._location["longitude"], + 0, + self._location["accuracy"], + battery if battery else "0", + ) + message: str = ujson.dumps({"msg": payload_str}) + await self._client.publish(self._topic, message, qos=1) + print("[MQTT] Published:", message) From 6fb41b138d270292d276acd530b575151bbe5e00 Mon Sep 17 00:00:00 2001 From: SeanAkin <56386301+SeanAkin@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:34:30 +0100 Subject: [PATCH 2/2] Satellites comment --- packages/homestation/ESP32/mqtt_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/homestation/ESP32/mqtt_handler.py b/packages/homestation/ESP32/mqtt_handler.py index a08a532..f32e5fa 100644 --- a/packages/homestation/ESP32/mqtt_handler.py +++ b/packages/homestation/ESP32/mqtt_handler.py @@ -34,7 +34,7 @@ async def publish(self, battery: Optional[str]) -> None: payload_str: str = "{},{},{},{},{}".format( self._location["latitude"], self._location["longitude"], - 0, + 0, # Satellites self._location["accuracy"], battery if battery else "0", )