From ef9d1b8c13e8e0ee9e474735b95f89e2ef9c92d4 Mon Sep 17 00:00:00 2001 From: Francisco Date: Fri, 23 Apr 2021 16:42:42 +0100 Subject: [PATCH] stack optimization - reduction of contention in multiqueue class With the new design, each queue created in the multiqueue object has its own mutex. Pushing tasks to separate queues will, therefore, not cause contention. There will be, however, still contention between the popping thread and the pushing threads. --- lib/include/srsran/common/multiqueue.h | 637 +++++++++++++++------ lib/include/srsran/common/task_scheduler.h | 18 +- lib/test/common/multiqueue_test.cc | 150 +++-- srsenb/src/stack/enb_stack_lte.cc | 2 +- srsue/src/stack/mac/mac.cc | 6 +- srsue/src/stack/ue_stack_lte.cc | 4 +- srsue/src/stack/ue_stack_nr.cc | 4 +- 7 files changed, 549 insertions(+), 272 deletions(-) diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index 5275c4ad1..f0076cdb3 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -19,6 +19,7 @@ #ifndef SRSRAN_MULTIQUEUE_H #define SRSRAN_MULTIQUEUE_H +#include "srsran/adt/circular_buffer.h" #include "srsran/adt/move_callback.h" #include #include @@ -31,53 +32,416 @@ namespace srsran { #define MULTIQUEUE_DEFAULT_CAPACITY (8192) // Default per-queue capacity +// template +// class multiqueue_handler +//{ +// class circular_buffer +// { +// public: +// circular_buffer(uint32_t cap) : buffer(cap + 1) {} +// circular_buffer(circular_buffer&& other) noexcept +// { +// active = other.active; +// other.active = false; +// widx = other.widx; +// ridx = other.ridx; +// buffer = std::move(other.buffer); +// } +// +// std::condition_variable cv_full; +// bool active = true; +// +// bool empty() const { return widx == ridx; } +// size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } +// bool full() const { return (ridx > 0) ? 
widx == ridx - 1 : widx == buffer.size() - 1; } +// size_t capacity() const { return buffer.size() - 1; } +// +// template +// void push(T&& o) noexcept +// { +// buffer[widx++] = std::forward(o); +// if (widx >= buffer.size()) { +// widx = 0; +// } +// } +// +// void pop() noexcept +// { +// ridx++; +// if (ridx >= buffer.size()) { +// ridx = 0; +// } +// } +// +// myobj& front() noexcept { return buffer[ridx]; } +// const myobj& front() const noexcept { return buffer[ridx]; } +// +// private: +// std::vector buffer; +// size_t widx = 0, ridx = 0; +// }; +// +// public: +// class queue_handle +// { +// public: +// queue_handle() = default; +// queue_handle(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} +// template +// void push(FwdRef&& value) +// { +// parent->push(queue_id, std::forward(value)); +// } +// bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } +// std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } +// size_t size() { return parent->size(queue_id); } +// +// private: +// multiqueue_handler* parent = nullptr; +// int queue_id = -1; +// }; +// +// explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {} +// ~multiqueue_handler() { reset(); } +// +// void reset() +// { +// std::unique_lock lock(mutex); +// running = false; +// while (nof_threads_waiting > 0) { +// uint32_t size = queues.size(); +// cv_empty.notify_one(); +// for (uint32_t i = 0; i < size; ++i) { +// queues[i].cv_full.notify_all(); +// } +// // wait for all threads to unblock +// cv_exit.wait(lock); +// } +// queues.clear(); +// } +// +// /** +// * Adds a new queue with fixed capacity +// * @param capacity_ The capacity of the queue. +// * @return The index of the newly created (or reused) queue within the vector of queues. +// */ +// int add_queue(uint32_t capacity_) +// { +// uint32_t qidx = 0; +// std::lock_guard lock(mutex); +// if (not running) { +// return -1; +// } +// for (; qidx < queues.size() and queues[qidx].active; ++qidx) +// ; +// +// // check if there is a free queue of the required size +// if (qidx == queues.size() || queues[qidx].capacity() != capacity_) { +// // create new queue +// queues.emplace_back(capacity_); +// qidx = queues.size() - 1; // update qidx to the last element +// } else { +// queues[qidx].active = true; +// } +// return (int)qidx; +// } +// +// /** +// * Add queue using the default capacity of the underlying multiqueue +// * @return The queue index +// */ +// int add_queue() { return add_queue(capacity); } +// +// int nof_queues() +// { +// std::lock_guard lock(mutex); +// uint32_t count = 0; +// for (uint32_t i = 0; i < queues.size(); ++i) { +// count += queues[i].active ? 
1 : 0; +// } +// return count; +// } +// +// template +// void push(int q_idx, FwdRef&& value) +// { +// { +// std::unique_lock lock(mutex); +// while (is_queue_active_(q_idx) and queues[q_idx].full()) { +// nof_threads_waiting++; +// queues[q_idx].cv_full.wait(lock); +// nof_threads_waiting--; +// } +// if (not is_queue_active_(q_idx)) { +// cv_exit.notify_one(); +// return; +// } +// queues[q_idx].push(std::forward(value)); +// } +// cv_empty.notify_one(); +// } +// +// bool try_push(int q_idx, const myobj& value) +// { +// { +// std::lock_guard lock(mutex); +// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { +// return false; +// } +// queues[q_idx].push(value); +// } +// cv_empty.notify_one(); +// return true; +// } +// +// std::pair try_push(int q_idx, myobj&& value) +// { +// { +// std::lock_guard lck(mutex); +// if (not is_queue_active_(q_idx) or queues[q_idx].full()) { +// return {false, std::move(value)}; +// } +// queues[q_idx].push(std::move(value)); +// } +// cv_empty.notify_one(); +// return {true, std::move(value)}; +// } +// +// int wait_pop(myobj* value) +// { +// std::unique_lock lock(mutex); +// while (running) { +// if (round_robin_pop_(value)) { +// if (nof_threads_waiting > 0) { +// lock.unlock(); +// queues[spin_idx].cv_full.notify_one(); +// } +// return spin_idx; +// } +// nof_threads_waiting++; +// cv_empty.wait(lock); +// nof_threads_waiting--; +// } +// cv_exit.notify_one(); +// return -1; +// } +// +// int try_pop(myobj* value) +// { +// std::unique_lock lock(mutex); +// if (running) { +// if (round_robin_pop_(value)) { +// if (nof_threads_waiting > 0) { +// lock.unlock(); +// queues[spin_idx].cv_full.notify_one(); +// } +// return spin_idx; +// } +// // didn't find any task +// return -1; +// } +// cv_exit.notify_one(); +// return -1; +// } +// +// bool empty(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].empty(); +// } +// +// size_t size(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].size(); +// } +// +// size_t max_size(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].capacity(); +// } +// +// const myobj& front(int qidx) +// { +// std::lock_guard lck(mutex); +// return queues[qidx].front(); +// } +// +// void erase_queue(int qidx) +// { +// std::lock_guard lck(mutex); +// if (is_queue_active_(qidx)) { +// queues[qidx].active = false; +// while (not queues[qidx].empty()) { +// queues[qidx].pop(); +// } +// } +// } +// +// bool is_queue_active(int qidx) +// { +// std::lock_guard lck(mutex); +// return is_queue_active_(qidx); +// } +// +// queue_handle get_queue_handler() { return {this, add_queue()}; } +// queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } +// +// private: +// bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } +// +// bool round_robin_pop_(myobj* value) +// { +// // Round-robin for all queues +// for (const circular_buffer& q : queues) { +// spin_idx = (spin_idx + 1) % queues.size(); +// if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { +// if (value) { +// *value = std::move(queues[spin_idx].front()); +// } +// queues[spin_idx].pop(); +// return true; +// } +// } +// return false; +// } +// +// std::mutex mutex; +// std::condition_variable cv_empty, cv_exit; +// uint32_t spin_idx = 0; +// bool running = true; +// std::vector queues; +// uint32_t capacity = 0; +// uint32_t nof_threads_waiting = 0; +// }; + +/** + * N-to-1 Message-Passing Broker that manages the creation, destruction 
of input ports, and popping of messages that + * are pushed to these ports. + * Each port provides a thread-safe push(...) / try_push(...) interface to enqueue messages + * The class will pop from the several created ports in a round-robin fashion. + * The popping() interface is not safe-thread. That means, that it is expected that only one thread will + * be popping tasks. + * @tparam myobj message type + */ template class multiqueue_handler { - class circular_buffer + class input_port_impl { public: - circular_buffer(uint32_t cap) : buffer(cap + 1) {} - circular_buffer(circular_buffer&& other) noexcept + input_port_impl(uint32_t cap, multiqueue_handler* parent_) : buffer(cap), parent(parent_) {} + input_port_impl(input_port_impl&& other) noexcept { - active = other.active; - other.active = false; - widx = other.widx; - ridx = other.ridx; - buffer = std::move(other.buffer); + std::lock_guard lock(other.q_mutex); + active_ = other.active_; + parent = other.parent_; + other.active_ = false; + buffer = std::move(other.buffer); + } + ~input_port_impl() { set_active_blocking(false); } + + size_t capacity() const { return buffer.max_size(); } + size_t size() const + { + std::lock_guard lock(q_mutex); + return buffer.size(); + } + bool active() const + { + std::lock_guard lock(q_mutex); + return active_; } - std::condition_variable cv_full; - bool active = true; + void set_active(bool val) + { + std::unique_lock lock(q_mutex); + if (val == active_) { + return; + } + active_ = val; - bool empty() const { return widx == ridx; } - size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } - bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } - size_t capacity() const { return buffer.size() - 1; } + if (not active_) { + buffer.clear(); + lock.unlock(); + cv_full.notify_all(); + } + } + + void set_active_blocking(bool val) + { + set_active(val); + + if (not val) { + // wait for all the pushers to unlock + std::unique_lock lock(q_mutex); + while (nof_waiting > 0) { + cv_exit.wait(lock); + } + } + } template void push(T&& o) noexcept { - buffer[widx++] = std::forward(o); - if (widx >= buffer.size()) { - widx = 0; - } + push_(&o, true); } - void pop() noexcept + bool try_push(const myobj& o) { return push_(&o, false); } + + srsran::error_type try_push(myobj&& o) { - ridx++; - if (ridx >= buffer.size()) { - ridx = 0; + if (push_(&o, false)) { + return {}; } + return {std::move(o)}; } - myobj& front() noexcept { return buffer[ridx]; } - const myobj& front() const noexcept { return buffer[ridx]; } + bool try_pop(myobj& obj) + { + std::unique_lock lock(q_mutex); + if (buffer.empty()) { + return false; + } + obj = std::move(buffer.top()); + buffer.pop(); + if (nof_waiting > 0) { + lock.unlock(); + cv_full.notify_one(); + } + return true; + } private: - std::vector buffer; - size_t widx = 0, ridx = 0; + template + bool push_(T* o, bool blocking) noexcept + { + { + std::unique_lock lock(q_mutex); + while (active_ and blocking and buffer.full()) { + nof_waiting++; + cv_full.wait(lock); + nof_waiting--; + } + if (not active_) { + lock.unlock(); + cv_exit.notify_one(); + return false; + } + buffer.push(std::forward(*o)); + } + parent->cv_empty.notify_one(); + return true; + } + + multiqueue_handler* parent = nullptr; + + mutable std::mutex q_mutex; + srsran::dyn_circular_buffer buffer; + std::condition_variable cv_full, cv_exit; + bool active_ = true; + int nof_waiting = 0; }; public: @@ -85,37 +449,53 @@ public: { public: queue_handle() = 
default; - queue_handle(multiqueue_handler* parent_, int id) : parent(parent_), queue_id(id) {} + queue_handle(input_port_impl* impl_) : impl(impl_) {} template void push(FwdRef&& value) { - parent->push(queue_id, std::forward(value)); + impl->push(std::forward(value)); } - bool try_push(const myobj& value) { return parent->try_push(queue_id, value); } - std::pair try_push(myobj&& value) { return parent->try_push(queue_id, std::move(value)); } - size_t size() { return parent->size(queue_id); } + bool try_push(const myobj& value) { return impl->try_push(value); } + srsran::error_type try_push(myobj&& value) { return impl->try_push(std::move(value)); } + void reset() + { + if (impl != nullptr) { + impl->set_active_blocking(false); + impl = nullptr; + } + } + + size_t size() { return impl->size(); } + size_t capacity() { return impl->capacity(); } + bool active() const { return impl != nullptr and impl->active(); } + bool empty() const { return impl->size() == 0; } + + bool operator==(const queue_handle& other) const { return impl == other.impl; } + bool operator!=(const queue_handle& other) const { return impl != other.impl; } private: - multiqueue_handler* parent = nullptr; - int queue_id = -1; + input_port_impl* impl = nullptr; }; - explicit multiqueue_handler(uint32_t capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(capacity_) {} + explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : capacity(default_capacity_) {} ~multiqueue_handler() { reset(); } void reset() { std::unique_lock lock(mutex); running = false; - while (nof_threads_waiting > 0) { - uint32_t size = queues.size(); + for (auto& q : queues) { + // signal deactivation to pushing threads in a non-blocking way + q.set_active(false); + } + while (wait_pop_state) { cv_empty.notify_one(); - for (uint32_t i = 0; i < size; ++i) { - queues[i].cv_full.notify_all(); - } - // wait for all threads to unblock cv_exit.wait(lock); } + for (auto& q : queues) { + // ensure that all queues are completed with the deactivation before clearing the memory + q.set_active_blocking(false); + } queues.clear(); } @@ -124,197 +504,98 @@ public: * @param capacity_ The capacity of the queue. * @return The index of the newly created (or reused) queue within the vector of queues. */ - int add_queue(uint32_t capacity_) + queue_handle add_queue(uint32_t capacity_) { uint32_t qidx = 0; std::lock_guard lock(mutex); if (not running) { - return -1; + return queue_handle(); } - for (; qidx < queues.size() and queues[qidx].active; ++qidx) + for (; qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_)); ++qidx) ; // check if there is a free queue of the required size - if (qidx == queues.size() || queues[qidx].capacity() != capacity_) { + if (qidx == queues.size()) { // create new queue - queues.emplace_back(capacity_); + queues.emplace_back(capacity_, this); qidx = queues.size() - 1; // update qidx to the last element } else { - queues[qidx].active = true; + queues[qidx].set_active(true); } - return (int)qidx; + return queue_handle(&queues[qidx]); } /** * Add queue using the default capacity of the underlying multiqueue * @return The queue index */ - int add_queue() { return add_queue(capacity); } + queue_handle add_queue() { return add_queue(capacity); } - int nof_queues() + uint32_t nof_queues() const { std::lock_guard lock(mutex); uint32_t count = 0; for (uint32_t i = 0; i < queues.size(); ++i) { - count += queues[i].active ? 1 : 0; + count += queues[i].active() ? 
1 : 0; } return count; } - template - void push(int q_idx, FwdRef&& value) - { - { - std::unique_lock lock(mutex); - while (is_queue_active_(q_idx) and queues[q_idx].full()) { - nof_threads_waiting++; - queues[q_idx].cv_full.wait(lock); - nof_threads_waiting--; - } - if (not is_queue_active_(q_idx)) { - cv_exit.notify_one(); - return; - } - queues[q_idx].push(std::forward(value)); - } - cv_empty.notify_one(); - } - - bool try_push(int q_idx, const myobj& value) - { - { - std::lock_guard lock(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].full()) { - return false; - } - queues[q_idx].push(value); - } - cv_empty.notify_one(); - return true; - } - - std::pair try_push(int q_idx, myobj&& value) - { - { - std::lock_guard lck(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].full()) { - return {false, std::move(value)}; - } - queues[q_idx].push(std::move(value)); - } - cv_empty.notify_one(); - return {true, std::move(value)}; - } - - int wait_pop(myobj* value) + bool wait_pop(myobj* value) { std::unique_lock lock(mutex); while (running) { if (round_robin_pop_(value)) { - if (nof_threads_waiting > 0) { - lock.unlock(); - queues[spin_idx].cv_full.notify_one(); - } - return spin_idx; + return true; } - nof_threads_waiting++; + wait_pop_state = true; cv_empty.wait(lock); - nof_threads_waiting--; + wait_pop_state = false; } - cv_exit.notify_one(); - return -1; + if (not running) { + cv_exit.notify_one(); + } + return false; } - int try_pop(myobj* value) + bool try_pop(myobj* value) { std::unique_lock lock(mutex); - if (running) { - if (round_robin_pop_(value)) { - if (nof_threads_waiting > 0) { - lock.unlock(); - queues[spin_idx].cv_full.notify_one(); - } - return spin_idx; - } - // didn't find any task - return -1; + if (running and round_robin_pop_(value)) { + return true; } - cv_exit.notify_one(); - return -1; + return false; } - bool empty(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].empty(); - } - - size_t size(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].size(); - } - - size_t max_size(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].capacity(); - } - - const myobj& front(int qidx) - { - std::lock_guard lck(mutex); - return queues[qidx].front(); - } - - void erase_queue(int qidx) - { - std::lock_guard lck(mutex); - if (is_queue_active_(qidx)) { - queues[qidx].active = false; - while (not queues[qidx].empty()) { - queues[qidx].pop(); - } - } - } - - bool is_queue_active(int qidx) - { - std::lock_guard lck(mutex); - return is_queue_active_(qidx); - } - - queue_handle get_queue_handler() { return {this, add_queue()}; } - queue_handle get_queue_handler(uint32_t size) { return {this, add_queue(size)}; } - private: - bool is_queue_active_(int qidx) const { return running and queues[qidx].active; } - bool round_robin_pop_(myobj* value) { // Round-robin for all queues - for (const circular_buffer& q : queues) { - spin_idx = (spin_idx + 1) % queues.size(); - if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { - if (value) { - *value = std::move(queues[spin_idx].front()); - } - queues[spin_idx].pop(); + auto it = queues.begin() + spin_idx; + uint32_t count = 0; + for (; count < queues.size(); ++count, ++it) { + if (it == queues.end()) { + it = queues.begin(); // wrap-around + } + if (it->try_pop(*value)) { + spin_idx = (spin_idx + count + 1) % queues.size(); return true; } } return false; } - std::mutex mutex; - std::condition_variable cv_empty, cv_exit; - uint32_t spin_idx = 0; - bool running = true; - 
std::vector queues; - uint32_t capacity = 0; - uint32_t nof_threads_waiting = 0; + mutable std::mutex mutex; + std::condition_variable cv_empty, cv_exit; + uint32_t spin_idx = 0; + bool running = true, wait_pop_state = false; + std::deque queues; + uint32_t capacity = 0; }; +template +using queue_handle = typename multiqueue_handler::queue_handle; + //! Specialization for tasks using task_multiqueue = multiqueue_handler; using task_queue_handle = task_multiqueue::queue_handle; diff --git a/lib/include/srsran/common/task_scheduler.h b/lib/include/srsran/common/task_scheduler.h index 5bba56fee..27b9bdeed 100644 --- a/lib/include/srsran/common/task_scheduler.h +++ b/lib/include/srsran/common/task_scheduler.h @@ -26,7 +26,7 @@ public: explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) : external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc} { - background_queue_id = external_tasks.add_queue(); + background_queue = external_tasks.add_queue(); } task_scheduler(const task_scheduler&) = delete; task_scheduler(task_scheduler&&) = delete; @@ -38,8 +38,8 @@ public: srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); } //! Creates new queue for tasks coming from external thread - srsran::task_queue_handle make_task_queue() { return external_tasks.get_queue_handler(); } - srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.get_queue_handler(qsize); } + srsran::task_queue_handle make_task_queue() { return external_tasks.add_queue(); } + srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.add_queue(qsize); } //! Delays a task processing by duration_ms template @@ -55,7 +55,7 @@ public: void notify_background_task_result(srsran::move_task_t task) { // run the notification in next tic - external_tasks.push(background_queue_id, std::move(task)); + background_queue.push(std::move(task)); } //! Updates timers, and run any pending internal tasks. @@ -67,7 +67,7 @@ public: bool run_next_task() { srsran::move_task_t task{}; - if (external_tasks.wait_pop(&task) >= 0) { + if (external_tasks.wait_pop(&task)) { task(); run_all_internal_tasks(); return true; @@ -81,7 +81,7 @@ public: { run_all_internal_tasks(); srsran::move_task_t task{}; - while (external_tasks.try_pop(&task) >= 0) { + while (external_tasks.try_pop(&task)) { task(); run_all_internal_tasks(); } @@ -101,9 +101,9 @@ private: } } - int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background - srsran::task_multiqueue external_tasks; - srsran::timer_handler timers; + srsran::task_multiqueue external_tasks; + srsran::task_queue_handle background_queue; ///< Queue for handling the outcomes of tasks run in the background + srsran::timer_handler timers; std::deque internal_tasks; ///< enqueues stack tasks from within main thread. 
Avoids locking }; diff --git a/lib/test/common/multiqueue_test.cc b/lib/test/common/multiqueue_test.cc index cfd6cf6d5..67e36a3df 100644 --- a/lib/test/common/multiqueue_test.cc +++ b/lib/test/common/multiqueue_test.cc @@ -12,20 +12,13 @@ #include "srsran/adt/move_callback.h" #include "srsran/common/multiqueue.h" +#include "srsran/common/test_common.h" #include "srsran/common/thread_pool.h" #include #include #include #include -#define TESTASSERT(cond) \ - { \ - if (!(cond)) { \ - std::cout << "[" << __FUNCTION__ << "][Line " << __LINE__ << "]: FAIL at " << (#cond) << std::endl; \ - return -1; \ - } \ - } - using namespace srsran; int test_multiqueue() @@ -35,79 +28,80 @@ int test_multiqueue() int number = 2; multiqueue_handler multiqueue; - TESTASSERT(multiqueue.nof_queues() == 0) + TESTASSERT(multiqueue.nof_queues() == 0); // test push/pop and size for one queue - int qid1 = multiqueue.add_queue(); - TESTASSERT(qid1 == 0 and multiqueue.is_queue_active(qid1)) - TESTASSERT(multiqueue.size(qid1) == 0 and multiqueue.empty(qid1)) - TESTASSERT(multiqueue.nof_queues() == 1) - TESTASSERT(multiqueue.try_push(qid1, 5).first) - TESTASSERT(multiqueue.try_push(qid1, number)) - TESTASSERT(multiqueue.size(qid1) == 2 and not multiqueue.empty(qid1)) - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == 5) - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == 2 and multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + queue_handle qid1 = multiqueue.add_queue(); + TESTASSERT(qid1.active()); + TESTASSERT(qid1.size() == 0 and qid1.empty()); + TESTASSERT(multiqueue.nof_queues() == 1); + TESTASSERT(qid1.try_push(5).has_value()); + TESTASSERT(qid1.try_push(number)); + TESTASSERT(qid1.size() == 2 and not qid1.empty()); + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == 5); + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == 2 and qid1.empty()); // test push/pop and size for two queues - int qid2 = multiqueue.add_queue(); - TESTASSERT(qid2 == 1) - TESTASSERT(multiqueue.nof_queues() == 2 and multiqueue.is_queue_active(qid1)) - TESTASSERT(multiqueue.try_push(qid2, 3).first) - TESTASSERT(multiqueue.size(qid2) == 1 and not multiqueue.empty(qid2)) - TESTASSERT(multiqueue.empty(qid1) and multiqueue.size(qid1) == 0) + queue_handle qid2 = multiqueue.add_queue(); + TESTASSERT(qid2.active()); + TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active()); + TESTASSERT(qid2.try_push(3).has_value()); + TESTASSERT(qid2.size() == 1 and not qid2.empty()); + TESTASSERT(qid1.empty()); // check if erasing a queue breaks anything - multiqueue.erase_queue(qid1); - TESTASSERT(multiqueue.nof_queues() == 1 and not multiqueue.is_queue_active(qid1)) + qid1.reset(); + TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active()); qid1 = multiqueue.add_queue(); - TESTASSERT(qid1 == 0) - TESTASSERT(multiqueue.empty(qid1) and multiqueue.is_queue_active(qid1)) + TESTASSERT(qid1.empty() and qid1.active()); + TESTASSERT(qid2.size() == 1 and not qid2.empty()); multiqueue.wait_pop(&number); // check round-robin for (int i = 0; i < 10; ++i) { - TESTASSERT(multiqueue.try_push(qid1, i)) + TESTASSERT(qid1.try_push(i)); } for (int i = 20; i < 35; ++i) { - TESTASSERT(multiqueue.try_push(qid2, i)) + TESTASSERT(qid2.try_push(i)); } - TESTASSERT(multiqueue.size(qid1) == 10) - TESTASSERT(multiqueue.size(qid2) == 15) - TESTASSERT(multiqueue.wait_pop(&number) == qid1 and number == 0) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 20) - 
TESTASSERT(multiqueue.wait_pop(&number) == qid1 and number == 1) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 21) - TESTASSERT(multiqueue.size(qid1) == 8) - TESTASSERT(multiqueue.size(qid2) == 13) + TESTASSERT(qid1.size() == 10); + TESTASSERT(qid2.size() == 15); + TESTASSERT(multiqueue.wait_pop(&number) and number == 0); + TESTASSERT(multiqueue.wait_pop(&number) and number == 20); + TESTASSERT(multiqueue.wait_pop(&number) and number == 1); + TESTASSERT(multiqueue.wait_pop(&number) and number == 21); + TESTASSERT(qid1.size() == 8); + TESTASSERT(qid2.size() == 13); for (int i = 0; i < 8 * 2; ++i) { multiqueue.wait_pop(&number); } - TESTASSERT(multiqueue.size(qid1) == 0) - TESTASSERT(multiqueue.size(qid2) == 5) - TESTASSERT(multiqueue.wait_pop(&number) == qid2 and number == 30) + TESTASSERT(qid1.size() == 0); + TESTASSERT(qid2.size() == 5); + TESTASSERT(multiqueue.wait_pop(&number) and number == 30); // remove existing queues - multiqueue.erase_queue(qid1); - multiqueue.erase_queue(qid2); - TESTASSERT(multiqueue.nof_queues() == 0) + qid1.reset(); + qid2.reset(); + TESTASSERT(multiqueue.nof_queues() == 0); // check that adding a queue of different capacity works { - int qid1 = multiqueue.add_queue(); - int qid2 = multiqueue.add_queue(); + qid1 = multiqueue.add_queue(); + qid2 = multiqueue.add_queue(); // remove first queue again - multiqueue.erase_queue(qid1); - TESTASSERT(multiqueue.nof_queues() == 1) + qid1.reset(); + TESTASSERT(multiqueue.nof_queues() == 1); // add queue with non-default capacity - int qid3 = multiqueue.add_queue(10); + auto qid3 = multiqueue.add_queue(10); + TESTASSERT(qid3.capacity() == 10); // make sure neither a new queue index is returned - TESTASSERT(qid1 != qid3) - TESTASSERT(qid2 != qid3) + TESTASSERT(qid1 != qid3); + TESTASSERT(qid2 != qid3); } std::cout << "outcome: Success\n"; @@ -122,10 +116,10 @@ int test_multiqueue_threading() int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto push_blocking_func = [&multiqueue](int qid, int start_value, int nof_pushes, bool* is_running) { + auto qid1 = multiqueue.add_queue(); + auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { - multiqueue.push(qid, start_value + i); + qid->push(start_value + i); std::cout << "t1: pushed item " << i << std::endl; } std::cout << "t1: pushed all items\n"; @@ -133,17 +127,17 @@ int test_multiqueue_threading() }; bool t1_running = true; - std::thread t1(push_blocking_func, qid1, start_number, nof_pushes, &t1_running); + std::thread t1(push_blocking_func, &qid1, start_number, nof_pushes, &t1_running); // Wait for queue to fill - while ((int)multiqueue.size(qid1) != capacity) { + while ((int)qid1.size() != capacity) { usleep(1000); - TESTASSERT(t1_running) + TESTASSERT(t1_running); } for (int i = 0; i < nof_pushes; ++i) { - TESTASSERT(multiqueue.wait_pop(&number) == qid1) - TESTASSERT(number == start_number + i) + TESTASSERT(multiqueue.wait_pop(&number)); + TESTASSERT(number == start_number + i); std::cout << "main: popped item " << i << "\n"; } std::cout << "main: popped all items\n"; @@ -152,7 +146,7 @@ int test_multiqueue_threading() while (t1_running) { usleep(1000); } - TESTASSERT(multiqueue.size(qid1) == 0) + TESTASSERT(qid1.size() == 0); multiqueue.reset(); t1.join(); @@ -170,22 +164,22 @@ int test_multiqueue_threading2() int capacity = 4, start_number = 2, nof_pushes = capacity 
+ 1; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto push_blocking_func = [&multiqueue](int qid, int start_value, int nof_pushes, bool* is_running) { + auto qid1 = multiqueue.add_queue(); + auto push_blocking_func = [](queue_handle* qid, int start_value, int nof_pushes, bool* is_running) { for (int i = 0; i < nof_pushes; ++i) { - multiqueue.push(qid, start_value + i); + qid->push(start_value + i); } std::cout << "t1: pushed all items\n"; *is_running = false; }; bool t1_running = true; - std::thread t1(push_blocking_func, qid1, start_number, nof_pushes, &t1_running); + std::thread t1(push_blocking_func, &qid1, start_number, nof_pushes, &t1_running); // Wait for queue to fill - while ((int)multiqueue.size(qid1) != capacity) { + while ((int)qid1.size() != capacity) { usleep(1000); - TESTASSERT(t1_running) + TESTASSERT(t1_running); } multiqueue.reset(); @@ -204,23 +198,25 @@ int test_multiqueue_threading3() int capacity = 4; multiqueue_handler multiqueue(capacity); - int qid1 = multiqueue.add_queue(); - auto pop_blocking_func = [&multiqueue](int qid, bool* success) { - int number = 0; - int id = multiqueue.wait_pop(&number); - *success = id < 0; + auto qid1 = multiqueue.add_queue(); + auto pop_blocking_func = [&multiqueue](bool* success) { + int number = 0; + bool ret = multiqueue.wait_pop(&number); + *success = not ret; }; bool t1_success = false; - std::thread t1(pop_blocking_func, qid1, &t1_success); + std::thread t1(pop_blocking_func, &t1_success); - TESTASSERT(not t1_success) + TESTASSERT(not t1_success); usleep(1000); - TESTASSERT(not t1_success) - TESTASSERT((int)multiqueue.size(qid1) == 0) + TESTASSERT(not t1_success); + TESTASSERT((int)qid1.size() == 0); // Should be able to unlock all multiqueue.reset(); + TESTASSERT(multiqueue.nof_queues() == 0); + TESTASSERT(not qid1.active()); t1.join(); TESTASSERT(t1_success); diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 66fa3915a..4f38c942c 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -218,7 +218,7 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics) } }); - if (ret.first) { + if (ret.has_value()) { // wait for result *metrics = pending_stack_metrics.pop_blocking(); return true; diff --git a/srsue/src/stack/mac/mac.cc b/srsue/src/stack/mac/mac.cc index 4be6a1026..dcd504949 100644 --- a/srsue/src/stack/mac/mac.cc +++ b/srsue/src/stack/mac/mac.cc @@ -338,7 +338,7 @@ void mac::bch_decoded_ok(uint32_t cc_idx, uint8_t* payload, uint32_t len) buf->set_timestamp(); auto p = stack_task_dispatch_queue.try_push(std::bind( [this](srsran::unique_byte_buffer_t& buf) { rlc_h->write_pdu_bcch_bch(std::move(buf)); }, std::move(buf))); - if (not p.first) { + if (p.is_error()) { Warning("Failed to dispatch rlc::write_pdu_bcch_bch task to stack"); } } else { @@ -399,7 +399,7 @@ void mac::tb_decoded(uint32_t cc_idx, mac_grant_dl_t grant, bool ack[SRSRAN_MAX_ auto ret = stack_task_dispatch_queue.try_push(std::bind( [this](srsran::unique_byte_buffer_t& pdu) { rlc_h->write_pdu_pcch(std::move(pdu)); }, std::move(pdu))); - if (not ret.first) { + if (ret.is_error()) { Warning("Failed to dispatch rlc::write_pdu_pcch task to stack"); } } else { @@ -477,7 +477,7 @@ void mac::process_pdus() have_data = demux_unit.process_pdus(); } }); - if (not ret.first) { + if (ret.is_error()) { Warning("Failed to dispatch mac::%s task to stack thread", __func__); } } diff --git a/srsue/src/stack/ue_stack_lte.cc b/srsue/src/stack/ue_stack_lte.cc index 
2b44b71b8..e903a4a27 100644
--- a/srsue/src/stack/ue_stack_lte.cc
+++ b/srsue/src/stack/ue_stack_lte.cc
@@ -119,7 +119,7 @@ int ue_stack_lte::init(const stack_args_t& args_)
   mac_nr_logger.set_hex_dump_max_size(args.log.mac_hex_limit);
   rrc_nr_logger.set_level(srslog::str_to_basic_level(args.log.rrc_level));
   rrc_nr_logger.set_hex_dump_max_size(args.log.rrc_hex_limit);
-  
+
   // Set up pcap
   // parse pcap trace list
   std::vector pcap_list;
@@ -341,7 +341,7 @@ void ue_stack_lte::run_thread()
 void ue_stack_lte::write_sdu(uint32_t lcid, srsran::unique_byte_buffer_t sdu)
 {
   auto task = [this, lcid](srsran::unique_byte_buffer_t& sdu) { pdcp.write_sdu(lcid, std::move(sdu)); };
-  bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).first;
+  bool ret = gw_queue_id.try_push(std::bind(task, std::move(sdu))).has_value();
   if (not ret) {
     pdcp_logger.info("GW SDU with lcid=%d was discarded.", lcid);
     ul_dropped_sdus++;
diff --git a/srsue/src/stack/ue_stack_nr.cc b/srsue/src/stack/ue_stack_nr.cc
index c670acc14..33ed31c5f 100644
--- a/srsue/src/stack/ue_stack_nr.cc
+++ b/srsue/src/stack/ue_stack_nr.cc
@@ -154,9 +154,9 @@ void ue_stack_nr::run_thread()
 void ue_stack_nr::write_sdu(uint32_t lcid, srsran::unique_byte_buffer_t sdu)
 {
   if (pdcp != nullptr) {
-    std::pair<bool, srsran::move_task_t> ret = gw_task_queue.try_push(std::bind(
+    auto ret = gw_task_queue.try_push(std::bind(
         [this, lcid](srsran::unique_byte_buffer_t& sdu) { pdcp->write_sdu(lcid, std::move(sdu)); }, std::move(sdu)));
-    if (not ret.first) {
+    if (ret.is_error()) {
       pdcp_logger.warning("GW SDU with lcid=%d was discarded.", lcid);
     }
   }
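
A minimal usage sketch of the new interface described above (a sketch only: it assumes the srsRAN headers in this patch are available, and the thread names, capacities, and item counts are illustrative, not part of the patch). Each producer thread pushes through its own queue_handle, so pushes to different queues take different mutexes and do not contend; a single consumer thread drains all queues in round-robin order via wait_pop().

// usage_sketch.cc -- illustrative only, assuming srsRAN headers are on the include path
#include "srsran/common/multiqueue.h"
#include <cstdio>
#include <thread>

int main()
{
  // One broker; 16 is an illustrative default per-queue capacity.
  srsran::multiqueue_handler<int> mq(16);

  // Each producer gets its own input port; each port has its own mutex,
  // so concurrent pushes to q1 and q2 do not contend with each other.
  auto q1 = mq.add_queue();
  auto q2 = mq.add_queue(8); // port with a non-default capacity

  std::thread producer1([&q1]() {
    for (int i = 0; i < 4; ++i) {
      q1.push(i); // blocking push: waits only while q1 itself is full
    }
  });
  std::thread producer2([&q2]() {
    for (int i = 100; i < 104; ++i) {
      if (not q2.try_push(i)) { // non-blocking push: fails if q2 is full or inactive
        std::printf("q2 full, dropped %d\n", i);
      }
    }
  });

  // Single consumer: wait_pop() blocks until any active queue has a message
  // and services the active queues in round-robin order.
  int value = 0;
  for (int n = 0; n < 8; ++n) {
    if (mq.wait_pop(&value)) {
      std::printf("popped %d\n", value);
    }
  }

  producer1.join();
  producer2.join();
  mq.reset(); // deactivates all queues and unblocks any pusher still waiting
  return 0;
}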