Fix concurrency bug in ZMQ during srate change (#954)

The bug was manifesting as buffer overflow in 20 MHz 2x2 CA
This commit is contained in:
Ismael Gomez 2020-02-16 21:31:51 +01:00 committed by GitHub
parent d8d10daebe
commit be82e1f368
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 22 deletions

View File

@ -64,6 +64,7 @@ typedef struct {
pthread_mutex_t tx_config_mutex;
pthread_mutex_t rx_config_mutex;
pthread_mutex_t decim_mutex;
} rf_zmq_handler_t;
void update_rates(rf_zmq_handler_t* handler, double srate);
@ -242,6 +243,9 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
if (pthread_mutex_init(&handler->rx_config_mutex, NULL)) {
perror("Mutex init");
}
if (pthread_mutex_init(&handler->decim_mutex, NULL)) {
perror("Mutex init");
}
// parse args
if (args && strlen(args)) {
@ -524,6 +528,7 @@ int rf_zmq_close(void* h)
pthread_mutex_destroy(&handler->tx_config_mutex);
pthread_mutex_destroy(&handler->rx_config_mutex);
pthread_mutex_destroy(&handler->decim_mutex);
// Free all
free(handler);
@ -533,6 +538,7 @@ int rf_zmq_close(void* h)
void update_rates(rf_zmq_handler_t* handler, double srate)
{
pthread_mutex_lock(&handler->decim_mutex);
if (handler) {
// Decimation must be full integer
if (((uint64_t)handler->base_srate % (uint64_t)srate) == 0) {
@ -549,6 +555,7 @@ void update_rates(rf_zmq_handler_t* handler, double srate)
handler->base_srate / 1e6,
handler->decim_factor);
}
pthread_mutex_unlock(&handler->decim_mutex);
}
double rf_zmq_set_rx_srate(void* h, double srate)
@ -715,11 +722,6 @@ int rf_zmq_recv_with_time_multi(void* h,
if (h) {
rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h;
uint32_t nbytes = NSAMPLES2NBYTES(nsamples * handler->decim_factor);
uint32_t nsamples_baserate = nsamples * handler->decim_factor;
rf_zmq_info(handler->id, "Rx %d samples (%d B)\n", nsamples, nbytes);
// Map ports to data buffers according to the selected frequencies
pthread_mutex_lock(&handler->rx_config_mutex);
cf_t* buffers[SRSLTE_MAX_PORTS] = {}; // Buffer pointers, NULL if unmatched
@ -743,6 +745,16 @@ int rf_zmq_recv_with_time_multi(void* h,
}
pthread_mutex_unlock(&handler->rx_config_mutex);
// Protect the access to decim_factor since is a shared variable
pthread_mutex_lock(&handler->decim_mutex);
uint32_t decim_factor = handler->decim_factor;
pthread_mutex_unlock(&handler->decim_mutex);
uint32_t nbytes = NSAMPLES2NBYTES(nsamples * decim_factor);
uint32_t nsamples_baserate = nsamples * decim_factor;
rf_zmq_info(handler->id, "Rx %d samples (%d B)\n", nsamples, nbytes);
// set timestamp for this reception
if (secs != NULL && frac_secs != NULL) {
srslte_timestamp_t ts = {};
@ -792,7 +804,7 @@ int rf_zmq_recv_with_time_multi(void* h,
// Iterate channels
for (uint32_t i = 0; i < handler->nof_channels; i++) {
cf_t* ptr = (handler->decim_factor != 1 || buffers[i] == NULL) ? handler->buffer_decimation[i] : buffers[i];
cf_t* ptr = (decim_factor != 1 || buffers[i] == NULL) ? handler->buffer_decimation[i] : buffers[i];
// Completed condition
if (count[i] < nsamples_baserate && handler->receiver[i].running) {
@ -843,7 +855,7 @@ int rf_zmq_recv_with_time_multi(void* h,
NBYTES2NSAMPLES(srslte_ringbuffer_status(&handler->receiver[0].ringbuffer)));
// decimate if needed
if (handler->decim_factor != 1) {
if (decim_factor != 1) {
for (uint32_t c = 0; c < handler->nof_channels; c++) {
// skip if buffer is not available
if (buffers[c]) {
@ -853,7 +865,7 @@ int rf_zmq_recv_with_time_multi(void* h,
for (uint32_t i = 0, n = 0; i < nsamples; i++) {
// Averaging decimation
cf_t avg = 0.0f;
for (int j = 0; j < handler->decim_factor; j++, n++) {
for (int j = 0; j < decim_factor; j++, n++) {
avg += ptr[n];
}
dst[i] = avg;
@ -861,7 +873,7 @@ int rf_zmq_recv_with_time_multi(void* h,
rf_zmq_info(handler->id,
" - re-adjust bytes due to %dx decimation %d --> %d samples)\n",
handler->decim_factor,
decim_factor,
nsamples_baserate,
nsamples);
}
@ -918,14 +930,6 @@ int rf_zmq_send_timed_multi(void* h,
if (h && data && nsamples > 0) {
rf_zmq_handler_t* handler = (rf_zmq_handler_t*)h;
uint32_t nbytes = NSAMPLES2NBYTES(nsamples);
uint32_t nsamples_baseband = nsamples * handler->decim_factor;
uint32_t nbytes_baseband = NSAMPLES2NBYTES(nsamples_baseband);
if (nbytes_baseband > ZMQ_MAX_BUFFER_SIZE) {
fprintf(stderr, "Error: trying to transmit too many samples (%d > %zu).\n", nbytes, ZMQ_MAX_BUFFER_SIZE);
goto clean_exit;
}
// Map ports to data buffers according to the selected frequencies
pthread_mutex_lock(&handler->tx_config_mutex);
@ -945,6 +949,19 @@ int rf_zmq_send_timed_multi(void* h,
}
pthread_mutex_unlock(&handler->tx_config_mutex);
// Protect the access to decim_factor since is a shared variable
pthread_mutex_lock(&handler->decim_mutex);
uint32_t decim_factor = handler->decim_factor;
pthread_mutex_unlock(&handler->decim_mutex);
uint32_t nbytes = NSAMPLES2NBYTES(nsamples);
uint32_t nsamples_baseband = nsamples * decim_factor;
uint32_t nbytes_baseband = NSAMPLES2NBYTES(nsamples_baseband);
if (nbytes_baseband > ZMQ_MAX_BUFFER_SIZE) {
fprintf(stderr, "Error: trying to transmit too many samples (%d > %zu).\n", nbytes, ZMQ_MAX_BUFFER_SIZE);
goto clean_exit;
}
rf_zmq_info(handler->id, "Tx %d samples (%d B)\n", nsamples, nbytes);
// return if transmitter is switched off
@ -981,13 +998,13 @@ int rf_zmq_send_timed_multi(void* h,
for (int i = 0; i < handler->nof_channels; i++) {
if (buffers[i] != NULL) {
// Select buffer pointer depending on interpolation
cf_t* buf = (handler->decim_factor != 1) ? handler->buffer_tx : buffers[i];
cf_t* buf = (decim_factor != 1) ? handler->buffer_tx : buffers[i];
// Interpolate if required
if (handler->decim_factor != 1) {
if (decim_factor != 1) {
rf_zmq_info(handler->id,
" - re-adjust bytes due to %dx interpolation %d --> %d samples)\n",
handler->decim_factor,
decim_factor,
nsamples,
nsamples_baseband);
@ -995,7 +1012,7 @@ int rf_zmq_send_timed_multi(void* h,
cf_t* src = data[i];
for (int k = 0; k < nsamples; k++) {
// perform zero order hold
for (int j = 0; j < handler->decim_factor; j++, n++) {
for (int j = 0; j < decim_factor; j++, n++) {
buf[n] = src[k];
}
}

View File

@ -374,8 +374,12 @@ void sync::run_thread()
cf_t* dummy_buffer[SRSLTE_MAX_PORTS];
uint32_t nof_rf_channels = worker_com->args->nof_rf_channels * worker_com->args->nof_rx_ant;
if (nof_rf_channels > SRSLTE_MAX_PORTS) {
fprintf(stderr, "Fatal error: nof_rf_channels x nof_rx_ant must be lower than %d\n", SRSLTE_MAX_PORTS);
return;
}
for (uint32_t i = 0; i < nof_rf_channels; i++) {
dummy_buffer[i] = (cf_t*)malloc(sizeof(cf_t) * SRSLTE_SF_LEN_PRB(100));
dummy_buffer[i] = (cf_t*)srslte_vec_cf_malloc(3 * SRSLTE_SF_LEN_PRB(100));
}
uint32_t prach_nof_sf = 0;