diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index 4c11aa99ee..b6c360a03d 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -195,61 +195,151 @@ impl Blocktree {
     }
 
     /// Silently deletes all blocktree column families starting at the given slot until the `to` slot
-    /// Use with care; does not check for integrity and does not update slot metas that
-    /// refer to deleted slots
-    pub fn purge_slots(&self, from_slot: Slot, to_slot: Option<Slot>) {
-        let to_index = to_slot.map(|slot| (slot + 1, 0));
-        if let Err(e) = self.meta_cf.force_delete(Some(from_slot), to_slot) {
-            error!(
-                "Error: {:?} while deleting meta_cf for slot {:?}",
-                e, from_slot
-            )
+    /// Dangerous; use with care:
+    /// Does not check for integrity and does not update slot metas that refer to deleted slots
+    /// Modifies multiple column families simultaneously
+    pub fn purge_slots(&self, mut from_slot: Slot, to_slot: Option<Slot>) {
+        // Split the purge request into batches of 1000 slots
+        const PURGE_BATCH_SIZE: u64 = 1000;
+        let mut batch_end = to_slot
+            .unwrap_or(from_slot + PURGE_BATCH_SIZE)
+            .min(from_slot + PURGE_BATCH_SIZE);
+        while from_slot < batch_end {
+            if let Ok(end) = self.run_purge_batch(from_slot, batch_end) {
+                // No more slots to iterate over, or the upper bound was reached
+                if end {
+                    break;
+                } else {
+                    // Update the bounds for the next batch
+                    from_slot = batch_end;
+                    batch_end = to_slot
+                        .unwrap_or(batch_end + PURGE_BATCH_SIZE)
+                        .min(batch_end + PURGE_BATCH_SIZE);
+                }
+            }
         }
-        if let Err(e) = self.data_cf.force_delete(Some((from_slot, 0)), to_index) {
-            error!(
-                "Error: {:?} while deleting data_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        if let Err(e) = self
-            .erasure_meta_cf
-            .force_delete(Some((from_slot, 0)), to_index)
-        {
-            error!(
-                "Error: {:?} while deleting erasure_meta_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        if let Err(e) = self.erasure_cf.force_delete(Some((from_slot, 0)), to_index) {
-            error!(
-                "Error: {:?} while deleting erasure_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        if let Err(e) = self.orphans_cf.force_delete(Some(from_slot), to_slot) {
-            error!(
-                "Error: {:?} while deleting orphans_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        if let Err(e) = self.index_cf.force_delete(Some(from_slot), to_slot) {
-            error!(
-                "Error: {:?} while deleting index_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        if let Err(e) = self.dead_slots_cf.force_delete(Some(from_slot), to_slot) {
-            error!(
-                "Error: {:?} while deleting dead_slots_cf for slot {:?}",
-                e, from_slot
-            )
-        }
-        let roots_cf = self.db.column::<cf::Root>();
-        if let Err(e) = roots_cf.force_delete(Some(from_slot), to_slot) {
-            error!(
-                "Error: {:?} while deleting roots_cf for slot {:?}",
-                e, from_slot
-            )
+    }
+
+    // Returns whether or not all column iterators have reached their end
+    fn run_purge_batch(&self, from_slot: Slot, batch_end: Slot) -> Result<bool> {
+        let mut end = true;
+        let from_slot = Some(from_slot);
+        let batch_end = Some(batch_end);
+        unsafe {
+            let mut batch_processor = self.db.batch_processor();
+            let mut write_batch = batch_processor
+                .batch()
+                .expect("Database Error: Failed to get write batch");
+            end &= match self
+                .meta_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting meta_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .data_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting data_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .erasure_meta_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting erasure_meta_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .erasure_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting erasure_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .orphans_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting orphans_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .index_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting index_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            end &= match self
+                .dead_slots_cf
+                .delete_slot(&mut write_batch, from_slot, batch_end)
+            {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting dead_slots_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            let roots_cf = self.db.column::<cf::Root>();
+            end &= match roots_cf.delete_slot(&mut write_batch, from_slot, batch_end) {
+                Ok(finished) => finished,
+                Err(e) => {
+                    error!(
+                        "Error: {:?} while deleting roots_cf for slot {:?}",
+                        e, from_slot
+                    );
+                    false
+                }
+            };
+            if let Err(e) = batch_processor.write(write_batch) {
+                error!(
+                    "Error: {:?} while submitting write batch for slot {:?}",
+                    e, from_slot
+                );
+                Err(e)?;
+            }
+            Ok(end)
         }
     }
 
@@ -3494,6 +3584,26 @@ pub mod tests {
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_purge_huge() {
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        let (blobs, _) = make_many_slot_entries(0, 5000, 10);
+        blocktree.write_blobs(blobs).unwrap();
+
+        blocktree.purge_slots(0, Some(4999));
+
+        blocktree
+            .slot_meta_iterator(0)
+            .unwrap()
+            .for_each(|(slot, _)| {
+                assert_eq!(slot, 5000);
+            });
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     #[should_panic]
     #[test]
     fn test_prune_out_of_bounds() {
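The window arithmetic in `purge_slots` is the subtle part of this change, so here is a stand-alone sketch of just that logic (a minimal illustration, not code from the patch; `batch_windows` is a hypothetical name). Batch ends are inclusive, so consecutive windows share a boundary slot; deleting that slot twice is harmless because deletes are idempotent.

/// Mirror of the purge windowing in `purge_slots` above:
/// 1000-slot batches, clamped to an inclusive `to` bound.
fn batch_windows(mut from: u64, to: Option<u64>) -> Vec<(u64, u64)> {
    const PURGE_BATCH_SIZE: u64 = 1000;
    let mut windows = Vec::new();
    let mut batch_end = to
        .unwrap_or(from + PURGE_BATCH_SIZE)
        .min(from + PURGE_BATCH_SIZE);
    while from < batch_end {
        windows.push((from, batch_end));
        // In the real loop this advance only happens when
        // run_purge_batch() reports there is more left to purge.
        from = batch_end;
        batch_end = to
            .unwrap_or(batch_end + PURGE_BATCH_SIZE)
            .min(batch_end + PURGE_BATCH_SIZE);
    }
    windows
}

fn main() {
    // Purging 0..=4999, as in test_purge_huge above, takes five batches:
    assert_eq!(
        batch_windows(0, Some(4999)),
        vec![(0, 1000), (1000, 2000), (2000, 3000), (3000, 4000), (4000, 4999)]
    );
}

With `to_slot = None` this sketch would slide forever; the real loop stops instead when `run_purge_batch` returns `Ok(true)`, i.e. once every column iterator has been exhausted.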
diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index a8bdf2b042..dbfffce4b1 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -5,6 +5,7 @@ use bincode::{deserialize, serialize};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
+use solana_sdk::timing::Slot;
 
 use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::marker::PhantomData;
@@ -86,6 +87,8 @@ where
     fn key(index: Self::Index) -> B::OwnedKey;
     fn index(key: &B::Key) -> Self::Index;
+    fn slot(index: Self::Index) -> Slot;
+    fn as_index(slot: Slot) -> Self::Index;
 }
 
 pub trait DbCursor<B>
 where
@@ -409,22 +412,29 @@ where
         Ok(iter.map(|(key, value)| (C::index(&key), value)))
     }
 
-    pub fn force_delete(&self, from: Option<C::Index>, to: Option<C::Index>) -> Result<()>
+    pub fn delete_slot(
+        &self,
+        batch: &mut WriteBatch<B>,
+        from: Option<Slot>,
+        to: Option<Slot>,
+    ) -> Result<bool>
     where
-        C::Index: PartialOrd,
+        C::Index: PartialOrd + Copy,
     {
-        let iter = self.iter(from)?;
+        let mut end = true;
+        let iter = self.iter(from.map(C::as_index))?;
         for (index, _) in iter {
-            if let Some(ref to) = to {
-                if &index > to {
+            if let Some(to) = to {
+                if C::slot(index) > to {
+                    end = false;
                     break;
                 }
-            }
-            if let Err(e) = self.delete(index) {
-                error!("Error: {:?} while deleting {:?}", e, C::NAME)
+            };
+            if let Err(e) = batch.delete::<C>(index) {
+                error!("Error: {:?} while adding delete to batch {:?}", e, C::NAME)
            }
         }
-        Ok(())
+        Ok(end)
     }
 
     #[inline]
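The two new trait methods are the entire contract `delete_slot` relies on: `as_index` builds the smallest key inside a slot to seed the iterator, and `slot` projects each visited index back onto its slot so iteration can stop past the bound. A trait-free toy version for a `(slot, blob_index)`-keyed column such as `cf::Data` (illustrative; these free functions are not part of the patch):

type Slot = u64;
type Index = (u64, u64); // (slot, blob_index), like cf::Data's Index

// Mirrors Column::slot(): which slot does this key belong to?
fn slot(index: Index) -> Slot {
    index.0
}

// Mirrors Column::as_index(): the first key of a slot, where iteration starts.
fn as_index(slot: Slot) -> Index {
    (slot, 0)
}

fn main() {
    // Seeding at as_index(7) lands on slot 7's first entry,
    assert_eq!(slot(as_index(7)), 7);
    // and delete_slot stops (end = false) once slot(index) passes the bound:
    let batch_end = 7;
    assert!(slot((8, 0)) > batch_end);
}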
diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs
index 4863f9e850..31318a26e4 100644
--- a/core/src/blocktree/rocks.rs
+++ b/core/src/blocktree/rocks.rs
@@ -2,6 +2,7 @@ use crate::blocktree::db::columns as cf;
 use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn};
 use crate::blocktree::BlocktreeError;
 use crate::result::{Error, Result};
+use solana_sdk::timing::Slot;
 
 use byteorder::{BigEndian, ByteOrder};
 
@@ -15,7 +16,8 @@ use std::path::Path;
 
 // A good value for this is the number of cores on the machine
 const TOTAL_THREADS: i32 = 8;
-const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024;
+const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
+const MIN_WRITE_BUFFER_SIZE: u64 = 64 * 1024; // 64KB
 
 #[derive(Debug)]
 pub struct Rocks(rocksdb::DB);
@@ -40,16 +42,22 @@ impl Backend for Rocks {
         let db_options = get_db_options();
 
         // Column family names
-        let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
-        let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
+        let meta_cf_descriptor =
+            ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(SlotMeta::NAME));
+        let data_cf_descriptor =
+            ColumnFamilyDescriptor::new(Data::NAME, get_cf_options(Data::NAME));
         let dead_slots_cf_descriptor =
-            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
-        let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
+            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(DeadSlots::NAME));
+        let erasure_cf_descriptor =
+            ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options(Coding::NAME));
         let erasure_meta_cf_descriptor =
-            ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
-        let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
-        let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
-        let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
+            ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(ErasureMeta::NAME));
+        let orphans_cf_descriptor =
+            ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(Orphans::NAME));
+        let root_cf_descriptor =
+            ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(Root::NAME));
+        let index_cf_descriptor =
+            ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(Index::NAME));
 
         let cfs = vec![
             meta_cf_descriptor,
@@ -152,6 +160,14 @@ impl Column<Rocks> for cf::Coding {
     fn index(key: &[u8]) -> (u64, u64) {
         cf::Data::index(key)
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
 }
 
 impl Column<Rocks> for cf::Data {
@@ -170,6 +186,14 @@ impl Column<Rocks> for cf::Data {
         let index = BigEndian::read_u64(&key[8..16]);
         (slot, index)
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
 }
 
 impl Column<Rocks> for cf::Index {
@@ -185,6 +209,14 @@ impl Column<Rocks> for cf::Index {
     fn index(key: &[u8]) -> u64 {
         BigEndian::read_u64(&key[..8])
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
 }
 
 impl TypedColumn<Rocks> for cf::Index {
@@ -204,6 +236,14 @@ impl Column<Rocks> for cf::DeadSlots {
     fn index(key: &[u8]) -> u64 {
         BigEndian::read_u64(&key[..8])
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
 }
 
 impl TypedColumn<Rocks> for cf::DeadSlots {
@@ -223,6 +263,14 @@ impl Column<Rocks> for cf::Orphans {
     fn index(key: &[u8]) -> u64 {
         BigEndian::read_u64(&key[..8])
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
 }
 
 impl TypedColumn<Rocks> for cf::Orphans {
@@ -242,6 +290,14 @@ impl Column<Rocks> for cf::Root {
     fn index(key: &[u8]) -> u64 {
         BigEndian::read_u64(&key[..8])
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
 }
 
 impl TypedColumn<Rocks> for cf::Root {
@@ -261,6 +317,14 @@ impl Column<Rocks> for cf::SlotMeta {
     fn index(key: &[u8]) -> u64 {
         BigEndian::read_u64(&key[..8])
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        slot
+    }
 }
 
 impl TypedColumn<Rocks> for cf::SlotMeta {
@@ -284,6 +348,14 @@ impl Column<Rocks> for cf::ErasureMeta {
         BigEndian::write_u64(&mut key[8..], set_index);
         key
     }
+
+    fn slot(index: Self::Index) -> Slot {
+        index.0
+    }
+
+    fn as_index(slot: Slot) -> Self::Index {
+        (slot, 0)
+    }
 }
 
 impl TypedColumn<Rocks> for cf::ErasureMeta {
@@ -334,11 +406,27 @@ impl std::convert::From<rocksdb::Error> for Error {
     }
 }
 
-fn get_cf_options() -> Options {
+fn get_cf_options(name: &'static str) -> Options {
+    use crate::blocktree::db::columns::{Coding, Data};
+
     let mut options = Options::default();
-    options.set_max_write_buffer_number(32);
-    options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
-    options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
+    match name {
+        Coding::NAME | Data::NAME => {
+            // 256MB * 8 = 2GB. Two of these columns should take no more than 4GB of RAM
+            options.set_max_write_buffer_number(8);
+            options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
+            options.set_target_file_size_base(MAX_WRITE_BUFFER_SIZE / 10);
+            options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE);
+        }
+        _ => {
+            // We want smaller CFs to flush faster. This results in more WAL files but lowers
+            // overall WAL space utilization and increases flush frequency
+            options.set_write_buffer_size(MIN_WRITE_BUFFER_SIZE as usize);
+            options.set_target_file_size_base(MIN_WRITE_BUFFER_SIZE);
+            options.set_max_bytes_for_level_base(MIN_WRITE_BUFFER_SIZE);
+            options.set_level_zero_file_num_compaction_trigger(1);
+        }
+    }
     options
 }
 
@@ -349,8 +437,5 @@ fn get_db_options() -> Options {
     options.increase_parallelism(TOTAL_THREADS);
     options.set_max_background_flushes(4);
     options.set_max_background_compactions(4);
-    options.set_max_write_buffer_number(32);
-    options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE);
-    options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64);
    options
 }
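The sizing in `get_cf_options` is easier to judge with the arithmetic spelled out. The constants below are copied from the patch; the totals are my own back-of-the-envelope check, not code from this change:

fn main() {
    const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
    const MAX_WRITE_BUFFER_NUMBER: u64 = 8; // set_max_write_buffer_number(8)

    // New settings: each big CF (Data, Coding) is bounded by 8 memtables.
    let per_big_cf = MAX_WRITE_BUFFER_SIZE * MAX_WRITE_BUFFER_NUMBER;
    assert_eq!(per_big_cf, 2 * 1024 * 1024 * 1024); // 2GB per CF
    assert_eq!(2 * per_big_cf, 4 * 1024 * 1024 * 1024); // Data + Coding: 4GB

    // Old settings applied 512MB x 32 buffers to every column family.
    let old_per_cf: u64 = 512 * 1024 * 1024 * 32;
    assert_eq!(old_per_cf, 16 * 1024 * 1024 * 1024); // 16GB per CF
}

The small CFs go the other way (64KB buffers, a level-zero compaction trigger of 1) so they flush quickly and keep WAL files short-lived, as the comment in the patch notes.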
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index c464602419..3cc36c1be5 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -13,7 +13,7 @@ use std::thread;
 use std::thread::{Builder, JoinHandle};
 use std::time::Duration;
 
-pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = DEFAULT_SLOTS_PER_EPOCH * 5;
+pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = 3 * DEFAULT_SLOTS_PER_EPOCH;
 
 pub struct LedgerCleanupService {
     t_cleanup: JoinHandle<()>,
@@ -26,6 +26,10 @@ impl LedgerCleanupService {
         max_ledger_slots: u64,
         exit: &Arc<AtomicBool>,
     ) -> Self {
+        info!(
+            "LedgerCleanupService active. Max ledger slots: {}",
+            max_ledger_slots
+        );
         let exit = exit.clone();
         let t_cleanup = Builder::new()
             .name("solana-ledger-cleanup".to_string())
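For context, the service's job is to keep a sliding window of at most `max_ledger_slots` behind the ledger tip and purge everything older. A heavily simplified sketch of that loop shape, using only std (this is not the service's actual body; `latest_slot` and `purge` are stand-ins for the real channel and `Blocktree::purge_slots` plumbing):

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;

// Illustrative only: purge everything more than `max_ledger_slots` behind the tip.
fn spawn_cleanup_loop(
    latest_slot: Arc<AtomicU64>,
    max_ledger_slots: u64,
    exit: Arc<AtomicBool>,
    purge: impl Fn(u64, u64) + Send + 'static, // e.g. |from, to| blocktree.purge_slots(from, Some(to))
) -> JoinHandle<()> {
    Builder::new()
        .name("solana-ledger-cleanup".to_string())
        .spawn(move || {
            let mut next_from = 0;
            while !exit.load(Ordering::Relaxed) {
                let tip = latest_slot.load(Ordering::Relaxed);
                if tip > max_ledger_slots {
                    let floor = tip - max_ledger_slots;
                    if floor > next_from {
                        purge(next_from, floor);
                        next_from = floor + 1;
                    }
                }
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap()
}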
diff --git a/multinode-demo/fullnode.sh b/multinode-demo/fullnode.sh
index 206e8d6757..ce45fe643e 100755
--- a/multinode-demo/fullnode.sh
+++ b/multinode-demo/fullnode.sh
@@ -259,6 +259,9 @@ while [[ -n $1 ]]; do
   elif [[ $1 = --no-sigverify ]]; then
     args+=("$1")
     shift
+  elif [[ $1 = --limit-ledger-size ]]; then
+    args+=("$1")
+    shift
   elif [[ $1 = --rpc-port ]]; then
     args+=("$1" "$2")
     shift 2