diff --git a/lib/include/srsran/common/thread_pool.h b/lib/include/srsran/common/thread_pool.h index efed3f953..a2ad3f1c6 100644 --- a/lib/include/srsran/common/thread_pool.h +++ b/lib/include/srsran/common/thread_pool.h @@ -20,13 +20,13 @@ #ifndef SRSRAN_THREAD_POOL_H #define SRSRAN_THREAD_POOL_H +#include "srsran/adt/circular_buffer.h" #include "srsran/adt/move_callback.h" #include "srsran/srslog/srslog.h" #include #include #include #include -#include #include #include #include @@ -89,7 +89,9 @@ private: class task_thread_pool { - using task_t = srsran::move_callback; + using task_t = srsran::move_callback; + static constexpr uint32_t max_task_shift = 14; + static constexpr uint32_t max_task_num = 1u << max_task_shift; public: task_thread_pool(uint32_t nof_workers = 1, bool start_deferred = false, int32_t prio_ = -1, uint32_t mask_ = 255); @@ -130,7 +132,7 @@ private: uint32_t mask = 255; srslog::basic_logger& logger; - std::queue pending_tasks; + srsran::dyn_circular_buffer pending_tasks; std::vector > workers; mutable std::mutex queue_mutex; std::condition_variable cv_empty; diff --git a/lib/src/common/thread_pool.cc b/lib/src/common/thread_pool.cc index 8a4391a35..ea40aa6c8 100644 --- a/lib/src/common/thread_pool.cc +++ b/lib/src/common/thread_pool.cc @@ -253,7 +253,7 @@ uint32_t thread_pool::get_nof_workers() *************************************************************************/ task_thread_pool::task_thread_pool(uint32_t nof_workers, bool start_deferred, int32_t prio_, uint32_t mask_) : - logger(srslog::fetch_basic_logger("POOL")), workers(std::max(1u, nof_workers)) + logger(srslog::fetch_basic_logger("POOL")), pending_tasks(max_task_num), workers(std::max(1u, nof_workers)) { if (not start_deferred) { start(prio_, mask_); @@ -322,6 +322,10 @@ void task_thread_pool::push_task(task_t&& task) { { std::lock_guard lock(queue_mutex); + if (pending_tasks.full()) { + logger.error("Cannot push anymore tasks into the queue, maximum size is %u", uint32_t(max_task_num)); + return; + } pending_tasks.push(std::move(task)); } cv_empty.notify_one(); @@ -358,7 +362,7 @@ bool task_thread_pool::worker_t::wait_task(task_t* task) return false; } if (task) { - *task = std::move(parent->pending_tasks.front()); + *task = std::move(parent->pending_tasks.top()); } parent->pending_tasks.pop(); return true;