/** * * \section COPYRIGHT * * Copyright 2013-2020 Software Radio Systems Limited * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the distribution. * */ /****************************************************************************** * File: thread_pool.h * Description: Implements a pool of threads. Pending tasks to execute are * identified by a pointer. * Reference: *****************************************************************************/ #ifndef SRSLTE_THREAD_POOL_H #define SRSLTE_THREAD_POOL_H #include #include #include #include #include #include #include #include #include #include "srslte/common/threads.h" namespace srslte { class thread_pool { public: class worker : public thread { public: worker(); ~worker() = default; void setup(uint32_t id, thread_pool* parent, uint32_t prio = 0, uint32_t mask = 255); uint32_t get_id(); void release(); protected: virtual void work_imp() = 0; private: uint32_t my_id = 0; thread_pool* my_parent = nullptr; void run_thread(); void wait_to_start(); void finished(); }; thread_pool(uint32_t nof_workers); void init_worker(uint32_t id, worker*, uint32_t prio = 0, uint32_t mask = 255); void stop(); worker* wait_worker_id(uint32_t id); worker* wait_worker(uint32_t tti); worker* wait_worker_nb(uint32_t tti); void start_worker(worker*); void start_worker(uint32_t id); worker* get_worker(uint32_t id); uint32_t get_nof_workers(); private: bool find_finished_worker(uint32_t tti, uint32_t* id); typedef enum { STOP, IDLE, START_WORK, WORKER_READY, WORKING } worker_status; std::vector workers = {}; uint32_t nof_workers = 0; uint32_t max_workers = 0; bool running = false; std::condition_variable cvar_queue = {}; std::mutex mutex_queue = {}; std::vector status = {}; std::vector cvar_worker = {}; }; class task_thread_pool { using task_t = std::function; public: explicit task_thread_pool(uint32_t nof_workers); ~task_thread_pool(); void start(int32_t prio = -1, uint32_t mask = 255); void stop(); void push_task(const task_t& task); void push_task(task_t&& task); uint32_t nof_pending_tasks(); size_t nof_workers() const { return workers.size(); } private: class worker_t : public thread { public: explicit worker_t(task_thread_pool* parent_, uint32_t id); void stop(); void setup(int32_t prio, uint32_t mask); bool is_running() const { return running; } uint32_t id() const { return id_; } void run_thread() override; private: bool wait_task(task_t* task); task_thread_pool* parent = nullptr; uint32_t id_ = 0; bool running = false; }; std::queue pending_tasks; std::vector workers; std::mutex queue_mutex; std::condition_variable cv_empty; bool running; }; } // namespace srslte #endif // SRSLTE_THREAD_POOL_H