Fix race condition in rlc_am_stress test and other fixes related to this

test. The race condition was being cause by write_sdu being called
simultanously to read_pdu, which could cause the read_pdu to try to get
the SDU info before it had been written by the write_sdu.

Changes in this commit include:
 - Make sure PDCP sn in included in RLC AM stress test.
 - Stop handling control PDU when TX is not enabled in RLC AM.
 - Fixed issue with length of the PDCP SN in rlc_stress_test.
 - Moved the place were sdu info erase was called to avoid double calls to erase
 - Tentative fix for race condition in rlc_am_stress test.
 - Added function to print information about undelivered_sdu_info_queue
   for debugging.
This commit is contained in:
Pedro Alvarez 2021-02-02 13:20:47 +00:00
parent 2b092ddc85
commit 7b25eac47c
3 changed files with 82 additions and 23 deletions

View File

@ -59,8 +59,10 @@ struct rlc_sn_info_t {
}; };
struct pdcp_sdu_info_t { struct pdcp_sdu_info_t {
uint32_t sn; uint32_t sn;
bool fully_txed; // Boolean indicating if the SDU is fully transmitted. bool fully_txed; // Boolean indicating if the SDU is fully transmitted.
bool fully_acked; // Boolean indicating if the SDU is fully acked. This is only necessary temporarely to avoid
// duplicate removal from the queue while processing the status report
std::vector<rlc_sn_info_t> rlc_sn_info_list; // List of RLC PDUs in transit and whether they have been acked or not. std::vector<rlc_sn_info_t> rlc_sn_info_list; // List of RLC PDUs in transit and whether they have been acked or not.
}; };
@ -187,8 +189,8 @@ private:
srslte::timer_handler::unique_timer status_prohibit_timer; srslte::timer_handler::unique_timer status_prohibit_timer;
// SDU info for PDCP notifications // SDU info for PDCP notifications
uint32_t pdcp_info_queue_capacity = 512; uint32_t pdcp_info_queue_capacity = 512;
std::map<uint32_t, pdcp_sdu_info_t> undelivered_sdu_info_queue; std::map<uint32_t, pdcp_sdu_info_t> undelivered_sdu_info_queue = {};
// Callback function for buffer status report // Callback function for buffer status report
bsr_callback_t bsr_callback; bsr_callback_t bsr_callback;
@ -316,6 +318,7 @@ uint32_t rlc_am_packed_length(rlc_status_pdu_t* status);
uint32_t rlc_am_packed_length(rlc_amd_retx_t retx); uint32_t rlc_am_packed_length(rlc_amd_retx_t retx);
bool rlc_am_is_valid_status_pdu(const rlc_status_pdu_t& status); bool rlc_am_is_valid_status_pdu(const rlc_status_pdu_t& status);
bool rlc_am_is_pdu_segment(uint8_t* payload); bool rlc_am_is_pdu_segment(uint8_t* payload);
std::string rlc_am_undelivered_sdu_info_to_string(const std::map<uint32_t, pdcp_sdu_info_t>& info_queue);
std::string rlc_am_status_pdu_to_string(rlc_status_pdu_t* status); std::string rlc_am_status_pdu_to_string(rlc_status_pdu_t* status);
std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header); std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header);
bool rlc_am_start_aligned(const uint8_t fi); bool rlc_am_start_aligned(const uint8_t fi);

View File

