Fix data race in ZMQ where nsamples and running variables could be read concurrently.

This commit is contained in:
faluco 2021-10-25 12:26:24 +02:00 committed by Andre Puschmann
parent 2f5a1ad2e3
commit 5ff6b2eaa9
4 changed files with 41 additions and 18 deletions

View File

@ -678,7 +678,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl
}
// return if receiver is turned off
if (!handler->receiver[0].running) {
if (!rf_zmq_rx_is_running(&handler->receiver[0])) {
update_ts(handler, &handler->next_rx_ts, nsamples_baserate, "rx");
return nsamples;
}
@ -705,7 +705,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl
// check for tx gap if we're also transmitting on this radio
for (int i = 0; i < handler->nof_channels; i++) {
if (handler->transmitter[i].running) {
if (rf_zmq_tx_is_running(&handler->transmitter[i])) {
rf_zmq_tx_align(&handler->transmitter[i], handler->next_rx_ts + nsamples_baserate);
}
}
@ -721,7 +721,7 @@ int rf_zmq_recv_with_time_multi(void* h, void** data, uint32_t nsamples, bool bl
cf_t* ptr = (decim_factor != 1 || buffers[i] == NULL) ? handler->buffer_decimation[i] : buffers[i];
// Completed condition
if (count[i] < nsamples_baserate && handler->receiver[i].running) {
if (count[i] < nsamples_baserate && rf_zmq_rx_is_running(&handler->receiver[i])) {
// Keep receiving
int32_t n = rf_zmq_rx_baseband(&handler->receiver[i], &ptr[count[i]], nsamples_baserate);
#if ZMQ_MONITOR
@ -909,7 +909,7 @@ int rf_zmq_send_timed_multi(void* h,
int num_tx_gap_samples = 0;
for (int i = 0; i < handler->nof_channels; i++) {
if (handler->transmitter[i].running) {
if (rf_zmq_tx_is_running(&handler->transmitter[i])) {
num_tx_gap_samples = rf_zmq_tx_align(&handler->transmitter[i], tx_ts);
}
}
@ -919,7 +919,7 @@ int rf_zmq_send_timed_multi(void* h,
"[zmq] Error: tx time is %.3f ms in the past (%" PRIu64 " < %" PRIu64 ")\n",
-1000.0 * num_tx_gap_samples / handler->base_srate,
tx_ts,
handler->transmitter[0].nsamples);
(uint64_t)rf_zmq_tx_get_nsamples(&handler->transmitter[0]));
goto clean_exit;
}
}

View File

@ -17,20 +17,11 @@
#include <string.h>
#include <zmq.h>
bool is_rx_running(rf_zmq_rx_t* q)
{
bool running = false;
pthread_mutex_lock(&q->mutex);
running = q->running;
pthread_mutex_unlock(&q->mutex);
return running;
}
static void* rf_zmq_async_rx_thread(void* h)
{
rf_zmq_rx_t* q = (rf_zmq_rx_t*)h;
while (q->sock && is_rx_running(q)) {
while (q->sock && rf_zmq_rx_is_running(q)) {
int nbytes = 0;
int n = SRSRAN_ERROR;
uint8_t dummy = 0xFF;
@ -39,7 +30,7 @@ static void* rf_zmq_async_rx_thread(void* h)
// Send request if socket type is REQUEST
if (q->socket_type == ZMQ_REQ) {
while (n < 0 && is_rx_running(q)) {
while (n < 0 && rf_zmq_rx_is_running(q)) {
rf_zmq_info(q->id, " - tx'ing rx request\n");
n = zmq_send(q->sock, &dummy, sizeof(dummy), 0);
if (n < 0) {
@ -53,7 +44,7 @@ static void* rf_zmq_async_rx_thread(void* h)
}
// Receive baseband
for (n = (n < 0) ? 0 : -1; n < 0 && is_rx_running(q);) {
for (n = (n < 0) ? 0 : -1; n < 0 && rf_zmq_rx_is_running(q);) {
n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0);
if (n == -1) {
if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) {
@ -77,7 +68,7 @@ static void* rf_zmq_async_rx_thread(void* h)
n = -1;
// Try to write in ring buffer
while (n < 0 && is_rx_running(q)) {
while (n < 0 && rf_zmq_rx_is_running(q)) {
n = srsran_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms);
if (n == SRSRAN_ERROR_TIMEOUT && q->log_trx_timeout) {
fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms);
@ -296,3 +287,17 @@ void rf_zmq_rx_close(rf_zmq_rx_t* q)
}
#endif // ZMQ_MONITOR
}
bool rf_zmq_rx_is_running(rf_zmq_rx_t* q)
{
if (!q) {
return false;
}
bool ret = false;
pthread_mutex_lock(&q->mutex);
ret = q->running;
pthread_mutex_unlock(&q->mutex);
return ret;
}

View File

@ -105,6 +105,8 @@ SRSRAN_API bool rf_zmq_tx_match_freq(rf_zmq_tx_t* q, uint32_t freq_hz);
SRSRAN_API void rf_zmq_tx_close(rf_zmq_tx_t* q);
SRSRAN_API bool rf_zmq_tx_is_running(rf_zmq_tx_t* q);
/*
* Receiver functions
*/
@ -116,4 +118,6 @@ SRSRAN_API bool rf_zmq_rx_match_freq(rf_zmq_rx_t* q, uint32_t freq_hz);
SRSRAN_API void rf_zmq_rx_close(rf_zmq_rx_t* q);
SRSRAN_API bool rf_zmq_rx_is_running(rf_zmq_rx_t* q);
#endif // SRSRAN_RF_ZMQ_IMP_TRX_H

View File

@ -250,3 +250,17 @@ void rf_zmq_tx_close(rf_zmq_tx_t* q)
q->sock = NULL;
}
}
bool rf_zmq_tx_is_running(rf_zmq_tx_t* q)
{
if (!q) {
return false;
}
bool ret = false;
pthread_mutex_lock(&q->mutex);
ret = q->running;
pthread_mutex_unlock(&q->mutex);
return ret;
}