diff --git a/lib/include/srsran/adt/circular_buffer.h b/lib/include/srsran/adt/circular_buffer.h index 3b8da7a44..28fdf7a81 100644 --- a/lib/include/srsran/adt/circular_buffer.h +++ b/lib/include/srsran/adt/circular_buffer.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -156,6 +157,19 @@ public: buffer.resize(size); } + template + T discard_if(const F& func) + { + for (auto it = buffer.begin(); it != buffer.end(); it++) { + if (*it != nullptr && func(*it)) { + T tmp = std::move(*it); + *it = nullptr; + return tmp; + } + } + return nullptr; + } + private: Container buffer; size_t rpos = 0; @@ -264,6 +278,18 @@ public: return false; } + template + bool discard_if(const F& func) + { + std::lock_guard lock(mutex); + T tmp = circ_buffer.discard_if(func); + if (tmp == nullptr) { + return false; + } + pop_func(tmp); + return true; + } + protected: bool active = true; uint8_t nof_waiting = 0; @@ -443,6 +469,12 @@ public: base_t(push_callback, pop_callback, size) {} void set_size(size_t size) { base_t::circ_buffer.set_size(size); } + + template + bool discard_if(const F& func) + { + return base_t::discard_if(func); + } }; } // namespace srsran diff --git a/lib/include/srsran/upper/byte_buffer_queue.h b/lib/include/srsran/upper/byte_buffer_queue.h index 5451ea68d..aa2ef33dc 100644 --- a/lib/include/srsran/upper/byte_buffer_queue.h +++ b/lib/include/srsran/upper/byte_buffer_queue.h @@ -23,7 +23,9 @@ #include "srsran/adt/circular_buffer.h" #include "srsran/common/block_queue.h" +#include "srsran/common/byte_buffer.h" #include "srsran/common/common.h" +#include #include namespace srsran { @@ -31,7 +33,9 @@ namespace srsran { class byte_buffer_queue { public: - byte_buffer_queue(int capacity = 128) : queue(capacity, push_callback(unread_bytes), pop_callback(unread_bytes)) {} + byte_buffer_queue(int capacity = 128) : + queue(capacity, push_callback(unread_bytes, n_sdus), pop_callback(unread_bytes, n_sdus)) + {} void write(unique_byte_buffer_t msg) { queue.push_blocking(std::move(msg)); } @@ -46,6 +50,7 @@ public: void resize(uint32_t capacity) { queue.set_size(capacity); } uint32_t size() { return (uint32_t)queue.size(); } + uint32_t get_n_sdus() { return n_sdus; } uint32_t size_bytes() { return unread_bytes; } @@ -67,20 +72,42 @@ public: bool is_full() { return queue.full(); } + template + bool discard_if(const F& func) + { + return queue.discard_if(func); + } + private: struct push_callback { - explicit push_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {} - void operator()(const unique_byte_buffer_t& msg) { *unread_bytes += msg->N_bytes; } + explicit push_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_) + {} + void operator()(const unique_byte_buffer_t& msg) + { + *unread_bytes += msg->N_bytes; + (*n_sdus)++; + } uint32_t* unread_bytes; + uint32_t* n_sdus; }; struct pop_callback { - explicit pop_callback(uint32_t& unread_bytes_) : unread_bytes(&unread_bytes_) {} - void operator()(const unique_byte_buffer_t& msg) { *unread_bytes -= std::min(msg->N_bytes, *unread_bytes); } + explicit pop_callback(uint32_t& unread_bytes_, uint32_t& n_sdus_) : unread_bytes(&unread_bytes_), n_sdus(&n_sdus_) + {} + void operator()(const unique_byte_buffer_t& msg) + { + if (msg == nullptr) { + return; + } + *unread_bytes -= std::min(msg->N_bytes, *unread_bytes); + *n_sdus = std::max(0, (int32_t)(*n_sdus) - 1); + } uint32_t* unread_bytes; + uint32_t* n_sdus; }; dyn_blocking_queue queue; uint32_t unread_bytes = 0; + uint32_t n_sdus = 0; }; } // namespace srsran diff --git a/lib/src/upper/rlc_am_lte.cc b/lib/src/upper/rlc_am_lte.cc index 06476b0c2..bd6a8a83c 100644 --- a/lib/src/upper/rlc_am_lte.cc +++ b/lib/src/upper/rlc_am_lte.cc @@ -209,12 +209,9 @@ rlc_am_lte::rlc_am_lte_tx::rlc_am_lte_tx(rlc_am_lte* parent_) : pool(byte_buffer_pool::get_instance()), poll_retx_timer(parent_->timers->get_unique_timer()), status_prohibit_timer(parent_->timers->get_unique_timer()) -{ -} +{} -rlc_am_lte::rlc_am_lte_tx::~rlc_am_lte_tx() -{ -} +rlc_am_lte::rlc_am_lte_tx::~rlc_am_lte_tx() {} void rlc_am_lte::rlc_am_lte_tx::set_bsr_callback(bsr_callback_t callback) { @@ -320,7 +317,7 @@ bool rlc_am_lte::rlc_am_lte_tx::has_data() return (((do_status() && not status_prohibit_timer.is_running())) || // if we have a status PDU to transmit (not retx_queue.empty()) || // if we have a retransmission (tx_sdu != NULL) || // if we are currently transmitting a SDU - (not tx_sdu_queue.is_empty())); // or if there is a SDU queued up for transmission + (tx_sdu_queue.get_n_sdus() != 0)); // or if there is a SDU queued up for transmission } /** @@ -345,8 +342,8 @@ void rlc_am_lte::rlc_am_lte_tx::check_sn_reached_max_retx(uint32_t sn) uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state() { std::lock_guard lock(mutex); - uint32_t n_bytes = 0; - uint32_t n_sdus = 0; + uint32_t n_bytes = 0; + uint32_t n_sdus = 0; logger.debug("%s Buffer state - do_status=%s, status_prohibit_running=%s (%d/%d)", RB_NAME, @@ -384,7 +381,7 @@ uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state() // Bytes needed for tx SDUs if (tx_window.size() < 1024) { - n_sdus = tx_sdu_queue.size(); + n_sdus = tx_sdu_queue.get_n_sdus(); n_bytes += tx_sdu_queue.size_bytes(); if (tx_sdu != NULL) { n_sdus++; @@ -456,7 +453,21 @@ void rlc_am_lte::rlc_am_lte_tx::discard_sdu(uint32_t discard_sn) if (!tx_enabled) { return; } - logger.warning("Discard SDU not implemented yet"); + + bool discarded = + tx_sdu_queue.discard_if([&discard_sn](const unique_byte_buffer_t& sdu) { return sdu->md.pdcp_sn == discard_sn; }); + + if (discarded) { + // remove also from undelivered SDUs queue + logger.info("Discarding SDU with PDCP_SN=%d", discard_sn); + if (not undelivered_sdu_info_queue.has_pdcp_sn(discard_sn)) { + logger.info("PDCP SDU info does not exists for discarded SDU. PDCP_SN=%d", discard_sn); + } else { + undelivered_sdu_info_queue.clear_pdcp_sdu(discard_sn); + } + } else { + logger.info("Could not find SDU to discard. PDCP_SN=%d", discard_sn); + } } bool rlc_am_lte::rlc_am_lte_tx::sdu_queue_is_full() @@ -966,7 +977,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt } // Pull SDUs from queue - while (pdu_space > head_len && tx_sdu_queue.size() > 0 && header.N_li < RLC_AM_WINDOW_SIZE) { + while (pdu_space > head_len && tx_sdu_queue.get_n_sdus() > 0 && header.N_li < RLC_AM_WINDOW_SIZE) { if (last_li > 0) { header.li[header.N_li] = last_li; header.N_li++; @@ -978,7 +989,18 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt } break; } - tx_sdu = tx_sdu_queue.read(); + + do { + tx_sdu = tx_sdu_queue.read(); + } while (tx_sdu == nullptr && tx_sdu_queue.size() != 0); + + if (tx_sdu == nullptr) { + if (header.N_li > 0) { + header.N_li--; + } + break; + } + to_move = ((pdu_space - head_len) >= tx_sdu->N_bytes) ? tx_sdu->N_bytes : pdu_space - head_len; memcpy(pdu_ptr, tx_sdu->msg, to_move); last_li = to_move; @@ -1045,7 +1067,7 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt } // Update Tx window - vt_s = (vt_s + 1) % MOD; + vt_s = (vt_s + 1) % MOD; // Write final header and TX tx_pdu.buf = std::move(pdu); @@ -1318,9 +1340,7 @@ rlc_am_lte::rlc_am_lte_rx::rlc_am_lte_rx(rlc_am_lte* parent_) : reordering_timer(parent_->timers->get_unique_timer()) {} -rlc_am_lte::rlc_am_lte_rx::~rlc_am_lte_rx() -{ -} +rlc_am_lte::rlc_am_lte_rx::~rlc_am_lte_rx() {} bool rlc_am_lte::rlc_am_lte_rx::configure(rlc_am_config_t cfg_) { @@ -1752,8 +1772,8 @@ void rlc_am_lte::rlc_am_lte_rx::write_pdu(uint8_t* payload, const uint32_t nof_b parent->tx.handle_control_pdu(payload, nof_bytes); } else { std::lock_guard lock(mutex); - rlc_amd_pdu_header_t header = {}; - uint32_t payload_len = nof_bytes; + rlc_amd_pdu_header_t header = {}; + uint32_t payload_len = nof_bytes; rlc_am_read_data_pdu_header(&payload, &payload_len, &header); if (payload_len > nof_bytes) { logger.info("Dropping corrupted PDU (%d B). Remaining length after header %d B.", nof_bytes, payload_len); @@ -1858,9 +1878,9 @@ int rlc_am_lte::rlc_am_lte_rx::get_status_pdu(rlc_status_pdu_t* status, const ui int rlc_am_lte::rlc_am_lte_rx::get_status_pdu_length() { std::lock_guard lock(mutex); - rlc_status_pdu_t status = {}; - status.ack_sn = vr_ms; - uint32_t i = vr_r; + rlc_status_pdu_t status = {}; + status.ack_sn = vr_ms; + uint32_t i = vr_r; while (RX_MOD_BASE(i) < RX_MOD_BASE(vr_ms) && status.N_nack < RLC_AM_WINDOW_SIZE) { if (not rx_window.has_sn(i)) { status.N_nack++; diff --git a/lib/test/upper/rlc_am_test.cc b/lib/test/upper/rlc_am_test.cc index 649cde13e..47e3a50fe 100644 --- a/lib/test/upper/rlc_am_test.cc +++ b/lib/test/upper/rlc_am_test.cc @@ -3413,6 +3413,85 @@ bool reestablish_test() return SRSRAN_SUCCESS; } +// This test checks the correct functioning of RLC discard functionality +bool discard_test() +{ + const rlc_config_t config = rlc_config_t::default_rlc_am_config(); +#if HAVE_PCAP + rlc_pcap pcap; + pcap.open("rlc_am_reestablish_test.pcap", config); + rlc_am_tester tester(&pcap); +#else + rlc_am_tester tester(NULL); +#endif + + srsran::timer_handler timers(8); + + rlc_am_lte rlc1(srslog::fetch_basic_logger("RLC_AM_1"), 1, &tester, &tester, &timers); + rlc_am_lte rlc2(srslog::fetch_basic_logger("RLC_AM_2"), 1, &tester, &tester, &timers); + + srslog::fetch_basic_logger("RLC_AM_1").set_hex_dump_max_size(100); + srslog::fetch_basic_logger("RLC_AM_2").set_hex_dump_max_size(100); + srslog::fetch_basic_logger("RLC").set_hex_dump_max_size(100); + + if (not rlc1.configure(config)) { + return -1; + } + + if (not rlc2.configure(config)) { + return -1; + } + + // Check has_data() after a SDU discard + { + uint32_t num_tx_pdus = 1; + for (uint32_t i = 0; i < num_tx_pdus; ++i) { + // Write SDU + unique_byte_buffer_t sdu = srsran::make_byte_buffer(); + TESTASSERT(sdu != nullptr); + sdu->N_bytes = 5; + for (uint32_t k = 0; k < sdu->N_bytes; ++k) { + sdu->msg[k] = i; // Write the index into the buffer + } + sdu->md.pdcp_sn = i; + rlc1.write_sdu(std::move(sdu)); + } + } + rlc1.discard_sdu(0); // Try to discard PDCP_SN=1 + TESTASSERT(rlc1.has_data() == false); + + // Discard an SDU in the midle of the queue and read PDUs after + { + uint32_t num_tx_pdus = 10; + for (uint32_t i = 0; i < num_tx_pdus; ++i) { + // Write SDU + unique_byte_buffer_t sdu = srsran::make_byte_buffer(); + TESTASSERT(sdu != nullptr); + sdu->N_bytes = 1; + for (uint32_t k = 0; k < sdu->N_bytes; ++k) { + sdu->msg[k] = i; // Write the index into the buffer + } + sdu->md.pdcp_sn = i; + rlc1.write_sdu(std::move(sdu)); + } + } + rlc1.discard_sdu(3); // Try to discard PDCP_SN=1 + TESTASSERT(rlc1.has_data() == true); + TESTASSERT(rlc1.get_buffer_state() == 23); // 2 bytes fixed header, 12 , 9 bytes of data, + + unique_byte_buffer_t pdu = srsran::make_byte_buffer(); + uint32_t len = rlc1.read_pdu(pdu->msg, 50); // enough for all PDUs + pdu->N_bytes = len; + TESTASSERT(23 == len); + + srslog::fetch_basic_logger("TEST").info("Received %zd SDUs", tester.sdus.size()); + +#if HAVE_PCAP + pcap.close(); +#endif + + return SRSRAN_SUCCESS; +} int main(int argc, char** argv) { // Setup the log message spy to intercept error and warning log entries from RLC @@ -3603,5 +3682,10 @@ int main(int argc, char** argv) exit(-1); }; + if (discard_test()) { + printf("discard_test failed\n"); + exit(-1); + }; + return SRSRAN_SUCCESS; }