fixed and simplified multiqueue task api to avoid dangling pointers.

This commit is contained in:
Francisco Paisana 2019-11-28 10:56:01 +00:00
parent 6746c5dfa5
commit 54992e72f1
5 changed files with 27 additions and 32 deletions

View File

@ -230,36 +230,33 @@ private:
* Specialization for tasks with content that is move-only * Specialization for tasks with content that is move-only
**********************************************************/ **********************************************************/
template <typename Capture> class move_task_t
class moveable_task_t
{ {
public: public:
moveable_task_t() = default; move_task_t() = default;
template <typename Func> template <typename Func>
moveable_task_t(Func&& f) : func(std::forward<Func>(f)) move_task_t(Func&& f) : task_ptr(new derived_task<Func>(std::forward<Func>(f)))
{ {
} }
template <typename Func> void operator()() { (*task_ptr)(); }
moveable_task_t(Func&& f, Capture&& c) : capture(std::forward<Capture>(c))
{
std::function<void(Capture)> ftmp{std::forward<Func>(f)};
func = [this, ftmp]() { ftmp(std::move(capture)); };
}
void operator()() { func(); }
private: private:
std::function<void()> func; struct base_task {
Capture capture; virtual void operator()() = 0;
};
template <typename Func>
struct derived_task : public base_task {
derived_task(Func&& f_) : f(std::forward<Func>(f_)) {}
void operator()() final { f(); }
private:
Func f;
};
std::unique_ptr<base_task> task_ptr;
}; };
template <typename Func, typename Capture> using multiqueue_task_handler = multiqueue_handler<move_task_t>;
moveable_task_t<Capture> bind_task(Func&& f, Capture&& c)
{
return moveable_task_t<Capture>{std::forward<Func>(f), std::forward<Capture>(c)};
}
template <typename Capture>
using multiqueue_task_handler = multiqueue_handler<moveable_task_t<Capture> >;
} // namespace srslte } // namespace srslte

View File

@ -146,8 +146,7 @@ private:
phy_interface_stack_lte* phy = nullptr; phy_interface_stack_lte* phy = nullptr;
// state // state
bool started = false; bool started = false;
using task_t = srslte::moveable_task_t<srslte::unique_byte_buffer_t>;
srslte::multiqueue_task_handler pending_tasks; srslte::multiqueue_task_handler pending_tasks;
int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1; int enb_queue_id = -1, sync_queue_id = -1, mme_queue_id = -1, gtpu_queue_id = -1, mac_queue_id = -1;
}; };

View File

@ -210,7 +210,7 @@ bool enb_stack_lte::get_metrics(stack_metrics_t* metrics)
void enb_stack_lte::run_thread() void enb_stack_lte::run_thread()
{ {
while (started) { while (started) {
task_t task{}; srslte::move_task_t task{};
if (pending_tasks.wait_pop(&task) >= 0) { if (pending_tasks.wait_pop(&task) >= 0) {
task(); task();
} }
@ -223,11 +223,11 @@ void enb_stack_lte::handle_mme_rx_packet(srslte::unique_byte_buffer_t pdu,
int flags) int flags)
{ {
// Defer the handling of MME packet to eNB stack main thread // Defer the handling of MME packet to eNB stack main thread
auto task_handler = [this, from, sri, flags](srslte::unique_byte_buffer_t t) { auto task_handler = [this, from, sri, flags](srslte::unique_byte_buffer_t& t) {
s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags); s1ap.handle_mme_rx_msg(std::move(t), from, sri, flags);
}; };
// Defer the handling of MME packet to main stack thread // Defer the handling of MME packet to main stack thread
pending_tasks.push(mme_queue_id, srslte::bind_task(task_handler, std::move(pdu))); pending_tasks.push(mme_queue_id, std::bind(task_handler, std::move(pdu)));
} }
void enb_stack_lte::add_mme_socket(int fd) void enb_stack_lte::add_mme_socket(int fd)

View File

@ -159,9 +159,8 @@ private:
gw_interface_stack* gw = nullptr; gw_interface_stack* gw = nullptr;
// Thread // Thread
static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD static const int STACK_MAIN_THREAD_PRIO = -1; // Use default high-priority below UHD
using task_t = srslte::moveable_task_t<srslte::unique_byte_buffer_t>; srslte::multiqueue_task_handler pending_tasks;
srslte::multiqueue_task_handler<srslte::unique_byte_buffer_t> pending_tasks;
int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1; int sync_queue_id = -1, ue_queue_id = -1, gw_queue_id = -1, mac_queue_id = -1, background_queue_id = -1;
srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks srslte::task_thread_pool background_tasks; ///< Thread pool used for long, low-priority tasks
}; };

View File

@ -219,7 +219,7 @@ bool ue_stack_lte::get_metrics(stack_metrics_t* metrics)
void ue_stack_lte::run_thread() void ue_stack_lte::run_thread()
{ {
while (running) { while (running) {
task_t task{}; srslte::move_task_t task{};
if (pending_tasks.wait_pop(&task) >= 0) { if (pending_tasks.wait_pop(&task) >= 0) {
task(); task();
} }
@ -242,10 +242,10 @@ void ue_stack_lte::run_thread()
*/ */
void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bool blocking) void ue_stack_lte::write_sdu(uint32_t lcid, srslte::unique_byte_buffer_t sdu, bool blocking)
{ {
auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t sdu) { auto task = [this, lcid, blocking](srslte::unique_byte_buffer_t& sdu) {
pdcp.write_sdu(lcid, std::move(sdu), blocking); pdcp.write_sdu(lcid, std::move(sdu), blocking);
}; };
bool ret = pending_tasks.try_push(gw_queue_id, srslte::bind_task(task, std::move(sdu))).first; bool ret = pending_tasks.try_push(gw_queue_id, std::bind(task, std::move(sdu))).first;
if (not ret) { if (not ret) {
pdcp_log.warning("GW SDU with lcid=%d was discarded.\n", lcid); pdcp_log.warning("GW SDU with lcid=%d was discarded.\n", lcid);
} }