From f1e5edee14f21877703d2f380c7d20fc70dec452 Mon Sep 17 00:00:00 2001
From: carllin
Date: Mon, 20 May 2019 19:04:18 -0700
Subject: [PATCH] Modify Roots Column To Support Multiple Roots (#4321)

* Fix 1) Roots column family to handle storing multiple slots, 2) Store all slots on the rooted path in the roots column family
---
 core/src/blocktree.rs           | 64 ++++++++++++++++++++++-----
 core/src/blocktree/db.rs        |  7 ++-
 core/src/blocktree/kvs.rs       | 14 +++---
 core/src/blocktree/rocks.rs     | 14 +++---
 core/src/blocktree_processor.rs | 76 +++++++++++++++++++++++++++++++--
 core/src/repair_service.rs      | 21 +++++----
 core/src/replay_stage.rs        |  8 +---
 7 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index 8c689cb2c..3b66daf5c 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -113,7 +113,7 @@ impl Blocktree {
         // Open the database
         let db = Database::open(&ledger_path)?;
 
-        let batch_processor = Arc::new(RwLock::new(db.batch_processor()));
+        let batch_processor = unsafe { Arc::new(RwLock::new(db.batch_processor())) };
 
         // Create the metadata column family
         let meta_cf = db.column();
@@ -804,24 +804,32 @@ impl Blocktree {
     }
 
     pub fn is_root(&self, slot: u64) -> bool {
-        if let Ok(Some(root_slot)) = self.db.get::<cf::Root>(()) {
-            root_slot == slot
+        if let Ok(Some(true)) = self.db.get::<cf::Root>(slot) {
+            true
         } else {
             false
         }
     }
 
-    pub fn set_root(&self, slot: u64) -> Result<()> {
-        self.db.put::<cf::Root>((), &slot)?;
+    pub fn set_root(&self, new_root: u64, prev_root: u64) -> Result<()> {
+        let mut current_slot = new_root;
+        unsafe {
+            let mut batch_processor = self.db.batch_processor();
+            let mut write_batch = batch_processor.batch()?;
+            if new_root == 0 {
+                write_batch.put::<cf::Root>(0, &true)?;
+            } else {
+                while current_slot != prev_root {
+                    write_batch.put::<cf::Root>(current_slot, &true)?;
+                    current_slot = self.meta(current_slot).unwrap().unwrap().parent_slot;
+                }
+            }
+
+            batch_processor.write(write_batch)?;
+        }
         Ok(())
     }
 
-    pub fn get_root(&self) -> Result<u64> {
-        let root_opt = self.db.get::<cf::Root>(())?;
-
-        Ok(root_opt.unwrap_or(0))
-    }
-
     pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
         let mut results = vec![];
@@ -3112,6 +3120,40 @@ pub mod tests {
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_set_root() {
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        blocktree.set_root(0, 0).unwrap();
+        let chained_slots = vec![0, 2, 4, 7, 12, 15];
+
+        // Make a chain of slots
+        let all_blobs = make_chaining_slot_entries(&chained_slots, 10);
+
+        // Insert the chain of slots into the ledger
+        for (slot_blobs, _) in all_blobs {
+            blocktree.insert_data_blobs(&slot_blobs[..]).unwrap();
+        }
+
+        blocktree.set_root(4, 0).unwrap();
+        for i in &chained_slots[0..3] {
+            assert!(blocktree.is_root(*i));
+        }
+
+        for i in &chained_slots[3..] {
+            assert!(!blocktree.is_root(*i));
+        }
+
+        blocktree.set_root(15, 4).unwrap();
+
+        for i in chained_slots {
+            assert!(blocktree.is_root(i));
+        }
+
+        drop(blocktree);
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
     mod erasure {
         use super::*;
         use crate::blocktree::meta::ErasureMetaStatus;
diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index 333dd983d..90be41b77 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -279,7 +279,12 @@ where
         }
     }
 
-    pub fn batch_processor(&self) -> BatchProcessor<B> {
+    // Note this returns an object that can be used to directly write to multiple column families.
+    // This circumvents the synchronization around APIs in Blocktree that use
+    // blocktree.batch_processor, so this API should only be used if the caller is sure they
+    // are writing to data in columns that will not be corrupted by any simultaneous blocktree
+    // operations.
+    pub unsafe fn batch_processor(&self) -> BatchProcessor<B> {
         BatchProcessor {
             backend: Arc::clone(&self.backend),
         }
diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs
index 90c3d28bd..f64c92ab6 100644
--- a/core/src/blocktree/kvs.rs
+++ b/core/src/blocktree/kvs.rs
@@ -121,17 +121,21 @@ impl TypedColumn<Kvs> for cf::Orphans {
 impl Column<Kvs> for cf::Root {
     const NAME: &'static str = super::ROOT_CF;
 
-    type Index = ();
+    type Index = u64;
 
-    fn key(_: ()) -> Key {
-        Key::default()
+    fn key(slot: u64) -> Key {
+        let mut key = Key::default();
+        BigEndian::write_u64(&mut key.0[8..16], slot);
+        key
     }
 
-    fn index(_: &Key) {}
+    fn index(key: &Key) -> u64 {
+        BigEndian::read_u64(&key.0[8..16])
+    }
 }
 
 impl TypedColumn<Kvs> for cf::Root {
-    type Type = u64;
+    type Type = bool;
 }
 
 impl Column<Kvs> for cf::SlotMeta {
diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs
index fe1412131..81a2483c0 100644
--- a/core/src/blocktree/rocks.rs
+++ b/core/src/blocktree/rocks.rs
@@ -182,17 +182,21 @@ impl TypedColumn<Rocks> for cf::Orphans {
 impl Column<Rocks> for cf::Root {
     const NAME: &'static str = super::ROOT_CF;
 
-    type Index = ();
+    type Index = u64;
 
-    fn key(_: ()) -> Vec<u8> {
-        vec![0; 8]
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
     }
 
-    fn index(_: &[u8]) {}
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
 }
 
 impl TypedColumn<Rocks> for cf::Root {
-    type Type = u64;
+    type Type = bool;
 }
 
 impl Column<Rocks> for cf::SlotMeta {
diff --git a/core/src/blocktree_processor.rs b/core/src/blocktree_processor.rs
index 7d04c212b..1736553dd 100644
--- a/core/src/blocktree_processor.rs
+++ b/core/src/blocktree_processor.rs
@@ -149,6 +149,8 @@ pub fn process_blocktree(
         vec![(slot, meta, bank, entry_height, last_entry_hash)]
     };
 
+    blocktree.set_root(0, 0).expect("Couldn't set first root");
+
     let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), 0);
 
     let mut fork_info = vec![];
@@ -235,7 +237,7 @@ pub fn process_blocktree(
             .unwrap();
 
         // only process full slots in blocktree_processor, replay_stage
-        // handles any partials 
+        // handles any partials
         if next_meta.is_full() {
             let next_bank = Arc::new(Bank::new_from_parent(
                 &bank,
@@ -285,6 +287,7 @@ mod tests {
     use crate::blocktree::tests::entries_to_blobs;
     use crate::entry::{create_ticks, next_entry, next_entry_mut, Entry};
     use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
+    use solana_runtime::epoch_schedule::EpochSchedule;
     use solana_sdk::hash::Hash;
     use solana_sdk::instruction::InstructionError;
     use solana_sdk::pubkey::Pubkey;
@@ -409,7 +412,7 @@ mod tests {
         info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
         info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);
 
-        blocktree.set_root(4).unwrap();
+        blocktree.set_root(4, 0).unwrap();
 
         let (bank_forks, bank_forks_info, _) =
             process_blocktree(&genesis_block, &blocktree, None).unwrap();
@@ -483,8 +486,8 @@ mod tests {
         info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
         info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);
 
-        blocktree.set_root(0).unwrap();
-        blocktree.set_root(1).unwrap();
+        blocktree.set_root(0, 0).unwrap();
+        blocktree.set_root(1, 0).unwrap();
 
         let (bank_forks, bank_forks_info, _) =
             process_blocktree(&genesis_block, &blocktree, None).unwrap();
@@ -530,6 +533,63 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_process_blocktree_epoch_boundary_root() {
+        solana_logger::setup();
+
+        let (genesis_block, _mint_keypair) = create_genesis_block(10_000);
+        let ticks_per_slot = genesis_block.ticks_per_slot;
+
+        // Create a new ledger with slot 0 full of ticks
+        let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
+        let mut last_entry_hash = blockhash;
+
+        let blocktree =
+            Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
+
+        // Let last_slot be the last slot in the first two epochs
+        let epoch_schedule = get_epoch_schedule(&genesis_block, None);
+        let last_slot = epoch_schedule.get_last_slot_in_epoch(1);
+
+        // Create a single chain of slots with all indexes in the range [0, last_slot + 1]
+        for i in 1..=last_slot + 1 {
+            last_entry_hash = fill_blocktree_slot_with_ticks(
+                &blocktree,
+                ticks_per_slot,
+                i,
+                i - 1,
+                last_entry_hash,
+            );
+        }
+
+        // Set a root on the last slot of the last confirmed epoch
+        blocktree.set_root(last_slot, 0).unwrap();
+
+        // Set a root on the next slot after the confirmed epoch
+        blocktree.set_root(last_slot + 1, last_slot).unwrap();
+
+        // Check that we can properly restart the ledger and that the leader scheduler doesn't fail
+        let (bank_forks, bank_forks_info, _) =
+            process_blocktree(&genesis_block, &blocktree, None).unwrap();
+
+        assert_eq!(bank_forks_info.len(), 1); // There is one fork
+        assert_eq!(
+            bank_forks_info[0],
+            BankForksInfo {
+                bank_slot: last_slot + 1, // Head is last_slot + 1
+                entry_height: ticks_per_slot * (last_slot + 2),
+            }
+        );
+
+        // The latest root should have purged all its parents
+        assert!(&bank_forks[last_slot + 1]
+            .parents()
+            .iter()
+            .map(|bank| bank.slot())
+            .collect::<Vec<_>>()
+            .is_empty());
+    }
+
     #[test]
     fn test_first_err() {
         assert_eq!(first_err(&[Ok(())]), Ok(()));
@@ -1111,4 +1171,12 @@ mod tests {
             bank.squash();
         }
     }
+
+    fn get_epoch_schedule(
+        genesis_block: &GenesisBlock,
+        account_paths: Option<String>,
+    ) -> EpochSchedule {
+        let bank = Bank::new_with_paths(&genesis_block, account_paths);
+        bank.epoch_schedule().clone()
+    }
 }
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 0193d6213..8e98b391e 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -136,7 +136,7 @@ impl RepairService {
                         &cluster_info,
                         completed_slots_receiver,
                     );
-                    Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
+                    Self::generate_repairs(blocktree, root, MAX_REPAIR_LENGTH)
                 }
            }
        };
@@ -207,11 +207,14 @@ impl RepairService {
         Ok(repairs)
     }
 
-    fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
+    fn generate_repairs(
+        blocktree: &Blocktree,
+        root: u64,
+        max_repairs: usize,
+    ) -> Result<(Vec<RepairType>)> {
         // Slot height and blob indexes for blobs we want to repair
         let mut repairs: Vec<RepairType> = vec![];
-        let slot = blocktree.get_root()?;
-        Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
+        Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, root);
 
         // TODO: Incorporate gossip to determine priorities for repair?
 
@@ -382,7 +385,7 @@ mod test {
         blobs.extend(blobs2);
         blocktree.write_blobs(&blobs).unwrap();
         assert_eq!(
-            RepairService::generate_repairs(&blocktree, 2).unwrap(),
+            RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
             vec![
                 RepairType::HighestBlob(0, 0),
                 RepairType::Orphan(0),
@@ -408,7 +411,7 @@ mod test {
 
         // Check that repair tries to patch the empty slot
         assert_eq!(
-            RepairService::generate_repairs(&blocktree, 2).unwrap(),
+            RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
             vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(0)]
         );
     }
@@ -447,12 +450,12 @@ mod test {
             .collect();
 
         assert_eq!(
-            RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
+            RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
             expected
         );
 
         assert_eq!(
-            RepairService::generate_repairs(&blocktree, expected.len() - 2).unwrap()[..],
+            RepairService::generate_repairs(&blocktree, 0, expected.len() - 2).unwrap()[..],
             expected[0..expected.len() - 2]
         );
     }
@@ -479,7 +482,7 @@ mod test {
         let expected: Vec<RepairType> = vec![RepairType::HighestBlob(0, num_entries_per_slot)];
 
         assert_eq!(
-            RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
+            RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
             expected
         );
     }
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 4f605ac2d..570833561 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -97,11 +97,6 @@ impl ReplayStage {
         let my_id = *my_id;
         let mut ticks_per_slot = 0;
         let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id);
-        if let Some(root) = locktower.root() {
-            blocktree
-                .set_root(root)
-                .expect("blocktree.set_root() failed at replay_stage startup");
-        }
         // Start the replay stage loop
         let leader_schedule_cache = leader_schedule_cache.clone();
         let vote_account = *vote_account;
@@ -321,9 +316,10 @@ impl ReplayStage {
                 .map(|bank| bank.slot())
                 .collect::<Vec<_>>();
             rooted_slots.push(root_bank.slot());
+            let old_root = bank_forks.read().unwrap().root();
             bank_forks.write().unwrap().set_root(new_root);
             leader_schedule_cache.set_root(new_root);
-            blocktree.set_root(new_root)?;
+            blocktree.set_root(new_root, old_root)?;
             Self::handle_new_root(&bank_forks, progress);
             root_slot_sender.send(rooted_slots)?;
         }
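
The core behavioral change is in Blocktree::set_root: rather than overwriting a single stored root, it now walks the SlotMeta parent links from new_root back to prev_root and marks every slot on that path as rooted, all within one write batch. Below is a minimal standalone sketch of that traversal, with a HashMap/HashSet standing in for the real SlotMeta and Root column families; MockBlocktree and its fields are illustrative names, not the patch's types.

use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for the real SlotMeta and Root column family.
struct SlotMeta {
    parent_slot: u64,
}

struct MockBlocktree {
    metas: HashMap<u64, SlotMeta>, // slot -> metadata (parent link)
    roots: HashSet<u64>,           // slots marked as roots
}

impl MockBlocktree {
    // Mirrors the patched set_root: mark every slot on the path from
    // new_root back to (but not including) prev_root, with the same
    // special case for rooting slot 0.
    fn set_root(&mut self, new_root: u64, prev_root: u64) {
        if new_root == 0 {
            self.roots.insert(0);
            return;
        }
        let mut current_slot = new_root;
        while current_slot != prev_root {
            self.roots.insert(current_slot);
            current_slot = self.metas[&current_slot].parent_slot;
        }
    }

    fn is_root(&self, slot: u64) -> bool {
        self.roots.contains(&slot)
    }
}

fn main() {
    // Chain: 0 <- 2 <- 4 <- 7 <- 12 <- 15, as in test_set_root.
    let chain = [0u64, 2, 4, 7, 12, 15];
    let mut bt = MockBlocktree {
        metas: HashMap::new(),
        roots: HashSet::new(),
    };
    for pair in chain.windows(2) {
        bt.metas.insert(pair[1], SlotMeta { parent_slot: pair[0] });
    }
    bt.set_root(0, 0);
    bt.set_root(4, 0);
    assert!(bt.is_root(2) && bt.is_root(4) && !bt.is_root(7));
    bt.set_root(15, 4);
    assert!(chain.iter().all(|s| bt.is_root(*s)));
}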
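The Root column's key change (from the unit index () to a u64 slot) encodes the slot big-endian, so the byte-wise key ordering the database uses matches numeric slot order and iteration over the column visits roots from lowest to highest slot. A sketch of the encoding round trip, using std's to_be_bytes/from_be_bytes in place of the BigEndian::write_u64/read_u64 calls that appear in the patch:

// Encode a slot the way the patched rocks.rs `key` does: 8 big-endian bytes.
fn key(slot: u64) -> [u8; 8] {
    slot.to_be_bytes()
}

// Recover the slot from the first 8 key bytes, as the patched `index` does.
fn index(key: &[u8]) -> u64 {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(&key[..8]);
    u64::from_be_bytes(buf)
}

fn main() {
    // Round trip: slot -> key -> slot.
    assert_eq!(index(&key(42)), 42);
    // Big-endian keys compare in the same order as the slots themselves;
    // little-endian keys would sort 256 before 1 here.
    assert!(key(1) < key(256));
}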
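On the replay_stage side, note the ordering the new signature forces: the previous root must be read out of bank_forks before set_root advances it, and the (new_root, old_root) pair is then handed to blocktree.set_root so the whole path between them is marked in one batch. A condensed sketch of that ordering; BankForks here is a hypothetical stand-in for the real struct, not its actual API.

use std::sync::RwLock;

// Minimal stand-in: only the root bookkeeping matters for this sketch.
struct BankForks {
    root: u64,
}

impl BankForks {
    fn root(&self) -> u64 {
        self.root
    }
    fn set_root(&mut self, new_root: u64) {
        self.root = new_root;
    }
}

fn on_new_root(bank_forks: &RwLock<BankForks>, new_root: u64) -> (u64, u64) {
    // 1) Capture the previous root *before* overwriting it...
    let old_root = bank_forks.read().unwrap().root();
    // 2) ...then advance the in-memory root...
    bank_forks.write().unwrap().set_root(new_root);
    // 3) ...and return both, as they would be passed to
    //    blocktree.set_root(new_root, old_root).
    (new_root, old_root)
}

fn main() {
    let forks = RwLock::new(BankForks { root: 4 });
    assert_eq!(on_new_root(&forks, 15), (15, 4));
    assert_eq!(forks.read().unwrap().root(), 15);
}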