sync: protect sync_state, access ue_sync object only from one thread

Fixes races detected with TSAN. Primarily, the ue_sync object, which isn't
thread-safe, is accessed by all workers to set the CFO and by the
sync thread to receive samples (which reads the CFO).

The patch introduces shadow variables that are updated from the
main thread before/after the sync is executed. The atomic shadow
variables can then be read from other threads without holding a
mutex, i.e. without blocking the sync.
This commit is contained in:
Andre Puschmann 2021-05-27 15:25:55 +02:00
parent 1529379e9e
commit 98a91a2057
3 changed files with 39 additions and 29 deletions

View File

@ -189,7 +189,7 @@ private:
bool set_frequency(); bool set_frequency();
bool set_cell(float cfo); bool set_cell(float cfo);
bool running = false; std::atomic<bool> running = {false};
bool is_overflow = false; bool is_overflow = false;
srsran::rf_timestamp_t last_rx_time; srsran::rf_timestamp_t last_rx_time;
@ -229,11 +229,14 @@ private:
srsran::rf_buffer_t dummy_buffer; srsran::rf_buffer_t dummy_buffer;
// Sync metrics // Sync metrics
std::atomic<float> sfo = {}; // SFO estimate updated after each sync-cycle
std::atomic<float> cfo = {}; // CFO estimate updated after each sync-cycle
std::atomic<float> ref_cfo = {}; // provided adjustment value applied before sync
sync_metrics_t metrics = {}; sync_metrics_t metrics = {};
// in-sync / out-of-sync counters // in-sync / out-of-sync counters
uint32_t out_of_sync_cnt = 0; std::atomic<uint32_t> out_of_sync_cnt = {0};
uint32_t in_sync_cnt = 0; std::atomic<uint32_t> in_sync_cnt = {0};
std::mutex rrc_mutex; std::mutex rrc_mutex;
enum { enum {

View File

@ -30,7 +30,7 @@ public:
*/ */
state_t run_state() state_t run_state()
{ {
std::lock_guard<std::mutex> lock(inside); std::lock_guard<std::mutex> lock(mutex);
cur_state = next_state; cur_state = next_state;
if (state_setting) { if (state_setting) {
state_setting = false; state_setting = false;
@ -43,7 +43,7 @@ public:
// Called by the main thread at the end of each state to indicate it has finished. // Called by the main thread at the end of each state to indicate it has finished.
void state_exit(bool exit_ok = true) void state_exit(bool exit_ok = true)
{ {
std::lock_guard<std::mutex> lock(inside); std::lock_guard<std::mutex> lock(mutex);
if (cur_state == SFN_SYNC && exit_ok == true) { if (cur_state == SFN_SYNC && exit_ok == true) {
next_state = CAMPING; next_state = CAMPING;
} else { } else {
@ -54,7 +54,7 @@ public:
} }
void force_sfn_sync() void force_sfn_sync()
{ {
std::lock_guard<std::mutex> lock(inside); std::lock_guard<std::mutex> lock(mutex);
next_state = SFN_SYNC; next_state = SFN_SYNC;
} }
@ -65,20 +65,17 @@ public:
*/ */
void go_idle() void go_idle()
{ {
std::lock_guard<std::mutex> lock(outside);
// Do not wait when transitioning to IDLE to avoid blocking // Do not wait when transitioning to IDLE to avoid blocking
go_state_nowait(IDLE); next_state = IDLE;
} }
void run_cell_search() void run_cell_search()
{ {
std::lock_guard<std::mutex> lock(outside);
go_state(CELL_SEARCH); go_state(CELL_SEARCH);
wait_state_run(); wait_state_run();
wait_state_next(); wait_state_next();
} }
void run_sfn_sync() void run_sfn_sync()
{ {
std::lock_guard<std::mutex> lock(outside);
go_state(SFN_SYNC); go_state(SFN_SYNC);
wait_state_run(); wait_state_run();
wait_state_next(); wait_state_next();
@ -89,7 +86,7 @@ public:
bool is_camping() { return cur_state == CAMPING; } bool is_camping() { return cur_state == CAMPING; }
bool wait_idle(uint32_t timeout_ms) bool wait_idle(uint32_t timeout_ms)
{ {
std::unique_lock<std::mutex> lock(outside); std::unique_lock<std::mutex> lock(mutex);
// Avoid wasting time if the next state will not be IDLE // Avoid wasting time if the next state will not be IDLE
if (next_state != IDLE) { if (next_state != IDLE) {
@ -116,6 +113,7 @@ public:
const char* to_string() const char* to_string()
{ {
std::lock_guard<std::mutex> lock(mutex);
switch (cur_state) { switch (cur_state) {
case IDLE: case IDLE:
return "IDLE"; return "IDLE";
@ -135,7 +133,7 @@ public:
private: private:
void go_state(state_t s) void go_state(state_t s)
{ {
std::unique_lock<std::mutex> ul(inside); std::unique_lock<std::mutex> ul(mutex);
next_state = s; next_state = s;
state_setting = true; state_setting = true;
while (state_setting) { while (state_setting) {
@ -145,7 +143,7 @@ private:
void go_state_nowait(state_t s) void go_state_nowait(state_t s)
{ {
std::unique_lock<std::mutex> ul(inside); std::unique_lock<std::mutex> ul(mutex);
next_state = s; next_state = s;
state_setting = true; state_setting = true;
} }
@ -153,14 +151,14 @@ private:
/* Waits until there is a call to set_state() and then run_state(). Returns when run_state() returns */ /* Waits until there is a call to set_state() and then run_state(). Returns when run_state() returns */
void wait_state_run() void wait_state_run()
{ {
std::unique_lock<std::mutex> ul(inside); std::unique_lock<std::mutex> ul(mutex);
while (state_running) { while (state_running) {
cvar.wait(ul); cvar.wait(ul);
} }
} }
void wait_state_next() void wait_state_next()
{ {
std::unique_lock<std::mutex> ul(inside); std::unique_lock<std::mutex> ul(mutex);
while (cur_state != next_state) { while (cur_state != next_state) {
cvar.wait(ul); cvar.wait(ul);
} }
@ -168,10 +166,9 @@ private:
bool state_running = false; bool state_running = false;
bool state_setting = false; bool state_setting = false;
state_t cur_state = IDLE; std::atomic<state_t> next_state = {IDLE}; // can be updated from outside (i.e. other thread)
state_t next_state = IDLE; state_t cur_state = IDLE; // will only be accessed when holding the mutex
std::mutex inside; std::mutex mutex;
std::mutex outside;
std::condition_variable cvar; std::condition_variable cvar;
}; };

View File

@ -482,8 +482,9 @@ void sync::run_camping_in_sync_state(lte::sf_worker* lte_worker,
Debug("SYNC: Worker %d synchronized", lte_worker->get_id()); Debug("SYNC: Worker %d synchronized", lte_worker->get_id());
metrics.sfo = srsran_ue_sync_get_sfo(&ue_sync); // Collect and provide metrics from last successful sync
metrics.cfo = srsran_ue_sync_get_cfo(&ue_sync); metrics.sfo = sfo;
metrics.cfo = cfo;
metrics.ta_us = worker_com->ta.get_usec(); metrics.ta_us = worker_com->ta.get_usec();
for (uint32_t i = 0; i < worker_com->args->nof_lte_carriers; i++) { for (uint32_t i = 0; i < worker_com->args->nof_lte_carriers; i++) {
worker_com->set_sync_metrics(i, metrics); worker_com->set_sync_metrics(i, metrics);
@ -505,7 +506,7 @@ void sync::run_camping_in_sync_state(lte::sf_worker* lte_worker,
// Set CFO for all Carriers // Set CFO for all Carriers
for (uint32_t cc = 0; cc < worker_com->args->nof_lte_carriers; cc++) { for (uint32_t cc = 0; cc < worker_com->args->nof_lte_carriers; cc++) {
lte_worker->set_cfo_unlocked(cc, get_tx_cfo()); lte_worker->set_cfo_unlocked(cc, get_tx_cfo());
worker_com->update_cfo_measurement(cc, srsran_ue_sync_get_cfo(&ue_sync)); worker_com->update_cfo_measurement(cc, cfo);
} }
lte_worker->set_tti(tti); lte_worker->set_tti(tti);
@ -568,8 +569,18 @@ void sync::run_camping_state()
} }
} }
// Apply CFO adjustment if available
if (ref_cfo != 0.0) {
srsran_ue_sync_set_cfo_ref(&ue_sync, ref_cfo);
ref_cfo = 0.0; // reset until value changes again
}
// Primary Cell (PCell) Synchronization // Primary Cell (PCell) Synchronization
switch (srsran_ue_sync_zerocopy(&ue_sync, sync_buffer.to_cf_t(), lte_worker->get_buffer_len())) { int sync_result = srsran_ue_sync_zerocopy(&ue_sync, sync_buffer.to_cf_t(), lte_worker->get_buffer_len());
cfo = srsran_ue_sync_get_cfo(&ue_sync);
sfo = srsran_ue_sync_get_sfo(&ue_sync);
switch (sync_result) {
case 1: case 1:
run_camping_in_sync_state(lte_worker, nr_worker, sync_buffer); run_camping_in_sync_state(lte_worker, nr_worker, sync_buffer);
break; break;
@ -621,7 +632,7 @@ void sync::run_idle_state()
void sync::run_thread() void sync::run_thread()
{ {
while (running) { while (running.load(std::memory_order_relaxed)) {
phy_lib_logger.set_context(tti); phy_lib_logger.set_context(tti);
Debug("SYNC: state=%s, tti=%d", phy_state.to_string(), tti); Debug("SYNC: state=%s, tti=%d", phy_state.to_string(), tti);
@ -683,7 +694,7 @@ void sync::in_sync()
void sync::out_of_sync() void sync::out_of_sync()
{ {
// Send RRC out-of-sync signal after NOF_OUT_OF_SYNC_SF consecutive subframes // Send RRC out-of-sync signal after NOF_OUT_OF_SYNC_SF consecutive subframes
Info("Out-of-sync %d/%d", out_of_sync_cnt, worker_com->args->nof_out_of_sync_events); Info("Out-of-sync %d/%d", out_of_sync_cnt.load(std::memory_order_relaxed), worker_com->args->nof_out_of_sync_events);
out_of_sync_cnt++; out_of_sync_cnt++;
if (out_of_sync_cnt == worker_com->args->nof_out_of_sync_events) { if (out_of_sync_cnt == worker_com->args->nof_out_of_sync_events) {
Info("Sending to RRC"); Info("Sending to RRC");
@ -695,7 +706,7 @@ void sync::out_of_sync()
void sync::set_cfo(float cfo) void sync::set_cfo(float cfo)
{ {
srsran_ue_sync_set_cfo_ref(&ue_sync, cfo); ref_cfo = cfo;
} }
void sync::set_agc_enable(bool enable) void sync::set_agc_enable(bool enable)
@ -724,7 +735,7 @@ void sync::set_agc_enable(bool enable)
return; return;
} }
// Enable AGC // Enable AGC (unprotected call to ue_sync must not happen outside of thread calling recv)
srsran_ue_sync_start_agc( srsran_ue_sync_start_agc(
&ue_sync, callback_set_rx_gain, rf_info->min_rx_gain, rf_info->max_rx_gain, radio_h->get_rx_gain()); &ue_sync, callback_set_rx_gain, rf_info->min_rx_gain, rf_info->max_rx_gain, radio_h->get_rx_gain());
search_p.set_agc_enable(true); search_p.set_agc_enable(true);
@ -732,8 +743,7 @@ void sync::set_agc_enable(bool enable)
float sync::get_tx_cfo() float sync::get_tx_cfo()
{ {
float cfo = srsran_ue_sync_get_cfo(&ue_sync); // Use CFO estimate from last successful sync
float ret = cfo * ul_dl_factor; float ret = cfo * ul_dl_factor;
if (worker_com->args->cfo_is_doppler) { if (worker_com->args->cfo_is_doppler) {