zmq,ringbuffer: protect concurrent access

some issue found through TSAN execution
This commit is contained in:
Andre Puschmann 2021-05-18 12:32:05 +02:00
parent f1e6a975de
commit d8b2cfcef8
3 changed files with 27 additions and 5 deletions

View File

@ -17,11 +17,20 @@
#include <string.h>
#include <zmq.h>
bool is_rx_running(rf_zmq_rx_t* q)
{
bool running = false;
pthread_mutex_lock(&q->mutex);
running = q->running;
pthread_mutex_unlock(&q->mutex);
return running;
}
static void* rf_zmq_async_rx_thread(void* h)
{
rf_zmq_rx_t* q = (rf_zmq_rx_t*)h;
while (q->sock && q->running) {
while (q->sock && is_rx_running(q)) {
int nbytes = 0;
int n = SRSRAN_ERROR;
uint8_t dummy = 0xFF;
@ -30,7 +39,7 @@ static void* rf_zmq_async_rx_thread(void* h)
// Send request if socket type is REQUEST
if (q->socket_type == ZMQ_REQ) {
while (n < 0 && q->running) {
while (n < 0 && is_rx_running(q)) {
rf_zmq_info(q->id, " - tx'ing rx request\n");
n = zmq_send(q->sock, &dummy, sizeof(dummy), 0);
if (n < 0) {
@ -44,7 +53,7 @@ static void* rf_zmq_async_rx_thread(void* h)
}
// Receive baseband
for (n = (n < 0) ? 0 : -1; n < 0 && q->running;) {
for (n = (n < 0) ? 0 : -1; n < 0 && is_rx_running(q);) {
n = zmq_recv(q->sock, q->temp_buffer, ZMQ_MAX_BUFFER_SIZE, 0);
if (n == -1) {
if (rf_zmq_handle_error(q->id, "asynchronous rx baseband receive")) {
@ -68,7 +77,7 @@ static void* rf_zmq_async_rx_thread(void* h)
n = -1;
// Try to write in ring buffer
while (n < 0 && q->running) {
while (n < 0 && is_rx_running(q)) {
n = srsran_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms);
if (n == SRSRAN_ERROR_TIMEOUT && q->log_trx_timeout) {
fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms);
@ -253,13 +262,18 @@ bool rf_zmq_rx_match_freq(rf_zmq_rx_t* q, uint32_t freq_hz)
void rf_zmq_rx_close(rf_zmq_rx_t* q)
{
rf_zmq_info(q->id, "Closing ...\n");
pthread_mutex_lock(&q->mutex);
q->running = false;
pthread_mutex_unlock(&q->mutex);
if (q->thread) {
pthread_join(q->thread, NULL);
pthread_detach(q->thread);
}
pthread_mutex_destroy(&q->mutex);
srsran_ringbuffer_free(&q->ringbuffer);
if (q->temp_buffer) {

View File

@ -223,7 +223,11 @@ bool rf_zmq_tx_match_freq(rf_zmq_tx_t* q, uint32_t freq_hz)
void rf_zmq_tx_close(rf_zmq_tx_t* q)
{
pthread_mutex_lock(&q->mutex);
q->running = false;
pthread_mutex_unlock(&q->mutex);
pthread_mutex_destroy(&q->mutex);
if (q->zeros) {
free(q->zeros);

View File

@ -78,7 +78,11 @@ int srsran_ringbuffer_resize(srsran_ringbuffer_t* q, int capacity)
int srsran_ringbuffer_status(srsran_ringbuffer_t* q)
{
return q->count;
int status = 0;
pthread_mutex_lock(&q->mutex);
status = q->count;
pthread_mutex_unlock(&q->mutex);
return status;
}
int srsran_ringbuffer_space(srsran_ringbuffer_t* q)