Implement a benchmark for measuring the latency seen by foreground threads when pushing log entries.

The benchmark runs with several threads to exercise contention.
faluco 2021-04-28 10:21:56 +02:00 committed by faluco
parent 04ded030ea
commit aef18f9931
13 changed files with 230 additions and 69 deletions
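For context, here is a condensed sketch of the per-thread measurement the new benchmark performs. The full version is run_thread() in benchmarks/frontend_latency.cpp below; the burst size and the log message here are illustrative.

// Sketch only: time a burst of frontend log calls and record the average
// cost per entry in nanoseconds, mirroring run_thread() in the new benchmark.
#include "srsran/srslog/srslog.h"
#include <chrono>
#include <cstdint>
#include <vector>

static void measure_burst(srslog::log_channel& c, std::vector<uint64_t>& results)
{
  constexpr unsigned entries_per_burst = 40; // same order as the benchmark
  auto begin = std::chrono::steady_clock::now();
  for (unsigned i = 0; i != entries_per_burst; ++i) {
    c("latency sketch: entry %u", i);
  }
  auto end = std::chrono::steady_clock::now();
  results.push_back(std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count() /
                    entries_per_burst);
}

The percentiles reported at the end of the benchmark are computed over the merged samples from all threads.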

View File

@@ -14,6 +14,7 @@
#define SRSLOG_DETAIL_LOG_BACKEND_H
#include "srsran/srslog/bundled/fmt/printf.h"
#include "srsran/srslog/shared_types.h"
namespace srslog {
@@ -31,7 +32,7 @@ public:
/// Starts the processing of incoming log entries.
/// NOTE: Calling this function more than once has no side effects.
virtual void start() = 0;
virtual void start(backend_priority priority = backend_priority::normal) = 0;
/// Allocates a dyn_arg_store and returns a pointer to it on success, otherwise returns nullptr.
virtual fmt::dynamic_format_arg_store<fmt::printf_context>* alloc_arg_store() = 0;

View File

@@ -28,7 +28,7 @@ template <typename T, size_t capacity = SRSLOG_QUEUE_CAPACITY>
class work_queue
{
srsran::dyn_circular_buffer<T> queue;
mutable condition_variable cond_var;
mutable mutex m;
static constexpr size_t threshold = capacity * 0.98;
public:
@@ -41,15 +41,14 @@ public:
/// queue is full, otherwise true.
bool push(const T& value)
{
cond_var.lock();
m.lock();
// Discard the new element if we reach the maximum capacity.
if (queue.full()) {
cond_var.unlock();
m.unlock();
return false;
}
queue.push(value);
cond_var.unlock();
cond_var.signal();
m.unlock();
return true;
}
@@ -58,56 +57,26 @@ public:
/// queue is full, otherwise true.
bool push(T&& value)
{
cond_var.lock();
m.lock();
// Discard the new element if we reach the maximum capacity.
if (queue.full()) {
cond_var.unlock();
m.unlock();
return false;
}
queue.push(std::move(value));
cond_var.unlock();
cond_var.signal();
m.unlock();
return true;
}
/// Extracts the top most element from the queue.
/// NOTE: This method blocks while the queue is empty.
T pop()
/// Extracts the top most element from the queue if it exists.
/// Returns a pair with a bool indicating if the pop has been successful.
std::pair<bool, T> try_pop()
{
cond_var.lock();
m.lock();
while (queue.empty()) {
cond_var.wait();
}
T elem = std::move(queue.top());
queue.pop();
cond_var.unlock();
return elem;
}
/// Extracts the top most element from the queue.
/// NOTE: This method blocks while the queue is empty or until the
/// programmed timeout expires. Returns a pair with a bool indicating if the
/// pop has been successful.
std::pair<bool, T> timed_pop(unsigned timeout_ms)
{
// Build an absolute time reference for the expiration time.
timespec ts = condition_variable::build_timeout(timeout_ms);
cond_var.lock();
bool timedout = false;
while (queue.empty() && !timedout) {
timedout = cond_var.wait(ts);
}
// Did we wake up on timeout?
if (timedout && queue.empty()) {
cond_var.unlock();
if (queue.empty()) {
m.unlock();
return {false, T()};
}
@@ -115,7 +84,7 @@ public:
T Item = std::move(queue.top());
queue.pop();
cond_var.unlock();
m.unlock();
return {true, std::move(Item)};
}
@@ -126,7 +95,7 @@ public:
/// Returns true when the queue is almost full, otherwise returns false.
bool is_almost_full() const
{
cond_var_scoped_lock lock(cond_var);
scoped_lock lock(m);
return queue.size() > threshold;
}
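The queue no longer blocks consumers: the old pop()/timed_pop() pair is replaced by a non-blocking try_pop(), and the backend worker polls it (see do_work() further down in this commit). A minimal consumer sketch with illustrative names, using the same 100 microsecond sleep as the worker:

// Illustrative consumer loop (not part of the commit): poll try_pop() and
// sleep briefly when the queue is empty instead of blocking on a condition
// variable as the old pop()/timed_pop() did.
#include <atomic>
#include <chrono>
#include <thread>
#include <utility>

template <typename Queue, typename Handler>
void drain(Queue& queue, const std::atomic<bool>& running, Handler&& handle)
{
  constexpr std::chrono::microseconds sleep_period{100};
  while (running) {
    auto item = queue.try_pop(); // {success, element}
    if (!item.first) {
      std::this_thread::sleep_for(sleep_period);
      continue;
    }
    handle(std::move(item.second));
  }
}

The design choice here is that producers (the foreground threads) now only take a mutex on push and never signal a condition variable, at the cost of the consumer waking up periodically to check for work.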

View File

@@ -20,6 +20,17 @@ namespace srslog {
/// Generic error handler callback.
using error_handler = std::function<void(const std::string&)>;
/// Backend priority levels.
enum class backend_priority
{
/// Default priority of the operating system.
normal,
/// Thread will be given a high priority.
high,
/// Thread will be given a very high priority.
very_high
};
} // namespace srslog
#endif // SRSLOG_SHARED_TYPES_H

View File

@@ -223,7 +223,7 @@ sink* create_stderr_sink(const std::string& name = "stderr");
/// This function initializes the logging framework. It must be called before
/// any log entry is generated.
/// NOTE: Calling this function more than once has no side effects.
void init();
void init(backend_priority priority = backend_priority::normal);
/// Flushes the contents of all the registered sinks. The caller thread will
/// block until the operation is completed.
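Putting the new parameter together with the existing frontend API, a minimal usage sketch follows; the sink and channel names, and the choice of high priority, are arbitrary here.

// Illustrative only: start the logging backend with an elevated thread
// priority and emit one entry before flushing.
#include "srsran/srslog/srslog.h"

int main()
{
  auto& sink    = srslog::fetch_file_sink("app.log");
  auto& channel = srslog::fetch_log_channel("main", sink, {});

  // New in this commit: the backend worker thread priority can be raised.
  srslog::init(srslog::backend_priority::high);

  channel("application started: %d", 0);

  // Block until all queued entries have been processed by the backend.
  srslog::flush();
  return 0;
}

With backend_priority::normal (the default) the behavior is unchanged from before this commit.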

View File

@@ -24,12 +24,48 @@ void backend_worker::stop()
}
}
void backend_worker::create_worker()
void backend_worker::set_thread_priority(backend_priority priority) const
{
switch (priority) {
case backend_priority::normal:
break;
case backend_priority::high: {
int min = ::sched_get_priority_min(SCHED_FIFO);
if (min == -1) {
err_handler("Unable to set the backend thread priority to high, falling back to normal priority.");
return;
}
::sched_param sch{min};
if (::pthread_setschedparam(::pthread_self(), SCHED_FIFO, &sch)) {
err_handler("Unable to set the backend thread priority to high, falling back to normal priority.");
return;
}
break;
}
case backend_priority::very_high: {
int max = ::sched_get_priority_max(SCHED_FIFO);
int min = ::sched_get_priority_min(SCHED_FIFO);
if (max == -1 || min == -1) {
err_handler("Unable to set the backend thread priority to real time, falling back to normal priority.");
return;
}
::sched_param sch{min + ((max - min) / 2)};
if (::pthread_setschedparam(::pthread_self(), SCHED_FIFO, &sch)) {
err_handler("Unable to set the backend thread priority to real time, falling back to normal priority.");
return;
}
break;
}
}
}
void backend_worker::create_worker(backend_priority priority)
{
assert(!running_flag && "Only one worker thread should be created");
std::thread t([this]() {
std::thread t([this, priority]() {
running_flag = true;
set_thread_priority(priority);
do_work();
});
@@ -41,21 +77,26 @@ void backend_worker::create_worker()
}
}
void backend_worker::start()
void backend_worker::start(backend_priority priority)
{
// Ensure we only create the worker thread once.
std::call_once(start_once_flag, [this]() { create_worker(); });
std::call_once(start_once_flag, [this, priority]() { create_worker(priority); });
}
void backend_worker::do_work()
{
assert(running_flag && "Thread entry function called without running thread");
while (running_flag) {
auto item = queue.timed_pop(sleep_period_ms);
/// This period defines the time the worker will sleep while waiting for new entries. This is required to check the
/// termination variable periodically.
constexpr std::chrono::microseconds sleep_period{100};
// Spin again when the timeout expires.
while (running_flag) {
auto item = queue.try_pop();
// Spin while there are no new entries to process.
if (!item.first) {
std::this_thread::sleep_for(sleep_period);
continue;
}
@@ -113,7 +154,7 @@ void backend_worker::process_outstanding_entries()
assert(!running_flag && "Cannot process outstanding entries while thread is running");
while (true) {
auto item = queue.timed_pop(1);
auto item = queue.try_pop();
// Check if the queue is empty.
if (!item.first) {
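Raising a thread to SCHED_FIFO normally requires elevated privileges (for example CAP_SYS_NICE or a sufficient RLIMIT_RTPRIO), which is why both priority levels fall back to normal and report through the error handler when pthread_setschedparam() fails. As a hypothetical debug aid, not part of the commit, the applied policy can be read back from the backend thread like this:

// Hypothetical helper (not in this commit): print the scheduling policy and
// priority currently applied to the calling thread.
#include <cstdio>
#include <pthread.h>
#include <sched.h>

static void print_thread_sched_params()
{
  int policy = 0;
  ::sched_param params{};
  if (::pthread_getschedparam(::pthread_self(), &policy, &params) != 0) {
    std::printf("pthread_getschedparam failed\n");
    return;
  }
  std::printf("policy=%s, priority=%d\n",
              (policy == SCHED_FIFO) ? "SCHED_FIFO" : "other",
              params.sched_priority);
}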

View File

@@ -26,11 +26,6 @@ namespace srslog {
/// log entries from a work queue and dispatches them to the selected sinks.
class backend_worker
{
/// This period defines the maximum time the worker will sleep while waiting
/// for new entries. This is required to check the termination variable
/// periodically.
static constexpr unsigned sleep_period_ms = 500;
public:
backend_worker(detail::work_queue<detail::log_entry>& queue, detail::dyn_arg_store_pool& arg_pool) :
queue(queue), arg_pool(arg_pool), running_flag(false)
@@ -44,7 +39,7 @@ public:
/// Starts the backend worker thread. After returning from this function the
/// secondary thread is ensured to be running. Calling this function more than
/// once has no effect.
void start();
void start(backend_priority priority);
/// Stops the backend worker thread if it is running, otherwise the call has
/// no effect. After returning from this function the secondary thread is
@@ -78,7 +73,7 @@ public:
private:
/// Creates the worker thread.
/// NOTE: This function should be only called once.
void create_worker();
void create_worker(backend_priority priority);
/// Entry function used by the secondary thread.
void do_work();
@@ -103,6 +98,9 @@ private:
}
}
/// Establishes the specified thread priority for the calling thread.
void set_thread_priority(backend_priority priority) const;
private:
detail::work_queue<detail::log_entry>& queue;
detail::dyn_arg_store_pool& arg_pool;

View File

@@ -29,7 +29,7 @@ public:
log_backend_impl(const log_backend_impl& other) = delete;
log_backend_impl& operator=(const log_backend_impl& other) = delete;
void start() override { worker.start(); }
void start(backend_priority priority = backend_priority::normal) override { worker.start(priority); }
bool push(detail::log_entry&& entry) override
{

View File

@@ -181,9 +181,9 @@ bool srslog::install_custom_sink(const std::string& id, std::unique_ptr<sink> s)
/// Framework configuration and control function implementations.
///
void srslog::init()
void srslog::init(backend_priority priority)
{
srslog_instance::get().get_backend().start();
srslog_instance::get().get_backend().start(priority);
}
void srslog::flush()

View File

@@ -6,6 +6,9 @@
# the distribution.
#
add_executable(srslog_frontend_latency benchmarks/frontend_latency.cpp)
target_link_libraries(srslog_frontend_latency srslog)
add_executable(srslog_test srslog_test.cpp)
target_link_libraries(srslog_test srslog)
add_test(srslog_test srslog_test)

View File

@@ -0,0 +1,137 @@
/**
*
* \section COPYRIGHT
*
* Copyright 2013-2021 Software Radio Systems Limited
*
* By using this file, you agree to the terms and conditions set
* forth in the LICENSE file which can be found at the top level of
* the distribution.
*
*/
#include "srsran/srslog/srslog.h"
#include <atomic>
#include <sys/resource.h>
#include <thread>
using namespace srslog;
static constexpr unsigned num_iterations = 4000;
static constexpr unsigned num_entries_per_iter = 40;
namespace {
/// This helper class checks if there has been context switches between its construction and destruction for the caller
/// thread.
class context_switch_checker
{
public:
explicit context_switch_checker(std::atomic<unsigned>& counter) : counter(counter)
{
::getrusage(RUSAGE_THREAD, &before);
}
~context_switch_checker()
{
::rusage after{};
::getrusage(RUSAGE_THREAD, &after);
unsigned diff = (after.ru_nvcsw - before.ru_nvcsw) + (after.ru_nivcsw - before.ru_nivcsw);
if (diff) {
counter.fetch_add(diff, std::memory_order_relaxed);
}
}
private:
::rusage before{};
std::atomic<unsigned>& counter;
};
} // namespace
/// Busy waits in the calling thread for the specified amount of time.
static void busy_wait(std::chrono::milliseconds interval)
{
auto begin = std::chrono::steady_clock::now();
auto end = begin + interval;
while (std::chrono::steady_clock::now() < end) {
}
}
/// Worker function used for each thread of the benchmark to generate and measure the time taken for each log entry.
static void run_thread(log_channel& c, std::vector<uint64_t>& results, std::atomic<unsigned>& ctx_counter)
{
for (unsigned iter = 0; iter != num_iterations; ++iter) {
context_switch_checker ctx_checker(ctx_counter);
auto begin = std::chrono::steady_clock::now();
for (unsigned entry_num = 0; entry_num != num_entries_per_iter; ++entry_num) {
double d = entry_num;
c("SRSLOG latency benchmark: int: %u, double: %f, string: %s", iter, d, "test");
}
auto end = std::chrono::steady_clock::now();
results.push_back(std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count() / num_entries_per_iter);
busy_wait(std::chrono::milliseconds(4));
}
}
/// This function runs the latency benchmark generating log entries using the specified number of threads.
static void benchmark(unsigned num_threads)
{
std::vector<std::vector<uint64_t> > thread_results;
thread_results.resize(num_threads);
for (auto& v : thread_results) {
v.reserve(num_iterations);
}
auto& s = srslog::fetch_file_sink("srslog_latency_benchmark.txt");
auto& channel = srslog::fetch_log_channel("bench", s, {});
srslog::init();
std::vector<std::thread> workers;
workers.reserve(num_threads);
std::atomic<unsigned> ctx_counter(0);
for (unsigned i = 0; i != num_threads; ++i) {
workers.emplace_back(run_thread, std::ref(channel), std::ref(thread_results[i]), std::ref(ctx_counter));
}
for (auto& w : workers) {
w.join();
}
std::vector<uint64_t> results;
results.reserve(num_threads * num_iterations);
for (const auto& v : thread_results) {
results.insert(results.end(), v.begin(), v.end());
}
std::sort(results.begin(), results.end());
fmt::print("SRSLOG Frontend Latency Benchmark - logging with {} thread{}\n"
"All values in nanoseconds\n"
"Percentiles: | 50th | 75th | 90th | 99th | 99.9th | Worst |\n"
" |{:6}|{:6}|{:6}|{:6}|{:8}|{:7}|\n"
"Context switches: {} in {} of generated entries\n\n",
num_threads,
(num_threads > 1) ? "s" : "",
results[static_cast<size_t>(results.size() * 0.5)],
results[static_cast<size_t>(results.size() * 0.75)],
results[static_cast<size_t>(results.size() * 0.9)],
results[static_cast<size_t>(results.size() * 0.99)],
results[static_cast<size_t>(results.size() * 0.999)],
results.back(),
ctx_counter,
num_threads * num_iterations * num_entries_per_iter);
}
int main()
{
for (auto n : {1, 2, 4}) {
benchmark(n);
}
return 0;
}
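The percentile lines above index directly into the sorted results vector; the worst case is taken from results.back() because an index of size() * 1.0 would be one past the end. A small helper along these lines (hypothetical, not part of the commit) expresses the same truncating computation:

// Hypothetical helper (not in the commit): value at percentile p (0.0 .. 1.0)
// of an already sorted sample vector, using the same truncating index as the
// fmt::print call in benchmark().
#include <cstddef>
#include <cstdint>
#include <vector>

static uint64_t percentile(const std::vector<uint64_t>& sorted_samples, double p)
{
  return sorted_samples[static_cast<std::size_t>(sorted_samples.size() * p)];
}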

View File

@@ -25,7 +25,7 @@ namespace {
class backend_spy : public detail::log_backend
{
public:
void start() override {}
void start(srslog::backend_priority priority) override {}
bool push(detail::log_entry&& entry) override
{

View File

@@ -60,7 +60,7 @@ namespace {
class backend_spy : public detail::log_backend
{
public:
void start() override {}
void start(srslog::backend_priority priority) override {}
bool push(detail::log_entry&& entry) override
{

View File

@@ -14,6 +14,7 @@
#define TEST_DUMMIES
#include "srsran/srslog/detail/log_backend.h"
#include "srsran/srslog/shared_types.h"
#include "srsran/srslog/sink.h"
namespace test_dummies {
@@ -67,7 +68,7 @@ public:
class backend_dummy : public srslog::detail::log_backend
{
public:
void start() override {}
void start(srslog::backend_priority priority) override {}
bool push(srslog::detail::log_entry&& entry) override { return true; }