diff --git a/lib/include/srslte/common/thread_pool.h b/lib/include/srslte/common/thread_pool.h index 16fce9010..5bc607ba1 100644 --- a/lib/include/srslte/common/thread_pool.h +++ b/lib/include/srslte/common/thread_pool.h @@ -51,7 +51,6 @@ public: public: worker(); void setup(uint32_t id, thread_pool* parent, uint32_t prio = 0, uint32_t mask = 255); - virtual void stop(); uint32_t get_id(); void release(); @@ -59,9 +58,9 @@ public: virtual void work_imp() = 0; private: - uint32_t my_id; - thread_pool* my_parent; - bool running; + uint32_t my_id = 0; + thread_pool* my_parent = nullptr; + void run_thread(); void wait_to_start(); void finished(); @@ -70,7 +69,7 @@ public: thread_pool(uint32_t nof_workers); void init_worker(uint32_t id, worker*, uint32_t prio = 0, uint32_t mask = 255); void stop(); - worker* wait_worker(); + worker* wait_worker_id(uint32_t id); worker* wait_worker(uint32_t tti); worker* wait_worker_nb(uint32_t tti); void start_worker(worker*); @@ -81,18 +80,16 @@ public: private: bool find_finished_worker(uint32_t tti, uint32_t* id); - typedef enum { IDLE, START_WORK, WORKER_READY, WORKING } worker_status; + typedef enum { STOP, IDLE, START_WORK, WORKER_READY, WORKING } worker_status; - std::vector workers; - uint32_t nof_workers; - uint32_t max_workers; - bool running; - pthread_cond_t cvar_queue; - pthread_mutex_t mutex_queue; - std::vector status; - std::vector cvar; - std::vector mutex; - std::stack available_workers; + std::vector workers = {}; + uint32_t nof_workers = 0; + uint32_t max_workers = 0; + bool running = false; + std::condition_variable cvar_queue = {}; + std::mutex mutex_queue = {}; + std::vector status = {}; + std::vector cvar_worker = {}; }; class task_thread_pool diff --git a/lib/src/common/test/thread_pool_test.cc b/lib/src/common/test/thread_pool_test.cc index c1358d95f..a4fad8e13 100644 --- a/lib/src/common/test/thread_pool_test.cc +++ b/lib/src/common/test/thread_pool_test.cc @@ -133,7 +133,7 @@ int main(int argc, char** argv) srslte::log_filter radio_log("radio", &logger); std::vector > worker_logs; - radio_log.set_level("info"); + radio_log.set_level("none"); // Radio dummy_radio radio(radio_log); @@ -146,7 +146,7 @@ int main(int argc, char** argv) // Create log filter srslte::log_filter* log_filter = new srslte::log_filter(log_name, &logger); - log_filter->set_level("info"); + log_filter->set_level("none"); // Create worker auto* worker = new dummy_worker(i, &tti_semaphore, log_filter, &radio); diff --git a/lib/src/common/thread_pool.cc b/lib/src/common/thread_pool.cc index 205f758f9..88dea1f10 100644 --- a/lib/src/common/thread_pool.cc +++ b/lib/src/common/thread_pool.cc @@ -31,11 +31,9 @@ printf(fmt, __VA_ARGS__); \ } while (0) -#define USE_QUEUE - namespace srslte { -thread_pool::worker::worker() : my_id(0), running(false), my_parent(NULL), thread("THREAD_POOL_WORKER") {} +thread_pool::worker::worker() : thread("THREAD_POOL_WORKER") {} void thread_pool::worker::setup(uint32_t id, thread_pool* parent, uint32_t prio, uint32_t mask) { @@ -52,10 +50,9 @@ void thread_pool::worker::setup(uint32_t id, thread_pool* parent, uint32_t prio, void thread_pool::worker::run_thread() { set_name(std::string("WORKER") + std::to_string(my_id)); - running = true; - while (running) { + while (my_parent->status[my_id] != STOP) { wait_to_start(); - if (running) { + if (my_parent->status[my_id] != STOP) { work_imp(); finished(); } @@ -67,65 +64,53 @@ uint32_t thread_pool::worker::get_id() return my_id; } -void thread_pool::worker::stop() -{ - running = false; - pthread_cond_signal(&my_parent->cvar[my_id]); - wait_thread_finish(); -} - -thread_pool::thread_pool(uint32_t max_workers_) : - workers(max_workers_), - status(max_workers_), - cvar(max_workers_), - mutex(max_workers_) +thread_pool::thread_pool(uint32_t max_workers_) : workers(max_workers_), status(max_workers_), cvar_worker(max_workers_) { max_workers = max_workers_; for (uint32_t i = 0; i < max_workers; i++) { workers[i] = NULL; status[i] = IDLE; - pthread_mutex_init(&mutex[i], NULL); - pthread_cond_init(&cvar[i], NULL); } - pthread_mutex_init(&mutex_queue, NULL); - pthread_cond_init(&cvar_queue, NULL); running = true; nof_workers = 0; } void thread_pool::init_worker(uint32_t id, worker* obj, uint32_t prio, uint32_t mask) { + std::lock_guard lock(mutex_queue); if (id < max_workers) { if (id >= nof_workers) { nof_workers = id + 1; } - pthread_mutex_lock(&mutex_queue); workers[id] = obj; - available_workers.push(obj); obj->setup(id, this, prio, mask); - pthread_cond_signal(&cvar_queue); - pthread_mutex_unlock(&mutex_queue); + cvar_queue.notify_all(); } } void thread_pool::stop() { + mutex_queue.lock(); + /* Stop any thread waiting for available worker */ running = false; /* Now stop all workers */ for (uint32_t i = 0; i < nof_workers; i++) { if (workers[i]) { - workers[i]->stop(); - // Need to call start to wake it up - start_worker(i); - workers[i]->wait_thread_finish(); + debug_thread("stop(): stoping %d\n", i); + status[i] = STOP; + cvar_worker[i].notify_all(); + cvar_queue.notify_all(); } - pthread_cond_destroy(&cvar[i]); - pthread_mutex_destroy(&mutex[i]); } - pthread_cond_destroy(&cvar_queue); - pthread_mutex_destroy(&mutex_queue); + mutex_queue.unlock(); + + for (uint32_t i = 0; i < nof_workers; i++) { + debug_thread("stop(): waiting %d\n", i); + workers[i]->wait_thread_finish(); + debug_thread("stop(): done %d\n", i); + } } void thread_pool::worker::release() @@ -135,40 +120,28 @@ void thread_pool::worker::release() void thread_pool::worker::wait_to_start() { + std::unique_lock lock(my_parent->mutex_queue); debug_thread("wait_to_start() id=%d, status=%d, enter\n", my_id, my_parent->status[my_id]); - pthread_mutex_lock(&my_parent->mutex[my_id]); - while (my_parent->status[my_id] != START_WORK && running) { - pthread_cond_wait(&my_parent->cvar[my_id], &my_parent->mutex[my_id]); + while (my_parent->status[my_id] != START_WORK && my_parent->status[my_id] != STOP) { + my_parent->cvar_worker[my_id].wait(lock); + } + if (my_parent->status[my_id] != STOP) { + my_parent->status[my_id] = WORKING; } - my_parent->status[my_id] = WORKING; - pthread_mutex_unlock(&my_parent->mutex[my_id]); debug_thread("wait_to_start() id=%d, status=%d, exit\n", my_id, my_parent->status[my_id]); } void thread_pool::worker::finished() { -#ifdef USE_QUEUE - pthread_mutex_lock(&my_parent->mutex[my_id]); - my_parent->status[my_id] = IDLE; - pthread_mutex_unlock(&my_parent->mutex[my_id]); - - pthread_mutex_lock(&my_parent->mutex_queue); - pthread_cond_signal(&my_parent->cvar_queue); - pthread_mutex_unlock(&my_parent->mutex_queue); -#else - pthread_mutex_lock(&my_parent->mutex[my_id]); - my_parent->status[my_id] = IDLE; - pthread_cond_signal(&my_parent->cvar[my_id]); - pthread_mutex_unlock(&my_parent->mutex[my_id]); -#endif -} - -thread_pool::worker* thread_pool::wait_worker() -{ - return wait_worker(0); + std::lock_guard lock(my_parent->mutex_queue); + if (my_parent->status[my_id] != STOP) { + my_parent->status[my_id] = IDLE; + my_parent->cvar_worker[my_id].notify_all(); + my_parent->cvar_queue.notify_all(); + } } bool thread_pool::find_finished_worker(uint32_t tti, uint32_t* id) @@ -182,77 +155,73 @@ bool thread_pool::find_finished_worker(uint32_t tti, uint32_t* id) return false; } +thread_pool::worker* thread_pool::wait_worker_id(uint32_t id) +{ + std::unique_lock lock(mutex_queue); + + debug_thread("wait_worker_id() - enter - id=%d, state0=%d, state1=%d\n", id, status[0], status[1]); + + thread_pool::worker* ret = nullptr; + + while (status[id] != IDLE && running) { + cvar_queue.wait(lock); + } + if (running) { + ret = workers[id]; + status[id] = WORKER_READY; + } + debug_thread("wait_worker_id() - exit - id=%d\n", id); + return ret; +} + thread_pool::worker* thread_pool::wait_worker(uint32_t tti) { - thread_pool::worker* x; + std::unique_lock lock(mutex_queue); -#ifdef USE_QUEUE debug_thread("wait_worker() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]); - pthread_mutex_lock(&mutex_queue); - uint32_t id = 0; + + thread_pool::worker* ret = nullptr; + uint32_t id = 0; + while (!find_finished_worker(tti, &id) && running) { - pthread_cond_wait(&cvar_queue, &mutex_queue); + cvar_queue.wait(lock); } - pthread_mutex_unlock(&mutex_queue); if (running) { - x = workers[id]; - pthread_mutex_lock(&mutex[id]); + ret = workers[id]; status[id] = WORKER_READY; - pthread_mutex_unlock(&mutex[id]); - } else { - x = NULL; } debug_thread("wait_worker() - exit - id=%d\n", id); -#else - - uint32_t id = tti % nof_workers; - pthread_mutex_lock(&mutex[id]); - while (status[id] != IDLE && running) { - pthread_cond_wait(&cvar[id], &mutex[id]); - } - if (running) { - x = (worker*)workers[id]; - status[id] = WORKER_READY; - } else { - x = NULL; - } - pthread_mutex_unlock(&mutex[id]); -#endif - return x; + return ret; } thread_pool::worker* thread_pool::wait_worker_nb(uint32_t tti) { - thread_pool::worker* x; + std::unique_lock lock(mutex_queue); - debug_thread("wait_worker() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]); - pthread_mutex_lock(&mutex_queue); - uint32_t id = 0; - if (find_finished_worker(tti, &id)) { - x = workers[id]; - } else { - x = NULL; - } - pthread_mutex_unlock(&mutex_queue); - if (running && x) { - pthread_mutex_lock(&mutex[id]); + debug_thread("wait_worker_nb() - enter - tti=%d, state0=%d, state1=%d\n", tti, status[0], status[1]); + + thread_pool::worker* ret = nullptr; + uint32_t id = 0; + + if (find_finished_worker(tti, &id) && running) { + ret = workers[id]; status[id] = WORKER_READY; - pthread_mutex_unlock(&mutex[id]); - } else { - x = NULL; } - debug_thread("wait_worker() - exit - id=%d\n", id); - return x; + + debug_thread("wait_worker_nb() - exit - id=%d\n", id); + return ret; } void thread_pool::start_worker(uint32_t id) { + std::unique_lock lock(mutex_queue); if (id < nof_workers) { - pthread_mutex_lock(&mutex[id]); - status[id] = START_WORK; - pthread_cond_signal(&cvar[id]); - pthread_mutex_unlock(&mutex[id]); debug_thread("start_worker() id=%d, status=%d\n", id, status[id]); + if (status[id] != STOP) { + status[id] = START_WORK; + cvar_worker[id].notify_all(); + cvar_queue.notify_all(); + } } } diff --git a/srsenb/hdr/phy/sf_worker.h b/srsenb/hdr/phy/sf_worker.h index 4428f279d..11a6ff37c 100644 --- a/srsenb/hdr/phy/sf_worker.h +++ b/srsenb/hdr/phy/sf_worker.h @@ -37,7 +37,6 @@ public: sf_worker() = default; ~sf_worker(); void init(phy_common* phy, srslte::log* log_h); - void stop() final; cf_t* get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx); void set_time(uint32_t tti, uint32_t tx_worker_cnt, srslte_timestamp_t tx_time); diff --git a/srsenb/src/phy/sf_worker.cc b/srsenb/src/phy/sf_worker.cc index edf0e9cdb..69580d7b2 100644 --- a/srsenb/src/phy/sf_worker.cc +++ b/srsenb/src/phy/sf_worker.cc @@ -106,13 +106,6 @@ void sf_worker::init(phy_common* phy_, srslte::log* log_h_) #endif } -void sf_worker::stop() -{ - std::lock_guard lock(work_mutex); - running = false; - srslte::thread_pool::worker::stop(); -} - cf_t* sf_worker::get_buffer_rx(uint32_t cc_idx, uint32_t antenna_idx) { return cc_workers[cc_idx]->get_buffer_rx(antenna_idx); diff --git a/srsue/src/phy/sf_worker.cc b/srsue/src/phy/sf_worker.cc index abe04c5a7..d331f4e3d 100644 --- a/srsue/src/phy/sf_worker.cc +++ b/srsue/src/phy/sf_worker.cc @@ -90,7 +90,7 @@ void sf_worker::reset() bool sf_worker::set_cell(uint32_t cc_idx, srslte_cell_t cell_) { - std::lock_guard lock(mutex); + // std::lock_guard lock(mutex); if (cc_idx < cc_workers.size()) { if (!cc_workers[cc_idx]->set_cell(cell_)) {