VFS: Support concurrent VFS select calls

Closes https://github.com/espressif/esp-idf/issues/3392
This commit is contained in:
Roland Dobai 2019-07-10 14:08:21 +02:00
parent ac5508efd5
commit 91ce5db172
4 changed files with 253 additions and 155 deletions

View file

@ -236,7 +236,7 @@ typedef struct
#endif // CONFIG_VFS_SUPPORT_TERMIOS #endif // CONFIG_VFS_SUPPORT_TERMIOS
/** start_select is called for setting up synchronous I/O multiplexing of the desired file descriptors in the given VFS */ /** start_select is called for setting up synchronous I/O multiplexing of the desired file descriptors in the given VFS */
esp_err_t (*start_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, esp_vfs_select_sem_t sem); esp_err_t (*start_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, esp_vfs_select_sem_t sem, void **end_select_args);
/** socket select function for socket FDs with the functionality of POSIX select(); this should be set only for the socket VFS */ /** socket select function for socket FDs with the functionality of POSIX select(); this should be set only for the socket VFS */
int (*socket_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout); int (*socket_select)(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *timeout);
/** called by VFS to interrupt the socket_select call when select is activated from a non-socket VFS driver; set only for the socket driver */ /** called by VFS to interrupt the socket_select call when select is activated from a non-socket VFS driver; set only for the socket driver */
@ -246,7 +246,7 @@ typedef struct
/** end_select is called to stop the I/O multiplexing and deinitialize the environment created by start_select for the given VFS */ /** end_select is called to stop the I/O multiplexing and deinitialize the environment created by start_select for the given VFS */
void* (*get_socket_select_semaphore)(void); void* (*get_socket_select_semaphore)(void);
/** get_socket_select_semaphore returns semaphore allocated in the socket driver; set only for the socket driver */ /** get_socket_select_semaphore returns semaphore allocated in the socket driver; set only for the socket driver */
void (*end_select)(void); esp_err_t (*end_select)(void *end_select_args);
} esp_vfs_t; } esp_vfs_t;

View file

@ -31,6 +31,16 @@ typedef struct {
xSemaphoreHandle sem; xSemaphoreHandle sem;
} test_task_param_t; } test_task_param_t;
typedef struct {
fd_set *rdfds;
fd_set *wrfds;
fd_set *errfds;
int maxfds;
struct timeval *tv;
int select_ret;
xSemaphoreHandle sem;
} test_select_task_param_t;
static const char message[] = "Hello world!"; static const char message[] = "Hello world!";
static int open_dummy_socket(void) static int open_dummy_socket(void)
@ -420,73 +430,121 @@ TEST_CASE("poll() timeout", "[vfs]")
deinit(uart_fd, socket_fd); deinit(uart_fd, socket_fd);
} }
static void select_task(void *param) static void select_task(void *task_param)
{ {
const test_task_param_t *test_task_param = param; const test_select_task_param_t *param = task_param;
struct timeval tv = {
.tv_sec = 0,
.tv_usec = 100000,
};
fd_set rfds; int s = select(param->maxfds, param->rdfds, param->wrfds, param->errfds, param->tv);
FD_ZERO(&rfds); TEST_ASSERT_EQUAL(param->select_ret, s);
FD_SET(test_task_param->fd, &rfds);
int s = select(test_task_param->fd + 1, &rfds, NULL, NULL, &tv); if (param->sem) {
TEST_ASSERT_EQUAL(0, s); //timeout xSemaphoreGive(param->sem);
if (test_task_param->sem) {
xSemaphoreGive(test_task_param->sem);
} }
vTaskDelete(NULL); vTaskDelete(NULL);
} }
TEST_CASE("concurent selects work", "[vfs]") static void inline start_select_task(test_select_task_param_t *param)
{ {
struct timeval tv = { xTaskCreate(select_task, "select_task", 4*1024, (void *) param, 5, NULL);
.tv_sec = 0, }
.tv_usec = 100000,//irrelevant
};
TEST_CASE("concurrent selects work", "[vfs]")
{
int uart_fd, socket_fd; int uart_fd, socket_fd;
init(&uart_fd, &socket_fd); init(&uart_fd, &socket_fd);
const int dummy_socket_fd = open_dummy_socket(); const int dummy_socket_fd = open_dummy_socket();
fd_set rfds; {
FD_ZERO(&rfds); // Two tasks will wait for the same UART FD for reading and they will time-out
FD_SET(uart_fd, &rfds);
test_task_param_t test_task_param = { struct timeval tv = {
.fd = uart_fd, .tv_sec = 0,
.sem = xSemaphoreCreateBinary(), .tv_usec = 100000,
}; };
TEST_ASSERT_NOT_NULL(test_task_param.sem);
xTaskCreate(select_task, "select_task", 4*1024, (void *) &test_task_param, 5, NULL); fd_set rdfds1;
vTaskDelay(10 / portTICK_PERIOD_MS); //make sure the task has started and waits in select() FD_ZERO(&rdfds1);
FD_SET(uart_fd, &rdfds1);
int s = select(uart_fd + 1, &rfds, NULL, NULL, &tv); test_select_task_param_t param = {
TEST_ASSERT_EQUAL(-1, s); //this select should fail because two selects are accessing UART .rdfds = &rdfds1,
//(the other one is waiting for the timeout) .wrfds = NULL,
TEST_ASSERT_EQUAL(EINTR, errno); .errfds = NULL,
.maxfds = uart_fd + 1,
.tv = &tv,
.select_ret = 0, // expected timeout
.sem = xSemaphoreCreateBinary(),
};
TEST_ASSERT_NOT_NULL(param.sem);
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(test_task_param.sem, 1000 / portTICK_PERIOD_MS)); fd_set rdfds2;
FD_ZERO(&rdfds2);
FD_SET(uart_fd, &rdfds2);
FD_SET(socket_fd, &rdfds2);
FD_SET(dummy_socket_fd, &rdfds2);
FD_ZERO(&rfds); start_select_task(&param);
FD_SET(socket_fd, &rfds); vTaskDelay(10 / portTICK_PERIOD_MS); //make sure the task has started and waits in select()
test_task_param.fd = dummy_socket_fd; int s = select(MAX(MAX(uart_fd, dummy_socket_fd), socket_fd) + 1, &rdfds2, NULL, NULL, &tv);
TEST_ASSERT_EQUAL(0, s); // timeout here as well
xTaskCreate(select_task, "select_task", 4*1024, (void *) &test_task_param, 5, NULL); TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(param.sem, 1000 / portTICK_PERIOD_MS));
vTaskDelay(10 / portTICK_PERIOD_MS); //make sure the task has started and waits in select() vSemaphoreDelete(param.sem);
}
s = select(socket_fd + 1, &rfds, NULL, NULL, &tv); {
TEST_ASSERT_EQUAL(0, s); //this select should timeout as well as the concurrent one because // One tasks waits for UART reading and one for writing. The former will be successful and latter will
//concurrent socket select should work // time-out.
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(test_task_param.sem, 1000 / portTICK_PERIOD_MS)); struct timeval tv = {
vSemaphoreDelete(test_task_param.sem); .tv_sec = 0,
.tv_usec = 100000,
};
fd_set wrfds1;
FD_ZERO(&wrfds1);
FD_SET(uart_fd, &wrfds1);
test_select_task_param_t param = {
.rdfds = NULL,
.wrfds = &wrfds1,
.errfds = NULL,
.maxfds = uart_fd + 1,
.tv = &tv,
.select_ret = 0, // expected timeout
.sem = xSemaphoreCreateBinary(),
};
TEST_ASSERT_NOT_NULL(param.sem);
start_select_task(&param);
fd_set rdfds2;
FD_ZERO(&rdfds2);
FD_SET(uart_fd, &rdfds2);
FD_SET(socket_fd, &rdfds2);
FD_SET(dummy_socket_fd, &rdfds2);
const test_task_param_t send_param = {
.fd = uart_fd,
.delay_ms = 50,
.sem = xSemaphoreCreateBinary(),
};
TEST_ASSERT_NOT_NULL(send_param.sem);
start_task(&send_param); // This task will write to UART which will be detected by select()
int s = select(MAX(MAX(uart_fd, dummy_socket_fd), socket_fd) + 1, &rdfds2, NULL, NULL, &tv);
TEST_ASSERT_EQUAL(1, s);
TEST_ASSERT(FD_ISSET(uart_fd, &rdfds2));
TEST_ASSERT_UNLESS(FD_ISSET(socket_fd, &rdfds2));
TEST_ASSERT_UNLESS(FD_ISSET(dummy_socket_fd, &rdfds2));
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(param.sem, 1000 / portTICK_PERIOD_MS));
vSemaphoreDelete(param.sem);
TEST_ASSERT_EQUAL(pdTRUE, xSemaphoreTake(send_param.sem, 1000 / portTICK_PERIOD_MS));
vSemaphoreDelete(send_param.sem);
}
deinit(uart_fd, socket_fd); deinit(uart_fd, socket_fd);
close(dummy_socket_fd); close(dummy_socket_fd);

