mirror of https://github.com/PentHertz/srsLTE.git
Replace the queue in thread pool in favour of a static circular buffer to avoid allocations.
This commit is contained in:
parent
138230f4e4
commit
e1752c0878
|
@ -20,13 +20,13 @@
|
||||||
#ifndef SRSRAN_THREAD_POOL_H
|
#ifndef SRSRAN_THREAD_POOL_H
|
||||||
#define SRSRAN_THREAD_POOL_H
|
#define SRSRAN_THREAD_POOL_H
|
||||||
|
|
||||||
|
#include "srsran/adt/circular_buffer.h"
|
||||||
#include "srsran/adt/move_callback.h"
|
#include "srsran/adt/move_callback.h"
|
||||||
#include "srsran/srslog/srslog.h"
|
#include "srsran/srslog/srslog.h"
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <queue>
|
|
||||||
#include <stack>
|
#include <stack>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
@ -90,6 +90,8 @@ private:
|
||||||
class task_thread_pool
|
class task_thread_pool
|
||||||
{
|
{
|
||||||
using task_t = srsran::move_callback<void(), default_move_callback_buffer_size, true>;
|
using task_t = srsran::move_callback<void(), default_move_callback_buffer_size, true>;
|
||||||
|
static constexpr uint32_t max_task_shift = 14;
|
||||||
|
static constexpr uint32_t max_task_num = 1u << max_task_shift;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
task_thread_pool(uint32_t nof_workers = 1, bool start_deferred = false, int32_t prio_ = -1, uint32_t mask_ = 255);
|
task_thread_pool(uint32_t nof_workers = 1, bool start_deferred = false, int32_t prio_ = -1, uint32_t mask_ = 255);
|
||||||
|
@ -130,7 +132,7 @@ private:
|
||||||
uint32_t mask = 255;
|
uint32_t mask = 255;
|
||||||
srslog::basic_logger& logger;
|
srslog::basic_logger& logger;
|
||||||
|
|
||||||
std::queue<task_t> pending_tasks;
|
srsran::dyn_circular_buffer<task_t> pending_tasks;
|
||||||
std::vector<std::unique_ptr<worker_t> > workers;
|
std::vector<std::unique_ptr<worker_t> > workers;
|
||||||
mutable std::mutex queue_mutex;
|
mutable std::mutex queue_mutex;
|
||||||
std::condition_variable cv_empty;
|
std::condition_variable cv_empty;
|
||||||
|
|
|
@ -253,7 +253,7 @@ uint32_t thread_pool::get_nof_workers()
|
||||||
*************************************************************************/
|
*************************************************************************/
|
||||||
|
|
||||||
task_thread_pool::task_thread_pool(uint32_t nof_workers, bool start_deferred, int32_t prio_, uint32_t mask_) :
|
task_thread_pool::task_thread_pool(uint32_t nof_workers, bool start_deferred, int32_t prio_, uint32_t mask_) :
|
||||||
logger(srslog::fetch_basic_logger("POOL")), workers(std::max(1u, nof_workers))
|
logger(srslog::fetch_basic_logger("POOL")), pending_tasks(max_task_num), workers(std::max(1u, nof_workers))
|
||||||
{
|
{
|
||||||
if (not start_deferred) {
|
if (not start_deferred) {
|
||||||
start(prio_, mask_);
|
start(prio_, mask_);
|
||||||
|
@ -322,6 +322,10 @@ void task_thread_pool::push_task(task_t&& task)
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(queue_mutex);
|
std::lock_guard<std::mutex> lock(queue_mutex);
|
||||||
|
if (pending_tasks.full()) {
|
||||||
|
logger.error("Cannot push anymore tasks into the queue, maximum size is %u", uint32_t(max_task_num));
|
||||||
|
return;
|
||||||
|
}
|
||||||
pending_tasks.push(std::move(task));
|
pending_tasks.push(std::move(task));
|
||||||
}
|
}
|
||||||
cv_empty.notify_one();
|
cv_empty.notify_one();
|
||||||
|
@ -358,7 +362,7 @@ bool task_thread_pool::worker_t::wait_task(task_t* task)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (task) {
|
if (task) {
|
||||||
*task = std::move(parent->pending_tasks.front());
|
*task = std::move(parent->pending_tasks.top());
|
||||||
}
|
}
|
||||||
parent->pending_tasks.pop();
|
parent->pending_tasks.pop();
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue