Fix storage stage operating on empty slots (#4474)
* Fix storage stage operating on empty slots * Reduce fn argument count * Fix tests
This commit is contained in:
parent
4404634b14
commit
6ed071c4dd
|
@ -177,6 +177,15 @@ impl Blocktree {
|
||||||
self.meta_cf.get(slot)
|
self.meta_cf.get(slot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_full(&self, slot: u64) -> bool {
|
||||||
|
if let Ok(meta) = self.meta_cf.get(slot) {
|
||||||
|
if let Some(meta) = meta {
|
||||||
|
return meta.is_full();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
|
pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
|
||||||
self.erasure_meta_cf.get((slot, set_index))
|
self.erasure_meta_cf.get((slot, set_index))
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,14 @@ pub struct StorageStateInner {
|
||||||
slot: u64,
|
slot: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Used to track root slots in storage stage
|
||||||
|
#[derive(Default)]
|
||||||
|
struct StorageSlots {
|
||||||
|
last_root: u64,
|
||||||
|
slot_count: u64,
|
||||||
|
pending_roots: Vec<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub struct StorageState {
|
pub struct StorageState {
|
||||||
state: Arc<RwLock<StorageStateInner>>,
|
state: Arc<RwLock<StorageStateInner>>,
|
||||||
|
@ -151,8 +159,7 @@ impl StorageStage {
|
||||||
.name("solana-storage-mining-verify-stage".to_string())
|
.name("solana-storage-mining-verify-stage".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut current_key = 0;
|
let mut current_key = 0;
|
||||||
let mut slot_count = 0;
|
let mut storage_slots = StorageSlots::default();
|
||||||
let mut last_root = 0;
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(ref some_blocktree) = blocktree {
|
if let Some(ref some_blocktree) = blocktree {
|
||||||
if let Err(e) = Self::process_entries(
|
if let Err(e) = Self::process_entries(
|
||||||
|
@ -160,8 +167,7 @@ impl StorageStage {
|
||||||
&storage_state_inner,
|
&storage_state_inner,
|
||||||
&slot_receiver,
|
&slot_receiver,
|
||||||
&some_blocktree,
|
&some_blocktree,
|
||||||
&mut slot_count,
|
&mut storage_slots,
|
||||||
&mut last_root,
|
|
||||||
&mut current_key,
|
&mut current_key,
|
||||||
storage_rotate_count,
|
storage_rotate_count,
|
||||||
&instruction_sender,
|
&instruction_sender,
|
||||||
|
@ -414,19 +420,28 @@ impl StorageStage {
|
||||||
storage_state: &Arc<RwLock<StorageStateInner>>,
|
storage_state: &Arc<RwLock<StorageStateInner>>,
|
||||||
slot_receiver: &Receiver<Vec<u64>>,
|
slot_receiver: &Receiver<Vec<u64>>,
|
||||||
blocktree: &Arc<Blocktree>,
|
blocktree: &Arc<Blocktree>,
|
||||||
slot_count: &mut u64,
|
storage_slots: &mut StorageSlots,
|
||||||
last_root: &mut u64,
|
|
||||||
current_key_idx: &mut usize,
|
current_key_idx: &mut usize,
|
||||||
storage_rotate_count: u64,
|
storage_rotate_count: u64,
|
||||||
instruction_sender: &InstructionSender,
|
instruction_sender: &InstructionSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
let slots: Vec<u64> = slot_receiver.recv_timeout(timeout)?;
|
storage_slots
|
||||||
|
.pending_roots
|
||||||
|
.append(&mut slot_receiver.recv_timeout(timeout)?);
|
||||||
|
storage_slots
|
||||||
|
.pending_roots
|
||||||
|
.sort_unstable_by(|a, b| b.cmp(a));
|
||||||
// check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root
|
// check if any rooted slots were missed leading up to this one and bump slot count and process proofs for each missed root
|
||||||
for slot in slots.into_iter().rev() {
|
while let Some(slot) = storage_slots.pending_roots.pop() {
|
||||||
if slot > *last_root {
|
if slot > storage_slots.last_root {
|
||||||
*slot_count += 1;
|
if !blocktree.is_full(slot) {
|
||||||
*last_root = slot;
|
// stick this slot back into pending_roots. Evaluate it next time around.
|
||||||
|
storage_slots.pending_roots.push(slot);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
storage_slots.slot_count += 1;
|
||||||
|
storage_slots.last_root = slot;
|
||||||
|
|
||||||
if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) {
|
if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) {
|
||||||
for entry in &entries {
|
for entry in &entries {
|
||||||
|
@ -450,12 +465,12 @@ impl StorageStage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if *slot_count % storage_rotate_count == 0 {
|
if storage_slots.slot_count % storage_rotate_count == 0 {
|
||||||
// assume the last entry in the slot is the blockhash for that slot
|
// assume the last entry in the slot is the blockhash for that slot
|
||||||
let entry_hash = entries.last().unwrap().hash;
|
let entry_hash = entries.last().unwrap().hash;
|
||||||
debug!(
|
debug!(
|
||||||
"crosses sending at root slot: {}! with last entry's hash {}",
|
"crosses sending at root slot: {}! with last entry's hash {}",
|
||||||
slot_count, entry_hash
|
storage_slots.slot_count, entry_hash
|
||||||
);
|
);
|
||||||
Self::process_entry_crossing(
|
Self::process_entry_crossing(
|
||||||
&storage_keypair,
|
&storage_keypair,
|
||||||
|
@ -526,10 +541,11 @@ impl Service for StorageStage {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::blocktree::tests::{entries_to_blobs, make_slot_entries};
|
||||||
use crate::blocktree::{create_new_tmp_ledger, Blocktree};
|
use crate::blocktree::{create_new_tmp_ledger, Blocktree};
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::contact_info::ContactInfo;
|
use crate::contact_info::ContactInfo;
|
||||||
use crate::entry::{make_tiny_test_entries, Entry};
|
use crate::entry::Entry;
|
||||||
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
|
@ -587,20 +603,16 @@ mod tests {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
||||||
let ticks_per_slot = genesis_block.ticks_per_slot;
|
|
||||||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
||||||
|
|
||||||
let entries = make_tiny_test_entries(64);
|
let (blobs, _entries) = make_slot_entries(1, 0, 64);
|
||||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||||
let slot = 1;
|
let slot = 1;
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0)));
|
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0)));
|
||||||
blocktree
|
blocktree.insert_data_blobs(blobs).unwrap();
|
||||||
.write_entries(slot, 0, 0, ticks_per_slot, &entries)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||||
|
|
||||||
let (slot_sender, slot_receiver) = channel();
|
let (slot_sender, slot_receiver) = channel();
|
||||||
let storage_state = StorageState::new();
|
let storage_state = StorageState::new();
|
||||||
let storage_stage = StorageStage::new(
|
let storage_stage = StorageStage::new(
|
||||||
|
@ -624,9 +636,8 @@ mod tests {
|
||||||
|
|
||||||
let rooted_slots = (slot..slot + SLOTS_PER_SEGMENT + 1)
|
let rooted_slots = (slot..slot + SLOTS_PER_SEGMENT + 1)
|
||||||
.map(|i| {
|
.map(|i| {
|
||||||
blocktree
|
let (blobs, _entries) = make_slot_entries(i, i - 1, 64);
|
||||||
.write_entries(i, 0, 0, ticks_per_slot, &entries)
|
blocktree.insert_data_blobs(blobs).unwrap();
|
||||||
.unwrap();
|
|
||||||
i
|
i
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
@ -662,14 +673,12 @@ mod tests {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
||||||
let ticks_per_slot = genesis_block.ticks_per_slot;;
|
|
||||||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
||||||
|
|
||||||
let entries = make_tiny_test_entries(128);
|
|
||||||
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
|
||||||
blocktree
|
let (blobs, entries) = make_slot_entries(1, 0, 128);
|
||||||
.write_entries(1, 0, 0, ticks_per_slot, &entries)
|
blocktree.insert_data_blobs(blobs).unwrap();
|
||||||
.unwrap();
|
|
||||||
let bank = Arc::new(Bank::new(&genesis_block));
|
let bank = Arc::new(Bank::new(&genesis_block));
|
||||||
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0)));
|
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank], 0)));
|
||||||
let cluster_info = test_cluster_info(&keypair.pubkey());
|
let cluster_info = test_cluster_info(&keypair.pubkey());
|
||||||
|
@ -707,9 +716,10 @@ mod tests {
|
||||||
let mining_proof_tx = Transaction::new_unsigned_instructions(vec![mining_proof_ix]);
|
let mining_proof_tx = Transaction::new_unsigned_instructions(vec![mining_proof_ix]);
|
||||||
let mining_txs = vec![mining_proof_tx];
|
let mining_txs = vec![mining_proof_tx];
|
||||||
|
|
||||||
let proof_entries = vec![Entry::new(&Hash::default(), 1, mining_txs)];
|
let next_hash = solana_sdk::hash::hash(entries.last().unwrap().hash.as_ref());
|
||||||
|
let proof_entry = Entry::new(&next_hash, 1, mining_txs);
|
||||||
blocktree
|
blocktree
|
||||||
.write_entries(2, 0, 0, ticks_per_slot, &proof_entries)
|
.insert_data_blobs(entries_to_blobs(&vec![proof_entry], 2, 1, true))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
slot_sender.send(vec![2]).unwrap();
|
slot_sender.send(vec![2]).unwrap();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue