created a queue-based thread pool. Tasks are inserted into a queue and then popped by the thread pool workers to be processed.

This commit is contained in:
Francisco Paisana 2019-09-20 15:43:26 +01:00 committed by Andre Puschmann
parent efdec15964
commit c413fadea9
4 changed files with 275 additions and 8 deletions

View File

@ -29,10 +29,15 @@
#ifndef SRSLTE_THREAD_POOL_H
#define SRSLTE_THREAD_POOL_H
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <stdint.h>
#include <string>
#include <vector>
#include <stack>
#include "srslte/common/threads.h"
@ -94,8 +99,50 @@ private:
std::vector<worker_status> status;
std::vector<pthread_cond_t> cvar;
std::vector<pthread_mutex_t> mutex;
std::stack<worker*> available_workers;
std::stack<worker*> available_workers;
};
}
class task_thread_pool
{
using task_t = std::function<void(uint32_t worker_id)>;
public:
task_thread_pool(uint32_t nof_workers);
~task_thread_pool();
void start(int32_t prio = -1, uint32_t mask = 255);
void stop();
void push_task(const task_t& task);
void push_task(task_t&& task);
uint32_t nof_pending_tasks();
private:
class worker_t : public thread
{
public:
explicit worker_t(task_thread_pool* parent_, uint32_t id);
void setup(int32_t prio, uint32_t mask);
bool is_running() const { return running; }
uint32_t id() const { return id_; }
void run_thread() override;
private:
bool wait_task(task_t* task);
task_thread_pool* parent = nullptr;
uint32_t id_ = 0;
bool running = false;
};
std::queue<task_t> pending_tasks;
std::vector<worker_t> workers;
std::mutex queue_mutex;
std::condition_variable cv_empty, cv_exit;
bool running;
uint32_t nof_workers_running = 0;
};
} // namespace srslte
#endif // SRSLTE_THREAD_POOL_H

View File

@ -50,9 +50,11 @@ class thread
{
public:
thread(const std::string& name_) : _thread(0), name(name_) {}
bool start(int prio = -1) {
return threads_new_rt_prio(&_thread, thread_function_entry, this, prio);
}
thread(const thread&) = delete;
thread(thread&&) noexcept = default;
thread& operator=(const thread&) = delete;
thread& operator=(thread&&) noexcept = default;
bool start(int prio = -1) { return threads_new_rt_prio(&_thread, thread_function_entry, this, prio); }
bool start_cpu(int prio, int cpu) {
return threads_new_rt_cpu(&_thread, thread_function_entry, this, cpu, prio);
}

View File

@ -280,6 +280,129 @@ uint32_t thread_pool::get_nof_workers()
return nof_workers;
}
/**************************************************************************
* task_thread_pool - uses a queue to enqueue callables, that start
* once a worker is available
*************************************************************************/
task_thread_pool::task_thread_pool(uint32_t nof_workers) : running(false)
{
workers.reserve(nof_workers);
for (uint32_t i = 0; i < nof_workers; ++i) {
workers.emplace_back(this, i);
}
}
task_thread_pool::~task_thread_pool()
{
stop();
}
void task_thread_pool::start(int32_t prio, uint32_t mask)
{
std::lock_guard<std::mutex> lock(queue_mutex);
running = true;
for (worker_t& w : workers) {
w.setup(prio, mask);
}
}
void task_thread_pool::stop()
{
std::unique_lock<std::mutex> lock(queue_mutex);
running = false;
do {
nof_workers_running = 0;
// next worker that is still running
for (worker_t& w : workers) {
if (w.is_running()) {
nof_workers_running++;
}
}
if (nof_workers_running > 0) {
lock.unlock();
cv_empty.notify_all();
lock.lock();
cv_exit.wait(lock);
}
} while (nof_workers_running > 0);
}
void task_thread_pool::push_task(const task_t& task)
{
{
std::lock_guard<std::mutex> lock(queue_mutex);
pending_tasks.push(task);
}
cv_empty.notify_one();
}
void task_thread_pool::push_task(task_t&& task)
{
{
std::lock_guard<std::mutex> lock(queue_mutex);
pending_tasks.push(std::move(task));
}
cv_empty.notify_one();
}
uint32_t task_thread_pool::nof_pending_tasks()
{
std::lock_guard<std::mutex> lock(queue_mutex);
return pending_tasks.size();
}
task_thread_pool::worker_t::worker_t(srslte::task_thread_pool* parent_, uint32_t my_id) :
parent(parent_),
thread("TASKWORKER"),
id_(my_id)
{
set_name(std::string("TASKWORKER") + std::to_string(my_id));
}
void task_thread_pool::worker_t::setup(int32_t prio, uint32_t mask)
{
if (mask == 255) {
start(prio);
} else {
start_cpu_mask(prio, mask);
}
}
bool task_thread_pool::worker_t::wait_task(task_t* task)
{
std::unique_lock<std::mutex> lock(parent->queue_mutex);
while (parent->running and parent->pending_tasks.empty()) {
parent->cv_empty.wait(lock);
}
if (not parent->running) {
return false;
}
if (task) {
*task = std::move(parent->pending_tasks.front());
}
parent->pending_tasks.pop();
return true;
}
void task_thread_pool::worker_t::run_thread()
{
running = true;
// main loop
task_t task;
while (wait_task(&task)) {
task(id());
}
// on exit, notify pool class
std::unique_lock<std::mutex> lock(parent->queue_mutex);
running = false;
parent->nof_workers_running--;
if (parent->nof_workers_running == 0) {
lock.unlock();
parent->cv_exit.notify_one();
}
}
} // namespace srslte

