mqtt.c 33 KB


  1. /* mqtt.c
  2. * Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  3. *
  4. * Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
  5. * All rights reserved.
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. *
  10. * * Redistributions of source code must retain the above copyright notice,
  11. * this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. * * Neither the name of Redis nor the names of its contributors may be used
  16. * to endorse or promote products derived from this software without
  17. * specific prior written permission.
  18. *
  19. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  20. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  21. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  22. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  23. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  24. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  25. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  26. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  27. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  28. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  29. * POSSIBILITY OF SUCH DAMAGE.
  30. */
  31. #include "user_interface.h"
  32. #include "osapi.h"
  33. #include "espconn.h"
  34. #include "os_type.h"
  35. #include "mem.h"
  36. #include "mqtt/mqtt_msg.h"
  37. #include "mqtt/debug.h"
  38. #include "user_config.h"
  39. #include "mqtt/mqtt.h"
  40. #include "mqtt/queue.h"
  41. #define MQTT_TASK_PRIO 2
  42. #define MQTT_TASK_QUEUE_SIZE 1
  43. #define MQTT_SEND_TIMOUT 5
  44. #ifndef MQTT_SSL_SIZE
  45. #define MQTT_SSL_SIZE 5120
  46. #endif
  47. #ifndef QUEUE_BUFFER_SIZE
  48. #define QUEUE_BUFFER_SIZE 2048
  49. #endif
  50. unsigned char *default_certificate;
  51. unsigned int default_certificate_len = 0;
  52. unsigned char *default_private_key;
  53. unsigned int default_private_key_len = 0;
  54. os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE];
  55. #ifdef PROTOCOL_NAMEv311
  56. LOCAL uint8_t zero_len_id[2] = { 0, 0 };
  57. #endif
  58. LOCAL void ICACHE_FLASH_ATTR
  59. mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
  60. {
  61. struct espconn *pConn = (struct espconn *)arg;
  62. MQTT_Client* client = (MQTT_Client *)pConn->reverse;
  63. if (ipaddr == NULL)
  64. {
  65. MQTT_INFO("DNS: Found, but got no ip, try to reconnect\r\n");
  66. client->connState = TCP_RECONNECT_REQ;
  67. return;
  68. }
  69. MQTT_INFO("DNS: found ip %d.%d.%d.%d\n",
  70. *((uint8 *) &ipaddr->addr),
  71. *((uint8 *) &ipaddr->addr + 1),
  72. *((uint8 *) &ipaddr->addr + 2),
  73. *((uint8 *) &ipaddr->addr + 3));
  74. if (client->ip.addr == 0 && ipaddr->addr != 0)
  75. {
  76. os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
  77. if (client->security) {
  78. #ifdef MQTT_SSL_ENABLE
  79. espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
  80. espconn_secure_connect(client->pCon);
  81. #else
  82. MQTT_INFO("TCP: Do not support SSL\r\n");
  83. #endif
  84. }
  85. else {
  86. espconn_connect(client->pCon);
  87. }
  88. client->connState = TCP_CONNECTING;
  89. MQTT_INFO("TCP: connecting...\r\n");
  90. }
  91. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  92. }
  93. LOCAL void ICACHE_FLASH_ATTR
  94. deliver_publish(MQTT_Client* client, uint8_t* message, int length)
  95. {
  96. mqtt_event_data_t event_data;
  97. event_data.topic_length = length;
  98. event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
  99. event_data.data_length = length;
  100. event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
  101. if (client->dataCb)
  102. client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
  103. }
  104. void ICACHE_FLASH_ATTR
  105. mqtt_send_keepalive(MQTT_Client *client)
  106. {
  107. MQTT_INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
  108. client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
  109. client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
  110. client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
  111. client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  112. client->sendTimeout = MQTT_SEND_TIMOUT;
  113. MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  114. err_t result = ESPCONN_OK;
  115. if (client->security) {
  116. #ifdef MQTT_SSL_ENABLE
  117. result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  118. #else
  119. MQTT_INFO("TCP: Do not support SSL\r\n");
  120. #endif
  121. }
  122. else {
  123. result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  124. }
  125. client->mqtt_state.outbound_message = NULL;
  126. if (ESPCONN_OK == result) {
  127. client->keepAliveTick = 0;
  128. client->connState = MQTT_DATA;
  129. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  130. }
  131. else {
  132. client->connState = TCP_RECONNECT_DISCONNECTING;
  133. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  134. }
  135. }
  136. /**
  137. * @brief Delete tcp client and free all memory
  138. * @param mqttClient: The mqtt client which contain TCP client
  139. * @retval None
  140. */
  141. void ICACHE_FLASH_ATTR
  142. mqtt_tcpclient_delete(MQTT_Client *mqttClient)
  143. {
  144. if (mqttClient->pCon != NULL) {
  145. MQTT_INFO("TCP: Free memory\r\n");
  146. // Force abort connections
  147. espconn_abort(mqttClient->pCon);
  148. // Delete connections
  149. espconn_delete(mqttClient->pCon);
  150. if (mqttClient->pCon->proto.tcp) {
  151. os_free(mqttClient->pCon->proto.tcp);
  152. mqttClient->pCon->proto.tcp = NULL;
  153. }
  154. os_free(mqttClient->pCon);
  155. mqttClient->pCon = NULL;
  156. }
  157. }
  158. /**
  159. * @brief Delete MQTT client and free all memory
  160. * @param mqttClient: The mqtt client
  161. * @retval None
  162. */
  163. void ICACHE_FLASH_ATTR
  164. mqtt_client_delete(MQTT_Client *mqttClient)
  165. {
  166. if (mqttClient == NULL)
  167. return;
  168. if (mqttClient->pCon != NULL) {
  169. mqtt_tcpclient_delete(mqttClient);
  170. }
  171. if (mqttClient->host != NULL) {
  172. os_free(mqttClient->host);
  173. mqttClient->host = NULL;
  174. }
  175. if (mqttClient->user_data != NULL) {
  176. os_free(mqttClient->user_data);
  177. mqttClient->user_data = NULL;
  178. }
  179. if (mqttClient->mqtt_state.in_buffer != NULL) {
  180. os_free(mqttClient->mqtt_state.in_buffer);
  181. mqttClient->mqtt_state.in_buffer = NULL;
  182. }
  183. if (mqttClient->mqtt_state.out_buffer != NULL) {
  184. os_free(mqttClient->mqtt_state.out_buffer);
  185. mqttClient->mqtt_state.out_buffer = NULL;
  186. }
  187. if (mqttClient->mqtt_state.outbound_message != NULL) {
  188. if (mqttClient->mqtt_state.outbound_message->data != NULL)
  189. {
  190. os_free(mqttClient->mqtt_state.outbound_message->data);
  191. mqttClient->mqtt_state.outbound_message->data = NULL;
  192. }
  193. }
  194. if (mqttClient->mqtt_state.mqtt_connection.buffer != NULL) {
  195. // Already freed but not NULL
  196. mqttClient->mqtt_state.mqtt_connection.buffer = NULL;
  197. }
  198. if (mqttClient->connect_info.client_id != NULL) {
  199. #ifdef PROTOCOL_NAMEv311
  200. /* Don't attempt to free if it's the zero_len array */
  201. if ( ((uint8_t*)mqttClient->connect_info.client_id) != zero_len_id )
  202. os_free(mqttClient->connect_info.client_id);
  203. #else
  204. os_free(mqttClient->connect_info.client_id);
  205. #endif
  206. mqttClient->connect_info.client_id = NULL;
  207. }
  208. if (mqttClient->connect_info.username != NULL) {
  209. os_free(mqttClient->connect_info.username);
  210. mqttClient->connect_info.username = NULL;
  211. }
  212. if (mqttClient->connect_info.password != NULL) {
  213. os_free(mqttClient->connect_info.password);
  214. mqttClient->connect_info.password = NULL;
  215. }
  216. if (mqttClient->connect_info.will_topic != NULL) {
  217. os_free(mqttClient->connect_info.will_topic);
  218. mqttClient->connect_info.will_topic = NULL;
  219. }
  220. if (mqttClient->connect_info.will_message != NULL) {
  221. os_free(mqttClient->connect_info.will_message);
  222. mqttClient->connect_info.will_message = NULL;
  223. }
  224. if (mqttClient->msgQueue.buf != NULL) {
  225. os_free(mqttClient->msgQueue.buf);
  226. mqttClient->msgQueue.buf = NULL;
  227. }
  228. // Initialize state
  229. mqttClient->connState = WIFI_INIT;
  230. // Clear callback functions to avoid abnormal callback
  231. mqttClient->connectedCb = NULL;
  232. mqttClient->disconnectedCb = NULL;
  233. mqttClient->publishedCb = NULL;
  234. mqttClient->timeoutCb = NULL;
  235. mqttClient->dataCb = NULL;
  236. MQTT_INFO("MQTT: client already deleted\r\n");
  237. }
  238. /**
  239. * @brief Client received callback function.
  240. * @param arg: contain the ip link information
  241. * @param pdata: received data
  242. * @param len: the lenght of received data
  243. * @retval None
  244. */
  245. void ICACHE_FLASH_ATTR
  246. mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
  247. {
  248. uint8_t msg_type;
  249. uint8_t msg_qos;
  250. uint16_t msg_id;
  251. uint8_t msg_conn_ret;
  252. struct espconn *pCon = (struct espconn*)arg;
  253. MQTT_Client *client = (MQTT_Client *)pCon->reverse;
  254. READPACKET:
  255. MQTT_INFO("TCP: data received %d bytes\r\n", len);
  256. // MQTT_INFO("STATE: %d\r\n", client->connState);
  257. if (len < MQTT_BUF_SIZE && len > 0) {
  258. os_memcpy(client->mqtt_state.in_buffer, pdata, len);
  259. msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
  260. msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
  261. msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
  262. switch (client->connState) {
  263. case MQTT_CONNECT_SENDING:
  264. if (msg_type == MQTT_MSG_TYPE_CONNACK) {
  265. if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT) {
  266. MQTT_INFO("MQTT: Invalid packet\r\n");
  267. if (client->security) {
  268. #ifdef MQTT_SSL_ENABLE
  269. espconn_secure_disconnect(client->pCon);
  270. #else
  271. MQTT_INFO("TCP: Do not support SSL\r\n");
  272. #endif
  273. }
  274. else {
  275. espconn_disconnect(client->pCon);
  276. }
  277. } else {
  278. msg_conn_ret = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
  279. switch (msg_conn_ret) {
  280. case CONNECTION_ACCEPTED:
  281. MQTT_INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
  282. client->connState = MQTT_DATA;
  283. if (client->connectedCb)
  284. client->connectedCb((uint32_t*)client);
  285. break;
  286. case CONNECTION_REFUSE_PROTOCOL:
  287. case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
  288. case CONNECTION_REFUSE_BAD_USERNAME:
  289. case CONNECTION_REFUSE_NOT_AUTHORIZED:
  290. MQTT_INFO("MQTT: Connection refuse, reason code: %d\r\n", msg_conn_ret);
  291. default:
  292. if (client->security) {
  293. #ifdef MQTT_SSL_ENABLE
  294. espconn_secure_disconnect(client->pCon);
  295. #else
  296. MQTT_INFO("TCP: Do not support SSL\r\n");
  297. #endif
  298. }
  299. else {
  300. espconn_disconnect(client->pCon);
  301. }
  302. }
  303. }
  304. }
  305. break;
  306. case MQTT_DATA:
  307. case MQTT_KEEPALIVE_SEND:
  308. client->mqtt_state.message_length_read = len;
  309. client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
  310. switch (msg_type)
  311. {
  312. case MQTT_MSG_TYPE_SUBACK:
  313. if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
  314. MQTT_INFO("MQTT: Subscribe successful\r\n");
  315. break;
  316. case MQTT_MSG_TYPE_UNSUBACK:
  317. if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
  318. MQTT_INFO("MQTT: UnSubscribe successful\r\n");
  319. break;
  320. case MQTT_MSG_TYPE_PUBLISH:
  321. if (msg_qos == 1)
  322. client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
  323. else if (msg_qos == 2)
  324. client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
  325. if (msg_qos == 1 || msg_qos == 2) {
  326. MQTT_INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
  327. if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  328. MQTT_INFO("MQTT: Queue full\r\n");
  329. }
  330. }
  331. deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
  332. break;
  333. case MQTT_MSG_TYPE_PUBACK:
  334. if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
  335. MQTT_INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
  336. }
  337. break;
  338. case MQTT_MSG_TYPE_PUBREC:
  339. client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
  340. if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  341. MQTT_INFO("MQTT: Queue full\r\n");
  342. }
  343. break;
  344. case MQTT_MSG_TYPE_PUBREL:
  345. client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
  346. if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  347. MQTT_INFO("MQTT: Queue full\r\n");
  348. }
  349. break;
  350. case MQTT_MSG_TYPE_PUBCOMP:
  351. if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
  352. MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
  353. }
  354. break;
  355. case MQTT_MSG_TYPE_PINGREQ:
  356. client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
  357. if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  358. MQTT_INFO("MQTT: Queue full\r\n");
  359. }
  360. break;
  361. case MQTT_MSG_TYPE_PINGRESP:
  362. // Ignore
  363. break;
  364. }
  365. // NOTE: this is done down here and not in the switch case above
  366. // because the PSOCK_READBUF_LEN() won't work inside a switch
  367. // statement due to the way protothreads resume.
  368. if (msg_type == MQTT_MSG_TYPE_PUBLISH)
  369. {
  370. len = client->mqtt_state.message_length_read;
  371. if (client->mqtt_state.message_length < client->mqtt_state.message_length_read)
  372. {
  373. //client->connState = MQTT_PUBLISH_RECV;
  374. //Not Implement yet
  375. len -= client->mqtt_state.message_length;
  376. pdata += client->mqtt_state.message_length;
  377. MQTT_INFO("Get another published message\r\n");
  378. goto READPACKET;
  379. }
  380. }
  381. break;
  382. }
  383. } else {
  384. MQTT_INFO("ERROR: Message too long\r\n");
  385. }
  386. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  387. }
  388. /**
  389. * @brief Client send over callback function.
  390. * @param arg: contain the ip link information
  391. * @retval None
  392. */
  393. void ICACHE_FLASH_ATTR
  394. mqtt_tcpclient_sent_cb(void *arg)
  395. {
  396. struct espconn *pCon = (struct espconn *)arg;
  397. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  398. MQTT_INFO("TCP: Sent\r\n");
  399. client->sendTimeout = 0;
  400. client->keepAliveTick = 0;
  401. if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
  402. && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
  403. if (client->publishedCb)
  404. client->publishedCb((uint32_t*)client);
  405. }
  406. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  407. }
  408. void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
  409. {
  410. MQTT_Client* client = (MQTT_Client*)arg;
  411. if (client->connState == MQTT_DATA) {
  412. client->keepAliveTick ++;
  413. if (client->keepAliveTick > (client->mqtt_state.connect_info->keepalive / 2)) {
  414. client->connState = MQTT_KEEPALIVE_SEND;
  415. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  416. }
  417. } else if (client->connState == TCP_RECONNECT_REQ) {
  418. client->reconnectTick ++;
  419. if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) {
  420. client->reconnectTick = 0;
  421. client->connState = TCP_RECONNECT;
  422. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  423. if (client->timeoutCb)
  424. client->timeoutCb((uint32_t*)client);
  425. }
  426. }
  427. if (client->sendTimeout > 0)
  428. client->sendTimeout --;
  429. }
  430. void ICACHE_FLASH_ATTR
  431. mqtt_tcpclient_discon_cb(void *arg)
  432. {
  433. struct espconn *pespconn = (struct espconn *)arg;
  434. MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
  435. MQTT_INFO("TCP: Disconnected callback\r\n");
  436. if (TCP_DISCONNECTING == client->connState) {
  437. client->connState = TCP_DISCONNECTED;
  438. }
  439. else if (MQTT_DELETING == client->connState) {
  440. client->connState = MQTT_DELETED;
  441. }
  442. else {
  443. client->connState = TCP_RECONNECT_REQ;
  444. }
  445. if (client->disconnectedCb)
  446. client->disconnectedCb((uint32_t*)client);
  447. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  448. }
  449. /**
  450. * @brief Tcp client connect success callback function.
  451. * @param arg: contain the ip link information
  452. * @retval None
  453. */
  454. void ICACHE_FLASH_ATTR
  455. mqtt_tcpclient_connect_cb(void *arg)
  456. {
  457. struct espconn *pCon = (struct espconn *)arg;
  458. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  459. espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);
  460. espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);////////
  461. espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);///////
  462. MQTT_INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
  463. mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
  464. client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
  465. client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
  466. client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  467. client->sendTimeout = MQTT_SEND_TIMOUT;
  468. MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  469. if (client->security) {
  470. #ifdef MQTT_SSL_ENABLE
  471. espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  472. #else
  473. MQTT_INFO("TCP: Do not support SSL\r\n");
  474. #endif
  475. }
  476. else {
  477. espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  478. }
  479. client->mqtt_state.outbound_message = NULL;
  480. client->connState = MQTT_CONNECT_SENDING;
  481. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  482. }
  483. /**
  484. * @brief Tcp client connect repeat callback function.
  485. * @param arg: contain the ip link information
  486. * @retval None
  487. */
  488. void ICACHE_FLASH_ATTR
  489. mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
  490. {
  491. struct espconn *pCon = (struct espconn *)arg;
  492. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  493. MQTT_INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);
  494. client->connState = TCP_RECONNECT_REQ;
  495. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  496. }
  497. /**
  498. * @brief MQTT publish function.
  499. * @param client: MQTT_Client reference
  500. * @param topic: string topic will publish to
  501. * @param data: buffer data send point to
  502. * @param data_length: length of data
  503. * @param qos: qos
  504. * @param retain: retain
  505. * @retval TRUE if success queue
  506. */
  507. BOOL ICACHE_FLASH_ATTR
  508. MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
  509. {
  510. uint8_t dataBuffer[MQTT_BUF_SIZE];
  511. uint16_t dataLen;
  512. client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
  513. topic, data, data_length,
  514. qos, retain,
  515. &client->mqtt_state.pending_msg_id);
  516. if (client->mqtt_state.outbound_message->length == 0) {
  517. MQTT_INFO("MQTT: Queuing publish failed\r\n");
  518. return FALSE;
  519. }
  520. MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
  521. while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  522. MQTT_INFO("MQTT: Queue full\r\n");
  523. if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  524. MQTT_INFO("MQTT: Serious buffer error\r\n");
  525. return FALSE;
  526. }
  527. }
  528. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  529. return TRUE;
  530. }
  531. /**
  532. * @brief MQTT subscibe function.
  533. * @param client: MQTT_Client reference
  534. * @param topic: string topic will subscribe
  535. * @param qos: qos
  536. * @retval TRUE if success queue
  537. */
  538. BOOL ICACHE_FLASH_ATTR
  539. MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
  540. {
  541. uint8_t dataBuffer[MQTT_BUF_SIZE];
  542. uint16_t dataLen;
  543. client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
  544. topic, qos,
  545. &client->mqtt_state.pending_msg_id);
  546. MQTT_INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
  547. while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  548. MQTT_INFO("MQTT: Queue full\r\n");
  549. if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  550. MQTT_INFO("MQTT: Serious buffer error\r\n");
  551. return FALSE;
  552. }
  553. }
  554. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  555. return TRUE;
  556. }
  557. /**
  558. * @brief MQTT un-subscibe function.
  559. * @param client: MQTT_Client reference
  560. * @param topic: String topic will un-subscribe
  561. * @retval TRUE if success queue
  562. */
  563. BOOL ICACHE_FLASH_ATTR
  564. MQTT_UnSubscribe(MQTT_Client *client, char* topic)
  565. {
  566. uint8_t dataBuffer[MQTT_BUF_SIZE];
  567. uint16_t dataLen;
  568. client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
  569. topic,
  570. &client->mqtt_state.pending_msg_id);
  571. MQTT_INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
  572. while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  573. MQTT_INFO("MQTT: Queue full\r\n");
  574. if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  575. MQTT_INFO("MQTT: Serious buffer error\r\n");
  576. return FALSE;
  577. }
  578. }
  579. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  580. return TRUE;
  581. }
  582. /**
  583. * @brief MQTT ping function.
  584. * @param client: MQTT_Client reference
  585. * @retval TRUE if success queue
  586. */
  587. BOOL ICACHE_FLASH_ATTR
  588. MQTT_Ping(MQTT_Client *client)
  589. {
  590. uint8_t dataBuffer[MQTT_BUF_SIZE];
  591. uint16_t dataLen;
  592. client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
  593. if (client->mqtt_state.outbound_message->length == 0) {
  594. MQTT_INFO("MQTT: Queuing publish failed\r\n");
  595. return FALSE;
  596. }
  597. MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
  598. while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
  599. MQTT_INFO("MQTT: Queue full\r\n");
  600. if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  601. MQTT_INFO("MQTT: Serious buffer error\r\n");
  602. return FALSE;
  603. }
  604. }
  605. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
  606. return TRUE;
  607. }
  608. void ICACHE_FLASH_ATTR
  609. MQTT_Task(os_event_t *e)
  610. {
  611. MQTT_Client* client = (MQTT_Client*)e->par;
  612. uint8_t dataBuffer[MQTT_BUF_SIZE];
  613. uint16_t dataLen;
  614. if (e->par == 0)
  615. return;
  616. switch (client->connState) {
  617. case TCP_RECONNECT_REQ:
  618. break;
  619. case TCP_RECONNECT:
  620. mqtt_tcpclient_delete(client);
  621. MQTT_Connect(client);
  622. MQTT_INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
  623. client->connState = TCP_CONNECTING;
  624. break;
  625. case MQTT_DELETING:
  626. case TCP_DISCONNECTING:
  627. case TCP_RECONNECT_DISCONNECTING:
  628. if (client->security) {
  629. #ifdef MQTT_SSL_ENABLE
  630. espconn_secure_disconnect(client->pCon);
  631. #else
  632. MQTT_INFO("TCP: Do not support SSL\r\n");
  633. #endif
  634. }
  635. else {
  636. espconn_disconnect(client->pCon);
  637. }
  638. break;
  639. case TCP_DISCONNECTED:
  640. MQTT_INFO("MQTT: Disconnected\r\n");
  641. mqtt_tcpclient_delete(client);
  642. break;
  643. case MQTT_DELETED:
  644. MQTT_INFO("MQTT: Deleted client\r\n");
  645. mqtt_client_delete(client);
  646. break;
  647. case MQTT_KEEPALIVE_SEND:
  648. mqtt_send_keepalive(client);
  649. break;
  650. case MQTT_DATA:
  651. if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
  652. break;
  653. }
  654. if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
  655. client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
  656. client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
  657. client->sendTimeout = MQTT_SEND_TIMOUT;
  658. MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  659. client->keepAliveTick = 0;
  660. if (client->security) {
  661. #ifdef MQTT_SSL_ENABLE
  662. espconn_secure_send(client->pCon, dataBuffer, dataLen);
  663. #else
  664. MQTT_INFO("TCP: Do not support SSL\r\n");
  665. #endif
  666. }
  667. else {
  668. espconn_send(client->pCon, dataBuffer, dataLen);
  669. }
  670. client->mqtt_state.outbound_message = NULL;
  671. break;
  672. }
  673. break;
  674. }
  675. }
  676. /**
  677. * @brief MQTT initialization connection function
  678. * @param client: MQTT_Client reference
  679. * @param host: Domain or IP string
  680. * @param port: Port to connect
  681. * @param security: 1 for ssl, 0 for none
  682. * @retval None
  683. */
  684. void ICACHE_FLASH_ATTR
  685. MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security)
  686. {
  687. uint32_t temp;
  688. MQTT_INFO("MQTT:InitConnection\r\n");
  689. os_memset(mqttClient, 0, sizeof(MQTT_Client));
  690. temp = os_strlen(host);
  691. mqttClient->host = (uint8_t*)os_zalloc(temp + 1);
  692. os_strcpy(mqttClient->host, host);
  693. mqttClient->host[temp] = 0;
  694. mqttClient->port = port;
  695. mqttClient->security = security;
  696. }
  697. /**
  698. * @brief MQTT initialization mqtt client function
  699. * @param client: MQTT_Client reference
  700. * @param clientid: MQTT client id
  701. * @param client_user:MQTT client user
  702. * @param client_pass:MQTT client password
  703. * @param client_pass:MQTT keep alive timer, in second
  704. * @retval None
  705. */
  706. BOOL ICACHE_FLASH_ATTR
  707. MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
  708. {
  709. uint32_t temp;
  710. MQTT_INFO("MQTT:InitClient\r\n");
  711. os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
  712. if ( !client_id )
  713. {
  714. /* Should be allowed by broker, but clean session flag must be set. */
  715. #ifdef PROTOCOL_NAMEv311
  716. if (cleanSession)
  717. {
  718. mqttClient->connect_info.client_id = zero_len_id;
  719. } else {
  720. MQTT_INFO("cleanSession must be set to use 0 length client_id\r\n");
  721. return false;
  722. }
  723. /* Not supported. Return. */
  724. #else
  725. MQTT_INFO("Client ID required for MQTT < 3.1.1!\r\n");
  726. return false;
  727. #endif
  728. }
  729. /* If connect_info's client_id is still NULL and we get here, we can *
  730. * assume the passed client_id is non-NULL. */
  731. if ( !(mqttClient->connect_info.client_id) )
  732. {
  733. temp = os_strlen(client_id);
  734. mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);
  735. os_strcpy(mqttClient->connect_info.client_id, client_id);
  736. mqttClient->connect_info.client_id[temp] = 0;
  737. }
  738. if (client_user)
  739. {
  740. temp = os_strlen(client_user);
  741. mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
  742. os_strcpy(mqttClient->connect_info.username, client_user);
  743. mqttClient->connect_info.username[temp] = 0;
  744. }
  745. if (client_pass)
  746. {
  747. temp = os_strlen(client_pass);
  748. mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
  749. os_strcpy(mqttClient->connect_info.password, client_pass);
  750. mqttClient->connect_info.password[temp] = 0;
  751. }
  752. mqttClient->connect_info.keepalive = keepAliveTime;
  753. mqttClient->connect_info.clean_session = cleanSession;
  754. mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
  755. mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
  756. mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
  757. mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
  758. mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
  759. mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
  760. QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
  761. system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
  762. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
  763. return true;
  764. }
  765. void ICACHE_FLASH_ATTR
  766. MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
  767. {
  768. uint32_t temp;
  769. temp = os_strlen(will_topic);
  770. mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);
  771. os_strcpy(mqttClient->connect_info.will_topic, will_topic);
  772. mqttClient->connect_info.will_topic[temp] = 0;
  773. temp = os_strlen(will_msg);
  774. mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
  775. os_strcpy(mqttClient->connect_info.will_message, will_msg);
  776. mqttClient->connect_info.will_message[temp] = 0;
  777. mqttClient->connect_info.will_qos = will_qos;
  778. mqttClient->connect_info.will_retain = will_retain;
  779. }
  780. /**
  781. * @brief Begin connect to MQTT broker
  782. * @param client: MQTT_Client reference
  783. * @retval None
  784. */
  785. void ICACHE_FLASH_ATTR
  786. MQTT_Connect(MQTT_Client *mqttClient)
  787. {
  788. if (mqttClient->pCon) {
  789. // Clean up the old connection forcefully - using MQTT_Disconnect
  790. // does not actually release the old connection until the
  791. // disconnection callback is invoked.
  792. mqtt_tcpclient_delete(mqttClient);
  793. }
  794. mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
  795. mqttClient->pCon->type = ESPCONN_TCP;
  796. mqttClient->pCon->state = ESPCONN_NONE;
  797. mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
  798. mqttClient->pCon->proto.tcp->local_port = espconn_port();
  799. mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;
  800. mqttClient->pCon->reverse = mqttClient;
  801. espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);
  802. espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);
  803. mqttClient->keepAliveTick = 0;
  804. mqttClient->reconnectTick = 0;
  805. os_timer_disarm(&mqttClient->mqttTimer);
  806. os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
  807. os_timer_arm(&mqttClient->mqttTimer, 1000, 1);
  808. if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
  809. MQTT_INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
  810. if (mqttClient->security)
  811. {
  812. #ifdef MQTT_SSL_ENABLE
  813. espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
  814. espconn_secure_connect(mqttClient->pCon);
  815. #else
  816. MQTT_INFO("TCP: Do not support SSL\r\n");
  817. #endif
  818. }
  819. else
  820. {
  821. espconn_connect(mqttClient->pCon);
  822. }
  823. }
  824. else {
  825. MQTT_INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
  826. espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
  827. }
  828. mqttClient->connState = TCP_CONNECTING;
  829. }
  830. void ICACHE_FLASH_ATTR
  831. MQTT_Disconnect(MQTT_Client *mqttClient)
  832. {
  833. mqttClient->connState = TCP_DISCONNECTING;
  834. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
  835. os_timer_disarm(&mqttClient->mqttTimer);
  836. }
  837. void ICACHE_FLASH_ATTR
  838. MQTT_DeleteClient(MQTT_Client *mqttClient)
  839. {
  840. if (NULL == mqttClient)
  841. return;
  842. mqttClient->connState = MQTT_DELETED;
  843. // if(TCP_DISCONNECTED == mqttClient->connState) {
  844. // mqttClient->connState = MQTT_DELETED;
  845. // } else if(MQTT_DELETED != mqttClient->connState) {
  846. // mqttClient->connState = MQTT_DELETING;
  847. // }
  848. system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
  849. os_timer_disarm(&mqttClient->mqttTimer);
  850. }
  851. void ICACHE_FLASH_ATTR
  852. MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb)
  853. {
  854. mqttClient->connectedCb = connectedCb;
  855. }
  856. void ICACHE_FLASH_ATTR
  857. MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
  858. {
  859. mqttClient->disconnectedCb = disconnectedCb;
  860. }
  861. void ICACHE_FLASH_ATTR
  862. MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
  863. {
  864. mqttClient->dataCb = dataCb;
  865. }
  866. void ICACHE_FLASH_ATTR
  867. MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
  868. {
  869. mqttClient->publishedCb = publishedCb;
  870. }
  871. void ICACHE_FLASH_ATTR
  872. MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
  873. {
  874. mqttClient->timeoutCb = timeoutCb;
  875. }