adt lib additions - add the ability to perform timedwait for popping from a blocking queue

This commit is contained in:
Francisco 2021-03-08 12:35:47 +00:00 committed by Francisco Paisana
parent f0ed1e06a8
commit 0b6293c676
1 changed files with 10 additions and 5 deletions

View File

@ -189,7 +189,6 @@ public:
base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) : base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) :
circ_buffer(std::forward<Args>(args)...), push_func(push_func_), pop_func(pop_func_) circ_buffer(std::forward<Args>(args)...), push_func(push_func_), pop_func(pop_func_)
{} {}
~base_blocking_queue() { stop(); }
void stop() void stop()
{ {
@ -220,6 +219,7 @@ public:
pop_(obj, true); pop_(obj, true);
return obj; return obj;
} }
bool pop_wait_for(T& obj, const std::chrono::microseconds& duration) { return pop_(obj, true, &duration); }
void clear() void clear()
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
@ -273,6 +273,8 @@ protected:
PoppingFunc pop_func; PoppingFunc pop_func;
CircBuffer circ_buffer; CircBuffer circ_buffer;
~base_blocking_queue() { stop(); }
bool push_(const T& t, bool block_mode) bool push_(const T& t, bool block_mode)
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
@ -324,7 +326,7 @@ protected:
return {}; return {};
} }
bool pop_(T& obj, bool block) bool pop_(T& obj, bool block, const std::chrono::microseconds* duration = nullptr)
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
if (not active) { if (not active) {
@ -335,11 +337,14 @@ protected:
return false; return false;
} }
nof_waiting++; nof_waiting++;
while (circ_buffer.empty() and active) { if (duration == nullptr) {
cvar_empty.wait(lock); cvar_empty.wait(lock, [this]() { return not circ_buffer.empty() or not active; });
} else {
cvar_empty.wait_for(lock, *duration, [this]() { return not circ_buffer.empty() or not active; });
} }
nof_waiting--; nof_waiting--;
if (not active) { if (circ_buffer.empty()) {
// either queue got deactivated or there was a timeout
return false; return false;
} }
} }