From 4f5e65781fffb4ec815ebc62a87b21bdae12353c Mon Sep 17 00:00:00 2001 From: Francisco Paisana Date: Wed, 8 Jul 2020 18:39:23 +0100 Subject: [PATCH] created task scheduler class to deal with timers, thread pool, multiqueue, internal tasks --- lib/include/srslte/common/interfaces_common.h | 1 + lib/include/srslte/common/multiqueue.h | 2 + lib/include/srslte/common/task_scheduler.h | 161 ++++++++++++++++++ lib/include/srslte/common/thread_pool.h | 1 + lib/include/srslte/test/ue_test_interfaces.h | 40 ++--- lib/test/common/CMakeLists.txt | 4 + lib/test/common/task_scheduler_test.cc | 85 +++++++++ lib/test/upper/pdcp_lte_test_rx.cc | 10 +- lib/test/upper/pdcp_nr_test_discard_sdu.cc | 12 +- lib/test/upper/pdcp_nr_test_rx.cc | 10 +- srsenb/hdr/stack/enb_stack_lte.h | 1 + srsenb/hdr/stack/gnb_stack_nr.h | 12 +- srsenb/src/stack/enb_stack_lte.cc | 5 + srsue/hdr/stack/ue_stack_lte.h | 30 ++-- srsue/hdr/stack/ue_stack_nr.h | 12 +- srsue/src/stack/ue_stack_lte.cc | 64 +++---- srsue/src/stack/ue_stack_nr.cc | 4 +- srsue/test/mac_test.cc | 17 +- srsue/test/ttcn3/src/ttcn3_syssim.cc | 2 +- srsue/test/upper/nas_test.cc | 4 +- srsue/test/upper/rrc_meas_test.cc | 4 +- 21 files changed, 358 insertions(+), 123 deletions(-) create mode 100644 lib/include/srslte/common/task_scheduler.h create mode 100644 lib/test/common/task_scheduler_test.cc diff --git a/lib/include/srslte/common/interfaces_common.h b/lib/include/srslte/common/interfaces_common.h index fbd4e970e..f8b917fda 100644 --- a/lib/include/srslte/common/interfaces_common.h +++ b/lib/include/srslte/common/interfaces_common.h @@ -93,6 +93,7 @@ class task_handler_interface public: virtual srslte::timer_handler::unique_timer get_unique_timer() = 0; virtual srslte::task_multiqueue::queue_handler make_task_queue() = 0; + virtual srslte::task_multiqueue::queue_handler make_task_queue(uint32_t queue_size) = 0; virtual void defer_callback(uint32_t duration_ms, std::function func) = 0; virtual void defer_task(srslte::move_task_t func) = 0; virtual void enqueue_background_task(std::function task) = 0; diff --git a/lib/include/srslte/common/multiqueue.h b/lib/include/srslte/common/multiqueue.h index aca551341..a69390007 100644 --- a/lib/include/srslte/common/multiqueue.h +++ b/lib/include/srslte/common/multiqueue.h @@ -102,6 +102,7 @@ public: } bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } + size_t size() { return parent->size(queue_id); } private: multiqueue_handler* parent = nullptr; @@ -293,6 +294,7 @@ public: } queue_handler get_queue_handler() { return {this, add_queue()}; } + queue_handler get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } private: bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } diff --git a/lib/include/srslte/common/task_scheduler.h b/lib/include/srslte/common/task_scheduler.h new file mode 100644 index 000000000..fd1bd8627 --- /dev/null +++ b/lib/include/srslte/common/task_scheduler.h @@ -0,0 +1,161 @@ +/* + * Copyright 2013-2020 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#ifndef SRSLTE_TASK_SCHEDULER_H +#define SRSLTE_TASK_SCHEDULER_H + +#include "interfaces_common.h" +#include "multiqueue.h" +#include "thread_pool.h" + +namespace srslte { + +class task_scheduler : public srslte::task_handler_interface +{ +public: + explicit task_scheduler(uint32_t default_extern_tasks_size = 512, + uint32_t nof_background_threads = 0, + uint32_t nof_timers_prealloc = 100) : + external_tasks{default_extern_tasks_size}, + timers{nof_timers_prealloc}, + background_tasks{nof_background_threads} + { + background_queue_id = external_tasks.add_queue(); + + // Start background thread + if (background_tasks.nof_workers() > 0) { + background_tasks.start(); + } + } + + srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } + + //! Creates new queue for tasks coming from external thread + srslte::task_multiqueue::queue_handler make_task_queue() final { return external_tasks.get_queue_handler(); } + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t size) final + { + return external_tasks.get_queue_handler(size); + } + + //! Delays a task processing by duration_ms + void defer_callback(uint32_t duration_ms, std::function func) final + { + timers.defer_callback(duration_ms, func); + } + + //! Enqueues internal task to be run in next tic + void defer_task(srslte::move_task_t func) final { internal_tasks.push_back(std::move(func)); } + + //! Delegates a task to a thread pool that runs in the background + void enqueue_background_task(std::function f) final + { + if (background_tasks.nof_workers() > 0) { + background_tasks.push_task(std::move(f)); + } else { + external_tasks.push(background_queue_id, + std::bind([](const std::function& task) { task(-1); }, std::move(f))); + } + } + + //! Defer the handling of the result of a background task to next tic + void notify_background_task_result(srslte::move_task_t task) final + { + // run the notification in next tic + external_tasks.push(background_queue_id, std::move(task)); + } + + //! Updates timers, and run any pending internal tasks. + // CAUTION: Should be called in main thread + void tic() + { + timers.step_all(); + run_all_internal_tasks(); + } + + //! Processes the next task in the multiqueue. + // CAUTION: This is a blocking call + bool run_next_external_task() + { + srslte::move_task_t task{}; + if (external_tasks.wait_pop(&task) >= 0) { + task(); + return true; + } + return false; + } + + //! Processes the next task in the multiqueue if it exists. + bool try_run_next_external_task() + { + srslte::move_task_t task{}; + if (external_tasks.try_pop(&task) >= 0) { + task(); + return true; + } + return false; + } + + srslte::timer_handler* get_timer_handler() { return &timers; } + +private: + void run_all_internal_tasks() + { + // Perform pending stack deferred tasks + // Note: Keep it indexed-based, bc a task may enqueue another task, which may cause vector reallocation, + // and iterator invalidation + for (size_t i = 0; i < internal_tasks.size(); ++i) { + internal_tasks[i](); + } + internal_tasks.clear(); + } + + int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background + srslte::task_multiqueue external_tasks; + srslte::timer_handler timers; + srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks + std::vector internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking +}; + +//! Handle to provide to classes/functions running within main thread +class task_sched_handle +{ +public: + task_sched_handle(task_scheduler* sched_) : sched(sched_) {} + + srslte::timer_handler::unique_timer get_unique_timer() { return sched->get_unique_timer(); } + void enqueue_background_task(std::function f) { sched->enqueue_background_task(std::move(f)); } + void notify_background_task_result(srslte::move_task_t task) + { + sched->notify_background_task_result(std::move(task)); + } + void defer_callback(uint32_t duration_ms, std::function func) + { + sched->defer_callback(duration_ms, std::move(func)); + } + void defer_task(srslte::move_task_t func) { sched->defer_task(std::move(func)); } + +private: + task_scheduler* sched; +}; + +} // namespace srslte + +#endif // SRSLTE_TASK_SCHEDULER_H diff --git a/lib/include/srslte/common/thread_pool.h b/lib/include/srslte/common/thread_pool.h index 0000d8ade..2d08c1149 100644 --- a/lib/include/srslte/common/thread_pool.h +++ b/lib/include/srslte/common/thread_pool.h @@ -106,6 +106,7 @@ public: void push_task(const task_t& task); void push_task(task_t&& task); uint32_t nof_pending_tasks(); + size_t nof_workers() const { return workers.size(); } private: class worker_t : public thread diff --git a/lib/include/srslte/test/ue_test_interfaces.h b/lib/include/srslte/test/ue_test_interfaces.h index d049897d8..2d486c367 100644 --- a/lib/include/srslte/test/ue_test_interfaces.h +++ b/lib/include/srslte/test/ue_test_interfaces.h @@ -22,6 +22,7 @@ #ifndef SRSUE_DUMMY_CLASSES_H #define SRSUE_DUMMY_CLASSES_H +#include "srslte/common/task_scheduler.h" #include "srslte/interfaces/ue_interfaces.h" namespace srsue { @@ -29,44 +30,37 @@ namespace srsue { class stack_test_dummy : public stack_interface_rrc { public: - stack_test_dummy() { stack_queue_id = pending_tasks.add_queue(); } + stack_test_dummy() {} - srslte::timer_handler::unique_timer get_unique_timer() override { return timers.get_unique_timer(); } + srslte::timer_handler::unique_timer get_unique_timer() override { return task_sched.get_unique_timer(); } void start_cell_search() override {} void start_cell_select(const phy_interface_rrc_lte::phy_cell_t* cell) override {} - srslte::tti_point get_current_tti() override { return srslte::tti_point{timers.get_cur_time() % 10240}; } - srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } + srslte::tti_point get_current_tti() override + { + return srslte::tti_point{task_sched.get_timer_handler()->get_cur_time() % 10240}; + } + srslte::task_multiqueue::queue_handler make_task_queue() final { return task_sched.make_task_queue(); } + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t len) final { return task_sched.make_task_queue(len); } void enqueue_background_task(std::function f) override { f(0); } void notify_background_task_result(srslte::move_task_t task) override { task(); } void defer_callback(uint32_t duration_ms, std::function func) final { - timers.defer_callback(duration_ms, func); + task_sched.defer_callback(duration_ms, std::move(func)); } - void defer_task(srslte::move_task_t task) final { pending_tasks.push(stack_queue_id, std::move(task)); } + void defer_task(srslte::move_task_t task) final { task_sched.defer_task(std::move(task)); } // Testing utility functions - void call_on_every_tti(srslte::move_task_t t) { tti_callbacks.push_back(std::move(t)); } - void process_tasks() - { - // Make sure to process any stack pending tasks - srslte::move_task_t task; - while (pending_tasks.try_pop(&task) >= 0) { - task(); - } - } void run_tti() { - process_tasks(); - for (auto& t : tti_callbacks) { - t(); + // update clock and run internal tasks + task_sched.tic(); + + // Runs all pending external tasks + while (task_sched.try_run_next_external_task()) { } - timers.step_all(); } - srslte::timer_handler timers{100}; - srslte::task_multiqueue pending_tasks; - std::vector tti_callbacks; - int stack_queue_id = -1; + srslte::task_scheduler task_sched{512, 0, 100}; }; class rlc_dummy_interface : public rlc_interface_mac diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index d44412b4c..a9413dbb0 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -98,6 +98,10 @@ add_executable(expected_test expected_test.cc) target_link_libraries(expected_test srslte_common) add_test(expected_test expected_test) +add_executable(task_scheduler_test task_scheduler_test.cc) +target_link_libraries(task_scheduler_test srslte_common) +add_test(task_scheduler_test task_scheduler_test) + if(ENABLE_5GNR) add_executable(pnf_dummy pnf_dummy.cc) target_link_libraries(pnf_dummy srslte_common ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES}) diff --git a/lib/test/common/task_scheduler_test.cc b/lib/test/common/task_scheduler_test.cc new file mode 100644 index 000000000..8446b9dbb --- /dev/null +++ b/lib/test/common/task_scheduler_test.cc @@ -0,0 +1,85 @@ +/* + * Copyright 2013-2020 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "srslte/common/task_scheduler.h" +#include "srslte/common/test_common.h" + +enum class task_result { null, internal, external, timer }; + +int test_task_scheduler_no_pool() +{ + srslte::task_scheduler task_sched{5, 0}; + task_result state = task_result::null; + + // TEST: deferring task does not run the task until the next tic + task_sched.defer_task([&state]() { state = task_result::internal; }); + TESTASSERT(state == task_result::null); + task_sched.tic(); + TESTASSERT(state == task_result::internal); + + // TEST: check delaying of task + state = task_result::null; + int dur = 5; + task_sched.defer_callback(dur, [&state]() { state = task_result::timer; }); + for (int i = 0; i < dur; ++i) { + TESTASSERT(state == task_result::null); + task_sched.tic(); + } + TESTASSERT(state == task_result::timer); + + // TEST: background task is run, despite there are no pool workers + state = task_result::null; + task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) { + task_sched.notify_background_task_result([&state]() { state = task_result::external; }); + }); + TESTASSERT(state == task_result::null); + task_sched.tic(); + TESTASSERT(state == task_result::null); + task_sched.run_next_external_task(); // runs background task + TESTASSERT(state == task_result::null); + task_sched.run_next_external_task(); // runs notification + TESTASSERT(state == task_result::external); + + return SRSLTE_SUCCESS; +} + +int test_task_scheduler_with_pool() +{ + srslte::task_scheduler task_sched{5, 2}; + task_result state = task_result::null; + + task_sched.enqueue_background_task([&task_sched, &state](uint32_t worker_id) { + task_sched.notify_background_task_result([&state]() { state = task_result::external; }); + }); + TESTASSERT(state == task_result::null); + task_sched.tic(); + TESTASSERT(state == task_result::null); + task_sched.run_next_external_task(); // waits and runs notification + TESTASSERT(state == task_result::external); + + return SRSLTE_SUCCESS; +} + +int main() +{ + TESTASSERT(test_task_scheduler_no_pool() == SRSLTE_SUCCESS); + TESTASSERT(test_task_scheduler_with_pool() == SRSLTE_SUCCESS); +} diff --git a/lib/test/upper/pdcp_lte_test_rx.cc b/lib/test/upper/pdcp_lte_test_rx.cc index b70916a91..1dec8fbb3 100644 --- a/lib/test/upper/pdcp_lte_test_rx.cc +++ b/lib/test/upper/pdcp_lte_test_rx.cc @@ -43,10 +43,10 @@ int test_rx(std::vector events, srslte::pdcp_discard_timer_t::infinity}; pdcp_lte_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log); - srslte::pdcp_entity_lte* pdcp_rx = &pdcp_hlp_rx.pdcp; - gw_dummy* gw_rx = &pdcp_hlp_rx.gw; - rrc_dummy* rrc_rx = &pdcp_hlp_rx.rrc; - srslte::timer_handler* timers_rx = &pdcp_hlp_rx.stack.timers; + srslte::pdcp_entity_lte* pdcp_rx = &pdcp_hlp_rx.pdcp; + gw_dummy* gw_rx = &pdcp_hlp_rx.gw; + rrc_dummy* rrc_rx = &pdcp_hlp_rx.rrc; + srsue::stack_test_dummy* stack = &pdcp_hlp_rx.stack; pdcp_hlp_rx.set_pdcp_initial_state(init_state); // Generate test message and encript/decript SDU. @@ -55,7 +55,7 @@ int test_rx(std::vector events, // Decript and integrity check the PDU pdcp_rx->write_pdu(std::move(event.pkt)); for (uint32_t i = 0; i < event.ticks; ++i) { - timers_rx->step_all(); + stack->run_tti(); } } diff --git a/lib/test/upper/pdcp_nr_test_discard_sdu.cc b/lib/test/upper/pdcp_nr_test_discard_sdu.cc index a304949ff..1318581d4 100644 --- a/lib/test/upper/pdcp_nr_test_discard_sdu.cc +++ b/lib/test/upper/pdcp_nr_test_discard_sdu.cc @@ -38,10 +38,10 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state, srslte::pdcp_t_reordering_t::ms500, discard_timeout}; - pdcp_nr_test_helper pdcp_hlp(cfg, sec_cfg, log); - srslte::pdcp_entity_nr* pdcp = &pdcp_hlp.pdcp; - rlc_dummy* rlc = &pdcp_hlp.rlc; - srslte::timer_handler* timers = &pdcp_hlp.stack.timers; + pdcp_nr_test_helper pdcp_hlp(cfg, sec_cfg, log); + srslte::pdcp_entity_nr* pdcp = &pdcp_hlp.pdcp; + rlc_dummy* rlc = &pdcp_hlp.rlc; + srsue::stack_test_dummy* stack = &pdcp_hlp.stack; pdcp_hlp.set_pdcp_initial_state(init_state); @@ -51,7 +51,7 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state, pdcp->write_sdu(std::move(sdu), true); for (uint32_t i = 0; i < static_cast(cfg.discard_timer) - 1; ++i) { - timers->step_all(); + stack->run_tti(); } TESTASSERT(rlc->discard_count == 0); @@ -63,7 +63,7 @@ int test_tx_sdu_discard(const pdcp_initial_state& init_state, } // Last timer step - timers->step_all(); + stack->run_tti(); // Check if RLC was notified of SDU discard if (imediate_notify) { diff --git a/lib/test/upper/pdcp_nr_test_rx.cc b/lib/test/upper/pdcp_nr_test_rx.cc index 63db4f540..04b8ce8fd 100644 --- a/lib/test/upper/pdcp_nr_test_rx.cc +++ b/lib/test/upper/pdcp_nr_test_rx.cc @@ -42,10 +42,10 @@ int test_rx(std::vector events, srslte::pdcp_t_reordering_t::ms500, srslte::pdcp_discard_timer_t::infinity}; - pdcp_nr_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log); - srslte::pdcp_entity_nr* pdcp_rx = &pdcp_hlp_rx.pdcp; - gw_dummy* gw_rx = &pdcp_hlp_rx.gw; - srslte::timer_handler* timers_rx = &pdcp_hlp_rx.stack.timers; + pdcp_nr_test_helper pdcp_hlp_rx(cfg_rx, sec_cfg, log); + srslte::pdcp_entity_nr* pdcp_rx = &pdcp_hlp_rx.pdcp; + gw_dummy* gw_rx = &pdcp_hlp_rx.gw; + srsue::stack_test_dummy* stack = &pdcp_hlp_rx.stack; pdcp_hlp_rx.set_pdcp_initial_state(init_state); // Generate test message and encript/decript SDU. @@ -54,7 +54,7 @@ int test_rx(std::vector events, // Decript and integrity check the PDU pdcp_rx->write_pdu(std::move(event.pkt)); for (uint32_t i = 0; i < event.ticks; ++i) { - timers_rx->step_all(); + stack->run_tti(); } } diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index ce97bf1d2..060ae4e38 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -112,6 +112,7 @@ public: /* Stack-MAC interface */ srslte::timer_handler::unique_timer get_unique_timer() final; srslte::task_multiqueue::queue_handler make_task_queue() final; + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final; void defer_callback(uint32_t duration_ms, std::function func) final; void enqueue_background_task(std::function task) final; void notify_background_task_result(srslte::move_task_t task) final; diff --git a/srsenb/hdr/stack/gnb_stack_nr.h b/srsenb/hdr/stack/gnb_stack_nr.h index b920b544f..34ba19f66 100644 --- a/srsenb/hdr/stack/gnb_stack_nr.h +++ b/srsenb/hdr/stack/gnb_stack_nr.h @@ -82,10 +82,14 @@ public: // Task Handling interface srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } - void enqueue_background_task(std::function f) final; - void notify_background_task_result(srslte::move_task_t task) final; - void defer_callback(uint32_t duration_ms, std::function func) final; - void defer_task(srslte::move_task_t task) final; + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final + { + return pending_tasks.get_queue_handler(qsize); + } + void enqueue_background_task(std::function f) final; + void notify_background_task_result(srslte::move_task_t task) final; + void defer_callback(uint32_t duration_ms, std::function func) final; + void defer_task(srslte::move_task_t task) final; private: void run_thread() final; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index dc0c8dcd4..f584704a2 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -266,6 +266,11 @@ srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue() return pending_tasks.get_queue_handler(); } +srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue(uint32_t qsize) +{ + return pending_tasks.get_queue_handler(qsize); +} + void enb_stack_lte::defer_callback(uint32_t duration_ms, std::function func) { timers.defer_callback(duration_ms, func); diff --git a/srsue/hdr/stack/ue_stack_lte.h b/srsue/hdr/stack/ue_stack_lte.h index b91fb995c..c9a760a40 100644 --- a/srsue/hdr/stack/ue_stack_lte.h +++ b/srsue/hdr/stack/ue_stack_lte.h @@ -42,6 +42,7 @@ #include "srslte/common/buffer_pool.h" #include "srslte/common/log_filter.h" #include "srslte/common/multiqueue.h" +#include "srslte/common/task_scheduler.h" #include "srslte/common/thread_pool.h" #include "srslte/interfaces/ue_interfaces.h" @@ -125,12 +126,16 @@ public: tti_point get_current_tti() final { return current_tti; } // Task Handling interface - srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } - srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } - void enqueue_background_task(std::function f) final; - void notify_background_task_result(srslte::move_task_t task) final; - void defer_callback(uint32_t duration_ms, std::function func) final; - void defer_task(srslte::move_task_t task) final; + srslte::timer_handler::unique_timer get_unique_timer() final { return task_sched.get_unique_timer(); } + srslte::task_multiqueue::queue_handler make_task_queue() final { return task_sched.make_task_queue(); } + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t queue_size) final + { + return task_sched.make_task_queue(queue_size); + } + void enqueue_background_task(std::function f) final; + void notify_background_task_result(srslte::move_task_t task) final; + void defer_callback(uint32_t duration_ms, std::function func) final; + void defer_task(srslte::move_task_t task) final; private: void run_thread() final; @@ -146,9 +151,6 @@ private: srslte::tti_point current_tti; - // timers - srslte::timer_handler timers; - // UE stack logging srslte::logger* logger = nullptr; srslte::log_ref stack_log{"STCK"}; ///< our own log filter @@ -165,12 +167,10 @@ private: gw_interface_stack* gw = nullptr; // Thread - static const int STACK_MAIN_THREAD_PRIO = 4; // Next lower priority after PHY workers - srslte::task_multiqueue pending_tasks; - int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, stack_queue_id = -1, background_queue_id = -1; - srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks - std::vector deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking - srslte::block_queue pending_stack_metrics; + static const int STACK_MAIN_THREAD_PRIO = 4; // Next lower priority after PHY workers + srslte::block_queue pending_stack_metrics; + task_scheduler task_sched; + srslte::task_multiqueue::queue_handler sync_task_queue, ue_task_queue, gw_queue_id; // TTI stats srslte::tprof tti_tprof; diff --git a/srsue/hdr/stack/ue_stack_nr.h b/srsue/hdr/stack/ue_stack_nr.h index aa9507eaf..e1a871591 100644 --- a/srsue/hdr/stack/ue_stack_nr.h +++ b/srsue/hdr/stack/ue_stack_nr.h @@ -99,10 +99,14 @@ public: // Task Handling interface srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } srslte::task_multiqueue::queue_handler make_task_queue() final { return pending_tasks.get_queue_handler(); } - void enqueue_background_task(std::function f) final; - void notify_background_task_result(srslte::move_task_t task) final; - void defer_callback(uint32_t duration_ms, std::function func) final; - void defer_task(srslte::move_task_t task) final; + srslte::task_multiqueue::queue_handler make_task_queue(uint32_t qsize) final + { + return pending_tasks.get_queue_handler(qsize); + } + void enqueue_background_task(std::function f) final; + void notify_background_task_result(srslte::move_task_t task) final; + void defer_callback(uint32_t duration_ms, std::function func) final; + void defer_task(srslte::move_task_t task) final; private: void run_thread() final; diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index b18ffce83..2cb937256 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -32,7 +32,6 @@ using namespace srslte; namespace srsue { ue_stack_lte::ue_stack_lte() : - timers(64), running(false), args(), logger(nullptr), @@ -44,17 +43,12 @@ ue_stack_lte::ue_stack_lte() : pdcp(this, "PDCP"), nas(this), thread("STACK"), - pending_tasks(512), - background_tasks(2), + task_sched(512, 2, 64), tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD) { - ue_queue_id = pending_tasks.add_queue(); - gw_queue_id = pending_tasks.add_queue(); - stack_queue_id = pending_tasks.add_queue(); - background_queue_id = pending_tasks.add_queue(); + ue_task_queue = task_sched.make_task_queue(); + gw_queue_id = task_sched.make_task_queue(); // sync_queue is added in init() - - background_tasks.start(); } ue_stack_lte::~ue_stack_lte() @@ -126,10 +120,10 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_) } // add sync queue - sync_queue_id = pending_tasks.add_queue(args.sync_queue_size); + sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); mac.init(phy, &rlc, &rrc, this); - rlc.init(&pdcp, &rrc, &timers, 0 /* RB_ID_SRB0 */); + rlc.init(&pdcp, &rrc, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */); pdcp.init(&rlc, &rrc, gw); nas.init(usim.get(), &rrc, gw, args.nas); rrc.init(phy, &mac, &rlc, &pdcp, &nas, usim.get(), gw, args.rrc); @@ -143,7 +137,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_) void ue_stack_lte::stop() { if (running) { - pending_tasks.try_push(ue_queue_id, [this]() { stop_impl(); }); + ue_task_queue.try_push([this]() { stop_impl(); }); wait_thread_finish(); } } @@ -171,8 +165,7 @@ void ue_stack_lte::stop_impl() bool ue_stack_lte::switch_on() { if (running) { - pending_tasks.try_push(ue_queue_id, - [this]() { nas.start_attach_proc(nullptr, srslte::establishment_cause_t::mo_sig); }); + ue_task_queue.try_push([this]() { nas.start_attach_proc(nullptr, srslte::establishment_cause_t::mo_sig); }); return true; } return false; @@ -214,7 +207,7 @@ bool ue_stack_lte::disable_data() bool ue_stack_lte::get_metrics(stack_metrics_t* metrics) { // use stack thread to query metrics - pending_tasks.try_push(ue_queue_id, [this]() { + ue_task_queue.try_push([this]() { stack_metrics_t metrics{}; mac.get_metrics(metrics.mac); rlc.get_metrics(metrics.rlc); @@ -230,10 +223,7 @@ bool ue_stack_lte::get_metrics(stack_metrics_t* metrics) void ue_stack_lte::run_thread() { while (running) { - srslte::move_task_t task{}; - if (pending_tasks.wait_pop(&task) >= 0) { - task(); - } + task_sched.run_next_external_task(); } } @@ -256,7 +246,7 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t& sdu) { pdcp.write_sdu(lcid, std::move(sdu), blocking); }; - bool ret = pending_tasks.try_push(gw_queue_id, std::bind(task, std::move(sdu))).first; + bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).first; if (not ret) { pdcp_log->warning("GW SDU with lcid=%d was discarded.\n", lcid); } @@ -271,17 +261,17 @@ void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bo */ void ue_stack_lte::in_sync() { - pending_tasks.push(sync_queue_id, [this]() { rrc.in_sync(); }); + sync_task_queue.push([this]() { rrc.in_sync(); }); } void ue_stack_lte::out_of_sync() { - pending_tasks.push(sync_queue_id, [this]() { rrc.out_of_sync(); }); + sync_task_queue.push([this]() { rrc.out_of_sync(); }); } void ue_stack_lte::run_tti(uint32_t tti, uint32_t tti_jump) { - pending_tasks.push(sync_queue_id, [this, tti, tti_jump]() { run_tti_impl(tti, tti_jump); }); + sync_task_queue.push([this, tti, tti_jump]() { run_tti_impl(tti, tti_jump); }); } void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) @@ -291,17 +281,11 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) } current_tti = tti_point{tti}; - // Perform pending stack deferred tasks - for (auto& task : deferred_stack_tasks) { - task(); - } - deferred_stack_tasks.clear(); - // perform tasks for the received TTI range for (uint32_t i = 0; i < tti_jump; ++i) { uint32_t next_tti = TTI_SUB(tti, (tti_jump - i - 1)); mac.run_tti(next_tti); - timers.step_all(); + task_sched.tic(); } rrc.run_tti(); nas.run_tti(); @@ -316,8 +300,8 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) } // print warning if PHY pushes new TTI messages faster than we process them - if (pending_tasks.size(sync_queue_id) > SYNC_QUEUE_WARN_THRESHOLD) { - stack_log->warning("Detected slow task processing (sync_queue_len=%zd).\n", pending_tasks.size(sync_queue_id)); + if (sync_task_queue.size() > SYNC_QUEUE_WARN_THRESHOLD) { + stack_log->warning("Detected slow task processing (sync_queue_len=%zd).\n", sync_task_queue.size()); } } @@ -327,23 +311,23 @@ void ue_stack_lte::run_tti_impl(uint32_t tti, uint32_t tti_jump) void ue_stack_lte::enqueue_background_task(std::function f) { - background_tasks.push_task(std::move(f)); + task_sched.enqueue_background_task(std::move(f)); } void ue_stack_lte::notify_background_task_result(srslte::move_task_t task) { // run the notification in the stack thread - pending_tasks.push(background_queue_id, std::move(task)); + task_sched.notify_background_task_result(std::move(task)); } void ue_stack_lte::defer_callback(uint32_t duration_ms, std::function func) { - timers.defer_callback(duration_ms, func); + task_sched.defer_callback(duration_ms, std::move(func)); } void ue_stack_lte::defer_task(srslte::move_task_t task) { - deferred_stack_tasks.push_back(std::move(task)); + task_sched.defer_task(std::move(task)); } /******************** @@ -352,21 +336,21 @@ void ue_stack_lte::defer_task(srslte::move_task_t task) void ue_stack_lte::start_cell_search() { - background_tasks.push_task([this](uint32_t worker_id) { + task_sched.enqueue_background_task([this](uint32_t worker_id) { phy_interface_rrc_lte::phy_cell_t found_cell; phy_interface_rrc_lte::cell_search_ret_t ret = phy->cell_search(&found_cell); // notify back RRC - pending_tasks.push(background_queue_id, [this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); }); + task_sched.notify_background_task_result([this, found_cell, ret]() { rrc.cell_search_completed(ret, found_cell); }); }); } void ue_stack_lte::start_cell_select(const phy_interface_rrc_lte::phy_cell_t* phy_cell) { phy_interface_rrc_lte::phy_cell_t cell_copy = *phy_cell; - background_tasks.push_task([this, cell_copy](uint32_t worker_id) { + task_sched.enqueue_background_task([this, cell_copy](uint32_t worker_id) { bool ret = phy->cell_select(&cell_copy); // notify back RRC - pending_tasks.push(background_queue_id, [this, ret]() { rrc.cell_select_completed(ret); }); + task_sched.notify_background_task_result([this, ret]() { rrc.cell_select_completed(ret); }); }); } diff --git a/srsue/src/stack/ue_stack_nr.cc b/srsue/src/stack/ue_stack_nr.cc index 422ea7e95..332a312f2 100644 --- a/srsue/src/stack/ue_stack_nr.cc +++ b/srsue/src/stack/ue_stack_nr.cc @@ -199,12 +199,12 @@ void ue_stack_nr::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, boo */ void ue_stack_nr::in_sync() { - // pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.in_sync(); }}); + // pending_tasks.push(sync_task_queue, task_t{[this](task_t*) { rrc.in_sync(); }}); } void ue_stack_nr::out_of_sync() { - // pending_tasks.push(sync_queue_id, task_t{[this](task_t*) { rrc.out_of_sync(); }}); + // pending_tasks.push(sync_task_queue, task_t{[this](task_t*) { rrc.out_of_sync(); }}); } void ue_stack_nr::run_tti(uint32_t tti) diff --git a/srsue/test/mac_test.cc b/srsue/test/mac_test.cc index b86c01e58..fa97cdd4e 100644 --- a/srsue/test/mac_test.cc +++ b/srsue/test/mac_test.cc @@ -342,26 +342,11 @@ public: mac_h = mac_; phy_h = phy_; } - bool events_exist() - { - for (int i = 0; i < pending_tasks.nof_queues(); ++i) { - if (not pending_tasks.empty(i)) { - return true; - } - } - return false; - } void run_tti(uint32_t tti) { mac_h->run_tti(tti); // flush all events - if (events_exist()) { - srslte::move_task_t task{}; - if (pending_tasks.wait_pop(&task) >= 0) { - task(); - } - } - timers.step_all(); + stack_test_dummy::run_tti(); } private: diff --git a/srsue/test/ttcn3/src/ttcn3_syssim.cc b/srsue/test/ttcn3/src/ttcn3_syssim.cc index 77e99fc9b..b365857dd 100644 --- a/srsue/test/ttcn3/src/ttcn3_syssim.cc +++ b/srsue/test/ttcn3/src/ttcn3_syssim.cc @@ -123,7 +123,7 @@ int ttcn3_syssim::init(const all_args_t& args_) // Init SS layers pdus.init(this, log); - rlc.init(&pdcp, this, &stack.timers, 0 /* RB_ID_SRB0 */); + rlc.init(&pdcp, this, stack.task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */); pdcp.init(&rlc, this, this); return SRSLTE_SUCCESS; diff --git a/srsue/test/upper/nas_test.cc b/srsue/test/upper/nas_test.cc index 803fbec39..6ab748678 100644 --- a/srsue/test/upper/nas_test.cc +++ b/srsue/test/upper/nas_test.cc @@ -168,7 +168,9 @@ public: { running = true; while (running) { - timers.step_all(); + task_sched.tic(); + while (task_sched.try_run_next_external_task()) { + } nas->run_tti(); } } diff --git a/srsue/test/upper/rrc_meas_test.cc b/srsue/test/upper/rrc_meas_test.cc index 27d8c25aa..d0519265e 100644 --- a/srsue/test/upper/rrc_meas_test.cc +++ b/srsue/test/upper/rrc_meas_test.cc @@ -187,7 +187,9 @@ public: void run_tti(uint32_t tti_) { - stack->timers.step_all(); + stack->task_sched.tic(); + while (stack->task_sched.try_run_next_external_task()) { + } rrc::run_tti(); }