using task scheduler in enb stack

This commit is contained in:
Francisco Paisana 2020-07-08 19:19:21 +01:00
parent 4f5e65781f
commit e9f34c7613
3 changed files with 43 additions and 49 deletions

View File

@ -46,6 +46,8 @@ public:
}
}
void stop() { background_tasks.stop(); }
srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); }
//! Creates new queue for tasks coming from external thread
@ -127,10 +129,10 @@ private:
internal_tasks.clear();
}
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background
srslte::task_multiqueue external_tasks;
srslte::timer_handler timers;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
std::vector<srslte::move_task_t> internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking
};

View File

@ -29,6 +29,7 @@
#include "mac/mac.h"
#include "rrc/rrc.h"
#include "srslte/common/task_scheduler.h"
#include "upper/gtpu.h"
#include "upper/pdcp.h"
#include "upper/rlc.h"
@ -36,7 +37,6 @@
#include "enb_stack_base.h"
#include "srsenb/hdr/enb.h"
#include "srslte/common/multiqueue.h"
#include "srslte/interfaces/enb_interfaces.h"
#include "srslte/interfaces/enb_rrc_interface_types.h"
@ -134,7 +134,6 @@ private:
rrc_cfg_t rrc_cfg = {};
// components that layers depend on (need to be destroyed after layers)
srslte::timer_handler timers;
std::unique_ptr<srslte::rx_multisocket_handler> rx_sockets;
srsenb::mac mac;
@ -162,11 +161,12 @@ private:
phy_interface_stack_lte* phy = nullptr;
// state
bool started = false;
srslte::task_multiqueue pending_tasks;
int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1,
stack_queue_id = -1;
std::vector<srslte::move_task_t> deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking
bool started = false;
// task handling
srslte::task_scheduler task_sched;
srslte::task_multiqueue::queue_handler enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue;
srslte::block_queue<stack_metrics_t> pending_stack_metrics;
};

View File