@ -347,12 +347,16 @@ uint32_t rlc_am_lte::rlc_am_lte_tx::get_buffer_state()
int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu) int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu)
{ {
pthread_mutex_lock(&mutex);
if (!tx_enabled) { if (!tx_enabled) {
pthread_mutex_unlock(&mutex);
return SRSLTE_ERROR; return SRSLTE_ERROR;
} }
if (sdu.get() == nullptr) { if (sdu.get() == nullptr) {
log->warning("NULL SDU pointer in write_sdu()\n"); log->warning("NULL SDU pointer in write_sdu()\n");
pthread_mutex_unlock(&mutex);
return SRSLTE_ERROR; return SRSLTE_ERROR;
} }
@ -375,13 +379,15 @@ int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu)
RB_NAME, RB_NAME,
ret.error()->N_bytes, ret.error()->N_bytes,
tx_sdu_queue.size()); tx_sdu_queue.size());
pthread_mutex_unlock(&mutex);
return SRSLTE_ERROR; return SRSLTE_ERROR;
} }
// Store SDU info // Store SDU info
uint32_t info_count = undelivered_sdu_info_queue.count(info.sn); uint32_t info_count = undelivered_sdu_info_queue.count(info.sn);
if (info_count != 0) { if (info_count != 0) {
log->error("PDCP SDU info already exists\n"); log->error("PDCP SDU info already exists. SN=%d\n", info.sn);
pthread_mutex_unlock(&mutex);
return SRSLTE_ERROR; return SRSLTE_ERROR;
} }
@ -390,7 +396,9 @@ int rlc_am_lte::rlc_am_lte_tx::write_sdu(unique_byte_buffer_t sdu)
undelivered_sdu_info_queue.size()); undelivered_sdu_info_queue.size());
} }
log->debug("Adding SDU info for PDCP_SN=%d", info.sn);
undelivered_sdu_info_queue[info.sn] = info; undelivered_sdu_info_queue[info.sn] = info;
pthread_mutex_unlock(&mutex);
return SRSLTE_SUCCESS; return SRSLTE_SUCCESS;
} }
@ -413,6 +421,10 @@ int rlc_am_lte::rlc_am_lte_tx::read_pdu(uint8_t* payload, uint32_t nof_bytes)
int pdu_size = 0; int pdu_size = 0;
if (not tx_enabled) {
goto unlock_and_exit;
}
log->debug("MAC opportunity - %d bytes\n", nof_bytes); log->debug("MAC opportunity - %d bytes\n", nof_bytes);
log->debug("tx_window size - %zu PDUs\n", tx_window.size()); log->debug("tx_window size - %zu PDUs\n", tx_window.size());
@ -856,7 +868,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
pdu->N_bytes += to_move; pdu->N_bytes += to_move;
tx_sdu->N_bytes -= to_move; tx_sdu->N_bytes -= to_move;
tx_sdu->msg += to_move; tx_sdu->msg += to_move;
undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].rlc_sn_info_list.push_back({header.sn, false}); auto info_it = undelivered_sdu_info_queue.find(tx_sdu->md.pdcp_sn);
if (info_it == undelivered_sdu_info_queue.end()) {
log->error("Could not find PDCP SN in SDU info queue (segment). PDCP_SN=%d\n", tx_sdu->md.pdcp_sn);
return 0;
}
undelivered_sdu_info_queue.at(tx_sdu->md.pdcp_sn).rlc_sn_info_list.push_back({header.sn, false});
if (tx_sdu->N_bytes == 0) { if (tx_sdu->N_bytes == 0) {
log->debug("%s Complete SDU scheduled for tx.\n", RB_NAME); log->debug("%s Complete SDU scheduled for tx.\n", RB_NAME);
undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true; undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true;
@ -897,7 +914,12 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
pdu->N_bytes += to_move; pdu->N_bytes += to_move;
tx_sdu->N_bytes -= to_move; tx_sdu->N_bytes -= to_move;
tx_sdu->msg += to_move; tx_sdu->msg += to_move;
undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].rlc_sn_info_list.push_back({header.sn, false}); auto info_it = undelivered_sdu_info_queue.find(tx_sdu->md.pdcp_sn);
if (info_it == undelivered_sdu_info_queue.end()) {
log->error("Could not find PDCP SN in SDU info queue. PDCP_SN=%d\n", tx_sdu->md.pdcp_sn);
return 0;
}
info_it->second.rlc_sn_info_list.push_back({header.sn, false});
if (tx_sdu->N_bytes == 0) { if (tx_sdu->N_bytes == 0) {
log->debug("%s Complete SDU scheduled for tx. PDCP SN=%d\n", RB_NAME, tx_sdu->md.pdcp_sn); log->debug("%s Complete SDU scheduled for tx. PDCP SN=%d\n", RB_NAME, tx_sdu->md.pdcp_sn);
undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true; undelivered_sdu_info_queue[tx_sdu->md.pdcp_sn].fully_txed = true;
@ -966,6 +988,10 @@ int rlc_am_lte::rlc_am_lte_tx::build_data_pdu(uint8_t* payload, uint32_t nof_byt
void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t nof_bytes) void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t nof_bytes)
{ {
if (not tx_enabled) {
return;
}
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
log->info_hex(payload, nof_bytes, "%s Rx control PDU", RB_NAME); log->info_hex(payload, nof_bytes, "%s Rx control PDU", RB_NAME);
@ -1063,6 +1089,15 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no
if (not notify_info_vec.empty()) { if (not notify_info_vec.empty()) {
parent->pdcp->notify_delivery(parent->lcid, notify_info_vec); parent->pdcp->notify_delivery(parent->lcid, notify_info_vec);
// Remove all SDUs that were fully acked
for (uint32_t acked_pdcp_sn : notify_info_vec) {
log->debug("Erasing SDU info: PDCP_SN=%d\n", acked_pdcp_sn);
size_t erased = undelivered_sdu_info_queue.erase(acked_pdcp_sn);
if (erased == 0) {
log->error("Could not find info to erase: SN=%d\n", acked_pdcp_sn);
}
}
} }
debug_state(); debug_state();
@ -1078,6 +1113,10 @@ void rlc_am_lte::rlc_am_lte_tx::handle_control_pdu(uint8_t* payload, uint32_t no
void rlc_am_lte::rlc_am_lte_tx::update_notification_ack_info(const rlc_amd_tx_pdu_t& tx_pdu, void rlc_am_lte::rlc_am_lte_tx::update_notification_ack_info(const rlc_amd_tx_pdu_t& tx_pdu,
std::vector<uint32_t>& notify_info_vec) std::vector<uint32_t>& notify_info_vec)
{ {
log->debug("Updating ACK info: RLC SN=%d, number of notified SDU=%ld, number of undelivered SDUs=%ld",
tx_pdu.header.sn,
notify_info_vec.size(),
undelivered_sdu_info_queue.size());
// Iterate over all undelivered SDUs // Iterate over all undelivered SDUs
for (auto& info_it : undelivered_sdu_info_queue) { for (auto& info_it : undelivered_sdu_info_queue) {
// Iterate over all SNs that were TX'ed // Iterate over all SNs that were TX'ed
@ -1090,21 +1129,16 @@ void rlc_am_lte::rlc_am_lte_tx::update_notification_ack_info(const rlc_amd_tx_pd
} }
} }
// Check wether the SDU was fully acked // Check wether the SDU was fully acked
if (info.fully_txed) { if (info.fully_txed and not info.fully_acked) {
// Iterate over all SNs that // Check if all SNs were ACK'ed
bool fully_acked = std::all_of(info.rlc_sn_info_list.begin(), info.fully_acked = std::all_of(info.rlc_sn_info_list.begin(),
info.rlc_sn_info_list.end(), info.rlc_sn_info_list.end(),
[](rlc_sn_info_t rlc_sn_info) { return rlc_sn_info.is_acked; }); [](rlc_sn_info_t rlc_sn_info) { return rlc_sn_info.is_acked; });
if (fully_acked) { if (info.fully_acked) {
notify_info_vec.push_back(pdcp_sn); notify_info_vec.push_back(pdcp_sn);
} }
} }
} }
// Remove all SDUs that were fully acked
for (uint32_t acked_pdcp_sn : notify_info_vec) {
undelivered_sdu_info_queue.erase(acked_pdcp_sn);
}
} }
void rlc_am_lte::rlc_am_lte_tx::debug_state() void rlc_am_lte::rlc_am_lte_tx::debug_state()
@ -1643,9 +1677,9 @@ bool rlc_am_lte::rlc_am_lte_rx::get_do_status()
void rlc_am_lte::rlc_am_lte_rx::write_pdu(uint8_t* payload, const uint32_t nof_bytes) void rlc_am_lte::rlc_am_lte_rx::write_pdu(uint8_t* payload, const uint32_t nof_bytes)
{ {
if (nof_bytes < 1) if (nof_bytes < 1) {
return; return;
}
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
if (rlc_am_is_control_pdu(payload)) { if (rlc_am_is_control_pdu(payload)) {
@ -2243,6 +2277,28 @@ std::string rlc_am_status_pdu_to_string(rlc_status_pdu_t* status)
return ss.str(); return ss.str();
} }
std::string rlc_am_undelivered_sdu_info_to_string(const std::map<uint32_t, pdcp_sdu_info_t>& info_queue)
{
std::string str = "\n";
for (const auto& info_it : info_queue) {
uint32_t pdcp_sn = info_it.first;
auto info = info_it.second;
std::string tmp_str = fmt::format("\tPDCP_SN = {}, RLC_SNs = [", pdcp_sn);
for (auto rlc_sn_info : info.rlc_sn_info_list) {
std::string tmp_str2;
if (rlc_sn_info.is_acked) {
tmp_str2 = fmt::format("ACK={}, ", rlc_sn_info.sn);
} else {
tmp_str2 = fmt::format("NACK={}, ", rlc_sn_info.sn);
}
tmp_str += tmp_str2;
}
tmp_str += "]\n";
str += tmp_str;
}
return str;
}
std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header) std::string rlc_amd_pdu_header_to_string(const rlc_amd_pdu_header_t& header)
{ {
std::stringstream ss; std::stringstream ss;

View File

@ -85,7 +85,6 @@ typedef struct {
void parse_args(stress_test_args_t* args, int argc, char* argv[]) void parse_args(stress_test_args_t* args, int argc, char* argv[])
{ {
// Command line only options // Command line only options
bpo::options_description general("General options"); bpo::options_description general("General options");
@ -355,8 +354,8 @@ public:
private: private:
void run_thread() void run_thread()
{ {
uint8_t sn = 0; uint16_t pdcp_sn = 0;
byte_buffer_pool* pool = byte_buffer_pool::get_instance(); byte_buffer_pool* pool = byte_buffer_pool::get_instance();
while (run_enable) { while (run_enable) {
unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, "rlc_tester::run_thread", true); unique_byte_buffer_t pdu = srslte::allocate_unique_buffer(*pool, "rlc_tester::run_thread", true);
if (pdu == NULL) { if (pdu == NULL) {
@ -365,10 +364,11 @@ private:
usleep(1000); usleep(1000);
continue; continue;
} }
pdu->md.pdcp_sn = pdcp_sn & 0x0FFF; // 12-bit SN
for (uint32_t i = 0; i < args.sdu_size; i++) { for (uint32_t i = 0; i < args.sdu_size; i++) {
pdu->msg[i] = sn; pdu->msg[i] = pdcp_sn & 0xFF;
} }
sn++; pdcp_sn++;
pdu->N_bytes = args.sdu_size; pdu->N_bytes = args.sdu_size;
rlc->write_sdu(lcid, std::move(pdu)); rlc->write_sdu(lcid, std::move(pdu));
if (args.sdu_gen_delay_usec > 0) { if (args.sdu_gen_delay_usec > 0) {