zmq: optionally log rx/tx timout from ZMQ ringbuffers to stderr

this patch adds two new config flags to the ZMQ driver that allows to:
* configure the default ZMQ trx timeout in ms
* turn on error logging if the timeout occurs

Use with, e.g.:
device_args = log_trx_timeout=true,trx_timeout_ms=3333
This commit is contained in:
Andre Puschmann 2021-02-16 11:26:27 +01:00
parent 984c4a4748
commit c1a1c92e1b
5 changed files with 58 additions and 34 deletions

View File

@ -317,6 +317,16 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels)
rx_opts.fail_on_disconnect = true;
}
// trx_timeout_ms
rx_opts.trx_timeout_ms = ZMQ_TIMEOUT_MS;
parse_uint32(args, "trx_timeout_ms", i, &rx_opts.trx_timeout_ms);
// log_trx_timeout
parse_string(args, "log_trx_timeout", i, tmp);
if (strncmp(tmp, "true", RF_PARAM_LEN) == 0 || strncmp(tmp, "yes", RF_PARAM_LEN) == 0) {
rx_opts.log_trx_timeout = true;
}
// initialize transmitter
if (strlen(tx_port) != 0) {
if (rf_zmq_tx_open(&handler->transmitter[i], tx_opts, handler->context, tx_port) != SRSLTE_SUCCESS) {
@ -707,6 +717,9 @@ int rf_zmq_recv_with_time_multi(void* h,
// No error
count[i] += n;
} else if (n == SRSLTE_ERROR_TIMEOUT) {
if (handler->receiver[i].log_trx_timeout) {
fprintf(stderr, "Error: timeout receiving samples after %dms\n", handler->receiver[i].trx_timeout_ms);
}
// Other end disconnected, either keep going, or fail
if (handler->receiver[i].fail_on_disconnect) {
goto clean_exit;

View File

@ -69,7 +69,10 @@ static void* rf_zmq_async_rx_thread(void* h)
// Try to write in ring buffer
while (n < 0 && q->running) {
n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, ZMQ_TIMEOUT_MS);
n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms);
if (n == SRSLTE_ERROR_TIMEOUT && q->log_trx_timeout) {
fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms);
}
}
// Check write
@ -109,6 +112,8 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock
q->frequency_mhz = opts.frequency_mhz;
q->fail_on_disconnect = opts.fail_on_disconnect;
q->sample_offset = opts.sample_offset;
q->trx_timeout_ms = opts.trx_timeout_ms;
q->log_trx_timeout = opts.log_trx_timeout;
if (opts.socket_type == ZMQ_SUB) {
zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0);
@ -139,24 +144,24 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
if (opts.trx_timeout_ms) {
int timeout = opts.trx_timeout_ms;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on rx socket\n");
goto clean_exit;
}
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on rx socket\n");
goto clean_exit;
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on rx socket\n");
goto clean_exit;
}
}
#endif
if (srslte_ringbuffer_init(&q->ringbuffer, ZMQ_MAX_BUFFER_SIZE)) {
fprintf(stderr, "Error: initiating ringbuffer\n");
@ -216,14 +221,15 @@ int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples)
// If the read needs to be advanced
while (q->sample_offset < 0) {
uint32_t n_offset = SRSLTE_MIN(-q->sample_offset, NBYTES2NSAMPLES(ZMQ_MAX_BUFFER_SIZE));
int n = srslte_ringbuffer_read_timed(&q->ringbuffer, q->temp_buffer, (int)(n_offset * sample_sz), ZMQ_TIMEOUT_MS);
int n =
srslte_ringbuffer_read_timed(&q->ringbuffer, q->temp_buffer, (int)(n_offset * sample_sz), q->trx_timeout_ms);
if (n < SRSLTE_SUCCESS) {
return n;
}
q->sample_offset += n_offset;
}
int n = srslte_ringbuffer_read_timed(&q->ringbuffer, dst_buffer, sample_sz * nsamples, ZMQ_TIMEOUT_MS);
int n = srslte_ringbuffer_read_timed(&q->ringbuffer, dst_buffer, sample_sz * nsamples, q->trx_timeout_ms);
if (n < 0) {
return n;
}

View File

@ -63,6 +63,8 @@ typedef struct {
void* temp_buffer_convert;
uint32_t frequency_mhz;
bool fail_on_disconnect;
uint32_t trx_timeout_ms;
bool log_trx_timeout;
int32_t sample_offset;
} rf_zmq_rx_t;
@ -72,6 +74,8 @@ typedef struct {
rf_zmq_format_t sample_format;
uint32_t frequency_mhz;
bool fail_on_disconnect;
uint32_t trx_timeout_ms;
bool log_trx_timeout;
int32_t sample_offset; ///< offset in samples
} rf_zmq_opts_t;

View File

@ -49,24 +49,24 @@ int rf_zmq_tx_open(rf_zmq_tx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock
goto clean_exit;
}
#if ZMQ_TIMEOUT_MS
int timeout = ZMQ_TIMEOUT_MS;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (opts.trx_timeout_ms) {
int timeout = opts.trx_timeout_ms;
if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting receive timeout on tx socket\n");
goto clean_exit;
}
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on tx socket\n");
goto clean_exit;
timeout = 0;
if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) {
fprintf(stderr, "Error: setting linger timeout on tx socket\n");
goto clean_exit;
}
}
#endif
if (pthread_mutex_init(&q->mutex, NULL)) {
fprintf(stderr, "Error: creating mutex\n");

View File

@ -229,7 +229,8 @@ int main()
}
// two trx radios with continous tx (no timed tx) using TCP transport for both directions
if (run_test("tx_port=tcp://*:5554,rx_port=tcp://localhost:5555,id=ue,base_srate=1.92e6",
if (run_test("tx_port=tcp://*:5554,rx_port=tcp://"
"localhost:5555,id=ue,base_srate=1.92e6,log_trx_timeout=true,trx_timeout_ms=1000",
"rx_port=tcp://localhost:5554,tx_port=tcp://*:5555,id=enb,base_srate=1.92e6",
false) != SRSLTE_SUCCESS) {
fprintf(stderr, "Two TRx radio test failed!\n");