diff --git a/lib/include/srslte/common/task_scheduler.h b/lib/include/srslte/common/task_scheduler.h index 12111adf3..ed564f8f7 100644 --- a/lib/include/srslte/common/task_scheduler.h +++ b/lib/include/srslte/common/task_scheduler.h @@ -23,26 +23,17 @@ namespace srslte { class task_scheduler { public: - explicit task_scheduler(uint32_t default_extern_tasks_size = 512, - uint32_t nof_background_threads = 0, - uint32_t nof_timers_prealloc = 100) : - external_tasks{default_extern_tasks_size}, - timers{nof_timers_prealloc}, - background_tasks{nof_background_threads} + explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) : + external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc} { background_queue_id = external_tasks.add_queue(); - - // Start background thread - if (background_tasks.nof_workers() > 0) { - background_tasks.start(); - } } + task_scheduler(const task_scheduler&) = delete; + task_scheduler(task_scheduler&&) = delete; + task_scheduler& operator=(const task_scheduler&) = delete; + task_scheduler& operator=(task_scheduler&&) = delete; - void stop() - { - background_tasks.stop(); - external_tasks.reset(); - } + void stop() { external_tasks.reset(); } srslte::unique_timer get_unique_timer() { return timers.get_unique_timer(); } @@ -56,17 +47,6 @@ public: //! Enqueues internal task to be run in next tic void defer_task(srslte::move_task_t func) { internal_tasks.push_back(std::move(func)); } - //! Delegates a task to a thread pool that runs in the background - void enqueue_background_task(std::function f) - { - if (background_tasks.nof_workers() > 0) { - background_tasks.push_task(std::move(f)); - } else { - external_tasks.push(background_queue_id, - std::bind([](const std::function& task) { task(0); }, std::move(f))); - } - } - //! Defer the handling of the result of a background task to next tic void notify_background_task_result(srslte::move_task_t task) { @@ -117,10 +97,9 @@ private: } } - srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks - int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background - srslte::task_multiqueue external_tasks; - srslte::timer_handler timers; + int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background + srslte::task_multiqueue external_tasks; + srslte::timer_handler timers; std::deque internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking }; @@ -131,8 +110,7 @@ public: task_sched_handle(task_scheduler* sched_) : sched(sched_) {} srslte::unique_timer get_unique_timer() { return sched->get_unique_timer(); } - void enqueue_background_task(std::function f) { sched->enqueue_background_task(std::move(f)); } - void notify_background_task_result(srslte::move_task_t task) + void notify_background_task_result(srslte::move_task_t task) { sched->notify_background_task_result(std::move(task)); } @@ -153,8 +131,7 @@ public: ext_task_sched_handle(task_scheduler* sched_) : sched(sched_) {} srslte::unique_timer get_unique_timer() { return sched->get_unique_timer(); } - void enqueue_background_task(std::function f) { sched->enqueue_background_task(std::move(f)); } - void notify_background_task_result(srslte::move_task_t task) + void notify_background_task_result(srslte::move_task_t task) { sched->notify_background_task_result(std::move(task)); } diff --git a/lib/include/srslte/common/thread_pool.h b/lib/include/srslte/common/thread_pool.h index c79c0d9f6..af170a568 100644 --- a/lib/include/srslte/common/thread_pool.h +++ b/lib/include/srslte/common/thread_pool.h @@ -20,6 +20,8 @@ #ifndef SRSLTE_THREAD_POOL_H #define SRSLTE_THREAD_POOL_H +#include "srslte/adt/move_callback.h" +#include "srslte/srslog/srslog.h" #include #include #include @@ -86,17 +88,22 @@ private: class task_thread_pool { - using task_t = std::function; + using task_t = srslte::move_callback; public: - explicit task_thread_pool(uint32_t nof_workers); + task_thread_pool(uint32_t nof_workers = 1, bool start_deferred = false, int32_t prio_ = -1, uint32_t mask_ = 255); + task_thread_pool(const task_thread_pool&) = delete; + task_thread_pool(task_thread_pool&&) = delete; + task_thread_pool& operator=(const task_thread_pool&) = delete; + task_thread_pool& operator=(task_thread_pool&&) = delete; ~task_thread_pool(); - void start(int32_t prio = -1, uint32_t mask = 255); - void stop(); - void push_task(const task_t& task); + void stop(); + void start(int32_t prio_ = -1, uint32_t mask_ = 255); + void set_nof_workers(uint32_t nof_workers); + void push_task(task_t&& task); - uint32_t nof_pending_tasks(); + uint32_t nof_pending_tasks() const; size_t nof_workers() const { return workers.size(); } private: @@ -105,7 +112,6 @@ private: public: explicit worker_t(task_thread_pool* parent_, uint32_t id); void stop(); - void setup(int32_t prio, uint32_t mask); bool is_running() const { return running; } uint32_t id() const { return id_; } @@ -119,13 +125,19 @@ private: bool running = false; }; - std::queue pending_tasks; - std::vector workers; - std::mutex queue_mutex; - std::condition_variable cv_empty; - bool running; + int32_t prio = -1; + uint32_t mask = 255; + srslog::basic_logger& logger; + + std::queue pending_tasks; + std::vector > workers; + mutable std::mutex queue_mutex; + std::condition_variable cv_empty; + bool running = false; }; +srslte::task_thread_pool& get_background_workers(); + } // namespace srslte #endif // SRSLTE_THREAD_POOL_H diff --git a/lib/include/srslte/test/ue_test_interfaces.h b/lib/include/srslte/test/ue_test_interfaces.h index ee878e860..27c7b9016 100644 --- a/lib/include/srslte/test/ue_test_interfaces.h +++ b/lib/include/srslte/test/ue_test_interfaces.h @@ -40,7 +40,7 @@ public: // run pending tasks without updating timers void run_pending_tasks() { task_sched.run_pending_tasks(); } - srslte::task_scheduler task_sched{512, 0, 100}; + srslte::task_scheduler task_sched{512, 100}; }; class rlc_dummy_interface : public rlc_interface_mac diff --git a/lib/src/common/thread_pool.cc b/lib/src/common/thread_pool.cc index 567f7ae35..ee47d1264 100644 --- a/lib/src/common/thread_pool.cc +++ b/lib/src/common/thread_pool.cc @@ -11,6 +11,7 @@ */ #include "srslte/common/thread_pool.h" +#include "srslte/srslog/srslog.h" #include #include #include @@ -244,11 +245,11 @@ uint32_t thread_pool::get_nof_workers() * once a worker is available *************************************************************************/ -task_thread_pool::task_thread_pool(uint32_t nof_workers) : running(false) +task_thread_pool::task_thread_pool(uint32_t nof_workers, bool start_deferred, int32_t prio_, uint32_t mask_) : + logger(srslog::fetch_basic_logger("POOL")), workers(std::max(1u, nof_workers)) { - workers.reserve(nof_workers); - for (uint32_t i = 0; i < nof_workers; ++i) { - workers.emplace_back(this, i); + if (not start_deferred) { + start(prio_, mask_); } } @@ -257,12 +258,34 @@ task_thread_pool::~task_thread_pool() stop(); } -void task_thread_pool::start(int32_t prio, uint32_t mask) +void task_thread_pool::set_nof_workers(uint32_t nof_workers) { std::lock_guard lock(queue_mutex); + if (workers.size() > nof_workers) { + logger.error("Reducing the number of workers dynamically not supported"); + return; + } + uint32_t old_size = workers.size(); + workers.resize(nof_workers); + if (running) { + for (uint32_t i = old_size; i < nof_workers; ++i) { + workers[i].reset(new worker_t(this, i)); + } + } +} + +void task_thread_pool::start(int32_t prio_, uint32_t mask_) +{ + std::lock_guard lock(queue_mutex); + if (running) { + logger.error("Starting thread pool that has already started"); + return; + } + prio = prio_; + mask = mask_; running = true; - for (worker_t& w : workers) { - w.setup(prio, mask); + for (uint32_t i = 0; i < workers.size(); ++i) { + workers[i].reset(new worker_t(this, i)); } } @@ -272,8 +295,8 @@ void task_thread_pool::stop() if (running) { running = false; bool workers_running = false; - for (worker_t& w : workers) { - if (w.is_running()) { + for (std::unique_ptr& w : workers) { + if (w->is_running()) { workers_running = true; break; } @@ -282,21 +305,12 @@ void task_thread_pool::stop() if (workers_running) { cv_empty.notify_all(); } - for (worker_t& w : workers) { - w.stop(); + for (std::unique_ptr& w : workers) { + w->stop(); } } } -void task_thread_pool::push_task(const task_t& task) -{ - { - std::lock_guard lock(queue_mutex); - pending_tasks.push(task); - } - cv_empty.notify_one(); -} - void task_thread_pool::push_task(task_t&& task) { { @@ -306,17 +320,20 @@ void task_thread_pool::push_task(task_t&& task) cv_empty.notify_one(); } -uint32_t task_thread_pool::nof_pending_tasks() +uint32_t task_thread_pool::nof_pending_tasks() const { std::lock_guard lock(queue_mutex); return pending_tasks.size(); } task_thread_pool::worker_t::worker_t(srslte::task_thread_pool* parent_, uint32_t my_id) : - parent(parent_), - thread(std::string("TASKWORKER") + std::to_string(my_id)), - id_(my_id) + parent(parent_), thread(std::string("TASKWORKER") + std::to_string(my_id)), id_(my_id), running(true) { + if (parent->mask == 255) { + start(parent->prio); + } else { + start_cpu_mask(parent->prio, parent->mask); + } } void task_thread_pool::worker_t::stop() @@ -324,16 +341,6 @@ void task_thread_pool::worker_t::stop() wait_thread_finish(); } -void task_thread_pool::worker_t::setup(int32_t prio, uint32_t mask) -{ - running = true; - if (mask == 255) { - start(prio); - } else { - start_cpu_mask(prio, mask); - } -} - bool task_thread_pool::worker_t::wait_task(task_t* task) { std::unique_lock lock(parent->queue_mutex); @@ -355,7 +362,7 @@ void task_thread_pool::worker_t::run_thread() // main loop task_t task; while (wait_task(&task)) { - task(id()); + task(); } // on exit, notify pool class @@ -363,4 +370,11 @@ void task_thread_pool::worker_t::run_thread() running = false; } +// Global thread pool for long, low-priority tasks +task_thread_pool& get_background_workers() +{ + static task_thread_pool background_workers; + return background_workers; +} + } // namespace srslte diff --git a/lib/test/common/queue_test.cc b/lib/test/common/queue_test.cc index e880e3796..05a489dac 100644 --- a/lib/test/common/queue_test.cc +++ b/lib/test/common/queue_test.cc @@ -14,6 +14,7 @@ #include "srslte/common/multiqueue.h" #include "srslte/common/thread_pool.h" #include +#include #include #include @@ -234,17 +235,15 @@ int test_task_thread_pool() std::cout << "\n====== TEST task thread pool test 1: start ======\n"; // Description: check whether the tasks are successfully distributed between workers - uint32_t nof_workers = 4, nof_runs = 10000; - std::vector count_worker(nof_workers, 0); - std::vector count_mutex(nof_workers); + uint32_t nof_workers = 4, nof_runs = 10000; + std::mutex count_mutex; + std::map count_worker; task_thread_pool thread_pool(nof_workers); - thread_pool.start(); - auto task = [&count_worker, &count_mutex](uint32_t worker_id) { - std::lock_guard lock(count_mutex[worker_id]); - // std::cout << "hello world from worker " << worker_id << std::endl; - count_worker[worker_id]++; + auto task = [&count_worker, &count_mutex]() { + std::lock_guard lock(count_mutex); + count_worker[std::this_thread::get_id()]++; }; for (uint32_t i = 0; i < nof_runs; ++i) { @@ -259,12 +258,12 @@ int test_task_thread_pool() thread_pool.stop(); uint32_t total_count = 0; - for (uint32_t i = 0; i < nof_workers; ++i) { - if (count_worker[i] < 10) { - printf("WARNING: the number of tasks %d assigned to worker %d is too low\n", count_worker[i], i); + for (auto& w : count_worker) { + if (w.second < 10) { + std::cout << "WARNING: the number of tasks " << w.second << " assigned to worker " << w.first << " is too low"; } - total_count += count_worker[i]; - printf("worker %d: %d runs\n", i, count_worker[i]); + total_count += w.second; + std::cout << "worker " << w.first << ": " << w.second << " runs\n"; } if (total_count != nof_runs) { printf("Number of task runs=%d does not match total=%d\n", total_count, nof_runs); @@ -289,14 +288,14 @@ int test_task_thread_pool2() task_thread_pool thread_pool(nof_workers); thread_pool.start(); - auto task = [&workers_started, &workers_finished, &mut](uint32_t worker_id) { + auto task = [&workers_started, &workers_finished, &mut]() { { std::lock_guard lock(mut); workers_started++; } sleep(1); std::lock_guard lock(mut); - std::cout << "worker " << worker_id << " has finished\n"; + std::cout << "worker has finished\n"; workers_finished++; }; diff --git a/lib/test/common/task_scheduler_test.cc b/lib/test/common/task_scheduler_test.cc index c7bfe89f9..dcbb04df4 100644 --- a/lib/test/common/task_scheduler_test.cc +++ b/lib/test/common/task_scheduler_test.cc @@ -39,12 +39,10 @@ int test_task_scheduler_no_pool() // TEST: background task is run, despite there are no pool workers state = task_result::null; - task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) { + srslte::get_background_workers().push_task([&task_sched, &state]() { task_sched.notify_background_task_result([&state]() { state = task_result::external; }); }); TESTASSERT(state == task_result::null); - task_sched.run_next_task(); // runs background task - TESTASSERT(state == task_result::null); task_sched.run_next_task(); // runs notification TESTASSERT(state == task_result::external); @@ -56,7 +54,7 @@ int test_task_scheduler_with_pool() srslte::task_scheduler task_sched{5, 2}; task_result state = task_result::null; - task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) { + srslte::get_background_workers().push_task([&task_sched, &state]() { task_sched.notify_background_task_result([&state]() { state = task_result::external; }); }); TESTASSERT(state == task_result::null); diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 304b4184a..32fc1c2b5 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -29,7 +29,7 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_, srslog::sink& log_sink) : s1ap_logger(srslog::fetch_basic_logger("S1AP", log_sink, false)), gtpu_logger(srslog::fetch_basic_logger("GTPU", log_sink, false)), stack_logger(srslog::fetch_basic_logger("STCK", log_sink, false)), - task_sched(512, 0, 128), + task_sched(512, 128), pdcp(&task_sched, pdcp_logger), mac(&task_sched, mac_logger), rlc(rlc_logger), @@ -39,6 +39,7 @@ enb_stack_lte::enb_stack_lte(srslte::logger* logger_, srslog::sink& log_sink) : logger(logger_), mac_pcap(srslte_rat_t::lte) { + get_background_workers().set_nof_workers(2); enb_task_queue = task_sched.make_task_queue(); mme_task_queue = task_sched.make_task_queue(); gtpu_task_queue = task_sched.make_task_queue(); @@ -187,6 +188,7 @@ void enb_stack_lte::stop_impl() } task_sched.stop(); + get_background_workers().stop(); started = false; } diff --git a/srsenb/src/stack/gnb_stack_nr.cc b/srsenb/src/stack/gnb_stack_nr.cc index e2799524f..63d5f481b 100644 --- a/srsenb/src/stack/gnb_stack_nr.cc +++ b/srsenb/src/stack/gnb_stack_nr.cc @@ -16,7 +16,7 @@ namespace srsenb { -gnb_stack_nr::gnb_stack_nr(srslte::logger* logger_) : logger(logger_), task_sched{512, 1, 128}, thread("gNB") +gnb_stack_nr::gnb_stack_nr(srslte::logger* logger_) : logger(logger_), task_sched{512, 128}, thread("gNB") { m_mac.reset(new mac_nr()); m_rlc.reset(new rlc_nr("RLC")); @@ -115,6 +115,7 @@ void gnb_stack_nr::stop() m_pdcp->stop(); m_mac->stop(); + srslte::get_background_workers().stop(); running = false; } } diff --git a/srsenb/src/stack/mac/mac.cc b/srsenb/src/stack/mac/mac.cc index 4bbaa1f71..23c8816fd 100644 --- a/srsenb/src/stack/mac/mac.cc +++ b/srsenb/src/stack/mac/mac.cc @@ -476,6 +476,8 @@ uint16_t mac::allocate_ue() ue_db[rnti] = std::move(ue_ptr); } + // Allocate one new UE object in advance + srslte::get_background_workers().push_task([this]() { prealloc_ue(1); }); return rnti; } @@ -486,11 +488,6 @@ uint16_t mac::reserve_new_crnti(const sched_interface::ue_cfg_t& ue_cfg) return rnti; } - task_sched.enqueue_background_task([this](uint32_t wid) { - // Allocate one new UE object in advance - prealloc_ue(1); - }); - // Add new user to the scheduler so that it can RX/TX SRB0 if (scheduler.ue_cfg(rnti, ue_cfg) != SRSLTE_SUCCESS) { logger.error("Registering new user rnti=0x%x to SCHED", rnti); @@ -556,9 +553,6 @@ void mac::rach_detected(uint32_t tti, uint32_t enb_cc_idx, uint32_t preamble_idx time_adv, rnti); }); - - // Allocate one new UE object in advance - prealloc_ue(1); } void mac::prealloc_ue(uint32_t nof_ue) diff --git a/srsenb/src/stack/rrc/rrc.cc b/srsenb/src/stack/rrc/rrc.cc index a49cc3356..dcc89857d 100644 --- a/srsenb/src/stack/rrc/rrc.cc +++ b/srsenb/src/stack/rrc/rrc.cc @@ -160,9 +160,6 @@ int rrc::add_user(uint16_t rnti, const sched_interface::ue_cfg_t& sched_ue_cfg) logger.error("Adding user rnti=0x%x - Failed to allocate user resources", rnti); return SRSLTE_ERROR; } - if (ue_pool.capacity() <= 4) { - task_sched.defer_task([]() { rrc::ue_pool.reserve(16); }); - } users.insert(std::make_pair(rnti, std::move(u))); } rlc->add_user(rnti); diff --git a/srsenb/src/stack/rrc/rrc_ue.cc b/srsenb/src/stack/rrc/rrc_ue.cc index 5a3b6ac4d..92a74f427 100644 --- a/srsenb/src/stack/rrc/rrc_ue.cc +++ b/srsenb/src/stack/rrc/rrc_ue.cc @@ -65,7 +65,11 @@ int rrc::ue::init() void* rrc::ue::operator new(size_t sz) { assert(sz == sizeof(ue)); - return rrc::ue_pool.allocate_node(sz); + void* memchunk = rrc::ue_pool.allocate_node(sz); + if (ue_pool.capacity() <= 4) { + srslte::get_background_workers().push_task([]() { rrc::ue_pool.reserve(4); }); + } + return memchunk; } void rrc::ue::operator delete(void* ptr)noexcept { diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 56957d31f..7cf049b2b 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -43,10 +43,11 @@ ue_stack_lte::ue_stack_lte(srslog::sink& log_sink) : pdcp(&task_sched, "PDCP"), nas(&task_sched), thread("STACK"), - task_sched(512, 2, 64), + task_sched(512, 64), tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD), mac_pcap(srslte_rat_t::lte) { + get_background_workers().set_nof_workers(2); ue_task_queue = task_sched.make_task_queue(); gw_queue_id = task_sched.make_task_queue(); cfg_task_queue = task_sched.make_task_queue(); @@ -156,16 +157,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_) mac_nr_args_t mac_nr_args = {}; mac_nr.init(mac_nr_args, phy_nr, &rlc); - rrc_nr.init(phy_nr, - &mac_nr, - &rlc, - &pdcp, - gw, - &rrc, - usim.get(), - task_sched.get_timer_handler(), - nullptr, - args.rrc_nr); + rrc_nr.init(phy_nr, &mac_nr, &rlc, &pdcp, gw, &rrc, usim.get(), task_sched.get_timer_handler(), nullptr, args.rrc_nr); rrc.init(phy, &mac, &rlc, &pdcp, &nas, usim.get(), gw, &rrc_nr, args.rrc); running = true; @@ -200,6 +192,9 @@ void ue_stack_lte::stop_impl() if (args.pcap.nas_enable) { nas_pcap.close(); } + + task_sched.stop(); + get_background_workers().stop(); } bool ue_stack_lte::switch_on() diff --git a/srsue/src/stack/ue_stack_nr.cc b/srsue/src/stack/ue_stack_nr.cc index 8d7112de4..b71e11a83 100644 --- a/srsue/src/stack/ue_stack_nr.cc +++ b/srsue/src/stack/ue_stack_nr.cc @@ -18,8 +18,9 @@ using namespace srslte; namespace srsue { ue_stack_nr::ue_stack_nr(srslte::logger* logger_) : - logger(logger_), thread("STACK"), task_sched(64, 2, 64), rlc_log("RLC"), pdcp_log("PDCP") + logger(logger_), thread("STACK"), task_sched(64, 64), rlc_log("RLC"), pdcp_log("PDCP") { + get_background_workers().set_nof_workers(2); mac.reset(new mac_nr(&task_sched)); pdcp.reset(new srslte::pdcp(&task_sched, "PDCP")); rlc.reset(new srslte::rlc("RLC")); @@ -102,6 +103,8 @@ void ue_stack_nr::stop_impl() rlc->stop(); pdcp->stop(); mac->stop(); + + get_background_workers().stop(); } bool ue_stack_nr::switch_on() diff --git a/srsue/test/upper/ue_rrc_nr_test.cc b/srsue/test/upper/ue_rrc_nr_test.cc index 4c035f766..40bbabfce 100644 --- a/srsue/test/upper/ue_rrc_nr_test.cc +++ b/srsue/test/upper/ue_rrc_nr_test.cc @@ -17,11 +17,10 @@ using namespace srsue; int rrc_nr_cap_request_test() { - srslte::log_ref rrc_log("RRC"); rrc_log->set_level(srslte::LOG_LEVEL_DEBUG); rrc_log->set_hex_limit(-1); - srslte::task_scheduler task_sched{512, 0, 100}; + srslte::task_scheduler task_sched{512, 100}; srslte::task_sched_handle task_sched_handle(&task_sched); rrc_nr rrc_nr(task_sched_handle); srslte::byte_buffer_t caps;