multiqueue optimization - use condition_var wait_for() method, use queue try_lock in the consumer side

This commit is contained in:
Francisco 2021-05-19 18:49:06 +01:00 committed by Ismael Gomez
parent b41aba2a03
commit 60a8ee0af9
1 changed files with 42 additions and 37 deletions

View File

@ -116,18 +116,14 @@ class multiqueue_handler
bool try_pop(myobj& obj)
{
std::unique_lock<std::mutex> lock(q_mutex);
if (buffer.empty()) {
consumer_notify_needed = notify_flag;
return false;
}
obj = std::move(buffer.top());
buffer.pop();
consumer_notify_needed = false;
if (nof_waiting > 0) {
lock.unlock();
cv_full.notify_one();
}
return true;
return pop_(lock, obj);
}
bool try_pop(myobj& obj, bool& try_lock_success)
{
std::unique_lock<std::mutex> lock(q_mutex, std::try_to_lock);
try_lock_success = lock.owns_lock();
return try_lock_success ? pop_(lock, obj) : false;
}
private:
@ -165,6 +161,22 @@ class multiqueue_handler
return true;
}
bool pop_(std::unique_lock<std::mutex>& lock, myobj& obj)
{
if (buffer.empty()) {
consumer_notify_needed = notify_flag;
return false;
}
obj = std::move(buffer.top());
buffer.pop();
consumer_notify_needed = false;
if (nof_waiting > 0) {
lock.unlock();
cv_full.notify_one();
}
return true;
}
multiqueue_handler<myobj>* parent = nullptr;
mutable std::mutex q_mutex;
@ -229,8 +241,7 @@ public:
// signal deactivation to pushing threads in a non-blocking way
q.set_active(false);
}
while (wait_state) {
pushed_data = true;
while (consumer_state) {
cv_empty.notify_one();
cv_exit.wait(lock);
}
@ -287,17 +298,16 @@ public:
bool wait_pop(myobj* value)
{
std::unique_lock<std::mutex> lock(mutex);
consumer_state = true;
while (running) {
if (round_robin_pop_(value)) {
consumer_state = false;
return true;
}
pushed_data = false;
wait_state = true;
while (not pushed_data) {
cv_empty.wait(lock);
}
wait_state = false;
cv_empty.wait_for(lock, std::chrono::microseconds(100));
}
consumer_state = false;
lock.unlock();
cv_exit.notify_one();
return false;
}
@ -312,36 +322,31 @@ private:
bool round_robin_pop_(myobj* value)
{
// Round-robin for all queues
auto it = queues.begin() + spin_idx;
auto q_it = queues.begin() + spin_idx;
uint32_t count = 0;
for (; count < queues.size(); ++count, ++it) {
if (it == queues.end()) {
it = queues.begin(); // wrap-around
for (; count < queues.size(); ++count, ++q_it) {
if (q_it == queues.end()) {
q_it = queues.begin(); // wrap-around
}
if (it->try_pop(*value)) {
bool try_lock_success = true;
if (q_it->try_pop(*value, try_lock_success)) {
spin_idx = (spin_idx + count + 1) % queues.size();
return true;
}
if (not try_lock_success) {
// restart RR search, as there was a collision with a producer
count = 0;
}
}
return false;
}
/// Called by the producer threads to signal the consumer to unlock in wait_pop
void signal_pushed_data()
{
{
std::lock_guard<std::mutex> lock(mutex);
if (pushed_data) {
return;
}
pushed_data = true;
}
cv_empty.notify_one();
}
void signal_pushed_data() { cv_empty.notify_one(); }
mutable std::mutex mutex;
std::condition_variable cv_empty, cv_exit;
uint32_t spin_idx = 0;
bool running = true, pushed_data = false, wait_state = false;
bool running = true, consumer_state = false;
std::deque<input_port_impl> queues;
uint32_t default_capacity = 0;
};