123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995 |
- #include "user_interface.h"
- #include "osapi.h"
- #include "espconn.h"
- #include "os_type.h"
- #include "mem.h"
- #include "mqtt_msg.h"
- #include "debug.h"
- #include "user_config.h"
- #include "mqtt.h"
- #include "queue.h"
- #define MQTT_TASK_PRIO 2
- #define MQTT_TASK_QUEUE_SIZE 1
- #define MQTT_SEND_TIMOUT 5
- #ifndef MQTT_SSL_SIZE
- #define MQTT_SSL_SIZE 5120
- #endif
- #ifndef QUEUE_BUFFER_SIZE
- #define QUEUE_BUFFER_SIZE 2048
- #endif
- unsigned char *default_certificate;
- unsigned int default_certificate_len = 0;
- unsigned char *default_private_key;
- unsigned int default_private_key_len = 0;
- os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE];
- #ifdef PROTOCOL_NAMEv311
- LOCAL uint8_t zero_len_id[2] = { 0, 0 };
- #endif
- LOCAL void ICACHE_FLASH_ATTR
- mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
- {
- struct espconn *pConn = (struct espconn *)arg;
- MQTT_Client* client = (MQTT_Client *)pConn->reverse;
- if (ipaddr == NULL)
- {
- MQTT_INFO("DNS: Found, but got no ip, try to reconnect\r\n");
- client->connState = TCP_RECONNECT_REQ;
- return;
- }
- MQTT_INFO("DNS: found ip %d.%d.%d.%d\n",
- *((uint8 *) &ipaddr->addr),
- *((uint8 *) &ipaddr->addr + 1),
- *((uint8 *) &ipaddr->addr + 2),
- *((uint8 *) &ipaddr->addr + 3));
- if (client->ip.addr == 0 && ipaddr->addr != 0)
- {
- os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
- espconn_secure_connect(client->pCon);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_connect(client->pCon);
- }
- client->connState = TCP_CONNECTING;
- MQTT_INFO("TCP: connecting...\r\n");
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- LOCAL void ICACHE_FLASH_ATTR
- deliver_publish(MQTT_Client* client, uint8_t* message, int length)
- {
- mqtt_event_data_t event_data;
- event_data.topic_length = length;
- event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
- event_data.data_length = length;
- event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
- if (client->dataCb)
- client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
- }
- void ICACHE_FLASH_ATTR
- mqtt_send_keepalive(MQTT_Client *client)
- {
- MQTT_INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
- client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
- client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
- client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
- client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- client->sendTimeout = MQTT_SEND_TIMOUT;
- MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
- err_t result = ESPCONN_OK;
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- }
- client->mqtt_state.outbound_message = NULL;
- if (ESPCONN_OK == result) {
- client->keepAliveTick = 0;
- client->connState = MQTT_DATA;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- else {
- client->connState = TCP_RECONNECT_DISCONNECTING;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_delete(MQTT_Client *mqttClient)
- {
- if (mqttClient->pCon != NULL) {
- MQTT_INFO("TCP: Free memory\r\n");
-
- espconn_abort(mqttClient->pCon);
-
- espconn_delete(mqttClient->pCon);
- if (mqttClient->pCon->proto.tcp) {
- os_free(mqttClient->pCon->proto.tcp);
- mqttClient->pCon->proto.tcp = NULL;
- }
- os_free(mqttClient->pCon);
- mqttClient->pCon = NULL;
- }
- }
- void ICACHE_FLASH_ATTR
- mqtt_client_delete(MQTT_Client *mqttClient)
- {
- if (mqttClient == NULL)
- return;
- if (mqttClient->pCon != NULL) {
- mqtt_tcpclient_delete(mqttClient);
- }
- if (mqttClient->host != NULL) {
- os_free(mqttClient->host);
- mqttClient->host = NULL;
- }
- if (mqttClient->user_data != NULL) {
- os_free(mqttClient->user_data);
- mqttClient->user_data = NULL;
- }
- if (mqttClient->mqtt_state.in_buffer != NULL) {
- os_free(mqttClient->mqtt_state.in_buffer);
- mqttClient->mqtt_state.in_buffer = NULL;
- }
- if (mqttClient->mqtt_state.out_buffer != NULL) {
- os_free(mqttClient->mqtt_state.out_buffer);
- mqttClient->mqtt_state.out_buffer = NULL;
- }
- if (mqttClient->mqtt_state.outbound_message != NULL) {
- if (mqttClient->mqtt_state.outbound_message->data != NULL)
- {
- os_free(mqttClient->mqtt_state.outbound_message->data);
- mqttClient->mqtt_state.outbound_message->data = NULL;
- }
- }
- if (mqttClient->mqtt_state.mqtt_connection.buffer != NULL) {
-
- mqttClient->mqtt_state.mqtt_connection.buffer = NULL;
- }
- if (mqttClient->connect_info.client_id != NULL) {
- #ifdef PROTOCOL_NAMEv311
-
- if ( ((uint8_t*)mqttClient->connect_info.client_id) != zero_len_id )
- os_free(mqttClient->connect_info.client_id);
- #else
- os_free(mqttClient->connect_info.client_id);
- #endif
- mqttClient->connect_info.client_id = NULL;
- }
- if (mqttClient->connect_info.username != NULL) {
- os_free(mqttClient->connect_info.username);
- mqttClient->connect_info.username = NULL;
- }
- if (mqttClient->connect_info.password != NULL) {
- os_free(mqttClient->connect_info.password);
- mqttClient->connect_info.password = NULL;
- }
- if (mqttClient->connect_info.will_topic != NULL) {
- os_free(mqttClient->connect_info.will_topic);
- mqttClient->connect_info.will_topic = NULL;
- }
- if (mqttClient->connect_info.will_message != NULL) {
- os_free(mqttClient->connect_info.will_message);
- mqttClient->connect_info.will_message = NULL;
- }
- if (mqttClient->msgQueue.buf != NULL) {
- os_free(mqttClient->msgQueue.buf);
- mqttClient->msgQueue.buf = NULL;
- }
-
- mqttClient->connState = WIFI_INIT;
-
- mqttClient->connectedCb = NULL;
- mqttClient->disconnectedCb = NULL;
- mqttClient->publishedCb = NULL;
- mqttClient->timeoutCb = NULL;
- mqttClient->dataCb = NULL;
- MQTT_INFO("MQTT: client already deleted\r\n");
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
- {
- uint8_t msg_type;
- uint8_t msg_qos;
- uint16_t msg_id;
- uint8_t msg_conn_ret;
- struct espconn *pCon = (struct espconn*)arg;
- MQTT_Client *client = (MQTT_Client *)pCon->reverse;
- READPACKET:
- MQTT_INFO("TCP: data received %d bytes\r\n", len);
-
- if (len < MQTT_BUF_SIZE && len > 0) {
- os_memcpy(client->mqtt_state.in_buffer, pdata, len);
- msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
- msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
- msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
- switch (client->connState) {
- case MQTT_CONNECT_SENDING:
- if (msg_type == MQTT_MSG_TYPE_CONNACK) {
- if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT) {
- MQTT_INFO("MQTT: Invalid packet\r\n");
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_disconnect(client->pCon);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_disconnect(client->pCon);
- }
- } else {
- msg_conn_ret = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
- switch (msg_conn_ret) {
- case CONNECTION_ACCEPTED:
- MQTT_INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
- client->connState = MQTT_DATA;
- if (client->connectedCb)
- client->connectedCb((uint32_t*)client);
- break;
- case CONNECTION_REFUSE_PROTOCOL:
- case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
- case CONNECTION_REFUSE_BAD_USERNAME:
- case CONNECTION_REFUSE_NOT_AUTHORIZED:
- MQTT_INFO("MQTT: Connection refuse, reason code: %d\r\n", msg_conn_ret);
- default:
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_disconnect(client->pCon);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_disconnect(client->pCon);
- }
- }
- }
- }
- break;
- case MQTT_DATA:
- case MQTT_KEEPALIVE_SEND:
- client->mqtt_state.message_length_read = len;
- client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
- switch (msg_type)
- {
- case MQTT_MSG_TYPE_SUBACK:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
- MQTT_INFO("MQTT: Subscribe successful\r\n");
- break;
- case MQTT_MSG_TYPE_UNSUBACK:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
- MQTT_INFO("MQTT: UnSubscribe successful\r\n");
- break;
- case MQTT_MSG_TYPE_PUBLISH:
- if (msg_qos == 1)
- client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
- else if (msg_qos == 2)
- client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
- if (msg_qos == 1 || msg_qos == 2) {
- MQTT_INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- }
- }
- deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
- break;
- case MQTT_MSG_TYPE_PUBACK:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
- MQTT_INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PUBREC:
- client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PUBREL:
- client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PUBCOMP:
- if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id) {
- MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PINGREQ:
- client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
- if (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- }
- break;
- case MQTT_MSG_TYPE_PINGRESP:
-
- break;
- }
-
-
-
- if (msg_type == MQTT_MSG_TYPE_PUBLISH)
- {
- len = client->mqtt_state.message_length_read;
- if (client->mqtt_state.message_length < client->mqtt_state.message_length_read)
- {
-
-
- len -= client->mqtt_state.message_length;
- pdata += client->mqtt_state.message_length;
- MQTT_INFO("Get another published message\r\n");
- goto READPACKET;
- }
- }
- break;
- }
- } else {
- MQTT_INFO("ERROR: Message too long\r\n");
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_sent_cb(void *arg)
- {
- struct espconn *pCon = (struct espconn *)arg;
- MQTT_Client* client = (MQTT_Client *)pCon->reverse;
- MQTT_INFO("TCP: Sent\r\n");
- client->sendTimeout = 0;
- client->keepAliveTick = 0;
- if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
- && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
- if (client->publishedCb)
- client->publishedCb((uint32_t*)client);
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
- {
- MQTT_Client* client = (MQTT_Client*)arg;
- if (client->connState == MQTT_DATA) {
- client->keepAliveTick ++;
- if (client->keepAliveTick > (client->mqtt_state.connect_info->keepalive / 2)) {
- client->connState = MQTT_KEEPALIVE_SEND;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- } else if (client->connState == TCP_RECONNECT_REQ) {
- client->reconnectTick ++;
- if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) {
- client->reconnectTick = 0;
- client->connState = TCP_RECONNECT;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- if (client->timeoutCb)
- client->timeoutCb((uint32_t*)client);
- }
- }
- if (client->sendTimeout > 0)
- client->sendTimeout --;
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_discon_cb(void *arg)
- {
- struct espconn *pespconn = (struct espconn *)arg;
- MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
- MQTT_INFO("TCP: Disconnected callback\r\n");
- if (TCP_DISCONNECTING == client->connState) {
- client->connState = TCP_DISCONNECTED;
- }
- else if (MQTT_DELETING == client->connState) {
- client->connState = MQTT_DELETED;
- }
- else {
- client->connState = TCP_RECONNECT_REQ;
- }
- if (client->disconnectedCb)
- client->disconnectedCb((uint32_t*)client);
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_connect_cb(void *arg)
- {
- struct espconn *pCon = (struct espconn *)arg;
- MQTT_Client* client = (MQTT_Client *)pCon->reverse;
- espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);
- espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);
- espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);
- MQTT_INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
- mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
- client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
- client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
- client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- client->sendTimeout = MQTT_SEND_TIMOUT;
- MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
- }
- client->mqtt_state.outbound_message = NULL;
- client->connState = MQTT_CONNECT_SENDING;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- void ICACHE_FLASH_ATTR
- mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
- {
- struct espconn *pCon = (struct espconn *)arg;
- MQTT_Client* client = (MQTT_Client *)pCon->reverse;
- MQTT_INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);
- client->connState = TCP_RECONNECT_REQ;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- }
- BOOL ICACHE_FLASH_ATTR
- MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE];
- uint16_t dataLen;
- client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
- topic, data, data_length,
- qos, retain,
- &client->mqtt_state.pending_msg_id);
- if (client->mqtt_state.outbound_message->length == 0) {
- MQTT_INFO("MQTT: Queuing publish failed\r\n");
- return FALSE;
- }
- MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
- while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
- MQTT_INFO("MQTT: Serious buffer error\r\n");
- return FALSE;
- }
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- return TRUE;
- }
- BOOL ICACHE_FLASH_ATTR
- MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE];
- uint16_t dataLen;
- client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
- topic, qos,
- &client->mqtt_state.pending_msg_id);
- MQTT_INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
- while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
- MQTT_INFO("MQTT: Serious buffer error\r\n");
- return FALSE;
- }
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- return TRUE;
- }
- BOOL ICACHE_FLASH_ATTR
- MQTT_UnSubscribe(MQTT_Client *client, char* topic)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE];
- uint16_t dataLen;
- client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
- topic,
- &client->mqtt_state.pending_msg_id);
- MQTT_INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
- while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
- MQTT_INFO("MQTT: Serious buffer error\r\n");
- return FALSE;
- }
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- return TRUE;
- }
- BOOL ICACHE_FLASH_ATTR
- MQTT_Ping(MQTT_Client *client)
- {
- uint8_t dataBuffer[MQTT_BUF_SIZE];
- uint16_t dataLen;
- client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
- if (client->mqtt_state.outbound_message->length == 0) {
- MQTT_INFO("MQTT: Queuing publish failed\r\n");
- return FALSE;
- }
- MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
- while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
- MQTT_INFO("MQTT: Queue full\r\n");
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
- MQTT_INFO("MQTT: Serious buffer error\r\n");
- return FALSE;
- }
- }
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
- return TRUE;
- }
- void ICACHE_FLASH_ATTR
- MQTT_Task(os_event_t *e)
- {
- MQTT_Client* client = (MQTT_Client*)e->par;
- uint8_t dataBuffer[MQTT_BUF_SIZE];
- uint16_t dataLen;
- if (e->par == 0)
- return;
- switch (client->connState) {
- case TCP_RECONNECT_REQ:
- break;
- case TCP_RECONNECT:
- mqtt_tcpclient_delete(client);
- MQTT_Connect(client);
- MQTT_INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
- client->connState = TCP_CONNECTING;
- break;
- case MQTT_DELETING:
- case TCP_DISCONNECTING:
- case TCP_RECONNECT_DISCONNECTING:
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_disconnect(client->pCon);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_disconnect(client->pCon);
- }
- break;
- case TCP_DISCONNECTED:
- MQTT_INFO("MQTT: Disconnected\r\n");
- mqtt_tcpclient_delete(client);
- break;
- case MQTT_DELETED:
- MQTT_INFO("MQTT: Deleted client\r\n");
- mqtt_client_delete(client);
- break;
- case MQTT_KEEPALIVE_SEND:
- mqtt_send_keepalive(client);
- break;
- case MQTT_DATA:
- if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
- break;
- }
- if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
- client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
- client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
- client->sendTimeout = MQTT_SEND_TIMOUT;
- MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
- client->keepAliveTick = 0;
- if (client->security) {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_send(client->pCon, dataBuffer, dataLen);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else {
- espconn_send(client->pCon, dataBuffer, dataLen);
- }
- client->mqtt_state.outbound_message = NULL;
- break;
- }
- break;
- }
- }
- void ICACHE_FLASH_ATTR
- MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security)
- {
- uint32_t temp;
- MQTT_INFO("MQTT:InitConnection\r\n");
- os_memset(mqttClient, 0, sizeof(MQTT_Client));
- temp = os_strlen(host);
- mqttClient->host = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->host, host);
- mqttClient->host[temp] = 0;
- mqttClient->port = port;
- mqttClient->security = security;
- }
- BOOL ICACHE_FLASH_ATTR
- MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
- {
- uint32_t temp;
- MQTT_INFO("MQTT:InitClient\r\n");
- os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
- if ( !client_id )
- {
-
- #ifdef PROTOCOL_NAMEv311
- if (cleanSession)
- {
- mqttClient->connect_info.client_id = zero_len_id;
- } else {
- MQTT_INFO("cleanSession must be set to use 0 length client_id\r\n");
- return false;
- }
-
- #else
- MQTT_INFO("Client ID required for MQTT < 3.1.1!\r\n");
- return false;
- #endif
- }
-
- if ( !(mqttClient->connect_info.client_id) )
- {
- temp = os_strlen(client_id);
- mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.client_id, client_id);
- mqttClient->connect_info.client_id[temp] = 0;
- }
- if (client_user)
- {
- temp = os_strlen(client_user);
- mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.username, client_user);
- mqttClient->connect_info.username[temp] = 0;
- }
- if (client_pass)
- {
- temp = os_strlen(client_pass);
- mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.password, client_pass);
- mqttClient->connect_info.password[temp] = 0;
- }
- mqttClient->connect_info.keepalive = keepAliveTime;
- mqttClient->connect_info.clean_session = cleanSession;
- mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
- mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
- mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
- mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
- mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
- mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
- QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
- system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
- return true;
- }
- void ICACHE_FLASH_ATTR
- MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
- {
- uint32_t temp;
- temp = os_strlen(will_topic);
- mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.will_topic, will_topic);
- mqttClient->connect_info.will_topic[temp] = 0;
- temp = os_strlen(will_msg);
- mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
- os_strcpy(mqttClient->connect_info.will_message, will_msg);
- mqttClient->connect_info.will_message[temp] = 0;
- mqttClient->connect_info.will_qos = will_qos;
- mqttClient->connect_info.will_retain = will_retain;
- }
- void ICACHE_FLASH_ATTR
- MQTT_Connect(MQTT_Client *mqttClient)
- {
- if (mqttClient->pCon) {
-
-
-
- mqtt_tcpclient_delete(mqttClient);
- }
- mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
- mqttClient->pCon->type = ESPCONN_TCP;
- mqttClient->pCon->state = ESPCONN_NONE;
- mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
- mqttClient->pCon->proto.tcp->local_port = espconn_port();
- mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;
- mqttClient->pCon->reverse = mqttClient;
- espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);
- espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);
- mqttClient->keepAliveTick = 0;
- mqttClient->reconnectTick = 0;
- os_timer_disarm(&mqttClient->mqttTimer);
- os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
- os_timer_arm(&mqttClient->mqttTimer, 1000, 1);
- if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
- MQTT_INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
- if (mqttClient->security)
- {
- #ifdef MQTT_SSL_ENABLE
- espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
- espconn_secure_connect(mqttClient->pCon);
- #else
- MQTT_INFO("TCP: Do not support SSL\r\n");
- #endif
- }
- else
- {
- espconn_connect(mqttClient->pCon);
- }
- }
- else {
- MQTT_INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
- espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
- }
- mqttClient->connState = TCP_CONNECTING;
- }
- void ICACHE_FLASH_ATTR
- MQTT_Disconnect(MQTT_Client *mqttClient)
- {
- mqttClient->connState = TCP_DISCONNECTING;
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
- os_timer_disarm(&mqttClient->mqttTimer);
- }
- void ICACHE_FLASH_ATTR
- MQTT_DeleteClient(MQTT_Client *mqttClient)
- {
- if (NULL == mqttClient)
- return;
- mqttClient->connState = MQTT_DELETED;
-
-
-
-
-
- system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
- os_timer_disarm(&mqttClient->mqttTimer);
- }
- void ICACHE_FLASH_ATTR
- MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb)
- {
- mqttClient->connectedCb = connectedCb;
- }
- void ICACHE_FLASH_ATTR
- MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
- {
- mqttClient->disconnectedCb = disconnectedCb;
- }
- void ICACHE_FLASH_ATTR
- MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
- {
- mqttClient->dataCb = dataCb;
- }
- void ICACHE_FLASH_ATTR
- MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
- {
- mqttClient->publishedCb = publishedCb;
- }
- void ICACHE_FLASH_ATTR
- MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
- {
- mqttClient->timeoutCb = timeoutCb;
- }
|