diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c index bb84f7506..4d0678c61 100644 --- a/lib/src/phy/rf/rf_zmq_imp_rx.c +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -17,11 +17,20 @@ #include #include +bool is_rx_running(rf_zmq_rx_t* q) +{ + bool running = false; + pthread_mutex_lock(&q->mutex); + running = q->running; + pthread_mutex_unlock(&q->mutex); + return running; +} + static void* rf_zmq_async_rx_thread(void* h) { rf_zmq_rx_t* q = (rf_zmq_rx_t*)h; - while (q->sock && q->running) { + while (q->sock && is_rx_running(q)) { int nbytes = 0; int n = SRSRAN_ERROR; uint8_t dummy = 0xFF; @@ -30,7 +39,7 @@ static void* rf_zmq_async_rx_thread(void* h) // Send request if socket type is REQUEST if (q->socket_type == ZMQ_REQ) { - while (n < 0 && q->running) { + while (n < 0 && is_rx_running(q)) { rf_zmq_info(q->id, " - tx'ing rx request\n"); n = zmq_send(q->sock, &dummy, sizeof(dummy), 0); if (n < 0) { @@ -44,7 +53,7 @@ static void* rf_zmq_async_rx_thread(void* h) } // Receive baseband - for (n = (n < 0) ? 0 : -1; n < 0 && q->running;) { + for (n = (n < 0) ? 0 : -1; n < 0 && is_rx_running(q);) { n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0); if (n == -1) { if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) { @@ -68,7 +77,7 @@ static void* rf_zmq_async_rx_thread(void* h) n = -1; // Try to write in ring buffer - while (n < 0 && q->running) { + while (n < 0 && is_rx_running(q)) { n = srsran_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms); if (n == SRSRAN_ERROR_TIMEOUT && q->log_trx_timeout) { fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms); @@ -253,13 +262,18 @@ bool rf_zmq_rx_match_freq(rf_zmq_rx_t* q, uint32_t freq_hz) void rf_zmq_rx_close(rf_zmq_rx_t* q) { rf_zmq_info(q->id, "Closing ...\n"); + + pthread_mutex_lock(&q->mutex); q->running = false; + pthread_mutex_unlock(&q->mutex); if (q->thread) { pthread_join(q->thread, NULL); pthread_detach(q->thread); } + pthread_mutex_destroy(&q->mutex); + srsran_ringbuffer_free(&q->ringbuffer); if (q->temp_buffer) { diff --git a/lib/src/phy/rf/rf_zmq_imp_tx.c b/lib/src/phy/rf/rf_zmq_imp_tx.c index 81417b53c..078a5b92f 100644 --- a/lib/src/phy/rf/rf_zmq_imp_tx.c +++ b/lib/src/phy/rf/rf_zmq_imp_tx.c @@ -223,7 +223,11 @@ bool rf_zmq_tx_match_freq(rf_zmq_tx_t* q, uint32_t freq_hz) void rf_zmq_tx_close(rf_zmq_tx_t* q) { + pthread_mutex_lock(&q->mutex); q->running = false; + pthread_mutex_unlock(&q->mutex); + + pthread_mutex_destroy(&q->mutex); if (q->zeros) { free(q->zeros); diff --git a/lib/src/phy/utils/ringbuffer.c b/lib/src/phy/utils/ringbuffer.c index 53359c356..cdd6ee7f7 100644 --- a/lib/src/phy/utils/ringbuffer.c +++ b/lib/src/phy/utils/ringbuffer.c @@ -78,7 +78,11 @@ int srsran_ringbuffer_resize(srsran_ringbuffer_t* q, int capacity) int srsran_ringbuffer_status(srsran_ringbuffer_t* q) { - return q->count; + int status = 0; + pthread_mutex_lock(&q->mutex); + status = q->count; + pthread_mutex_unlock(&q->mutex); + return status; } int srsran_ringbuffer_space(srsran_ringbuffer_t* q)