pool - avoid concurrent batch allocations in background threads

This commit is contained in:
Francisco 2021-04-08 14:37:16 +01:00 committed by Francisco Paisana
parent 75e9700de0
commit f1c67f5b2b
2 changed files with 39 additions and 13 deletions

View File

@ -121,7 +121,7 @@ public:
void* node = grow_pool.allocate_node();
if (grow_pool.size() < batch_threshold) {
allocate_batch_in_background();
allocate_batch_in_background_unlocked();
}
return node;
}
@ -139,23 +139,39 @@ public:
}
size_t get_node_max_size() const { return grow_pool.get_node_max_size(); }
size_t cache_size() const { return grow_pool.cache_size(); }
size_t cache_size() const
{
std::lock_guard<std::mutex> lock(state->mutex);
return grow_pool.cache_size();
}
private:
void allocate_batch_in_background()
void allocate_batch_in_background_unlocked()
{
std::shared_ptr<detached_pool_state> state_copy = state;
get_background_workers().push_task([state_copy]() {
std::lock_guard<std::mutex> lock(state_copy->mutex);
if (state_copy->pool != nullptr) {
state_copy->pool->grow_pool.allocate_batch();
if (state->dispatched) {
// new batch allocation already ongoing
return;
}
state->dispatched = true;
std::shared_ptr<detached_pool_state> state_sptr = state;
get_background_workers().push_task([state_sptr]() {
std::lock_guard<std::mutex> lock(state_sptr->mutex);
// check if pool has not been destroyed
if (state_sptr->pool != nullptr) {
auto* pool = state_sptr->pool;
do {
pool->grow_pool.allocate_batch();
} while (pool->grow_pool.cache_size() < pool->batch_threshold);
}
state_sptr->dispatched = false;
});
}
// State is stored in a shared_ptr that may outlive the pool.
struct detached_pool_state {
std::mutex mutex;
background_mem_pool* pool;
bool dispatched = false;
explicit detached_pool_state(background_mem_pool* pool_) : pool(pool_) {}
};
std::shared_ptr<detached_pool_state> state;

View File

@ -182,12 +182,21 @@ private:
void allocate_batch_in_background_()
{
std::shared_ptr<detached_pool_state> state_copy = state;
get_background_workers().push_task([state_copy]() {
std::lock_guard<std::mutex> lock(state_copy->mutex);
if (state_copy->pool != nullptr) {
state_copy->pool->grow_pool.allocate_batch();
if (state->dispatched) {
// new batch allocation already ongoing
return;
}
state->dispatched = true;
std::shared_ptr<detached_pool_state> state_sptr = state;
get_background_workers().push_task([state_sptr]() {
std::lock_guard<std::mutex> lock(state_sptr->mutex);
if (state_sptr->pool != nullptr) {
auto* pool = state_sptr->pool;
do {
pool->grow_pool.allocate_batch();
} while (pool->grow_pool.cache_size() < pool->thres);
}
state_sptr->dispatched = false;
});
}
@ -197,6 +206,7 @@ private:
struct detached_pool_state {
std::mutex mutex;
background_obj_pool<T>* pool;
bool dispatched = false;
explicit detached_pool_state(background_obj_pool<T>* pool_) : pool(pool_) {}
};
std::shared_ptr<detached_pool_state> state;