From 647c624423444e7f64c33827895692edd4e9dafa Mon Sep 17 00:00:00 2001 From: Francisco Paisana Date: Fri, 13 Sep 2019 11:33:43 +0100 Subject: [PATCH] created a multiqueue handler, and started using it for the ue stack --- lib/include/srslte/common/multiqueue.h | 187 +++++++++++++++++++++++++ lib/test/common/CMakeLists.txt | 4 + lib/test/common/queue_test.cc | 80 +++++++++++ srsue/hdr/stack/ue_stack_lte.h | 17 ++- srsue/src/stack/ue_stack_lte.cc | 19 +-- 5 files changed, 296 insertions(+), 11 deletions(-) create mode 100644 lib/include/srslte/common/multiqueue.h create mode 100644 lib/test/common/queue_test.cc diff --git a/lib/include/srslte/common/multiqueue.h b/lib/include/srslte/common/multiqueue.h new file mode 100644 index 000000000..b127ab67e --- /dev/null +++ b/lib/include/srslte/common/multiqueue.h @@ -0,0 +1,187 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +/****************************************************************************** + * File: multiqueue.h + * Description: General-purpose non-blocking multiqueue. It behaves as a list + * of bounded/unbounded queues. + *****************************************************************************/ + +#ifndef SRSLTE_MULTIQUEUE_H +#define SRSLTE_MULTIQUEUE_H + +#include +#include +#include +#include +#include + +namespace srslte { + +template +class multiqueue_handler +{ + // NOTE: needed to create a queue wrapper to make its move ctor noexcept. + // otherwise we couldnt use the resize method of std::vector> if myobj is move-only + class queue_wrapper : private std::queue + { + public: + queue_wrapper() = default; + queue_wrapper(queue_wrapper&& other) noexcept : std::queue(std::move(other)) {} + using std::queue::push; + using std::queue::pop; + using std::queue::size; + using std::queue::empty; + using std::queue::front; + }; + +public: + explicit multiqueue_handler(uint32_t capacity_ = std::numeric_limits::max()) : capacity(capacity_) {} + ~multiqueue_handler() + { + std::lock_guard lck(mutex); + queues_active.clear(); + queues.clear(); + running = false; + } + + int add_queue() + { + uint32_t qidx = 0; + for (; qidx < queues_active.size() and queues_active[qidx]; ++qidx) + ; + if (qidx == queues_active.size()) { + // create new queue + std::lock_guard lck(mutex); + queues_active.push_back(true); + queues.emplace_back(); + } else { + queues_active[qidx] = true; + } + return (int)qidx; + } + + int nof_queues() + { + std::lock_guard lck(mutex); + return std::count(queues_active.begin(), queues_active.end(), true); + } + + bool try_push(int q_idx, const myobj& value) + { + if (not running) { + return false; + } + { + std::lock_guard lck(mutex); + if (queues[q_idx].size() >= capacity) { + return false; + } + queues[q_idx].push(value); + } + cv.notify_one(); + return true; + } + + std::pair try_push(int q_idx, myobj&& value) + { + if (not running) { + return {false, std::move(value)}; + } + { + std::lock_guard lck(mutex); + if (queues[q_idx].size() >= capacity) { + return {false, std::move(value)}; + } + queues[q_idx].push(std::move(value)); + } + cv.notify_one(); + return {true, std::move(value)}; + } + + int wait_pop(myobj* value) + { + std::unique_lock lock(mutex); + while (running) { + cv.wait(lock); + // Round-robin for all queues + for (uint32_t i = 0; queues.size(); ++i) { + spin_idx = (spin_idx + 1) % queues.size(); + if (queues_active[spin_idx] and not queues[spin_idx].empty()) { + if (value) { + *value = std::move(queues[spin_idx].front()); + } + queues[spin_idx].pop(); + return spin_idx; + } + } + } + return -1; + } + + bool empty(int qidx) + { + std::lock_guard lck(mutex); + return queues[qidx].empty(); + } + + size_t size(int qidx) + { + std::lock_guard lck(mutex); + return queues[qidx].size(); + } + + const myobj& front(int qidx) + { + std::lock_guard lck(mutex); + return queues.front(); + } + + void erase_queue(int qidx) + { + std::lock_guard lck(mutex); + if (queues_active[qidx]) { + queues_active[qidx] = false; + while (not queues[qidx].empty()) { + queues[qidx].pop(); + } + } + } + + bool is_queue_active(int qidx) + { + std::lock_guard lck(mutex); + return queues_active[qidx]; + } + +private: + std::mutex mutex; + std::condition_variable cv; + uint32_t spin_idx = 0; + bool running = true; + std::vector queues_active; + std::vector queues; + uint32_t capacity; +}; + +} // namespace srslte + +#endif // SRSLTE_MULTIQUEUE_H diff --git a/lib/test/common/CMakeLists.txt b/lib/test/common/CMakeLists.txt index ec68d1782..72da402aa 100644 --- a/lib/test/common/CMakeLists.txt +++ b/lib/test/common/CMakeLists.txt @@ -70,3 +70,7 @@ target_link_libraries(mac_nr_pdu_test srslte_phy srslte_common ${CMAKE_THREAD_LI add_test(mac_nr_pdu_test mac_nr_pdu_test) add_executable(stack_procedure_test stack_procedure_test.cc) + +add_executable(queue_test queue_test.cc) +target_link_libraries(queue_test srslte_common ${CMAKE_THREAD_LIBS_INIT}) +add_test(queue_test queue_test) diff --git a/lib/test/common/queue_test.cc b/lib/test/common/queue_test.cc new file mode 100644 index 000000000..bf5a56a48 --- /dev/null +++ b/lib/test/common/queue_test.cc @@ -0,0 +1,80 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include +#include + +#define TESTASSERT(cond) \ + { \ + if (!(cond)) { \ + std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ + return -1; \ + } \ + } + +using namespace srslte; + +int test_multiqueue() +{ + std::cout << "\n======= TEST multiqueue test: start =======\n"; + + int number = 2; + + multiqueue_handler multiqueue; + TESTASSERT(multiqueue.nof_queues() == 0) + + int qid1 = multiqueue.add_queue(); + TESTASSERT(qid1 == 0 and multiqueue.is_queue_active(qid1)) + TESTASSERT(multiqueue.size(qid1) == 0 and multiqueue.empty(qid1)) + TESTASSERT(multiqueue.nof_queues() == 1) + TESTASSERT(multiqueue.try_push(qid1, 5).first) + TESTASSERT(multiqueue.try_push(qid1, number)) + TESTASSERT(multiqueue.size(qid1) == 2 and not multiqueue.empty(qid1)) + TESTASSERT(multiqueue.wait_pop(&number) == qid1) + TESTASSERT(number == 5) + TESTASSERT(multiqueue.wait_pop(&number) == qid1) + TESTASSERT(number == 2 and multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + + int qid2 = multiqueue.add_queue(); + TESTASSERT(qid2 == 1) + TESTASSERT(multiqueue.nof_queues() == 2 and multiqueue.is_queue_active(qid1)) + TESTASSERT(multiqueue.try_push(qid2, 3).first) + TESTASSERT(multiqueue.size(qid2) == 1 and not multiqueue.empty(qid2)) + TESTASSERT(multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + + multiqueue.erase_queue(qid1); + TESTASSERT(multiqueue.nof_queues() == 1 and not multiqueue.is_queue_active(qid1)) + qid1 = multiqueue.add_queue(); + TESTASSERT(qid1 == 0) + TESTASSERT(multiqueue.empty(qid1) and multiqueue.is_queue_active(qid1)) + TESTASSERT(multiqueue.try_push(qid1, number)) + TESTASSERT(multiqueue.size(qid1) == 1) + + std::cout << "outcome: Success\n"; + std::cout << "===========================================\n"; + + return 0; +} + +int main() +{ + TESTASSERT(test_multiqueue() == 0); +} diff --git a/srsue/hdr/stack/ue_stack_lte.h b/srsue/hdr/stack/ue_stack_lte.h index 928c7500d..4d90c5ac8 100644 --- a/srsue/hdr/stack/ue_stack_lte.h +++ b/srsue/hdr/stack/ue_stack_lte.h @@ -40,8 +40,9 @@ #include "upper/usim.h" #include "srslte/common/buffer_pool.h" -#include "srslte/interfaces/ue_interfaces.h" #include "srslte/common/log_filter.h" +#include "srslte/common/multiqueue.h" +#include "srslte/interfaces/ue_interfaces.h" #include "srsue/hdr/ue_metrics_interface.h" #include "ue_stack_base.h" @@ -149,8 +150,18 @@ private: gw_interface_stack* gw = nullptr; // Thread - static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD - srslte::block_queue > pending_tasks; + static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD + + // NOTE: we use this struct instead of a std::function bc lambdas can't capture by move in C++11 + struct task_t { + std::function func; + srslte::unique_byte_buffer_t pdu; + task_t() = default; + task_t(std::function f_) : func(std::move(f_)) {} + void operator()() { func(this); } + }; + srslte::multiqueue_handler pending_tasks; + int sync_queue_id = -1, ue_queue_id = -1; }; } // namespace srsue diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 864d4a697..67de04067 100644 --- a/srsue/src/stack/ue_stack_lte.cc +++ b/srsue/src/stack/ue_stack_lte.cc @@ -40,6 +40,8 @@ ue_stack_lte::ue_stack_lte() : nas(&nas_log), thread("STACK") { + ue_queue_id = pending_tasks.add_queue(); + sync_queue_id = pending_tasks.add_queue(); } ue_stack_lte::~ue_stack_lte() @@ -130,7 +132,7 @@ int ue_stack_lte::init(const stack_args_t& args_, srslte::logger* logger_) void ue_stack_lte::stop() { if (running) { - pending_tasks.push([this]() { stop_impl(); }); + pending_tasks.try_push(ue_queue_id, task_t{[this](task_t*) { stop_impl(); }}); wait_thread_finish(); } } @@ -159,7 +161,8 @@ bool ue_stack_lte::switch_on() { if (running) { proc_state_t proc_result = proc_state_t::on_going; - pending_tasks.push([this, &proc_result]() { nas.start_attach_request(&proc_result); }); + pending_tasks.try_push(ue_queue_id, + task_t{[this, &proc_result](task_t*) { nas.start_attach_request(&proc_result); }}); while (proc_result == proc_state_t::on_going) { usleep(1000); } @@ -202,25 +205,25 @@ bool ue_stack_lte::get_metrics(stack_metrics_t* metrics) void ue_stack_lte::run_thread() { while (running) { - // FIXME: For now it is a single queue - std::function func = pending_tasks.wait_pop(); - func(); + task_t task; + pending_tasks.wait_pop(&task); + task(); } } void ue_stack_lte::in_sync() { - pending_tasks.push([this]() { rrc.in_sync(); }); + pending_tasks.try_push(sync_queue_id, task_t{[this](task_t*) { rrc.in_sync(); }}); } void ue_stack_lte::out_of_sync() { - pending_tasks.push([this]() { rrc.out_of_sync(); }); + pending_tasks.try_push(sync_queue_id, task_t{[this](task_t*) { rrc.out_of_sync(); }}); } void ue_stack_lte::run_tti(uint32_t tti) { - pending_tasks.push([this, tti]() { run_tti_impl(tti); }); + pending_tasks.try_push(sync_queue_id, task_t{[this, tti](task_t*) { run_tti_impl(tti); }}); } void ue_stack_lte::run_tti_impl(uint32_t tti)