mirror of https://github.com/PentHertz/srsLTE.git
stack, multiqueue - bugfix for multiqueue destruction, and addition of unit test
This commit is contained in:
parent
d947e259c9
commit
0d800eb8f6
|
@ -208,9 +208,9 @@ public:
|
||||||
explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) :
|
explicit multiqueue_handler(uint32_t default_capacity_ = MULTIQUEUE_DEFAULT_CAPACITY) :
|
||||||
default_capacity(default_capacity_)
|
default_capacity(default_capacity_)
|
||||||
{}
|
{}
|
||||||
~multiqueue_handler() { reset(); }
|
~multiqueue_handler() { stop(); }
|
||||||
|
|
||||||
void reset()
|
void stop()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(mutex);
|
std::unique_lock<std::mutex> lock(mutex);
|
||||||
running = false;
|
running = false;
|
||||||
|
@ -222,8 +222,10 @@ public:
|
||||||
cv_empty.notify_one();
|
cv_empty.notify_one();
|
||||||
cv_exit.wait(lock);
|
cv_exit.wait(lock);
|
||||||
}
|
}
|
||||||
// queue destructor ensures that the pushing threads have been notified of the queue deactivation in a blocking way
|
for (auto& q : queues) {
|
||||||
queues.clear();
|
// ensure the queues are finished being deactivated
|
||||||
|
q.deactivate_blocking();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -33,7 +33,7 @@ public:
|
||||||
task_scheduler& operator=(const task_scheduler&) = delete;
|
task_scheduler& operator=(const task_scheduler&) = delete;
|
||||||
task_scheduler& operator=(task_scheduler&&) = delete;
|
task_scheduler& operator=(task_scheduler&&) = delete;
|
||||||
|
|
||||||
void stop() { external_tasks.reset(); }
|
void stop() { external_tasks.stop(); }
|
||||||
|
|
||||||
srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); }
|
srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); }
|
||||||
|
|
||||||
|
|
|
@ -148,7 +148,7 @@ int test_multiqueue_threading()
|
||||||
}
|
}
|
||||||
TESTASSERT(qid1.size() == 0);
|
TESTASSERT(qid1.size() == 0);
|
||||||
|
|
||||||
multiqueue.reset();
|
multiqueue.stop();
|
||||||
t1.join();
|
t1.join();
|
||||||
|
|
||||||
std::cout << "outcome: Success\n";
|
std::cout << "outcome: Success\n";
|
||||||
|
@ -182,7 +182,7 @@ int test_multiqueue_threading2()
|
||||||
TESTASSERT(t1_running);
|
TESTASSERT(t1_running);
|
||||||
}
|
}
|
||||||
|
|
||||||
multiqueue.reset();
|
multiqueue.stop();
|
||||||
t1.join();
|
t1.join();
|
||||||
|
|
||||||
std::cout << "outcome: Success\n";
|
std::cout << "outcome: Success\n";
|
||||||
|
@ -214,7 +214,60 @@ int test_multiqueue_threading3()
|
||||||
TESTASSERT((int)qid1.size() == 0);
|
TESTASSERT((int)qid1.size() == 0);
|
||||||
|
|
||||||
// Should be able to unlock all
|
// Should be able to unlock all
|
||||||
multiqueue.reset();
|
multiqueue.stop();
|
||||||
|
TESTASSERT(multiqueue.nof_queues() == 0);
|
||||||
|
TESTASSERT(not qid1.active());
|
||||||
|
t1.join();
|
||||||
|
TESTASSERT(t1_success);
|
||||||
|
|
||||||
|
std::cout << "outcome: Success\n";
|
||||||
|
std::cout << "===================================================\n";
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int test_multiqueue_threading4()
|
||||||
|
{
|
||||||
|
std::cout << "\n===== TEST multiqueue threading test 4: start =====\n";
|
||||||
|
// Description: the consumer will block on popping, but the pushing from different producers
|
||||||
|
// should be sufficient to awake it when necessary
|
||||||
|
|
||||||
|
int capacity = 4;
|
||||||
|
multiqueue_handler<int> multiqueue(capacity);
|
||||||
|
auto qid1 = multiqueue.add_queue();
|
||||||
|
auto qid2 = multiqueue.add_queue();
|
||||||
|
auto qid3 = multiqueue.add_queue();
|
||||||
|
auto qid4 = multiqueue.add_queue();
|
||||||
|
auto pop_blocking_func = [&multiqueue](bool* success) {
|
||||||
|
int number = 0, count = 0;
|
||||||
|
while (multiqueue.wait_pop(&number)) {
|
||||||
|
TESTASSERT(number == count++);
|
||||||
|
}
|
||||||
|
*success = true;
|
||||||
|
};
|
||||||
|
|
||||||
|
bool t1_success = false;
|
||||||
|
std::thread t1(pop_blocking_func, &t1_success);
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; ++i) {
|
||||||
|
switch (i % 3) {
|
||||||
|
case 0:
|
||||||
|
qid1.push(i);
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
qid2.push(i);
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
qid4.push(i);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
usleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be able to unlock all
|
||||||
|
multiqueue.stop();
|
||||||
TESTASSERT(multiqueue.nof_queues() == 0);
|
TESTASSERT(multiqueue.nof_queues() == 0);
|
||||||
TESTASSERT(not qid1.active());
|
TESTASSERT(not qid1.active());
|
||||||
t1.join();
|
t1.join();
|
||||||
|
@ -417,6 +470,7 @@ int main()
|
||||||
TESTASSERT(test_multiqueue_threading() == 0);
|
TESTASSERT(test_multiqueue_threading() == 0);
|
||||||
TESTASSERT(test_multiqueue_threading2() == 0);
|
TESTASSERT(test_multiqueue_threading2() == 0);
|
||||||
TESTASSERT(test_multiqueue_threading3() == 0);
|
TESTASSERT(test_multiqueue_threading3() == 0);
|
||||||
|
TESTASSERT(test_multiqueue_threading4() == 0);
|
||||||
|
|
||||||
TESTASSERT(test_task_thread_pool() == 0);
|
TESTASSERT(test_task_thread_pool() == 0);
|
||||||
TESTASSERT(test_task_thread_pool2() == 0);
|
TESTASSERT(test_task_thread_pool2() == 0);
|
||||||
|
|
|
@ -121,7 +121,7 @@ private:
|
||||||
|
|
||||||
// task handling
|
// task handling
|
||||||
srsran::task_scheduler task_sched;
|
srsran::task_scheduler task_sched;
|
||||||
srsran::task_queue_handle enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue;
|
srsran::task_queue_handle enb_task_queue, sync_task_queue;
|
||||||
|
|
||||||
srsenb::mac mac;
|
srsenb::mac mac;
|
||||||
srsenb::rlc rlc;
|
srsenb::rlc rlc;
|
||||||
|
|
|
@ -41,9 +41,7 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) :
|
||||||
pending_stack_metrics(64)
|
pending_stack_metrics(64)
|
||||||
{
|
{
|
||||||
get_background_workers().set_nof_workers(2);
|
get_background_workers().set_nof_workers(2);
|
||||||
enb_task_queue = task_sched.make_task_queue();
|
enb_task_queue = task_sched.make_task_queue();
|
||||||
mme_task_queue = task_sched.make_task_queue();
|
|
||||||
gtpu_task_queue = task_sched.make_task_queue();
|
|
||||||
// sync_queue is added in init()
|
// sync_queue is added in init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue