From 5ff6b2eaa98fd006da8b2b280ceb703048221e50 Mon Sep 17 00:00:00 2001 From: faluco Date: Mon, 25 Oct 2021 12:26:24 +0200 Subject: [PATCH] Fix data race in ZMQ where nsamples and running variables could be read concurrently. --- lib/src/phy/rf/rf_zmq_imp.c | 10 +++++----- lib/src/phy/rf/rf_zmq_imp_rx.c | 31 ++++++++++++++++++------------- lib/src/phy/rf/rf_zmq_imp_trx.h | 4 ++++ lib/src/phy/rf/rf_zmq_imp_tx.c | 14 ++++++++++++++ 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index fc966dcf2..fbb0ea3d1 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -678,7 +678,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl } // return if receiver is turned off - if (!handler->receiver[0].running) { + if (!rf_zmq_rx_is_running(&handler->receiver[0])) { update_ts(handler, &handler->next_rx_ts, nsamples_baserate, "rx"); return nsamples; } @@ -705,7 +705,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl // check for tx gap if we're also transmitting on this radio for (int i = 0; i < handler->nof_channels; i++) { - if (handler->transmitter[i].running) { + if (rf_zmq_tx_is_running(&handler->transmitter[i])) { rf_zmq_tx_align(&handler->transmitter[i], handler->next_rx_ts + nsamples_baserate); } } @@ -721,7 +721,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl cf_t* ptr = (decim_factor != 1 || buffers[i] == NULL) ? handler->buffer_decimation[i] : buffers[i]; // Completed condition - if (count[i] < nsamples_baserate && handler->receiver[i].running) { + if (count[i] < nsamples_baserate && rf_zmq_rx_is_running(&handler->receiver[i])) { // Keep receiving int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate); #if ZMQ_MONITOR @@ -909,7 +909,7 @@ int rf_zmq_send_timed_multi(void* h, int num_tx_gap_samples = 0; for (int i = 0; i < handler->nof_channels; i++) { - if (handler->transmitter[i].running) { + if (rf_zmq_tx_is_running(&handler->transmitter[i])) { num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter[i], tx_ts); } } @@ -919,7 +919,7 @@ int rf_zmq_send_timed_multi(void* h, "[zmq] Error: tx time is %.3f ms in the past (%" PRIu64 " < %" PRIu64 ")\n", -1000.0 * num_tx_gap_samples / handler->base_srate, tx_ts, - handler->transmitter[0].nsamples); + (uint64_t)rf_zmq_tx_get_nsamples(&handler->transmitter[0])); goto clean_exit; } } diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c index 85567c1fe..5e48a0bb5 100644 --- a/lib/src/phy/rf/rf_zmq_imp_rx.c +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -17,20 +17,11 @@ #include #include -bool is_rx_running(rf_zmq_rx_t* q) -{ - bool running = false; - pthread_mutex_lock(&q->mutex); - running = q->running; - pthread_mutex_unlock(&q->mutex); - return running; -} - static void* rf_zmq_async_rx_thread(void* h) { rf_zmq_rx_t* q = (rf_zmq_rx_t*)h; - while (q->sock && is_rx_running(q)) { + while (q->sock && rf_zmq_rx_is_running(q)) { int nbytes = 0; int n = SRSRAN_ERROR; uint8_t dummy = 0xFF; @@ -39,7 +30,7 @@ static void* rf_zmq_async_rx_thread(void* h) // Send request if socket type is REQUEST if (q->socket_type == ZMQ_REQ) { - while (n < 0 && is_rx_running(q)) { + while (n < 0 && rf_zmq_rx_is_running(q)) { rf_zmq_info(q->id, " - tx'ing rx request\n"); n = zmq_send(q->sock, &dummy, sizeof(dummy), 0); if (n < 0) { @@ -53,7 +44,7 @@ static void* rf_zmq_async_rx_thread(void* h) } // Receive baseband - for (n = (n < 0) ? 0 : -1; n < 0 && is_rx_running(q);) { + for (n = (n < 0) ? 0 : -1; n < 0 && rf_zmq_rx_is_running(q);) { n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0); if (n == -1) { if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) { @@ -77,7 +68,7 @@ static void* rf_zmq_async_rx_thread(void* h) n = -1; // Try to write in ring buffer - while (n < 0 && is_rx_running(q)) { + while (n < 0 && rf_zmq_rx_is_running(q)) { n = srsran_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms); if (n == SRSRAN_ERROR_TIMEOUT && q->log_trx_timeout) { fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms); @@ -296,3 +287,17 @@ void rf_zmq_rx_close(rf_zmq_rx_t* q) } #endif // ZMQ_MONITOR } + +bool rf_zmq_rx_is_running(rf_zmq_rx_t* q) +{ + if (!q) { + return false; + } + + bool ret = false; + pthread_mutex_lock(&q->mutex); + ret = q->running; + pthread_mutex_unlock(&q->mutex); + + return ret; +} diff --git a/lib/src/phy/rf/rf_zmq_imp_trx.h b/lib/src/phy/rf/rf_zmq_imp_trx.h index c574581a9..04b100b05 100644 --- a/lib/src/phy/rf/rf_zmq_imp_trx.h +++ b/lib/src/phy/rf/rf_zmq_imp_trx.h @@ -105,6 +105,8 @@ SRSRAN_API bool rf_zmq_tx_match_freq(rf_zmq_tx_t* q, uint32_t freq_hz); SRSRAN_API void rf_zmq_tx_close(rf_zmq_tx_t* q); +SRSRAN_API bool rf_zmq_tx_is_running(rf_zmq_tx_t* q); + /* * Receiver functions */ @@ -116,4 +118,6 @@ SRSRAN_API bool rf_zmq_rx_match_freq(rf_zmq_rx_t* q, uint32_t freq_hz); SRSRAN_API void rf_zmq_rx_close(rf_zmq_rx_t* q); +SRSRAN_API bool rf_zmq_rx_is_running(rf_zmq_rx_t* q); + #endif // SRSRAN_RF_ZMQ_IMP_TRX_H diff --git a/lib/src/phy/rf/rf_zmq_imp_tx.c b/lib/src/phy/rf/rf_zmq_imp_tx.c index a7e8bf5db..f8178d98d 100644 --- a/lib/src/phy/rf/rf_zmq_imp_tx.c +++ b/lib/src/phy/rf/rf_zmq_imp_tx.c @@ -250,3 +250,17 @@ void rf_zmq_tx_close(rf_zmq_tx_t* q) q->sock = NULL; } } + +bool rf_zmq_tx_is_running(rf_zmq_tx_t* q) +{ + if (!q) { + return false; + } + + bool ret = false; + pthread_mutex_lock(&q->mutex); + ret = q->running; + pthread_mutex_unlock(&q->mutex); + + return ret; +}