View File

@ -19,8 +19,9 @@
*
*/
#include "srslte/common/multiqueue.h"
#include "srslte/common/thread_pool.h"
#include <iostream>
#include <srslte/common/multiqueue.h>
#include <thread>
#include <unistd.h>
@ -214,10 +215,104 @@ int test_multiqueue_threading3()
return 0;
}
int test_task_thread_pool()
{
std::cout << "\n====== TEST task thread pool test 1: start ======\n";
// Description: check whether the tasks are successfully distributed between workers
uint32_t nof_workers = 4, nof_runs = 10000;
std::vector<int> count_worker(nof_workers, 0);
std::vector<std::mutex> count_mutex(nof_workers);
task_thread_pool thread_pool(nof_workers);
thread_pool.start();
auto task = [&count_worker, &count_mutex](uint32_t worker_id) {
std::lock_guard<std::mutex> lock(count_mutex[worker_id]);
// std::cout << "hello world from worker " << worker_id << std::endl;
count_worker[worker_id]++;
};
for (uint32_t i = 0; i < nof_runs; ++i) {
thread_pool.push_task(task);
}
// wait for all tasks to be successfully processed
while (thread_pool.nof_pending_tasks() > 0) {
usleep(100);
}
thread_pool.stop();
uint32_t total_count = 0;
for (uint32_t i = 0; i < nof_workers; ++i) {
if (count_worker[i] < 10) {
printf("the number of tasks %d assigned to worker %d is too low\n", count_worker[i], i);
return -1;
}
total_count += count_worker[i];
printf("worker %d: %d runs\n", i, count_worker[i]);
}
if (total_count != nof_runs) {
printf("Number of task runs=%d does not match total=%d\n", total_count, nof_runs);
return -1;
}
std::cout << "outcome: Success\n";
std::cout << "===================================================\n";
return 0;
}
int test_task_thread_pool2()
{
std::cout << "\n====== TEST task thread pool test 2: start ======\n";
// Description: push a very long task to all workers, and call thread_pool.stop() to check if it waits for the tasks
// to be completed, and does not get stuck.
uint32_t nof_workers = 4;
uint8_t workers_started = 0, workers_finished = 0;
std::mutex mut;
task_thread_pool thread_pool(nof_workers);
thread_pool.start();
auto task = [&workers_started, &workers_finished, &mut](uint32_t worker_id) {
{
std::lock_guard<std::mutex> lock(mut);
workers_started++;
}
sleep(1);
std::lock_guard<std::mutex> lock(mut);
std::cout << "worker " << worker_id << " has finished\n";
workers_finished++;
};
for (uint32_t i = 0; i < nof_workers; ++i) {
thread_pool.push_task(task);
}
while (workers_started != nof_workers) {
usleep(10);
}
std::cout << "stopping thread pool...\n";
thread_pool.stop();
std::cout << "thread pool stopped.\n";
TESTASSERT(workers_finished == nof_workers);
std::cout << "outcome: Success\n";
std::cout << "===================================================\n";
return 0;
}
int main()
{
TESTASSERT(test_multiqueue() == 0);
TESTASSERT(test_multiqueue_threading() == 0);
TESTASSERT(test_multiqueue_threading2() == 0);
TESTASSERT(test_multiqueue_threading3() == 0);
TESTASSERT(test_task_thread_pool() == 0);
TESTASSERT(test_task_thread_pool2() == 0);
}