Added ringbuffer zerocopy a timeout

This commit is contained in:
Xavier Arteaga 2020-09-16 18:37:41 +02:00 committed by Xavier Arteaga
parent 759719ad55
commit c20d4ff5cc
2 changed files with 41 additions and 9 deletions

View File

@ -78,7 +78,7 @@ SRSLTE_API int srslte_ringbuffer_read_timed_block(srslte_ringbuffer_t* q, void*
// read samples from the buffer, convert them from uint16_t to cplx float and get the conjugate // read samples from the buffer, convert them from uint16_t to cplx float and get the conjugate
SRSLTE_API int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, float norm, int nof_samples); SRSLTE_API int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, float norm, int nof_samples);
SRSLTE_API int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes); SRSLTE_API int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes, int32_t timeout_ms);
SRSLTE_API void srslte_ringbuffer_stop(srslte_ringbuffer_t* q); SRSLTE_API void srslte_ringbuffer_stop(srslte_ringbuffer_t* q);

View File

@ -118,6 +118,11 @@ int srslte_ringbuffer_write_timed_block(srslte_ringbuffer_t* q, void* p, int nof
struct timespec towait; struct timespec towait;
struct timeval now; struct timeval now;
if (q == NULL || q->buffer == NULL) {
ERROR("Invalid inputs\n");
return SRSLTE_ERROR_INVALID_INPUTS;
}
// Get current time and update timeout // Get current time and update timeout
if (timeout_ms > 0) { if (timeout_ms > 0) {
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
@ -281,19 +286,39 @@ int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, f
} }
/* For this function, the ring buffer capacity must be multiple of block size */ /* For this function, the ring buffer capacity must be multiple of block size */
int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes) int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes, int32_t timeout_ms)
{ {
int ret = nof_bytes; int ret = SRSLTE_SUCCESS;
pthread_mutex_lock(&q->mutex); struct timespec towait = {};
/* Wait until enough data is in the buffer */ // Get current time and update timeout
while (q->count < nof_bytes && q->active) { if (timeout_ms > 0) {
pthread_cond_wait(&q->write_cvar, &q->mutex); struct timespec now = {};
timespec_get(&now, TIME_UTC);
// check nsec wrap-around
towait.tv_sec = now.tv_sec + timeout_ms / 1000L;
long nsec = now.tv_nsec + ((timeout_ms % 1000U) * 1000UL);
towait.tv_sec += nsec / 1000000000L;
towait.tv_nsec = nsec % 1000000000L;
} }
if (!q->active) { pthread_mutex_lock(&q->mutex);
ret = 0;
// Wait for having enough samples
while (q->count < nof_bytes && q->active && ret == SRSLTE_SUCCESS) {
if (timeout_ms > 0) {
ret = pthread_cond_timedwait(&q->write_cvar, &q->mutex, &towait);
} else { } else {
pthread_cond_wait(&q->write_cvar, &q->mutex);
}
}
if (ret == ETIMEDOUT) {
ret = SRSLTE_ERROR_TIMEOUT;
} else if (!q->active) {
ret = 0;
} else if (ret == SRSLTE_SUCCESS) {
*p = &q->buffer[q->rpm]; *p = &q->buffer[q->rpm];
q->count -= nof_bytes; q->count -= nof_bytes;
@ -302,6 +327,13 @@ int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes
if (q->rpm >= q->capacity) { if (q->rpm >= q->capacity) {
q->rpm -= q->capacity; q->rpm -= q->capacity;
} }
ret = nof_bytes;
} else if (ret == EINVAL) {
fprintf(stderr, "Error: pthread_cond_timedwait() returned EINVAL, timeout value corrupted.\n");
ret = SRSLTE_ERROR;
} else {
printf("ret=%d %s\n", ret, strerror(ret));
ret = SRSLTE_ERROR;
} }
pthread_cond_broadcast(&q->read_cvar); pthread_cond_broadcast(&q->read_cvar);
pthread_mutex_unlock(&q->mutex); pthread_mutex_unlock(&q->mutex);