From 3e4ce429c764620b3e47e10d11ba1534faeda627 Mon Sep 17 00:00:00 2001 From: Ray Jones Date: Sun, 1 Sep 2019 14:43:00 +1000 Subject: [PATCH] BUG FIX: bad clientID due to bad MQTT library - causes disconnects with multiple AB's on one broker! IMPROVEMENT: MQTT reconnect implemented. --- lib/async-mqtt-client/src/AsyncMqttClient.cpp | 3 +- src/Afterburner.cpp | 7 +- src/WiFi/ABMqtt.cpp | 196 ++++++++++++++---- src/WiFi/ABmqtt.h | 4 + 4 files changed, 162 insertions(+), 48 deletions(-) diff --git a/lib/async-mqtt-client/src/AsyncMqttClient.cpp b/lib/async-mqtt-client/src/AsyncMqttClient.cpp index f62e1ef..60160be 100644 --- a/lib/async-mqtt-client/src/AsyncMqttClient.cpp +++ b/lib/async-mqtt-client/src/AsyncMqttClient.cpp @@ -37,7 +37,8 @@ AsyncMqttClient::AsyncMqttClient() _client.onPoll([](void* obj, AsyncClient* c) { (static_cast(obj))->_onPoll(c); }, this); #ifdef ESP32 - sprintf(_generatedClientId, "esp32%06x", ESP.getEfuseMac()); + // WHY OH WHY WON'T THE DEV DO THIS FIX (original is %06x - 32bits, excludes the important unique bits!) + sprintf(_generatedClientId, "esp32%06llx", ESP.getEfuseMac()); _xSemaphore = xSemaphoreCreateMutex(); #elif defined(ESP8266) sprintf(_generatedClientId, "esp8266%06x", ESP.getChipId()); diff --git a/src/Afterburner.cpp b/src/Afterburner.cpp index 97d9dbe..2c3cf16 100644 --- a/src/Afterburner.cpp +++ b/src/Afterburner.cpp @@ -123,8 +123,8 @@ #define RX_DATA_TIMOUT 50 const int FirmwareRevision = 31; -const int FirmwareSubRevision = 0; -const char* FirmwareDate = "31 Aug 2019"; +const int FirmwareSubRevision = 1; +const char* FirmwareDate = "1 Sep 2019"; #ifdef ESP32 @@ -1624,7 +1624,8 @@ void doStreaming() bHaveWebClient = doWebServer(); #endif //USE_WEBSERVER #if USE_MQTT == 1 - // MQTT is managed via callbacks!!! + // most MQTT is managed via callbacks, but need some sundry housekeeping + doMQTT(); #endif diff --git a/src/WiFi/ABMqtt.cpp b/src/WiFi/ABMqtt.cpp index 1d8cfd6..d6cd704 100644 --- a/src/WiFi/ABMqtt.cpp +++ b/src/WiFi/ABMqtt.cpp @@ -5,6 +5,7 @@ #include "../cfg/BTCConfig.h" #if USE_MQTT == 1 + #include #include "ABMqtt.h" #include "../../lib/async-mqtt-client/src/AsyncMqttClient.h" @@ -13,37 +14,68 @@ #include "../Utility/DebugPort.h" #include "../Utility/NVStorage.h" -//IPAddress testMQTTserver(5, 196, 95, 208); // test.mosquito.org -IPAddress testMQTTserver(18, 194, 98, 249); // broker.hivemq.com +#define USE_RTOS_MQTTTIMER +//#define USE_LOCAL_MQTTSTRINGS + +//IPAddress testMQTTserver(5, 196, 95, 208); // test.mosquito.org +//IPAddress testMQTTserver(18, 194, 98, 249); // broker.hivemq.com AsyncMqttClient MQTTclient; -TimerHandle_t mqttReconnectTimer = NULL; char topicnameJSONin[128]; +#ifdef USE_LOCAL_MQTTSTRINGS +char mqttHost[128]; +char mqttUser[32]; +char mqttPass[32]; +#endif + +#ifdef USE_RTOS_MQTTTIMER +TimerHandle_t mqttReconnectTimer = NULL; +#else +unsigned long mqttReconnect = 0; +#endif + void connectToMqtt() { - DebugPort.println("MQTT: Connecting..."); - MQTTclient.connect(); +#ifdef USE_RTOS_MQTTTIMER + xTimerStop(mqttReconnectTimer, 0); +#else + mqttReconnect = 0; +#endif + if(!MQTTclient.connected()) { + DebugPort.println("MQTT: Connecting..."); + if(NVstore.getMQTTinfo().enabled) { + MQTTclient.connect(); + } + } } void onMqttConnect(bool sessionPresent) { +#ifdef USE_RTOS_MQTTTIMER + xTimerStop(mqttReconnectTimer, 0); +#else + mqttReconnect = 0; +#endif + DebugPort.println("MQTT: Connected to broker."); // DebugPort.printf("Session present: %d\r\n", sessionPresent); - const sMQTTparams params = NVstore.getMQTTinfo(); - char topic[128]; - DebugPort.printf("MQTT: base topic name \"%s\"\r\n", params.topic); - sprintf(topicnameJSONin, "%s/JSONin", params.topic); + // create the topicname we use to accept incoming JSON + DebugPort.printf("MQTT: base topic name \"%s\"\r\n", NVstore.getMQTTinfo().topic); + sprintf(topicnameJSONin, "%s/JSONin", NVstore.getMQTTinfo().topic); + // subscribe to that topic DebugPort.printf("MQTT: Subscribing to \"%s\"\r\n", topicnameJSONin); - MQTTclient.subscribe(topicnameJSONin, params.qos); + MQTTclient.subscribe(topicnameJSONin, NVstore.getMQTTinfo().qos); + // spit out an "I'm here" message - sprintf(topic, "%s/Status", params.topic); - MQTTclient.publish(topic, params.qos, true, "onMqttConnect"); + char lcltopic[128]; + sprintf(lcltopic, "%s/Status", NVstore.getMQTTinfo().topic); + MQTTclient.publish(lcltopic, NVstore.getMQTTinfo().qos, true, "onMqttConnect"); #ifdef MQTT_DBG_LOOPBACK // testo - loopback - sprintf(topic, "%s/JSONout", params.topic); - MQTTclient.subscribe(topic, params.qos); + sprintflcl(topic, "%s/JSONout", NVstore.getMQTTinfo().topic); + MQTTclient.subscribe(lcltopic, NVstore.getMQTTinfo().qos); #endif resetJSONmoderator(); @@ -73,11 +105,28 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties } void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) { - DebugPort.println("MQTT: Disconnected!"); + DebugPort.print("MQTT: Disconnected, reason: "); + // ref: DisconnectReasons.hpp + switch(reason) { + case AsyncMqttClientDisconnectReason::TCP_DISCONNECTED: DebugPort.println("TCP disconnected"); break; + case AsyncMqttClientDisconnectReason::MQTT_UNACCEPTABLE_PROTOCOL_VERSION: DebugPort.println("protocol version"); break; + case AsyncMqttClientDisconnectReason::MQTT_IDENTIFIER_REJECTED: DebugPort.println("Identifier rejected"); break; + case AsyncMqttClientDisconnectReason::MQTT_SERVER_UNAVAILABLE: DebugPort.println("Server unavailable"); break; + case AsyncMqttClientDisconnectReason::MQTT_MALFORMED_CREDENTIALS: DebugPort.println("Malformed credentials"); break; + case AsyncMqttClientDisconnectReason::MQTT_NOT_AUTHORIZED: DebugPort.println("No authorised"); break; + case AsyncMqttClientDisconnectReason::ESP8266_NOT_ENOUGH_SPACE: DebugPort.println("Not enough space"); break; + case AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT: DebugPort.println("Bad TLS fingerprint"); break; + } -// if (WiFi.isConnected()) { -// xTimerStart(mqttReconnectTimer, 0); -// } + if (WiFi.isConnected()) { + if(NVstore.getMQTTinfo().enabled) { +#ifdef USE_RTOS_MQTTTIMER + xTimerStart(mqttReconnectTimer, 0); +#else + mqttReconnect = millis() + 5000; +#endif + } + } } void onMqttSubscribe(uint16_t packetId, uint8_t qos) { @@ -88,37 +137,57 @@ void onMqttSubscribe(uint16_t packetId, uint8_t qos) { bool mqttInit() { -// if(mqttReconnectTimer==NULL) -// mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToMqtt)); +#ifdef USE_RTOS_MQTTTIMER + if(mqttReconnectTimer==NULL) + mqttReconnectTimer = xTimerCreate("mqttTimer", pdMS_TO_TICKS(2000), pdFALSE, (void*)0, reinterpret_cast(connectToMqtt)); +#else + mqttReconnect = 0; +#endif memset(topicnameJSONin, 0, sizeof(topicnameJSONin)); + + MQTTclient.disconnect(true); + long escape = millis() + 10000; + while(MQTTclient.connected()) { + long tDelta = millis()-escape; + if(tDelta > 0) { + DebugPort.println("MQTT: TIMEOUT waiting for broker disconnect"); + break; + } + } + const sMQTTparams params = NVstore.getMQTTinfo(); if(params.enabled) { - if(strlen(params.host)) { - MQTTclient.disconnect(); - long escape = millis() + 10000; - while(MQTTclient.connected()) { - long tDelta = millis()-escape; - if(tDelta > 0) { - DebugPort.println("MQTT: TIMEOUT waiting for broker disconnect"); - break; - } - } - DebugPort.printf("MQTT: setting broker to %s:%d\r\n", params.host, params.port); - MQTTclient.setServer(params.host, params.port); - MQTTclient.setCredentials(params.username, params.password); - static bool setCallbacks = false; - // callbacks should only be added once (vector of callbacks in client!) - if(!setCallbacks) { - MQTTclient.onConnect(onMqttConnect); - MQTTclient.onMessage(onMqttMessage); - MQTTclient.onDisconnect(onMqttDisconnect); - MQTTclient.onSubscribe(onMqttSubscribe); - setCallbacks = true; - } - MQTTclient.connect(); - return true; +#ifdef USE_LOCAL_MQTTSTRINGS + strncpy(mqttHost, params.host, 127); + strncpy(mqttUser, params.username, 31); + strncpy(mqttPass, params.password, 31); + mqttHost[127] = 0; + mqttUser[31] = 0; + mqttPass[31] = 0; + DebugPort.printf("MQTT: setting broker to %s:%d\r\n", mqttHost, params.port); + DebugPort.printf("MQTT: %s/%s\r\n", mqttUser, mqttPass); + MQTTclient.setServer(mqttHost, params.port); + MQTTclient.setCredentials(mqttUser, mqttPass); +#else + // the client only stores a pointer - this must not be a volatile memory location! + // - NO STACK vars!!! + DebugPort.printf("MQTT: setting broker to %s:%d\r\n", NVstore.getMQTTinfo().host, NVstore.getMQTTinfo().port); + MQTTclient.setServer(NVstore.getMQTTinfo().host, NVstore.getMQTTinfo().port); + DebugPort.printf("MQTT: %s/%s\r\n", NVstore.getMQTTinfo().username, NVstore.getMQTTinfo().password); + MQTTclient.setCredentials(NVstore.getMQTTinfo().username, NVstore.getMQTTinfo().password); +#endif + static bool setCallbacks = false; + // callbacks should only be added once (vector of callbacks in client!) + if(!setCallbacks) { + MQTTclient.onConnect(onMqttConnect); + MQTTclient.onMessage(onMqttMessage); + MQTTclient.onDisconnect(onMqttDisconnect); + MQTTclient.onSubscribe(onMqttSubscribe); + setCallbacks = true; } + // connection takes pplace via delayed start method + return true; } return false; } @@ -135,5 +204,44 @@ bool mqttPublishJSON(const char* str) return false; } +void kickMQTT() { + if (WiFi.isConnected()) { + if(NVstore.getMQTTinfo().enabled) { +#ifdef USE_RTOS_MQTTTIMER + xTimerStart(mqttReconnectTimer, 0); +#else + mqttReconnect = millis() + 5000; +#endif + } + } +} + +void doMQTT() +{ + // most MQTT is managed via callbacks!!! + if(NVstore.getMQTTinfo().enabled) { +#ifndef USE_RTOS_MQTTTIMER + if(mqttReconnect) { + long tDelta = millis() - mqttReconnect; + if(tDelta > 0) { + mqttReconnect = 0; + connectToMqtt(); + } + } +#endif + +#ifdef USE_RTOS_MQTTTIMER + if (!MQTTclient.connected() && WiFi.isConnected() && !xTimerIsTimerActive(mqttReconnectTimer)) { + xTimerStart(mqttReconnectTimer, 0); + } +#else + if (!MQTTclient.connected() && WiFi.isConnected() && mqttReconnect==0) { + mqttReconnect = millis() + 5000; + } +#endif + + } + +} #endif \ No newline at end of file diff --git a/src/WiFi/ABmqtt.h b/src/WiFi/ABmqtt.h index a06a73a..9ecfafc 100644 --- a/src/WiFi/ABmqtt.h +++ b/src/WiFi/ABmqtt.h @@ -5,7 +5,11 @@ bool mqttInit(); +void doMQTT(); bool mqttPublishJSON(const char* str); +void connectToMqtt(); +void kickMQTT(); + #endif