couple the pdcp discard timers and sdu buffer management into same subclass. Add asserts to ensure there is no ambiguity in the PDCP SN assignment and buffering. Extend the discard timer and pdcp sdu buffering to SN lengths above 12

This commit is contained in:
Francisco 2021-03-01 15:59:06 +00:00 committed by Francisco Paisana
parent 4723dd0aa9
commit 17fa79f24c
2 changed files with 221 additions and 251 deletions

View File

@ -13,6 +13,7 @@
#ifndef SRSLTE_PDCP_ENTITY_LTE_H
#define SRSLTE_PDCP_ENTITY_LTE_H
#include "srslte/adt/circular_array.h"
#include "srslte/common/buffer_pool.h"
#include "srslte/common/common.h"
#include "srslte/common/log.h"
@ -30,6 +31,174 @@ class rlc_interface_pdcp;
namespace srslte {
class undelivered_sdus_queue
{
public:
explicit undelivered_sdus_queue(srslte::task_sched_handle task_sched)
{
for (auto& e : sdus) {
e.discard_timer = task_sched.get_unique_timer();
}
}
bool empty() const { return count == 0; }
bool is_full() const { return count >= capacity; }
uint32_t size() const { return count; }
static uint32_t get_capacity() { return capacity; }
bool has_sdu(uint32_t sn) const
{
assert(sn != invalid_sn && "provided PDCP SN is invalid");
return sdus[sn].sdu != nullptr and sdus[sn].sdu->md.pdcp_sn == sn;
}
// Getter for the number of discard timers. Used for debugging.
size_t nof_discard_timers() const
{
return std::count_if(sdus.begin(), sdus.end(), [](const sdu_data& s) {
return s.sdu != nullptr and s.discard_timer.is_valid() and s.discard_timer.is_running();
});
}
bool add_sdu(uint32_t sn,
const srslte::unique_byte_buffer_t& sdu,
uint32_t discard_timeout,
const std::function<void(uint32_t)>& callback)
{
assert(discard_timeout < capacity and "Invalid discard timeout value");
assert(not has_sdu(sn) && "Cannot add repeated SNs");
if (is_full()) {
return false;
}
// Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs
if (not empty()) {
int32_t diff = sn - fms;
if (diff > (int32_t)(capacity / 2)) {
return false;
}
if (diff <= 0 && diff > -((int32_t)(capacity / 2))) {
return false;
}
}
// Allocate buffer and exit on error
srslte::unique_byte_buffer_t tmp = make_byte_buffer();
if (tmp == nullptr) {
return false;
}
// Update FMS and LMS if necessary
if (empty()) {
fms = sn;
lms = sn;
} else {
update_lms(sn);
}
// Add SDU
count++;
sdus[sn].sdu = std::move(tmp);
sdus[sn].sdu->md.pdcp_sn = sn;
sdus[sn].sdu->N_bytes = sdu->N_bytes;
memcpy(sdus[sn].sdu->msg, sdu->msg, sdu->N_bytes);
if (discard_timeout > 0) {
sdus[sn].discard_timer.set(discard_timeout, callback);
sdus[sn].discard_timer.run();
}
sdus[sn].sdu->set_timestamp(); // Metrics
bytes += sdu->N_bytes;
return true;
}
unique_byte_buffer_t& operator[](uint32_t sn)
{
assert(has_sdu(sn));
return sdus[sn].sdu;
}
bool clear_sdu(uint32_t sn)
{
if (not has_sdu(sn)) {
return false;
}
count--;
bytes -= sdus[sn].sdu->N_bytes;
sdus[sn].discard_timer.clear();
sdus[sn].sdu.reset();
// Find next FMS,
update_fms();
return true;
}
void clear()
{
count = 0;
bytes = 0;
fms = 0;
for (uint32_t sn = 0; sn < capacity; sn++) {
sdus[sn].discard_timer.clear();
sdus[sn].sdu.reset();
}
}
uint32_t get_bytes() const { return bytes; }
uint32_t get_fms() const { return fms; }
void set_fms(uint32_t fms_) { fms = fms_; }
void update_fms()
{
if (empty()) {
fms = increment_sn(fms);
return;
}
for (uint32_t i = 0; i < capacity; ++i) {
uint32_t sn = increment_sn(fms + i);
if (has_sdu(sn)) {
fms = sn;
return;
}
}
fms = increment_sn(fms);
}
void update_lms(uint32_t sn)
{
if (empty()) {
lms = fms;
return;
}
int32_t diff = sn - lms;
if (diff > 0 && sn > lms) {
lms = sn;
} else if (diff < 0 && sn < lms) {
lms = sn;
}
}
uint32_t get_lms() const { return lms; }
private:
const static uint32_t capacity = 4096;
const static uint32_t invalid_sn = -1;
static uint32_t increment_sn(uint32_t sn) { return (sn + 1) % capacity; }
struct sdu_data {
srslte::unique_byte_buffer_t sdu;
srslte::unique_timer discard_timer;
};
uint32_t count = 0;
uint32_t bytes = 0;
uint32_t fms = 0;
uint32_t lms = 0;
srslte::circular_array<sdu_data, capacity> sdus;
};
/****************************************************************************
* Structs and Defines
* Ref: 3GPP TS 36.323 v10.1.0
@ -81,13 +250,12 @@ public:
void get_bearer_state(pdcp_lte_state_t* state) override;
void set_bearer_state(const pdcp_lte_state_t& state) override;
// Getter for the number of discard timers. Used for debugging.
uint32_t nof_discard_timers() const;
// Metrics helpers
pdcp_bearer_metrics_t get_metrics() override;
void reset_metrics() override;
size_t nof_discard_timers() const { return undelivered_sdus.nof_discard_timers(); }
private:
srsue::rlc_interface_pdcp* rlc = nullptr;
srsue::rrc_interface_pdcp* rrc = nullptr;
@ -107,155 +275,10 @@ private:
// Discard callback (discardTimer)
class discard_callback;
std::vector<unique_timer> discard_timers;
unique_timer* get_discard_timer(uint32_t sn);
void stop_discard_timer(uint32_t sn);
// TX Queue
uint32_t maximum_allocated_sns_window = 2048;
class undelivered_sdus_queue_t
{
public:
undelivered_sdus_queue_t() : sdus(capacity) {}
bool empty() { return count == 0; }
bool is_full() { return count >= capacity; }
uint32_t size() { return count; }
uint32_t get_capacity() { return capacity; }
bool has_sdu(uint32_t sn)
{
if (sn >= capacity) {
return false;
}
return sdus[sn] != nullptr;
}
bool add_sdu(uint32_t sn, const srslte::unique_byte_buffer_t& sdu)
{
assert(not has_sdu(sn));
if (is_full()) {
return false;
}
// Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs
if (not empty()) {
int32_t diff = sn - fms;
if (diff > (int32_t)(capacity / 2)) {
return false;
}
if (diff <= 0 && diff > -((int32_t)(capacity / 2))) {
return false;
}
}
// Allocate buffer and exit on error
srslte::unique_byte_buffer_t tmp = make_byte_buffer();
if (tmp == nullptr) {
return false;
}
// Update FMS and LMS if necessary
if (empty()) {
fms = sn;
lms = sn;
} else {
update_lms(sn);
}
// Add SDU
count++;
sdus[sn] = std::move(tmp);
memcpy(sdus[sn]->msg, sdu->msg, sdu->N_bytes);
sdus[sn]->N_bytes = sdu->N_bytes;
bytes += sdu->N_bytes;
sdus[sn]->set_timestamp(); // Metrics
return true;
}
unique_byte_buffer_t& operator[](uint32_t sn)
{
assert(has_sdu(sn));
return sdus[sn];
}
void clear_sdu(uint32_t sn)
{
assert(has_sdu(sn));
if (has_sdu(sn)) {
count--;
bytes -= sdus[sn]->N_bytes;
sdus[sn] = nullptr;
}
// Find next FMS,
update_fms();
}
void clear()
{
count = 0;
bytes = 0;
fms = 0;
for (uint32_t sn = 0; sn < capacity; sn++) {
sdus[sn] = nullptr;
}
}
uint32_t get_bytes() { return bytes; }
uint32_t get_fms() { return fms; }
void set_fms(uint32_t fms_) { fms = fms_; }
void update_fms()
{
if (empty()) {
fms = increment_sn(fms);
return;
}
for (uint32_t i = 0; i < capacity; ++i) {
uint32_t sn = increment_sn(fms + i);
if (has_sdu(sn)) {
fms = sn;
return;
}
}
fms = increment_sn(fms);
}
void update_lms(uint32_t sn)
{
if (empty()) {
lms = fms;
return;
}
int32_t diff = sn - lms;
if (diff > 0 && sn > lms) {
lms = sn;
} else if (diff < 0 && sn < lms) {
lms = sn;
}
}
uint32_t get_lms() { return lms; }
private:
uint32_t increment_sn(uint32_t sn) { return (sn + 1) % capacity; }
uint32_t decrement_sn(uint32_t sn) { return (sn - 1) % capacity; }
const static uint32_t capacity = 4096;
uint32_t count = 0;
uint32_t bytes = 0;
uint32_t fms = 0;
uint32_t lms = 0;
std::vector<srslte::unique_byte_buffer_t> sdus;
} undelivered_sdus_queue;
uint32_t maximum_allocated_sns_window = 2048;
undelivered_sdus_queue undelivered_sdus;
};
// Discard callback (discardTimer)

View File

@ -26,7 +26,7 @@ pdcp_entity_lte::pdcp_entity_lte(srsue::rlc_interface_pdcp* rlc_,
srslog::basic_logger& logger,
uint32_t lcid_,
pdcp_config_t cfg_) :
pdcp_entity_base(task_sched_, logger), rlc(rlc_), rrc(rrc_), gw(gw_)
pdcp_entity_base(task_sched_, logger), rlc(rlc_), rrc(rrc_), gw(gw_), undelivered_sdus(task_sched_)
{
lcid = lcid_;
cfg = cfg_;
@ -55,17 +55,6 @@ pdcp_entity_lte::pdcp_entity_lte(srsue::rlc_interface_pdcp* rlc_,
cfg.discard_timer = pdcp_discard_timer_t::ms1500;
}
uint32_t discard_time_value = static_cast<uint32_t>(cfg.discard_timer);
if (discard_time_value > 0) {
// Note: One extra position is reserved at the end for the status report
discard_timers.reserve(maximum_pdcp_sn + 2);
for (uint32_t sn = 0; sn < maximum_pdcp_sn + 2; ++sn) {
discard_timers.emplace_back(task_sched.get_unique_timer());
discard_callback discard_fnc(this, sn);
discard_timers[sn].set(discard_time_value, discard_fnc);
}
}
// Queue Helpers
maximum_allocated_sns_window = (1 << cfg.sn_len) / 2;
@ -99,7 +88,7 @@ void pdcp_entity_lte::reestablish()
st.tx_hfn = 0;
st.rx_hfn = 0;
st.next_pdcp_rx_sn = 0;
undelivered_sdus_queue.clear();
undelivered_sdus.clear();
} else if (rlc->rb_is_um(lcid)) {
// Only reset counter in RLC-UM
st.next_pdcp_tx_sn = 0;
@ -149,15 +138,6 @@ void pdcp_entity_lte::write_sdu(unique_byte_buffer_t sdu, int upper_sn)
logger.error("Could not store SDU. Discarding %d\n", used_sn);
return;
}
// Start discard timer
if (cfg.discard_timer != pdcp_discard_timer_t::infinity) {
unique_timer* timer = get_discard_timer(used_sn);
if (timer != nullptr) {
timer->run();
logger.debug("Discard Timer set for SN %u. Timeout: %ums", used_sn, static_cast<uint32_t>(cfg.discard_timer));
}
}
}
// check for pending security config in transmit direction
@ -273,7 +253,6 @@ void pdcp_entity_lte::handle_control_pdu(unique_byte_buffer_t pdu)
logger.warning("Unhandled control PDU");
return;
}
return;
}
/****************************************************************************
@ -442,14 +421,14 @@ void pdcp_entity_lte::send_status_report()
// Get First Missing Segment (FMS)
uint32_t fms = 0;
if (undelivered_sdus_queue.empty()) {
if (undelivered_sdus.empty()) {
fms = st.next_pdcp_tx_sn;
} else {
fms = undelivered_sdus_queue.get_fms();
fms = undelivered_sdus.get_fms();
}
// Get Last Missing Segment
uint32_t lms = undelivered_sdus_queue.get_lms();
uint32_t lms = undelivered_sdus.get_lms();
// Allocate Status Report PDU
unique_byte_buffer_t pdu = make_byte_buffer();
@ -481,7 +460,7 @@ void pdcp_entity_lte::send_status_report()
}
// Add bitmap of missing PDUs, if necessary
if (not undelivered_sdus_queue.empty()) {
if (not undelivered_sdus.empty()) {
// First check size of bitmap
uint32_t bitmap_sz = std::ceil((float)(lms - (fms - 1)) / 8);
memset(&pdu->msg[pdu->N_bytes], 0, bitmap_sz);
@ -491,7 +470,7 @@ void pdcp_entity_lte::send_status_report()
fms - 1,
bitmap_sz);
for (uint32_t sn = fms; sn <= lms; sn++) {
if (undelivered_sdus_queue.has_sdu(sn)) {
if (undelivered_sdus.has_sdu(sn)) {
uint32_t offset = sn - fms;
uint32_t bit_offset = offset % 8;
uint32_t byte_offset = offset / 8;
@ -504,8 +483,6 @@ void pdcp_entity_lte::send_status_report()
// Write PDU to RLC
rlc->write_sdu(lcid, std::move(pdu));
return;
}
// Section 5.3.2 receive operation
@ -539,9 +516,8 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu)
// Remove all SDUs with SN smaller than FMS
for (uint32_t sn = 0; sn < fms; sn++) {
if (sn < fms && undelivered_sdus_queue.has_sdu(sn)) {
stop_discard_timer(sn);
undelivered_sdus_queue.clear_sdu(sn);
if (sn < fms && undelivered_sdus.has_sdu(sn)) {
undelivered_sdus.clear_sdu(sn);
}
}
@ -559,8 +535,7 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu)
// Discard ACK'ed SDUs
for (uint32_t sn : acked_sns) {
logger.debug("Status report ACKed SN=%d.", sn);
undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn);
undelivered_sdus.clear_sdu(sn);
}
}
/****************************************************************************
@ -569,22 +544,22 @@ void pdcp_entity_lte::handle_status_report_pdu(unique_byte_buffer_t pdu)
bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu)
{
logger.debug("Storing SDU in undelivered SDUs queue. SN=%d, Queue size=%ld", sn, undelivered_sdus_queue.size());
logger.debug("Storing SDU in undelivered SDUs queue. SN=%d, Queue size=%ld", sn, undelivered_sdus.size());
// Check wether PDU is already in the queue
if (undelivered_sdus_queue.has_sdu(sn)) {
if (undelivered_sdus.has_sdu(sn)) {
logger.error("PDU already exists in the queue. TX_COUNT=%d", sn);
return false;
}
if (undelivered_sdus_queue.is_full()) {
if (undelivered_sdus.is_full()) {
logger.error("Undelivered SDUs queue is full. TX_COUNT=%d", sn);
return false;
}
// Make sure we don't associate more than half of the PDCP SN space of contiguous PDCP SDUs
if (not undelivered_sdus_queue.empty()) {
uint32_t fms_sn = undelivered_sdus_queue.get_fms();
if (not undelivered_sdus.empty()) {
uint32_t fms_sn = undelivered_sdus.get_fms();
int32_t diff = sn - fms_sn;
if (diff > (int32_t)maximum_allocated_sns_window) {
// This SN is too large to assign, it may cause HFN de-synchronization.
@ -593,7 +568,7 @@ bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu)
fms_sn,
diff,
maximum_allocated_sns_window,
undelivered_sdus_queue.size());
undelivered_sdus.size());
return false;
}
if (diff < 0 && diff > -((int32_t)maximum_allocated_sns_window)) {
@ -603,14 +578,19 @@ bool pdcp_entity_lte::store_sdu(uint32_t sn, const unique_byte_buffer_t& sdu)
fms_sn,
diff,
maximum_allocated_sns_window,
undelivered_sdus_queue.size());
undelivered_sdus.size());
return false;
}
}
// Copy PDU contents into queue
undelivered_sdus_queue.add_sdu(sn, sdu);
return true;
// Copy PDU contents into queue and start discard timer
uint32_t discard_timeout = static_cast<uint32_t>(cfg.discard_timer);
discard_callback discard_fnc(this, sn);
bool ret = undelivered_sdus.add_sdu(sn, sdu, discard_timeout, discard_fnc);
if (ret and discard_timeout > 0) {
logger.debug("Discard Timer set for SN %u. Timeout: %ums", sn, discard_timeout);
}
return ret;
}
/****************************************************************************
@ -621,19 +601,16 @@ void pdcp_entity_lte::discard_callback::operator()(uint32_t timer_id)
{
parent->logger.debug("Discard timer expired for PDU with SN = %d", discard_sn);
// Discard PDU if unacknowledged
if (parent->undelivered_sdus_queue.has_sdu(discard_sn)) {
parent->undelivered_sdus_queue.clear_sdu(discard_sn);
parent->logger.debug("Removed undelivered PDU with TX_COUNT=%d", discard_sn);
} else {
parent->logger.debug("Could not find PDU to discard. TX_COUNT=%d", discard_sn);
}
// Notify the RLC of the discard. It's the RLC to actually discard, if no segment was transmitted yet.
parent->rlc->discard_sdu(parent->lcid, discard_sn);
// Remove timer from map
parent->stop_discard_timer(discard_sn);
// Discard PDU if unacknowledged
if (parent->undelivered_sdus.has_sdu(discard_sn)) {
parent->logger.debug("Removed undelivered PDU with TX_COUNT=%d", discard_sn);
parent->undelivered_sdus.clear_sdu(discard_sn);
} else {
parent->logger.debug("Could not find PDU to discard. TX_COUNT=%d", discard_sn);
}
}
/****************************************************************************
@ -649,20 +626,18 @@ void pdcp_entity_lte::notify_delivery(const std::vector<uint32_t>& pdcp_sns)
continue;
}
// Find undelivered PDU info
if (not undelivered_sdus_queue.has_sdu(sn)) {
if (not undelivered_sdus.has_sdu(sn)) {
logger.warning("Could not find PDU for delivery notification. Notified SN=%d", sn);
} else {
// Metrics
tx_pdu_ack_latency_ms.push(
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() -
undelivered_sdus_queue[sn]->get_timestamp())
.count());
metrics.num_tx_acked_bytes += undelivered_sdus_queue[sn]->N_bytes;
metrics.num_tx_buffered_pdus_bytes -= undelivered_sdus_queue[sn]->N_bytes;
tx_pdu_ack_latency_ms.push(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - undelivered_sdus[sn]->get_timestamp())
.count());
metrics.num_tx_acked_bytes += undelivered_sdus[sn]->N_bytes;
metrics.num_tx_buffered_pdus_bytes -= undelivered_sdus[sn]->N_bytes;
// Remove PDU and disarm timer.
undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn);
undelivered_sdus.clear_sdu(sn);
}
}
}
@ -677,12 +652,11 @@ void pdcp_entity_lte::notify_failure(const std::vector<uint32_t>& pdcp_sns)
continue;
}
// Find undelivered PDU info
if (not undelivered_sdus_queue.has_sdu(sn)) {
if (not undelivered_sdus.has_sdu(sn)) {
logger.info("Could not find PDU for failure notification. Notified SN=%d", sn);
} else {
// Remove PDU and disarm timer.
undelivered_sdus_queue.clear_sdu(sn);
stop_discard_timer(sn);
undelivered_sdus.clear_sdu(sn);
}
}
}
@ -726,56 +700,29 @@ void pdcp_entity_lte::set_bearer_state(const pdcp_lte_state_t& state)
std::map<uint32_t, srslte::unique_byte_buffer_t> pdcp_entity_lte::get_buffered_pdus()
{
logger.info("Buffered PDUs requested, buffer_size=%d", undelivered_sdus_queue.size());
logger.info("Buffered PDUs requested, buffer_size=%d", undelivered_sdus.size());
std::map<uint32_t, srslte::unique_byte_buffer_t> cpy{};
// Deep copy undelivered SDUs
// TODO: investigate wheter the deep copy can be avoided by moving the undelivered SDU queue.
// That can only be done just before the PDCP is disabled though.
for (uint32_t sn = 0; sn < undelivered_sdus_queue.get_capacity(); sn++) {
if (undelivered_sdus_queue.has_sdu(sn)) {
logger.debug(undelivered_sdus_queue[sn]->msg,
undelivered_sdus_queue[sn]->N_bytes,
"Forwarding buffered PDU with SN=%d",
sn);
for (uint32_t sn = 0; sn < undelivered_sdus.get_capacity(); sn++) {
if (undelivered_sdus.has_sdu(sn)) {
logger.debug(undelivered_sdus[sn]->msg, undelivered_sdus[sn]->N_bytes, "Forwarding buffered PDU with SN=%d", sn);
cpy[sn] = make_byte_buffer();
(*cpy[sn]) = *(undelivered_sdus_queue[sn]);
(*cpy[sn]) = *(undelivered_sdus[sn]);
}
}
return cpy;
}
uint32_t pdcp_entity_lte::nof_discard_timers() const
{
return std::count_if(
discard_timers.begin(), discard_timers.end(), [](const unique_timer& t) { return t.is_running(); });
}
unique_timer* pdcp_entity_lte::get_discard_timer(uint32_t sn)
{
// Note: When SN>max PDCP SN (Status report case), the position will be the last in the vector of discard timers
if (not discard_timers.empty()) {
uint32_t sn_idx = std::min((uint32_t)sn, (uint32_t)(discard_timers.size() - 1));
return &discard_timers[sn_idx];
}
return nullptr;
}
void pdcp_entity_lte::stop_discard_timer(uint32_t sn)
{
unique_timer* timer = get_discard_timer(sn);
if (timer != nullptr) {
timer->stop();
}
}
/****************************************************************************
* Metrics helpers
***************************************************************************/
pdcp_bearer_metrics_t pdcp_entity_lte::get_metrics()
{
metrics.num_tx_buffered_pdus = undelivered_sdus_queue.size();
metrics.num_tx_buffered_pdus_bytes = undelivered_sdus_queue.get_bytes(); //< Number of bytes of PDUs waiting for ACK
metrics.num_tx_buffered_pdus = undelivered_sdus.size();
metrics.num_tx_buffered_pdus_bytes = undelivered_sdus.get_bytes(); //< Number of bytes of PDUs waiting for ACK
metrics.tx_notification_latency_ms =
tx_pdu_ack_latency_ms.value(); //< Average time in ms from PDU delivery to RLC to ACK notification from RLC
return metrics;