Merge branch 'bugfix/ws_client_sending_race' into 'master'

websocket_client: fix locking in ws client task

Closes IDFGH-1973

See merge request espressif/esp-idf!6266
This commit is contained in:
Angus Gratton 2019-10-30 12:33:59 +08:00
commit 02f6bc5438

View file

@ -239,7 +239,7 @@ esp_websocket_client_handle_t esp_websocket_client_init(const esp_websocket_clie
return NULL; return NULL;
} }
client->lock = xSemaphoreCreateMutex(); client->lock = xSemaphoreCreateRecursiveMutex();
ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail); ESP_WS_CLIENT_MEM_CHECK(TAG, client->lock, goto _websocket_init_fail);
client->config = calloc(1, sizeof(websocket_config_storage_t)); client->config = calloc(1, sizeof(websocket_config_storage_t));
@ -424,6 +424,7 @@ esp_err_t esp_websocket_client_set_uri(esp_websocket_client_handle_t client, con
static void esp_websocket_client_task(void *pv) static void esp_websocket_client_task(void *pv)
{ {
const int lock_timeout = portMAX_DELAY;
int rlen; int rlen;
esp_websocket_client_handle_t client = (esp_websocket_client_handle_t) pv; esp_websocket_client_handle_t client = (esp_websocket_client_handle_t) pv;
client->run = true; client->run = true;
@ -442,8 +443,12 @@ static void esp_websocket_client_task(void *pv)
client->state = WEBSOCKET_STATE_INIT; client->state = WEBSOCKET_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT); xEventGroupClearBits(client->status_bits, STOPPED_BIT);
int read_select; int read_select = 0;
while (client->run) { while (client->run) {
if (xSemaphoreTakeRecursive(client->lock, lock_timeout) != pdPASS) {
ESP_LOGE(TAG, "Failed to lock ws-client tasks, exitting the task...");
break;
}
switch ((int)client->state) { switch ((int)client->state) {
case WEBSOCKET_STATE_INIT: case WEBSOCKET_STATE_INIT:
if (client->transport == NULL) { if (client->transport == NULL) {
@ -451,7 +456,6 @@ static void esp_websocket_client_task(void *pv)
client->run = false; client->run = false;
break; break;
} }
if (esp_transport_connect(client->transport, if (esp_transport_connect(client->transport,
client->config->host, client->config->host,
client->config->port, client->config->port,
@ -467,20 +471,14 @@ static void esp_websocket_client_task(void *pv)
break; break;
case WEBSOCKET_STATE_CONNECTED: case WEBSOCKET_STATE_CONNECTED:
read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
if (read_select < 0) {
ESP_LOGE(TAG, "Network error, errorno");
esp_websocket_client_abort_connection(client);
break;
}
if (_tick_get_ms() - client->ping_tick_ms > WEBSOCKET_PING_TIMEOUT_MS) { if (_tick_get_ms() - client->ping_tick_ms > WEBSOCKET_PING_TIMEOUT_MS) {
client->ping_tick_ms = _tick_get_ms(); client->ping_tick_ms = _tick_get_ms();
// Send PING ESP_LOGD(TAG, "Sending PING...");
esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING, NULL, 0, client->config->network_timeout_ms); esp_transport_ws_send_raw(client->transport, WS_TRANSPORT_OPCODES_PING, NULL, 0, client->config->network_timeout_ms);
} }
if (read_select == 0) { if (read_select == 0) {
ESP_LOGD(TAG, "Timeout..."); ESP_LOGV(TAG, "Read poll timeout: skipping esp_transport_read()...");
continue; break;
} }
client->ping_tick_ms = _tick_get_ms(); client->ping_tick_ms = _tick_get_ms();
@ -512,9 +510,19 @@ static void esp_websocket_client_task(void *pv)
client->reconnect_tick_ms = _tick_get_ms(); client->reconnect_tick_ms = _tick_get_ms();
ESP_LOGD(TAG, "Reconnecting..."); ESP_LOGD(TAG, "Reconnecting...");
} }
vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
break; break;
} }
xSemaphoreGiveRecursive(client->lock);
if (WEBSOCKET_STATE_CONNECTED == client->state) {
read_select = esp_transport_poll_read(client->transport, 1000); //Poll every 1000ms
if (read_select < 0) {
ESP_LOGE(TAG, "Network error: esp_transport_poll_read() returned %d, errno=%d", read_select, errno);
esp_websocket_client_abort_connection(client);
}
} else if (WEBSOCKET_STATE_WAIT_TIMEOUT == client->state) {
// waiting for reconnecting...
vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
}
} }
esp_transport_close(client->transport); esp_transport_close(client->transport);
@ -576,25 +584,28 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
{ {
int need_write = len; int need_write = len;
int wlen = 0, widx = 0; int wlen = 0, widx = 0;
int ret = ESP_FAIL;
if (client == NULL || data == NULL || len <= 0) { if (client == NULL || data == NULL || len <= 0) {
ESP_LOGE(TAG, "Invalid arguments"); ESP_LOGE(TAG, "Invalid arguments");
return ESP_FAIL; return ESP_FAIL;
} }
if (xSemaphoreTakeRecursive(client->lock, timeout) != pdPASS) {
ESP_LOGE(TAG, "Could not lock ws-client within %d timeout", timeout);
return ESP_FAIL;
}
if (!esp_websocket_client_is_connected(client)) { if (!esp_websocket_client_is_connected(client)) {
ESP_LOGE(TAG, "Websocket client is not connected"); ESP_LOGE(TAG, "Websocket client is not connected");
return ESP_FAIL; goto unlock_and_return;
} }
if (client->transport == NULL) { if (client->transport == NULL) {
ESP_LOGE(TAG, "Invalid transport"); ESP_LOGE(TAG, "Invalid transport");
return ESP_FAIL; goto unlock_and_return;
} }
if (xSemaphoreTake(client->lock, timeout) != pdPASS) {
return ESP_FAIL;
}
while (widx < len) { while (widx < len) {
if (need_write > client->buffer_size) { if (need_write > client->buffer_size) {
@ -604,14 +615,17 @@ static int esp_websocket_client_send_with_opcode(esp_websocket_client_handle_t c
// send with ws specific way and specific opcode // send with ws specific way and specific opcode
wlen = esp_transport_ws_send_raw(client->transport, opcode, (char *)client->tx_buffer, need_write, timeout); wlen = esp_transport_ws_send_raw(client->transport, opcode, (char *)client->tx_buffer, need_write, timeout);
if (wlen <= 0) { if (wlen <= 0) {
xSemaphoreGive(client->lock); ret = wlen;
return wlen; ESP_LOGE(TAG, "Network error: esp_transport_write() returned %d, errno=%d", ret, errno);
goto unlock_and_return;
} }
widx += wlen; widx += wlen;
need_write = len - widx; need_write = len - widx;
} }
xSemaphoreGive(client->lock); ret = widx;
return widx; unlock_and_return:
xSemaphoreGiveRecursive(client->lock);
return ret;
} }
bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client) bool esp_websocket_client_is_connected(esp_websocket_client_handle_t client)