Add blockstore column for frozen hashes and duplicate confirmed (#18533)

carllin 2021-07-12 20:59:16 -07:00 committed by GitHub
parent cfece66403
commit a1c0f144f4
6 changed files with 208 additions and 68 deletions

View File

@@ -1,5 +1,5 @@
use crate::blockstore::*;
use solana_sdk::{clock::Slot, hash::Hash};
pub struct AncestorIterator<'a> {
current: Option<Slot>,
@@ -50,73 +50,74 @@ impl<'a> Iterator for AncestorIterator<'a> {
}
}
pub struct AncestorIteratorWithHash<'a> {
ancestor_iterator: AncestorIterator<'a>,
}
impl<'a> From<AncestorIterator<'a>> for AncestorIteratorWithHash<'a> {
fn from(ancestor_iterator: AncestorIterator<'a>) -> Self {
Self { ancestor_iterator }
}
}
impl<'a> Iterator for AncestorIteratorWithHash<'a> {
type Item = (Slot, Hash);
fn next(&mut self) -> Option<Self::Item> {
self.ancestor_iterator
.next()
.and_then(|next_ancestor_slot| {
self.ancestor_iterator
.blockstore
.get_bank_hash(next_ancestor_slot)
.map(|next_ancestor_hash| (next_ancestor_slot, next_ancestor_hash))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blockstore_processor::fill_blockstore_slot_with_ticks;
use solana_sdk::hash::Hash;
use std::{collections::HashMap, path::Path};
use trees::tr;
fn setup_forks(blockstore_path: &Path) -> Blockstore {
let blockstore = Blockstore::open(blockstore_path).unwrap();
/*
Build fork structure:
     slot 0
       |
     slot 1
     /    \
slot 2    |
  |       |
slot 3    |
          |
        slot 4
*/
let tree = tr(0) / (tr(1) / (tr(2) / (tr(3))) / (tr(4)));
blockstore.add_tree(tree, true, true, 2, Hash::default());
blockstore
}
#[test]
fn test_ancestor_iterator() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = setup_forks(&blockstore_path);

// Test correctness
assert!(AncestorIterator::new(0, &blockstore).next().is_none());
assert_eq!(
AncestorIterator::new(4, &blockstore).collect::<Vec<Slot>>(),
vec![1, 0]
);
assert_eq!(
AncestorIterator::new(3, &blockstore).collect::<Vec<Slot>>(),
vec![2, 1, 0]
);
}
Blockstore::destroy(&blockstore_path).unwrap();
}
#[test]
@@ -147,4 +148,42 @@ mod tests {
vec![] as Vec<Slot>
);
}
#[test]
fn test_ancestor_iterator_with_hash() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = setup_forks(&blockstore_path);
// Insert frozen hashes
let mut slot_to_bank_hash = HashMap::new();
for slot in 0..=4 {
let bank_hash = Hash::new_unique();
slot_to_bank_hash.insert(slot, bank_hash);
blockstore.insert_bank_hash(slot, bank_hash, false);
}
// Test correctness
assert!(
AncestorIteratorWithHash::from(AncestorIterator::new(0, &blockstore))
.next()
.is_none()
);
assert_eq!(
AncestorIteratorWithHash::from(AncestorIterator::new(4, &blockstore))
.collect::<Vec<(Slot, Hash)>>(),
vec![(1, slot_to_bank_hash[&1]), (0, slot_to_bank_hash[&0])]
);
assert_eq!(
AncestorIteratorWithHash::from(AncestorIterator::new(3, &blockstore))
.collect::<Vec<(Slot, Hash)>>(),
vec![
(2, slot_to_bank_hash[&2]),
(1, slot_to_bank_hash[&1]),
(0, slot_to_bank_hash[&0])
]
);
}
Blockstore::destroy(&blockstore_path).unwrap();
}
}
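
The adapter stops at the first ancestor with no stored bank hash: get_bank_hash returning None ends the iteration rather than leaving a gap mid-walk. A minimal usage sketch (crate paths assumed from solana-ledger's public layout):

use solana_ledger::{
    ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash},
    blockstore::Blockstore,
};
use solana_sdk::{clock::Slot, hash::Hash};

// Pair each ancestor of `slot` (exclusive of `slot` itself) with its frozen
// bank hash, walking back toward the root until a hash is missing.
fn frozen_ancestors(blockstore: &Blockstore, slot: Slot) -> Vec<(Slot, Hash)> {
    AncestorIteratorWithHash::from(AncestorIterator::new(slot, blockstore)).collect()
}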

View File

@@ -144,6 +144,7 @@ pub struct Blockstore {
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
block_height_cf: LedgerColumn<cf::BlockHeight>,
program_costs_cf: LedgerColumn<cf::ProgramCosts>,
bank_hash_cf: LedgerColumn<cf::BankHash>,
last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
@@ -344,6 +345,7 @@ impl Blockstore {
let perf_samples_cf = db.column();
let block_height_cf = db.column();
let program_costs_cf = db.column();
let bank_hash_cf = db.column();
let db = Arc::new(db);
@@ -393,6 +395,7 @@ impl Blockstore {
perf_samples_cf,
block_height_cf,
program_costs_cf,
bank_hash_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())),
@@ -2949,6 +2952,54 @@ impl Blockstore {
}
}
pub fn insert_bank_hash(&self, slot: Slot, frozen_hash: Hash, is_duplicate_confirmed: bool) {
if let Some(prev_value) = self.bank_hash_cf.get(slot).unwrap() {
if prev_value.frozen_hash() == frozen_hash && prev_value.is_duplicate_confirmed() {
// Don't overwrite is_duplicate_confirmed == true with is_duplicate_confirmed == false,
// which may happen on startup when replaying from the blockstore processor, because
// the replayed blocks may not reflect gossip votes observed before the restart.
return;
}
}
let data = FrozenHashVersioned::Current(FrozenHashStatus {
frozen_hash,
is_duplicate_confirmed,
});
self.bank_hash_cf.put(slot, &data).unwrap()
}
pub fn get_bank_hash(&self, slot: Slot) -> Option<Hash> {
self.bank_hash_cf
.get(slot)
.unwrap()
.map(|versioned| versioned.frozen_hash())
}
pub fn is_duplicate_confirmed(&self, slot: Slot) -> bool {
self.bank_hash_cf
.get(slot)
.unwrap()
.map(|versioned| versioned.is_duplicate_confirmed())
.unwrap_or(false)
}
pub fn set_duplicate_confirmed_slots_and_hashes(
&self,
duplicate_confirmed_slot_hashes: impl Iterator<Item = (Slot, Hash)>,
) -> Result<()> {
let mut write_batch = self.db.batch()?;
for (slot, frozen_hash) in duplicate_confirmed_slot_hashes {
let data = FrozenHashVersioned::Current(FrozenHashStatus {
frozen_hash,
is_duplicate_confirmed: true,
});
write_batch.put::<cf::BankHash>(slot, &data)?;
}
self.db.write(write_batch)?;
Ok(())
}
pub fn set_roots<'a>(&self, rooted_slots: impl Iterator<Item = &'a Slot>) -> Result<()> {
let mut write_batch = self.db.batch()?;
let mut max_new_rooted_slot = 0;
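
Taken together, the new column gives every frozen bank a persisted (hash, duplicate-confirmed) record. A round-trip sketch of the accessors above (the slot number and setup are illustrative):

use solana_ledger::blockstore::Blockstore;
use solana_sdk::hash::Hash;

fn bank_hash_round_trip(blockstore: &Blockstore) {
    let hash = Hash::new_unique();
    blockstore.insert_bank_hash(10, hash, false);
    assert_eq!(blockstore.get_bank_hash(10), Some(hash));
    assert!(!blockstore.is_duplicate_confirmed(10));

    // Duplicate confirmation is sticky: writing the same frozen hash again
    // with is_duplicate_confirmed == false hits the early return above.
    blockstore.insert_bank_hash(10, hash, true);
    blockstore.insert_bank_hash(10, hash, false);
    assert!(blockstore.is_duplicate_confirmed(10));
}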

View File

@@ -135,6 +135,10 @@ impl Blockstore {
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot)
.is_ok()
& self
.db
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
@@ -264,6 +268,10 @@ impl Blockstore {
.orphans_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.bank_hash_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.index_cf
.compact_range(from_slot, to_slot)
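
Both purge hunks matter for hygiene: the range delete drops BankHash entries when slots are purged, and the compaction pass reclaims the space, so a restart cannot observe a stale frozen hash for a purged slot. A sketch, assuming the blockstore's existing purge_and_compact_slots entry point (inclusive slot range):

fn purge_clears_bank_hashes(blockstore: &Blockstore) {
    blockstore.insert_bank_hash(5, Hash::new_unique(), false);
    // Purge slots 0..=5, then compact the affected column families.
    blockstore.purge_and_compact_slots(0, 5);
    assert_eq!(blockstore.get_bank_hash(5), None);
}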

View File

@@ -47,6 +47,8 @@ const DUPLICATE_SLOTS_CF: &str = "duplicate_slots";
const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data
const ORPHANS_CF: &str = "orphans";
/// Column family for bank hashes
const BANK_HASH_CF: &str = "bank_hashes";
// Column family for root data
const ROOT_CF: &str = "root";
/// Column family for indexes
@@ -133,6 +135,10 @@ pub mod columns {
/// The erasure meta column
pub struct ErasureMeta;
#[derive(Debug)]
/// The bank hash column
pub struct BankHash;
#[derive(Debug)]
/// The root column
pub struct Root;
@@ -262,11 +268,7 @@ impl Rocks {
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<Rocks> {
use columns::*;
fs::create_dir_all(&path)?;
@@ -302,6 +304,10 @@ impl Rocks {
Orphans::NAME,
get_cf_options::<Orphans>(&access_type, &oldest_slot),
);
let bank_hash_cf_descriptor = ColumnFamilyDescriptor::new(
BankHash::NAME,
get_cf_options::<BankHash>(&access_type, &oldest_slot),
);
let root_cf_descriptor = ColumnFamilyDescriptor::new(
Root::NAME,
get_cf_options::<Root>(&access_type, &oldest_slot),
@@ -359,6 +365,7 @@ impl Rocks {
(DuplicateSlots::NAME, duplicate_slots_cf_descriptor),
(ErasureMeta::NAME, erasure_meta_cf_descriptor),
(Orphans::NAME, orphans_cf_descriptor),
(BankHash::NAME, bank_hash_cf_descriptor),
(Root::NAME, root_cf_descriptor),
(Index::NAME, index_cf_descriptor),
(ShredData::NAME, shred_data_cf_descriptor),
@@ -472,11 +479,7 @@ impl Rocks {
}
fn columns(&self) -> Vec<&'static str> {
use columns::*;
vec![
ErasureMeta::NAME,
@@ -484,6 +487,7 @@ impl Rocks {
DuplicateSlots::NAME,
Index::NAME,
Orphans::NAME,
BankHash::NAME,
Root::NAME,
SlotMeta::NAME,
ShredData::NAME,
@@ -887,6 +891,14 @@ impl TypedColumn for columns::Orphans {
type Type = bool;
}
impl SlotColumn for columns::BankHash {}
impl ColumnName for columns::BankHash {
const NAME: &'static str = BANK_HASH_CF;
}
impl TypedColumn for columns::BankHash {
type Type = blockstore_meta::FrozenHashVersioned;
}
impl SlotColumn for columns::Root {}
impl ColumnName for columns::Root {
const NAME: &'static str = ROOT_CF;
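
Since BankHash is a SlotColumn, its RocksDB key is the slot encoded as a big-endian u64, so lexicographic key order matches numeric slot order; that is what lets delete_range_cf and compact_range in the purge code treat [from_slot, to_slot] as a contiguous key range. A sketch of the key encoding (illustrative, not the trait's actual code):

use std::convert::TryInto;

// Big-endian bytes sort the same way the underlying integers do.
fn slot_key(slot: u64) -> [u8; 8] {
    slot.to_be_bytes()
}

fn slot_from_key(key: &[u8]) -> u64 {
    u64::from_be_bytes(key[..8].try_into().unwrap())
}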

View File

@@ -1,6 +1,6 @@
use crate::erasure::ErasureConfig;
use serde::{Deserialize, Serialize};
use solana_sdk::{clock::Slot, hash::Hash};
use std::{collections::BTreeSet, ops::RangeBounds};
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
@@ -75,6 +75,33 @@ pub enum ErasureMetaStatus {
StillNeed(usize),
}
#[derive(Deserialize, Serialize, Debug, PartialEq)]
pub enum FrozenHashVersioned {
Current(FrozenHashStatus),
}
impl FrozenHashVersioned {
pub fn frozen_hash(&self) -> Hash {
match self {
FrozenHashVersioned::Current(frozen_hash_status) => frozen_hash_status.frozen_hash,
}
}
pub fn is_duplicate_confirmed(&self) -> bool {
match self {
FrozenHashVersioned::Current(frozen_hash_status) => {
frozen_hash_status.is_duplicate_confirmed
}
}
}
}
#[derive(Deserialize, Serialize, Debug, PartialEq)]
pub struct FrozenHashStatus {
pub frozen_hash: Hash,
pub is_duplicate_confirmed: bool,
}
impl Index {
pub(crate) fn new(slot: Slot) -> Self {
Index {
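
Wrapping the payload in a versioned enum costs four bytes (bincode's u32 variant tag) per entry but lets future commits add variants without a column migration. A round-trip sketch, assuming the default bincode configuration that TypedColumn uses:

#[test]
fn frozen_hash_round_trip() {
    let status = FrozenHashVersioned::Current(FrozenHashStatus {
        frozen_hash: Hash::new_unique(),
        is_duplicate_confirmed: true,
    });
    let bytes = bincode::serialize(&status).unwrap();
    // 4-byte variant tag + 32-byte hash + 1-byte bool
    assert_eq!(bytes.len(), 4 + 32 + 1);
    let decoded: FrozenHashVersioned = bincode::deserialize(&bytes).unwrap();
    assert_eq!(decoded, status);
}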

View File

@@ -862,6 +862,7 @@ fn process_bank_0(
)
.expect("processing for bank 0 must succeed");
bank0.freeze();
blockstore.insert_bank_hash(bank0.slot(), bank0.hash(), false);
cache_block_meta(bank0, cache_block_meta_sender);
}
@@ -1034,6 +1035,7 @@ fn load_frozen_forks(
}
inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len());
blockstore.set_roots(rooted_slots.iter().map(|(slot, _hash)| slot)).expect("Blockstore::set_roots should succeed");
blockstore.set_duplicate_confirmed_slots_and_hashes(rooted_slots.into_iter()).expect("Blockstore::set_duplicate_confirmed should succeed");
Some(cluster_root_bank)
} else {
None
@@ -1179,6 +1181,7 @@ fn process_single_slot(
})?;
bank.freeze(); // all banks handled by this routine are created from complete slots
blockstore.insert_bank_hash(bank.slot(), bank.hash(), false);
cache_block_meta(bank, cache_block_meta_sender);
Ok(())
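
Note the division of labor in this file: replay writes each frozen hash with is_duplicate_confirmed == false, since gossip state is unknown at replay time, while cluster-confirmed roots are recorded with the flag set via the batched setter. A sketch of that second path, mirroring load_frozen_forks above (the helper name is illustrative, not part of the commit):

use solana_ledger::blockstore::Blockstore;
use solana_sdk::{clock::Slot, hash::Hash};

fn record_cluster_confirmed_roots(blockstore: &Blockstore, rooted_slots: Vec<(Slot, Hash)>) {
    blockstore
        .set_roots(rooted_slots.iter().map(|(slot, _hash)| slot))
        .expect("Blockstore::set_roots should succeed");
    // One write batch marks every (slot, hash) pair duplicate confirmed.
    blockstore
        .set_duplicate_confirmed_slots_and_hashes(rooted_slots.into_iter())
        .expect("Blockstore::set_duplicate_confirmed_slots_and_hashes should succeed");
}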