byte_buffer_queue: make class thread-safe by using atomics

This commit is contained in:
Andre Puschmann 2021-05-27 17:59:37 +02:00
parent 8ab512c2be
commit 1d34aa280f
1 changed files with 16 additions and 12 deletions

View File

@ -80,33 +80,37 @@ public:
private: private:
struct push_callback { struct push_callback {
explicit push_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_) explicit push_callback(std::atomic<uint32_t>& unread_bytes_, std::atomic<uint32_t>& n_sdus_) :
unread_bytes(unread_bytes_), n_sdus(n_sdus_)
{} {}
void operator()(const unique_byte_buffer_t& msg) void operator()(const unique_byte_buffer_t& msg)
{ {
*unread_bytes += msg->N_bytes; unread_bytes.fetch_add(msg->N_bytes, std::memory_order_relaxed);
(*n_sdus)++; n_sdus.fetch_add(1, std::memory_order_relaxed);
} }
uint32_t* unread_bytes; std::atomic<uint32_t>& unread_bytes;
uint32_t* n_sdus; std::atomic<uint32_t>& n_sdus;
}; };
struct pop_callback { struct pop_callback {
explicit pop_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_) explicit pop_callback(std::atomic<uint32_t>& unread_bytes_, std::atomic<uint32_t>& n_sdus_) :
unread_bytes(unread_bytes_), n_sdus(n_sdus_)
{} {}
void operator()(const unique_byte_buffer_t& msg) void operator()(const unique_byte_buffer_t& msg)
{ {
if (msg == nullptr) { if (msg == nullptr) {
return; return;
} }
*unread_bytes -= std::min(msg->N_bytes, *unread_bytes); // non-atomic update of both state variables
*n_sdus = std::max(0, (int32_t)(*n_sdus) - 1); unread_bytes.fetch_sub(std::min(msg->N_bytes, unread_bytes.load(std::memory_order_relaxed)),
std::memory_order_relaxed);
n_sdus.store(std::max(0, (int32_t)(n_sdus.load(std::memory_order_relaxed)) - 1), std::memory_order_relaxed);
} }
uint32_t* unread_bytes; std::atomic<uint32_t>& unread_bytes;
uint32_t* n_sdus; std::atomic<uint32_t>& n_sdus;
}; };
uint32_t unread_bytes = 0; std::atomic<uint32_t> unread_bytes = {0};
uint32_t n_sdus = 0; std::atomic<uint32_t> n_sdus = {0};
public: public:
dyn_blocking_queue<unique_byte_buffer_t, push_callback, pop_callback> queue; dyn_blocking_queue<unique_byte_buffer_t, push_callback, pop_callback> queue;