diff --git a/lib/include/srslte/common/task_scheduler.h b/lib/include/srslte/common/task_scheduler.h index fd1bd8627..a6b61df23 100644 --- a/lib/include/srslte/common/task_scheduler.h +++ b/lib/include/srslte/common/task_scheduler.h @@ -46,6 +46,8 @@ public: } } + void stop() { background_tasks.stop(); } + srslte::timer_handler::unique_timer get_unique_timer() final { return timers.get_unique_timer(); } //! Creates new queue for tasks coming from external thread @@ -127,10 +129,10 @@ private: internal_tasks.clear(); } + srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks int background_queue_id = -1; ///< Queue for handling the outcomes of tasks run in the background srslte::task_multiqueue external_tasks; srslte::timer_handler timers; - srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks std::vector internal_tasks; ///< enqueues stack tasks from within main thread. Avoids locking }; diff --git a/srsenb/hdr/stack/enb_stack_lte.h b/srsenb/hdr/stack/enb_stack_lte.h index 060ae4e38..4572cda7f 100644 --- a/srsenb/hdr/stack/enb_stack_lte.h +++ b/srsenb/hdr/stack/enb_stack_lte.h @@ -29,6 +29,7 @@ #include "mac/mac.h" #include "rrc/rrc.h" +#include "srslte/common/task_scheduler.h" #include "upper/gtpu.h" #include "upper/pdcp.h" #include "upper/rlc.h" @@ -36,7 +37,6 @@ #include "enb_stack_base.h" #include "srsenb/hdr/enb.h" -#include "srslte/common/multiqueue.h" #include "srslte/interfaces/enb_interfaces.h" #include "srslte/interfaces/enb_rrc_interface_types.h" @@ -134,7 +134,6 @@ private: rrc_cfg_t rrc_cfg = {}; // components that layers depend on (need to be destroyed after layers) - srslte::timer_handler timers; std::unique_ptr rx_sockets; srsenb::mac mac; @@ -162,11 +161,12 @@ private: phy_interface_stack_lte* phy = nullptr; // state - bool started = false; - srslte::task_multiqueue pending_tasks; - int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1, - stack_queue_id = -1; - std::vector deferred_stack_tasks; ///< enqueues stack tasks from within. Avoids locking + bool started = false; + + // task handling + srslte::task_scheduler task_sched; + srslte::task_multiqueue::queue_handler enb_task_queue, gtpu_task_queue, mme_task_queue, sync_task_queue; + srslte::block_queue pending_stack_metrics; }; diff --git a/srsenb/src/stack/enb_stack_lte.cc b/srsenb/src/stack/enb_stack_lte.cc index f584704a2..22ea1a4af 100644 --- a/srsenb/src/stack/enb_stack_lte.cc +++ b/srsenb/src/stack/enb_stack_lte.cc @@ -30,13 +30,14 @@ using namespace srslte; namespace srsenb { enb_stack_lte::enb_stack_lte(srslte::logger* logger_) : - timers(128), logger(logger_), pdcp(this, "PDCP"), thread("STACK") + task_sched(512, 0, 128), + logger(logger_), + pdcp(this, "PDCP"), + thread("STACK") { - enb_queue_id = pending_tasks.add_queue(); - mme_queue_id = pending_tasks.add_queue(); - gtpu_queue_id = pending_tasks.add_queue(); - mac_queue_id = pending_tasks.add_queue(); - stack_queue_id = pending_tasks.add_queue(); + enb_task_queue = task_sched.make_task_queue(); + mme_task_queue = task_sched.make_task_queue(); + gtpu_task_queue = task_sched.make_task_queue(); // sync_queue is added in init() pool = byte_buffer_pool::get_instance(); @@ -101,14 +102,14 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) rx_sockets.reset(new srslte::rx_multisocket_handler("ENBSOCKETS", stack_log)); // add sync queue - sync_queue_id = pending_tasks.add_queue(args.sync_queue_size); + sync_task_queue = task_sched.make_task_queue(args.sync_queue_size); // Init all layers mac.init(args.mac, rrc_cfg.cell_list, phy, &rlc, &rrc, this, mac_log); - rlc.init(&pdcp, &rrc, &mac, &timers, rlc_log); + rlc.init(&pdcp, &rrc, &mac, task_sched.get_timer_handler(), rlc_log); pdcp.init(&rlc, &rrc, >pu); - rrc.init(rrc_cfg, phy, &mac, &rlc, &pdcp, &s1ap, >pu, &timers); - if (s1ap.init(args.s1ap, &rrc, &timers, this) != SRSLTE_SUCCESS) { + rrc.init(rrc_cfg, phy, &mac, &rlc, &pdcp, &s1ap, >pu, task_sched.get_timer_handler()); + if (s1ap.init(args.s1ap, &rrc, task_sched.get_timer_handler(), this) != SRSLTE_SUCCESS) { stack_log->error("Couldn't initialize S1AP\n"); return SRSLTE_ERROR; } @@ -131,23 +132,19 @@ int enb_stack_lte::init(const stack_args_t& args_, const rrc_cfg_t& rrc_cfg_) void enb_stack_lte::tti_clock() { - pending_tasks.push(sync_queue_id, [this]() { tti_clock_impl(); }); + sync_task_queue.push([this]() { tti_clock_impl(); }); } void enb_stack_lte::tti_clock_impl() { - for (auto& t : deferred_stack_tasks) { - t(); - } - deferred_stack_tasks.clear(); - timers.step_all(); + task_sched.tic(); rrc.tti_clock(); } void enb_stack_lte::stop() { if (started) { - pending_tasks.push(enb_queue_id, [this]() { stop_impl(); }); + enb_task_queue.push([this]() { stop_impl(); }); wait_thread_finish(); } } @@ -170,12 +167,7 @@ void enb_stack_lte::stop_impl() s1ap_pcap.close(); } - // erasing the queues is the last thing, bc we need them to call stop_impl() - pending_tasks.erase_queue(sync_queue_id); - pending_tasks.erase_queue(enb_queue_id); - pending_tasks.erase_queue(mme_queue_id); - pending_tasks.erase_queue(gtpu_queue_id); - pending_tasks.erase_queue(mac_queue_id); + task_sched.stop(); started = false; } @@ -183,7 +175,7 @@ void enb_stack_lte::stop_impl() bool enb_stack_lte::get_metrics(stack_metrics_t* metrics) { // use stack thread to query metrics - pending_tasks.try_push(enb_queue_id, [this]() { + auto ret = enb_task_queue.try_push([this]() { stack_metrics_t metrics{}; mac.get_metrics(metrics.mac); rrc.get_metrics(metrics.rrc); @@ -191,18 +183,18 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics) pending_stack_metrics.push(metrics); }); - // wait for result - *metrics = pending_stack_metrics.wait_pop(); - return true; + if (ret.first) { + // wait for result + *metrics = pending_stack_metrics.wait_pop(); + return true; + } + return false; } void enb_stack_lte::run_thread() { while (started) { - srslte::move_task_t task{}; - if (pending_tasks.wait_pop(&task) >= 0) { - task(); - } + task_sched.run_next_external_task(); } } @@ -216,7 +208,7 @@ void enb_stack_lte::handle_mme_rx_packet(srslte::unique_byte_buffer_t pdu, s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags); }; // Defer the handling of MME packet to main stack thread - pending_tasks.push(mme_queue_id, std::bind(task_handler, std::move(pdu))); + mme_task_queue.push(std::bind(task_handler, std::move(pdu))); } void enb_stack_lte::add_mme_socket(int fd) @@ -240,7 +232,7 @@ void enb_stack_lte::add_gtpu_s1u_socket_handler(int fd) auto task_handler = [this, from](srslte::unique_byte_buffer_t& t) { gtpu.handle_gtpu_s1u_rx_packet(std::move(t), from); }; - pending_tasks.push(gtpu_queue_id, std::bind(task_handler, std::move(pdu))); + gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); }; rx_sockets->add_socket_pdu_handler(fd, gtpu_s1u_handler); } @@ -251,44 +243,44 @@ void enb_stack_lte::add_gtpu_m1u_socket_handler(int fd) auto task_handler = [this, from](srslte::unique_byte_buffer_t& t) { gtpu.handle_gtpu_m1u_rx_packet(std::move(t), from); }; - pending_tasks.push(gtpu_queue_id, std::bind(task_handler, std::move(pdu))); + gtpu_task_queue.push(std::bind(task_handler, std::move(pdu))); }; rx_sockets->add_socket_pdu_handler(fd, gtpu_m1u_handler); } srslte::timer_handler::unique_timer enb_stack_lte::get_unique_timer() { - return timers.get_unique_timer(); + return task_sched.get_unique_timer(); } srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue() { - return pending_tasks.get_queue_handler(); + return task_sched.make_task_queue(); } srslte::task_multiqueue::queue_handler enb_stack_lte::make_task_queue(uint32_t qsize) { - return pending_tasks.get_queue_handler(qsize); + return task_sched.make_task_queue(qsize); } void enb_stack_lte::defer_callback(uint32_t duration_ms, std::function func) { - timers.defer_callback(duration_ms, func); + task_sched.defer_callback(duration_ms, std::move(func)); } void enb_stack_lte::enqueue_background_task(std::function task) { - task(0); + task_sched.enqueue_background_task(task); } void enb_stack_lte::notify_background_task_result(srslte::move_task_t task) { - task(); + task_sched.notify_background_task_result(std::move(task)); } void enb_stack_lte::defer_task(srslte::move_task_t task) { - deferred_stack_tasks.push_back(std::move(task)); + task_sched.defer_task(std::move(task)); } } // namespace srsenb