Fixes for Blocktree space amplification and slot deletion (#5266)

* Fixes for Blocktree space amplification and slot deletion
Sagar Dhawan 2019-07-24 17:28:08 -07:00 committed by GitHub
parent 3bd35dd7cc
commit 535df0026d
5 changed files with 292 additions and 80 deletions


@@ -195,61 +195,151 @@ impl Blocktree {
}
/// Silently deletes all blocktree column families starting at the given slot until the `to` slot
/// Use with care; does not check for integrity and does not update slot metas that
/// refer to deleted slots
pub fn purge_slots(&self, from_slot: Slot, to_slot: Option<Slot>) {
let to_index = to_slot.map(|slot| (slot + 1, 0));
if let Err(e) = self.meta_cf.force_delete(Some(from_slot), to_slot) {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
)
/// Dangerous; use with care:
/// Does not check for integrity and does not update slot metas that refer to deleted slots.
/// Modifies multiple column families simultaneously.
pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
// split the purge request into batches of 1000 slots
const PURGE_BATCH_SIZE: u64 = 1000;
let mut batch_end = to_slot
.unwrap_or(from_slot + PURGE_BATCH_SIZE)
.min(from_slot + PURGE_BATCH_SIZE);
while from_slot < batch_end {
if let Ok(end) = self.run_purge_batch(from_slot, batch_end) {
// no more slots to iterate, or the upper bound was reached
if end {
break;
} else {
// update the next batch bounds
from_slot = batch_end;
batch_end = to_slot
.unwrap_or(batch_end + PURGE_BATCH_SIZE)
.min(batch_end + PURGE_BATCH_SIZE);
}
}
}
if let Err(e) = self.data_cf.force_delete(Some((from_slot, 0)), to_index) {
error!(
"Error: {:?} while deleting data_cf for slot {:?}",
e, from_slot
)
}
if let Err(e) = self
.erasure_meta_cf
.force_delete(Some((from_slot, 0)), to_index)
{
error!(
"Error: {:?} while deleting erasure_meta_cf for slot {:?}",
e, from_slot
)
}
if let Err(e) = self.erasure_cf.force_delete(Some((from_slot, 0)), to_index) {
error!(
"Error: {:?} while deleting erasure_cf for slot {:?}",
e, from_slot
)
}
if let Err(e) = self.orphans_cf.force_delete(Some(from_slot), to_slot) {
error!(
"Error: {:?} while deleting orphans_cf for slot {:?}",
e, from_slot
)
}
if let Err(e) = self.index_cf.force_delete(Some(from_slot), to_slot) {
error!(
"Error: {:?} while deleting index_cf for slot {:?}",
e, from_slot
)
}
if let Err(e) = self.dead_slots_cf.force_delete(Some(from_slot), to_slot) {
error!(
"Error: {:?} while deleting dead_slots_cf for slot {:?}",
e, from_slot
)
}
let roots_cf = self.db.column::<cf::Root>();
if let Err(e) = roots_cf.force_delete(Some(from_slot), to_slot) {
error!(
"Error: {:?} while deleting roots_cf for slot {:?}",
e, from_slot
)
}
// Returns whether all column iterators reached the end of the requested range
fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result<bool> {
let mut end = true;
let from_slot = Some(from_slot);
let batch_end = Some(batch_end);
unsafe {
let mut batch_processor = self.db.batch_processor();
let mut write_batch = batch_processor
.batch()
.expect("Database Error: Failed to get write batch");
end &= match self
.meta_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.data_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.erasure_meta_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.erasure_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.orphans_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.index_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
end &= match self
.dead_slots_cf
.delete_slot(&mut write_batch, from_slot, batch_end)
{
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
let roots_cf = self.db.column::<cf::Root>();
end &= match roots_cf.delete_slot(&mut write_batch, from_slot, batch_end) {
Ok(finished) => finished,
Err(e) => {
error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
e, from_slot
);
false
}
};
if let Err(e) = batch_processor.write(write_batch) {
error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...",
e, from_slot
);
Err(e)?;
}
Ok(end)
}
}
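
To make the batching arithmetic concrete, here is a minimal standalone sketch (a hypothetical helper, not part of the patch) of how the purge window advances; the clamping of `batch_end` mirrors the loop in `purge_slots` above, except that the real loop also breaks early once all column iterators are exhausted:

type Slot = u64;
const PURGE_BATCH_SIZE: u64 = 1000;

// Yields the successive (from, end) windows the purge loop walks.
fn batch_windows(mut from_slot: Slot, to_slot: Option<Slot>) -> Vec<(Slot, Slot)> {
    let mut windows = Vec::new();
    let mut batch_end = to_slot
        .unwrap_or(from_slot + PURGE_BATCH_SIZE)
        .min(from_slot + PURGE_BATCH_SIZE);
    while from_slot < batch_end {
        windows.push((from_slot, batch_end));
        from_slot = batch_end;
        batch_end = to_slot
            .unwrap_or(batch_end + PURGE_BATCH_SIZE)
            .min(batch_end + PURGE_BATCH_SIZE);
    }
    windows
}

fn main() {
    // Purging slots 0..=4999 proceeds in five windows of at most 1000 slots,
    // matching the test_purge_huge case below.
    for (from, end) in batch_windows(0, Some(4999)) {
        println!("purge batch [{}, {}]", from, end);
    }
}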
@@ -3494,6 +3584,26 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_huge() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (blobs, _) = make_many_slot_entries(0, 5000, 10);
blocktree.write_blobs(blobs).unwrap();
blocktree.purge_slots(0, Some(4999));
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| {
assert_eq!(slot, 5000);
});
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[should_panic]
#[test]
fn test_prune_out_of_bounds() {


@@ -5,6 +5,7 @@ use bincode::{deserialize, serialize};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_sdk::timing::Slot;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::marker::PhantomData;
@@ -86,6 +87,8 @@ where
fn key(index: Self::Index) -> B::OwnedKey;
fn index(key: &B::Key) -> Self::Index;
fn slot(index: Self::Index) -> Slot;
fn as_index(slot: Slot) -> Self::Index;
}
pub trait DbCursor<B>
@@ -409,22 +412,29 @@ where
Ok(iter.map(|(key, value)| (C::index(&key), value)))
}
pub fn force_delete(&self, from: Option<C::Index>, to: Option<C::Index>) -> Result<()>
pub fn delete_slot(
&self,
batch: &mut WriteBatch<B>,
from: Option<Slot>,
to: Option<Slot>,
) -> Result<bool>
where
C::Index: PartialOrd,
C::Index: PartialOrd + Copy,
{
let iter = self.iter(from)?;
let mut end = true;
let iter = self.iter(from.map(C::as_index))?;
for (index, _) in iter {
if let Some(ref to) = to {
if &index > to {
if let Some(to) = to {
if C::slot(index) > to {
end = false;
break;
}
}
if let Err(e) = self.delete(index) {
error!("Error: {:?} while deleting {:?}", e, C::NAME)
};
if let Err(e) = batch.delete::<C>(index) {
error!("Error: {:?} while adding delete to batch {:?}", e, C::NAME)
}
}
Ok(())
Ok(end)
}
#[inline]
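
To illustrate the contract of `delete_slot`, here is a self-contained analogue over a plain BTreeMap (assumed semantics only; the real column and WriteBatch types are internal to the backend): it stages a delete for every key whose slot falls within bounds, and returns false as soon as it sees a key past `to`, signalling the caller that more batches remain.

use std::collections::BTreeMap;

type Slot = u64;

// Stage deletions for keys in [from, to] into `batch`; report whether
// the iterator was exhausted (true) or stopped at a key past `to` (false).
fn delete_slot_range(
    store: &BTreeMap<(Slot, u64), Vec<u8>>,
    batch: &mut Vec<(Slot, u64)>,
    from: Option<Slot>,
    to: Option<Slot>,
) -> bool {
    let start = (from.unwrap_or(0), 0); // as_index(slot) == (slot, 0)
    for (&key, _) in store.range(start..) {
        if let Some(to) = to {
            if key.0 > to {
                return false; // more slots remain beyond `to`
            }
        }
        batch.push(key); // staged, not applied until the batch is written
    }
    true
}

fn main() {
    let mut store = BTreeMap::new();
    for slot in 0..5u64 {
        store.insert((slot, 0), vec![0u8]);
    }
    let mut batch = Vec::new();
    let end = delete_slot_range(&store, &mut batch, Some(1), Some(3));
    assert!(!end); // slot 4 still remains past the bound
    for key in batch {
        store.remove(&key);
    }
    assert_eq!(store.len(), 2); // slots 0 and 4 survive
}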


@@ -2,6 +2,7 @@ use crate::blocktree::db::columns as cf;
use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn};
use crate::blocktree::BlocktreeError;
use crate::result::{Error, Result};
use solana_sdk::timing::Slot;
use byteorder::{BigEndian, ByteOrder};
@@ -15,7 +16,8 @@ use std::path::Path;
// A good value for this is the number of cores on the machine
const TOTAL_THREADS: i32 = 8;
const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
const MIN_WRITE_BUFFER_SIZE: u64 = 64 * 1024; // 64KB
#[derive(Debug)]
pub struct Rocks(rocksdb::DB);
@@ -40,16 +42,22 @@ impl Backend for Rocks {
let db_options = get_db_options();
// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
let meta_cf_descriptor =
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
let data_cf_descriptor =
ColumnFamilyDescriptor::new(Data::NAME, get_cf_options(Data::NAME));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
let erasure_cf_descriptor =
ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options(Coding::NAME));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(Orphans::NAME));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(Root::NAME));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(Index::NAME));
let cfs = vec![
meta_cf_descriptor,
@@ -152,6 +160,14 @@ impl Column<Rocks> for cf::Coding {
fn index(key: &[u8]) -> (u64, u64) {
cf::Data::index(key)
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl Column<Rocks> for cf::Data {
@@ -170,6 +186,14 @@ impl Column<Rocks> for cf::Data {
let index = BigEndian::read_u64(&key[8..16]);
(slot, index)
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl Column<Rocks> for cf::Index {
@@ -185,6 +209,14 @@ impl Column<Rocks> for cf::Index {
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Index {
@@ -204,6 +236,14 @@ impl Column<Rocks> for cf::DeadSlots {
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::DeadSlots {
@@ -223,6 +263,14 @@ impl Column<Rocks> for cf::Orphans {
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Orphans {
@@ -242,6 +290,14 @@ impl Column<Rocks> for cf::Root {
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::Root {
@@ -261,6 +317,14 @@ impl Column<Rocks> for cf::SlotMeta {
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn<Rocks> for cf::SlotMeta {
@@ -284,6 +348,14 @@ impl Column<Rocks> for cf::ErasureMeta {
BigEndian::write_u64(&mut key[8..], set_index);
key
}
fn slot(index: Self::Index) -> Slot {
index.0
}
fn as_index(slot: Slot) -> Self::Index {
(slot, 0)
}
}
impl TypedColumn<Rocks> for cf::ErasureMeta {
@@ -334,11 +406,27 @@ impl std::convert::From<rocksdb::Error> for Error {
}
}
fn get_cf_options() -> Options {
fn get_cf_options(name: &'static str) -> Options {
use crate::blocktree::db::columns::{Coding, Data};
let mut options = Options::default();
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
match name {
Coding::NAME | Data::NAME => {
// 256MB * 8 = 2GB. 2 of these columns should take no more than 4GB of RAM
options.set_max_write_buffer_number(8);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
options.set_target_file_size_base(MAX_WRITE_BUFFER_SIZE / 10);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE);
}
_ => {
// We want smaller CFs to flush faster. This results in more WAL files but lowers
// overall WAL space utilization and increases flush frequency
options.set_write_buffer_size(MIN_WRITE_BUFFER_SIZE as usize);
options.set_target_file_size_base(MIN_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MIN_WRITE_BUFFER_SIZE);
options.set_level_zero_file_num_compaction_trigger(1);
}
}
options
}
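
As a cross-check of the sizing comment above (my arithmetic, not part of the patch), the worst case these options allow for the two large column families:

const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // mirrors the constant above

fn main() {
    // Each large CF may accumulate up to 8 write buffers before stalling.
    let per_column = 8 * MAX_WRITE_BUFFER_SIZE;
    // Data and Coding are the two CFs configured this way.
    let large_columns = 2 * per_column;
    assert_eq!(per_column, 2 * 1024 * 1024 * 1024); // 2GB per column
    assert_eq!(large_columns, 4 * 1024 * 1024 * 1024); // 4GB total
}

The small CFs use 64KB buffers, so their memtable footprint is negligible by comparison; per the comment above, that trades more WAL files for lower overall WAL space and faster flushes.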
@@ -349,8 +437,5 @@ fn get_db_options() -> Options {
options.increase_parallelism(TOTAL_THREADS);
options.set_max_background_flushes(4);
options.set_max_background_compactions(4);
options.set_max_write_buffer_number(32);
options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
options
}


@@ -13,7 +13,7 @@ use std::thread;
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = DEFAULT_SLOTS_PER_EPOCH * 5;
pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = 3 * DEFAULT_SLOTS_PER_EPOCH;
pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>,
@@ -26,6 +26,10 @@ impl LedgerCleanupService {
max_ledger_slots: u64,
exit: &Arc<AtomicBool>,
) -> Self {
info!(
"LedgerCleanupService active. Max Ledger Slots {}",
max_ledger_slots
);
let exit = exit.clone();
let t_cleanup = Builder::new()
.name("solana-ledger-cleanup".to_string())


@@ -259,6 +259,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --no-sigverify ]]; then
args+=("$1")
shift
elif [[ $1 = --limit-ledger-size ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-port ]]; then
args+=("$1" "$2")
shift 2