diff --git a/lib/include/srslte/phy/utils/ringbuffer.h b/lib/include/srslte/phy/utils/ringbuffer.h index 5d2ae61ad..6a2a466b5 100644 --- a/lib/include/srslte/phy/utils/ringbuffer.h +++ b/lib/include/srslte/phy/utils/ringbuffer.h @@ -53,14 +53,29 @@ SRSLTE_API int srslte_ringbuffer_status(srslte_ringbuffer_t* q); SRSLTE_API int srslte_ringbuffer_space(srslte_ringbuffer_t* q); +SRSLTE_API int srslte_ringbuffer_resize(srslte_ringbuffer_t* q, int capacity); + +// write to the buffer immediately, if there isnt enough space it will overflow SRSLTE_API int srslte_ringbuffer_write(srslte_ringbuffer_t* q, void* ptr, int nof_bytes); -SRSLTE_API int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, uint32_t timeout_ms); +// block forever until there is enough space then write to buffer +SRSLTE_API int srslte_ringbuffer_write_block(srslte_ringbuffer_t* q, void* ptr, int nof_bytes); +// block for timeout_ms milliseconds, then either write to buffer if there is space or return an error without writing +SRSLTE_API int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, int32_t timeout_ms); + +SRSLTE_API int +srslte_ringbuffer_write_timed_block(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, int32_t timeout_ms); + +// read from buffer, blocking until there is enough samples SRSLTE_API int srslte_ringbuffer_read(srslte_ringbuffer_t* q, void* ptr, int nof_bytes); -SRSLTE_API int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms); +// read from buffer, blocking for timeout_ms milliseconds until there is enough samples or return an error +SRSLTE_API int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, int32_t timeout_ms); +SRSLTE_API int srslte_ringbuffer_read_timed_block(srslte_ringbuffer_t* q, void* p, int nof_bytes, int32_t timeout_ms); + +// read samples from the buffer, convert them from uint16_t to cplx float and get the conjugate SRSLTE_API int srslte_ringbuffer_read_convert_conj(srslte_ringbuffer_t* q, cf_t* dst_ptr, float norm, int nof_samples); SRSLTE_API int srslte_ringbuffer_read_block(srslte_ringbuffer_t* q, void** p, int nof_bytes); diff --git a/lib/src/phy/utils/ringbuffer.c b/lib/src/phy/utils/ringbuffer.c index 8dcd2afee..7a388664f 100644 --- a/lib/src/phy/utils/ringbuffer.c +++ b/lib/src/phy/utils/ringbuffer.c @@ -68,6 +68,23 @@ void srslte_ringbuffer_reset(srslte_ringbuffer_t* q) } } +int srslte_ringbuffer_resize(srslte_ringbuffer_t* q, int capacity) +{ + if (q->buffer) { + free(q->buffer); + q->buffer = NULL; + } + srslte_ringbuffer_reset(q); + q->buffer = srslte_vec_malloc(capacity); + if (!q->buffer) { + return -1; + } + q->active = true; + q->capacity = capacity; + + return 0; +} + int srslte_ringbuffer_status(srslte_ringbuffer_t* q) { return q->count; @@ -78,37 +95,22 @@ int srslte_ringbuffer_space(srslte_ringbuffer_t* q) return q->capacity - q->count; } -int srslte_ringbuffer_write(srslte_ringbuffer_t* q, void* p, int nof_bytes) +int srslte_ringbuffer_write(srslte_ringbuffer_t* q, void* ptr, int nof_bytes) { - uint8_t* ptr = (uint8_t*)p; - int w_bytes = nof_bytes; - pthread_mutex_lock(&q->mutex); - if (!q->active) { - pthread_mutex_unlock(&q->mutex); - return 0; - } - if (q->count + w_bytes > q->capacity) { - w_bytes = q->capacity - q->count; - ERROR("Buffer overrun: lost %d bytes\n", nof_bytes - w_bytes); - } - if (w_bytes > q->capacity - q->wpm) { - int x = q->capacity - q->wpm; - memcpy(&q->buffer[q->wpm], ptr, x); - memcpy(q->buffer, &ptr[x], w_bytes - x); - } else { - memcpy(&q->buffer[q->wpm], ptr, w_bytes); - } - q->wpm += w_bytes; - if (q->wpm >= q->capacity) { - q->wpm -= q->capacity; - } - q->count += w_bytes; - pthread_cond_broadcast(&q->write_cvar); - pthread_mutex_unlock(&q->mutex); - return w_bytes; + return srslte_ringbuffer_write_timed_block(q, ptr, nof_bytes, 0); } -int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms) +int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* ptr, int nof_bytes, int32_t timeout_ms) +{ + return srslte_ringbuffer_write_timed_block(q, ptr, nof_bytes, timeout_ms); +} + +int srslte_ringbuffer_write_block(srslte_ringbuffer_t* q, void* ptr, int nof_bytes) +{ + return srslte_ringbuffer_write_timed_block(q, ptr, nof_bytes, -1); +} + +int srslte_ringbuffer_write_timed_block(srslte_ringbuffer_t* q, void* p, int nof_bytes, int32_t timeout_ms) { int ret = SRSLTE_SUCCESS; uint8_t* ptr = (uint8_t*)p; @@ -117,16 +119,24 @@ int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes struct timeval now; // Get current time and update timeout - gettimeofday(&now, NULL); - towait.tv_sec = now.tv_sec + timeout_ms / 1000U; - towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL; + if (timeout_ms > 0) { + gettimeofday(&now, NULL); + towait.tv_sec = now.tv_sec + timeout_ms / 1000U; + towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL; + } pthread_mutex_lock(&q->mutex); // Wait to have enough space in the buffer while (q->count + w_bytes > q->capacity && q->active && ret == SRSLTE_SUCCESS) { - ret = pthread_cond_timedwait(&q->read_cvar, &q->mutex, &towait); + if (timeout_ms > 0) { + ret = pthread_cond_timedwait(&q->read_cvar, &q->mutex, &towait); + } else if (timeout_ms < 0) { + pthread_cond_wait(&q->read_cvar, &q->mutex); + } else { + w_bytes = q->capacity - q->count; + ERROR("Buffer overrun: lost %d bytes\n", nof_bytes - w_bytes); + } } - if (ret == ETIMEDOUT) { ret = SRSLTE_ERROR_TIMEOUT; } else if (!q->active) { @@ -144,43 +154,26 @@ int srslte_ringbuffer_write_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes q->wpm -= q->capacity; } q->count += w_bytes; + ret = w_bytes; } else { ret = SRSLTE_ERROR; } pthread_cond_broadcast(&q->write_cvar); pthread_mutex_unlock(&q->mutex); - return w_bytes; + return ret; } int srslte_ringbuffer_read(srslte_ringbuffer_t* q, void* p, int nof_bytes) { - uint8_t* ptr = (uint8_t*)p; - pthread_mutex_lock(&q->mutex); - while (q->count < nof_bytes && q->active) { - pthread_cond_wait(&q->write_cvar, &q->mutex); - } - if (!q->active) { - pthread_mutex_unlock(&q->mutex); - return 0; - } - if (nof_bytes + q->rpm > q->capacity) { - int x = q->capacity - q->rpm; - memcpy(ptr, &q->buffer[q->rpm], x); - memcpy(&ptr[x], q->buffer, nof_bytes - x); - } else { - memcpy(ptr, &q->buffer[q->rpm], nof_bytes); - } - q->rpm += nof_bytes; - if (q->rpm >= q->capacity) { - q->rpm -= q->capacity; - } - q->count -= nof_bytes; - pthread_cond_broadcast(&q->read_cvar); - pthread_mutex_unlock(&q->mutex); - return nof_bytes; + return srslte_ringbuffer_read_timed_block(q, p, nof_bytes, -1); } -int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, uint32_t timeout_ms) +int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, int32_t timeout_ms) +{ + return srslte_ringbuffer_read_timed_block(q, p, nof_bytes, timeout_ms); +} + +int srslte_ringbuffer_read_timed_block(srslte_ringbuffer_t* q, void* p, int nof_bytes, int32_t timeout_ms) { int ret = SRSLTE_SUCCESS; uint8_t* ptr = (uint8_t*)p; @@ -188,16 +181,21 @@ int srslte_ringbuffer_read_timed(srslte_ringbuffer_t* q, void* p, int nof_bytes, struct timeval now; // Get current time and update timeout - gettimeofday(&now, NULL); - towait.tv_sec = now.tv_sec + timeout_ms / 1000U; - towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL; - + if (timeout_ms > 0) { + gettimeofday(&now, NULL); + towait.tv_sec = now.tv_sec + timeout_ms / 1000U; + towait.tv_nsec = (now.tv_usec + 1000UL * (timeout_ms % 1000U)) * 1000UL; + } // Lock mutex pthread_mutex_lock(&q->mutex); // Wait for having enough samples while (q->count < nof_bytes && q->active && ret == SRSLTE_SUCCESS) { - ret = pthread_cond_timedwait(&q->write_cvar, &q->mutex, &towait); + if (timeout_ms > 0) { + ret = pthread_cond_timedwait(&q->write_cvar, &q->mutex, &towait); + } else { + pthread_cond_wait(&q->write_cvar, &q->mutex); + } } if (ret == ETIMEDOUT) { diff --git a/lib/src/phy/utils/test/CMakeLists.txt b/lib/src/phy/utils/test/CMakeLists.txt index 817e4ca67..4596d5823 100644 --- a/lib/src/phy/utils/test/CMakeLists.txt +++ b/lib/src/phy/utils/test/CMakeLists.txt @@ -46,3 +46,12 @@ add_test(algebra_2x2_mmse_solver_test algebra_test -m) add_executable(vector_test vector_test.c) target_link_libraries(vector_test srslte_phy) add_test(vector_test vector_test) + + +######################################################################## + +add_executable(ringbuffer_test ring_buffer_test.c) +target_link_libraries(ringbuffer_test srslte_phy) + +add_test(ringbuffer_tester ringbuffer_test) +######################################################################## diff --git a/lib/src/phy/utils/test/ring_buffer_test.c b/lib/src/phy/utils/test/ring_buffer_test.c new file mode 100644 index 000000000..40151a4d0 --- /dev/null +++ b/lib/src/phy/utils/test/ring_buffer_test.c @@ -0,0 +1,199 @@ +/* + * Copyright 2013-2019 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#include "srslte/common/test_common.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "srslte/phy/utils/ringbuffer.h" +#include "srslte/phy/utils/vector.h" + +struct thread_args_t { + int len; + uint8_t* in; + uint8_t* out; + srslte_ringbuffer_t* buf; + int res; +}; + +int N = 200; +int M = 10; + +void usage(char* prog) +{ + printf("Usage: %s\n", prog); + printf("\t-N size of blocks in [Default 200]\n"); + printf("\t-M Number of blocks [Default 10]\n"); +} + +void parse_args(int argc, char** argv) +{ + int opt; + while ((opt = getopt(argc, argv, "Nd")) != -1) { + switch (opt) { + case 'N': + N = (int)strtol(argv[optind], NULL, 10); + break; + case 'M': + M = (int)strtol(argv[optind], NULL, 10); + break; + default: + usage(argv[0]); + exit(-1); + } + } +} + +int test_normal_read_write(srslte_ringbuffer_t* q, uint8_t* in, uint8_t* out, int len) +{ + srslte_ringbuffer_write(q, in, len / 2); + srslte_ringbuffer_write(q, &in[len / 2], len / 2); + + for (int i = 0; i < 4; i++) { + srslte_ringbuffer_read(q, &out[(len / 4) * i], len / 4); + } + + TESTASSERT(!memcmp(in, out, len)); + return 0; +} + +int test_overflow_write(srslte_ringbuffer_t* q, uint8_t* in, uint8_t* out, int len) +{ + int ret = srslte_ringbuffer_write(q, in, len / 2); + ret = srslte_ringbuffer_write(q, &in[len / 2], len / 2 + 2); + if (ret != (len / 2 + 2)) { + ret = -1; + } + return ret; +} + +void* write_thread(void* args_) +{ + int res = 0; + struct thread_args_t* args = (struct thread_args_t*)args_; + for (int i = 0; i < M; i++) { + res = srslte_ringbuffer_write_block(args->buf, args->in, args->len); + } + if (res < 0) { + args->res = res; + } + return NULL; +} + +void* read_thread(void* args_) +{ + int res = 0; + struct thread_args_t* args = (struct thread_args_t*)args_; + for (int i = 0; i < M; i++) { + res = srslte_ringbuffer_read(args->buf, &args->out[args->len * i], args->len); + } + if (res < 0) { + args->res = res; + } + return NULL; +} + +int threaded_blocking_test(struct thread_args_t* args) +{ + + pthread_t threads[2]; + if (pthread_create(&threads[0], NULL, write_thread, args)) { + fprintf(stderr, "Error creating thread\n"); + return SRSLTE_ERROR; + } + usleep(10000); + if (pthread_create(&threads[1], NULL, read_thread, args)) { + fprintf(stderr, "Error creating thread\n"); + return SRSLTE_ERROR; + } + + for (int i = 0; i < 2; i++) { + if (pthread_join(threads[i], NULL)) { + fprintf(stderr, "Error joining thread\n"); + return SRSLTE_ERROR; + } + } + + for (int i = 0; i < M; i++) { + uint8_t* out_ptr = &args->out[N * i]; + TESTASSERT(!memcmp(args->in, out_ptr, N)); + } + + if (args->res < 0) { + return SRSLTE_ERROR; + } + + return SRSLTE_SUCCESS; +} + +int main(int argc, char** argv) +{ + int ret = SRSLTE_SUCCESS; + parse_args(argc, argv); + struct thread_args_t thread_in; + + uint8_t* in = srslte_vec_u8_malloc(N * 2); + uint8_t* out = srslte_vec_u8_malloc(N * 10); + srslte_ringbuffer_t ring_buf; + srslte_ringbuffer_init(&ring_buf, N); + + thread_in.in = in; + thread_in.out = out; + thread_in.buf = &ring_buf; + thread_in.len = N; + thread_in.res = ret; + + for (int i = 0; i < N * 2; i++) { + in[i] = i % 255; + } + + if (test_normal_read_write(&ring_buf, in, out, N) < 0) { + printf("Normal read write test failed\n"); + ret = SRSLTE_ERROR; + } + bzero(out, N * 10); + srslte_ringbuffer_reset(&ring_buf); + + if (test_overflow_write(&ring_buf, in, out, N) != -1) { + printf("Overflow detection not working correctly\n"); + ret = SRSLTE_ERROR; + } + bzero(out, N * 10); + srslte_ringbuffer_reset(&ring_buf); + + if (threaded_blocking_test((void*)&thread_in)) { + printf("Error in multithreaded blocking ringbuffer test\n"); + ret = SRSLTE_ERROR; + } + srslte_ringbuffer_stop(&ring_buf); + srslte_ringbuffer_free(&ring_buf); + free(in); + free(out); + printf("Done\n"); + return ret; +}