diff --git a/lib/include/srslte/adt/circular_buffer.h b/lib/include/srslte/adt/circular_buffer.h index 7c4076a15..e6df1cd47 100644 --- a/lib/include/srslte/adt/circular_buffer.h +++ b/lib/include/srslte/adt/circular_buffer.h @@ -189,7 +189,6 @@ public: base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... args) : circ_buffer(std::forward(args)...), push_func(push_func_), pop_func(pop_func_) {} - ~base_blocking_queue() { stop(); } void stop() { @@ -220,6 +219,7 @@ public: pop_(obj, true); return obj; } + bool pop_wait_for(T& obj, const std::chrono::microseconds& duration) { return pop_(obj, true, &duration); } void clear() { std::lock_guard lock(mutex); @@ -273,6 +273,8 @@ protected: PoppingFunc pop_func; CircBuffer circ_buffer; + ~base_blocking_queue() { stop(); } + bool push_(const T& t, bool block_mode) { std::unique_lock lock(mutex); @@ -324,7 +326,7 @@ protected: return {}; } - bool pop_(T& obj, bool block) + bool pop_(T& obj, bool block, const std::chrono::microseconds* duration = nullptr) { std::unique_lock lock(mutex); if (not active) { @@ -335,11 +337,14 @@ protected: return false; } nof_waiting++; - while (circ_buffer.empty() and active) { - cvar_empty.wait(lock); + if (duration == nullptr) { + cvar_empty.wait(lock, [this]() { return not circ_buffer.empty() or not active; }); + } else { + cvar_empty.wait_for(lock, *duration, [this]() { return not circ_buffer.empty() or not active; }); } nof_waiting--; - if (not active) { + if (circ_buffer.empty()) { + // either queue got deactivated or there was a timeout return false; } }