From bf537d1ac550fa5f3aa6b32fb054c968ab78108f Mon Sep 17 00:00:00 2001 From: Ray Jones Date: Fri, 20 Sep 2019 23:00:27 +1000 Subject: [PATCH] MQTT can now accept any cmd topic that matches the JSON names --- src/Afterburner.cpp | 1 + src/Utility/BTC_JSON.cpp | 281 ++++++++++++++++++++++++++++++++++++++- src/Utility/Moderator.h | 18 +++ src/Utility/helpers.h | 2 + src/WiFi/ABMqtt.cpp | 107 ++++++++++++++- 5 files changed, 398 insertions(+), 11 deletions(-) diff --git a/src/Afterburner.cpp b/src/Afterburner.cpp index 7114f0a..7327e55 100644 --- a/src/Afterburner.cpp +++ b/src/Afterburner.cpp @@ -846,6 +846,7 @@ void loop() pHourMeter->monitor(HeaterFrame2); } updateJSONclients(bReportJSONData); + updateMQTT(); CommState.set(CommStates::Idle); NVstore.doSave(); // now is a good time to store to the NV storage, well away from any blue wire activity break; diff --git a/src/Utility/BTC_JSON.cpp b/src/Utility/BTC_JSON.cpp index 8fe66f6..6c1c76c 100644 --- a/src/Utility/BTC_JSON.cpp +++ b/src/Utility/BTC_JSON.cpp @@ -37,11 +37,13 @@ #include #include "HourMeter.h" +extern CModerator MQTTmoderator; + char defaultJSONstr[64]; CModerator JSONmoderator; CTimerModerator TimerModerator; int timerConflict = 0; -CModerator MQTTmoderator; +CModerator MQTTJSONmoderator; CModerator IPmoderator; CModerator GPIOmoderator; CModerator SysModerator; @@ -57,6 +59,7 @@ bool makeJSONStringGPIO( CModerator& moderator, char* opStr, int len); bool makeJSONStringSysInfo(CModerator& moderator, char* opStr, int len); bool makeJSONStringMQTT(CModerator& moderator, char* opStr, int len); bool makeJSONStringIP(CModerator& moderator, char* opStr, int len); +void DecodeCmd(const char* cmd, String& payload); void interpretJsonCommand(char* pLine) { @@ -76,6 +79,10 @@ void interpretJsonCommand(char* pLine) JsonObject::iterator it; for(it = obj.begin(); it != obj.end(); ++it) { + String payload(it->value.as()); + DecodeCmd(it->key, payload); +/* + if(strcmp("TempDesired", it->key) == 0) { if( !reqDemand(it->value.as(), false) ) { // this request is blocked if OEM controller active JSONmoderator.reset("TempDesired"); @@ -172,6 +179,7 @@ void interpretJsonCommand(char* pLine) } else if(strcmp("Refresh", it->key) == 0) { resetJSONmoderator(); + refreshMQTT(); } else if(strcmp("SystemVoltage", it->key) == 0) { setSystemVoltage(it->value.as()); @@ -219,7 +227,7 @@ void interpretJsonCommand(char* pLine) } // MQTT parameters else if(strcmp("MQuery", it->key) == 0) { - MQTTmoderator.reset(); // force MQTT params to be sent + MQTTJSONmoderator.reset(); // force MQTT params to be sent } else if(strcmp("MEn", it->key) == 0) { sMQTTparams info = NVstore.getMQTTinfo(); @@ -325,7 +333,7 @@ void interpretJsonCommand(char* pLine) ht.lowVolts = uint8_t(fCal * 10); NVstore.setHeaterTuning(ht); } - } + }*/ } } @@ -620,7 +628,7 @@ void updateJSONclients(bool report) // report MQTT params { - if(makeJSONStringMQTT(MQTTmoderator, jsonStr, sizeof(jsonStr))) { + if(makeJSONStringMQTT(MQTTJSONmoderator, jsonStr, sizeof(jsonStr))) { if (report) { DebugPort.printf("JSON send: %s\r\n", jsonStr); } @@ -693,7 +701,7 @@ void resetJSONmoderator() void initMQTTJSONmoderator() { char jsonStr[800]; - makeJSONStringMQTT(MQTTmoderator, jsonStr, sizeof(jsonStr)); + makeJSONStringMQTT(MQTTJSONmoderator, jsonStr, sizeof(jsonStr)); } void initIPJSONmoderator() @@ -741,3 +749,266 @@ void Expand(std::string& str) } } +void DecodeCmd(const char* cmd, String& payload) +{ + if(strcmp("TempDesired", cmd) == 0) { + if( !reqDemand(payload.toInt(), false) ) { // this request is blocked if OEM controller active + JSONmoderator.reset("TempDesired"); + } + } + else if(strcmp("Run", cmd) == 0) { + refreshMQTT(); + if(payload == "1") { + requestOn(); + } + else if(payload == "0") { + requestOff(); + } + } + else if(strcmp("RunState", cmd) == 0) { + if(payload.toInt()) { + requestOn(); + } + else { + requestOff(); + } + } + else if(strcmp("PumpMin", cmd) == 0) { + setPumpMin(payload.toFloat()); + } + else if(strcmp("PumpMax", cmd) == 0) { + setPumpMax(payload.toFloat()); + } + else if(strcmp("FanMin", cmd) == 0) { + setFanMin(payload.toInt()); + } + else if(strcmp("FanMax", cmd) == 0) { + setFanMax(payload.toInt()); + } + else if(strcmp("CyclicTemp", cmd) == 0) { + setDemandDegC(payload.toInt()); // directly set demandDegC + } + else if((strcmp("CyclicOff", cmd) == 0) || (strcmp("ThermostatOvertemp", cmd) == 0)) { + sUserSettings us = NVstore.getUserSettings(); + us.cyclic.Stop = payload.toInt(); + if(INBOUNDS(us.cyclic.Stop, 0, 10)) { + if(us.cyclic.Stop > 1) + us.cyclic.Stop--; // internal uses a 1 offset + NVstore.setUserSettings(us); + } + } + else if((strcmp("CyclicOn", cmd) == 0) || (strcmp("ThermostatUndertemp", cmd) == 0)) { + sUserSettings us = NVstore.getUserSettings(); + us.cyclic.Start = payload.toInt(); + if(INBOUNDS(us.cyclic.Start, -20, 0)) + NVstore.setUserSettings(us); + } + else if(strcmp("ThermostatMethod", cmd) == 0) { + sUserSettings settings = NVstore.getUserSettings(); + settings.ThermostatMethod = payload.toInt(); + if(INBOUNDS(settings.ThermostatMethod, 0, 3)) + NVstore.setUserSettings(settings); + } + else if(strcmp("ThermostatWindow", cmd) == 0) { + sUserSettings settings = NVstore.getUserSettings(); + settings.ThermostatWindow = payload.toFloat(); + if(INBOUNDS(settings.ThermostatWindow, 0.2f, 10.f)) + NVstore.setUserSettings(settings); + } + else if(strcmp("Thermostat", cmd) == 0) { + if(!setThermostatMode(payload.toInt())) { // this request is blocked if OEM controller active + JSONmoderator.reset("ThermoStat"); + } + } + else if(strcmp("ExtThermoTmout", cmd) == 0) { + sUserSettings us = NVstore.getUserSettings(); + us.ExtThermoTimeout = payload.toInt(); + if(INBOUNDS(us.ExtThermoTimeout, 0, 3600000)) + NVstore.setUserSettings(us); + } + else if(strcmp("NVsave", cmd) == 0) { + if(payload.toInt() == 8861) + saveNV(); + } + else if(strcmp("Watchdog", cmd) == 0) { + doJSONwatchdog(payload.toInt()); + } + else if(strcmp("DateTime", cmd) == 0) { + setDateTime(payload.c_str()); + bTriggerDateTime = true; + } + else if(strcmp("Date", cmd) == 0) { + setDate(payload.c_str()); + bTriggerDateTime = true; + } + else if(strcmp("Time", cmd) == 0) { + setTime(payload.c_str()); + bTriggerDateTime = true; + } + else if(strcmp("Time12hr", cmd) == 0) { + sUserSettings us = NVstore.getUserSettings(); + us.clock12hr = payload.toInt() ? 1 : 0; + NVstore.setUserSettings(us); + NVstore.save(); + } + else if(strcmp("PumpPrime", cmd) == 0) { + reqPumpPrime(payload.toInt()); + } + else if(strcmp("Refresh", cmd) == 0) { + resetJSONmoderator(); + refreshMQTT(); + } + else if(strcmp("SystemVoltage", cmd) == 0) { + setSystemVoltage(payload.toFloat()); + } + else if(strcmp("TimerDays", cmd) == 0) { + // value encoded as "ID Days,Days" + decodeJSONTimerDays(payload.c_str()); + } + else if(strcmp("TimerStart", cmd) == 0) { + // value encoded as "ID HH:MM" + decodeJSONTimerTime(0, payload.c_str()); + } + else if(strcmp("TimerStop", cmd) == 0) { + // value encoded as "ID HH:MM" + decodeJSONTimerTime(1, payload.c_str()); + } + else if(strcmp("TimerRepeat", cmd) == 0) { + // value encoded as "ID val" + decodeJSONTimerNumeric(0, payload.c_str()); + } + else if(strcmp("TimerTemp", cmd) == 0) { + decodeJSONTimerNumeric(1, payload.c_str()); + } + else if(strcmp("TimerConflict", cmd) == 0) { + validateTimer(payload.toInt()); + } + // request specific timer refresh + else if((strcmp("TQuery", cmd) == 0) || (strcmp("TimerRefresh", cmd) == 0) ) { + int timerID = payload.toInt(); + if(timerID) + TimerModerator.reset(timerID-1); + else + TimerModerator.reset(); + } + else if(strcmp("FanSensor", cmd) == 0) { + setFanSensor(payload.toInt()); + } + else if(strcmp("IQuery", cmd) == 0) { + IPmoderator.reset(); // force IP params to be sent + } + // system info + else if(strcmp("SQuery", cmd) == 0) { + SysModerator.reset(); // force MQTT params to be sent + bTriggerSysParams = true; + } + // MQTT parameters + else if(strcmp("MQuery", cmd) == 0) { + MQTTJSONmoderator.reset(); // force MQTT params to be sent + } + else if(strcmp("MEn", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + info.enabled = payload.toInt(); + NVstore.setMQTTinfo(info); + } + else if(strcmp("MPort", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + info.port = payload.toInt(); + NVstore.setMQTTinfo(info); + } + else if(strcmp("MHost", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + strncpy(info.host, payload.c_str(), 127); + info.host[127] = 0; + NVstore.setMQTTinfo(info); + } + else if(strcmp("MUser", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + strncpy(info.username, payload.c_str(), 31); + info.username[31] = 0; + NVstore.setMQTTinfo(info); + } + else if(strcmp("MPasswd", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + strncpy(info.password, payload.c_str(), 31); + info.password[31] = 0; + NVstore.setMQTTinfo(info); + } + else if(strcmp("MQoS", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + info.qos = payload.toInt(); + if(INBOUNDS(info.qos, 0, 2)) { + NVstore.setMQTTinfo(info); + } + } + else if(strcmp("MTopic", cmd) == 0) { + sMQTTparams info = NVstore.getMQTTinfo(); + strncpy(info.topic, payload.c_str(), 31); + info.topic[31] = 0; + NVstore.setMQTTinfo(info); + } + else if(strcmp("UploadSize", cmd) == 0) { + setUploadSize(payload.toInt()); + } + else if(strcmp("GPout1", cmd) == 0) { + setGPIOout(0, payload.toInt() ? true : false); + } + else if(strcmp("GPout2", cmd) == 0) { + setGPIOout(1, payload.toInt() ? true : false); + } + else if(strcmp("GPin1", cmd) == 0) { + simulateGPIOin(payload.toInt() ? 0x01 : 0x00); // simulate key 1 press + } + else if(strcmp("GPin2", cmd) == 0) { + simulateGPIOin(payload.toInt() ? 0x02 : 0x00); // simulate key 2 press + } + else if(strcmp("JSONpack", cmd) == 0) { + sUserSettings us = NVstore.getUserSettings(); + uint8_t packed = payload.toInt() ? 0x00 : 0x01; + us.JSON.LF = packed; + us.JSON.padding = packed; + us.JSON.singleElement = packed; + NVstore.setUserSettings(us); + NVstore.save(); + resetJSONmoderator(); + } + else if(strcmp("TempMode", cmd) == 0) { + sUserSettings us = NVstore.getUserSettings(); + us.degF = payload.toInt() ? 0x01 : 0x00; + NVstore.setUserSettings(us); + NVstore.save(); + } + else if(strcmp("PumpCount", cmd) == 0) { // reset fuel gauge + int Count = payload.toInt(); + if(Count == 0) { + resetFuelGauge(); + } + } + else if(strcmp("PumpCal", cmd) == 0) { + sHeaterTuning ht = NVstore.getHeaterTuning(); + ht.pumpCal = payload.toFloat(); + if(INBOUNDS(ht.pumpCal, 0.001, 1)) { + NVstore.setHeaterTuning(ht); + } + } + else if(strcmp("TempOffset", cmd) == 0) { + sHeaterTuning ht = NVstore.getHeaterTuning(); + ht.tempOfs = payload.toFloat(); + if(INBOUNDS(ht.tempOfs, -10.0, +10.0)) { + NVstore.setHeaterTuning(ht); + } + } + else if(strcmp("LowVoltCutout", cmd) == 0) { + float fCal = payload.toFloat(); + bool bOK = false; + if(NVstore.getHeaterTuning().sysVoltage == 120) + bOK |= (fCal == 0) || INBOUNDS(fCal, 10.0, 12.5); + else + bOK |= (fCal == 0) || INBOUNDS(fCal, 20.0, 25.0); + if(bOK) { + sHeaterTuning ht = NVstore.getHeaterTuning(); + ht.lowVolts = uint8_t(fCal * 10); + NVstore.setHeaterTuning(ht); + } + } +} \ No newline at end of file diff --git a/src/Utility/Moderator.h b/src/Utility/Moderator.h index 9d09e7f..ee24e37 100644 --- a/src/Utility/Moderator.h +++ b/src/Utility/Moderator.h @@ -134,6 +134,24 @@ public: bool addJson(const char* name, const char* value, JsonObject& root) { return szModerator.addJson(name, value, root); }; + bool shouldSend(const char* name, int value) { + return iModerator.shouldSend(name, value); + }; + bool shouldSend(const char* name, uint32_t value) { + return u32Moderator.shouldSend(name, value); + }; + bool shouldSend(const char* name, unsigned long value) { + return u32Moderator.shouldSend(name, value); + }; + bool shouldSend(const char* name, float value) { + return fModerator.shouldSend(name, value); + }; + bool shouldSend(const char* name, uint8_t value) { + return ucModerator.shouldSend(name, value); + }; + bool shouldSend(const char* name, const char* value) { + return szModerator.shouldSend(name, value); + }; // force changes on all held values void reset() { iModerator.reset(); diff --git a/src/Utility/helpers.h b/src/Utility/helpers.h index bc1728b..f414273 100644 --- a/src/Utility/helpers.h +++ b/src/Utility/helpers.h @@ -94,5 +94,7 @@ void setAPpassword(const char* name); extern void ShowOTAScreen(int percent=0, eOTAmodes updateType=eOTAnormal); +extern void updateMQTT(); +extern void refreshMQTT(); #endif \ No newline at end of file diff --git a/src/WiFi/ABMqtt.cpp b/src/WiFi/ABMqtt.cpp index 85ea99f..c6e3cb2 100644 --- a/src/WiFi/ABMqtt.cpp +++ b/src/WiFi/ABMqtt.cpp @@ -31,15 +31,22 @@ #include "BTCWebServer.h" #include "../Utility/DebugPort.h" #include "../Utility/NVStorage.h" +#include "../Utility/Moderator.h" +#include "../Protocol/Protocol.h" + +extern void DecodeCmd(const char* cmd, String& payload); #define USE_RTOS_MQTTTIMER //#define USE_LOCAL_MQTTSTRINGS +//#define MQTT_DBG_RAWBYTES //IPAddress testMQTTserver(5, 196, 95, 208); // test.mosquito.org //IPAddress testMQTTserver(18, 194, 98, 249); // broker.hivemq.com AsyncMqttClient MQTTclient; char topicnameJSONin[128]; +char topicnameCmd[128]; +CModerator MQTTmoderator; #ifdef USE_LOCAL_MQTTSTRINGS char mqttHost[128]; @@ -84,9 +91,12 @@ void onMqttConnect(bool sessionPresent) // create the topicname we use to accept incoming JSON DebugPort.printf("MQTT: base topic name \"%s\"\r\n", NVstore.getMQTTinfo().topic); sprintf(topicnameJSONin, "%s/JSONin", NVstore.getMQTTinfo().topic); + sprintf(topicnameCmd, "%s/cmd/#", NVstore.getMQTTinfo().topic); // subscribe to that topic DebugPort.printf("MQTT: Subscribing to \"%s\"\r\n", topicnameJSONin); MQTTclient.subscribe(topicnameJSONin, NVstore.getMQTTinfo().qos); + MQTTclient.subscribe(topicnameCmd, NVstore.getMQTTinfo().qos); + MQTTclient.subscribe(statusTopic, NVstore.getMQTTinfo().qos); // spit out an "I'm here" message MQTTclient.publish(statusTopic, NVstore.getMQTTinfo().qos, true, "online"); @@ -113,15 +123,27 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties DebugPort.println(); #endif // string may not neccesarily be null terminated, make sure it is - char tidyString[1024]; - int maxlen = sizeof(tidyString)-1; + char szPayload[1024]; + int maxlen = sizeof(szPayload)-1; int lenLimit = len < maxlen ? len : maxlen; - strncpy(tidyString, (char*)payload, lenLimit); - tidyString[lenLimit] = 0; - DebugPort.println(tidyString); + strncpy(szPayload, (char*)payload, lenLimit); + szPayload[lenLimit] = 0; + DebugPort.println(szPayload); if(strcmp(topic, topicnameJSONin) == 0) { // check if incoming topic is our JSONin topic - interpretJsonCommand(tidyString); + interpretJsonCommand(szPayload); + } + else if(strncmp(topic, topicnameCmd, strlen(topicnameCmd)-1) == 0) { // check if incoming topic is our cmd topic + const char* cmdTopic = &topic[strlen(topicnameCmd)-1]; + DebugPort.printf("%s %s %s\r\n", topicnameCmd, cmdTopic, szPayload); + String cmdPayload(szPayload); + DecodeCmd(cmdTopic, cmdPayload); + } + else if(strcmp(topic, statusTopic) == 0) { // check if incoming topic is our general status + if(strcmp(szPayload, "1") == 0) { + MQTTmoderator.reset(); + MQTTclient.publish(statusTopic, NVstore.getMQTTinfo().qos, true, "online"); + } } } @@ -269,4 +291,77 @@ bool isMQTTconnected() { return MQTTclient.connected(); } + +void checkTopic(const char* name, int value) +{ + if(MQTTclient.connected()) { + if(MQTTmoderator.shouldSend(name, value)) { + const sMQTTparams params = NVstore.getMQTTinfo(); + char topic[128]; + sprintf(topic, "%s/sts/%s", params.topic, name); + char payload[128]; + sprintf(payload, "%d", value); + MQTTclient.publish(topic, params.qos, false, payload); + } + } +} + +void checkTopic(const char* name, float value) +{ + if(MQTTclient.connected()) { + if(MQTTmoderator.shouldSend(name, value)) { + const sMQTTparams params = NVstore.getMQTTinfo(); + char topic[128]; + sprintf(topic, "%s/sts/%s", params.topic, name); + char payload[128]; + sprintf(payload, "%.1f", value); + MQTTclient.publish(topic, params.qos, false, payload); + } + } +} + +void checkTopic(const char* name, const char* payload) +{ + if(MQTTclient.connected()) { + if(MQTTmoderator.shouldSend(name, payload)) { + const sMQTTparams params = NVstore.getMQTTinfo(); + char topic[128]; + sprintf(topic, "%s/sts/%s", params.topic, name); + MQTTclient.publish(topic, params.qos, false, payload); + } + } +} + +void updateMQTT() +{ + checkTopic("RunState", getHeaterInfo().getRunStateEx()); +// checkTopic("Run", getHeaterInfo().getRunStateEx() ? "{\"RunState\":1}" : "{\"RunState\":0}"); +// checkTopic("RunSts", getHeaterInfo().getRunStateEx() ? "1" : "0"); + checkTopic("Run", getHeaterInfo().getRunStateEx() ? "1" : "0"); + checkTopic("RunString", getHeaterInfo().getRunStateStr()); + float tidyTemp = getTemperatureSensor(); + tidyTemp = int(tidyTemp * 10 + 0.5) * 0.1f; // round to 0.1 resolution + checkTopic("TempCurrent", tidyTemp); + checkTopic("TempDesired", getTemperatureDesired()); + checkTopic("TempBody", getHeaterInfo().getTemperature_HeatExchg()); + checkTopic("ErrorState", getHeaterInfo().getErrState()); + checkTopic("ErrorString", getHeaterInfo().getErrStateStrEx()); // verbose it up! + checkTopic("Thermostat", getThermostatModeActive()); + checkTopic("PumpFixed", getHeaterInfo().getPump_Fixed() ); + checkTopic("PumpActual", getHeaterInfo().getPump_Actual()); + checkTopic("FanRPM", getFanSpeed()); + checkTopic("InputVoltage", getBatteryVoltage(false)); + checkTopic("GlowVoltage", getGlowVolts()); + checkTopic("GlowCurrent", getGlowCurrent()); + sGPIO info; + getGPIOinfo(info); + checkTopic("GPanlg", info.algVal * 100 / 4096); +} + +void refreshMQTT() +{ + MQTTmoderator.reset(); +} + + #endif \ No newline at end of file