mirror of https://github.com/PentHertz/srsLTE.git
change interface to create multiqueue queues
This commit is contained in:
parent
f705735093
commit
2d6a878826
|
@ -47,7 +47,9 @@ class multiqueue_handler
|
|||
class input_port_impl
|
||||
{
|
||||
public:
|
||||
input_port_impl(uint32_t cap, multiqueue_handler<myobj>* parent_) : buffer(cap), parent(parent_) {}
|
||||
input_port_impl(uint32_t cap, bool notify_flag_, multiqueue_handler<myobj>* parent_) :
|
||||
buffer(cap), notify_flag(notify_flag_), consumer_notify_needed(notify_flag_), parent(parent_)
|
||||
{}
|
||||
input_port_impl(const input_port_impl&) = delete;
|
||||
input_port_impl(input_port_impl&&) = delete;
|
||||
input_port_impl& operator=(const input_port_impl&) = delete;
|
||||
|
@ -55,6 +57,7 @@ class multiqueue_handler
|
|||
~input_port_impl() { deactivate_blocking(); }
|
||||
|
||||
size_t capacity() const { return buffer.max_size(); }
|
||||
bool get_notify_mode() const { return notify_flag; }
|
||||
size_t size() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(q_mutex);
|
||||
|
@ -65,12 +68,6 @@ class multiqueue_handler
|
|||
std::lock_guard<std::mutex> lock(q_mutex);
|
||||
return active_;
|
||||
}
|
||||
|
||||
void set_notify_mode()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(q_mutex);
|
||||
notify_mode = true;
|
||||
}
|
||||
void set_active(bool val)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(q_mutex);
|
||||
|
@ -79,7 +76,7 @@ class multiqueue_handler
|
|||
return;
|
||||
}
|
||||
active_ = val;
|
||||
consumer_notify_needed = true;
|
||||
consumer_notify_needed = notify_flag;
|
||||
|
||||
if (not active_) {
|
||||
buffer.clear();
|
||||
|
@ -120,7 +117,7 @@ class multiqueue_handler
|
|||
{
|
||||
std::unique_lock<std::mutex> lock(q_mutex);
|
||||
if (buffer.empty()) {
|
||||
consumer_notify_needed = true;
|
||||
consumer_notify_needed = notify_flag;
|
||||
return false;
|
||||
}
|
||||
obj = std::move(buffer.top());
|
||||
|
@ -157,7 +154,7 @@ class multiqueue_handler
|
|||
}
|
||||
}
|
||||
buffer.push(std::forward<T>(*o));
|
||||
if (consumer_notify_needed and notify_mode) {
|
||||
if (consumer_notify_needed) {
|
||||
// Note: The consumer thread only needs to be notified and awaken when queues transition from empty to non-empty
|
||||
// To ensure that the consumer noticed that the queue was empty before a push, we store the last
|
||||
// try_pop() return in a member variable.
|
||||
|
@ -174,8 +171,8 @@ class multiqueue_handler
|
|||
srsran::dyn_circular_buffer<myobj> buffer;
|
||||
std::condition_variable cv_full, cv_exit;
|
||||
bool active_ = true;
|
||||
bool consumer_notify_needed = true;
|
||||
bool notify_mode = false;
|
||||
bool consumer_notify_needed = false;
|
||||
bool notify_flag = false;
|
||||
int nof_waiting = 0;
|
||||
};
|
||||
|
||||
|
@ -198,7 +195,6 @@ public:
|
|||
impl = nullptr;
|
||||
}
|
||||
}
|
||||
void set_notify_mode() { impl->set_notify_mode(); }
|
||||
|
||||
size_t size() { return impl->size(); }
|
||||
size_t capacity() { return impl->capacity(); }
|
||||
|
@ -249,20 +245,22 @@ public:
|
|||
* @param capacity_ The capacity of the queue.
|
||||
* @return The index of the newly created (or reused) queue within the vector of queues.
|
||||
*/
|
||||
queue_handle add_queue(uint32_t capacity_)
|
||||
queue_handle add_queue(uint32_t capacity_, bool notify_flag = false)
|
||||
{
|
||||
uint32_t qidx = 0;
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
if (not running) {
|
||||
return queue_handle();
|
||||
}
|
||||
for (; qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_)); ++qidx)
|
||||
;
|
||||
while (qidx < queues.size() and (queues[qidx].active() or (queues[qidx].capacity() != capacity_) or
|
||||
(queues[qidx].get_notify_mode() == notify_flag))) {
|
||||
++qidx;
|
||||
}
|
||||
|
||||
// check if there is a free queue of the required size
|
||||
if (qidx == queues.size()) {
|
||||
// create new queue
|
||||
queues.emplace_back(capacity_, this);
|
||||
queues.emplace_back(capacity_, notify_flag, this);
|
||||
qidx = queues.size() - 1; // update qidx to the last element
|
||||
} else {
|
||||
queues[qidx].set_active(true);
|
||||
|
@ -274,7 +272,7 @@ public:
|
|||
* Add queue using the default capacity of the underlying multiqueue
|
||||
* @return The queue index
|
||||
*/
|
||||
queue_handle add_queue() { return add_queue(default_capacity); }
|
||||
queue_handle add_queue(bool notify_flag) { return add_queue(default_capacity, notify_flag); }
|
||||
|
||||
uint32_t nof_queues() const
|
||||
{
|
||||
|
|
|
@ -26,7 +26,7 @@ public:
|
|||
explicit task_scheduler(uint32_t default_extern_tasks_size = 512, uint32_t nof_timers_prealloc = 100) :
|
||||
external_tasks{default_extern_tasks_size}, timers{nof_timers_prealloc}
|
||||
{
|
||||
background_queue = external_tasks.add_queue();
|
||||
background_queue = external_tasks.add_queue(false);
|
||||
}
|
||||
task_scheduler(const task_scheduler&) = delete;
|
||||
task_scheduler(task_scheduler&&) = delete;
|
||||
|
@ -38,8 +38,11 @@ public:
|
|||
srsran::unique_timer get_unique_timer() { return timers.get_unique_timer(); }
|
||||
|
||||
//! Creates new queue for tasks coming from external thread
|
||||
srsran::task_queue_handle make_task_queue() { return external_tasks.add_queue(); }
|
||||
srsran::task_queue_handle make_task_queue(uint32_t qsize) { return external_tasks.add_queue(qsize); }
|
||||
srsran::task_queue_handle make_task_queue(bool notify_mode) { return external_tasks.add_queue(notify_mode); }
|
||||
srsran::task_queue_handle make_task_queue(uint32_t qsize, bool notify_mode)
|
||||
{
|
||||
return external_tasks.add_queue(qsize, notify_mode);
|
||||
}
|
||||
|
||||
//! Delays a task processing by duration_ms
|
||||
template <typename F>
|
||||
|
@ -124,7 +127,7 @@ public:
|
|||
sched->defer_callback(duration_ms, std::forward<F>(func));
|
||||
}
|
||||
void defer_task(srsran::move_task_t func) { sched->defer_task(std::move(func)); }
|
||||
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); }
|
||||
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); }
|
||||
|
||||
private:
|
||||
task_scheduler* sched;
|
||||
|
@ -141,7 +144,7 @@ public:
|
|||
{
|
||||
sched->notify_background_task_result(std::move(task));
|
||||
}
|
||||
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(); }
|
||||
srsran::task_queue_handle make_task_queue() { return sched->make_task_queue(false); }
|
||||
template <typename F>
|
||||
void defer_callback(uint32_t duration_ms, F&& func)
|
||||
{
|
||||
|
|
|
@ -32,7 +32,7 @@ int test_multiqueue()
|
|||
TESTASSERT(multiqueue.nof_queues() == 0);
|
||||
|
||||
// test push/pop and size for one queue
|
||||
queue_handle<int> qid1 = multiqueue.add_queue();
|
||||
queue_handle<int> qid1 = multiqueue.add_queue(true);
|
||||
TESTASSERT(qid1.active());
|
||||
TESTASSERT(qid1.size() == 0 and qid1.empty());
|
||||
TESTASSERT(multiqueue.nof_queues() == 1);
|
||||
|
@ -45,7 +45,7 @@ int test_multiqueue()
|
|||
TESTASSERT(number == 2 and qid1.empty());
|
||||
|
||||
// test push/pop and size for two queues
|
||||
queue_handle<int> qid2 = multiqueue.add_queue();
|
||||
queue_handle<int> qid2 = multiqueue.add_queue(true);
|
||||
TESTASSERT(qid2.active());
|
||||
TESTASSERT(multiqueue.nof_queues() == 2 and qid1.active());
|
||||
TESTASSERT(qid2.try_push(3).has_value());
|
||||
|
@ -55,7 +55,7 @@ int test_multiqueue()
|
|||
// check if erasing a queue breaks anything
|
||||
qid1.reset();
|
||||
TESTASSERT(multiqueue.nof_queues() == 1 and not qid1.active());
|
||||
qid1 = multiqueue.add_queue();
|
||||
qid1 = multiqueue.add_queue(true);
|
||||
TESTASSERT(qid1.empty() and qid1.active());
|
||||
TESTASSERT(qid2.size() == 1 and not qid2.empty());
|
||||
multiqueue.wait_pop(&number);
|
||||
|
@ -89,15 +89,15 @@ int test_multiqueue()
|
|||
|
||||
// check that adding a queue of different capacity works
|
||||
{
|
||||
qid1 = multiqueue.add_queue();
|
||||
qid2 = multiqueue.add_queue();
|
||||
qid1 = multiqueue.add_queue(true);
|
||||
qid2 = multiqueue.add_queue(true);
|
||||
|
||||
// remove first queue again
|
||||
qid1.reset();
|
||||
TESTASSERT(multiqueue.nof_queues() == 1);
|
||||
|
||||
// add queue with non-default capacity
|
||||
auto qid3 = multiqueue.add_queue(10);
|
||||
auto qid3 = multiqueue.add_queue(10, true);
|
||||
TESTASSERT(qid3.capacity() == 10);
|
||||
|
||||
// make sure neither a new queue index is returned
|
||||
|
@ -117,7 +117,7 @@ int test_multiqueue_threading()
|
|||
|
||||
int capacity = 4, number = 0, start_number = 2, nof_pushes = capacity + 1;
|
||||
multiqueue_handler<int> multiqueue(capacity);
|
||||
auto qid1 = multiqueue.add_queue();
|
||||
auto qid1 = multiqueue.add_queue(true);
|
||||
auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) {
|
||||
for (int i = 0; i < nof_pushes; ++i) {
|
||||
qid->push(start_value + i);
|
||||
|
@ -165,7 +165,7 @@ int test_multiqueue_threading2()
|
|||
|
||||
int capacity = 4, start_number = 2, nof_pushes = capacity + 1;
|
||||
multiqueue_handler<int> multiqueue(capacity);
|
||||
auto qid1 = multiqueue.add_queue();
|
||||
auto qid1 = multiqueue.add_queue(true);
|
||||
auto push_blocking_func = [](queue_handle<int>* qid, int start_value, int nof_pushes, bool* is_running) {
|
||||
for (int i = 0; i < nof_pushes; ++i) {
|
||||
qid->push(start_value + i);
|
||||
|
@ -199,7 +199,7 @@ int test_multiqueue_threading3()
|
|||
|
||||
int capacity = 4;
|
||||
multiqueue_handler<int> multiqueue(capacity);
|
||||
auto qid1 = multiqueue.add_queue();
|
||||
auto qid1 = multiqueue.add_queue(true);
|
||||
auto pop_blocking_func = [&multiqueue](bool* success) {
|
||||
int number = 0;
|
||||
bool ret = multiqueue.wait_pop(&number);
|
||||
|
@ -235,10 +235,10 @@ int test_multiqueue_threading4()
|
|||
|
||||
int capacity = 4;
|
||||
multiqueue_handler<int> multiqueue(capacity);
|
||||
auto qid1 = multiqueue.add_queue();
|
||||
auto qid2 = multiqueue.add_queue();
|
||||
auto qid3 = multiqueue.add_queue();
|
||||
auto qid4 = multiqueue.add_queue();
|
||||
auto qid1 = multiqueue.add_queue(true);
|
||||
auto qid2 = multiqueue.add_queue(true);
|
||||
auto qid3 = multiqueue.add_queue(true);
|
||||
auto qid4 = multiqueue.add_queue(true);
|
||||
std::mutex mutex;
|
||||
int last_number = -1;
|
||||
auto pop_blocking_func = [&multiqueue, &last_number, &mutex](bool* success) {
|
||||
|
|
|
@ -23,7 +23,7 @@ struct rx_thread_tester {
|
|||
std::thread t;
|
||||
|
||||
rx_thread_tester() :
|
||||
task_queue(task_sched.make_task_queue()),
|
||||
task_queue(task_sched.make_task_queue(true)),
|
||||
t([this]() {
|
||||
stop_token.store(false);
|
||||
while (not stop_token.load(std::memory_order_relaxed)) {
|
||||
|
|
|
@ -41,9 +41,8 @@ enb_stack_lte::enb_stack_lte(srslog::sink& log_sink) :
|
|||
pending_stack_metrics(64)
|
||||
{
|
||||
get_background_workers().set_nof_workers(2);
|
||||
enb_task_queue = task_sched.make_task_queue();
|
||||
enb_task_queue.set_notify_mode();
|
||||
metrics_task_queue = task_sched.make_task_queue();
|
||||
enb_task_queue = task_sched.make_task_queue(true);
|
||||
metrics_task_queue = task_sched.make_task_queue(false);
|
||||
// sync_queue is added in init()
|
||||
}
|
||||
|
||||
|
@ -116,8 +115,7 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
|
|||
}
|
||||
|
||||
// add sync queue
|
||||
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
|
||||
sync_task_queue.set_notify_mode();
|
||||
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true);
|
||||
|
||||
// Init all layers
|
||||
if (!mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc)) {
|
||||
|
|
|
@ -26,12 +26,10 @@ gnb_stack_nr::gnb_stack_nr() : task_sched{512, 128}, thread("gNB"), rlc_logger(s
|
|||
m_gw.reset(new srsue::gw());
|
||||
// m_gtpu.reset(new srsenb::gtpu());
|
||||
|
||||
ue_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue.set_notify_mode();
|
||||
sync_task_queue = task_sched.make_task_queue();
|
||||
sync_task_queue.set_notify_mode();
|
||||
gw_task_queue = task_sched.make_task_queue();
|
||||
mac_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue = task_sched.make_task_queue(true);
|
||||
sync_task_queue = task_sched.make_task_queue(true);
|
||||
gw_task_queue = task_sched.make_task_queue(false);
|
||||
mac_task_queue = task_sched.make_task_queue(false);
|
||||
}
|
||||
|
||||
gnb_stack_nr::~gnb_stack_nr()
|
||||
|
|
|
@ -52,10 +52,9 @@ ue_stack_lte::ue_stack_lte() :
|
|||
tti_tprof("tti_tprof", "STCK", TTI_STAT_PERIOD)
|
||||
{
|
||||
get_background_workers().set_nof_workers(2);
|
||||
ue_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue.set_notify_mode();
|
||||
gw_queue_id = task_sched.make_task_queue();
|
||||
cfg_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue = task_sched.make_task_queue(true);
|
||||
gw_queue_id = task_sched.make_task_queue(false);
|
||||
cfg_task_queue = task_sched.make_task_queue(false);
|
||||
// sync_queue is added in init()
|
||||
}
|
||||
|
||||
|
@ -199,8 +198,7 @@ int ue_stack_lte::init(const stack_args_t& args_)
|
|||
}
|
||||
|
||||
// add sync queue
|
||||
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
|
||||
sync_task_queue.set_notify_mode();
|
||||
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size, true);
|
||||
|
||||
mac.init(phy, &rlc, &rrc);
|
||||
rlc.init(&pdcp, &rrc, &rrc_nr, task_sched.get_timer_handler(), 0 /* RB_ID_SRB0 */);
|
||||
|
|
|
@ -33,11 +33,9 @@ ue_stack_nr::ue_stack_nr() :
|
|||
// setup logging for pool, RLC and PDCP
|
||||
byte_buffer_pool::get_instance()->enable_logger(true);
|
||||
|
||||
ue_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue.set_notify_mode();
|
||||
sync_task_queue = task_sched.make_task_queue();
|
||||
sync_task_queue.set_notify_mode();
|
||||
gw_task_queue = task_sched.make_task_queue();
|
||||
ue_task_queue = task_sched.make_task_queue(true);
|
||||
sync_task_queue = task_sched.make_task_queue(true);
|
||||
gw_task_queue = task_sched.make_task_queue(false);
|
||||
}
|
||||
|
||||
ue_stack_nr::~ue_stack_nr()
|
||||
|
|
Loading…
Reference in New Issue