View file

@ -794,13 +794,16 @@ int truncate(const char *path, off_t length)
return ret; return ret;
} }
static void call_end_selects(int end_index, const fds_triple_t *vfs_fds_triple) static void call_end_selects(int end_index, const fds_triple_t *vfs_fds_triple, void **driver_args)
{ {
for (int i = 0; i < end_index; ++i) { for (int i = 0; i < end_index; ++i) {
const vfs_entry_t *vfs = get_vfs_for_index(i); const vfs_entry_t *vfs = get_vfs_for_index(i);
const fds_triple_t *item = &vfs_fds_triple[i]; const fds_triple_t *item = &vfs_fds_triple[i];
if (vfs && vfs->vfs.end_select && item->isset) { if (vfs && vfs->vfs.end_select && item->isset) {
vfs->vfs.end_select(); esp_err_t err = vfs->vfs.end_select(driver_args[i]);
if (err != ESP_OK) {
ESP_LOGD(TAG, "end_select failed: %s", esp_err_to_name(err));
}
} }
} }
} }
@ -947,6 +950,15 @@ int esp_vfs_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds
} }
} }
void **driver_args = calloc(s_vfs_count, sizeof(void *));
if (driver_args == NULL) {
free(vfs_fds_triple);
__errno_r(r) = ENOMEM;
ESP_LOGD(TAG, "calloc is unsuccessful for driver args");
return -1;
}
for (int i = 0; i < s_vfs_count; ++i) { for (int i = 0; i < s_vfs_count; ++i) {
const vfs_entry_t *vfs = get_vfs_for_index(i); const vfs_entry_t *vfs = get_vfs_for_index(i);
fds_triple_t *item = &vfs_fds_triple[i]; fds_triple_t *item = &vfs_fds_triple[i];
@ -958,16 +970,18 @@ int esp_vfs_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds
esp_vfs_log_fd_set("readfds", &item->readfds); esp_vfs_log_fd_set("readfds", &item->readfds);
esp_vfs_log_fd_set("writefds", &item->writefds); esp_vfs_log_fd_set("writefds", &item->writefds);
esp_vfs_log_fd_set("errorfds", &item->errorfds); esp_vfs_log_fd_set("errorfds", &item->errorfds);
esp_err_t err = vfs->vfs.start_select(nfds, &item->readfds, &item->writefds, &item->errorfds, sel_sem); esp_err_t err = vfs->vfs.start_select(nfds, &item->readfds, &item->writefds, &item->errorfds, sel_sem,
driver_args + i);
if (err != ESP_OK) { if (err != ESP_OK) {
call_end_selects(i, vfs_fds_triple); call_end_selects(i, vfs_fds_triple, driver_args);
(void) set_global_fd_sets(vfs_fds_triple, s_vfs_count, readfds, writefds, errorfds); (void) set_global_fd_sets(vfs_fds_triple, s_vfs_count, readfds, writefds, errorfds);
if (sel_sem.is_sem_local && sel_sem.sem) { if (sel_sem.is_sem_local && sel_sem.sem) {
vSemaphoreDelete(sel_sem.sem); vSemaphoreDelete(sel_sem.sem);
sel_sem.sem = NULL; sel_sem.sem = NULL;
} }
free(vfs_fds_triple); free(vfs_fds_triple);
free(driver_args);
__errno_r(r) = EINTR; __errno_r(r) = EINTR;
ESP_LOGD(TAG, "start_select failed: %s", esp_err_to_name(err)); ESP_LOGD(TAG, "start_select failed: %s", esp_err_to_name(err));
return -1; return -1;
@ -1006,7 +1020,7 @@ int esp_vfs_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds
xSemaphoreTake(sel_sem.sem, ticks_to_wait); xSemaphoreTake(sel_sem.sem, ticks_to_wait);
} }
call_end_selects(s_vfs_count, vfs_fds_triple); // for VFSs for start_select was called before call_end_selects(s_vfs_count, vfs_fds_triple, driver_args); // for VFSs for start_select was called before
if (ret >= 0) { if (ret >= 0) {
ret += set_global_fd_sets(vfs_fds_triple, s_vfs_count, readfds, writefds, errorfds); ret += set_global_fd_sets(vfs_fds_triple, s_vfs_count, readfds, writefds, errorfds);
} }
@ -1015,6 +1029,7 @@ int esp_vfs_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *errorfds
sel_sem.sem = NULL; sel_sem.sem = NULL;
} }
free(vfs_fds_triple); free(vfs_fds_triple);
free(driver_args);
ESP_LOGD(TAG, "esp_vfs_select returns %d", ret); ESP_LOGD(TAG, "esp_vfs_select returns %d", ret);
esp_vfs_log_fd_set("readfds", readfds); esp_vfs_log_fd_set("readfds", readfds);

