diff --git a/lib/src/phy/rf/rf_zmq_imp.c b/lib/src/phy/rf/rf_zmq_imp.c index a61bed3bd..7878e549a 100644 --- a/lib/src/phy/rf/rf_zmq_imp.c +++ b/lib/src/phy/rf/rf_zmq_imp.c @@ -317,6 +317,16 @@ int rf_zmq_open_multi(char* args, void** h, uint32_t nof_channels) rx_opts.fail_on_disconnect = true; } + // trx_timeout_ms + rx_opts.trx_timeout_ms = ZMQ_TIMEOUT_MS; + parse_uint32(args, "trx_timeout_ms", i, &rx_opts.trx_timeout_ms); + + // log_trx_timeout + parse_string(args, "log_trx_timeout", i, tmp); + if (strncmp(tmp, "true", RF_PARAM_LEN) == 0 || strncmp(tmp, "yes", RF_PARAM_LEN) == 0) { + rx_opts.log_trx_timeout = true; + } + // initialize transmitter if (strlen(tx_port) != 0) { if (rf_zmq_tx_open(&handler->transmitter[i], tx_opts, handler->context, tx_port) != SRSLTE_SUCCESS) { @@ -707,6 +717,9 @@ int rf_zmq_recv_with_time_multi(void* h, // No error count[i] += n; } else if (n == SRSLTE_ERROR_TIMEOUT) { + if (handler->receiver[i].log_trx_timeout) { + fprintf(stderr, "Error: timeout receiving samples after %dms\n", handler->receiver[i].trx_timeout_ms); + } // Other end disconnected, either keep going, or fail if (handler->receiver[i].fail_on_disconnect) { goto clean_exit; diff --git a/lib/src/phy/rf/rf_zmq_imp_rx.c b/lib/src/phy/rf/rf_zmq_imp_rx.c index ed325116f..a5f2f91e5 100644 --- a/lib/src/phy/rf/rf_zmq_imp_rx.c +++ b/lib/src/phy/rf/rf_zmq_imp_rx.c @@ -69,7 +69,10 @@ static void* rf_zmq_async_rx_thread(void* h) // Try to write in ring buffer while (n < 0 && q->running) { - n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, ZMQ_TIMEOUT_MS); + n = srslte_ringbuffer_write_timed(&q->ringbuffer, q->temp_buffer, nbytes, q->trx_timeout_ms); + if (n == SRSLTE_ERROR_TIMEOUT && q->log_trx_timeout) { + fprintf(stderr, "Error: timeout writing samples to ringbuffer after %dms\n", q->trx_timeout_ms); + } } // Check write @@ -109,6 +112,8 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock q->frequency_mhz = opts.frequency_mhz; q->fail_on_disconnect = opts.fail_on_disconnect; q->sample_offset = opts.sample_offset; + q->trx_timeout_ms = opts.trx_timeout_ms; + q->log_trx_timeout = opts.log_trx_timeout; if (opts.socket_type == ZMQ_SUB) { zmq_setsockopt(q->sock, ZMQ_SUBSCRIBE, "", 0); @@ -139,24 +144,24 @@ int rf_zmq_rx_open(rf_zmq_rx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock goto clean_exit; } -#if ZMQ_TIMEOUT_MS - int timeout = ZMQ_TIMEOUT_MS; - if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on rx socket\n"); - goto clean_exit; - } + if (opts.trx_timeout_ms) { + int timeout = opts.trx_timeout_ms; + if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on rx socket\n"); + goto clean_exit; + } - if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on rx socket\n"); - goto clean_exit; - } + if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on rx socket\n"); + goto clean_exit; + } - timeout = 0; - if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting linger timeout on rx socket\n"); - goto clean_exit; + timeout = 0; + if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting linger timeout on rx socket\n"); + goto clean_exit; + } } -#endif if (srslte_ringbuffer_init(&q->ringbuffer, ZMQ_MAX_BUFFER_SIZE)) { fprintf(stderr, "Error: initiating ringbuffer\n"); @@ -216,14 +221,15 @@ int rf_zmq_rx_baseband(rf_zmq_rx_t* q, cf_t* buffer, uint32_t nsamples) // If the read needs to be advanced while (q->sample_offset < 0) { uint32_t n_offset = SRSLTE_MIN(-q->sample_offset, NBYTES2NSAMPLES(ZMQ_MAX_BUFFER_SIZE)); - int n = srslte_ringbuffer_read_timed(&q->ringbuffer, q->temp_buffer, (int)(n_offset * sample_sz), ZMQ_TIMEOUT_MS); + int n = + srslte_ringbuffer_read_timed(&q->ringbuffer, q->temp_buffer, (int)(n_offset * sample_sz), q->trx_timeout_ms); if (n < SRSLTE_SUCCESS) { return n; } q->sample_offset += n_offset; } - int n = srslte_ringbuffer_read_timed(&q->ringbuffer, dst_buffer, sample_sz * nsamples, ZMQ_TIMEOUT_MS); + int n = srslte_ringbuffer_read_timed(&q->ringbuffer, dst_buffer, sample_sz * nsamples, q->trx_timeout_ms); if (n < 0) { return n; } diff --git a/lib/src/phy/rf/rf_zmq_imp_trx.h b/lib/src/phy/rf/rf_zmq_imp_trx.h index eb4bed704..ba60fc787 100644 --- a/lib/src/phy/rf/rf_zmq_imp_trx.h +++ b/lib/src/phy/rf/rf_zmq_imp_trx.h @@ -63,6 +63,8 @@ typedef struct { void* temp_buffer_convert; uint32_t frequency_mhz; bool fail_on_disconnect; + uint32_t trx_timeout_ms; + bool log_trx_timeout; int32_t sample_offset; } rf_zmq_rx_t; @@ -72,6 +74,8 @@ typedef struct { rf_zmq_format_t sample_format; uint32_t frequency_mhz; bool fail_on_disconnect; + uint32_t trx_timeout_ms; + bool log_trx_timeout; int32_t sample_offset; ///< offset in samples } rf_zmq_opts_t; diff --git a/lib/src/phy/rf/rf_zmq_imp_tx.c b/lib/src/phy/rf/rf_zmq_imp_tx.c index bd181a513..2ebd76eaa 100644 --- a/lib/src/phy/rf/rf_zmq_imp_tx.c +++ b/lib/src/phy/rf/rf_zmq_imp_tx.c @@ -49,24 +49,24 @@ int rf_zmq_tx_open(rf_zmq_tx_t* q, rf_zmq_opts_t opts, void* zmq_ctx, char* sock goto clean_exit; } -#if ZMQ_TIMEOUT_MS - int timeout = ZMQ_TIMEOUT_MS; - if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); - goto clean_exit; - } + if (opts.trx_timeout_ms) { + int timeout = opts.trx_timeout_ms; + if (zmq_setsockopt(q->sock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on tx socket\n"); + goto clean_exit; + } - if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting receive timeout on tx socket\n"); - goto clean_exit; - } + if (zmq_setsockopt(q->sock, ZMQ_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting receive timeout on tx socket\n"); + goto clean_exit; + } - timeout = 0; - if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { - fprintf(stderr, "Error: setting linger timeout on tx socket\n"); - goto clean_exit; + timeout = 0; + if (zmq_setsockopt(q->sock, ZMQ_LINGER, &timeout, sizeof(timeout)) == -1) { + fprintf(stderr, "Error: setting linger timeout on tx socket\n"); + goto clean_exit; + } } -#endif if (pthread_mutex_init(&q->mutex, NULL)) { fprintf(stderr, "Error: creating mutex\n"); diff --git a/lib/src/phy/rf/rf_zmq_test.c b/lib/src/phy/rf/rf_zmq_test.c index fe905c7f0..c8857cef1 100644 --- a/lib/src/phy/rf/rf_zmq_test.c +++ b/lib/src/phy/rf/rf_zmq_test.c @@ -229,7 +229,8 @@ int main() } // two trx radios with continous tx (no timed tx) using TCP transport for both directions - if (run_test("tx_port=tcp://*:5554,rx_port=tcp://localhost:5555,id=ue,base_srate=1.92e6", + if (run_test("tx_port=tcp://*:5554,rx_port=tcp://" + "localhost:5555,id=ue,base_srate=1.92e6,log_trx_timeout=true,trx_timeout_ms=1000", "rx_port=tcp://localhost:5554,tx_port=tcp://*:5555,id=enb,base_srate=1.92e6", false) != SRSLTE_SUCCESS) { fprintf(stderr, "Two TRx radio test failed!\n");