From edb2400742d2fded2a2a19f349cf8842c1f77f36 Mon Sep 17 00:00:00 2001
From: Amey Inamdar
Date: Thu, 12 Oct 2017 16:28:19 +0530
Subject: [PATCH] pthread: Added support for pthread condition variables

This is required for std::condition_variable support

Signed-off-by: Amey Inamdar
---
 components/pthread/pthread_cond_var.c         | 211 ++++++++++++++++++
 components/pthread/test/test_cxx_cond_var.cpp |  48 ++++
 .../pthread/test/test_cxx_std_future.cpp      |  31 +++
 3 files changed, 290 insertions(+)
 create mode 100644 components/pthread/pthread_cond_var.c
 create mode 100644 components/pthread/test/test_cxx_cond_var.cpp
 create mode 100644 components/pthread/test/test_cxx_std_future.cpp

diff --git a/components/pthread/pthread_cond_var.c b/components/pthread/pthread_cond_var.c
new file mode 100644
index 000000000..1d43787d3
--- /dev/null
+++ b/components/pthread/pthread_cond_var.c
@@ -0,0 +1,211 @@
+// Copyright 2017 Espressif Systems (Shanghai) PTE LTD
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This is a simple implementation of pthread condition variables. In essence,
+// the waiter creates its own semaphore to wait on and pushes it in the cond var
+// specific list. Upon notify and broadcast, all the waiters for the given cond
+// var are woken up.
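+//
+// Flow of pthread_cond_timedwait() below: the waiter allocates a counting
+// semaphore (max count 1, initial count 0), appends itself to the cond var's
+// waiter list, releases the caller's mutex and then blocks on the semaphore,
+// with the timeout converted from the absolute timespec to FreeRTOS ticks.
+// Once the semaphore is given (by signal/broadcast) or the timeout expires,
+// the waiter removes itself from the list, deletes its semaphore, re-acquires
+// the mutex and returns 0 or ETIMEDOUT.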
+
+#include <errno.h>
+#include <pthread.h>
+#include <string.h>
+#include "esp_err.h"
+#include "esp_attr.h"
+#include "freertos/FreeRTOS.h"
+#include "freertos/task.h"
+#include "freertos/semphr.h"
+#include "freertos/list.h"
+
+#include <sys/queue.h>
+#include <sys/time.h>
+
+#define LOG_LOCAL_LEVEL CONFIG_LOG_DEFAULT_LEVEL
+#include "esp_log.h"
+const static char *TAG = "esp_pthread";
+
+typedef struct esp_pthread_cond_waiter {
+    SemaphoreHandle_t wait_sem;                 ///< task specific semaphore to wait on
+    TAILQ_ENTRY(esp_pthread_cond_waiter) link;  ///< stash on the list of semaphores to be notified
+} esp_pthread_cond_waiter_t;
+
+typedef struct esp_pthread_cond {
+    _lock_t lock;                                       ///< lock that protects the list of semaphores
+    TAILQ_HEAD(, esp_pthread_cond_waiter) waiter_list;  ///< head of the list of semaphores
+} esp_pthread_cond_t;
+
+int pthread_cond_signal(pthread_cond_t *cv)
+{
+    ESP_LOGV(TAG, "%s %p", __FUNCTION__, cv);
+
+    if (cv == NULL || *cv == (pthread_cond_t) 0) {
+        return EINVAL;
+    }
+
+    esp_pthread_cond_t *cond = (esp_pthread_cond_t *) *cv;
+
+    _lock_acquire_recursive(&cond->lock);
+    esp_pthread_cond_waiter_t *entry;
+    entry = TAILQ_FIRST(&cond->waiter_list);
+    if (entry) {
+        xSemaphoreGive(entry->wait_sem);
+    }
+    _lock_release_recursive(&cond->lock);
+
+    return 0;
+}
+
+int pthread_cond_broadcast(pthread_cond_t *cv)
+{
+    ESP_LOGV(TAG, "%s %p", __FUNCTION__, cv);
+
+    if (cv == NULL || *cv == (pthread_cond_t) 0) {
+        return EINVAL;
+    }
+
+    esp_pthread_cond_t *cond = (esp_pthread_cond_t *) *cv;
+
+    _lock_acquire_recursive(&cond->lock);
+    esp_pthread_cond_waiter_t *entry;
+    TAILQ_FOREACH(entry, &cond->waiter_list, link) {
+        xSemaphoreGive(entry->wait_sem);
+    }
+    _lock_release_recursive(&cond->lock);
+
+    return 0;
+}
+
+int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mut)
+{
+    ESP_LOGV(TAG, "%s %p %p", __FUNCTION__, cv, mut);
+
+    return pthread_cond_timedwait(cv, mut, NULL);
+}
+
+int pthread_cond_timedwait(pthread_cond_t *cv, pthread_mutex_t *mut, const struct timespec *to)
+{
+    int ret;
+    TickType_t timeout_ticks;
+
+    ESP_LOGV(TAG, "%s %p %p %p", __FUNCTION__, cv, mut, to);
+
+    if (cv == NULL || *cv == (pthread_cond_t) 0) {
+        return EINVAL;
+    }
+
+    esp_pthread_cond_t *cond = (esp_pthread_cond_t *) *cv;
+
+    if (to == NULL) {
+        timeout_ticks = portMAX_DELAY;
+    } else {
+        struct timeval abs_time, cur_time, diff_time;
+        long timeout_msec;
+
+        gettimeofday(&cur_time, NULL);
+
+        abs_time.tv_sec = to->tv_sec;
+        abs_time.tv_usec = to->tv_nsec / 1000;
+
+        if (timercmp(&abs_time, &cur_time, <)) {
+            /* As per the pthread spec, if the time has already
+             * passed, no sleep is required.
+             */
+            timeout_msec = 0;
+        } else {
+            timersub(&abs_time, &cur_time, &diff_time);
+            timeout_msec = (diff_time.tv_sec * 1000) + (diff_time.tv_usec / 1000);
+        }
+
+        if (timeout_msec <= 0) {
+            return ETIMEDOUT;
+        }
+
+        timeout_ticks = timeout_msec / portTICK_PERIOD_MS;
+    }
+
+    esp_pthread_cond_waiter_t w;
+    w.wait_sem = xSemaphoreCreateCounting(1, 0); /* First get will block */
+
+    _lock_acquire_recursive(&cond->lock);
+    TAILQ_INSERT_TAIL(&cond->waiter_list, &w, link);
+    _lock_release_recursive(&cond->lock);
+    pthread_mutex_unlock(mut);
+
+    if (xSemaphoreTake(w.wait_sem, timeout_ticks) == pdTRUE) {
+        ret = 0;
+    } else {
+        ret = ETIMEDOUT;
+    }
+
+    _lock_acquire_recursive(&cond->lock);
+    TAILQ_REMOVE(&cond->waiter_list, &w, link);
+    _lock_release_recursive(&cond->lock);
+    vSemaphoreDelete(w.wait_sem);
+
+    pthread_mutex_lock(mut);
+    return ret;
+}
+
+int pthread_condattr_init(pthread_condattr_t *attr)
+{
+    ESP_LOGV(TAG, "%s not yet implemented (%p)", __FUNCTION__, attr);
+    return ENOSYS;
+}
+
+int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *att)
+{
+    (void) att; /* Unused argument as of now */
+
+    ESP_LOGV(TAG, "%s %p %p", __FUNCTION__, cv, att);
+
+    if (cv == NULL) {
+        return EINVAL;
+    }
+
+    esp_pthread_cond_t *cond = (esp_pthread_cond_t *) calloc(1, sizeof(esp_pthread_cond_t));
+    if (cond == NULL) {
+        return ENOMEM;
+    }
+
+    _lock_init_recursive(&cond->lock);
+    TAILQ_INIT(&cond->waiter_list);
+
+    *cv = (pthread_cond_t) cond;
+    return 0;
+}
+
+int pthread_cond_destroy(pthread_cond_t *cv)
+{
+    int ret = 0;
+
+    ESP_LOGV(TAG, "%s %p", __FUNCTION__, cv);
+    if (cv == NULL || *cv == (pthread_cond_t) 0) {
+        return EINVAL;
+    }
+
+    esp_pthread_cond_t *cond = (esp_pthread_cond_t *) *cv;
+
+    _lock_acquire_recursive(&cond->lock);
+    if (!TAILQ_EMPTY(&cond->waiter_list)) {
+        ret = EBUSY;
+    }
+    _lock_release_recursive(&cond->lock);
+
+    if (ret == 0) {
+        *cv = (pthread_cond_t) 0;
+        _lock_close_recursive(&cond->lock);
+        free(cond);
+    }
+
+    return ret;
+}
diff --git a/components/pthread/test/test_cxx_cond_var.cpp b/components/pthread/test/test_cxx_cond_var.cpp
new file mode 100644
index 000000000..ccb411ae6
--- /dev/null
+++ b/components/pthread/test/test_cxx_cond_var.cpp
@@ -0,0 +1,48 @@
+#include <iostream>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <chrono>
+#include "unity.h"
+
+#if __GTHREADS && __GTHREADS_CXX0X
+
+std::condition_variable cv;
+std::mutex cv_m;
+std::atomic<int> i{0};
+
+static void waits(int idx, int timeout_ms)
+{
+    std::unique_lock<std::mutex> lk(cv_m);
+    auto now = std::chrono::system_clock::now();
+
+    if(cv.wait_until(lk, now + std::chrono::milliseconds(timeout_ms), [](){return i == 1;}))
+        std::cout << "Thread " << idx << " finished waiting. i == " << i << '\n';
+    else
+        std::cout << "Thread " << idx << " timed out. i == " << i << '\n';
i == " << i << '\n'; +} + +static void signals(int signal_ms) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(signal_ms)); + std::cout << "Notifying...\n"; + cv.notify_all(); + std::this_thread::sleep_for(std::chrono::milliseconds(signal_ms)); + i = 1; + std::cout << "Notifying again...\n"; + cv.notify_all(); +} + +TEST_CASE("C++ condition_variable", "[std::condition_variable]") +{ + i = 0; + std::thread t1(waits, 1, 100), t2(waits, 2, 800), t3(signals, 200); + + t1.join(); + t2.join(); + t3.join(); + + std::cout << "All threads joined\n"; +} +#endif diff --git a/components/pthread/test/test_cxx_std_future.cpp b/components/pthread/test/test_cxx_std_future.cpp new file mode 100644 index 000000000..bbf8d6242 --- /dev/null +++ b/components/pthread/test/test_cxx_std_future.cpp @@ -0,0 +1,31 @@ +#include +#include +#include +#include "unity.h" + +#if __GTHREADS && __GTHREADS_CXX0X +TEST_CASE("C++ future", "[std::future]") +{ + // future from a packaged_task + std::packaged_task task([]{ return 7; }); // wrap the function + std::future f1 = task.get_future(); // get a future + std::thread t(std::move(task)); // launch on a thread + + // future from an async() + std::future f2 = std::async(std::launch::async, []{ return 8; }); + + // future from a promise + std::promise p; + std::future f3 = p.get_future(); + std::thread( [&p]{ p.set_value_at_thread_exit(9); }).detach(); + + std::cout << "Waiting..." << std::flush; + f1.wait(); + f2.wait(); + f3.wait(); + std::cout << "Done!\nResults are: " + << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n'; + t.join(); +} +#endif +