diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 36e48bd19..373807ecd 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -177,6 +177,15 @@ impl Blocktree { self.meta_cf.get(slot) } + pub fn is_full(&self, slot: u64) -> bool { + if let Ok(meta) = self.meta_cf.get(slot) { + if let Some(meta) = meta { + return meta.is_full(); + } + } + false + } + pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result> { self.erasure_meta_cf.get((slot, set_index)) } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 7a896c038..55e5db10b 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -46,6 +46,14 @@ pub struct StorageStateInner { slot: u64, } +// Used to track root slots in storage stage +#[derive(Default)] +struct StorageSlots { + last_root: u64, + slot_count: u64, + pending_roots: Vec, +} + #[derive(Clone, Default)] pub struct StorageState { state: Arc>, @@ -151,8 +159,7 @@ impl StorageStage { .name("solana-storage-mining-verify-stage".to_string()) .spawn(move || { let mut current_key = 0; - let mut slot_count = 0; - let mut last_root = 0; + let mut storage_slots = StorageSlots::default(); loop { if let Some(ref some_blocktree) = blocktree { if let Err(e) = Self::process_entries( @@ -160,8 +167,7 @@ impl StorageStage { &storage_state_inner, &slot_receiver, &some_blocktree, - &mut slot_count, - &mut last_root, + &mut storage_slots, &mut current_key, storage_rotate_count, &instruction_sender, @@ -414,19 +420,28 @@ impl StorageStage { storage_state: &Arc>, slot_receiver: &Receiver>, blocktree: &Arc, - slot_count: &mut u64, - last_root: &mut u64, + storage_slots: &mut StorageSlots, current_key_idx: &mut usize, storage_rotate_count: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let slots: Vec = slot_receiver.recv_timeout(timeout)?; + storage_slots + .pending_roots + .append(&mut slot_receiver.recv_timeout(timeout)?); + storage_slots + .pending_roots + .sort_unstable_by(|a, b| b.cmp(a)); // check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root - for slot in slots.into_iter().rev() { - if slot > *last_root { - *slot_count += 1; - *last_root = slot; + while let Some(slot) = storage_slots.pending_roots.pop() { + if slot > storage_slots.last_root { + if !blocktree.is_full(slot) { + // stick this slot back into pending_roots. Evaluate it next time around. + storage_slots.pending_roots.push(slot); + break; + } + storage_slots.slot_count += 1; + storage_slots.last_root = slot; if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { for entry in &entries { @@ -450,12 +465,12 @@ impl StorageStage { } } } - if *slot_count % storage_rotate_count == 0 { + if storage_slots.slot_count % storage_rotate_count == 0 { // assume the last entry in the slot is the blockhash for that slot let entry_hash = entries.last().unwrap().hash; debug!( "crosses sending at root slot: {}! with last entry's hash {}", - slot_count, entry_hash + storage_slots.slot_count, entry_hash ); Self::process_entry_crossing( &storage_keypair, @@ -526,10 +541,11 @@ impl Service for StorageStage { #[cfg(test)] mod tests { use super::*; + use crate::blocktree::tests::{entries_to_blobs, make_slot_entries}; use crate::blocktree::{create_new_tmp_ledger, Blocktree}; use crate::cluster_info::ClusterInfo; use crate::contact_info::ContactInfo; - use crate::entry::{make_tiny_test_entries, Entry}; + use crate::entry::Entry; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use crate::service::Service; use rayon::prelude::*; @@ -587,20 +603,16 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); - let ticks_per_slot = genesis_block.ticks_per_slot; let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - let entries = make_tiny_test_entries(64); + let (blobs, _entries) = make_slot_entries(1, 0, 64); let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); let slot = 1; let bank = Arc::new(Bank::new(&genesis_block)); let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0))); - blocktree - .write_entries(slot, 0, 0, ticks_per_slot, &entries) - .unwrap(); + blocktree.insert_data_blobs(blobs).unwrap(); let cluster_info = test_cluster_info(&keypair.pubkey()); - let (slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( @@ -624,9 +636,8 @@ mod tests { let rooted_slots = (slot..slot + SLOTS_PER_SEGMENT + 1) .map(|i| { - blocktree - .write_entries(i, 0, 0, ticks_per_slot, &entries) - .unwrap(); + let (blobs, _entries) = make_slot_entries(i, i - 1, 64); + blocktree.insert_data_blobs(blobs).unwrap(); i }) .collect::>(); @@ -662,14 +673,12 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); - let ticks_per_slot = genesis_block.ticks_per_slot;; let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - let entries = make_tiny_test_entries(128); let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); - blocktree - .write_entries(1, 0, 0, ticks_per_slot, &entries) - .unwrap(); + let (blobs, entries) = make_slot_entries(1, 0, 128); + blocktree.insert_data_blobs(blobs).unwrap(); + let bank = Arc::new(Bank::new(&genesis_block)); let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0))); let cluster_info = test_cluster_info(&keypair.pubkey()); @@ -707,9 +716,10 @@ mod tests { let mining_proof_tx = Transaction::new_unsigned_instructions(vec![mining_proof_ix]); let mining_txs = vec![mining_proof_tx]; - let proof_entries = vec![Entry::new(&Hash::default(), 1, mining_txs)]; + let next_hash = solana_sdk::hash::hash(entries.last().unwrap().hash.as_ref()); + let proof_entry = Entry::new(&next_hash, 1, mining_txs); blocktree - .write_entries(2, 0, 0, ticks_per_slot, &proof_entries) + .insert_data_blobs(entries_to_blobs(&vec![proof_entry], 2, 1, true)) .unwrap(); slot_sender.send(vec![2]).unwrap();