Blocktree last_root to enforce a slot floor (#5593)

* Add last_root to blocktree
* Don't repair earlier than last_root
* Add integration test to make sure blocktree floor is enforced
This commit is contained in:
parent 362a39a941
commit 8b9c3a2561
[blocktree.rs]
@@ -47,8 +47,7 @@ macro_rules! db_imports {
             mod $mod;

             use $mod::$db;
-            use db::columns as cf;
+            use db::{columns as cf, IteratorMode, IteratorDirection};
             pub use db::columns;

             pub type Database = db::Database<$db>;
@@ -96,6 +95,7 @@ pub struct Blocktree {
     data_shred_cf: LedgerColumn<cf::ShredData>,
     code_shred_cf: LedgerColumn<cf::ShredCode>,
     batch_processor: Arc<RwLock<BatchProcessor>>,
+    last_root: Arc<RwLock<u64>>,
     pub new_blobs_signals: Vec<SyncSender<bool>>,
     pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
 }
@@ -156,6 +156,14 @@ impl Blocktree {
         let db = Arc::new(db);

+        // Get max root or 0 if it doesn't exist
+        let max_root = db
+            .iter::<cf::Root>(IteratorMode::End)?
+            .next()
+            .map(|(slot, _)| slot)
+            .unwrap_or(0);
+        let last_root = Arc::new(RwLock::new(max_root));
+
         Ok(Blocktree {
             db,
             meta_cf,
@@ -170,6 +178,7 @@ impl Blocktree {
             new_blobs_signals: vec![],
             batch_processor,
             completed_slots_senders: vec![],
+            last_root,
         })
     }
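The floor itself is the `last_root` value recovered above: on open, one step back from the end of the Root column yields the highest rooted slot, so the floor survives restarts. A minimal sketch of that recovery logic, using a `BTreeMap` as a stand-in for the ordered Root column (the map and helper here are illustrative, not part of the codebase):

```rust
use std::collections::BTreeMap;

// Stand-in for the Root column family: rooted slot -> marker.
fn max_root(roots: &BTreeMap<u64, bool>) -> u64 {
    // IteratorMode::End positions the cursor at the largest key, so the
    // first item seen walking backwards is the maximum rooted slot.
    roots.iter().next_back().map(|(slot, _)| *slot).unwrap_or(0)
}

fn main() {
    let mut roots = BTreeMap::new();
    // Empty ledger: the floor starts at 0, matching `.unwrap_or(0)` above.
    assert_eq!(max_root(&roots), 0);

    for slot in &[0u64, 2, 4, 7, 12, 15] {
        roots.insert(*slot, true);
    }
    // After a restart the floor is rebuilt from the persisted column.
    assert_eq!(max_root(&roots), 15);
}
```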
@@ -308,7 +317,9 @@ impl Blocktree {
     }

     pub fn slot_meta_iterator(&self, slot: u64) -> Result<impl Iterator<Item = (u64, SlotMeta)>> {
-        let meta_iter = self.db.iter::<cf::SlotMeta>(Some(slot))?;
+        let meta_iter = self
+            .db
+            .iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?;
         Ok(meta_iter.map(|(slot, slot_meta_bytes)| {
             (
                 slot,
@@ -322,7 +333,9 @@ impl Blocktree {
         &self,
         slot: u64,
     ) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)>> {
-        let slot_iterator = self.db.iter::<cf::Data>(Some((slot, 0)))?;
+        let slot_iterator = self
+            .db
+            .iter::<cf::Data>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?;
         Ok(slot_iterator.take_while(move |((blob_slot, _), _)| *blob_slot == slot))
     }
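Both call sites now seek with an explicit `IteratorMode::From` and rely on `take_while` to stop at the slot boundary. A small stand-in sketch of that pattern (the `BTreeMap` column and helper are illustrative only):

```rust
use std::collections::BTreeMap;

// Stand-in for the (slot, index) -> data column, keyed in sorted order.
fn slot_keys(col: &BTreeMap<(u64, u64), Vec<u8>>, slot: u64) -> Vec<(u64, u64)> {
    col.range((slot, 0)..) // seek: IteratorMode::From((slot, 0), Forward)
        .take_while(|((s, _), _)| *s == slot) // stop at the first foreign slot
        .map(|(k, _)| *k)
        .collect()
}

fn main() {
    let mut col = BTreeMap::new();
    for key in &[(3u64, 0u64), (3, 1), (4, 0)] {
        col.insert(*key, Vec::new());
    }
    // Only slot 3's keys come back; the scan never walks past (4, 0).
    assert_eq!(slot_keys(&col, 3), vec![(3, 0), (3, 1)]);
}
```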
@@ -463,8 +476,8 @@ impl Blocktree {
                     &mut write_batch,
                 )
             }
-        } else if Blocktree::insert_data_shred(
-            db,
+        } else if self
+            .insert_data_shred(
                 &mut slot_meta_working_set,
                 &mut index_working_set,
                 &shred,
@@ -485,8 +498,7 @@ impl Blocktree {
         );

         recovered_data.into_iter().for_each(|shred| {
-            let _ = Blocktree::insert_data_shred(
-                db,
+            let _ = self.insert_data_shred(
                 &mut slot_meta_working_set,
                 &mut index_working_set,
                 &shred,
@@ -495,7 +507,7 @@ impl Blocktree {
         });

         // Handle chaining for the working set
-        handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?;
+        handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?;

         let (should_signal, newly_completed_slots) = prepare_signals(
             &slot_meta_working_set,
@@ -575,7 +587,7 @@ impl Blocktree {
     }

     fn insert_data_shred(
-        db: &Database,
+        &self,
         mut slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
         index_working_set: &mut HashMap<u64, Index>,
         shred: &Shred,
@@ -592,14 +604,14 @@ impl Blocktree {
             false
         };

-        let entry = get_slot_meta_entry(db, &mut slot_meta_working_set, slot, parent);
+        let entry = get_slot_meta_entry(&self.db, &mut slot_meta_working_set, slot, parent);

         let slot_meta = &mut entry.0.borrow_mut();
         if is_orphan(slot_meta) {
             slot_meta.parent_slot = parent;
         }

-        let data_cf = db.column::<cf::ShredData>();
+        let data_cf = self.db.column::<cf::ShredData>();

         let check_data_cf = |slot, index| {
             data_cf
@@ -614,7 +626,14 @@ impl Blocktree {
             .data_mut();

         if !index_meta.is_present(index)
-            && should_insert(slot_meta, index, slot, last_in_slot, check_data_cf)
+            && should_insert(
+                slot_meta,
+                index,
+                slot,
+                last_in_slot,
+                check_data_cf,
+                *self.last_root.read().unwrap(),
+            )
         {
             let new_consumed = if slot_meta.consumed == index {
                 let mut current_index = index + 1;
@@ -628,8 +647,10 @@ impl Blocktree {
             };

             let serialized_shred = bincode::serialize(shred).unwrap();
-            write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?;

+            // Commit step: commit all changes to the mutable structures at once, or none at all.
+            // We don't want only a subset of these changes going through.
+            write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?;
             update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
             index_meta.set_present(index, true);
             trace!("inserted shred into slot {:?} and index {:?}", slot, index);
@@ -884,6 +905,7 @@ impl Blocktree {
                 &mut index_working_set,
                 &mut prev_inserted_blob_datas,
                 &mut write_batch,
+                *self.last_root.read().unwrap(),
             )?;
         } else {
             insert_data_blob_batch(
@@ -893,6 +915,7 @@ impl Blocktree {
                 &mut index_working_set,
                 &mut prev_inserted_blob_datas,
                 &mut write_batch,
+                *self.last_root.read().unwrap(),
             )?;
         }
@@ -1134,6 +1157,7 @@ impl Blocktree {
             &mut index_working_set,
             &mut prev_inserted_blob_datas,
             &mut writebatch,
+            *self.last_root.read().unwrap(),
         )?;

         // Handle chaining for the working set
@@ -1433,6 +1457,12 @@ impl Blocktree {

             batch_processor.write(write_batch)?;
         }

+        let mut last_root = self.last_root.write().unwrap();
+        if *last_root == std::u64::MAX {
+            *last_root = 0;
+        }
+        *last_root = cmp::max(*rooted_slots.iter().max().unwrap(), *last_root);
         Ok(())
     }
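`set_roots` is the only writer of the floor, and the update is monotonic: taking `cmp::max` means the floor can only move up. A sketch of the update under the same assumptions as the diff (the bare `RwLock<u64>` stands in for the `last_root` field):

```rust
use std::cmp;
use std::sync::RwLock;

// Illustrative holder; in Blocktree this is `self.last_root`.
fn update_floor(last_root: &RwLock<u64>, rooted_slots: &[u64]) {
    let mut last_root = last_root.write().unwrap();
    if *last_root == std::u64::MAX {
        // Sentinel guard carried over from the diff: reset before taking max().
        *last_root = 0;
    }
    *last_root = cmp::max(*rooted_slots.iter().max().unwrap(), *last_root);
}

fn main() {
    let floor = RwLock::new(0u64);
    update_floor(&floor, &[0, 1, 4]);
    assert_eq!(*floor.read().unwrap(), 4);

    // A stale or out-of-order root set can never lower the floor.
    update_floor(&floor, &[2]);
    assert_eq!(*floor.read().unwrap(), 4);
}
```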
@@ -1469,29 +1499,6 @@ impl Blocktree {
         results
     }

-    // Handle special case of writing genesis blobs. For instance, the first two entries
-    // don't count as ticks, even if they're empty entries
-    fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> {
-        // TODO: change bootstrap height to number of slots
-        let mut bootstrap_meta = SlotMeta::new(0, 1);
-        let last = blobs.last().unwrap();
-
-        let mut batch_processor = self.batch_processor.write().unwrap();
-
-        bootstrap_meta.consumed = last.index() + 1;
-        bootstrap_meta.received = last.index() + 1;
-        bootstrap_meta.is_connected = true;
-
-        let mut batch = batch_processor.batch()?;
-        batch.put::<cf::SlotMeta>(0, &bootstrap_meta)?;
-        for blob in blobs {
-            let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
-            batch.put_bytes::<cf::Data>((blob.slot(), blob.index()), serialized_blob_datas)?;
-        }
-        batch_processor.write(batch)?;
-        Ok(())
-    }
-
     /// Prune blocktree such that slots higher than `target_slot` are deleted and all references to
     /// higher slots are removed
     pub fn prune(&self, target_slot: u64) {
@@ -1524,6 +1531,10 @@ impl Blocktree {
                 .expect("couldn't update meta");
         }
     }
+
+    pub fn last_root(&self) -> u64 {
+        *self.last_root.read().unwrap()
+    }
 }

 fn insert_data_blob_batch<'a, I>(
@@ -1533,6 +1544,7 @@ fn insert_data_blob_batch<'a, I>(
     index_working_set: &mut HashMap<u64, Index>,
     prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>,
     write_batch: &mut WriteBatch,
+    last_root: u64,
 ) -> Result<()>
 where
     I: IntoIterator<Item = &'a Blob>,
@@ -1544,6 +1556,7 @@ where
             slot_meta_working_set,
             prev_inserted_blob_datas,
             write_batch,
+            last_root,
         );

         if inserted {
@@ -1589,7 +1602,7 @@ fn insert_data_blob<'a>(
     let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size];

     // Commit step: commit all changes to the mutable structures at once, or none at all.
-    // We don't want only some of these changes going through.
+    // We don't want only a subset of these changes going through.
     write_batch.put_bytes::<cf::Data>((blob_slot, blob_index), serialized_blob_data)?;
     prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data);
     update_slot_meta(
@@ -1660,6 +1673,7 @@ fn check_insert_data_blob<'a>(
     slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
     prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>,
     write_batch: &mut WriteBatch,
+    last_root: u64,
 ) -> bool {
     let entry = get_slot_meta_entry(db, slot_meta_working_set, blob.slot(), blob.parent());
@@ -1670,7 +1684,7 @@ fn check_insert_data_blob<'a>(

-    // This slot is full, skip the bogus blob
-    if !should_insert_blob(&slot_meta, db, &prev_inserted_blob_datas, blob) {
+    // Check if this blob should be inserted
+    if !should_insert_blob(&slot_meta, db, &prev_inserted_blob_datas, blob, last_root) {
         false
     } else {
         let _ = insert_data_blob(blob, db, prev_inserted_blob_datas, slot_meta, write_batch);
@@ -1714,6 +1728,7 @@ fn should_insert_blob(
     db: &Database,
     prev_inserted_blob_datas: &HashMap<(u64, u64), &[u8]>,
     blob: &Blob,
+    last_slot: u64,
 ) -> bool {
     let blob_index = blob.index();
     let blob_slot = blob.slot();
@@ -1728,7 +1743,14 @@ fn should_insert_blob(
     };

     !prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index))
-        && should_insert(slot, blob_index, blob_slot, last_in_slot, check_data_cf)
+        && should_insert(
+            slot,
+            blob_index,
+            blob_slot,
+            last_in_slot,
+            check_data_cf,
+            last_slot,
+        )
 }

 fn should_insert<F>(
@@ -1737,6 +1759,7 @@ fn should_insert<F>(
     slot: u64,
     last_in_slot: bool,
     db_check: F,
+    last_root: u64,
 ) -> bool
 where
     F: Fn(u64, u64) -> bool,
@@ -1775,9 +1798,42 @@ where
         );
         return false;
     }

+    if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) {
+        // Check that the parent_slot < slot
+        if slot_meta.parent_slot >= slot {
+            datapoint_error!(
+                "blocktree_error",
+                (
+                    "error",
+                    format!(
+                        "Received blob with parent_slot {} >= slot {}",
+                        slot_meta.parent_slot, slot
+                    ),
+                    String
+                )
+            );
+            return false;
+        }
+
+        // Check that the blob is for a slot that is past the root
+        if slot <= last_root {
+            return false;
+        }
+
+        // Ignore blobs that chain to slots before the last root
+        if slot_meta.parent_slot < last_root {
+            return false;
+        }
+    }

     true
 }

+fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool {
+    slot_to_write == 0 && last_root == 0 && parent_slot == 0
+}
+
 fn send_signals(
     new_blobs_signals: &[SyncSender<bool>],
     completed_slots_senders: &[SyncSender<Vec<u64>>],
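Taken together, the new checks mean a blob is accepted only if it sits strictly above the floor and chains to a slot at or above it, with a carve-out for writing slot 0 into a fresh ledger. A condensed, self-contained sketch of just those predicates (the `passes_root_floor` wrapper is illustrative; the real `should_insert` also runs the index, duplicate, and last-in-slot checks shown above):

```rust
/// Bootstrap exception: slot 0 may be written into a fresh ledger.
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool {
    slot_to_write == 0 && last_root == 0 && parent_slot == 0
}

/// Condensed form of the floor checks added in this hunk.
fn passes_root_floor(slot: u64, parent_slot: u64, last_root: u64) -> bool {
    if is_valid_write_to_slot_0(slot, parent_slot, last_root) {
        return true;
    }
    parent_slot < slot // a blob must chain to an earlier slot
        && slot > last_root // never accept data at or below the floor
        && parent_slot >= last_root // never chain to below the floor
}

fn main() {
    assert!(passes_root_floor(0, 0, 0)); // genesis write into a fresh ledger
    assert!(passes_root_floor(16, 15, 15)); // child of the current root
    assert!(!passes_root_floor(15, 12, 15)); // at the floor: rejected
    assert!(!passes_root_floor(20, 10, 15)); // chains below the floor: rejected
}
```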
@@ -2464,33 +2520,11 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_block: &GenesisBlock) -> Re
         .collect();

     blocktree.insert_shreds(shreds)?;
+    blocktree.set_roots(&[0])?;

     Ok(last_hash)
 }

-pub fn genesis<'a, I>(ledger_path: &Path, keypair: &Keypair, entries: I) -> Result<()>
-where
-    I: IntoIterator<Item = &'a Entry>,
-{
-    let blocktree = Blocktree::open(ledger_path)?;
-
-    // TODO sign these blobs with keypair
-    let blobs: Vec<_> = entries
-        .into_iter()
-        .enumerate()
-        .map(|(idx, entry)| {
-            let mut b = entry.borrow().to_blob();
-            b.set_index(idx as u64);
-            b.set_id(&keypair.pubkey());
-            b.set_slot(0);
-            b
-        })
-        .collect();
-
-    blocktree.write_genesis_blobs(&blobs[..])?;
-    Ok(())
-}
-
 #[macro_export]
 macro_rules! tmp_ledger_name {
     () => {
@@ -3909,7 +3943,8 @@ pub mod tests {
             &slot_meta,
             &blocktree.db,
             &HashMap::new(),
-            &blobs[4].clone()
+            &blobs[4].clone(),
+            0
         ));

         // Trying to insert the same blob again should fail
@@ -3919,7 +3954,8 @@ pub mod tests {
             &slot_meta,
             &blocktree.db,
             &HashMap::new(),
-            &blobs[7].clone()
+            &blobs[7].clone(),
+            0
         ));

         // Trying to insert another "is_last" blob with index < the received index
@@ -3932,7 +3968,8 @@ pub mod tests {
             &slot_meta,
             &blocktree.db,
             &HashMap::new(),
-            &blobs[8].clone()
+            &blobs[8].clone(),
+            0
         ));

         // Insert the 10th blob, which is marked as "is_last"
@@ -3945,7 +3982,8 @@ pub mod tests {
             &slot_meta,
             &blocktree.db,
             &HashMap::new(),
-            &blobs[10].clone()
+            &blobs[10].clone(),
+            0
         ));

         drop(blocktree);
@@ -4009,9 +4047,12 @@ pub mod tests {
         let blocktree_path = get_tmp_ledger_path!();
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let chained_slots = vec![0, 2, 4, 7, 12, 15];
+        assert_eq!(blocktree.last_root(), 0);

         blocktree.set_roots(&chained_slots).unwrap();

+        assert_eq!(blocktree.last_root(), 15);
+
         for i in chained_slots {
             assert!(blocktree.is_root(i));
         }
@@ -4041,7 +4082,10 @@ pub mod tests {
             assert_eq!(meta.last_index, 5)
         });

-        let data_iter = blocktree.data_cf.iter(Some((0, 0))).unwrap();
+        let data_iter = blocktree
+            .data_cf
+            .iter(IteratorMode::From((0, 0), IteratorDirection::Forward))
+            .unwrap();
         for ((slot, _), _) in data_iter {
             if slot > 5 {
                 assert!(false);
@@ -4855,10 +4899,8 @@ pub mod tests {
         let mut slots_shreds_and_entries = vec![];
         for (i, slot) in chain.iter().enumerate() {
             let parent_slot = {
-                if *slot == 0 {
+                if *slot == 0 || i == 0 {
                     0
-                } else if i == 0 {
-                    std::u64::MAX
                 } else {
                     chain[i - 1]
                 }
@@ -4907,10 +4949,8 @@ pub mod tests {
         let mut slots_blobs_and_entries = vec![];
         for (i, slot) in chain.iter().enumerate() {
             let parent_slot = {
-                if *slot == 0 {
+                if *slot == 0 || i == 0 {
                     0
-                } else if i == 0 {
-                    std::u64::MAX
                 } else {
                     chain[i - 1]
                 }
[blocktree/db.rs]
@@ -12,6 +12,17 @@ use std::marker::PhantomData;
 use std::path::Path;
 use std::sync::Arc;

+pub enum IteratorMode<Index> {
+    Start,
+    End,
+    From(Index, IteratorDirection),
+}
+
+pub enum IteratorDirection {
+    Forward,
+    Reverse,
+}
+
 pub mod columns {
     #[derive(Debug)]
     /// SlotMeta Column
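`IteratorMode` generalizes the old `Option<C::Index>` start key: `Start` and `End` seek to the column's extremes, while `From` seeks to a key and walks in either direction. A stand-in sketch of the three modes over an ordered map (the enums are copied from the hunk above; the map and helper are illustrative):

```rust
use std::collections::BTreeMap;

pub enum IteratorDirection {
    Forward,
    Reverse,
}

pub enum IteratorMode<Index> {
    Start,
    End,
    From(Index, IteratorDirection),
}

// First key visited by an iterator opened in the given mode.
fn first_key(col: &BTreeMap<u64, ()>, mode: IteratorMode<u64>) -> Option<u64> {
    match mode {
        IteratorMode::Start => col.keys().next().copied(),
        IteratorMode::End => col.keys().next_back().copied(),
        IteratorMode::From(k, IteratorDirection::Forward) => col.range(k..).next().map(|(k, _)| *k),
        IteratorMode::From(k, IteratorDirection::Reverse) => {
            col.range(..=k).next_back().map(|(k, _)| *k)
        }
    }
}

fn main() {
    let col: BTreeMap<u64, ()> = [2u64, 5, 9].iter().map(|&k| (k, ())).collect();
    assert_eq!(first_key(&col, IteratorMode::Start), Some(2));
    // `End` is how Blocktree::open recovers the max root in one step.
    assert_eq!(first_key(&col, IteratorMode::End), Some(9));
    assert_eq!(
        first_key(&col, IteratorMode::From(4, IteratorDirection::Forward)),
        Some(5)
    );
    assert_eq!(
        first_key(&col, IteratorMode::From(4, IteratorDirection::Reverse)),
        Some(2)
    );
}
```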
@@ -77,7 +88,11 @@ pub trait Backend: Sized + Send + Sync {

     fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>;

-    fn iterator_cf(&self, cf: Self::ColumnFamily, from: Option<&Self::Key>) -> Result<Self::Iter>;
+    fn iterator_cf(
+        &self,
+        cf: Self::ColumnFamily,
+        iterator_mode: IteratorMode<&Self::Key>,
+    ) -> Result<Self::Iter>;

     fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result<Self::Cursor>;
@@ -262,18 +277,26 @@ where

     pub fn iter<C>(
         &self,
-        start_from: Option<C::Index>,
+        iterator_mode: IteratorMode<C::Index>,
     ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)>>
     where
         C: Column<B>,
     {
         let iter = {
-            if let Some(index) = start_from {
-                let key = C::key(index);
-                self.backend
-                    .iterator_cf(self.cf_handle::<C>(), Some(key.borrow()))?
-            } else {
-                self.backend.iterator_cf(self.cf_handle::<C>(), None)?
+            match iterator_mode {
+                IteratorMode::From(start_from, direction) => {
+                    let key = C::key(start_from);
+                    self.backend.iterator_cf(
+                        self.cf_handle::<C>(),
+                        IteratorMode::From(key.borrow(), direction),
+                    )?
+                }
+                IteratorMode::Start => self
+                    .backend
+                    .iterator_cf(self.cf_handle::<C>(), IteratorMode::Start)?,
+                IteratorMode::End => self
+                    .backend
+                    .iterator_cf(self.cf_handle::<C>(), IteratorMode::End)?,
             }
         };
@@ -405,15 +428,19 @@ where

     pub fn iter(
         &self,
-        start_from: Option<C::Index>,
+        iterator_mode: IteratorMode<C::Index>,
     ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)>> {
         let iter = {
-            if let Some(index) = start_from {
-                let key = C::key(index);
-                self.backend
-                    .iterator_cf(self.handle(), Some(key.borrow()))?
-            } else {
-                self.backend.iterator_cf(self.handle(), None)?
+            match iterator_mode {
+                IteratorMode::From(start_from, direction) => {
+                    let key = C::key(start_from);
+                    self.backend
+                        .iterator_cf(self.handle(), IteratorMode::From(key.borrow(), direction))?
+                }
+                IteratorMode::Start => self
+                    .backend
+                    .iterator_cf(self.handle(), IteratorMode::Start)?,
+                IteratorMode::End => self.backend.iterator_cf(self.handle(), IteratorMode::End)?,
             }
         };
@@ -430,7 +457,11 @@ where
     C::Index: PartialOrd + Copy,
 {
     let mut end = true;
-    let iter = self.iter(from.map(C::as_index))?;
+    let iter_config = match from {
+        Some(s) => IteratorMode::From(C::as_index(s), IteratorDirection::Forward),
+        None => IteratorMode::Start,
+    };
+    let iter = self.iter(iter_config)?;
     for (index, _) in iter {
         if let Some(to) = to {
             if C::slot(index) > to {
[blocktree/rocks.rs]
@@ -1,5 +1,5 @@
 use crate::blocktree::db::columns as cf;
-use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn};
+use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn, IteratorMode, IteratorDirection};
 use crate::blocktree::BlocktreeError;
 use crate::result::{Error, Result};
 use solana_sdk::timing::Slot;
@@ -7,7 +7,7 @@ use solana_sdk::timing::Slot;
 use byteorder::{BigEndian, ByteOrder};

 use rocksdb::{
-    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode,
+    self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, Direction, IteratorMode as RocksIteratorMode,
     Options, WriteBatch as RWriteBatch, DB,
 };
@@ -130,13 +130,27 @@ impl Backend for Rocks {
         Ok(())
     }

-    fn iterator_cf(&self, cf: ColumnFamily, start_from: Option<&[u8]>) -> Result<DBIterator> {
+    fn iterator_cf(&self, cf: ColumnFamily, iterator_mode: IteratorMode<&[u8]>) -> Result<DBIterator> {
         let iter = {
-            if let Some(start_from) = start_from {
-                self.0
-                    .iterator_cf(cf, IteratorMode::From(start_from, Direction::Forward))?
-            } else {
-                self.0.iterator_cf(cf, IteratorMode::Start)?
+            match iterator_mode {
+                IteratorMode::Start => {
+                    self.0.iterator_cf(cf, RocksIteratorMode::Start)?
+                }
+                IteratorMode::End => {
+                    self.0.iterator_cf(cf, RocksIteratorMode::End)?
+                }
+                IteratorMode::From(start_from, direction) => {
+                    let rocks_direction = match direction {
+                        IteratorDirection::Forward => {
+                            Direction::Forward
+                        }
+                        IteratorDirection::Reverse => {
+                            Direction::Reverse
+                        }
+                    };
+                    self.0
+                        .iterator_cf(cf, RocksIteratorMode::From(start_from, rocks_direction))?
+                }
             }
         };
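The RocksDB backend then translates the generic mode into the rocksdb crate's native types, which is why the import above is renamed to `RocksIteratorMode`. A sketch of just that translation, with local stand-ins for both sets of types (nothing here calls the rocksdb crate itself):

```rust
// Stand-ins: `IteratorMode`/`IteratorDirection` mirror the generic enums,
// `Rocks*` mirror the rocksdb crate's `IteratorMode` and `Direction`.
enum IteratorDirection { Forward, Reverse }
enum IteratorMode<'a> { Start, End, From(&'a [u8], IteratorDirection) }

enum RocksDirection { Forward, Reverse }
enum RocksIteratorMode<'a> { Start, End, From(&'a [u8], RocksDirection) }

// The whole job of the new match arms in Rocks::iterator_cf: a 1:1 translation.
fn to_rocks(mode: IteratorMode<'_>) -> RocksIteratorMode<'_> {
    match mode {
        IteratorMode::Start => RocksIteratorMode::Start,
        IteratorMode::End => RocksIteratorMode::End,
        IteratorMode::From(key, IteratorDirection::Forward) => {
            RocksIteratorMode::From(key, RocksDirection::Forward)
        }
        IteratorMode::From(key, IteratorDirection::Reverse) => {
            RocksIteratorMode::From(key, RocksDirection::Reverse)
        }
    }
}

fn main() {
    assert!(matches!(to_rocks(IteratorMode::End), RocksIteratorMode::End));
    assert!(matches!(
        to_rocks(IteratorMode::From(b"key", IteratorDirection::Reverse)),
        RocksIteratorMode::From(_, RocksDirection::Reverse)
    ));
}
```

Keeping the generic enum in db.rs means other `Backend` implementations can support seeking without depending on rocksdb's types.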
[blocktree_processor.rs]
@@ -544,7 +544,7 @@ pub mod tests {
         info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
         info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);

-        blocktree.set_roots(&[4, 1, 0]).unwrap();
+        blocktree.set_roots(&[0, 1, 4]).unwrap();

         let (bank_forks, bank_forks_info, _) =
             process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
[repair_service.rs]
@@ -1,6 +1,5 @@
 //! The `repair_service` module implements the tools necessary to generate a thread which
 //! regularly finds missing blobs in the ledger and sends repair requests for those blobs

-use crate::bank_forks::BankForks;
 use crate::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta};
 use crate::cluster_info::ClusterInfo;
@@ -111,12 +110,10 @@ impl RepairService {
         let id = cluster_info.read().unwrap().id();
         let mut current_root = 0;
         if let RepairStrategy::RepairAll {
-            ref bank_forks,
-            ref epoch_schedule,
-            ..
+            ref epoch_schedule, ..
         } = repair_strategy
         {
-            current_root = bank_forks.read().unwrap().root();
+            current_root = blocktree.last_root();
             Self::initialize_epoch_slots(
                 id,
                 blocktree,
@@ -143,11 +140,10 @@ impl RepairService {
                 }

                 RepairStrategy::RepairAll {
-                    ref bank_forks,
                     ref completed_slots_receiver,
                     ..
                 } => {
-                    let new_root = bank_forks.read().unwrap().root();
+                    let new_root = blocktree.last_root();
                     Self::update_epoch_slots(
                         id,
                         new_root,
@@ -239,7 +235,8 @@ impl RepairService {
         // TODO: Incorporate gossip to determine priorities for repair?

         // Try to resolve orphans in blocktree
-        let orphans = blocktree.get_orphans(Some(MAX_ORPHANS));
+        let mut orphans = blocktree.get_orphans(Some(MAX_ORPHANS));
+        orphans.retain(|x| *x > root);

         Self::generate_repairs_for_orphans(&orphans[..], &mut repairs);
         Ok(repairs)
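With the repair root now read from `blocktree.last_root()` instead of `bank_forks`, orphans at or below the root are filtered out before any repairs are generated. A small sketch of that filter (the helper name is illustrative; the `retain` line is the one the diff adds):

```rust
// Orphans at or below the root floor are no longer repair candidates.
fn repairable_orphans(mut orphans: Vec<u64>, root: u64) -> Vec<u64> {
    orphans.retain(|x| *x > root); // same filter added after get_orphans()
    orphans
}

fn main() {
    // With a root of 0, slot 0 itself is no longer an orphan to repair,
    // which is why the expected test output below loses RepairType::Orphan(0).
    assert_eq!(repairable_orphans(vec![0, 2], 0), vec![2]);
}
```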
@@ -430,11 +427,7 @@ mod test {
         blocktree.write_blobs(&blobs).unwrap();
         assert_eq!(
             RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
-            vec![
-                RepairType::HighestBlob(0, 0),
-                RepairType::Orphan(0),
-                RepairType::Orphan(2)
-            ]
+            vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)]
         );
     }
@@ -456,7 +449,7 @@ mod test {
         // Check that repair tries to patch the empty slot
         assert_eq!(
             RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
-            vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(0)]
+            vec![RepairType::HighestBlob(0, 0)]
         );
     }
     Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
[replay_stage.rs]
@@ -420,13 +420,14 @@ impl ReplayStage {
             let mut rooted_banks = root_bank.parents();
             rooted_banks.push(root_bank);
             let rooted_slots: Vec<_> = rooted_banks.iter().map(|bank| bank.slot()).collect();
+            // Call leader schedule_cache.set_root() before blocktree.set_root() because
+            // bank_forks.root is consumed by repair_service to update gossip, so we don't want to
+            // get blobs for repair on gossip before we update leader schedule, otherwise they may
+            // get dropped.
+            leader_schedule_cache.set_root(rooted_banks.last().unwrap());
             blocktree
                 .set_roots(&rooted_slots)
                 .expect("Ledger set roots failed");
-            // Set root first in leader schedule_cache before bank_forks because bank_forks.root
-            // is consumed by repair_service to update gossip, so we don't want to get blobs for
-            // repair on gossip before we update leader schedule, otherwise they may get dropped.
-            leader_schedule_cache.set_root(rooted_banks.last().unwrap());
             bank_forks
                 .write()
                 .unwrap()
[tvu.rs]
@@ -224,7 +224,7 @@ impl Service for Tvu {
 pub mod tests {
     use super::*;
     use crate::banking_stage::create_test_recorder;
-    use crate::blocktree::get_tmp_ledger_path;
+    use crate::blocktree::create_new_tmp_ledger;
     use crate::cluster_info::{ClusterInfo, Node};
     use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
     use solana_runtime::bank::Bank;
@@ -247,7 +247,7 @@ pub mod tests {
     cluster_info1.insert_info(leader.info.clone());
     let cref1 = Arc::new(RwLock::new(cluster_info1));

-    let blocktree_path = get_tmp_ledger_path!();
+    let (blocktree_path, _) = create_new_tmp_ledger!(&genesis_block);
     let (blocktree, l_receiver, completed_slots_receiver) =
         Blocktree::open_with_signal(&blocktree_path)
             .expect("Expected to successfully open ledger");
[local_cluster tests]
@@ -15,6 +15,7 @@ use solana_runtime::{
     epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH},
 };
 use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing};
+use std::path::PathBuf;
 use std::{
     collections::{HashMap, HashSet},
     fs,
@@ -299,37 +300,116 @@ fn test_listener_startup() {
     assert_eq!(cluster_nodes.len(), 4);
 }

 #[allow(unused_attributes)]
 #[test]
 #[serial]
-fn test_snapshots_restart_validity() {
-    solana_logger::setup();
-    let temp_dir = TempDir::new().unwrap();
-    let snapshot_path = temp_dir.path().join("bank_states");
-    let snapshot_package_output_path = temp_dir.path().join("tar");
+#[ignore]
+fn test_snapshots_blocktree_floor() {
+    // First set up the cluster with 1 snapshotting leader
     let snapshot_interval_slots = 10;
-
-    // Create the snapshot directories
-    fs::create_dir_all(&snapshot_path).expect("Failed to create snapshots bank state directory");
-    fs::create_dir_all(&snapshot_package_output_path)
-        .expect("Failed to create snapshots tar directory");
-
-    // Set up the cluster with 1 snapshotting validator
-    let mut snapshot_validator_config = ValidatorConfig::default();
-    snapshot_validator_config.rpc_config.enable_fullnode_exit = true;
-    snapshot_validator_config.snapshot_config = Some(SnapshotConfig {
-        snapshot_interval_slots,
-        snapshot_package_output_path: snapshot_package_output_path.clone(),
-        snapshot_path,
-    });
     let num_account_paths = 4;
-    let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
-    let mut all_account_storage_dirs = vec![account_storage_dirs];
-    snapshot_validator_config.account_paths = Some(account_storage_paths);

+    let leader_snapshot_test_config =
+        setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
+    let validator_snapshot_test_config =
+        setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
+
+    let snapshot_package_output_path = &leader_snapshot_test_config
+        .validator_config
+        .snapshot_config
+        .as_ref()
+        .unwrap()
+        .snapshot_package_output_path;
+
     let config = ClusterConfig {
         node_stakes: vec![10000],
         cluster_lamports: 100000,
-        validator_configs: vec![snapshot_validator_config.clone()],
+        validator_configs: vec![leader_snapshot_test_config.validator_config.clone()],
         ..ClusterConfig::default()
     };

     let mut cluster = LocalCluster::new(&config);
+
+    trace!("Waiting for snapshot tar to be generated with slot",);
+
+    let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path);
+    loop {
+        if tar.exists() {
+            trace!("snapshot tar exists");
+            break;
+        }
+        sleep(Duration::from_millis(5000));
+    }
+
+    // Copy tar to validator's snapshot output directory
+    let validator_tar_path =
+        snapshot_utils::get_snapshot_tar_path(&validator_snapshot_test_config.snapshot_output_path);
+    fs::hard_link(tar, &validator_tar_path).unwrap();
+    let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap();
+
+    // Start up a new node from a snapshot, wait for it to catchup with the leader
+    let validator_stake = 5;
+    cluster.add_validator(
+        &validator_snapshot_test_config.validator_config,
+        validator_stake,
+    );
+    let all_pubkeys = cluster.get_node_pubkeys();
+    let validator_id = all_pubkeys
+        .into_iter()
+        .find(|x| *x != cluster.entry_point_info.id)
+        .unwrap();
+    let validator_client = cluster.get_validator_client(&validator_id).unwrap();
+    let mut current_slot = 0;
+
+    // Make sure this validator can get repaired past the first few warmup epochs
+    let target_slot = slot_floor + 40;
+    while current_slot <= target_slot {
+        trace!("current_slot: {}", current_slot);
+        if let Ok(slot) = validator_client.get_slot() {
+            current_slot = slot;
+        } else {
+            continue;
+        }
+        sleep(Duration::from_secs(1));
+    }
+
+    // Check the validator ledger doesn't contain any slots < slot_floor
+    cluster.close_preserve_ledgers();
+    let validator_ledger_path = &cluster.fullnode_infos[&validator_id];
+    let blocktree = Blocktree::open(&validator_ledger_path.info.ledger_path).unwrap();
+
+    // Skip the zeroth slot in blocktree that the ledger is initialized with
+    let (first_slot, _) = blocktree.slot_meta_iterator(1).unwrap().next().unwrap();
+    assert_eq!(first_slot, slot_floor);
+}
+
+#[test]
+#[serial]
+fn test_snapshots_restart_validity() {
+    solana_logger::setup();
+    let snapshot_interval_slots = 10;
+    let num_account_paths = 4;
+    let mut snapshot_test_config =
+        setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
+    let snapshot_package_output_path = &snapshot_test_config
+        .validator_config
+        .snapshot_config
+        .as_ref()
+        .unwrap()
+        .snapshot_package_output_path;
+
+    // Set up the cluster with 1 snapshotting validator
+    let mut all_account_storage_dirs = vec![vec![]];
+
+    std::mem::swap(
+        &mut all_account_storage_dirs[0],
+        &mut snapshot_test_config.account_storage_dirs,
+    );
+
+    let config = ClusterConfig {
+        node_stakes: vec![10000],
+        cluster_lamports: 100000,
+        validator_configs: vec![snapshot_test_config.validator_config.clone()],
+        ..ClusterConfig::default()
+    };
@@ -381,12 +461,12 @@ fn test_snapshots_restart_validity() {
         let (new_account_storage_dirs, new_account_storage_paths) =
             generate_account_paths(num_account_paths);
         all_account_storage_dirs.push(new_account_storage_dirs);
-        snapshot_validator_config.account_paths = Some(new_account_storage_paths);
+        snapshot_test_config.validator_config.account_paths = Some(new_account_storage_paths);

         // Restart a node
         trace!("Restarting cluster from snapshot");
         let nodes = cluster.get_node_pubkeys();
-        cluster.restart_node(nodes[0], &snapshot_validator_config);
+        cluster.restart_node(nodes[0], &snapshot_test_config.validator_config);

         // Verify account balances on validator
         trace!("Verifying balances");
@@ -586,3 +666,40 @@ fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, String) {
     let account_storage_paths = AccountsDB::format_paths(account_storage_paths);
     (account_storage_dirs, account_storage_paths)
 }
+
+struct SnapshotValidatorConfig {
+    _snapshot_dir: TempDir,
+    snapshot_output_path: TempDir,
+    account_storage_dirs: Vec<TempDir>,
+    validator_config: ValidatorConfig,
+}
+
+fn setup_snapshot_validator_config(
+    snapshot_interval_slots: usize,
+    num_account_paths: usize,
+) -> SnapshotValidatorConfig {
+    // Create the snapshot config
+    let snapshot_dir = TempDir::new().unwrap();
+    let snapshot_output_path = TempDir::new().unwrap();
+    let snapshot_config = SnapshotConfig {
+        snapshot_interval_slots,
+        snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()),
+        snapshot_path: PathBuf::from(snapshot_dir.path()),
+    };
+
+    // Create the account paths
+    let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
+
+    // Create the validator config
+    let mut validator_config = ValidatorConfig::default();
+    validator_config.rpc_config.enable_fullnode_exit = true;
+    validator_config.snapshot_config = Some(snapshot_config);
+    validator_config.account_paths = Some(account_storage_paths);
+
+    SnapshotValidatorConfig {
+        _snapshot_dir: snapshot_dir,
+        snapshot_output_path,
+        account_storage_dirs,
+        validator_config,
+    }
+}