@ -30,13 +30,14 @@ using namespace srslte;
namespace srsenb {
enb_stack_lte::enb_stack_lte(srslte::logger* logger_) :
timers(128), logger(logger_), pdcp(this, "PDCP"), thread("STACK")
task_sched(512, 0, 128),
logger(logger_),
pdcp(this, "PDCP"),
thread("STACK")
{
enb_queue_id = pending_tasks.add_queue();
mme_queue_id = pending_tasks.add_queue();
gtpu_queue_id = pending_tasks.add_queue();
mac_queue_id = pending_tasks.add_queue();
stack_queue_id = pending_tasks.add_queue();
enb_task_queue = task_sched.make_task_queue();
mme_task_queue = task_sched.make_task_queue();
gtpu_task_queue = task_sched.make_task_queue();
// sync_queue is added in init()
pool = byte_buffer_pool::get_instance();
@ -101,14 +102,14 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
rx_sockets.reset(new srslte::rx_multisocket_handler("ENBSOCKETS", stack_log));
// add sync queue
sync_queue_id = pending_tasks.add_queue(args.sync_queue_size);
sync_task_queue = task_sched.make_task_queue(args.sync_queue_size);
// Init all layers
mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc, this, mac_log);
rlc.init(&pdcp, &rrc, &mac, &timers, rlc_log);
rlc.init(&pdcp, &rrc, &mac, task_sched.get_timer_handler(), rlc_log);
pdcp.init(&rlc, &rrc, &gtpu);
rrc.init(rrc_cfg, phy, &mac, &rlc, &pdcp, &s1ap, &gtpu, &timers);
if (s1ap.init(args.s1ap, &rrc, &timers, this) != SRSLTE_SUCCESS) {
rrc.init(rrc_cfg, phy, &mac, &rlc, &pdcp, &s1ap, &gtpu, task_sched.get_timer_handler());
if (s1ap.init(args.s1ap, &rrc, task_sched.get_timer_handler(), this) != SRSLTE_SUCCESS) {
stack_log->error("Couldn't initialize S1AP\n");
return SRSLTE_ERROR;
}
@ -131,23 +132,19 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_)
void enb_stack_lte::tti_clock()
{
pending_tasks.push(sync_queue_id, [this]() { tti_clock_impl(); });
sync_task_queue.push([this]() { tti_clock_impl(); });
}
void enb_stack_lte::tti_clock_impl()
{
for (auto& t : deferred_stack_tasks) {
t();
}
deferred_stack_tasks.clear();
timers.step_all();
task_sched.tic();
rrc.tti_clock();
}
void enb_stack_lte::stop()
{
if (started) {
pending_tasks.push(enb_queue_id, [this]() { stop_impl(); });
enb_task_queue.push([this]() { stop_impl(); });
wait_thread_finish();
}
}
@ -170,12 +167,7 @@ void enb_stack_lte::stop_impl()
s1ap_pcap.close();
}
// erasing the queues is the last thing, bc we need them to call stop_impl()
pending_tasks.erase_queue(sync_queue_id);
pending_tasks.erase_queue(enb_queue_id);
pending_tasks.erase_queue(mme_queue_id);
pending_tasks.erase_queue(gtpu_queue_id);
pending_tasks.erase_queue(mac_queue_id);
task_sched.stop();
started = false;
}
@ -183,7 +175,7 @@ void enb_stack_lte::stop_impl()
bool enb_stack_lte::get_metrics(stack_metrics_t* metrics)
{
// use stack thread to query metrics
pending_tasks.try_push(enb_queue_id, [this]() {
auto ret = enb_task_queue.try_push([this]() {
stack_metrics_t metrics{};
mac.get_metrics(metrics.mac);
rrc.get_metrics(metrics.rrc);
@ -191,18 +183,18 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics)
pending_stack_metrics.push(metrics);
});
// wait for result
*metrics = pending_stack_metrics.wait_pop();
return true;
if (ret.first) {
// wait for result
*metrics = pending_stack_metrics.wait_pop();
return true;
}
return false;
}
void enb_stack_lte::run_thread()
{
while (started) {
srslte::move_task_t task{};
if (pending_tasks.wait_pop(&task) >= 0) {
task();
}
task_sched.run_next_external_task();
}
}
@ -216,7 +208,7 @@ void enb_stack_lte::handle_mme_rx_packet(srslte::unique_byte_buffer_t pdu,
s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags);
};
// Defer the handling of MME packet to main stack thread
pending_tasks.push(mme_queue_id, std::bind(task_handler, std::move(pdu)));
mme_task_queue.push(std::bind(task_handler, std::move(pdu)));
}
void enb_stack_lte::add_mme_socket(int fd)
@ -240,7 +232,7 @@ void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd)
auto task_handler = [this, from](srslte::unique_byte_buffer_t& t) {
gtpu.handle_gtpu_s1u_rx_packet(std::move(t), from);
};
pending_tasks.push(gtpu_queue_id, std::bind(task_handler, std::move(pdu)));
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
};
rx_sockets->add_socket_pdu_handler(fd, gtpu_s1u_handler);
}
@ -251,44 +243,44 @@ void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd)
auto task_handler = [this, from](srslte::unique_byte_buffer_t& t) {
gtpu.handle_gtpu_m1u_rx_packet(std::move(t), from);
};
pending_tasks.push(gtpu_queue_id, std::bind(task_handler, std::move(pdu)));
gtpu_task_queue.push(std::bind(task_handler, std::move(pdu)));
};
rx_sockets->add_socket_pdu_handler(fd, gtpu_m1u_handler);
}
srslte::timer_handler::unique_timer enb_stack_lte::get_unique_timer()
{
return timers.get_unique_timer();
return task_sched.get_unique_timer();
}
srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue()
{
return pending_tasks.get_queue_handler();
return task_sched.make_task_queue();
}
srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue(uint32_t qsize)
{
return pending_tasks.get_queue_handler(qsize);
return task_sched.make_task_queue(qsize);
}
void enb_stack_lte::defer_callback(uint32_t duration_ms, std::function<void()> func)
{
timers.defer_callback(duration_ms, func);
task_sched.defer_callback(duration_ms, std::move(func));
}
void enb_stack_lte::enqueue_background_task(std::function<void(uint32_t)> task)
{
task(0);
task_sched.enqueue_background_task(task);
}
void enb_stack_lte::notify_background_task_result(srslte::move_task_t task)
{
task();
task_sched.notify_background_task_result(std::move(task));
}
void enb_stack_lte::defer_task(srslte::move_task_t task)
{
deferred_stack_tasks.push_back(std::move(task));
task_sched.defer_task(std::move(task));
}
} // namespace srsenb