mqtt_cmd.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. //
  2. // MQTT Commands coming in from the attached microcontroller over the serial port
  3. //
  4. #include <esp8266.h>
  5. #include "mqtt.h"
  6. #include "mqtt_client.h"
  7. #include "mqtt_cmd.h"
  8. #ifdef MQTTCMD_DBG
  9. #define DBG(format, ...) do { os_printf(format, ## __VA_ARGS__); } while(0)
  10. #else
  11. #define DBG(format, ...) do { } while(0)
  12. #endif
  13. static bool blocked; // flag to prevent MQTT from sending on serial while trying to PGM uC
  14. void ICACHE_FLASH_ATTR
  15. mqtt_block() { blocked = true; }
  16. void ICACHE_FLASH_ATTR
  17. mqtt_unblock() { blocked = false; }
  18. void ICACHE_FLASH_ATTR
  19. cmdMqttConnectedCb(MQTT_Client* client) {
  20. if (blocked) return;
  21. MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
  22. DBG("MQTT: Connected Cb=%p\n", (void*)cb->connectedCb);
  23. cmdResponseStart(CMD_RESP_CB, cb->connectedCb, 0);
  24. cmdResponseEnd();
  25. }
  26. void ICACHE_FLASH_ATTR
  27. cmdMqttDisconnectedCb(MQTT_Client* client) {
  28. if (blocked) return;
  29. MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
  30. DBG("MQTT: Disconnected cb=%p\n", (void*)cb->disconnectedCb);
  31. cmdResponseStart(CMD_RESP_CB, cb->disconnectedCb, 0);
  32. cmdResponseEnd();
  33. }
  34. void ICACHE_FLASH_ATTR
  35. cmdMqttPublishedCb(MQTT_Client* client) {
  36. if (blocked) return;
  37. MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
  38. DBG("MQTT: Published cb=%p\n", (void*)cb->publishedCb);
  39. cmdResponseStart(CMD_RESP_CB, cb->publishedCb, 0);
  40. cmdResponseEnd();
  41. }
  42. void ICACHE_FLASH_ATTR
  43. cmdMqttDataCb(MQTT_Client* client, const char* topic, uint32_t topic_len,
  44. const char* data, uint32_t data_len)
  45. {
  46. if (blocked) return;
  47. MqttCmdCb* cb = (MqttCmdCb*)client->user_data;
  48. DBG("MQTT: Data cb=%p topic=%s len=%u\n", (void*)cb->dataCb, topic, data_len);
  49. cmdResponseStart(CMD_RESP_CB, cb->dataCb, 2);
  50. cmdResponseBody(topic, topic_len);
  51. cmdResponseBody(data, data_len);
  52. cmdResponseEnd();
  53. }
  54. void ICACHE_FLASH_ATTR
  55. MQTTCMD_Lwt(CmdPacket *cmd) {
  56. CmdRequest req;
  57. cmdRequest(&req, cmd);
  58. if (cmdGetArgc(&req) != 4) return;
  59. MQTT_Client* client = &mqttClient;
  60. // free old topic & message
  61. if (client->connect_info.will_topic)
  62. os_free(client->connect_info.will_topic);
  63. if (client->connect_info.will_message)
  64. os_free(client->connect_info.will_message);
  65. uint16_t len;
  66. // get topic
  67. len = cmdArgLen(&req);
  68. if (len > 128) return; // safety check
  69. client->connect_info.will_topic = (char*)os_zalloc(len + 1);
  70. cmdPopArg(&req, client->connect_info.will_topic, len);
  71. client->connect_info.will_topic[len] = 0;
  72. // get message
  73. len = cmdArgLen(&req);
  74. if (len > 128) return; // safety check
  75. client->connect_info.will_message = (char*)os_zalloc(len + 1);
  76. cmdPopArg(&req, client->connect_info.will_message, len);
  77. client->connect_info.will_message[len] = 0;
  78. // get qos
  79. cmdPopArg(&req, (uint8_t*)&client->connect_info.will_qos, 4);
  80. // get retain
  81. cmdPopArg(&req, (uint8_t*)&client->connect_info.will_retain, 4);
  82. DBG("MQTT: MQTTCMD_Lwt topic=%s, message=%s, qos=%d, retain=%d\n",
  83. client->connect_info.will_topic,
  84. client->connect_info.will_message,
  85. client->connect_info.will_qos,
  86. client->connect_info.will_retain);
  87. // trigger a reconnect to set the LWT
  88. MQTT_Reconnect(client);
  89. }
  90. void ICACHE_FLASH_ATTR
  91. MQTTCMD_Publish(CmdPacket *cmd) {
  92. CmdRequest req;
  93. cmdRequest(&req, cmd);
  94. if (cmdGetArgc(&req) != 5) return;
  95. MQTT_Client* client = &mqttClient;
  96. uint16_t len;
  97. // get topic
  98. len = cmdArgLen(&req);
  99. if (len > 128) return; // safety check
  100. uint8_t *topic = (uint8_t*)os_zalloc(len + 1);
  101. cmdPopArg(&req, topic, len);
  102. topic[len] = 0;
  103. // get data
  104. len = cmdArgLen(&req);
  105. uint8_t *data = (uint8_t*)os_zalloc(len+1);
  106. if (!data) { // safety check
  107. os_free(topic);
  108. return;
  109. }
  110. cmdPopArg(&req, data, len);
  111. data[len] = 0;
  112. uint16_t data_len;
  113. uint8_t qos, retain;
  114. // get data length
  115. cmdPopArg(&req, &data_len, sizeof(data_len));
  116. // get qos
  117. cmdPopArg(&req, &qos, sizeof(qos));
  118. // get retain
  119. cmdPopArg(&req, &retain, sizeof(retain));
  120. DBG("MQTT: MQTTCMD_Publish topic=%s, data_len=%d, qos=%d, retain=%d\n",
  121. topic, data_len, qos, retain);
  122. MQTT_Publish(client, (char*)topic, (char*)data, data_len, qos%3, retain&1);
  123. os_free(topic);
  124. os_free(data);
  125. return;
  126. }
  127. void ICACHE_FLASH_ATTR
  128. MQTTCMD_Subscribe(CmdPacket *cmd) {
  129. CmdRequest req;
  130. cmdRequest(&req, cmd);
  131. if (cmdGetArgc(&req) != 2) return;
  132. MQTT_Client* client = &mqttClient;
  133. uint16_t len;
  134. // get topic
  135. len = cmdArgLen(&req);
  136. if (len > 128) return; // safety check
  137. uint8_t* topic = (uint8_t*)os_zalloc(len + 1);
  138. cmdPopArg(&req, topic, len);
  139. topic[len] = 0;
  140. // get qos
  141. uint32_t qos = 0;
  142. cmdPopArg(&req, (uint8_t*)&qos, 4);
  143. DBG("MQTT: MQTTCMD_Subscribe topic=%s, qos=%u\n", topic, qos);
  144. MQTT_Subscribe(client, (char*)topic, (uint8_t)qos);
  145. os_free(topic);
  146. return;
  147. }
  148. void ICACHE_FLASH_ATTR
  149. MQTTCMD_Setup(CmdPacket *cmd) {
  150. CmdRequest req;
  151. cmdRequest(&req, cmd);
  152. MQTT_Client* client = &mqttClient;
  153. if (cmdGetArgc(&req) != 4) return;
  154. // create callback
  155. MqttCmdCb* callback = (MqttCmdCb*)os_zalloc(sizeof(MqttCmdCb));
  156. cmdPopArg(&req, &callback->connectedCb, 4);
  157. cmdPopArg(&req, &callback->disconnectedCb, 4);
  158. cmdPopArg(&req, &callback->publishedCb, 4);
  159. cmdPopArg(&req, &callback->dataCb, 4);
  160. client->user_data = callback;
  161. DBG("MQTT connectedCb=%x\n", callback->connectedCb);
  162. client->cmdConnectedCb = cmdMqttConnectedCb;
  163. client->cmdDisconnectedCb = cmdMqttDisconnectedCb;
  164. client->cmdPublishedCb = cmdMqttPublishedCb;
  165. client->cmdDataCb = cmdMqttDataCb;
  166. if (client->connState == MQTT_CONNECTED) {
  167. if (callback->connectedCb)
  168. cmdMqttConnectedCb(client);
  169. } else if (callback->disconnectedCb) {
  170. cmdMqttDisconnectedCb(client);
  171. }
  172. }