Drop older slots in the ledger (#5188)

* Add facility to delete blocktree columns in range

* Add ledger cleanup service

* Add local_cluster test
Author: Sagar Dhawan
Date: 2019-07-20 13:13:55 -07:00 (committed by GitHub)
Parent: 9d2940d487
Commit: a07b17b9b5
10 changed files with 271 additions and 67 deletions


@ -33,6 +33,7 @@ use std::sync::{Arc, RwLock};
pub use self::meta::*;
pub use self::rooted_slot_iterator::*;
use solana_sdk::timing::Slot;
mod db;
mod meta;
@ -193,67 +194,62 @@ impl Blocktree {
false
}
-// silently deletes all blocktree column families starting at the given slot
-fn delete_all_columns(&self, starting_slot: u64) {
-match self.meta_cf.force_delete_all(Some(starting_slot)) {
-Ok(_) => (),
-Err(e) => error!(
+/// Silently deletes all blocktree column families starting at the given slot until the `to` slot
+/// Use with care; does not check for integrity and does not update slot metas that
+/// refer to deleted slots
+pub fn purge_slots(&self, from_slot: Slot, to_slot: Option<Slot>) {
+let to_index = to_slot.map(|slot| (slot + 1, 0));
+if let Err(e) = self.meta_cf.force_delete(Some(from_slot), to_slot) {
+error!(
"Error: {:?} while deleting meta_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self.data_cf.force_delete_all(Some((starting_slot, 0))) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = self.data_cf.force_delete(Some((from_slot, 0)), to_index) {
+error!(
"Error: {:?} while deleting data_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self
+if let Err(e) = self
.erasure_meta_cf
-.force_delete_all(Some((starting_slot, 0)))
+.force_delete(Some((from_slot, 0)), to_index)
{
-Ok(_) => (),
-Err(e) => error!(
+error!(
"Error: {:?} while deleting erasure_meta_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self.erasure_cf.force_delete_all(Some((starting_slot, 0))) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = self.erasure_cf.force_delete(Some((from_slot, 0)), to_index) {
+error!(
"Error: {:?} while deleting erasure_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self.orphans_cf.force_delete_all(Some(starting_slot)) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = self.orphans_cf.force_delete(Some(from_slot), to_slot) {
+error!(
"Error: {:?} while deleting orphans_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self.index_cf.force_delete_all(Some(starting_slot)) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = self.index_cf.force_delete(Some(from_slot), to_slot) {
+error!(
"Error: {:?} while deleting index_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
-match self.dead_slots_cf.force_delete_all(Some(starting_slot)) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = self.dead_slots_cf.force_delete(Some(from_slot), to_slot) {
+error!(
"Error: {:?} while deleting dead_slots_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
let roots_cf = self.db.column::<cf::Root>();
-match roots_cf.force_delete_all(Some(starting_slot)) {
-Ok(_) => (),
-Err(e) => error!(
+if let Err(e) = roots_cf.force_delete(Some(from_slot), to_slot) {
+error!(
"Error: {:?} while deleting roots_cf for slot {:?}",
-e, starting_slot
-),
+e, from_slot
+)
}
}
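For orientation, here is a minimal caller-side sketch (not part of the commit) of the new API; the inclusive upper bound matches what test_purge_slots below exercises:

use crate::blocktree::Blocktree;

// Hypothetical helper, for illustration only.
fn prune_example(blocktree: &Blocktree) {
    // Drop slots 0 through 5 inclusive...
    blocktree.purge_slots(0, Some(5));
    // ...or everything from slot 6 onward.
    blocktree.purge_slots(6, None);
}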
@ -1007,7 +1003,7 @@ impl Blocktree {
)
.expect("unable to update meta for target slot");
-self.delete_all_columns(target_slot + 1);
+self.purge_slots(target_slot + 1, None);
// fixup anything that refers to non-root slots and delete the rest
for (slot, mut meta) in self
@ -3472,6 +3468,32 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_purge_slots() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (blobs, _) = make_many_slot_entries(0, 50, 5);
blocktree.write_blobs(blobs).unwrap();
blocktree.purge_slots(0, Some(5));
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| {
assert!(slot > 5);
});
blocktree.purge_slots(0, None);
blocktree.slot_meta_iterator(0).unwrap().for_each(|(_, _)| {
assert!(false);
});
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[should_panic]
#[test]
fn test_prune_out_of_bounds() {


@ -409,13 +409,21 @@ where
Ok(iter.map(|(key, value)| (C::index(&key), value)))
}
-//TODO add a delete_until that goes the other way
-pub fn force_delete_all(&self, start_from: Option<C::Index>) -> Result<()> {
-let iter = self.iter(start_from)?;
-iter.for_each(|(index, _)| match self.delete(index) {
-Ok(_) => (),
-Err(e) => error!("Error: {:?} while deleting {:?}", e, C::NAME),
-});
+pub fn force_delete(&self, from: Option<C::Index>, to: Option<C::Index>) -> Result<()>
+where
+C::Index: PartialOrd,
+{
+let iter = self.iter(from)?;
+for (index, _) in iter {
+if let Some(ref to) = to {
+if &index > to {
+break;
+}
+}
+if let Err(e) = self.delete(index) {
+error!("Error: {:?} while deleting {:?}", e, C::NAME)
+}
+}
Ok(())
}
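One detail worth noting: force_delete stops only when an index compares strictly greater than `to`, and the data columns are keyed by (slot, blob_index) tuples, which Rust orders lexicographically. A standalone sketch (plain u64 pairs, purely illustrative) of why purge_slots passes (to_slot + 1, 0) as the data-column bound:

fn main() {
    // Tuples compare lexicographically: first by slot, then by blob index.
    assert!((4u64, 10u64) < (5u64, 0u64));
    // With to_slot = 5, purge_slots uses (6, 0) as the data-column bound,
    // so every blob index belonging to slots 0..=5 sorts at or below it.
    let to = (6u64, 0u64);
    assert!((5u64, u64::max_value()) < to);
    // force_delete breaks only when index > to, so the bound itself is inclusive.
    assert!(!((6u64, 0u64) > to));
}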


@ -0,0 +1,100 @@
//! The `ledger_cleanup_service` drops older ledger data to limit disk space usage
use crate::blocktree::Blocktree;
use crate::result::{Error, Result};
use crate::service::Service;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
use std::string::ToString;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
pub const DEFAULT_MAX_LEDGER_SLOTS: u64 = DEFAULT_SLOTS_PER_EPOCH * 5;
pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>,
}
impl LedgerCleanupService {
pub fn new(
slot_full_receiver: Receiver<(u64, Pubkey)>,
blocktree: Arc<Blocktree>,
max_ledger_slots: u64,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let t_cleanup = Builder::new()
.name("solana-ledger-cleanup".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) =
Self::cleanup_ledger(&slot_full_receiver, &blocktree, max_ledger_slots)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => info!("Error from cleanup_ledger: {:?}", e),
}
}
})
.unwrap();
Self { t_cleanup }
}
fn cleanup_ledger(
slot_full_receiver: &Receiver<(u64, Pubkey)>,
blocktree: &Arc<Blocktree>,
max_ledger_slots: u64,
) -> Result<()> {
let (slot, _) = slot_full_receiver.recv_timeout(Duration::from_secs(1))?;
if slot > max_ledger_slots {
//cleanup
blocktree.purge_slots(0, Some(slot - max_ledger_slots));
}
Ok(())
}
}
impl Service for LedgerCleanupService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_cleanup.join()
}
}
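A small worked example (numbers are made up) of the retention arithmetic in cleanup_ledger: once a full slot beyond the window is reported, the service purges everything up to `slot - max_ledger_slots`, which is exactly what the "kill slots 0-40" test below relies on.

fn main() {
    // Illustrative values only: keep roughly the last 10 slots.
    let max_ledger_slots = 10u64;
    let slot = 50u64; // slot just reported as full over the channel
    if slot > max_ledger_slots {
        // The real service then calls blocktree.purge_slots(0, Some(purge_to)).
        let purge_to = slot - max_ledger_slots;
        assert_eq!(purge_to, 40);
    }
}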
#[cfg(test)]
mod tests {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_many_slot_entries;
use std::sync::mpsc::channel;
#[test]
fn test_cleanup() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (blobs, _) = make_many_slot_entries(0, 50, 5);
blocktree.write_blobs(blobs).unwrap();
let blocktree = Arc::new(blocktree);
let (sender, receiver) = channel();
//send a signal to kill slots 0-40
sender.send((50, Pubkey::default())).unwrap();
LedgerCleanupService::cleanup_ledger(&receiver, &blocktree, 10).unwrap();
//check that 0-40 don't exist
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| assert!(slot > 40));
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}
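The spawn loop above follows a common service shape: poll the slot channel with a one-second timeout so the exit flag is re-checked regularly, and treat a disconnected sender as a shutdown signal. A self-contained sketch of that pattern with plain std types (names are illustrative, not the commit's code):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::Arc;
use std::thread::Builder;
use std::time::Duration;

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let (_sender, receiver) = channel::<u64>();
    let exit_clone = exit.clone();
    let handle = Builder::new()
        .name("example-cleanup".to_string())
        .spawn(move || loop {
            // Re-check the exit flag at least once per second.
            if exit_clone.load(Ordering::Relaxed) {
                break;
            }
            match receiver.recv_timeout(Duration::from_secs(1)) {
                Ok(_) | Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => break,
            }
        })
        .unwrap();
    exit.store(true, Ordering::Relaxed);
    handle.join().unwrap();
}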


@ -42,6 +42,7 @@ pub mod gossip_service;
pub mod leader_schedule;
pub mod leader_schedule_cache;
pub mod leader_schedule_utils;
pub mod ledger_cleanup_service;
pub mod local_cluster;
pub mod local_vote_signer_service;
pub mod packet;


@ -88,12 +88,12 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
-) -> (Self, Receiver<(u64, Pubkey)>, Receiver<Vec<Arc<Bank>>>)
+slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
+) -> (Self, Receiver<Vec<Arc<Bank>>>)
where
T: 'static + KeypairUtil + Send + Sync,
{
let (root_bank_sender, root_bank_receiver) = channel();
-let (slot_full_sender, slot_full_receiver) = channel();
trace!("replay stage");
let exit_ = exit.clone();
let subscriptions = subscriptions.clone();
@ -132,7 +132,7 @@ impl ReplayStage {
&bank_forks,
&my_pubkey,
&mut progress,
-&slot_full_sender,
+&slot_full_senders,
);
let votable = Self::generate_votable_banks(&bank_forks, &tower, &mut progress);
@ -191,7 +191,7 @@ impl ReplayStage {
Ok(())
})
.unwrap();
-(Self { t_replay }, slot_full_receiver, root_bank_receiver)
+(Self { t_replay }, root_bank_receiver)
}
fn maybe_start_leader(
@ -409,7 +409,7 @@ impl ReplayStage {
bank_forks: &Arc<RwLock<BankForks>>,
my_pubkey: &Pubkey,
progress: &mut HashMap<u64, ForkProgress>,
-slot_full_sender: &Sender<(u64, Pubkey)>,
+slot_full_senders: &[Sender<(u64, Pubkey)>],
) -> bool {
let mut did_complete_bank = false;
let active_banks = bank_forks.read().unwrap().active_banks();
@ -435,7 +435,7 @@ impl ReplayStage {
assert_eq!(*bank_slot, bank.slot());
if bank.tick_height() == bank.max_tick_height() {
did_complete_bank = true;
-Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
+Self::process_completed_bank(my_pubkey, bank, slot_full_senders);
} else {
trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}",
@ -618,13 +618,15 @@ impl ReplayStage {
fn process_completed_bank(
my_pubkey: &Pubkey,
bank: Arc<Bank>,
-slot_full_sender: &Sender<(u64, Pubkey)>,
+slot_full_senders: &[Sender<(u64, Pubkey)>],
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
-if let Err(e) = slot_full_sender.send((bank.slot(), *bank.collector_id())) {
-trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
-}
+slot_full_senders.iter().for_each(|sender| {
+if let Err(e) = sender.send((bank.slot(), *bank.collector_id())) {
+trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
+}
+});
}
fn generate_new_bank_forks(
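With this change, process_completed_bank fans the (slot, leader) notification out to every registered sender instead of a single channel. A minimal, self-contained sketch of that fan-out pattern with plain std channels (payload type simplified to a bare slot number):

use std::sync::mpsc::{channel, Sender};

fn main() {
    // Two independent consumers, e.g. the blockstream service and ledger cleanup.
    let (blockstream_tx, blockstream_rx) = channel();
    let (cleanup_tx, cleanup_rx) = channel();
    let senders: Vec<Sender<u64>> = vec![blockstream_tx, cleanup_tx];

    // A single completed slot is broadcast to every registered sender.
    let completed_slot = 42u64;
    senders.iter().for_each(|sender| {
        // A disconnected receiver is logged and ignored in the real code.
        let _ = sender.send(completed_slot);
    });

    assert_eq!(blockstream_rx.recv().unwrap(), 42);
    assert_eq!(cleanup_rx.recv().unwrap(), 42);
}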


@ -18,6 +18,7 @@ use crate::blockstream_service::BlockstreamService;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::cluster_info::ClusterInfo;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::ledger_cleanup_service::LedgerCleanupService;
use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
@ -37,6 +38,7 @@ pub struct Tvu {
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
blockstream_service: Option<BlockstreamService>,
ledger_cleanup_service: Option<LedgerCleanupService>,
storage_stage: StorageStage,
}
@ -64,6 +66,7 @@ impl Tvu {
blocktree: Arc<Blocktree>,
storage_state: &StorageState,
blockstream: Option<&String>,
max_ledger_slots: Option<u64>,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -110,7 +113,10 @@ impl Tvu {
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
);
-let (replay_stage, slot_full_receiver, root_bank_receiver) = ReplayStage::new(
+let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
+let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
+let (replay_stage, root_bank_receiver) = ReplayStage::new(
&keypair.pubkey(),
vote_account,
voting_keypair,
@ -122,11 +128,12 @@ impl Tvu {
subscriptions,
poh_recorder,
leader_schedule_cache,
+vec![blockstream_slot_sender, ledger_cleanup_slot_sender],
);
let blockstream_service = if blockstream.is_some() {
let blockstream_service = BlockstreamService::new(
-slot_full_receiver,
+blockstream_slot_receiver,
blocktree.clone(),
blockstream.unwrap().to_string(),
&exit,
@ -136,6 +143,15 @@ impl Tvu {
None
};
let ledger_cleanup_service = max_ledger_slots.map(|max_ledger_slots| {
LedgerCleanupService::new(
ledger_cleanup_slot_receiver,
blocktree.clone(),
max_ledger_slots,
&exit,
)
});
let storage_stage = StorageStage::new(
storage_state,
root_bank_receiver,
@ -152,6 +168,7 @@ impl Tvu {
retransmit_stage,
replay_stage,
blockstream_service,
ledger_cleanup_service,
storage_stage,
}
}
@ -167,6 +184,9 @@ impl Service for Tvu {
if self.blockstream_service.is_some() {
self.blockstream_service.unwrap().join()?;
}
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
}
self.replay_stage.join()?;
Ok(())
}
@ -226,6 +246,7 @@ pub mod tests {
blocktree,
&StorageState::default(),
None,
None,
l_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,


@ -40,6 +40,7 @@ pub struct ValidatorConfig {
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<String>,
pub max_ledger_slots: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
pub erasure_config: ErasureConfig,
}
@ -51,6 +52,7 @@ impl Default for ValidatorConfig {
voting_disabled: false,
blockstream: None,
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
max_ledger_slots: None,
account_paths: None,
rpc_config: JsonRpcConfig::default(),
snapshot_path: None,
@ -247,6 +249,7 @@ impl Validator {
blocktree.clone(),
&storage_state,
config.blockstream.as_ref(),
config.max_ledger_slots,
ledger_signal_receiver,
&subscriptions,
&poh_recorder,


@ -3,6 +3,7 @@ extern crate solana;
use hashbrown::HashSet;
use log::*;
use serial_test_derive::serial;
use solana::blocktree::Blocktree;
use solana::broadcast_stage::BroadcastStageType;
use solana::cluster::Cluster;
use solana::cluster_tests;
@ -16,6 +17,44 @@ use solana_sdk::timing;
use std::thread::sleep;
use std::time::Duration;
#[test]
#[serial]
fn test_ledger_cleanup_service() {
solana_logger::setup();
let num_nodes = 3;
let mut validator_config = ValidatorConfig::default();
validator_config.max_ledger_slots = Some(100);
let config = ClusterConfig {
cluster_lamports: 10_000,
poh_config: PohConfig::new_sleep(Duration::from_millis(50)),
node_stakes: vec![100; num_nodes],
validator_configs: vec![validator_config.clone(); num_nodes],
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&config);
// 200ms/per * 100 = 20 seconds, so sleep a little longer than that.
sleep(Duration::from_secs(60));
cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
HashSet::new(),
);
cluster.close_preserve_ledgers();
//check everyone's ledgers and make sure only ~100 slots are stored
for (_, info) in &cluster.fullnode_infos {
let mut slots = 0;
let blocktree = Blocktree::open(&info.info.ledger_path).unwrap();
blocktree
.slot_meta_iterator(0)
.unwrap()
.for_each(|_| slots += 1);
// with 3 nodes upto 3 slots can be in progress and not complete so max slots in blocktree should be upto 103
assert!(slots <= 103, "got {}", slots);
}
}
#[test]
#[serial]
fn test_spend_and_verify_all_nodes_1() {


@ -134,6 +134,7 @@ fn test_replay() {
blocktree,
&StorageState::default(),
None,
None,
ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,


@ -2,6 +2,7 @@ use clap::{crate_description, crate_name, crate_version, App, Arg};
use log::*;
use solana::cluster_info::{Node, FULLNODE_PORT_RANGE};
use solana::contact_info::ContactInfo;
use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
use solana::local_vote_signer_service::LocalVoteSignerService;
use solana::service::Service;
use solana::socketaddr;
@ -163,6 +164,13 @@ fn main() {
.takes_value(true)
.help("Snapshot path"),
)
.arg(
clap::Arg::with_name("limit_ledger_size")
.long("limit-ledger-size")
.takes_value(false)
.requires("snapshot_path")
.help("drop older slots in the ledger"),
)
.arg(
clap::Arg::with_name("skip_ledger_verify")
.long("skip-ledger-verify")
@ -230,13 +238,12 @@ fn main() {
if let Some(paths) = matches.value_of("accounts") {
validator_config.account_paths = Some(paths.to_string());
} else {
validator_config.account_paths = None;
}
if let Some(paths) = matches.value_of("snapshot_path") {
validator_config.snapshot_path = Some(paths.to_string());
} else {
validator_config.snapshot_path = None;
}
if matches.is_present("limit_ledger_size") {
validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
}
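Putting it together: passing --limit-ledger-size (which, per the arg definition above, also requires the snapshot path option) sets max_ledger_slots to DEFAULT_MAX_LEDGER_SLOTS, i.e. five epochs' worth of slots. A hedged sketch of the equivalent programmatic configuration; the ValidatorConfig module path is an assumption, since this diff does not show it:

use solana::ledger_cleanup_service::DEFAULT_MAX_LEDGER_SLOTS;
use solana::validator::ValidatorConfig; // module path assumed, not shown in this diff

fn main() {
    let mut validator_config = ValidatorConfig::default();
    // Same effect as passing --limit-ledger-size on the command line.
    validator_config.max_ledger_slots = Some(DEFAULT_MAX_LEDGER_SLOTS);
}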
let cluster_entrypoint = matches.value_of("entrypoint").map(|entrypoint| {
let entrypoint_addr = solana_netutil::parse_host_port(entrypoint)