From bc01a5ecda1abaac77df24f252946f207efa85d5 Mon Sep 17 00:00:00 2001 From: Francisco Paisana Date: Fri, 3 May 2019 16:19:43 +0100 Subject: [PATCH] changed block_queue api to return back the unique buffer in case it fails to push it to the queue --- lib/include/srslte/common/block_queue.h | 55 ++++++++++++++++++------- lib/include/srslte/upper/rlc_tx_queue.h | 2 +- lib/src/upper/rlc_am.cc | 13 ++++-- lib/src/upper/rlc_tm.cc | 16 +++---- lib/src/upper/rlc_um.cc | 13 +++--- 5 files changed, 65 insertions(+), 34 deletions(-) diff --git a/lib/include/srslte/common/block_queue.h b/lib/include/srslte/common/block_queue.h index 8e097ddde..2b81f6a14 100644 --- a/lib/include/srslte/common/block_queue.h +++ b/lib/include/srslte/common/block_queue.h @@ -100,7 +100,7 @@ public: return push_(value, false); } - bool try_push(myobj&& value) { return push_(std::move(value), false); } + std::pair try_push(myobj&& value) { return push_(std::move(value), false); } bool try_pop(myobj *value) { return pop_(value, false); @@ -163,13 +163,8 @@ private: return ret; } - template // universal ref - bool push_(MyObj&& value, bool block) + bool check_queue_space_unlocked(bool block) { - if (!enable) { - return false; - } - pthread_mutex_lock(&mutex); num_threads++; bool ret = false; if (capacity > 0) { @@ -178,20 +173,48 @@ private: pthread_cond_wait(&cv_full, &mutex); } if (!enable) { - goto exit; + return false; } } else if (q.size() >= (uint32_t) capacity) { - goto exit; + return false; } } - if (mutexed_callback) { - mutexed_callback->pushing(value); - } - q.push(std::forward(value)); - ret = true; - pthread_cond_signal(&cv_empty); - exit: num_threads--; + return true; + } + + std::pair push_(myobj&& value, bool block) + { + if (!enable) { + return std::make_pair(false, std::move(value)); + } + pthread_mutex_lock(&mutex); + bool ret = check_queue_space_unlocked(block); + if (ret) { + if (mutexed_callback) { + mutexed_callback->pushing(value); + } + q.push(std::move(value)); + pthread_cond_signal(&cv_empty); + } + pthread_mutex_unlock(&mutex); + return std::make_pair(ret, std::move(value)); + } + + bool push_(const myobj& value, bool block) + { + if (!enable) { + return false; + } + pthread_mutex_lock(&mutex); + bool ret = check_queue_space_unlocked(block); + if (ret) { + if (mutexed_callback) { + mutexed_callback->pushing(value); + } + q.push(value); + pthread_cond_signal(&cv_empty); + } pthread_mutex_unlock(&mutex); return ret; } diff --git a/lib/include/srslte/upper/rlc_tx_queue.h b/lib/include/srslte/upper/rlc_tx_queue.h index 08c88ed20..78c7d4f2d 100644 --- a/lib/include/srslte/upper/rlc_tx_queue.h +++ b/lib/include/srslte/upper/rlc_tx_queue.h @@ -55,7 +55,7 @@ public: } void write(unique_byte_buffer msg) { queue.push(std::move(msg)); } - bool try_write(unique_byte_buffer msg) { return queue.try_push(std::move(msg)); } + std::pair try_write(unique_byte_buffer&& msg) { return queue.try_push(std::move(msg)); } unique_byte_buffer read() { return queue.wait_pop(); } diff --git a/lib/src/upper/rlc_am.cc b/lib/src/upper/rlc_am.cc index 2384e1cbf..65420c460 100644 --- a/lib/src/upper/rlc_am.cc +++ b/lib/src/upper/rlc_am.cc @@ -367,14 +367,19 @@ void rlc_am::rlc_am_tx::write_sdu(unique_byte_buffer sdu, bool blocking) // non-blocking write uint8_t* msg_ptr = sdu->msg; uint32_t nof_bytes = sdu->N_bytes; - if (tx_sdu_queue.try_write(std::move(sdu))) { + std::pair ret = tx_sdu_queue.try_write(std::move(sdu)); + if (ret.first) { log->info_hex( msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size()); } else { -#warning Find a more elegant solution - the msg was already deallocated at this point + // in case of fail, the try_write returns back the sdu log->info("[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, nof_bytes, tx_sdu_queue.size()); - // log->info_hex(msg_ptr, nof_bytes, "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", RB_NAME, - // nof_bytes, tx_sdu_queue.size()); + log->info_hex(ret.second->msg, + ret.second->N_bytes, + "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", + RB_NAME, + ret.second->N_bytes, + tx_sdu_queue.size()); } } } else { diff --git a/lib/src/upper/rlc_tm.cc b/lib/src/upper/rlc_tm.cc index c17b30245..e2fd19225 100644 --- a/lib/src/upper/rlc_tm.cc +++ b/lib/src/upper/rlc_tm.cc @@ -104,7 +104,8 @@ void rlc_tm::write_sdu(unique_byte_buffer sdu, bool blocking) } else { uint8_t* msg_ptr = sdu->msg; uint32_t nof_bytes = sdu->N_bytes; - if (ul_queue.try_write(std::move(sdu))) { + std::pair ret = ul_queue.try_write(std::move(sdu)); + if (ret.first) { log->info_hex(msg_ptr, nof_bytes, "%s Tx SDU, queue size=%d, bytes=%d", @@ -112,13 +113,12 @@ void rlc_tm::write_sdu(unique_byte_buffer sdu, bool blocking) ul_queue.size(), ul_queue.size_bytes()); } else { -#warning Find a more elegant solution - the msg was already deallocated at this point - log->info("[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d", - rrc->get_rb_name(lcid).c_str(), - ul_queue.size(), - ul_queue.size()); - // log->info_hex(sdu->msg, sdu->N_bytes, "[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d", - // rrc->get_rb_name(lcid).c_str(), ul_queue.size(), ul_queue.size_bytes()); + log->info_hex(ret.second->msg, + ret.second->N_bytes, + "[Dropped SDU] %s Tx SDU, queue size=%d, bytes=%d", + rrc->get_rb_name(lcid).c_str(), + ul_queue.size(), + ul_queue.size_bytes()); } } } else { diff --git a/lib/src/upper/rlc_um.cc b/lib/src/upper/rlc_um.cc index ed1c1b36d..28e81ad8b 100644 --- a/lib/src/upper/rlc_um.cc +++ b/lib/src/upper/rlc_um.cc @@ -347,14 +347,17 @@ void rlc_um::rlc_um_tx::try_write_sdu(unique_byte_buffer sdu) if (sdu) { uint8_t* msg_ptr = sdu->msg; uint32_t nof_bytes = sdu->N_bytes; - if (tx_sdu_queue.try_write(std::move(sdu))) { + std::pair ret = tx_sdu_queue.try_write(std::move(sdu)); + if (ret.first) { log->info_hex( msg_ptr, nof_bytes, "%s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), nof_bytes, tx_sdu_queue.size()); } else { -#warning Find a more elegant solution - the msg was already deallocated at this point - log->info("[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), nof_bytes, tx_sdu_queue.size()); - // log->info_hex(msg_ptr, nof_bytes, "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", get_rb_name(), - // nof_bytes, tx_sdu_queue.size()); + log->info_hex(ret.second->msg, + ret.second->N_bytes, + "[Dropped SDU] %s Tx SDU (%d B, tx_sdu_queue_len=%d)", + get_rb_name(), + ret.second->N_bytes, + tx_sdu_queue.size()); } } else { log->warning("NULL SDU pointer in write_sdu()\n");