mqtt.c 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. /* mqtt.c
  2. * Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  3. *
  4. * Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
  5. * All rights reserved.
  6. *
  7. * Modified by Thorsten von Eicken to make it fully callback based
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are met:
  11. *
  12. * * Redistributions of source code must retain the above copyright notice,
  13. * this list of conditions and the following disclaimer.
  14. * * Redistributions in binary form must reproduce the above copyright
  15. * notice, this list of conditions and the following disclaimer in the
  16. * documentation and/or other materials provided with the distribution.
  17. * * Neither the name of Redis nor the names of its contributors may be used
  18. * to endorse or promote products derived from this software without
  19. * specific prior written permission.
  20. *
  21. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  22. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  23. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  24. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  25. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  26. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  27. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  28. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  29. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  30. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  31. * POSSIBILITY OF SUCH DAMAGE.
  32. */
  33. // TODO:
  34. // Handle SessionPresent=0 in CONNACK and rexmit subscriptions
  35. // Improve timeout for CONNACK, currently only has keep-alive timeout (maybe send artificial ping?)
  36. // Allow messages that don't require ACK to be sent even when pending_buffer is != NULL
  37. // Set dup flag in retransmissions
  38. #include <esp8266.h>
  39. #include "pktbuf.h"
  40. #include "mqtt.h"
  41. #ifdef MQTT_DBG
  42. #define DBG_MQTT(format, ...) os_printf(format, ## __VA_ARGS__)
  43. #else
  44. #define DBG_MQTT(format, ...) do { } while(0)
  45. #endif
  46. extern void dumpMem(void *buf, int len);
  47. // max message size supported for receive
  48. #define MQTT_MAX_RCV_MESSAGE 2048
  49. // max message size for sending (except publish)
  50. #define MQTT_MAX_SHORT_MESSAGE 128
  51. #ifdef MQTT_DBG
  52. static char* mqtt_msg_type[] = {
  53. "NULL", "TYPE_CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP",
  54. "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "RESV",
  55. };
  56. #endif
  57. // forward declarations
  58. static void mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len);
  59. static void mqtt_send_message(MQTT_Client* client);
  60. static void mqtt_doAbort(MQTT_Client* client);
  61. // Deliver a publish message to the client
  62. static void ICACHE_FLASH_ATTR
  63. deliver_publish(MQTT_Client* client, uint8_t* message, uint16_t length) {
  64. // parse the message into topic and data
  65. uint16_t topic_length = length;
  66. const char *topic = mqtt_get_publish_topic(message, &topic_length);
  67. uint16_t data_length = length;
  68. const char *data = mqtt_get_publish_data(message, &data_length);
  69. // callback to client
  70. if (client->dataCb)
  71. client->dataCb(client, topic, topic_length, data, data_length);
  72. if (client->cmdDataCb)
  73. client->cmdDataCb(client, topic, topic_length, data, data_length);
  74. }
  75. /**
  76. * @brief Client received callback function.
  77. * @param arg: contain the ip link information
  78. * @param pdata: received data
  79. * @param len: the length of received data
  80. * @retval None
  81. */
  82. static void ICACHE_FLASH_ATTR
  83. mqtt_tcpclient_recv(void* arg, char* pdata, unsigned short len) {
  84. //os_printf("MQTT: recv CB\n");
  85. uint8_t msg_type;
  86. uint16_t msg_id;
  87. uint16_t msg_len;
  88. struct espconn* pCon = (struct espconn*)arg;
  89. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  90. if (client == NULL) return; // aborted connection
  91. //os_printf("MQTT: Data received %d bytes\n", len);
  92. do {
  93. // append data to our buffer
  94. int avail = client->in_buffer_size - client->in_buffer_filled;
  95. if (len <= avail) {
  96. os_memcpy(client->in_buffer + client->in_buffer_filled, pdata, len);
  97. client->in_buffer_filled += len;
  98. len = 0;
  99. } else {
  100. os_memcpy(client->in_buffer + client->in_buffer_filled, pdata, avail);
  101. client->in_buffer_filled += avail;
  102. len -= avail;
  103. pdata += avail;
  104. }
  105. // check out what's at the head of the buffer
  106. msg_type = mqtt_get_type(client->in_buffer);
  107. msg_id = mqtt_get_id(client->in_buffer, client->in_buffer_size);
  108. msg_len = mqtt_get_total_length(client->in_buffer, client->in_buffer_size);
  109. if (msg_len > client->in_buffer_size) {
  110. // oops, too long a message for us to digest, disconnect and hope for a miracle
  111. os_printf("MQTT: Too long a message (%d bytes)\n", msg_len);
  112. mqtt_doAbort(client);
  113. return;
  114. }
  115. // check whether what's left in the buffer is a complete message
  116. if (msg_len > client->in_buffer_filled) break;
  117. if (client->connState != MQTT_CONNECTED) {
  118. // why are we receiving something??
  119. DBG_MQTT("MQTT ERROR: recv in invalid state %d\n", client->connState);
  120. mqtt_doAbort(client);
  121. return;
  122. }
  123. // we are connected and are sending/receiving data messages
  124. uint8_t pending_msg_type = 0;
  125. uint16_t pending_msg_id = 0;
  126. if (client->pending_buffer != NULL) {
  127. pending_msg_type = mqtt_get_type(client->pending_buffer->data);
  128. pending_msg_id = mqtt_get_id(client->pending_buffer->data, client->pending_buffer->filled);
  129. }
  130. DBG_MQTT("MQTT: Recv type=%s id=%04X len=%d; Pend type=%s id=%02X\n",
  131. mqtt_msg_type[msg_type], msg_id, msg_len, mqtt_msg_type[pending_msg_type],pending_msg_id);
  132. switch (msg_type) {
  133. case MQTT_MSG_TYPE_CONNACK:
  134. //DBG_MQTT("MQTT: Connect successful\n");
  135. // callbacks for internal and external clients
  136. if (client->connectedCb) client->connectedCb(client);
  137. if (client->cmdConnectedCb) client->cmdConnectedCb(client);
  138. client->reconTimeout = 1; // reset the reconnect backoff
  139. break;
  140. case MQTT_MSG_TYPE_SUBACK:
  141. if (pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && pending_msg_id == msg_id) {
  142. //DBG_MQTT("MQTT: Subscribe successful\n");
  143. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  144. }
  145. break;
  146. case MQTT_MSG_TYPE_UNSUBACK:
  147. if (pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && pending_msg_id == msg_id) {
  148. //DBG_MQTT("MQTT: Unsubscribe successful\n");
  149. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  150. }
  151. break;
  152. case MQTT_MSG_TYPE_PUBACK: // ack for a publish we sent
  153. if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) {
  154. //DBG_MQTT("MQTT: QoS1 Publish successful\n");
  155. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  156. }
  157. break;
  158. case MQTT_MSG_TYPE_PUBREC: // rec for a publish we sent
  159. if (pending_msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg_id == msg_id) {
  160. //DBG_MQTT("MQTT: QoS2 publish cont\n");
  161. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  162. // we need to send PUBREL
  163. mqtt_msg_pubrel(&client->mqtt_connection, msg_id);
  164. mqtt_enq_message(client, client->mqtt_connection.message.data,
  165. client->mqtt_connection.message.length);
  166. }
  167. break;
  168. case MQTT_MSG_TYPE_PUBCOMP: // comp for a pubrel we sent (originally publish we sent)
  169. if (pending_msg_type == MQTT_MSG_TYPE_PUBREL && pending_msg_id == msg_id) {
  170. //DBG_MQTT("MQTT: QoS2 Publish successful\n");
  171. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  172. }
  173. break;
  174. case MQTT_MSG_TYPE_PUBLISH: { // incoming publish
  175. // we may need to ACK the publish
  176. uint8_t msg_qos = mqtt_get_qos(client->in_buffer);
  177. #ifdef MQTT_DBG
  178. uint16_t topic_length = msg_len;
  179. os_printf("MQTT: Recv PUBLISH qos=%d %s\n", msg_qos,
  180. mqtt_get_publish_topic(client->in_buffer, &topic_length));
  181. #endif
  182. if (msg_qos == 1) mqtt_msg_puback(&client->mqtt_connection, msg_id);
  183. if (msg_qos == 2) mqtt_msg_pubrec(&client->mqtt_connection, msg_id);
  184. if (msg_qos == 1 || msg_qos == 2) {
  185. mqtt_enq_message(client, client->mqtt_connection.message.data,
  186. client->mqtt_connection.message.length);
  187. }
  188. // send the publish message to clients
  189. deliver_publish(client, client->in_buffer, msg_len);
  190. }
  191. break;
  192. case MQTT_MSG_TYPE_PUBREL: // rel for a rec we sent (originally publish received)
  193. if (pending_msg_type == MQTT_MSG_TYPE_PUBREC && pending_msg_id == msg_id) {
  194. //DBG_MQTT("MQTT: Cont QoS2 recv\n");
  195. client->pending_buffer = PktBuf_ShiftFree(client->pending_buffer);
  196. // we need to send PUBCOMP
  197. mqtt_msg_pubcomp(&client->mqtt_connection, msg_id);
  198. mqtt_enq_message(client, client->mqtt_connection.message.data,
  199. client->mqtt_connection.message.length);
  200. }
  201. break;
  202. case MQTT_MSG_TYPE_PINGRESP:
  203. client->keepAliveAckTick = 0;
  204. break;
  205. }
  206. // Shift out the message and see whether we have another one
  207. if (msg_len < client->in_buffer_filled)
  208. os_memcpy(client->in_buffer, client->in_buffer+msg_len, client->in_buffer_filled-msg_len);
  209. client->in_buffer_filled -= msg_len;
  210. } while(client->in_buffer_filled > 0 || len > 0);
  211. // Send next packet out, if possible
  212. if (!client->sending && client->pending_buffer == NULL && client->msgQueue != NULL) {
  213. mqtt_send_message(client);
  214. }
  215. }
  216. /**
  217. * @brief Callback from TCP that previous send completed
  218. * @param arg: contain the ip link information
  219. * @retval None
  220. */
  221. static void ICACHE_FLASH_ATTR
  222. mqtt_tcpclient_sent_cb(void* arg) {
  223. //DBG_MQTT("MQTT: sent CB\n");
  224. struct espconn* pCon = (struct espconn *)arg;
  225. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  226. if (client == NULL) return; // aborted connection ?
  227. //DBG_MQTT("MQTT: Sent\n");
  228. // if the message we sent is not a "pending" one, we need to free the buffer
  229. if (client->sending_buffer != NULL) {
  230. PktBuf *buf = client->sending_buffer;
  231. //DBG_MQTT("PktBuf free %p l=%d\n", buf, buf->filled);
  232. os_free(buf);
  233. client->sending_buffer = NULL;
  234. }
  235. client->sending = false;
  236. // send next message if one is queued and we're not expecting an ACK
  237. if (client->connState == MQTT_CONNECTED && client->pending_buffer == NULL &&
  238. client->msgQueue != NULL) {
  239. mqtt_send_message(client);
  240. }
  241. }
  242. /*
  243. * @brief: Timer function to handle timeouts
  244. */
  245. static void ICACHE_FLASH_ATTR
  246. mqtt_timer(void* arg) {
  247. MQTT_Client* client = (MQTT_Client*)arg;
  248. //DBG_MQTT("MQTT: timer CB\n");
  249. switch (client->connState) {
  250. default: break;
  251. case MQTT_CONNECTED:
  252. // first check whether we're timing out for an ACK
  253. if (client->pending_buffer != NULL && --client->timeoutTick == 0) {
  254. // looks like we're not getting a response in time, abort the connection
  255. mqtt_doAbort(client);
  256. client->timeoutTick = 0; // trick to make reconnect happen in 1 second
  257. return;
  258. }
  259. // check whether our last keep-alive timed out
  260. if (client->keepAliveAckTick > 0 && --client->keepAliveAckTick == 0) {
  261. os_printf("\nMQTT ERROR: Keep-alive timed out\n");
  262. mqtt_doAbort(client);
  263. return;
  264. }
  265. // check whether we need to send a keep-alive message
  266. if (client->keepAliveTick > 0 && --client->keepAliveTick == 0) {
  267. // timeout: we need to send a ping message
  268. //DBG_MQTT("MQTT: Send keepalive\n");
  269. mqtt_msg_pingreq(&client->mqtt_connection);
  270. PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length);
  271. os_memcpy(buf->data, client->mqtt_connection.message.data,
  272. client->mqtt_connection.message.length);
  273. buf->filled = client->mqtt_connection.message.length;
  274. client->msgQueue = PktBuf_Unshift(client->msgQueue, buf);
  275. mqtt_send_message(client);
  276. client->keepAliveTick = client->connect_info.keepalive;
  277. client->keepAliveAckTick = client->sendTimeout;
  278. }
  279. break;
  280. case TCP_RECONNECT_REQ:
  281. if (client->timeoutTick == 0 || --client->timeoutTick == 0) {
  282. // it's time to reconnect! start by re-enqueueing anything pending
  283. if (client->pending_buffer != NULL) {
  284. client->msgQueue = PktBuf_Unshift(client->msgQueue, client->pending_buffer);
  285. client->pending_buffer = NULL;
  286. }
  287. client->connect_info.clean_session = 0; // ask server to keep state
  288. MQTT_Connect(client);
  289. }
  290. }
  291. }
  292. /**
  293. * @brief Callback from SDK that socket is disconnected
  294. * @param arg: contain the ip link information
  295. * @retval None
  296. */
  297. void ICACHE_FLASH_ATTR
  298. mqtt_tcpclient_discon_cb(void* arg) {
  299. struct espconn* pespconn = (struct espconn *)arg;
  300. MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
  301. DBG_MQTT("MQTT: Disconnect CB, freeing espconn %p\n", arg);
  302. if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
  303. os_free(pespconn);
  304. // if this is an aborted connection we're done
  305. if (client == NULL) return;
  306. DBG_MQTT("MQTT: Disconnected from %s:%d\n", client->host, client->port);
  307. if (client->disconnectedCb) client->disconnectedCb(client);
  308. if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
  309. // reconnect unless we're in a permanently disconnected state
  310. if (client->connState == MQTT_DISCONNECTED) return;
  311. client->timeoutTick = client->reconTimeout;
  312. if (client->reconTimeout < 128) client->reconTimeout <<= 1;
  313. client->connState = TCP_RECONNECT_REQ;
  314. }
  315. /**
  316. * @brief Callback from SDK that socket got reset, note that no discon_cb will follow
  317. * @param arg: contain the ip link information
  318. * @retval None
  319. */
  320. static void ICACHE_FLASH_ATTR
  321. mqtt_tcpclient_recon_cb(void* arg, int8_t err) {
  322. struct espconn* pespconn = (struct espconn *)arg;
  323. MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
  324. //DBG_MQTT("MQTT: Reset CB, freeing espconn %p (err=%d)\n", arg, err);
  325. if (pespconn->proto.tcp) os_free(pespconn->proto.tcp);
  326. os_free(pespconn);
  327. os_printf("MQTT: Connection reset from %s:%d\n", client->host, client->port);
  328. if (client->disconnectedCb) client->disconnectedCb(client);
  329. if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
  330. // reconnect unless we're in a permanently disconnected state
  331. if (client->connState == MQTT_DISCONNECTED) return;
  332. client->timeoutTick = client->reconTimeout;
  333. if (client->reconTimeout < 128) client->reconTimeout <<= 1;
  334. client->connState = TCP_RECONNECT_REQ;
  335. os_printf("timeoutTick=%d reconTimeout=%d\n", client->timeoutTick, client->reconTimeout);
  336. }
  337. /**
  338. * @brief Callback from SDK that socket is connected
  339. * @param arg: contain the ip link information
  340. * @retval None
  341. */
  342. static void ICACHE_FLASH_ATTR
  343. mqtt_tcpclient_connect_cb(void* arg) {
  344. struct espconn* pCon = (struct espconn *)arg;
  345. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  346. if (client == NULL) return; // aborted connection
  347. espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);
  348. espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);
  349. espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);
  350. os_printf("MQTT: TCP connected to %s:%d\n", client->host, client->port);
  351. // send MQTT connect message to broker
  352. mqtt_msg_connect(&client->mqtt_connection, &client->connect_info);
  353. PktBuf *buf = PktBuf_New(client->mqtt_connection.message.length);
  354. os_memcpy(buf->data, client->mqtt_connection.message.data,
  355. client->mqtt_connection.message.length);
  356. buf->filled = client->mqtt_connection.message.length;
  357. client->msgQueue = PktBuf_Unshift(client->msgQueue, buf); // prepend to send (rexmit) queue
  358. mqtt_send_message(client);
  359. client->connState = MQTT_CONNECTED; // v3.1.1 allows publishing while still connecting
  360. }
  361. /**
  362. * @brief Allocate and enqueue mqtt message, kick sending, if appropriate
  363. */
  364. static void ICACHE_FLASH_ATTR
  365. mqtt_enq_message(MQTT_Client *client, const uint8_t *data, uint16_t len) {
  366. PktBuf *buf = PktBuf_New(len);
  367. os_memcpy(buf->data, data, len);
  368. buf->filled = len;
  369. client->msgQueue = PktBuf_Push(client->msgQueue, buf);
  370. if (client->connState == MQTT_CONNECTED && !client->sending && client->pending_buffer == NULL) {
  371. mqtt_send_message(client);
  372. }
  373. }
  374. /**
  375. * @brief Send out top message in queue onto socket
  376. */
  377. static void ICACHE_FLASH_ATTR
  378. mqtt_send_message(MQTT_Client* client) {
  379. //DBG_MQTT("MQTT: Send_message\n");
  380. PktBuf *buf = client->msgQueue;
  381. if (buf == NULL || client->sending) return; // ahem...
  382. client->msgQueue = PktBuf_Shift(client->msgQueue);
  383. // get some details about the message
  384. uint16_t msg_type = mqtt_get_type(buf->data);
  385. #ifdef MQTT_DBG
  386. uint8_t msg_id = mqtt_get_id(buf->data, buf->filled);
  387. os_printf("MQTT: Send type=%s id=%04X len=%d\n", mqtt_msg_type[msg_type], msg_id, buf->filled);
  388. #if 0
  389. for (int i=0; i<buf->filled; i++) {
  390. if (buf->data[i] >= ' ' && buf->data[i] <= '~') os_printf("%c", buf->data[i]);
  391. else os_printf("\\x%02X", buf->data[i]);
  392. }
  393. os_printf("\n");
  394. #endif
  395. #endif
  396. // send the message out
  397. if (client->security)
  398. espconn_secure_sent(client->pCon, buf->data, buf->filled);
  399. else
  400. espconn_sent(client->pCon, buf->data, buf->filled);
  401. client->sending = true;
  402. // depending on whether it needs an ack we need to hold on to the message
  403. bool needsAck =
  404. (msg_type == MQTT_MSG_TYPE_PUBLISH && mqtt_get_qos(buf->data) > 0) ||
  405. msg_type == MQTT_MSG_TYPE_PUBREL || msg_type == MQTT_MSG_TYPE_PUBREC ||
  406. msg_type == MQTT_MSG_TYPE_SUBSCRIBE || msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE ||
  407. msg_type == MQTT_MSG_TYPE_PINGREQ;
  408. if (msg_type == MQTT_MSG_TYPE_PINGREQ) {
  409. client->pending_buffer = NULL; // we don't need to rexmit this one
  410. client->sending_buffer = buf;
  411. } else if (needsAck) {
  412. client->pending_buffer = buf; // remeber for rexmit on disconnect/reconnect
  413. client->sending_buffer = NULL;
  414. client->timeoutTick = client->sendTimeout+1; // +1 to ensure full sendTireout seconds
  415. } else {
  416. client->pending_buffer = NULL;
  417. client->sending_buffer = buf;
  418. client->timeoutTick = 0;
  419. }
  420. client->keepAliveTick = client->connect_info.keepalive > 0 ? client->connect_info.keepalive+1 : 0;
  421. }
  422. /**
  423. * @brief DNS lookup for broker hostname completed, move to next phase
  424. */
  425. static void ICACHE_FLASH_ATTR
  426. mqtt_dns_found(const char* name, ip_addr_t* ipaddr, void* arg) {
  427. struct espconn* pConn = (struct espconn *)arg;
  428. MQTT_Client* client = (MQTT_Client *)pConn->reverse;
  429. if (ipaddr == NULL) {
  430. os_printf("MQTT: DNS lookup failed\n");
  431. if (client != NULL) {
  432. client->timeoutTick = client->reconTimeout;
  433. if (client->reconTimeout < 128) client->reconTimeout <<= 1;
  434. client->connState = TCP_RECONNECT_REQ; // the timer will kick-off a reconnection
  435. }
  436. return;
  437. }
  438. DBG_MQTT("MQTT: ip %d.%d.%d.%d\n",
  439. *((uint8 *)&ipaddr->addr),
  440. *((uint8 *)&ipaddr->addr + 1),
  441. *((uint8 *)&ipaddr->addr + 2),
  442. *((uint8 *)&ipaddr->addr + 3));
  443. if (client != NULL && client->ip.addr == 0 && ipaddr->addr != 0) {
  444. os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
  445. uint8_t err;
  446. if (client->security)
  447. err = espconn_secure_connect(client->pCon);
  448. else
  449. err = espconn_connect(client->pCon);
  450. if (err != 0) {
  451. os_printf("MQTT ERROR: Failed to connect\n");
  452. client->timeoutTick = client->reconTimeout;
  453. if (client->reconTimeout < 128) client->reconTimeout <<= 1;
  454. client->connState = TCP_RECONNECT_REQ;
  455. } else {
  456. DBG_MQTT("MQTT: connecting...\n");
  457. }
  458. }
  459. }
  460. //===== publish / subscribe
  461. static void ICACHE_FLASH_ATTR
  462. msg_conn_init(mqtt_connection_t *new_msg, mqtt_connection_t *old_msg,
  463. uint8_t *buf, uint16_t buflen) {
  464. new_msg->message_id = old_msg->message_id;
  465. new_msg->buffer = buf;
  466. new_msg->buffer_length = buflen;
  467. }
  468. /**
  469. * @brief MQTT publish function.
  470. * @param client: MQTT_Client reference
  471. * @param topic: string topic will publish to
  472. * @param data: buffer data send point to
  473. * @param data_length: length of data
  474. * @param qos: qos
  475. * @param retain: retain
  476. * @retval TRUE if success queue
  477. */
  478. bool ICACHE_FLASH_ATTR
  479. MQTT_Publish(MQTT_Client* client, const char* topic, const char* data, uint16_t data_length,
  480. uint8_t qos, uint8_t retain)
  481. {
  482. // estimate the packet size to allocate a buffer
  483. uint16_t topic_length = os_strlen(topic);
  484. // estimate: fixed hdr, pkt-id, topic length, topic, data, fudge
  485. uint16_t buf_len = 3 + 2 + 2 + topic_length + data_length + 16;
  486. PktBuf *buf = PktBuf_New(buf_len);
  487. if (buf == NULL) {
  488. os_printf("MQTT ERROR: Cannot allocate buffer for %d byte publish\n", buf_len);
  489. return FALSE;
  490. }
  491. // use a temporary mqtt_message_t pointing to our buffer, this is a bit of a mess because we
  492. // need to keep track of the message_id that is embedded in it
  493. mqtt_connection_t msg;
  494. msg_conn_init(&msg, &client->mqtt_connection, buf->data, buf_len);
  495. uint16_t msg_id;
  496. if (!mqtt_msg_publish(&msg, topic, data, data_length, qos, retain, &msg_id)){
  497. os_printf("MQTT ERROR: Queuing Publish failed\n");
  498. os_free(buf);
  499. return FALSE;
  500. }
  501. client->mqtt_connection.message_id = msg.message_id;
  502. if (msg.message.data != buf->data)
  503. os_memcpy(buf->data, msg.message.data, msg.message.length);
  504. buf->filled = msg.message.length;
  505. DBG_MQTT("MQTT: Publish, topic: \"%s\", length: %d\n", topic, msg.message.length);
  506. //dumpMem(buf, buf_len);
  507. client->msgQueue = PktBuf_Push(client->msgQueue, buf);
  508. if (!client->sending && client->pending_buffer == NULL) {
  509. mqtt_send_message(client);
  510. }
  511. return TRUE;
  512. }
  513. /**
  514. * @brief MQTT subscribe function.
  515. * @param client: MQTT_Client reference
  516. * @param topic: string topic will subscribe
  517. * @param qos: qos
  518. * @retval TRUE if success queue
  519. */
  520. bool ICACHE_FLASH_ATTR
  521. MQTT_Subscribe(MQTT_Client* client, char* topic, uint8_t qos) {
  522. uint16_t msg_id;
  523. if (!mqtt_msg_subscribe(&client->mqtt_connection, topic, 0, &msg_id)) {
  524. os_printf("MQTT ERROR: Queuing Subscribe failed (too long)\n");
  525. return FALSE;
  526. }
  527. DBG_MQTT("MQTT: Subscribe, topic: \"%s\"\n", topic);
  528. mqtt_enq_message(client, client->mqtt_connection.message.data,
  529. client->mqtt_connection.message.length);
  530. return TRUE;
  531. }
  532. //===== Initialization and connect/disconnect
  533. /**
  534. * @brief MQTT initialization mqtt client function
  535. * @param client: MQTT_Client reference
  536. * @param host: Domain or IP string
  537. * @param port: Port to connect
  538. * @param security: 1 for ssl, 0 for none
  539. * @param clientid: MQTT client id
  540. * @param client_user: MQTT client user
  541. * @param client_pass: MQTT client password
  542. * @param keepAliveTime: MQTT keep alive timer, in second
  543. * @param cleanSession: On connection, a client sets the "clean session" flag, which is sometimes also known as the "clean start" flag.
  544. * If clean session is set to false, then the connection is treated as durable. This means that when the client
  545. * disconnects, any subscriptions it has will remain and any subsequent QoS 1 or 2 messages will be stored until
  546. * it connects again in the future. If clean session is true, then all subscriptions will be removed for the client
  547. * when it disconnects.
  548. * @retval None
  549. */
  550. void ICACHE_FLASH_ATTR
  551. MQTT_Init(MQTT_Client* client, char* host, uint32 port, uint8_t security, uint8_t sendTimeout,
  552. char* client_id, char* client_user, char* client_pass,
  553. uint8_t keepAliveTime) {
  554. DBG_MQTT("MQTT_Init, host=%s\n", host);
  555. os_memset(client, 0, sizeof(MQTT_Client));
  556. client->host = (char*)os_zalloc(os_strlen(host) + 1);
  557. os_strcpy(client->host, host);
  558. client->port = port;
  559. client->security = !!security;
  560. // timeouts with sanity checks
  561. client->sendTimeout = sendTimeout == 0 ? 1 : sendTimeout;
  562. client->reconTimeout = 1; // reset reconnect back-off
  563. os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
  564. client->connect_info.client_id = (char*)os_zalloc(os_strlen(client_id) + 1);
  565. os_strcpy(client->connect_info.client_id, client_id);
  566. client->connect_info.username = (char*)os_zalloc(os_strlen(client_user) + 1);
  567. os_strcpy(client->connect_info.username, client_user);
  568. client->connect_info.password = (char*)os_zalloc(os_strlen(client_pass) + 1);
  569. os_strcpy(client->connect_info.password, client_pass);
  570. client->connect_info.keepalive = keepAliveTime;
  571. client->connect_info.clean_session = 1;
  572. client->in_buffer = (uint8_t *)os_zalloc(MQTT_MAX_RCV_MESSAGE);
  573. client->in_buffer_size = MQTT_MAX_RCV_MESSAGE;
  574. uint8_t *out_buffer = (uint8_t *)os_zalloc(MQTT_MAX_SHORT_MESSAGE);
  575. mqtt_msg_init(&client->mqtt_connection, out_buffer, MQTT_MAX_SHORT_MESSAGE);
  576. }
  577. /**
  578. * @brief MQTT Set Last Will Topic, must be called before MQTT_Connect
  579. */
  580. void ICACHE_FLASH_ATTR
  581. MQTT_InitLWT(MQTT_Client* client, char* will_topic, char* will_msg,
  582. uint8_t will_qos, uint8_t will_retain) {
  583. client->connect_info.will_topic = (char*)os_zalloc(os_strlen(will_topic) + 1);
  584. os_strcpy((char*)client->connect_info.will_topic, will_topic);
  585. client->connect_info.will_message = (char*)os_zalloc(os_strlen(will_msg) + 1);
  586. os_strcpy((char*)client->connect_info.will_message, will_msg);
  587. client->connect_info.will_qos = will_qos;
  588. client->connect_info.will_retain = will_retain;
  589. // TODO: if we're connected we should disconnect and reconnect to establish the new LWT
  590. }
  591. /**
  592. * @brief Begin connect to MQTT broker
  593. * @param client: MQTT_Client reference
  594. * @retval None
  595. */
  596. void ICACHE_FLASH_ATTR
  597. MQTT_Connect(MQTT_Client* client) {
  598. //MQTT_Disconnect(client);
  599. client->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
  600. client->pCon->type = ESPCONN_TCP;
  601. client->pCon->state = ESPCONN_NONE;
  602. client->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
  603. client->pCon->proto.tcp->local_port = espconn_port();
  604. client->pCon->proto.tcp->remote_port = client->port;
  605. client->pCon->reverse = client;
  606. espconn_regist_connectcb(client->pCon, mqtt_tcpclient_connect_cb);
  607. espconn_regist_reconcb(client->pCon, mqtt_tcpclient_recon_cb);
  608. // start timer function to tick every second
  609. os_timer_disarm(&client->mqttTimer);
  610. os_timer_setfn(&client->mqttTimer, (os_timer_func_t *)mqtt_timer, client);
  611. os_timer_arm(&client->mqttTimer, 1000, 1);
  612. // initiate the TCP connection or DNS lookup
  613. os_printf("MQTT: Connect to %s:%d %p (client=%p)\n",
  614. client->host, client->port, client->pCon, client);
  615. if (UTILS_StrToIP((const char *)client->host,
  616. (void*)&client->pCon->proto.tcp->remote_ip)) {
  617. uint8_t err;
  618. if (client->security)
  619. err = espconn_secure_connect(client->pCon);
  620. else
  621. err = espconn_connect(client->pCon);
  622. if (err != 0) {
  623. os_printf("MQTT ERROR: Failed to connect\n");
  624. os_free(client->pCon->proto.tcp);
  625. os_free(client->pCon);
  626. client->pCon = NULL;
  627. return;
  628. }
  629. } else {
  630. espconn_gethostbyname(client->pCon, (const char *)client->host, &client->ip,
  631. mqtt_dns_found);
  632. }
  633. client->connState = TCP_CONNECTING;
  634. client->timeoutTick = 20; // generous timeout to allow for DNS, etc
  635. client->sending = FALSE;
  636. }
  637. static void ICACHE_FLASH_ATTR
  638. mqtt_doAbort(MQTT_Client* client) {
  639. os_printf("MQTT: Disconnecting from %s:%d (%p)\n", client->host, client->port, client->pCon);
  640. client->pCon->reverse = NULL; // ensure we jettison this pCon...
  641. if (client->security)
  642. espconn_secure_disconnect(client->pCon);
  643. else
  644. espconn_disconnect(client->pCon);
  645. if (client->disconnectedCb) client->disconnectedCb(client);
  646. if (client->cmdDisconnectedCb) client->cmdDisconnectedCb(client);
  647. if (client->sending_buffer != NULL) {
  648. os_free(client->sending_buffer);
  649. client->sending_buffer = NULL;
  650. }
  651. client->pCon = NULL; // it will be freed in disconnect callback
  652. client->connState = TCP_RECONNECT_REQ;
  653. client->timeoutTick = client->reconTimeout; // reconnect in a few seconds
  654. if (client->reconTimeout < 128) client->reconTimeout <<= 1;
  655. }
  656. void ICACHE_FLASH_ATTR
  657. MQTT_Reconnect(MQTT_Client* client) {
  658. DBG_MQTT("MQTT: Reconnect requested\n");
  659. if (client->connState == MQTT_DISCONNECTED)
  660. MQTT_Connect(client);
  661. else if (client->connState == MQTT_CONNECTED)
  662. mqtt_doAbort(client);
  663. // in other cases we're already in the reconnecting process
  664. }
  665. void ICACHE_FLASH_ATTR
  666. MQTT_Disconnect(MQTT_Client* client) {
  667. DBG_MQTT("MQTT: Disconnect requested\n");
  668. os_timer_disarm(&client->mqttTimer);
  669. if (client->connState == MQTT_DISCONNECTED) return;
  670. if (client->connState == TCP_RECONNECT_REQ) {
  671. client->connState = MQTT_DISCONNECTED;
  672. return;
  673. }
  674. mqtt_doAbort(client);
  675. //void *out_buffer = client->mqtt_connection.buffer;
  676. //if (out_buffer != NULL) os_free(out_buffer);
  677. client->connState = MQTT_DISCONNECTED; // ensure we don't automatically reconnect
  678. }
  679. void ICACHE_FLASH_ATTR
  680. MQTT_Free(MQTT_Client* client) {
  681. DBG_MQTT("MQTT: Free requested\n");
  682. MQTT_Disconnect(client);
  683. if (client->host) os_free(client->host);
  684. client->host = NULL;
  685. if (client->connect_info.client_id) os_free(client->connect_info.client_id);
  686. if (client->connect_info.username) os_free(client->connect_info.username);
  687. if (client->connect_info.password) os_free(client->connect_info.password);
  688. os_memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
  689. if (client->in_buffer) os_free(client->in_buffer);
  690. client->in_buffer = NULL;
  691. if (client->mqtt_connection.buffer) os_free(client->mqtt_connection.buffer);
  692. os_memset(&client->mqtt_connection, 0, sizeof(client->mqtt_connection));
  693. }
  694. void ICACHE_FLASH_ATTR
  695. MQTT_OnConnected(MQTT_Client* client, MqttCallback connectedCb) {
  696. client->connectedCb = connectedCb;
  697. }
  698. void ICACHE_FLASH_ATTR
  699. MQTT_OnDisconnected(MQTT_Client* client, MqttCallback disconnectedCb) {
  700. client->disconnectedCb = disconnectedCb;
  701. }
  702. void ICACHE_FLASH_ATTR
  703. MQTT_OnData(MQTT_Client* client, MqttDataCallback dataCb) {
  704. client->dataCb = dataCb;
  705. }
  706. void ICACHE_FLASH_ATTR
  707. MQTT_OnPublished(MQTT_Client* client, MqttCallback publishedCb) {
  708. client->publishedCb = publishedCb;
  709. }