1) Thread detach functionality added

2) Recursive mutexes support
3) C++ test updated
This commit is contained in:
Alexey Gerenkov 2017-08-25 21:24:17 +03:00 committed by Ivan Grokhotkov
parent c631c6b358
commit bf8ff8c98b
2 changed files with 214 additions and 140 deletions

View file

@ -34,19 +34,20 @@ const static char *TAG = "esp_pthread";
#define PTHREAD_TASK_PRIO_DEFAULT 5 #define PTHREAD_TASK_PRIO_DEFAULT 5
#define PTHREAD_TASK_STACK_SZ_DEFAULT (2048)//configMINIMAL_STACK_SIZE is not enough #define PTHREAD_TASK_STACK_SZ_DEFAULT (2048)//configMINIMAL_STACK_SIZE is not enough
#define ESP_PTHREAD_STATE_RUN 0 #define PTHREAD_TASK_STATE_RUN 0
#define ESP_PTHREAD_STATE_EXIT 1 #define PTHREAD_TASK_STATE_EXIT 1
//#define ESP_PTHREAD_STATE_CANCEL 2
typedef struct { typedef struct {
ListItem_t list_item; ListItem_t list_item;
TaskHandle_t join_task; TaskHandle_t join_task;
int state; int state;
bool detached;
} esp_pthread_t; } esp_pthread_t;
typedef struct { typedef struct {
void *(*func)(void *); void *(*func)(void *);
void *arg; void *arg;
esp_pthread_t *pthread;
} esp_pthread_task_arg_t; } esp_pthread_task_arg_t;
typedef struct { typedef struct {
@ -55,21 +56,18 @@ typedef struct {
int type; int type;
} esp_pthread_mutex_t; } esp_pthread_mutex_t;
static SemaphoreHandle_t s_once_mux = NULL; static SemaphoreHandle_t s_once_mux = NULL;
static SemaphoreHandle_t s_threads_mux = NULL; static SemaphoreHandle_t s_threads_mux = NULL;
//static SemaphoreHandle_t s_mutexes_mux = NULL;
static List_t s_threads_list; static List_t s_threads_list;
//static List_t s_mutexes_list;
//static List_t s_key_list;
static int IRAM_ATTR pthread_mutex_lock_internal(esp_pthread_mutex_t *mux, TickType_t tmo); static int IRAM_ATTR pthread_mutex_lock_internal(esp_pthread_mutex_t *mux, TickType_t tmo);
int esp_pthread_init(void) int esp_pthread_init(void)
{ {
vListInitialise((List_t *)&s_threads_list); vListInitialise((List_t *)&s_threads_list);
// vListInitialise((List_t *)&s_mutexes_list);
// vListInitialise((List_t *)&s_key_list);
s_once_mux = xSemaphoreCreateMutex(); s_once_mux = xSemaphoreCreateMutex();
if (s_once_mux == NULL) if (s_once_mux == NULL)
return ESP_FAIL; return ESP_FAIL;
@ -78,12 +76,6 @@ int esp_pthread_init(void)
vSemaphoreDelete(s_once_mux); vSemaphoreDelete(s_once_mux);
return ESP_FAIL; return ESP_FAIL;
} }
// s_mutexes_mux = xSemaphoreCreateMutex();
// if (s_mutexes_mux == NULL) {
// vSemaphoreDelete(s_threads_mux);
// vSemaphoreDelete(s_once_mux);
// return ESP_FAIL;
// }
return ESP_OK; return ESP_OK;
} }
@ -115,42 +107,50 @@ static esp_pthread_t *pthread_find(TaskHandle_t task_handle)
return NULL; return NULL;
} }
static void pthread_delete(esp_pthread_t *pthread)
{
uxListRemove(&pthread->list_item);
free(pthread);
}
static void pthread_task_func(void *arg) static void pthread_task_func(void *arg)
{ {
esp_pthread_task_arg_t *task_arg = (esp_pthread_task_arg_t *)arg; esp_pthread_task_arg_t *task_arg = (esp_pthread_task_arg_t *)arg;
ESP_PTHREAD_LOGV(TAG, "%s ENTER %p", __FUNCTION__, task_arg->func); ESP_PTHREAD_LOGV(TAG, "%s ENTER %p", __FUNCTION__, task_arg->pthread);
// wait for start // wait for start
xTaskNotifyWait(0, 0, NULL, portMAX_DELAY); xTaskNotifyWait(0, 0, NULL, portMAX_DELAY);
ESP_PTHREAD_LOGV(TAG, "%s START %p", __FUNCTION__, task_arg->func); ESP_PTHREAD_LOGV(TAG, "%s START %p", __FUNCTION__, task_arg->pthread);
task_arg->func(task_arg->arg); task_arg->func(task_arg->arg);
ESP_PTHREAD_LOGV(TAG, "%s END %p", __FUNCTION__, task_arg->pthread);
free(task_arg);
ESP_PTHREAD_LOGV(TAG, "%s END %p", __FUNCTION__, task_arg->func); if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) {
esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle());
if (pthread) {
pthread->state = ESP_PTHREAD_STATE_EXIT;
if (pthread->join_task) {
// notify join
xTaskNotify(pthread->join_task, 0, eNoAction);
}
} else {
assert(false && "Failed to find pthread for current task!");
}
xSemaphoreGive(s_threads_mux);
} else {
assert(false && "Failed to lock threads list!"); assert(false && "Failed to lock threads list!");
} }
// TODO: Remove from list??? esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle());
//free(task_arg->pthread); if (!pthread) {
free(task_arg); assert(false && "Failed to find pthread for current task!");
}
if (pthread->detached) {
// auto-free for detached threads
pthread_delete(pthread);
} else {
// Remove from list, it indicates that task has exited
if (pthread->join_task) {
// notify join
xTaskNotify(pthread->join_task, 0, eNoAction);
} else {
pthread->state = PTHREAD_TASK_STATE_EXIT;
}
}
xSemaphoreGive(s_threads_mux);
vTaskDelete(NULL); vTaskDelete(NULL);
ESP_PTHREAD_LOGV(TAG, "%s EXIT %p", __FUNCTION__, task_arg->func); ESP_PTHREAD_LOGV(TAG, "%s EXIT", __FUNCTION__);
} }
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
@ -205,7 +205,7 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
memset(pthread, 0, sizeof(esp_pthread_t)); memset(pthread, 0, sizeof(esp_pthread_t));
task_arg->func = start_routine; task_arg->func = start_routine;
task_arg->arg = arg; task_arg->arg = arg;
//task_arg->pthread = pthread; task_arg->pthread = pthread;
BaseType_t res = xTaskCreate(&pthread_task_func, "pthread", PTHREAD_TASK_STACK_SZ_DEFAULT, task_arg, PTHREAD_TASK_PRIO_DEFAULT, &xHandle); BaseType_t res = xTaskCreate(&pthread_task_func, "pthread", PTHREAD_TASK_STACK_SZ_DEFAULT, task_arg, PTHREAD_TASK_PRIO_DEFAULT, &xHandle);
if(res != pdPASS) { if(res != pdPASS) {
ESP_PTHREAD_LOGE(TAG, "Failed to create task!"); ESP_PTHREAD_LOGE(TAG, "Failed to create task!");
@ -223,12 +223,12 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
listSET_LIST_ITEM_OWNER((ListItem_t *)&pthread->list_item, pthread); listSET_LIST_ITEM_OWNER((ListItem_t *)&pthread->list_item, pthread);
listSET_LIST_ITEM_VALUE((ListItem_t *)&pthread->list_item, (TickType_t)xHandle); listSET_LIST_ITEM_VALUE((ListItem_t *)&pthread->list_item, (TickType_t)xHandle);
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) { if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
vListInsertEnd((List_t *)&s_threads_list, (ListItem_t *)&pthread->list_item);
xSemaphoreGive(s_threads_mux);
} else {
assert(false && "Failed to lock threads list!"); assert(false && "Failed to lock threads list!");
} }
vListInsertEnd((List_t *)&s_threads_list, (ListItem_t *)&pthread->list_item);
xSemaphoreGive(s_threads_mux);
// start task // start task
xTaskNotify(xHandle, 0, eNoAction); xTaskNotify(xHandle, 0, eNoAction);
@ -242,55 +242,74 @@ int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
int pthread_join(pthread_t thread, void **retval) int pthread_join(pthread_t thread, void **retval)
{ {
esp_pthread_t *pthread = (esp_pthread_t *)thread; esp_pthread_t *pthread = (esp_pthread_t *)thread;
bool wait = false;
int ret = 0; int ret = 0;
ets_printf("%s\n", __FUNCTION__); ESP_PTHREAD_LOGV(TAG, "%s %p", __FUNCTION__, pthread);
// find task // find task
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) == pdTRUE) { if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
//uxBitsWaitedFor = listGET_LIST_ITEM_VALUE(list_item);
// TODO: check if task is joinable???
// TODO: PTHREAD_CANCELED???
TaskHandle_t handle = pthread_find_handle(thread);
if (!handle) {
errno = ESRCH; // not found
ret = ESRCH;
} else if (pthread->join_task) {
errno = EINVAL; // already have waiting task to join
ret = EINVAL;
} else if (handle == xTaskGetCurrentTaskHandle()) {
errno = EDEADLK; // join to self or join to each other
ret = EDEADLK;
} else {
esp_pthread_t *cur_pthread = pthread_find(xTaskGetCurrentTaskHandle());
if (cur_pthread && cur_pthread->join_task == handle) {
errno = EDEADLK; // join to each other
ret = EDEADLK;
} else {
if (pthread->state == ESP_PTHREAD_STATE_RUN) {
pthread->join_task = xTaskGetCurrentTaskHandle();
wait = true;
}
}
}
xSemaphoreGive(s_threads_mux);
} else {
assert(false && "Failed to lock threads list!"); assert(false && "Failed to lock threads list!");
} }
if (wait) { TaskHandle_t handle = pthread_find_handle(thread);
// TODO: handle caller cancelation??? if (!handle) {
xTaskNotifyWait(0, 0, NULL, portMAX_DELAY); errno = ESRCH; // not found
ret = ESRCH;
} else if (pthread->join_task) {
errno = EINVAL; // already have waiting task to join
ret = EINVAL;
} else if (handle == xTaskGetCurrentTaskHandle()) {
errno = EDEADLK; // join to self not allowed
ret = EDEADLK;
} else {
esp_pthread_t *cur_pthread = pthread_find(xTaskGetCurrentTaskHandle());
if (cur_pthread && cur_pthread->join_task == handle) {
errno = EDEADLK; // join to each other not allowed
ret = EDEADLK;
} else {
if (pthread->state == PTHREAD_TASK_STATE_RUN) {
pthread->join_task = xTaskGetCurrentTaskHandle();
} else {
pthread_delete(pthread);
}
}
} }
xSemaphoreGive(s_threads_mux);
if (ret == 0 && pthread->join_task) {
xTaskNotifyWait(0, 0, NULL, portMAX_DELAY);
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
assert(false && "Failed to lock threads list!");
}
pthread_delete(pthread);
xSemaphoreGive(s_threads_mux);
}
if (retval) { if (retval) {
*retval = 0; // no exit code in FreeRTOS *retval = 0; // no exit code in FreeRTOS
} }
ESP_PTHREAD_LOGV(TAG, "%s %p EXIT %d", __FUNCTION__, pthread, ret);
return ret; return ret;
} }
int pthread_detach(pthread_t thread) int pthread_detach(pthread_t thread)
{ {
assert(false && "pthread_detach not supported!"); esp_pthread_t *pthread = (esp_pthread_t *)thread;
return -1; int ret = 0;
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
assert(false && "Failed to lock threads list!");
}
TaskHandle_t handle = pthread_find_handle(thread);
if (!handle) {
errno = ESRCH; // not found
ret = ESRCH;
} else {
pthread->detached = true;
}
xSemaphoreGive(s_threads_mux);
ESP_PTHREAD_LOGV(TAG, "%s %p EXIT %d", __FUNCTION__, pthread, ret);
return ret;
} }
int pthread_cancel(pthread_t thread) int pthread_cancel(pthread_t thread)
@ -307,10 +326,14 @@ int sched_yield( void )
pthread_t pthread_self(void) pthread_t pthread_self(void)
{ {
if (xSemaphoreTake(s_threads_mux, portMAX_DELAY) != pdTRUE) {
assert(false && "Failed to lock threads list!");
}
esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle()); esp_pthread_t *pthread = pthread_find(xTaskGetCurrentTaskHandle());
if (!pthread) { if (!pthread) {
assert(false && "Failed to find current thread ID!"); assert(false && "Failed to find current thread ID!");
} }
xSemaphoreGive(s_threads_mux);
return (pthread_t)pthread; return (pthread_t)pthread;
} }
@ -324,30 +347,11 @@ int pthread_key_create(pthread_key_t *key, void (*destructor)(void*))
{ {
static int s_created; static int s_created;
ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); //TODO: Key destructors not suppoted!
if (s_created) { if (s_created) {
assert(false && "CREATED!"); // key API supports just one key necessary by libstdcxx threading implementation
return ENOMEM; return ENOMEM;
} }
// key = (struct pthread_key *)malloc(sizeof(struct pthread_key));
// if (!key) {
// errno = ENOMEM;
// return ENOMEM;
// }
// key->destructor = destructor;
// list_init(&key->specific,1);
// vListInsert((List_t *)s_key_list, (ListItem_t *)&);
// // Add key to key list
// res = list_add(&key_list, key, k);
// if (res) {
// free(key);
// errno = res;
// return res;
// }
*key = 1; *key = 1;
s_created = 1; s_created = 1;
return 0; return 0;
@ -355,22 +359,19 @@ int pthread_key_create(pthread_key_t *key, void (*destructor)(void*))
int pthread_key_delete(pthread_key_t key) int pthread_key_delete(pthread_key_t key)
{ {
ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); assert(false && "pthread_key_delete not supported!");
assert(false && "NOT IMPLEMENTED!");
return -1; return -1;
} }
void *pthread_getspecific(pthread_key_t key) void *pthread_getspecific(pthread_key_t key)
{ {
ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); assert(false && "pthread_getspecific not supported!");
assert(false && "NOT IMPLEMENTED!");
return NULL; return NULL;
} }
int pthread_setspecific(pthread_key_t key, const void *value) int pthread_setspecific(pthread_key_t key, const void *value)
{ {
ESP_PTHREAD_LOGV(TAG, "%s", __FUNCTION__); assert(false && "pthread_setspecific not supported!");
assert(false && "NOT IMPLEMENTED!");
return -1; return -1;
} }
@ -405,26 +406,36 @@ int pthread_once(pthread_once_t *once_control, void (*init_routine)(void))
} }
/***************** MUTEX ******************/ /***************** MUTEX ******************/
static int mutexattr_check(const pthread_mutexattr_t *attr)
{
if (attr->type < PTHREAD_MUTEX_NORMAL || attr->type > PTHREAD_MUTEX_RECURSIVE) {
return EINVAL;
}
return 0;
}
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
{ {
int type = PTHREAD_MUTEX_NORMAL; int type = PTHREAD_MUTEX_NORMAL;
ESP_PTHREAD_LOGV(TAG, "%s %p", __FUNCTION__, mutex);
if (!mutex) { if (!mutex) {
errno = EINVAL; errno = EINVAL;
return EINVAL; return EINVAL;
} }
if (attr && attr->is_initialized) { if (attr) {
if (!attr->is_initialized) {
errno = EINVAL;
return EINVAL;
}
int res = mutexattr_check(attr);
if (res) {
errno = res;
return res;
}
type = attr->type; type = attr->type;
} }
if (type < PTHREAD_MUTEX_NORMAL || type > PTHREAD_MUTEX_DEFAULT) {
errno = EINVAL;
return EINVAL;
}
esp_pthread_mutex_t *mux = (esp_pthread_mutex_t *)malloc(sizeof(esp_pthread_mutex_t)); esp_pthread_mutex_t *mux = (esp_pthread_mutex_t *)malloc(sizeof(esp_pthread_mutex_t));
if (!mux) { if (!mux) {
errno = ENOMEM; errno = ENOMEM;
@ -443,17 +454,6 @@ int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
return EAGAIN; return EAGAIN;
} }
// vListInitialiseItem((ListItem_t *)&mux->list_item);
// listSET_LIST_ITEM_OWNER((ListItem_t *)&mux->list_item, mux);
// //listSET_LIST_ITEM_VALUE((ListItem_t *)&pthread->list_item, (TickType_t)xHandle);
// if (xSemaphoreTake(s_mutexes_mux, portMAX_DELAY) == pdTRUE) {
// vListInsertEnd((List_t *)&s_mutexes_list, (ListItem_t *)&mux->list_item);
// xSemaphoreGive(s_mutexes_mux);
// } else {
// assert(false && "Failed to lock mutexes list!");
// }
*mutex = (pthread_mutex_t)mux; // pointer value fit into pthread_mutex_t (uint32_t) *mutex = (pthread_mutex_t)mux; // pointer value fit into pthread_mutex_t (uint32_t)
return 0; return 0;
@ -537,6 +537,50 @@ int IRAM_ATTR pthread_mutex_unlock(pthread_mutex_t *mutex)
return 0; return 0;
} }
int pthread_mutexattr_init(pthread_mutexattr_t *attr)
{
if (!attr) {
errno = EINVAL;
return EINVAL;
}
attr->type = PTHREAD_MUTEX_NORMAL;
attr->is_initialized = 1;
return 0;
}
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr)
{
if (!attr) {
errno = EINVAL;
return EINVAL;
}
attr->is_initialized = 0;
return 0;
}
int pthread_mutexattr_gettype(const pthread_mutexattr_t *attr, int *type)
{
assert(false && "pthread_mutexattr_gettype not supported!");
return -1;
}
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type)
{
if (!attr) {
errno = EINVAL;
return EINVAL;
}
pthread_mutexattr_t tmp_attr = {.type = type};
int res = mutexattr_check(&tmp_attr);
if (res) {
errno = res;
} else {
attr->type = type;
}
return res;
}
/***************** AUX ******************/
// TODO: move to newlib/time.c???? // TODO: move to newlib/time.c????
// needed for std::this_thread::sleep_for // needed for std::this_thread::sleep_for
unsigned int sleep(unsigned int seconds) unsigned int sleep(unsigned int seconds)

