/** * * \section COPYRIGHT * * Copyright 2013-2021 Software Radio Systems Limited * * By using this file, you agree to the terms and conditions set * forth in the LICENSE file which can be found at the top level of * the distribution. * */ #ifndef SRSRAN_CIRCULAR_BUFFER_H #define SRSRAN_CIRCULAR_BUFFER_H #include "srsran/adt/detail/type_storage.h" #include "srsran/adt/expected.h" #include "srsran/adt/pool/pool_utils.h" #include "srsran/common/srsran_assert.h" #include #include #include #include #include #include #include #include namespace srsran { namespace detail { template size_t get_max_size(const std::array& a) { return a.max_size(); } template size_t get_max_size(const std::vector& a) { return a.capacity(); } /** * Base common class for definition of circular buffer data structures with the following features: * - no allocations while pushing/popping new elements. Just an internal index update * - it provides helper methods to add/remove objects * - it provides an iterator interface to iterate over added elements in the buffer * - not thread-safe * @tparam Container underlying container type used as buffer (e.g. 
std::array or std::vector) */ template class base_circular_buffer { using storage_t = typename Container::value_type; using T = typename storage_t::value_type; template class iterator_impl { using parent_type = typename std::conditional::value, base_circular_buffer, const base_circular_buffer >::type; public: using value_type = DataType; using reference = DataType&; using pointer = DataType*; using difference_type = std::ptrdiff_t; using iterator_category = std::forward_iterator_tag; iterator_impl(parent_type& parent_, size_t i) : parent(&parent_), idx(i) {} iterator_impl& operator++() { idx = (idx + 1) % parent->max_size(); return *this; } iterator_impl operator++(int) { iterator_impl tmp(*this); ++(*this); return tmp; } iterator_impl operator+(difference_type n) { iterator_impl tmp(*this); tmp += n; return tmp; } iterator_impl& operator+=(difference_type n) { idx = (idx + n) % parent->max_size(); return *this; } value_type* operator->() { return &get(); } const value_type* operator->() const { return &get(); } value_type& operator*() { return get(); } const value_type& operator*() const { return get(); } bool operator==(const iterator_impl& it) const { return it.parent == parent and it.idx == idx; } bool operator!=(const iterator_impl& it) const { return not(*this == it); } private: void assert_idx_within_bounds() { srsran_assert(idx + (idx >= parent->rpos ? 
0 : parent->max_size()) < parent->rpos + parent->count, "index=%zd is out-of-bounds [%zd, %zd)", idx, parent->rpos, parent->count); } value_type& get() { assert_idx_within_bounds(); return parent->buffer[idx].get(); } const value_type& get() const { assert_idx_within_bounds(); return parent->buffer[idx].get(); } parent_type* parent; size_t idx; }; public: using value_type = T; using difference_type = typename Container::difference_type; using size_type = std::size_t; using iterator = iterator_impl; using const_iterator = iterator_impl; base_circular_buffer() = default; ~base_circular_buffer() { clear(); } template typename std::enable_if::value>::type push(U&& t) { srsran_assert(not full(), "Circular buffer is full."); size_t wpos = (rpos + count) % max_size(); buffer[wpos].emplace(std::forward(t)); count++; } bool try_push(T&& t) { if (full()) { return false; } push(std::move(t)); return true; } bool try_push(const T& t) { if (full()) { return false; } push(t); return true; } void pop() { srsran_assert(not empty(), "Cannot call pop() in empty circular buffer"); buffer[rpos].destroy(); rpos = (rpos + 1) % max_size(); count--; } T& top() { srsran_assert(not empty(), "Cannot call top() in empty circular buffer"); return buffer[rpos].get(); } const T& top() const { srsran_assert(not empty(), "Cannot call top() in empty circular buffer"); return buffer[rpos].get(); } void clear() { for (size_t i = 0; i < count; ++i) { buffer[(rpos + i) % max_size()].destroy(); } count = 0; } bool full() const { return count == max_size(); } bool empty() const { return count == 0; } size_t size() const { return count; } size_t max_size() const { return detail::get_max_size(buffer); } T& operator[](size_t i) { srsran_assert(i < count, "Out-of-bounds access to circular buffer (%zd >= %zd)", i, count); return buffer[(rpos + i) % max_size()].get(); } const T& operator[](size_t i) const { srsran_assert(i < count, "Out-of-bounds access to circular buffer (%zd >= %zd)", i, count); return 
buffer[(rpos + i) % max_size()].get(); } iterator begin() { return iterator(*this, rpos); } const_iterator begin() const { return const_iterator(*this, rpos); } iterator end() { return iterator(*this, (rpos + count) % max_size()); } const_iterator end() const { return const_iterator(*this, (rpos + count) % max_size()); } template bool apply_first(const F& func) { for (auto it = begin(); it != end(); it++) { if (func(*it)) { return true; } } return false; } protected: base_circular_buffer(size_t rpos_, size_t count_) : rpos(rpos_), count(count_) {} template base_circular_buffer(size_t rpos_, size_t count_, BufferArgs&&... args) : rpos(rpos_), count(count_), buffer(std::forward(args)...) {} Container buffer; size_t rpos = 0; size_t count = 0; }; /** * Base common class for definition of blocking queue data structures with the following features: * - it stores pushed/popped samples in an internal circular buffer * - provides blocking and non-blocking push/pop APIs * - thread-safe * @tparam CircBuffer underlying circular buffer data type (e.g. static_circular_buffer or dyn_circular_buffer) * @tparam PushingFunc function void(const T&) called while pushing an element to the queue * @tparam PoppingFunc function void(const T&) called while popping an element from the queue */ template class base_blocking_queue { using T = typename CircBuffer::value_type; public: template base_blocking_queue(PushingFunc push_func_, PoppingFunc pop_func_, Args&&... 
args) : circ_buffer(std::forward(args)...), push_func(push_func_), pop_func(pop_func_) {} base_blocking_queue(const base_blocking_queue&) = delete; base_blocking_queue(base_blocking_queue&&) = delete; base_blocking_queue& operator=(const base_blocking_queue&) = delete; base_blocking_queue& operator=(base_blocking_queue&&) = delete; void stop() { std::unique_lock lock(mutex); if (active) { active = false; if (nof_waiting > 0) { // Stop pending pushing/popping threads do { lock.unlock(); cvar_empty.notify_all(); cvar_full.notify_all(); std::this_thread::yield(); lock.lock(); } while (nof_waiting > 0); } // Empty queue circ_buffer.clear(); } } bool try_push(const T& t) { return push_(t, false); } srsran::error_type try_push(T&& t) { return push_(std::move(t), false); } bool push_blocking(const T& t) { return push_(t, true); } srsran::error_type push_blocking(T&& t) { return push_(std::move(t), true); } bool try_pop(T& obj) { return pop_(obj, false); } T pop_blocking() { T obj{}; pop_(obj, true); return obj; } bool pop_wait_until(T& obj, const std::chrono::system_clock::time_point& until) { return pop_(obj, true, &until); } void clear() { T obj; while (pop_(obj, false)) { } } size_t size() const { std::lock_guard lock(mutex); return circ_buffer.size(); } bool empty() const { std::lock_guard lock(mutex); return circ_buffer.empty(); } bool full() const { std::lock_guard lock(mutex); return circ_buffer.full(); } size_t max_size() const { std::lock_guard lock(mutex); return circ_buffer.max_size(); } bool is_stopped() const { std::lock_guard lock(mutex); return not active; } template bool try_call_on_front(F&& f) { std::lock_guard lock(mutex); if (not circ_buffer.empty()) { f(circ_buffer.top()); return true; } return false; } template bool apply_first(const F& func) { std::lock_guard lock(mutex); return circ_buffer.apply_first(func); } PushingFunc push_func; PoppingFunc pop_func; protected: bool active = true; uint8_t nof_waiting = 0; mutable std::mutex mutex; 
std::condition_variable cvar_empty, cvar_full; CircBuffer circ_buffer; ~base_blocking_queue() { stop(); } bool push_(const T& t, bool block_mode) { std::unique_lock lock(mutex); if (not active) { return false; } if (circ_buffer.full()) { if (not block_mode) { return false; } nof_waiting++; while (circ_buffer.full() and active) { cvar_full.wait(lock); } nof_waiting--; if (not active) { return false; } } push_func(t); circ_buffer.push(t); lock.unlock(); cvar_empty.notify_one(); return true; } srsran::error_type push_(T&& t, bool block_mode) { std::unique_lock lock(mutex); if (not active) { return std::move(t); } if (circ_buffer.full()) { if (not block_mode) { return std::move(t); } nof_waiting++; while (circ_buffer.full() and active) { cvar_full.wait(lock); } nof_waiting--; if (not active) { return std::move(t); } } push_func(t); circ_buffer.push(std::move(t)); lock.unlock(); cvar_empty.notify_one(); return {}; } bool pop_(T& obj, bool block, const std::chrono::system_clock::time_point* until = nullptr) { std::unique_lock lock(mutex); if (not active) { return false; } if (circ_buffer.empty()) { if (not block) { return false; } nof_waiting++; if (until == nullptr) { cvar_empty.wait(lock, [this]() { return not circ_buffer.empty() or not active; }); } else { cvar_empty.wait_until(lock, *until, [this]() { return not circ_buffer.empty() or not active; }); } nof_waiting--; if (circ_buffer.empty()) { // either queue got deactivated or there was a timeout return false; } } obj = std::move(circ_buffer.top()); pop_func(obj); circ_buffer.pop(); lock.unlock(); cvar_full.notify_one(); return true; } }; } // namespace detail /** * Circular buffer with fixed, embedded buffer storage via a std::array. * - Single allocation at object creation for std::array. Given that the buffer size is known at compile-time, the * circular iteration over the buffer may be more optimized (e.g. 
when N is a power of 2, % operator can be avoided)
 * - not thread-safe
 * @tparam T value type stored by buffer
 * @tparam N size of the queue
 */
template <typename T, size_t N>
class static_circular_buffer : public detail::base_circular_buffer<std::array<detail::type_storage<T>, N> >
{
  using base_t = detail::base_circular_buffer<std::array<detail::type_storage<T>, N> >;

public:
  static_circular_buffer() = default;

  /// Copies every element of \c other into the same slot of this buffer.
  /// The copy walks the elements by position instead of through begin()/end(): for a full buffer both iterators refer
  /// to slot rpos, so an iterator range would be empty and no element would be copied.
  static_circular_buffer(const static_circular_buffer& other) : base_t(other.rpos, 0)
  {
    static_assert(std::is_copy_constructible<T>::value, "T must be copy-constructible");
    copy_elements_from(other);
  }

  /// Moves every element of \c other into the same slot of this buffer and leaves \c other empty.
  static_circular_buffer(static_circular_buffer&& other) noexcept : base_t(other.rpos, 0)
  {
    static_assert(std::is_move_constructible<T>::value, "T must be move-constructible");
    move_elements_from(other);
  }

  /// Destroys the current content and copies the elements of \c other. Self-assignment is a no-op.
  static_circular_buffer& operator=(const static_circular_buffer& other)
  {
    if (this == &other) {
      return *this;
    }
    base_t::clear();
    copy_elements_from(other);
    return *this;
  }

  /// Destroys the current content, takes over the elements of \c other and leaves \c other empty.
  /// Self-assignment is a no-op, so the buffer keeps its elements.
  static_circular_buffer& operator=(static_circular_buffer&& other) noexcept
  {
    if (this == &other) {
      return *this;
    }
    base_t::clear();
    move_elements_from(other);
    return *this;
  }

private:
  /// Copy-constructs the elements of \c other into the matching slots. This buffer must hold no element.
  void copy_elements_from(const static_circular_buffer& other)
  {
    base_t::rpos = other.rpos;
    for (size_t i = 0; i < other.count; ++i) {
      size_t idx = (other.rpos + i) % other.max_size();
      base_t::buffer[idx].copy_ctor(other.buffer[idx]);
      // count follows the number of constructed elements, so a throwing copy never leaves count covering a slot that
      // was not constructed
      ++base_t::count;
    }
  }

  /// Move-constructs the elements of \c other into the matching slots and clears \c other. This buffer must hold no
  /// element.
  void move_elements_from(static_circular_buffer& other)
  {
    base_t::rpos = other.rpos;
    for (size_t i = 0; i < other.count; ++i) {
      size_t idx = (other.rpos + i) % other.max_size();
      base_t::buffer[idx].move_ctor(std::move(other.buffer[idx]));
      ++base_t::count;
    }
    // destroys the moved-from elements still held by the source
    other.clear();
  }
};

/**
 * Circular buffer with buffer storage via a std::vector<T>.
 * - size can be defined at run-time.
* - not thread-safe
 * @tparam T value type stored by buffer
 */
template <typename T>
class dyn_circular_buffer : public detail::base_circular_buffer<std::vector<detail::type_storage<T> > >
{
  using base_t = detail::base_circular_buffer<std::vector<detail::type_storage<T> > >;

public:
  dyn_circular_buffer() = default;

  /// Creates an empty buffer backed by a vector of \c max_size slots.
  explicit dyn_circular_buffer(size_t max_size) : base_t(0, 0, max_size) {}

  /// Takes over the storage of \c other and resets the position and count of \c other to zero.
  dyn_circular_buffer(dyn_circular_buffer&& other) noexcept : base_t(other.rpos, other.count, std::move(other.buffer))
  {
    other.count = 0;
    other.rpos  = 0;
  }

  /// Creates storage with as many slots as \c other reports via max_size() and copies each element into the same slot.
  dyn_circular_buffer(const dyn_circular_buffer& other) : base_t(other.rpos, other.count, other.max_size())
  {
    static_assert(std::is_copy_constructible<T>::value, "T must be copy-constructible");
    for (size_t i = 0; i < other.count; ++i) {
      size_t idx = (other.rpos + i) % other.max_size();
      base_t::buffer[idx].copy_ctor(other.buffer[idx]);
    }
  }

  /// Unified copy/move assignment: the argument is taken by value, swapped in, and the previous content (now held by
  /// \c other) is destroyed.
  dyn_circular_buffer& operator=(dyn_circular_buffer other) noexcept
  {
    swap(other);
    other.clear();
    return *this;
  }

  /// Exchanges storage, read position and element count with \c other.
  void swap(dyn_circular_buffer& other) noexcept
  {
    std::swap(base_t::rpos, other.rpos);
    std::swap(base_t::count, other.count);
    std::swap(base_t::buffer, other.buffer);
  }

  /// Changes the number of slots of the buffer. Asserts that the buffer is empty.
  /// @param size new number of slots
  void set_size(size_t size)
  {
    srsran_assert(base_t::empty(), "Dynamic resizes not supported when circular buffer is not empty");
    base_t::buffer.resize(size);
    // The read position may have advanced through earlier pops and could lie past the new end of the storage.
    // top()/pop() index the storage with rpos directly, so restart from slot 0 (the buffer is empty at this point).
    base_t::rpos = 0;
  }
};

/**
 * Blocking queue with fixed, embedded buffer storage via a std::array<T, N>.
 * - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods
 * - Non-blocking push/pop API via try_push(...) and try_pop(...)
methods * - Only one initial allocation for the std::array * - thread-safe * @tparam T value type stored by buffer * @tparam N size of queue * @tparam PushingCallback function void(const T&) called while pushing an element to the queue * @tparam PoppingCallback function void(const T&) called while popping an element from the queue */ template class static_blocking_queue : public detail::base_blocking_queue, PushingCallback, PoppingCallback> { using base_t = detail::base_blocking_queue, PushingCallback, PoppingCallback>; public: explicit static_blocking_queue(PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : base_t(push_callback, pop_callback) {} }; /** * Blocking queue with buffer storage represented via a std::vector. Features: * - Blocking push/pop API via push_blocking(...) and pop_blocking(...) methods * - Non-blocking push/pop API via try_push(...) and try_pop(...) methods * - Size can be defined at runtime. * - thread-safe * @tparam T value type stored by buffer * @tparam PushingCallback function void(const T&) called while pushing an element to the queue * @tparam PoppingCallback function void(const T&) called while popping an element from the queue */ template class dyn_blocking_queue : public detail::base_blocking_queue, PushingCallback, PoppingCallback> { using base_t = detail::base_blocking_queue, PushingCallback, PoppingCallback>; public: dyn_blocking_queue() = default; explicit dyn_blocking_queue(size_t size, PushingCallback push_callback = {}, PoppingCallback pop_callback = {}) : base_t(push_callback, pop_callback, size) {} void set_size(size_t size) { base_t::circ_buffer.set_size(size); } template bool apply_first(const F& func) { return base_t::apply_first(func); } }; } // namespace srsran #endif // SRSRAN_CIRCULAR_BUFFER_H