From 0d800eb8f628532118cfc51136579b2679eb819a Mon Sep 17 00:00:00 2001 From: Francisco Date: Fri, 23 Apr 2021 19:08:18 +0100 Subject: [PATCH] stack, multiqueue - bugfix for multiqueue destruction, and addition of unit test --- lib/include/srsran/common/multiqueue.h | 10 ++-- lib/include/srsran/common/task_scheduler.h | 2 +- lib/test/common/multiqueue_test.cc | 60 ++++++++++++++++++++-- srsenb/hdr/stack/enb_stack_lte.h | 2 +- srsenb/src/stack/enb_stack_lte.cc | 4 +- 5 files changed, 66 insertions(+), 12 deletions(-) diff --git a/lib/include/srsran/common/multiqueue.h b/lib/include/srsran/common/multiqueue.h index 14130057f..1b181be01 100644 --- a/lib/include/srsran/common/multiqueue.h +++ b/lib/include/srsran/common/multiqueue.h @@ -208,9 +208,9 @@ public: explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) : default_capacity(default_capacity_) {} - ~multiqueue_handler() { reset(); } + ~multiqueue_handler() { stop(); } - void reset() + void stop() { std::unique_lock lock(mutex); running = false; @@ -222,8 +222,10 @@ public: cv_empty.notify_one(); cv_exit.wait(lock); } - // queue destructor ensures that the pushing threads have been notified of the queue deactivation in a blocking way - queues.clear(); + for (auto& q : queues) { + // ensure the queues are finished being deactivated + q.deactivate_blocking(); + } } /** diff --git a/lib/include/srsran/common/task_scheduler.h b/lib/include/srsran/common/task_scheduler.h index 27b9bdeed..fc0b5d3c5 100644 --- a/lib/include/srsran/common/task_scheduler.h +++ b/lib/include/srsran/common/task_scheduler.h @@ -33,7 +33,7 @@ public: task_scheduler& operator=(const task_scheduler&) = delete; task_scheduler& operator=(task_scheduler&&) = delete; - void stop() { external_tasks.reset(); } + void stop() { external_tasks.stop(); } srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); } diff --git a/lib/test/common/multiqueue_test.cc b/lib/test/common/multiqueue_test.cc index 67e36a3df..beaebd472 100644 --- a/lib/test/common/multiqueue_test.cc +++ b/lib/test/common/multiqueue_test.cc @@ -148,7 +148,7 @@ int test_multiqueue_threading() } TESTASSERT(qid1.size() == 0); - multiqueue.reset(); + multiqueue.stop(); t1.join(); std::cout << "outcome: Success\n"; @@ -182,7 +182,7 @@ int test_multiqueue_threading2() TESTASSERT(t1_running); } - multiqueue.reset(); + multiqueue.stop(); t1.join(); std::cout << "outcome: Success\n"; @@ -214,7 +214,60 @@ int test_multiqueue_threading3() TESTASSERT((int)qid1.size() == 0); // Should be able to unlock all - multiqueue.reset(); + multiqueue.stop(); + TESTASSERT(multiqueue.nof_queues() == 0); + TESTASSERT(not qid1.active()); + t1.join(); + TESTASSERT(t1_success); + + std::cout << "outcome: Success\n"; + std::cout << "===================================================\n"; + + return 0; +} + +int test_multiqueue_threading4() +{ + std::cout << "\n===== TEST multiqueue threading test 4: start =====\n"; + // Description: the consumer will block on popping, but the pushing from different producers + // should be sufficient to awake it when necessary + + int capacity = 4; + multiqueue_handler multiqueue(capacity); + auto qid1 = multiqueue.add_queue(); + auto qid2 = multiqueue.add_queue(); + auto qid3 = multiqueue.add_queue(); + auto qid4 = multiqueue.add_queue(); + auto pop_blocking_func = [&multiqueue](bool* success) { + int number = 0, count = 0; + while (multiqueue.wait_pop(&number)) { + TESTASSERT(number == count++); + } + *success = true; + }; + + bool t1_success = false; + std::thread t1(pop_blocking_func, &t1_success); + + for (int i = 0; i < 1000; ++i) { + switch (i % 3) { + case 0: + qid1.push(i); + break; + case 1: + qid2.push(i); + break; + case 2: + qid4.push(i); + break; + default: + break; + } + usleep(10); + } + + // Should be able to unlock all + multiqueue.stop(); TESTASSERT(multiqueue.nof_queues() == 0); TESTASSERT(not qid1.active()); t1.join(); @@ -417,6 +470,7 @@ int main() TESTASSERT(test_multiqueue_threading() == 0); TESTASSERT(test_multiqueue_threading2() == 0); TESTASSERT(test_multiqueue_threading3() == 0); + TESTASSERT(test_multiqueue_threading4() == 0); TESTASSERT(test_task_thread_pool() == 0); TESTASSERT(test_task_thread_pool2() == 0); diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index c7c1533bf..6af11ce6b 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -121,7 +121,7 @@ private: // task handling srsran::task_scheduler task_sched; - srsran::task_queue_handle enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue; + srsran::task_queue_handle enb_task_queue, sync_task_queue; srsenb::mac mac; srsenb::rlc rlc; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index 4f38c942c..50bbee745 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -41,9 +41,7 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) : pending_stack_metrics(64) { get_background_workers().set_nof_workers(2); - enb_task_queue = task_sched.make_task_queue(); - mme_task_queue = task_sched.make_task_queue(); - gtpu_task_queue = task_sched.make_task_queue(); + enb_task_queue = task_sched.make_task_queue(); // sync_queue is added in init() }