View file

@ -4,46 +4,76 @@
#include "unity.h" #include "unity.h"
std::shared_ptr<int> global_sp; std::shared_ptr<int> global_sp;
std::mutex mtx; std::mutex mtx;
std::recursive_mutex recur_mtx;
static void thread_main() { static void thread_do_nothing() {}
static void thread_main()
{
int i = 0; int i = 0;
std::cout << "thread_main CXX " << std::hex << std::this_thread::get_id() << std::endl; std::cout << "thread_main CXX " << std::hex << std::this_thread::get_id() << std::endl;
while (i < 10) { while (i < 3) {
int old_val, new_val;
// mux test // mux test
mtx.lock(); mtx.lock();
// yield test old_val = *global_sp;
std::this_thread::yield(); std::this_thread::yield();
(*global_sp)++; (*global_sp)++;
std::this_thread::yield();
new_val = *global_sp;
mtx.unlock(); mtx.unlock();
std::cout << "thread " << std::hex << std::this_thread::get_id() << ": " << i++ << " val= " << *global_sp << std::endl; std::cout << "thread " << std::hex << std::this_thread::get_id() << ": " << i++ << " val= " << *global_sp << std::endl;
// sleep_for test TEST_ASSERT_TRUE(new_val == old_val + 1);
// sleep_for test
std::chrono::milliseconds dur(300); std::chrono::milliseconds dur(300);
std::this_thread::sleep_for(dur); std::this_thread::sleep_for(dur);
// sleep_until test
// recursive mux test
recur_mtx.lock();
recur_mtx.lock();
old_val = *global_sp;
std::this_thread::yield();
(*global_sp)++;
std::this_thread::yield();
new_val = *global_sp;
recur_mtx.unlock();
recur_mtx.unlock();
std::cout << "thread " << std::hex << std::this_thread::get_id() << ": " << i++ << " val= " << *global_sp << std::endl;
TEST_ASSERT_TRUE(new_val == old_val + 1);
// sleep_until test
using std::chrono::system_clock; using std::chrono::system_clock;
std::time_t tt = system_clock::to_time_t(system_clock::now()); std::time_t tt = system_clock::to_time_t(system_clock::now());
struct std::tm *ptm = std::localtime(&tt); struct std::tm *ptm = std::localtime(&tt);
ptm->tm_sec = 1; ptm->tm_sec++;
std::this_thread::sleep_until(system_clock::from_time_t (mktime(ptm))); std::this_thread::sleep_until(system_clock::from_time_t (mktime(ptm)));
} }
} }
TEST_CASE("pthread CXX test 1", "[pthread]") TEST_CASE("pthread CXX test 1", "[pthread]")
{ {
std::cout << "TEST START!" << std::endl;
global_sp.reset(new int(1)); global_sp.reset(new int(1));
std::thread t1(thread_main);
std::thread t1(thread_do_nothing);
t1.join();
std::thread t2(thread_main); std::thread t2(thread_main);
if (t1.joinable()) { std::cout << "Detach thread " << std::hex << t2.get_id() << std::endl;
std::cout << "Join thread " << std::hex << t1.get_id() << std::endl; t2.detach();
t1.join(); TEST_ASSERT_FALSE(t2.joinable());
std::thread t3(thread_main);
std::thread t4(thread_main);
if (t3.joinable()) {
std::cout << "Join thread " << std::hex << t3.get_id() << std::endl;
t3.join();
} }
if (t2.joinable()) { if (t4.joinable()) {
std::cout << "Join thread " << std::hex << t2.get_id() << std::endl; std::cout << "Join thread " << std::hex << t4.get_id() << std::endl;
t2.join(); t4.join();
} }
std::cout << "TEST END!" << std::endl;
} }