Deshred blocks in parallel (#6461)
* Deshred in parallel * Add tests for corrupt slots and parallel deshred * Rename load_blocktree_entries to load_blocktree_entries_with_shred_count
This commit is contained in:
parent
8319fa05d0
commit
b38bf90de7
|
@ -298,7 +298,7 @@ mod test {
|
||||||
);
|
);
|
||||||
|
|
||||||
let blocktree = broadcast_service.blocktree;
|
let blocktree = broadcast_service.blocktree;
|
||||||
let (entries, _, _, _) = blocktree
|
let (entries, _) = blocktree
|
||||||
.get_slot_entries_with_shred_count(slot, 0)
|
.get_slot_entries_with_shred_count(slot, 0)
|
||||||
.expect("Expect entries to be present");
|
.expect("Expect entries to be present");
|
||||||
assert_eq!(entries.len(), max_tick_height as usize);
|
assert_eq!(entries.len(), max_tick_height as usize);
|
||||||
|
|
|
@ -415,25 +415,25 @@ impl ReplayStage {
|
||||||
.entry(bank.slot())
|
.entry(bank.slot())
|
||||||
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
|
.or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash()));
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let load_result = Self::load_blocktree_entries(bank, blocktree, bank_progress);
|
let load_result =
|
||||||
|
Self::load_blocktree_entries_with_shred_count(bank, blocktree, bank_progress);
|
||||||
let fetch_entries_elapsed = now.elapsed().as_micros();
|
let fetch_entries_elapsed = now.elapsed().as_micros();
|
||||||
|
|
||||||
if load_result.is_err() {
|
if load_result.is_err() {
|
||||||
bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
|
bank_progress.stats.fetch_entries_fail_elapsed += fetch_entries_elapsed as u64;
|
||||||
|
} else {
|
||||||
|
bank_progress.stats.fetch_entries_elapsed += fetch_entries_elapsed as u64;
|
||||||
}
|
}
|
||||||
let replay_result =
|
|
||||||
load_result.and_then(|(entries, num_shreds, useful_time, wasted_time)| {
|
let replay_result = load_result.and_then(|(entries, num_shreds)| {
|
||||||
trace!(
|
trace!(
|
||||||
"Fetch entries for slot {}, {:?} entries, num shreds {:?}",
|
"Fetch entries for slot {}, {:?} entries, num shreds {:?}",
|
||||||
bank.slot(),
|
bank.slot(),
|
||||||
entries.len(),
|
entries.len(),
|
||||||
num_shreds
|
num_shreds
|
||||||
);
|
);
|
||||||
tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
|
tx_count += entries.iter().map(|e| e.transactions.len()).sum::<usize>();
|
||||||
bank_progress.stats.fetch_entries_elapsed += useful_time as u64;
|
Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds)
|
||||||
bank_progress.stats.fetch_entries_fail_elapsed += wasted_time as u64;
|
});
|
||||||
Self::replay_entries_into_bank(bank, entries, bank_progress, num_shreds)
|
|
||||||
});
|
|
||||||
|
|
||||||
if Self::is_replay_result_fatal(&replay_result) {
|
if Self::is_replay_result_fatal(&replay_result) {
|
||||||
warn!(
|
warn!(
|
||||||
|
@ -726,15 +726,15 @@ impl ReplayStage {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_blocktree_entries(
|
fn load_blocktree_entries_with_shred_count(
|
||||||
bank: &Bank,
|
bank: &Bank,
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
bank_progress: &mut ForkProgress,
|
bank_progress: &mut ForkProgress,
|
||||||
) -> Result<(Vec<Entry>, usize, u64, u64)> {
|
) -> Result<(Vec<Entry>, usize)> {
|
||||||
let bank_slot = bank.slot();
|
let bank_slot = bank.slot();
|
||||||
let entries_and_count = blocktree
|
let entries_and_shred_count = blocktree
|
||||||
.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?;
|
.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?;
|
||||||
Ok(entries_and_count)
|
Ok(entries_and_shred_count)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn replay_entries_into_bank(
|
fn replay_entries_into_bank(
|
||||||
|
@ -766,6 +766,7 @@ impl ReplayStage {
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let last_entry = &bank_progress.last_entry;
|
let last_entry = &bank_progress.last_entry;
|
||||||
|
datapoint_info!("verify-batch-size", ("size", entries.len() as i64, i64));
|
||||||
let verify_result = entries.verify(last_entry);
|
let verify_result = entries.verify(last_entry);
|
||||||
let verify_entries_elapsed = now.elapsed().as_micros();
|
let verify_entries_elapsed = now.elapsed().as_micros();
|
||||||
bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
|
bank_progress.stats.entry_verification_elapsed += verify_entries_elapsed as u64;
|
||||||
|
|
|
@ -7,11 +7,14 @@ use crate::shred::{Shred, Shredder};
|
||||||
|
|
||||||
use bincode::deserialize;
|
use bincode::deserialize;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use rayon::iter::IntoParallelRefIterator;
|
||||||
|
use rayon::iter::ParallelIterator;
|
||||||
|
use rayon::ThreadPool;
|
||||||
|
use rocksdb;
|
||||||
|
|
||||||
use solana_metrics::{datapoint_debug, datapoint_error};
|
use solana_metrics::{datapoint_debug, datapoint_error};
|
||||||
|
use solana_rayon_threadlimit::get_thread_count;
|
||||||
|
|
||||||
use solana_sdk::genesis_block::GenesisBlock;
|
use solana_sdk::genesis_block::GenesisBlock;
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
|
@ -19,13 +22,13 @@ use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::result;
|
use std::result;
|
||||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
|
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::Instant;
|
|
||||||
|
|
||||||
pub use self::meta::*;
|
pub use self::meta::*;
|
||||||
use crate::leader_schedule_cache::LeaderScheduleCache;
|
use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||||
|
@ -45,6 +48,11 @@ type BatchProcessor = db::BatchProcessor;
|
||||||
|
|
||||||
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
|
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
|
||||||
|
|
||||||
|
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_thread_count())
|
||||||
|
.build()
|
||||||
|
.unwrap()));
|
||||||
|
|
||||||
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
|
||||||
|
|
||||||
pub type SlotMetaWorkingSetEntry = (Rc<RefCell<SlotMeta>>, Option<SlotMeta>);
|
pub type SlotMetaWorkingSetEntry = (Rc<RefCell<SlotMeta>>, Option<SlotMeta>);
|
||||||
|
@ -727,6 +735,13 @@ impl Blocktree {
|
||||||
false
|
false
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let last_in_data = if shred.data_complete() {
|
||||||
|
debug!("got last in data");
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
if is_orphan(slot_meta) {
|
if is_orphan(slot_meta) {
|
||||||
slot_meta.parent_slot = parent;
|
slot_meta.parent_slot = parent;
|
||||||
}
|
}
|
||||||
|
@ -754,7 +769,13 @@ impl Blocktree {
|
||||||
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
// Commit step: commit all changes to the mutable structures at once, or none at all.
|
||||||
// We don't want only a subset of these changes going through.
|
// We don't want only a subset of these changes going through.
|
||||||
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
|
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
|
||||||
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
|
update_slot_meta(
|
||||||
|
last_in_slot,
|
||||||
|
last_in_data,
|
||||||
|
slot_meta,
|
||||||
|
index as u32,
|
||||||
|
new_consumed,
|
||||||
|
);
|
||||||
data_index.set_present(index, true);
|
data_index.set_present(index, true);
|
||||||
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
|
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -991,81 +1012,111 @@ impl Blocktree {
|
||||||
pub fn get_slot_entries_with_shred_count(
|
pub fn get_slot_entries_with_shred_count(
|
||||||
&self,
|
&self,
|
||||||
slot: u64,
|
slot: u64,
|
||||||
mut start_index: u64,
|
start_index: u64,
|
||||||
) -> Result<(Vec<Entry>, usize, u64, u64)> {
|
) -> Result<(Vec<Entry>, usize)> {
|
||||||
let mut useful_time = 0;
|
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
|
||||||
let mut wasted_time = 0;
|
let slot_meta = slot_meta_cf.get(slot)?;
|
||||||
|
if slot_meta.is_none() {
|
||||||
let mut all_entries = vec![];
|
return Ok((vec![], 0));
|
||||||
let mut num_shreds = 0;
|
|
||||||
loop {
|
|
||||||
let now = Instant::now();
|
|
||||||
let mut res = self.get_entries_in_data_block(slot, &mut start_index);
|
|
||||||
let elapsed = now.elapsed().as_micros();
|
|
||||||
|
|
||||||
if let Ok((ref mut entries, new_num_shreds)) = res {
|
|
||||||
if !entries.is_empty() {
|
|
||||||
all_entries.append(entries);
|
|
||||||
num_shreds += new_num_shreds;
|
|
||||||
useful_time += elapsed;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// All unsuccessful cases (errors, incomplete data blocks) will count as wasted work
|
|
||||||
wasted_time += elapsed;
|
|
||||||
res?;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Found {:?} entries", all_entries.len());
|
let slot_meta = slot_meta.unwrap();
|
||||||
Ok((
|
// Find all the ranges for the completed data blocks
|
||||||
all_entries,
|
let completed_ranges = Self::get_completed_data_ranges(
|
||||||
num_shreds,
|
start_index as u32,
|
||||||
useful_time as u64,
|
&slot_meta.completed_data_indexes[..],
|
||||||
wasted_time as u64,
|
slot_meta.consumed as u32,
|
||||||
))
|
);
|
||||||
|
if completed_ranges.is_empty() {
|
||||||
|
return Ok((vec![], 0));
|
||||||
|
}
|
||||||
|
let num_shreds = completed_ranges
|
||||||
|
.last()
|
||||||
|
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1);
|
||||||
|
|
||||||
|
let all_entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
|
thread_pool.borrow().install(|| {
|
||||||
|
completed_ranges
|
||||||
|
.par_iter()
|
||||||
|
.map(|(start_index, end_index)| {
|
||||||
|
self.get_entries_in_data_block(slot, *start_index, *end_index)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let all_entries: Vec<Entry> = all_entries?.into_iter().flatten().collect();
|
||||||
|
Ok((all_entries, num_shreds.unwrap_or(0) as usize))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_entries_in_data_block(
|
// Get the range of indexes [start_index, end_index] of every completed data block
|
||||||
&self,
|
fn get_completed_data_ranges(
|
||||||
slot: u64,
|
mut start_index: u32,
|
||||||
start_index: &mut u64,
|
completed_data_end_indexes: &[u32],
|
||||||
) -> Result<(Vec<Entry>, usize)> {
|
consumed: u32,
|
||||||
let mut shred_chunk: Vec<Shred> = vec![];
|
) -> Vec<(u32, u32)> {
|
||||||
let data_shred_cf = self.db.column::<cf::ShredData>();
|
let mut completed_data_ranges = vec![];
|
||||||
while let Some(serialized_shred) = data_shred_cf.get_bytes((slot, *start_index))? {
|
let floor = completed_data_end_indexes
|
||||||
*start_index += 1;
|
.iter()
|
||||||
let new_shred = Shred::new_from_serialized_shred(serialized_shred).ok();
|
.position(|i| *i >= start_index)
|
||||||
if let Some(shred) = new_shred {
|
.unwrap_or_else(|| completed_data_end_indexes.len());
|
||||||
let is_complete = shred.data_complete() || shred.last_in_slot();
|
|
||||||
shred_chunk.push(shred);
|
for i in &completed_data_end_indexes[floor as usize..] {
|
||||||
if is_complete {
|
// `consumed` is the next missing shred index, but shred `i` existing in
|
||||||
if let Ok(deshred_payload) = Shredder::deshred(&shred_chunk) {
|
// completed_data_end_indexes implies it's not missing
|
||||||
debug!("{:?} shreds in last FEC set", shred_chunk.len(),);
|
assert!(*i != consumed);
|
||||||
let entries: Vec<Entry> =
|
|
||||||
bincode::deserialize(&deshred_payload).map_err(|_| {
|
if *i < consumed {
|
||||||
BlocktreeError::InvalidShredData(Box::new(
|
completed_data_ranges.push((start_index, *i));
|
||||||
bincode::ErrorKind::Custom(
|
start_index = *i + 1;
|
||||||
"could not construct entries".to_string(),
|
|
||||||
),
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
return Ok((entries, shred_chunk.len()));
|
|
||||||
} else {
|
|
||||||
debug!("Failed in deshredding shred payloads");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Didn't find a valid shred, this slot is dead.
|
|
||||||
// TODO: Mark as dead, but have to carefully handle last shred of interrupted
|
|
||||||
// slots.
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((vec![], 0))
|
completed_data_ranges
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_entries_in_data_block(
|
||||||
|
&self,
|
||||||
|
slot: u64,
|
||||||
|
start_index: u32,
|
||||||
|
end_index: u32,
|
||||||
|
) -> Result<Vec<Entry>> {
|
||||||
|
let data_shred_cf = self.db.column::<cf::ShredData>();
|
||||||
|
|
||||||
|
// Short circuit on first error
|
||||||
|
let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
|
||||||
|
.map(|i| {
|
||||||
|
data_shred_cf
|
||||||
|
.get_bytes((slot, u64::from(i)))
|
||||||
|
.and_then(|serialized_shred| {
|
||||||
|
Shred::new_from_serialized_shred(
|
||||||
|
serialized_shred
|
||||||
|
.expect("Shred must exist if shred index was included in a range"),
|
||||||
|
)
|
||||||
|
.map_err(|_| {
|
||||||
|
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
|
||||||
|
"Could not reconstruct shred from shred payload".to_string(),
|
||||||
|
)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let data_shreds = data_shreds?;
|
||||||
|
assert!(data_shreds.last().unwrap().data_complete());
|
||||||
|
|
||||||
|
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|_| {
|
||||||
|
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
|
||||||
|
"Could not reconstruct data block from constituent shreds".to_string(),
|
||||||
|
)))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
debug!("{:?} shreds in last FEC set", data_shreds.len(),);
|
||||||
|
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|_| {
|
||||||
|
BlocktreeError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
|
||||||
|
"could not reconstruct entries".to_string(),
|
||||||
|
)))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns slots connecting to any element of the list `slots`.
|
// Returns slots connecting to any element of the list `slots`.
|
||||||
|
@ -1198,20 +1249,21 @@ impl Blocktree {
|
||||||
|
|
||||||
fn update_slot_meta(
|
fn update_slot_meta(
|
||||||
is_last_in_slot: bool,
|
is_last_in_slot: bool,
|
||||||
|
is_last_in_data: bool,
|
||||||
slot_meta: &mut SlotMeta,
|
slot_meta: &mut SlotMeta,
|
||||||
index: u64,
|
index: u32,
|
||||||
new_consumed: u64,
|
new_consumed: u64,
|
||||||
) {
|
) {
|
||||||
// Index is zero-indexed, while the "received" height starts from 1,
|
// Index is zero-indexed, while the "received" height starts from 1,
|
||||||
// so received = index + 1 for the same shred.
|
// so received = index + 1 for the same shred.
|
||||||
slot_meta.received = cmp::max(index + 1, slot_meta.received);
|
slot_meta.received = cmp::max((u64::from(index) + 1) as u64, slot_meta.received);
|
||||||
slot_meta.consumed = new_consumed;
|
slot_meta.consumed = new_consumed;
|
||||||
slot_meta.last_index = {
|
slot_meta.last_index = {
|
||||||
// If the last index in the slot hasn't been set before, then
|
// If the last index in the slot hasn't been set before, then
|
||||||
// set it to this shred index
|
// set it to this shred index
|
||||||
if slot_meta.last_index == std::u64::MAX {
|
if slot_meta.last_index == std::u64::MAX {
|
||||||
if is_last_in_slot {
|
if is_last_in_slot {
|
||||||
index
|
u64::from(index)
|
||||||
} else {
|
} else {
|
||||||
std::u64::MAX
|
std::u64::MAX
|
||||||
}
|
}
|
||||||
|
@ -1219,6 +1271,16 @@ fn update_slot_meta(
|
||||||
slot_meta.last_index
|
slot_meta.last_index
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if is_last_in_slot || is_last_in_data {
|
||||||
|
let position = slot_meta
|
||||||
|
.completed_data_indexes
|
||||||
|
.iter()
|
||||||
|
.position(|completed_data_index| *completed_data_index > index)
|
||||||
|
.unwrap_or_else(|| slot_meta.completed_data_indexes.len());
|
||||||
|
|
||||||
|
slot_meta.completed_data_indexes.insert(position, index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_index_meta_entry<'a>(
|
fn get_index_meta_entry<'a>(
|
||||||
|
@ -2045,7 +2107,8 @@ pub mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_insert_data_shreds_reverse() {
|
fn test_insert_data_shreds_reverse() {
|
||||||
let num_entries = 10;
|
let num_shreds = 10;
|
||||||
|
let num_entries = max_ticks_per_n_shreds(num_shreds);
|
||||||
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
|
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
|
||||||
let num_shreds = shreds.len() as u64;
|
let num_shreds = shreds.len() as u64;
|
||||||
|
|
||||||
|
@ -2938,7 +3001,7 @@ pub mod tests {
|
||||||
shred
|
shred
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.for_each(|(i, shred)| shred.set_index(slot as u32 + i as u32));
|
.for_each(|(_, shred)| shred.set_index(0));
|
||||||
shreds.extend(shred);
|
shreds.extend(shred);
|
||||||
entries.extend(entry);
|
entries.extend(entry);
|
||||||
}
|
}
|
||||||
|
@ -2956,16 +3019,16 @@ pub mod tests {
|
||||||
|
|
||||||
for i in 0..num_entries - 1 {
|
for i in 0..num_entries - 1 {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
blocktree.get_slot_entries(i, i, None).unwrap()[0],
|
blocktree.get_slot_entries(i, 0, None).unwrap()[0],
|
||||||
entries[i as usize]
|
entries[i as usize]
|
||||||
);
|
);
|
||||||
|
|
||||||
let meta = blocktree.meta(i).unwrap().unwrap();
|
let meta = blocktree.meta(i).unwrap().unwrap();
|
||||||
assert_eq!(meta.received, i + num_shreds_per_slot);
|
assert_eq!(meta.received, 1);
|
||||||
assert_eq!(meta.last_index, i + num_shreds_per_slot - 1);
|
assert_eq!(meta.last_index, 0);
|
||||||
if i != 0 {
|
if i != 0 {
|
||||||
assert_eq!(meta.parent_slot, i - 1);
|
assert_eq!(meta.parent_slot, i - 1);
|
||||||
assert_eq!(meta.consumed, 0);
|
assert_eq!(meta.consumed, 1);
|
||||||
} else {
|
} else {
|
||||||
assert_eq!(meta.parent_slot, 0);
|
assert_eq!(meta.parent_slot, 0);
|
||||||
assert_eq!(meta.consumed, num_shreds_per_slot);
|
assert_eq!(meta.consumed, num_shreds_per_slot);
|
||||||
|
@ -3567,4 +3630,102 @@ pub mod tests {
|
||||||
drop(blocktree);
|
drop(blocktree);
|
||||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_completed_data_ranges() {
|
||||||
|
let completed_data_end_indexes = vec![2, 4, 9, 11];
|
||||||
|
|
||||||
|
// Consumed is 1, which means we're missing shred with index 1, should return empty
|
||||||
|
let start_index = 0;
|
||||||
|
let consumed = 1;
|
||||||
|
assert_eq!(
|
||||||
|
Blocktree::get_completed_data_ranges(
|
||||||
|
start_index,
|
||||||
|
&completed_data_end_indexes[..],
|
||||||
|
consumed
|
||||||
|
),
|
||||||
|
vec![]
|
||||||
|
);
|
||||||
|
|
||||||
|
let start_index = 0;
|
||||||
|
let consumed = 3;
|
||||||
|
assert_eq!(
|
||||||
|
Blocktree::get_completed_data_ranges(
|
||||||
|
start_index,
|
||||||
|
&completed_data_end_indexes[..],
|
||||||
|
consumed
|
||||||
|
),
|
||||||
|
vec![(0, 2)]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Test all possible ranges:
|
||||||
|
//
|
||||||
|
// `consumed == completed_data_end_indexes[j] + 1`, means we have all the shreds up to index
|
||||||
|
// `completed_data_end_indexes[j] + 1`. Thus the completed data blocks is everything in the
|
||||||
|
// range:
|
||||||
|
// [start_index, completed_data_end_indexes[j]] ==
|
||||||
|
// [completed_data_end_indexes[i], completed_data_end_indexes[j]],
|
||||||
|
for i in 0..completed_data_end_indexes.len() {
|
||||||
|
for j in i..completed_data_end_indexes.len() {
|
||||||
|
let start_index = completed_data_end_indexes[i];
|
||||||
|
let consumed = completed_data_end_indexes[j] + 1;
|
||||||
|
// When start_index == completed_data_end_indexes[i], then that means
|
||||||
|
// the shred with index == start_index is a single-shred data block,
|
||||||
|
// so the start index is the end index for that data block.
|
||||||
|
let mut expected = vec![(start_index, start_index)];
|
||||||
|
expected.extend(
|
||||||
|
completed_data_end_indexes[i..=j]
|
||||||
|
.windows(2)
|
||||||
|
.map(|end_indexes| (end_indexes[0] + 1, end_indexes[1])),
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
Blocktree::get_completed_data_ranges(
|
||||||
|
start_index,
|
||||||
|
&completed_data_end_indexes[..],
|
||||||
|
consumed
|
||||||
|
),
|
||||||
|
expected
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_slot_entries_with_shred_count_corruption() {
|
||||||
|
let blocktree_path =
|
||||||
|
get_tmp_ledger_path("test_get_slot_entries_with_shred_count_corruption");
|
||||||
|
{
|
||||||
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||||
|
let num_ticks = 8;
|
||||||
|
let entries = create_ticks(num_ticks, Hash::default());
|
||||||
|
let slot = 1;
|
||||||
|
let shreds = entries_to_test_shreds(entries, slot, 0, false);
|
||||||
|
let next_shred_index = shreds.len();
|
||||||
|
blocktree
|
||||||
|
.insert_shreds(shreds, None)
|
||||||
|
.expect("Expected successful write of shreds");
|
||||||
|
assert_eq!(
|
||||||
|
blocktree.get_slot_entries(slot, 0, None).unwrap().len() as u64,
|
||||||
|
num_ticks
|
||||||
|
);
|
||||||
|
|
||||||
|
// Insert an empty shred that won't deshred into entries
|
||||||
|
let shreds = vec![Shred::new_from_data(
|
||||||
|
slot,
|
||||||
|
next_shred_index as u32,
|
||||||
|
1,
|
||||||
|
None,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)];
|
||||||
|
|
||||||
|
// With the corruption, nothing should be returned, even though an
|
||||||
|
// earlier data block was valid
|
||||||
|
blocktree
|
||||||
|
.insert_shreds(shreds, None)
|
||||||
|
.expect("Expected successful write of shreds");
|
||||||
|
assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ pub struct SlotMeta {
|
||||||
// True if this slot is full (consumed == last_index + 1) and if every
|
// True if this slot is full (consumed == last_index + 1) and if every
|
||||||
// slot that is a parent of this slot is also connected.
|
// slot that is a parent of this slot is also connected.
|
||||||
pub is_connected: bool,
|
pub is_connected: bool,
|
||||||
|
// List of start indexes for completed data slots
|
||||||
|
pub completed_data_indexes: Vec<u32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
|
||||||
|
@ -227,6 +229,7 @@ impl SlotMeta {
|
||||||
next_slots: vec![],
|
next_slots: vec![],
|
||||||
is_connected: slot == 0,
|
is_connected: slot == 0,
|
||||||
last_index: std::u64::MAX,
|
last_index: std::u64::MAX,
|
||||||
|
completed_data_indexes: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue