MQTT can now accept any cmd topic that matches the JSON names

This commit is contained in:
Ray Jones 2019-09-20 23:00:27 +10:00
parent 095797f8b0
commit bf537d1ac5
5 changed files with 398 additions and 11 deletions

View file

@ -846,6 +846,7 @@ void loop()
pHourMeter->monitor(HeaterFrame2);
}
updateJSONclients(bReportJSONData);
updateMQTT();
CommState.set(CommStates::Idle);
NVstore.doSave(); // now is a good time to store to the NV storage, well away from any blue wire activity
break;

View file

@ -37,11 +37,13 @@
#include <string.h>
#include "HourMeter.h"
extern CModerator MQTTmoderator;
char defaultJSONstr[64];
CModerator JSONmoderator;
CTimerModerator TimerModerator;
int timerConflict = 0;
CModerator MQTTmoderator;
CModerator MQTTJSONmoderator;
CModerator IPmoderator;
CModerator GPIOmoderator;
CModerator SysModerator;
@ -57,6 +59,7 @@ bool makeJSONStringGPIO( CModerator& moderator, char* opStr, int len);
bool makeJSONStringSysInfo(CModerator& moderator, char* opStr, int len);
bool makeJSONStringMQTT(CModerator& moderator, char* opStr, int len);
bool makeJSONStringIP(CModerator& moderator, char* opStr, int len);
void DecodeCmd(const char* cmd, String& payload);
void interpretJsonCommand(char* pLine)
{
@ -76,6 +79,10 @@ void interpretJsonCommand(char* pLine)
JsonObject::iterator it;
for(it = obj.begin(); it != obj.end(); ++it) {
String payload(it->value.as<const char*>());
DecodeCmd(it->key, payload);
/*
if(strcmp("TempDesired", it->key) == 0) {
if( !reqDemand(it->value.as<uint8_t>(), false) ) { // this request is blocked if OEM controller active
JSONmoderator.reset("TempDesired");
@ -172,6 +179,7 @@ void interpretJsonCommand(char* pLine)
}
else if(strcmp("Refresh", it->key) == 0) {
resetJSONmoderator();
refreshMQTT();
}
else if(strcmp("SystemVoltage", it->key) == 0) {
setSystemVoltage(it->value.as<float>());
@ -219,7 +227,7 @@ void interpretJsonCommand(char* pLine)
}
// MQTT parameters
else if(strcmp("MQuery", it->key) == 0) {
MQTTmoderator.reset(); // force MQTT params to be sent
MQTTJSONmoderator.reset(); // force MQTT params to be sent
}
else if(strcmp("MEn", it->key) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
@ -325,7 +333,7 @@ void interpretJsonCommand(char* pLine)
ht.lowVolts = uint8_t(fCal * 10);
NVstore.setHeaterTuning(ht);
}
}
}*/
}
}
@ -620,7 +628,7 @@ void updateJSONclients(bool report)
// report MQTT params
{
if(makeJSONStringMQTT(MQTTmoderator, jsonStr, sizeof(jsonStr))) {
if(makeJSONStringMQTT(MQTTJSONmoderator, jsonStr, sizeof(jsonStr))) {
if (report) {
DebugPort.printf("JSON send: %s\r\n", jsonStr);
}
@ -693,7 +701,7 @@ void resetJSONmoderator()
void initMQTTJSONmoderator()
{
char jsonStr[800];
makeJSONStringMQTT(MQTTmoderator, jsonStr, sizeof(jsonStr));
makeJSONStringMQTT(MQTTJSONmoderator, jsonStr, sizeof(jsonStr));
}
void initIPJSONmoderator()
@ -741,3 +749,266 @@ void Expand(std::string& str)
}
}
void DecodeCmd(const char* cmd, String& payload)
{
if(strcmp("TempDesired", cmd) == 0) {
if( !reqDemand(payload.toInt(), false) ) { // this request is blocked if OEM controller active
JSONmoderator.reset("TempDesired");
}
}
else if(strcmp("Run", cmd) == 0) {
refreshMQTT();
if(payload == "1") {
requestOn();
}
else if(payload == "0") {
requestOff();
}
}
else if(strcmp("RunState", cmd) == 0) {
if(payload.toInt()) {
requestOn();
}
else {
requestOff();
}
}
else if(strcmp("PumpMin", cmd) == 0) {
setPumpMin(payload.toFloat());
}
else if(strcmp("PumpMax", cmd) == 0) {
setPumpMax(payload.toFloat());
}
else if(strcmp("FanMin", cmd) == 0) {
setFanMin(payload.toInt());
}
else if(strcmp("FanMax", cmd) == 0) {
setFanMax(payload.toInt());
}
else if(strcmp("CyclicTemp", cmd) == 0) {
setDemandDegC(payload.toInt()); // directly set demandDegC
}
else if((strcmp("CyclicOff", cmd) == 0) || (strcmp("ThermostatOvertemp", cmd) == 0)) {
sUserSettings us = NVstore.getUserSettings();
us.cyclic.Stop = payload.toInt();
if(INBOUNDS(us.cyclic.Stop, 0, 10)) {
if(us.cyclic.Stop > 1)
us.cyclic.Stop--; // internal uses a 1 offset
NVstore.setUserSettings(us);
}
}
else if((strcmp("CyclicOn", cmd) == 0) || (strcmp("ThermostatUndertemp", cmd) == 0)) {
sUserSettings us = NVstore.getUserSettings();
us.cyclic.Start = payload.toInt();
if(INBOUNDS(us.cyclic.Start, -20, 0))
NVstore.setUserSettings(us);
}
else if(strcmp("ThermostatMethod", cmd) == 0) {
sUserSettings settings = NVstore.getUserSettings();
settings.ThermostatMethod = payload.toInt();
if(INBOUNDS(settings.ThermostatMethod, 0, 3))
NVstore.setUserSettings(settings);
}
else if(strcmp("ThermostatWindow", cmd) == 0) {
sUserSettings settings = NVstore.getUserSettings();
settings.ThermostatWindow = payload.toFloat();
if(INBOUNDS(settings.ThermostatWindow, 0.2f, 10.f))
NVstore.setUserSettings(settings);
}
else if(strcmp("Thermostat", cmd) == 0) {
if(!setThermostatMode(payload.toInt())) { // this request is blocked if OEM controller active
JSONmoderator.reset("ThermoStat");
}
}
else if(strcmp("ExtThermoTmout", cmd) == 0) {
sUserSettings us = NVstore.getUserSettings();
us.ExtThermoTimeout = payload.toInt();
if(INBOUNDS(us.ExtThermoTimeout, 0, 3600000))
NVstore.setUserSettings(us);
}
else if(strcmp("NVsave", cmd) == 0) {
if(payload.toInt() == 8861)
saveNV();
}
else if(strcmp("Watchdog", cmd) == 0) {
doJSONwatchdog(payload.toInt());
}
else if(strcmp("DateTime", cmd) == 0) {
setDateTime(payload.c_str());
bTriggerDateTime = true;
}
else if(strcmp("Date", cmd) == 0) {
setDate(payload.c_str());
bTriggerDateTime = true;
}
else if(strcmp("Time", cmd) == 0) {
setTime(payload.c_str());
bTriggerDateTime = true;
}
else if(strcmp("Time12hr", cmd) == 0) {
sUserSettings us = NVstore.getUserSettings();
us.clock12hr = payload.toInt() ? 1 : 0;
NVstore.setUserSettings(us);
NVstore.save();
}
else if(strcmp("PumpPrime", cmd) == 0) {
reqPumpPrime(payload.toInt());
}
else if(strcmp("Refresh", cmd) == 0) {
resetJSONmoderator();
refreshMQTT();
}
else if(strcmp("SystemVoltage", cmd) == 0) {
setSystemVoltage(payload.toFloat());
}
else if(strcmp("TimerDays", cmd) == 0) {
// value encoded as "ID Days,Days"
decodeJSONTimerDays(payload.c_str());
}
else if(strcmp("TimerStart", cmd) == 0) {
// value encoded as "ID HH:MM"
decodeJSONTimerTime(0, payload.c_str());
}
else if(strcmp("TimerStop", cmd) == 0) {
// value encoded as "ID HH:MM"
decodeJSONTimerTime(1, payload.c_str());
}
else if(strcmp("TimerRepeat", cmd) == 0) {
// value encoded as "ID val"
decodeJSONTimerNumeric(0, payload.c_str());
}
else if(strcmp("TimerTemp", cmd) == 0) {
decodeJSONTimerNumeric(1, payload.c_str());
}
else if(strcmp("TimerConflict", cmd) == 0) {
validateTimer(payload.toInt());
}
// request specific timer refresh
else if((strcmp("TQuery", cmd) == 0) || (strcmp("TimerRefresh", cmd) == 0) ) {
int timerID = payload.toInt();
if(timerID)
TimerModerator.reset(timerID-1);
else
TimerModerator.reset();
}
else if(strcmp("FanSensor", cmd) == 0) {
setFanSensor(payload.toInt());
}
else if(strcmp("IQuery", cmd) == 0) {
IPmoderator.reset(); // force IP params to be sent
}
// system info
else if(strcmp("SQuery", cmd) == 0) {
SysModerator.reset(); // force MQTT params to be sent
bTriggerSysParams = true;
}
// MQTT parameters
else if(strcmp("MQuery", cmd) == 0) {
MQTTJSONmoderator.reset(); // force MQTT params to be sent
}
else if(strcmp("MEn", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
info.enabled = payload.toInt();
NVstore.setMQTTinfo(info);
}
else if(strcmp("MPort", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
info.port = payload.toInt();
NVstore.setMQTTinfo(info);
}
else if(strcmp("MHost", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
strncpy(info.host, payload.c_str(), 127);
info.host[127] = 0;
NVstore.setMQTTinfo(info);
}
else if(strcmp("MUser", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
strncpy(info.username, payload.c_str(), 31);
info.username[31] = 0;
NVstore.setMQTTinfo(info);
}
else if(strcmp("MPasswd", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
strncpy(info.password, payload.c_str(), 31);
info.password[31] = 0;
NVstore.setMQTTinfo(info);
}
else if(strcmp("MQoS", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
info.qos = payload.toInt();
if(INBOUNDS(info.qos, 0, 2)) {
NVstore.setMQTTinfo(info);
}
}
else if(strcmp("MTopic", cmd) == 0) {
sMQTTparams info = NVstore.getMQTTinfo();
strncpy(info.topic, payload.c_str(), 31);
info.topic[31] = 0;
NVstore.setMQTTinfo(info);
}
else if(strcmp("UploadSize", cmd) == 0) {
setUploadSize(payload.toInt());
}
else if(strcmp("GPout1", cmd) == 0) {
setGPIOout(0, payload.toInt() ? true : false);
}
else if(strcmp("GPout2", cmd) == 0) {
setGPIOout(1, payload.toInt() ? true : false);
}
else if(strcmp("GPin1", cmd) == 0) {
simulateGPIOin(payload.toInt() ? 0x01 : 0x00); // simulate key 1 press
}
else if(strcmp("GPin2", cmd) == 0) {
simulateGPIOin(payload.toInt() ? 0x02 : 0x00); // simulate key 2 press
}
else if(strcmp("JSONpack", cmd) == 0) {
sUserSettings us = NVstore.getUserSettings();
uint8_t packed = payload.toInt() ? 0x00 : 0x01;
us.JSON.LF = packed;
us.JSON.padding = packed;
us.JSON.singleElement = packed;
NVstore.setUserSettings(us);
NVstore.save();
resetJSONmoderator();
}
else if(strcmp("TempMode", cmd) == 0) {
sUserSettings us = NVstore.getUserSettings();
us.degF = payload.toInt() ? 0x01 : 0x00;
NVstore.setUserSettings(us);
NVstore.save();
}
else if(strcmp("PumpCount", cmd) == 0) { // reset fuel gauge
int Count = payload.toInt();
if(Count == 0) {
resetFuelGauge();
}
}
else if(strcmp("PumpCal", cmd) == 0) {
sHeaterTuning ht = NVstore.getHeaterTuning();
ht.pumpCal = payload.toFloat();
if(INBOUNDS(ht.pumpCal, 0.001, 1)) {
NVstore.setHeaterTuning(ht);
}
}
else if(strcmp("TempOffset", cmd) == 0) {
sHeaterTuning ht = NVstore.getHeaterTuning();
ht.tempOfs = payload.toFloat();
if(INBOUNDS(ht.tempOfs, -10.0, +10.0)) {
NVstore.setHeaterTuning(ht);
}
}
else if(strcmp("LowVoltCutout", cmd) == 0) {
float fCal = payload.toFloat();
bool bOK = false;
if(NVstore.getHeaterTuning().sysVoltage == 120)
bOK |= (fCal == 0) || INBOUNDS(fCal, 10.0, 12.5);
else
bOK |= (fCal == 0) || INBOUNDS(fCal, 20.0, 25.0);
if(bOK) {
sHeaterTuning ht = NVstore.getHeaterTuning();
ht.lowVolts = uint8_t(fCal * 10);
NVstore.setHeaterTuning(ht);
}
}
}

View file

@ -134,6 +134,24 @@ public:
bool addJson(const char* name, const char* value, JsonObject& root) {
return szModerator.addJson(name, value, root);
};
bool shouldSend(const char* name, int value) {
return iModerator.shouldSend(name, value);
};
bool shouldSend(const char* name, uint32_t value) {
return u32Moderator.shouldSend(name, value);
};
bool shouldSend(const char* name, unsigned long value) {
return u32Moderator.shouldSend(name, value);
};
bool shouldSend(const char* name, float value) {
return fModerator.shouldSend(name, value);
};
bool shouldSend(const char* name, uint8_t value) {
return ucModerator.shouldSend(name, value);
};
bool shouldSend(const char* name, const char* value) {
return szModerator.shouldSend(name, value);
};
// force changes on all held values
void reset() {
iModerator.reset();

View file

@ -94,5 +94,7 @@ void setAPpassword(const char* name);
extern void ShowOTAScreen(int percent=0, eOTAmodes updateType=eOTAnormal);
extern void updateMQTT();
extern void refreshMQTT();
#endif

View file

@ -31,15 +31,22 @@
#include "BTCWebServer.h"
#include "../Utility/DebugPort.h"
#include "../Utility/NVStorage.h"
#include "../Utility/Moderator.h"
#include "../Protocol/Protocol.h"
extern void DecodeCmd(const char* cmd, String& payload);
#define USE_RTOS_MQTTTIMER
//#define USE_LOCAL_MQTTSTRINGS
//#define MQTT_DBG_RAWBYTES
//IPAddress testMQTTserver(5, 196, 95, 208); // test.mosquito.org
//IPAddress testMQTTserver(18, 194, 98, 249); // broker.hivemq.com
AsyncMqttClient MQTTclient;
char topicnameJSONin[128];
char topicnameCmd[128];
CModerator MQTTmoderator;
#ifdef USE_LOCAL_MQTTSTRINGS
char mqttHost[128];
@ -84,9 +91,12 @@ void onMqttConnect(bool sessionPresent)
// create the topicname we use to accept incoming JSON
DebugPort.printf("MQTT: base topic name \"%s\"\r\n", NVstore.getMQTTinfo().topic);
sprintf(topicnameJSONin, "%s/JSONin", NVstore.getMQTTinfo().topic);
sprintf(topicnameCmd, "%s/cmd/#", NVstore.getMQTTinfo().topic);
// subscribe to that topic
DebugPort.printf("MQTT: Subscribing to \"%s\"\r\n", topicnameJSONin);
MQTTclient.subscribe(topicnameJSONin, NVstore.getMQTTinfo().qos);
MQTTclient.subscribe(topicnameCmd, NVstore.getMQTTinfo().qos);
MQTTclient.subscribe(statusTopic, NVstore.getMQTTinfo().qos);
// spit out an "I'm here" message
MQTTclient.publish(statusTopic, NVstore.getMQTTinfo().qos, true, "online");
@ -113,15 +123,27 @@ void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties
DebugPort.println();
#endif
// string may not neccesarily be null terminated, make sure it is
char tidyString[1024];
int maxlen = sizeof(tidyString)-1;
char szPayload[1024];
int maxlen = sizeof(szPayload)-1;
int lenLimit = len < maxlen ? len : maxlen;
strncpy(tidyString, (char*)payload, lenLimit);
tidyString[lenLimit] = 0;
DebugPort.println(tidyString);
strncpy(szPayload, (char*)payload, lenLimit);
szPayload[lenLimit] = 0;
DebugPort.println(szPayload);
if(strcmp(topic, topicnameJSONin) == 0) { // check if incoming topic is our JSONin topic
interpretJsonCommand(tidyString);
interpretJsonCommand(szPayload);
}
else if(strncmp(topic, topicnameCmd, strlen(topicnameCmd)-1) == 0) { // check if incoming topic is our cmd topic
const char* cmdTopic = &topic[strlen(topicnameCmd)-1];
DebugPort.printf("%s %s %s\r\n", topicnameCmd, cmdTopic, szPayload);
String cmdPayload(szPayload);
DecodeCmd(cmdTopic, cmdPayload);
}
else if(strcmp(topic, statusTopic) == 0) { // check if incoming topic is our general status
if(strcmp(szPayload, "1") == 0) {
MQTTmoderator.reset();
MQTTclient.publish(statusTopic, NVstore.getMQTTinfo().qos, true, "online");
}
}
}
@ -269,4 +291,77 @@ bool isMQTTconnected() {
return MQTTclient.connected();
}
void checkTopic(const char* name, int value)
{
if(MQTTclient.connected()) {
if(MQTTmoderator.shouldSend(name, value)) {
const sMQTTparams params = NVstore.getMQTTinfo();
char topic[128];
sprintf(topic, "%s/sts/%s", params.topic, name);
char payload[128];
sprintf(payload, "%d", value);
MQTTclient.publish(topic, params.qos, false, payload);
}
}
}
void checkTopic(const char* name, float value)
{
if(MQTTclient.connected()) {
if(MQTTmoderator.shouldSend(name, value)) {
const sMQTTparams params = NVstore.getMQTTinfo();
char topic[128];
sprintf(topic, "%s/sts/%s", params.topic, name);
char payload[128];
sprintf(payload, "%.1f", value);
MQTTclient.publish(topic, params.qos, false, payload);
}
}
}
void checkTopic(const char* name, const char* payload)
{
if(MQTTclient.connected()) {
if(MQTTmoderator.shouldSend(name, payload)) {
const sMQTTparams params = NVstore.getMQTTinfo();
char topic[128];
sprintf(topic, "%s/sts/%s", params.topic, name);
MQTTclient.publish(topic, params.qos, false, payload);
}
}
}
void updateMQTT()
{
checkTopic("RunState", getHeaterInfo().getRunStateEx());
// checkTopic("Run", getHeaterInfo().getRunStateEx() ? "{\"RunState\":1}" : "{\"RunState\":0}");
// checkTopic("RunSts", getHeaterInfo().getRunStateEx() ? "1" : "0");
checkTopic("Run", getHeaterInfo().getRunStateEx() ? "1" : "0");
checkTopic("RunString", getHeaterInfo().getRunStateStr());
float tidyTemp = getTemperatureSensor();
tidyTemp = int(tidyTemp * 10 + 0.5) * 0.1f; // round to 0.1 resolution
checkTopic("TempCurrent", tidyTemp);
checkTopic("TempDesired", getTemperatureDesired());
checkTopic("TempBody", getHeaterInfo().getTemperature_HeatExchg());
checkTopic("ErrorState", getHeaterInfo().getErrState());
checkTopic("ErrorString", getHeaterInfo().getErrStateStrEx()); // verbose it up!
checkTopic("Thermostat", getThermostatModeActive());
checkTopic("PumpFixed", getHeaterInfo().getPump_Fixed() );
checkTopic("PumpActual", getHeaterInfo().getPump_Actual());
checkTopic("FanRPM", getFanSpeed());
checkTopic("InputVoltage", getBatteryVoltage(false));
checkTopic("GlowVoltage", getGlowVolts());
checkTopic("GlowCurrent", getGlowCurrent());
sGPIO info;
getGPIOinfo(info);
checkTopic("GPanlg", info.algVal * 100 / 4096);
}
void refreshMQTT()
{
MQTTmoderator.reset();
}
#endif