Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 47 additions & 47 deletions app/modules/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ typedef struct lmqtt_userdata
uint8_t secure;
#endif
bool connected; // indicate socket connected, not mqtt prot connected.
bool keepalive_sent;
ETSTimer mqttTimer;
tConnState connState;
}lmqtt_userdata;
Expand Down Expand Up @@ -212,9 +213,9 @@ static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code)
{
return;
}
lua_State *L = lua_getstate();

lua_State *L = lua_getstate();

lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
lua_pushinteger(L, reason_code);
Expand All @@ -241,12 +242,12 @@ static sint8 mqtt_send_if_possible(struct espconn *pesp_conn)
#ifdef CLIENT_SSL_ENABLE
if( mud->secure )
{
espconn_status = espconn_secure_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
espconn_status = espconn_secure_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
}
else
#endif
{
espconn_status = espconn_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
espconn_status = espconn_send( pesp_conn, pending_msg->msg.data, pending_msg->msg.length );
}
mud->keep_alive_tick = 0;
}
Expand Down Expand Up @@ -275,7 +276,7 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)

READPACKET:
if(length > MQTT_BUF_SIZE || length <= 0)
return;
return;

// c_memcpy(in_buffer, pdata, length);
uint8_t temp_buffer[MQTT_BUF_SIZE];
Expand All @@ -287,7 +288,7 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
case MQTT_CONNECT_SENDING:
case MQTT_CONNECT_SENT:
mud->event_timeout = 0;

if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){
NODE_DBG("MQTT: Invalid packet\r\n");
mud->connState = MQTT_INIT;
Expand All @@ -301,16 +302,16 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
{
espconn_disconnect(pesp_conn);
}

mqtt_connack_fail(mud, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG);

break;

} else if (mqtt_get_connect_ret_code(in_buffer) != MQTT_CONNACK_ACCEPTED) {
NODE_DBG("MQTT: CONNACK REFUSED (CODE: %d)\n", mqtt_get_connect_ret_code(in_buffer));

mud->connState = MQTT_INIT;

#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
Expand All @@ -320,12 +321,12 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
#endif
{
espconn_disconnect(pesp_conn);
}
}

mqtt_connack_fail(mud, mqtt_get_connect_ret_code(in_buffer));

break;

} else {
mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n");
Expand Down Expand Up @@ -454,6 +455,7 @@ static void mqtt_socket_received(void *arg, char *pdata, unsigned short len)
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
mud->keepalive_sent = 0;
NODE_DBG("MQTT: PINGRESP received\r\n");
break;
}
Expand Down Expand Up @@ -585,7 +587,7 @@ void mqtt_socket_timer(void *arg)
NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
if(mud->event_timeout > 0){
NODE_DBG("event_timeout: %d.\n", mud->event_timeout);
mud->event_timeout --;
mud->event_timeout --;
if(mud->event_timeout > 0){
return;
} else {
Expand All @@ -605,7 +607,7 @@ void mqtt_socket_timer(void *arg)
NODE_DBG("sSend MQTT_CONNECT failed.\n");
mud->connState = MQTT_INIT;
mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING);

#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
Expand All @@ -620,7 +622,7 @@ void mqtt_socket_timer(void *arg)
} else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out.
NODE_DBG("MQTT_CONNECT timeout.\n");
mud->connState = MQTT_INIT;

#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
Expand All @@ -630,7 +632,7 @@ void mqtt_socket_timer(void *arg)
#endif
{
espconn_disconnect(mud->pesp_conn);
}
}
mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING);
} else if(mud->connState == MQTT_DATA){
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
Expand All @@ -640,13 +642,20 @@ void mqtt_socket_timer(void *arg)
// no queued event.
mud->keep_alive_tick ++;
if(mud->keep_alive_tick > mud->mqtt_state.connect_info->keepalive){
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
mqtt_send_if_possible(mud->pesp_conn);
if (mud->keepalive_sent) {
// Oh dear -- keepalive timer expired and still no ack of previous message
mqtt_socket_reconnected(mud->pesp_conn, 0);
} else {
uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
mqtt_message_t* temp_msg = mqtt_msg_pingreq(&mud->mqtt_state.mqtt_connection);
msg_queue_t *node = msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
mud->keepalive_sent = 1;
mud->keep_alive_tick = 0; // Need to reset to zero in case flow control stopped.
mqtt_send_if_possible(mud->pesp_conn);
}
}
}
}
Expand Down Expand Up @@ -675,6 +684,7 @@ static int mqtt_socket_client( lua_State* L )