View file

@ -111,20 +111,21 @@ static vfs_uart_context_t* s_ctx[UART_NUM] = {
#endif #endif
}; };
/* Lock ensuring that uart_select is used from only one task at the time */ typedef struct {
static _lock_t s_one_select_lock; esp_vfs_select_sem_t select_sem;
fd_set *readfds;
fd_set *writefds;
fd_set *errorfds;
fd_set readfds_orig;
fd_set writefds_orig;
fd_set errorfds_orig;
} uart_select_args_t;
static esp_vfs_select_sem_t _select_sem = {.sem = NULL}; static uart_select_args_t **s_registered_selects = NULL;
static fd_set *_readfds = NULL; static int s_registered_select_num = 0;
static fd_set *_writefds = NULL; static portMUX_TYPE s_registered_select_lock = portMUX_INITIALIZER_UNLOCKED;
static fd_set *_errorfds = NULL;
static fd_set *_readfds_orig = NULL;
static fd_set *_writefds_orig = NULL;
static fd_set *_errorfds_orig = NULL;
static void uart_end_select(void);
static esp_err_t uart_end_select(void *end_select_args);
static int uart_open(const char * path, int flags, int mode) static int uart_open(const char * path, int flags, int mode)
{ {
@ -335,132 +336,156 @@ static int uart_fsync(int fd)
return 0; return 0;
} }
static void select_notif_callback(uart_port_t uart_num, uart_select_notif_t uart_select_notif, BaseType_t *task_woken) static esp_err_t register_select(uart_select_args_t *args)
{ {
switch (uart_select_notif) { esp_err_t ret = ESP_ERR_INVALID_ARG;
case UART_SELECT_READ_NOTIF:
if (FD_ISSET(uart_num, _readfds_orig)) { if (args) {
FD_SET(uart_num, _readfds); portENTER_CRITICAL(&s_registered_select_lock);
esp_vfs_select_triggered_isr(_select_sem, task_woken); const int new_size = s_registered_select_num + 1;
} if ((s_registered_selects = realloc(s_registered_selects, new_size * sizeof(uart_select_args_t *))) == NULL) {
break; ret = ESP_ERR_NO_MEM;
case UART_SELECT_WRITE_NOTIF: } else {
if (FD_ISSET(uart_num, _writefds_orig)) { s_registered_selects[s_registered_select_num] = args;
FD_SET(uart_num, _writefds); s_registered_select_num = new_size;
esp_vfs_select_triggered_isr(_select_sem, task_woken); ret = ESP_OK;
} }
break; portEXIT_CRITICAL(&s_registered_select_lock);
case UART_SELECT_ERROR_NOTIF:
if (FD_ISSET(uart_num, _errorfds_orig)) {
FD_SET(uart_num, _errorfds);
esp_vfs_select_triggered_isr(_select_sem, task_woken);
}
break;
} }
return ret;
} }
static esp_err_t uart_start_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, esp_vfs_select_sem_t select_sem) static esp_err_t unregister_select(uart_select_args_t *args)
{ {
if (_lock_try_acquire(&s_one_select_lock)) { esp_err_t ret = ESP_OK;
return ESP_ERR_INVALID_STATE; if (args) {
ret = ESP_ERR_INVALID_STATE;
portENTER_CRITICAL(&s_registered_select_lock);
for (int i = 0; i < s_registered_select_num; ++i) {
if (s_registered_selects[i] == args) {
const int new_size = s_registered_select_num - 1;
// The item is removed by overwriting it with the last item. The subsequent rellocation will drop the
// last item.
s_registered_selects[i] = s_registered_selects[new_size];
s_registered_selects = realloc(s_registered_selects, new_size * sizeof(uart_select_args_t *));
if (s_registered_selects || new_size == 0) {
s_registered_select_num = new_size;
ret = ESP_OK;
} else {
ret = ESP_ERR_NO_MEM;
}
break;
}
}
portEXIT_CRITICAL(&s_registered_select_lock);
} }
return ret;
}
const int max_fds = MIN(nfds, UART_NUM); static void select_notif_callback_isr(uart_port_t uart_num, uart_select_notif_t uart_select_notif, BaseType_t *task_woken)
{
portENTER_CRITICAL(uart_get_selectlock()); portENTER_CRITICAL_ISR(&s_registered_select_lock);
for (int i = 0; i < s_registered_select_num; ++i) {
if (_readfds || _writefds || _errorfds || _readfds_orig || _writefds_orig || _errorfds_orig || _select_sem.sem) { uart_select_args_t *args = s_registered_selects[i];
portEXIT_CRITICAL(uart_get_selectlock()); if (args) {
uart_end_select(); switch (uart_select_notif) {
return ESP_ERR_INVALID_STATE; case UART_SELECT_READ_NOTIF:
} if (FD_ISSET(uart_num, &args->readfds_orig)) {
FD_SET(uart_num, args->readfds);
if ((_readfds_orig = malloc(sizeof(fd_set))) == NULL) { esp_vfs_select_triggered_isr(args->select_sem, task_woken);
portEXIT_CRITICAL(uart_get_selectlock()); }
uart_end_select(); break;
return ESP_ERR_NO_MEM; case UART_SELECT_WRITE_NOTIF:
} if (FD_ISSET(uart_num, &args->writefds_orig)) {
FD_SET(uart_num, args->writefds);
if ((_writefds_orig = malloc(sizeof(fd_set))) == NULL) { esp_vfs_select_triggered_isr(args->select_sem, task_woken);
portEXIT_CRITICAL(uart_get_selectlock()); }
uart_end_select(); break;
return ESP_ERR_NO_MEM; case UART_SELECT_ERROR_NOTIF:
} if (FD_ISSET(uart_num, &args->errorfds_orig)) {
FD_SET(uart_num, args->errorfds);
if ((_errorfds_orig = malloc(sizeof(fd_set))) == NULL) { esp_vfs_select_triggered_isr(args->select_sem, task_woken);
portEXIT_CRITICAL(uart_get_selectlock()); }
uart_end_select(); break;
return ESP_ERR_NO_MEM; }
}
//uart_set_select_notif_callback set the callbacks in UART ISR
for (int i = 0; i < max_fds; ++i) {
if (FD_ISSET(i, readfds) || FD_ISSET(i, writefds) || FD_ISSET(i, exceptfds)) {
uart_set_select_notif_callback(i, select_notif_callback);
} }
} }
portEXIT_CRITICAL_ISR(&s_registered_select_lock);
}
_select_sem = select_sem; static esp_err_t uart_start_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
esp_vfs_select_sem_t select_sem, void **end_select_args)
{
const int max_fds = MIN(nfds, UART_NUM);
*end_select_args = NULL;
_readfds = readfds; uart_select_args_t *args = malloc(sizeof(uart_select_args_t));
_writefds = writefds;
_errorfds = exceptfds;
*_readfds_orig = *readfds; if (args == NULL) {
*_writefds_orig = *writefds; return ESP_ERR_NO_MEM;
*_errorfds_orig = *exceptfds; }
args->select_sem = select_sem;
args->readfds = readfds;
args->writefds = writefds;
args->errorfds = exceptfds;
args->readfds_orig = *readfds; // store the original values because they will be set to zero
args->writefds_orig = *writefds;
args->errorfds_orig = *exceptfds;
FD_ZERO(readfds); FD_ZERO(readfds);
FD_ZERO(writefds); FD_ZERO(writefds);
FD_ZERO(exceptfds); FD_ZERO(exceptfds);
portENTER_CRITICAL(uart_get_selectlock());
//uart_set_select_notif_callback sets the callbacks in UART ISR
for (int i = 0; i < max_fds; ++i) { for (int i = 0; i < max_fds; ++i) {
if (FD_ISSET(i, _readfds_orig)) { if (FD_ISSET(i, &args->readfds_orig) || FD_ISSET(i, &args->writefds_orig) || FD_ISSET(i, &args->errorfds_orig)) {
uart_set_select_notif_callback(i, select_notif_callback_isr);
}
}
for (int i = 0; i < max_fds; ++i) {
if (FD_ISSET(i, &args->readfds_orig)) {
size_t buffered_size; size_t buffered_size;
if (uart_get_buffered_data_len(i, &buffered_size) == ESP_OK && buffered_size > 0) { if (uart_get_buffered_data_len(i, &buffered_size) == ESP_OK && buffered_size > 0) {
// signalize immediately when data is buffered // signalize immediately when data is buffered
FD_SET(i, _readfds); FD_SET(i, readfds);
esp_vfs_select_triggered(_select_sem); esp_vfs_select_triggered(args->select_sem);
} }
} }
} }
portEXIT_CRITICAL(uart_get_selectlock()); esp_err_t ret = register_select(args);
// s_one_select_lock is not released on successfull exit - will be if (ret != ESP_OK) {
// released in uart_end_select() portEXIT_CRITICAL(uart_get_selectlock());
free(args);
return ret;
}
portEXIT_CRITICAL(uart_get_selectlock());
*end_select_args = args;
return ESP_OK; return ESP_OK;
} }
static void uart_end_select(void) static esp_err_t uart_end_select(void *end_select_args)
{ {
uart_select_args_t *args = end_select_args;
if (args) {
free(args);
}
portENTER_CRITICAL(uart_get_selectlock()); portENTER_CRITICAL(uart_get_selectlock());
esp_err_t ret = unregister_select(args);
for (int i = 0; i < UART_NUM; ++i) { for (int i = 0; i < UART_NUM; ++i) {
uart_set_select_notif_callback(i, NULL); uart_set_select_notif_callback(i, NULL);
} }
_select_sem.sem = NULL;
_readfds = NULL;
_writefds = NULL;
_errorfds = NULL;
if (_readfds_orig) {
free(_readfds_orig);
_readfds_orig = NULL;
}
if (_writefds_orig) {
free(_writefds_orig);
_writefds_orig = NULL;
}
if (_errorfds_orig) {
free(_errorfds_orig);
_errorfds_orig = NULL;
}
portEXIT_CRITICAL(uart_get_selectlock()); portEXIT_CRITICAL(uart_get_selectlock());
_lock_release(&s_one_select_lock);
return ret;
} }
#ifdef CONFIG_VFS_SUPPORT_TERMIOS #ifdef CONFIG_VFS_SUPPORT_TERMIOS