performance: use new concurrent fixed-size pool for the byte_buffer pool

Francisco 2021-03-25 15:54:15 +00:00 committed by Francisco Paisana
parent e1523692c2
commit be771e5c23
5 changed files with 94 additions and 79 deletions

View File

@@ -15,7 +15,6 @@
 #include "memblock_cache.h"
 #include "srsran/adt/circular_buffer.h"
-#include "srsran/adt/singleton.h"
 #include <thread>

 namespace srsran {
@@ -31,10 +30,9 @@ namespace srsran {
  * @tparam NofObjects number of objects in the pool
  * @tparam ObjSize object size
  */
-template <size_t NofObjects, size_t ObjSize, size_t MaxWorkerCacheSize = NofObjects / 16>
-class concurrent_fixed_memory_pool : public singleton_t<concurrent_fixed_memory_pool<NofObjects, ObjSize> >
+template <size_t ObjSize, bool DebugSanitizeAddress = false>
+class concurrent_fixed_memory_pool
 {
-  static_assert(NofObjects > 256, "This pool is particularly designed for a high number of objects");
   static_assert(ObjSize > 256, "This pool is particularly designed for large objects");

   struct obj_storage_t {
@@ -45,20 +43,41 @@ class concurrent_fixed_memory_pool : public singleton_t<concurrent_fixed_memory_pool<NofObjects, ObjSize> >
   const static size_t batch_steal_size = 10;

 protected:
-  // ctor only accessible from singleton
-  concurrent_fixed_memory_pool()
+  // ctor only accessible from singleton get_instance()
+  explicit concurrent_fixed_memory_pool(size_t nof_objects_)
   {
-    allocated_blocks.resize(NofObjects);
+    srsran_assert(nof_objects_ > 0, "A positive pool size must be provided");
+    std::lock_guard<std::mutex> lock(mutex);
+    allocated_blocks.resize(nof_objects_);
     for (std::unique_ptr<obj_storage_t>& b : allocated_blocks) {
       b.reset(new obj_storage_t(std::this_thread::get_id()));
       srsran_assert(b.get() != nullptr, "Failed to instantiate fixed memory pool");
       shared_mem_cache.push(static_cast<void*>(b.get()));
     }
+    max_objs_per_cache = allocated_blocks.size() / 16;
+    max_objs_per_cache = max_objs_per_cache < batch_steal_size ? batch_steal_size : max_objs_per_cache;
   }

 public:
-  static size_t size() { return NofObjects; }
+  concurrent_fixed_memory_pool(const concurrent_fixed_memory_pool&) = delete;
+  concurrent_fixed_memory_pool(concurrent_fixed_memory_pool&&) = delete;
+  concurrent_fixed_memory_pool& operator=(const concurrent_fixed_memory_pool&) = delete;
+  concurrent_fixed_memory_pool& operator=(concurrent_fixed_memory_pool&&) = delete;
+
+  ~concurrent_fixed_memory_pool()
+  {
+    std::lock_guard<std::mutex> lock(mutex);
+    allocated_blocks.clear();
+  }
+
+  static concurrent_fixed_memory_pool<ObjSize, DebugSanitizeAddress>* get_instance(size_t size = 4096)
+  {
+    static concurrent_fixed_memory_pool<ObjSize, DebugSanitizeAddress> pool(size);
+    return &pool;
+  }
+
+  size_t size() { return allocated_blocks.size(); }

   void* allocate_node(size_t sz)
   {
@@ -76,6 +95,12 @@ public:
       }
       node = worker_cache->try_pop();
     }
+
+#ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED
+    if (node == nullptr) {
+      print_error("Error allocating buffer in pool of ObjSize=%zd", ObjSize);
+    }
+#endif
     return node;
   }
@@ -85,7 +110,16 @@ public:
     memblock_cache* worker_cache = get_worker_cache();
     obj_storage_t*  block_ptr    = static_cast<obj_storage_t*>(p);
-    if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= MaxWorkerCacheSize) {
+    if (DebugSanitizeAddress) {
+      std::lock_guard<std::mutex> lock(mutex);
+      srsran_assert(std::any_of(allocated_blocks.begin(),
+                                allocated_blocks.end(),
+                                [block_ptr](const std::unique_ptr<obj_storage_t>& b) { return b.get() == block_ptr; }),
+                    "Error deallocating block with address 0x%lx.",
+                    (long unsigned)block_ptr);
+    }
+
+    if (block_ptr->worker_id != std::this_thread::get_id() or worker_cache->size() >= max_objs_per_cache) {
       // if block was allocated in a different thread or local cache reached max capacity, send block to shared
       // container
       shared_mem_cache.push(static_cast<void*>(block_ptr));
@@ -96,6 +130,22 @@ public:
     worker_cache->push(static_cast<uint8_t*>(p));
   }

+  void enable_logger(bool enabled)
+  {
+    if (enabled) {
+      logger = &srslog::fetch_basic_logger("POOL");
+      logger->set_level(srslog::basic_levels::debug);
+    } else {
+      logger = nullptr;
+    }
+  }
+
+  void print_all_buffers()
+  {
+    std::lock_guard<std::mutex> lock(mutex);
+    printf("There are %zd/%zd buffers in shared block container.\n", shared_mem_cache.size(), allocated_blocks.size());
+  }
+
 private:
   memblock_cache* get_worker_cache()
   {
@@ -103,6 +153,20 @@ private:
     return &worker_cache;
   }

+  /// Formats and prints the input string and arguments into the configured output stream.
+  template <typename... Args>
+  void print_error(const char* str, Args&&... args)
+  {
+    if (logger != nullptr) {
+      logger->error(str, std::forward<Args>(args)...);
+    } else {
+      fmt::printf(std::string(str) + "\n", std::forward<Args>(args)...);
+    }
+  }
+
+  size_t                max_objs_per_cache = 0;
+  srslog::basic_logger* logger             = nullptr;
+
   mutexed_memblock_cache shared_mem_cache;
   std::mutex             mutex;
   std::vector<std::unique_ptr<obj_storage_t> > allocated_blocks;
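The hunks above cover the whole lifecycle: a lazily constructed singleton per ObjSize, allocation that prefers the caller's thread-local cache before touching the mutexed shared container, and deallocation that returns foreign or overflow blocks to the shared container. The sketch below is illustrative only, not part of the commit; big_obj and the 1024-block size are made up for the example.

#include "srsran/adt/pool/fixed_size_pool.h"
#include <new>

struct big_obj {
  char payload[512]; // ObjSize must exceed 256 bytes per the static_assert above
};

void pool_usage_sketch()
{
  // The first get_instance() call fixes the pool size; later calls reuse the same instance.
  auto* pool = srsran::concurrent_fixed_memory_pool<sizeof(big_obj)>::get_instance(1024);

  // allocate_node() returns nullptr once both the worker cache and the shared cache are empty.
  void* node = pool->allocate_node(sizeof(big_obj));
  if (node == nullptr) {
    return; // pool exhausted
  }
  big_obj* obj = new (node) big_obj{};

  // deallocate_node() keeps the block in this thread's cache unless the block was
  // allocated by another thread or the cache already holds max_objs_per_cache entries.
  obj->~big_obj();
  pool->deallocate_node(node);
}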

View File

@@ -21,10 +21,7 @@
 #include <string>
 #include <vector>

-/*******************************************************************************
-                              INCLUDES
-*******************************************************************************/
-
+#include "srsran/adt/pool/fixed_size_pool.h"
 #include "srsran/common/common.h"
 #include "srsran/srslog/srslog.h"
@@ -165,58 +162,7 @@ private:
   uint32_t capacity;
 };

-class byte_buffer_pool
-{
-  using mem_chunk = typename std::aligned_storage<sizeof(byte_buffer_t), alignof(byte_buffer_t)>::type;
-
-public:
-  // Singleton static methods
-  static byte_buffer_pool* get_instance(int capacity = -1)
-  {
-    static std::unique_ptr<byte_buffer_pool> instance(new byte_buffer_pool(capacity));
-    return instance.get();
-  }
-  byte_buffer_pool(int capacity = -1) : pool(capacity) {}
-  byte_buffer_pool(const byte_buffer_pool& other) = delete;
-  byte_buffer_pool(byte_buffer_pool&& other) = delete;
-  byte_buffer_pool& operator=(const byte_buffer_pool& other) = delete;
-  byte_buffer_pool& operator=(byte_buffer_pool&& other) = delete;
-  void* allocate(const char* debug_name = nullptr, bool blocking = false)
-  {
-    return pool.allocate(debug_name, blocking);
-  }
-  void enable_logger(bool enabled) { print_to_log = enabled; }
-  void deallocate(void* b)
-  {
-    if (!b) {
-      return;
-    }
-    if (!pool.deallocate(static_cast<mem_chunk*>(b))) {
-#ifdef SRSRAN_BUFFER_POOL_LOG_ENABLED
-      print_error("Error deallocating PDU: Addr=0x%p, name=%s not found in pool", (void*)b, b->debug_name);
-#else
-      print_error("Error deallocating PDU: Addr=0x%p", (void*)b);
-#endif
-    }
-  }
-  void print_all_buffers() { pool.print_all_buffers(); }
-
-private:
-  /// Formats and prints the input string and arguments into the configured output stream.
-  template <typename... Args>
-  void print_error(const char* str, Args&&... args)
-  {
-    if (print_to_log) {
-      srslog::fetch_basic_logger("POOL", false).error(str, std::forward<Args>(args)...);
-    } else {
-      fmt::printf(std::string(str) + "\n", std::forward<Args>(args)...);
-    }
-  }
-
-private:
-  bool                   print_to_log = false;
-  buffer_pool<mem_chunk> pool;
-};
+using byte_buffer_pool = concurrent_fixed_memory_pool<sizeof(byte_buffer_t)>;

 inline unique_byte_buffer_t make_byte_buffer() noexcept
 {

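byte_buffer_pool thus survives only as a type alias for concurrent_fixed_memory_pool<sizeof(byte_buffer_t)>, so existing get_instance() call sites keep compiling while the allocation logic moves into the shared template. The body of make_byte_buffer() is cut off above; a plausible shape, given the nothrow operator new override in the next file, would be the following (an assumption, not the commit's code):

#include <new>

inline srsran::unique_byte_buffer_t make_byte_buffer_sketch() noexcept
{
  // new (std::nothrow) routes through byte_buffer_pool::get_instance()->allocate_node(),
  // so this yields an empty unique_ptr instead of throwing when the pool is exhausted.
  return srsran::unique_byte_buffer_t(new (std::nothrow) srsran::byte_buffer_t());
}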
View File

@@ -197,9 +197,6 @@ struct bit_buffer_t {
   uint32_t get_headroom() { return msg - buffer; }
 };

-// Create a Managed Life-Time Byte Buffer
-class byte_buffer_pool;
-
 using unique_byte_buffer_t = std::unique_ptr<byte_buffer_t>;

 ///

View File

@@ -18,13 +18,13 @@ namespace srsran {
 void* byte_buffer_t::operator new(size_t sz, const std::nothrow_t& nothrow_value) noexcept
 {
   assert(sz == sizeof(byte_buffer_t));
-  return byte_buffer_pool::get_instance()->allocate(nullptr, false);
+  return byte_buffer_pool::get_instance()->allocate_node(sz);
 }

 void* byte_buffer_t::operator new(size_t sz)
 {
   assert(sz == sizeof(byte_buffer_t));
-  void* ptr = byte_buffer_pool::get_instance()->allocate(nullptr, false);
+  void* ptr = byte_buffer_pool::get_instance()->allocate_node(sz);
   if (ptr == nullptr) {
     throw std::bad_alloc();
   }
@@ -33,7 +33,7 @@ void* byte_buffer_t::operator new(size_t sz)
 void byte_buffer_t::operator delete(void* ptr)
 {
-  byte_buffer_pool::get_instance()->deallocate(ptr);
+  byte_buffer_pool::get_instance()->deallocate_node(ptr);
 }

 } // namespace srsran

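With these overrides, every byte_buffer_t allocated with plain new is carved out of the pool, and delete hands the block back. An illustrative caller's view follows; it is a sketch, with the exhaustion behavior inferred from the allocate_node() nullptr contract shown earlier.

#include <new>

void byte_buffer_alloc_sketch()
{
  using srsran::byte_buffer_t;

  // Throwing form: operator new(size_t) throws std::bad_alloc if allocate_node() returns nullptr.
  byte_buffer_t* a = new byte_buffer_t();

  // Non-throwing form: returns nullptr on pool exhaustion instead.
  byte_buffer_t* b = new (std::nothrow) byte_buffer_t();

  // operator delete returns the block via deallocate_node(), which prefers
  // the calling thread's local cache.
  delete a;
  delete b; // deleting nullptr is a harmless no-op
}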
View File

@@ -80,7 +80,7 @@ struct BigObj {
   C                        c;
   std::array<uint8_t, 500> space;

-  using pool_t = srsran::concurrent_fixed_memory_pool<1024, 512>;
+  using pool_t = srsran::concurrent_fixed_memory_pool<512, true>;

   void* operator new(size_t sz)
   {
@@ -97,9 +97,12 @@ struct BigObj {
 void test_fixedsize_pool()
 {
+  size_t pool_size  = 1024;
+  auto*  fixed_pool = BigObj::pool_t::get_instance(pool_size);
+  fixed_pool->print_all_buffers();
   {
-    std::vector<std::unique_ptr<BigObj> > vec(BigObj::pool_t::size());
-    for (size_t i = 0; i < BigObj::pool_t::size(); ++i) {
+    std::vector<std::unique_ptr<BigObj> > vec(pool_size);
+    for (size_t i = 0; i < pool_size; ++i) {
       vec[i].reset(new BigObj());
       TESTASSERT(vec[i].get() != nullptr);
     }
@@ -109,32 +112,37 @@ void test_fixedsize_pool()
     obj = std::unique_ptr<BigObj>(new (std::nothrow) BigObj());
     TESTASSERT(obj != nullptr);
     obj.reset();
+    fixed_pool->print_all_buffers();
   }
+  fixed_pool->print_all_buffers();

   // TEST: one thread allocates, and the other deallocates
   {
     std::unique_ptr<BigObj> obj;
     std::atomic<bool>       stop(false);
-    srsran::dyn_blocking_queue<std::unique_ptr<BigObj> > queue(BigObj::pool_t::size() / 2);
+    srsran::dyn_blocking_queue<std::unique_ptr<BigObj> > queue(pool_size / 2);

     std::thread t([&queue, &stop]() {
       while (not stop.load(std::memory_order_relaxed)) {
         std::unique_ptr<BigObj> obj(new (std::nothrow) BigObj());
         TESTASSERT(obj != nullptr);
-        queue.push_blocking(std::move(obj));
+        queue.try_push(std::move(obj));
       }
     });

-    for (size_t i = 0; i < BigObj::pool_t::size() * 8; ++i) {
+    for (size_t i = 0; i < pool_size * 8; ++i) {
       obj = queue.pop_blocking();
       TESTASSERT(obj != nullptr);
     }
     stop.store(true);
     t.join();
   }
+  fixed_pool->print_all_buffers();
 }

-int main()
+int main(int argc, char** argv)
 {
+  srsran::test_init(argc, argv);
+
   TESTASSERT(test_nontrivial_obj_pool() == SRSRAN_SUCCESS);
   test_fixedsize_pool();
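One editorial caveat on the get_instance(size) signature this test exercises: the instance is a function-local static, so only the first caller's size argument takes effect and later arguments are silently ignored. A hypothetical illustration:

using pool_t = srsran::concurrent_fixed_memory_pool<512>;

void sizing_caveat()
{
  pool_t* p1 = pool_t::get_instance(1024); // constructs the pool with 1024 blocks
  pool_t* p2 = pool_t::get_instance(9999); // same instance; 9999 is ignored
  // p1 == p2 and p1->size() == 1024
}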