From bf8ff8c98b812cc7701f8d346e88a1cae1dcf6e4 Mon Sep 17 00:00:00 2001 From: Alexey Gerenkov Date: Fri, 25 Aug 2017 21:24:17 +0300 Subject: [PATCH] 1) Thread detach functionality added 2) Recursive mutexes support 3) C++ test updated --- components/pthread/pthread.c | 288 +++++++++++-------- components/pthread/test/test_pthread_cxx.cpp | 66 +++-- 2 files changed, 214 insertions(+), 140 deletions(-) diff --git a/components/pthread/pthread.c b/components/pthread/pthread.c index 7111949ee..acd4882eb 100644 --- a/components/pthread/pthread.c +++ b/components/pthread/pthread.c @@ -34,19 +34,20 @@ const static char *TAG = "esp_pthread"; #define PTHREAD_TASK_PRIO_DEFAULT 5 #define PTHREAD_TASK_STACK_SZ_DEFAULT (2048)//configMINIMAL_STACK_SIZE is not enough -#define ESP_PTHREAD_STATE_RUN 0 -#define ESP_PTHREAD_STATE_EXIT 1 -//#define ESP_PTHREAD_STATE_CANCEL 2 +#define PTHREAD_TASK_STATE_RUN 0 +#define PTHREAD_TASK_STATE_EXIT 1 typedef struct { ListItem_t list_item; TaskHandle_t join_task; int state; + bool detached; } esp_pthread_t; typedef struct { void *(*func)(void *); void *arg; + esp_pthread_t *pthread; } esp_pthread_task_arg_t; typedef struct { @@ -55,21 +56,18 @@ typedef struct { int type; } esp_pthread_mutex_t; + static SemaphoreHandle_t s_once_mux = NULL; static SemaphoreHandle_t s_threads_mux = NULL; -//static SemaphoreHandle_t s_mutexes_mux = NULL; static List_t s_threads_list; -//static List_t s_mutexes_list; -//static List_t s_key_list; + static int IRAM_ATTR pthread_mutex_lock_internal(esp_pthread_mutex_t *mux, TickType_t tmo); int esp_pthread_init(void) { vListInitialise((List_t *)&s_threads_list); - // vListInitialise((List_t *)&s_mutexes_list); - // vListInitialise((List_t *)&s_key_list); s_once_mux = xSemaphoreCreateMutex(); if (s_once_mux == NULL) return ESP_FAIL; @@ -78,12 +76,6 @@ int esp_pthread_init(void) vSemaphoreDelete(s_once_mux); return ESP_FAIL; } - // s_mutexes_mux = xSemaphoreCreateMutex(); - // if (s_mutexes_mux == NULL) { - // vSemaphoreDelete(s_threads_mux); - // vSemaphoreDelete(s_once_mux); - // return ESP_FAIL; - // } return ESP_OK; } @@ -115,42 +107,50 @@ static esp_pthread_t *pthread_find(TaskHandle_t task_handle) return NULL; } +static void pthread_delete(esp_pthread_t *pthread) +{ + uxListRemove(&pthread->list_item); + free(pthread); +} + static void pthread_task_func(void *arg) { esp_pthread_task_arg_t *task_arg = (esp_pthread_task_arg_t *)arg; - ESP_PTHREAD_LOGV(TAG, "%s ENTER %p", __FUNCTION__, task_arg->func); + ESP_PTHREAD_LOGV(TAG, "%s ENTER %p", __FUNCTION__, task_arg->pthread); // wait for start xTaskNotifyWait(0, 0, NULL, portMAX_DELAY); - ESP_PTHREAD_LOGV(TAG, "%s START %p", __FUNCTION__, task_arg->func); - + ESP_PTHREAD_LOGV(TAG, "%s START %p", __FUNCTION__, task_arg->pthread); task_arg->func(task_arg->arg); + ESP_PTHREAD_LOGV(TAG, "%s END %p", __FUNCTION__, task_arg->pthread); + free(task_arg); - ESP_PTHREAD_LOGV(TAG, "%s END %p", __FUNCTION__, task_arg->func); - - if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) { - esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle()); - if (pthread) { - pthread->state = ESP_PTHREAD_STATE_EXIT; - if (pthread->join_task) { - // notify join - xTaskNotify(pthread->join_task, 0, eNoAction); - } - } else { - assert(false && "Failed to find pthread for current task!"); - } - xSemaphoreGive(s_threads_mux); - } else { + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { assert(false && "Failed to lock threads list!"); } - // TODO: Remove from list??? - //free(task_arg->pthread); - free(task_arg); + esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle()); + if (!pthread) { + assert(false && "Failed to find pthread for current task!"); + } + if (pthread->detached) { + // auto-free for detached threads + pthread_delete(pthread); + } else { + // Remove from list, it indicates that task has exited + if (pthread->join_task) { + // notify join + xTaskNotify(pthread->join_task, 0, eNoAction); + } else { + pthread->state = PTHREAD_TASK_STATE_EXIT; + } + } + xSemaphoreGive(s_threads_mux); + vTaskDelete(NULL); - ESP_PTHREAD_LOGV(TAG, "%s EXIT %p", __FUNCTION__, task_arg->func); + ESP_PTHREAD_LOGV(TAG, "%s EXIT", __FUNCTION__); } int pthread_create(pthread_t *thread, const pthread_attr_t *attr, @@ -205,7 +205,7 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr, memset(pthread, 0, sizeof(esp_pthread_t)); task_arg->func = start_routine; task_arg->arg = arg; - //task_arg->pthread = pthread; + task_arg->pthread = pthread; BaseType_t res = xTaskCreate(&pthread_task_func, "pthread", PTHREAD_TASK_STACK_SZ_DEFAULT, task_arg, PTHREAD_TASK_PRIO_DEFAULT, &xHandle); if(res != pdPASS) { ESP_PTHREAD_LOGE(TAG, "Failed to create task!"); @@ -223,12 +223,12 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr, listSET_LIST_ITEM_OWNER((ListItem_t *)&pthread->list_item, pthread); listSET_LIST_ITEM_VALUE((ListItem_t *)&pthread->list_item, (TickType_t)xHandle); - if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) { - vListInsertEnd((List_t *)&s_threads_list, (ListItem_t *)&pthread->list_item); - xSemaphoreGive(s_threads_mux); - } else { + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { assert(false && "Failed to lock threads list!"); } + vListInsertEnd((List_t *)&s_threads_list, (ListItem_t *)&pthread->list_item); + xSemaphoreGive(s_threads_mux); + // start task xTaskNotify(xHandle, 0, eNoAction); @@ -242,55 +242,74 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr, int pthread_join(pthread_t thread, void **retval) { esp_pthread_t *pthread = (esp_pthread_t *)thread; - bool wait = false; int ret = 0; - ets_printf("%s\n", __FUNCTION__); + ESP_PTHREAD_LOGV(TAG, "%s %p", __FUNCTION__, pthread); + // find task - if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) { - //uxBitsWaitedFor = listGET_LIST_ITEM_VALUE(list_item); - // TODO: check if task is joinable??? - // TODO: PTHREAD_CANCELED??? - TaskHandle_t handle = pthread_find_handle(thread); - if (!handle) { - errno = ESRCH; // not found - ret = ESRCH; - } else if (pthread->join_task) { - errno = EINVAL; // already have waiting task to join - ret = EINVAL; - } else if (handle == xTaskGetCurrentTaskHandle()) { - errno = EDEADLK; // join to self or join to each other - ret = EDEADLK; - } else { - esp_pthread_t *cur_pthread = pthread_find(xTaskGetCurrentTaskHandle()); - if (cur_pthread && cur_pthread->join_task == handle) { - errno = EDEADLK; // join to each other - ret = EDEADLK; - } else { - if (pthread->state == ESP_PTHREAD_STATE_RUN) { - pthread->join_task = xTaskGetCurrentTaskHandle(); - wait = true; - } - } - } - xSemaphoreGive(s_threads_mux); - } else { + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { assert(false && "Failed to lock threads list!"); } - if (wait) { - // TODO: handle caller cancelation??? - xTaskNotifyWait(0, 0, NULL, portMAX_DELAY); + TaskHandle_t handle = pthread_find_handle(thread); + if (!handle) { + errno = ESRCH; // not found + ret = ESRCH; + } else if (pthread->join_task) { + errno = EINVAL; // already have waiting task to join + ret = EINVAL; + } else if (handle == xTaskGetCurrentTaskHandle()) { + errno = EDEADLK; // join to self not allowed + ret = EDEADLK; + } else { + esp_pthread_t *cur_pthread = pthread_find(xTaskGetCurrentTaskHandle()); + if (cur_pthread && cur_pthread->join_task == handle) { + errno = EDEADLK; // join to each other not allowed + ret = EDEADLK; + } else { + if (pthread->state == PTHREAD_TASK_STATE_RUN) { + pthread->join_task = xTaskGetCurrentTaskHandle(); + } else { + pthread_delete(pthread); + } + } } + xSemaphoreGive(s_threads_mux); + + if (ret == 0 && pthread->join_task) { + xTaskNotifyWait(0, 0, NULL, portMAX_DELAY); + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { + assert(false && "Failed to lock threads list!"); + } + pthread_delete(pthread); + xSemaphoreGive(s_threads_mux); + } + if (retval) { *retval = 0; // no exit code in FreeRTOS } + + ESP_PTHREAD_LOGV(TAG, "%s %p EXIT %d", __FUNCTION__, pthread, ret); return ret; } int pthread_detach(pthread_t thread) { - assert(false && "pthread_detach not supported!"); - return -1; + esp_pthread_t *pthread = (esp_pthread_t *)thread; + int ret = 0; + + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { + assert(false && "Failed to lock threads list!"); + } + TaskHandle_t handle = pthread_find_handle(thread); + if (!handle) { + errno = ESRCH; // not found + ret = ESRCH; + } else { + pthread->detached = true; + } + xSemaphoreGive(s_threads_mux); + ESP_PTHREAD_LOGV(TAG, "%s %p EXIT %d", __FUNCTION__, pthread, ret); + return ret; } int pthread_cancel(pthread_t thread) @@ -307,10 +326,14 @@ int sched_yield( void ) pthread_t pthread_self(void) { + if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) { + assert(false && "Failed to lock threads list!"); + } esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle()); if (!pthread) { assert(false && "Failed to find current thread ID!"); } + xSemaphoreGive(s_threads_mux); return (pthread_t)pthread; } @@ -324,30 +347,11 @@ int pthread_key_create(pthread_key_t *key, void (*destructor)(void*)) { static int s_created; - ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); - + //TODO: Key destructors not suppoted! if (s_created) { - assert(false && "CREATED!"); + // key API supports just one key necessary by libstdcxx threading implementation return ENOMEM; } - -// key = (struct pthread_key *)malloc(sizeof(struct pthread_key)); -// if (!key) { -// errno = ENOMEM; -// return ENOMEM; -// } -// key->destructor = destructor; - -// list_init(&key->specific,1); - -// vListInsert((List_t *)s_key_list, (ListItem_t *)&); -// // Add key to key list -// res = list_add(&key_list, key, k); -// if (res) { -// free(key); -// errno = res; -// return res; -// } *key = 1; s_created = 1; return 0; @@ -355,22 +359,19 @@ int pthread_key_create(pthread_key_t *key, void (*destructor)(void*)) int pthread_key_delete(pthread_key_t key) { - ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); - assert(false && "NOT IMPLEMENTED!"); + assert(false && "pthread_key_delete not supported!"); return -1; } void *pthread_getspecific(pthread_key_t key) { - ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); - assert(false && "NOT IMPLEMENTED!"); + assert(false && "pthread_getspecific not supported!"); return NULL; } int pthread_setspecific(pthread_key_t key, const void *value) { - ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); - assert(false && "NOT IMPLEMENTED!"); + assert(false && "pthread_setspecific not supported!"); return -1; } @@ -405,26 +406,36 @@ int pthread_once(pthread_once_t *once_control, void (*init_routine)(void)) } /***************** MUTEX ******************/ +static int mutexattr_check(const pthread_mutexattr_t *attr) +{ + if (attr->type < PTHREAD_MUTEX_NORMAL || attr->type > PTHREAD_MUTEX_RECURSIVE) { + return EINVAL; + } + return 0; +} + int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) { int type = PTHREAD_MUTEX_NORMAL; - ESP_PTHREAD_LOGV(TAG, "%s %p", __FUNCTION__, mutex); - if (!mutex) { errno = EINVAL; return EINVAL; } - if (attr && attr->is_initialized) { + if (attr) { + if (!attr->is_initialized) { + errno = EINVAL; + return EINVAL; + } + int res = mutexattr_check(attr); + if (res) { + errno = res; + return res; + } type = attr->type; } - if (type < PTHREAD_MUTEX_NORMAL || type > PTHREAD_MUTEX_DEFAULT) { - errno = EINVAL; - return EINVAL; - } - esp_pthread_mutex_t *mux = (esp_pthread_mutex_t *)malloc(sizeof(esp_pthread_mutex_t)); if (!mux) { errno = ENOMEM; @@ -443,17 +454,6 @@ int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) return EAGAIN; } - // vListInitialiseItem((ListItem_t *)&mux->list_item); - // listSET_LIST_ITEM_OWNER((ListItem_t *)&mux->list_item, mux); - // //listSET_LIST_ITEM_VALUE((ListItem_t *)&pthread->list_item, (TickType_t)xHandle); - - // if (xSemaphoreTake(s_mutexes_mux, portMAX_DELAY) == pdTRUE) { - // vListInsertEnd((List_t *)&s_mutexes_list, (ListItem_t *)&mux->list_item); - // xSemaphoreGive(s_mutexes_mux); - // } else { - // assert(false && "Failed to lock mutexes list!"); - // } - *mutex = (pthread_mutex_t)mux; // pointer value fit into pthread_mutex_t (uint32_t) return 0; @@ -537,6 +537,50 @@ int IRAM_ATTR pthread_mutex_unlock(pthread_mutex_t *mutex) return 0; } +int pthread_mutexattr_init(pthread_mutexattr_t *attr) +{ + if (!attr) { + errno = EINVAL; + return EINVAL; + } + attr->type = PTHREAD_MUTEX_NORMAL; + attr->is_initialized = 1; + return 0; +} + +int pthread_mutexattr_destroy(pthread_mutexattr_t *attr) +{ + if (!attr) { + errno = EINVAL; + return EINVAL; + } + attr->is_initialized = 0; + return 0; +} + +int pthread_mutexattr_gettype(const pthread_mutexattr_t *attr, int *type) +{ + assert(false && "pthread_mutexattr_gettype not supported!"); + return -1; +} + +int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type) +{ + if (!attr) { + errno = EINVAL; + return EINVAL; + } + pthread_mutexattr_t tmp_attr = {.type = type}; + int res = mutexattr_check(&tmp_attr); + if (res) { + errno = res; + } else { + attr->type = type; + } + return res; +} + +/***************** AUX ******************/ // TODO: move to newlib/time.c???? // needed for std::this_thread::sleep_for unsigned int sleep(unsigned int seconds) diff --git a/components/pthread/test/test_pthread_cxx.cpp b/components/pthread/test/test_pthread_cxx.cpp index 1402088a7..ab2ba07cc 100644 --- a/components/pthread/test/test_pthread_cxx.cpp +++ b/components/pthread/test/test_pthread_cxx.cpp @@ -4,46 +4,76 @@ #include "unity.h" std::shared_ptr global_sp; -std::mutex mtx; +std::mutex mtx; +std::recursive_mutex recur_mtx; -static void thread_main() { +static void thread_do_nothing() {} + +static void thread_main() +{ int i = 0; std::cout << "thread_main CXX " << std::hex << std::this_thread::get_id() << std::endl; - while (i < 10) { + while (i < 3) { + int old_val, new_val; + // mux test mtx.lock(); - // yield test + old_val = *global_sp; std::this_thread::yield(); (*global_sp)++; + std::this_thread::yield(); + new_val = *global_sp; mtx.unlock(); std::cout << "thread " << std::hex << std::this_thread::get_id() << ": " << i++ << " val= " << *global_sp << std::endl; - // sleep_for test + TEST_ASSERT_TRUE(new_val == old_val + 1); + + // sleep_for test std::chrono::milliseconds dur(300); std::this_thread::sleep_for(dur); - // sleep_until test + + // recursive mux test + recur_mtx.lock(); + recur_mtx.lock(); + old_val = *global_sp; + std::this_thread::yield(); + (*global_sp)++; + std::this_thread::yield(); + new_val = *global_sp; + recur_mtx.unlock(); + recur_mtx.unlock(); + std::cout << "thread " << std::hex << std::this_thread::get_id() << ": " << i++ << " val= " << *global_sp << std::endl; + TEST_ASSERT_TRUE(new_val == old_val + 1); + + // sleep_until test using std::chrono::system_clock; - std::time_t tt = system_clock::to_time_t(system_clock::now()); + std::time_t tt = system_clock::to_time_t(system_clock::now()); struct std::tm *ptm = std::localtime(&tt); - ptm->tm_sec = 1; + ptm->tm_sec++; std::this_thread::sleep_until(system_clock::from_time_t (mktime(ptm))); } } TEST_CASE("pthread CXX test 1", "[pthread]") { - std::cout << "TEST START!" << std::endl; - global_sp.reset(new int(1)); - std::thread t1(thread_main); + + std::thread t1(thread_do_nothing); + t1.join(); + std::thread t2(thread_main); - if (t1.joinable()) { - std::cout << "Join thread " << std::hex << t1.get_id() << std::endl; - t1.join(); + std::cout << "Detach thread " << std::hex << t2.get_id() << std::endl; + t2.detach(); + TEST_ASSERT_FALSE(t2.joinable()); + + std::thread t3(thread_main); + std::thread t4(thread_main); + if (t3.joinable()) { + std::cout << "Join thread " << std::hex << t3.get_id() << std::endl; + t3.join(); } - if (t2.joinable()) { - std::cout << "Join thread " << std::hex << t2.get_id() << std::endl; - t2.join(); + if (t4.joinable()) { + std::cout << "Join thread " << std::hex << t4.get_id() << std::endl; + t4.join(); } - std::cout << "TEST END!" << std::endl; }