// create a object
mud = (lmqtt_userdata *)lua_newuserdata(L, sizeof(lmqtt_userdata));
c_memset(mud, 0, sizeof(*mud));
// pre-initialize it, in case of errors
mud->self_ref = LUA_NOREF;
mud->cb_connect_ref = LUA_NOREF;
Expand All @@ -685,18 +695,8 @@ static int mqtt_socket_client( lua_State* L )
mud->cb_suback_ref = LUA_NOREF;
mud->cb_unsuback_ref = LUA_NOREF;
mud->cb_puback_ref = LUA_NOREF;
mud->pesp_conn = NULL;
#ifdef CLIENT_SSL_ENABLE
mud->secure = 0;
#endif

mud->keep_alive_tick = 0;
mud->event_timeout = 0;
mud->connState = MQTT_INIT;
mud->connected = false;
c_memset(&mud->mqttTimer, 0, sizeof(ETSTimer));
c_memset(&mud->mqtt_state, 0, sizeof(mqtt_state_t));
c_memset(&mud->connect_info, 0, sizeof(mqtt_connect_info_t));

// set its metatable
luaL_getmetatable(L, "mqtt.socket");
Expand Down Expand Up @@ -761,7 +761,7 @@ static int mqtt_socket_client( lua_State* L )
c_free(mud->connect_info.password);
mud->connect_info.password = NULL;
}
return luaL_error(L, "not enough memory");
return luaL_error(L, "not enough memory");
}

c_memcpy(mud->connect_info.client_id, clientId, idl);
Expand Down Expand Up @@ -819,8 +819,8 @@ static int mqtt_delete( lua_State* L )

// ---- alloc-ed in mqtt_socket_lwt()
if(mud->connect_info.will_topic){
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
c_free(mud->connect_info.will_topic);
mud->connect_info.will_topic = NULL;
}

if(mud->connect_info.will_message){
Expand Down Expand Up @@ -926,15 +926,15 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
if( dns_reconn_count >= 5 ){
NODE_ERR( "DNS Fail!\n" );
// Note: should delete the pesp_conn or unref self_ref here.

struct espconn *pesp_conn = arg;
if(pesp_conn != NULL) {
lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse;
if(mud != NULL) {
mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS);
}
}

mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing.
return -1;
}
Expand Down Expand Up @@ -1070,7 +1070,7 @@ static int mqtt_socket_connect( lua_State* L )
}

stack++;

// call back function when a connection fails
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
Expand Down Expand Up @@ -1135,7 +1135,7 @@ static int mqtt_socket_close( lua_State* L )
// Send disconnect message
mqtt_message_t* temp_msg = mqtt_msg_disconnect(&mud->mqtt_state.mqtt_connection);
NODE_DBG("Send MQTT disconnect infomation, data len: %d, d[0]=%d \r\n", temp_msg->length, temp_msg->data[0]);

#ifdef CLIENT_SSL_ENABLE
if(mud->secure) {
espconn_status = espconn_secure_send(mud->pesp_conn, temp_msg->data, temp_msg->length);
Expand All @@ -1150,7 +1150,7 @@ static int mqtt_socket_close( lua_State* L )
}
}
mud->connected = 0;

while (mud->mqtt_state.pending_msg_q) {
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
}
Expand Down Expand Up @@ -1595,7 +1595,7 @@ static const LUA_REG_TYPE mqtt_socket_map[] = {
{ LNILKEY, LNILVAL }
};


static const LUA_REG_TYPE mqtt_map[] = {
{ LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) },

Expand All @@ -1609,7 +1609,7 @@ static const LUA_REG_TYPE mqtt_map[] = {
{ LSTRKEY( "CONNACK_REFUSED_ID_REJECTED" ), LNUMVAL( MQTT_CONNACK_REFUSED_ID_REJECTED ) },
{ LSTRKEY( "CONNACK_REFUSED_SERVER_UNAVAILABLE" ), LNUMVAL( MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE ) },
{ LSTRKEY( "CONNACK_REFUSED_BAD_USER_OR_PASS" ), LNUMVAL( MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS ) },
{ LSTRKEY( "CONNACK_REFUSED_NOT_AUTHORIZED" ), LNUMVAL( MQTT_CONNACK_REFUSED_NOT_AUTHORIZED ) },
{ LSTRKEY( "CONNACK_REFUSED_NOT_AUTHORIZED" ), LNUMVAL( MQTT_CONNACK_REFUSED_NOT_AUTHORIZED ) },

{ LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) },
{ LNILKEY, LNILVAL }
Expand Down