From 7b25eac47c7f39f8e61870a5ea9706e37d7b779e Mon Sep 17 00:00:00 2001 From: Pedro Alvarez Date: Tue, 2 Feb 2021 13:20:47 +0000 Subject: [PATCH] Fix race condition in rlc_am_stress test and other fixes related to this test. The race condition was being cause by write_sdu being called simultanously to read_pdu, which could cause the read_pdu to try to get the SDU info before it had been written by the write_sdu. Changes in this commit include: - Make sure PDCP sn in included in RLC AM stress test. - Stop handling control PDU when TX is not enabled in RLC AM. - Fixed issue with length of the PDCP SN in rlc_stress_test. - Moved the place were sdu info erase was called to avoid double calls to erase - Tentative fix for race condition in rlc_am_stress test. - Added function to print information about undelivered_sdu_info_queue for debugging. --- lib/include/srslte/upper/rlc_am_lte.h | 11 ++-- lib/src/upper/rlc_am_lte.cc | 84 ++++++++++++++++++++++----- lib/test/upper/rlc_stress_test.cc | 10 ++-- 3 files changed, 82 insertions(+), 23 deletions(-) diff --git a/lib/include/srslte/upper/rlc_am_lte.h b/lib/include/srslte/upper/rlc_am_lte.h index b471c4666..3e715d282 100644 --- a/lib/include/srslte/upper/rlc_am_lte.h +++ b/lib/include/srslte/upper/rlc_am_lte.h @@ -59,8 +59,10 @@ struct rlc_sn_info_t { }; struct pdcp_sdu_info_t { - uint32_t sn; - bool fully_txed; // Boolean indicating if the SDU is fully transmitted. + uint32_t sn; + bool fully_txed; // Boolean indicating if the SDU is fully transmitted. + bool fully_acked; // Boolean indicating if the SDU is fully acked. This is only necessary temporarely to avoid + // duplicate removal from the queue while processing the status report std::vector rlc_sn_info_list; // List of RLC PDUs in transit and whether they have been acked or not. }; @@ -187,8 +189,8 @@ private: srslte::timer_handler::unique_timer status_prohibit_timer; // SDU info for PDCP notifications - uint32_t pdcp_info_queue_capacity = 512; - std::map undelivered_sdu_info_queue; + uint32_t pdcp_info_queue_capacity = 512; + std::map undelivered_sdu_info_queue = {}; // Callback function for buffer status report bsr_callback_t bsr_callback; @@ -316,6 +318,7 @@ uint32_t rlc_am_packed_length(rlc_status_pdu_t* status); uint32_t rlc_am_packed_length(rlc_amd_retx_t retx); bool rlc_am_is_valid_status_pdu(const rlc_status_pdu_t& status); bool rlc_am_is_pdu_segment(uint8_t* payload); +std::string rlc_am_undelivered_sdu_info_to_string(const std::map& info_queue); std::string rlc_am_status_pdu_to_string(rlc_status_pdu_t* status); std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header); bool rlc_am_start_aligned(const uint8_t fi); diff --git a/lib/src/upper/rlc_am_lte.cc b/lib/src/upper/rlc_am_lte.cc index fad5207b0..240c3a484 100644 --- a/lib/src/upper/rlc_am_lte.cc +++ b/lib/src/upper/rlc_am_lte.cc @@ -347,12 +347,16 @@ uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state() int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu) { + pthread_mutex_lock(&mutex); + if (!tx_enabled) { + pthread_mutex_unlock(&mutex); return SRSLTE_ERROR; } if (sdu.get() == nullptr) { log->warning("NULL SDU pointer in write_sdu()\n"); + pthread_mutex_unlock(&mutex); return SRSLTE_ERROR; } @@ -375,13 +379,15 @@ int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu) RB_NAME, ret.error()->N_bytes, tx_sdu_queue.size()); + pthread_mutex_unlock(&mutex); return SRSLTE_ERROR; } // Store SDU info uint32_t info_count = undelivered_sdu_info_queue.count(info.sn); if (info_count != 0) { - log->error("PDCP SDU info already exists\n"); + log->error("PDCP SDU info already exists. SN=%d\n", info.sn); + pthread_mutex_unlock(&mutex); return SRSLTE_ERROR; } @@ -390,7 +396,9 @@ int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu) undelivered_sdu_info_queue.size()); } + log->debug("Adding SDU info for PDCP_SN=%d", info.sn); undelivered_sdu_info_queue[info.sn] = info; + pthread_mutex_unlock(&mutex); return SRSLTE_SUCCESS; } @@ -413,6 +421,10 @@ int rlc_am_lte::rlc_am_lte_tx::read_pdu(uint8_t* payload, uint32_t nof_bytes) int pdu_size = 0; + if (not tx_enabled) { + goto unlock_and_exit; + } + log->debug("MAC opportunity - %d bytes\n", nof_bytes); log->debug("tx_window size - %zu PDUs\n", tx_window.size()); @@ -856,7 +868,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt pdu->N_bytes += to_move; tx_sdu->N_bytes -= to_move; tx_sdu->msg += to_move; - undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].rlc_sn_info_list.push_back({header.sn, false}); + auto info_it = undelivered_sdu_info_queue.find(tx_sdu->md.pdcp_sn); + if (info_it == undelivered_sdu_info_queue.end()) { + log->error("Could not find PDCP SN in SDU info queue (segment). PDCP_SN=%d\n", tx_sdu->md.pdcp_sn); + return 0; + } + undelivered_sdu_info_queue.at(tx_sdu->md.pdcp_sn).rlc_sn_info_list.push_back({header.sn, false}); if (tx_sdu->N_bytes == 0) { log->debug("%s Complete SDU scheduled for tx.\n", RB_NAME); undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true; @@ -897,7 +914,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt pdu->N_bytes += to_move; tx_sdu->N_bytes -= to_move; tx_sdu->msg += to_move; - undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].rlc_sn_info_list.push_back({header.sn, false}); + auto info_it = undelivered_sdu_info_queue.find(tx_sdu->md.pdcp_sn); + if (info_it == undelivered_sdu_info_queue.end()) { + log->error("Could not find PDCP SN in SDU info queue. PDCP_SN=%d\n", tx_sdu->md.pdcp_sn); + return 0; + } + info_it->second.rlc_sn_info_list.push_back({header.sn, false}); if (tx_sdu->N_bytes == 0) { log->debug("%s Complete SDU scheduled for tx. PDCP SN=%d\n", RB_NAME, tx_sdu->md.pdcp_sn); undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true; @@ -966,6 +988,10 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t nof_bytes) { + if (not tx_enabled) { + return; + } + pthread_mutex_lock(&mutex); log->info_hex(payload, nof_bytes, "%s Rx control PDU", RB_NAME); @@ -1063,6 +1089,15 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no if (not notify_info_vec.empty()) { parent->pdcp->notify_delivery(parent->lcid, notify_info_vec); + + // Remove all SDUs that were fully acked + for (uint32_t acked_pdcp_sn : notify_info_vec) { + log->debug("Erasing SDU info: PDCP_SN=%d\n", acked_pdcp_sn); + size_t erased = undelivered_sdu_info_queue.erase(acked_pdcp_sn); + if (erased == 0) { + log->error("Could not find info to erase: SN=%d\n", acked_pdcp_sn); + } + } } debug_state(); @@ -1078,6 +1113,10 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no void rlc_am_lte::rlc_am_lte_tx::update_notification_ack_info(const rlc_amd_tx_pdu_t& tx_pdu, std::vector& notify_info_vec) { + log->debug("Updating ACK info: RLC SN=%d, number of notified SDU=%ld, number of undelivered SDUs=%ld", + tx_pdu.header.sn, + notify_info_vec.size(), + undelivered_sdu_info_queue.size()); // Iterate over all undelivered SDUs for (auto& info_it : undelivered_sdu_info_queue) { // Iterate over all SNs that were TX'ed @@ -1090,21 +1129,16 @@ void rlc_am_lte::rlc_am_lte_tx::update_notification_ack_info(const rlc_amd_tx_pd } } // Check wether the SDU was fully acked - if (info.fully_txed) { - // Iterate over all SNs that - bool fully_acked = std::all_of(info.rlc_sn_info_list.begin(), + if (info.fully_txed and not info.fully_acked) { + // Check if all SNs were ACK'ed + info.fully_acked = std::all_of(info.rlc_sn_info_list.begin(), info.rlc_sn_info_list.end(), [](rlc_sn_info_t rlc_sn_info) { return rlc_sn_info.is_acked; }); - if (fully_acked) { + if (info.fully_acked) { notify_info_vec.push_back(pdcp_sn); } } } - - // Remove all SDUs that were fully acked - for (uint32_t acked_pdcp_sn : notify_info_vec) { - undelivered_sdu_info_queue.erase(acked_pdcp_sn); - } } void rlc_am_lte::rlc_am_lte_tx::debug_state() @@ -1643,9 +1677,9 @@ bool rlc_am_lte::rlc_am_lte_rx::get_do_status() void rlc_am_lte::rlc_am_lte_rx::write_pdu(uint8_t* payload, const uint32_t nof_bytes) { - if (nof_bytes < 1) + if (nof_bytes < 1) { return; - + } pthread_mutex_lock(&mutex); if (rlc_am_is_control_pdu(payload)) { @@ -2243,6 +2277,28 @@ std::string rlc_am_status_pdu_to_string(rlc_status_pdu_t* status) return ss.str(); } +std::string rlc_am_undelivered_sdu_info_to_string(const std::map& info_queue) +{ + std::string str = "\n"; + for (const auto& info_it : info_queue) { + uint32_t pdcp_sn = info_it.first; + auto info = info_it.second; + std::string tmp_str = fmt::format("\tPDCP_SN = {}, RLC_SNs = [", pdcp_sn); + for (auto rlc_sn_info : info.rlc_sn_info_list) { + std::string tmp_str2; + if (rlc_sn_info.is_acked) { + tmp_str2 = fmt::format("ACK={}, ", rlc_sn_info.sn); + } else { + tmp_str2 = fmt::format("NACK={}, ", rlc_sn_info.sn); + } + tmp_str += tmp_str2; + } + tmp_str += "]\n"; + str += tmp_str; + } + return str; +} + std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header) { std::stringstream ss; diff --git a/lib/test/upper/rlc_stress_test.cc b/lib/test/upper/rlc_stress_test.cc index 2393b1e8f..59e7a86e7 100644 --- a/lib/test/upper/rlc_stress_test.cc +++ b/lib/test/upper/rlc_stress_test.cc @@ -85,7 +85,6 @@ typedef struct { void parse_args(stress_test_args_t* args, int argc, char* argv[]) { - // Command line only options bpo::options_description general("General options"); @@ -355,8 +354,8 @@ public: private: void run_thread() { - uint8_t sn = 0; - byte_buffer_pool* pool = byte_buffer_pool::get_instance(); + uint16_t pdcp_sn = 0; + byte_buffer_pool* pool = byte_buffer_pool::get_instance(); while (run_enable) { unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, "rlc_tester::run_thread", true); if (pdu == NULL) { @@ -365,10 +364,11 @@ private: usleep(1000); continue; } + pdu->md.pdcp_sn = pdcp_sn & 0x0FFF; // 12-bit SN for (uint32_t i = 0; i < args.sdu_size; i++) { - pdu->msg[i] = sn; + pdu->msg[i] = pdcp_sn & 0xFF; } - sn++; + pdcp_sn++; pdu->N_bytes = args.sdu_size; rlc->write_sdu(lcid, std::move(pdu)); if (args.sdu_gen_delay_usec > 0) {