Modify Roots Column To Support Multiple Roots (#4321)

* Fix 1) Roots column family to handle storing multiple slots, 2) Store all slots on the rooted path in the roots column family
carllin 2019-05-20 19:04:18 -07:00 committed by GitHub
parent 7153abd483
commit f1e5edee14
7 changed files with 163 additions and 41 deletions
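
In short: the Root column family previously stored a single u64 (the latest root) under a unit key, and it now stores one boolean entry per rooted slot, keyed by slot. Blocktree::set_root(new_root, prev_root) walks the parent chain from new_root back to prev_root and marks every slot on that path as a root. The following is a minimal, self-contained sketch of that walk, not part of the commit; a HashMap and a HashSet stand in for SlotMeta.parent_slot and the Root column family, and all names are illustrative.

    use std::collections::{HashMap, HashSet};

    struct Roots {
        parent: HashMap<u64, u64>, // stand-in for SlotMeta.parent_slot
        roots: HashSet<u64>,       // stand-in for the Root column family (slot -> bool)
    }

    impl Roots {
        // Mirrors the new Blocktree::set_root(new_root, prev_root): mark every slot
        // on the path from new_root back to (but excluding) prev_root as a root.
        fn set_root(&mut self, new_root: u64, prev_root: u64) {
            if new_root == 0 {
                // Slot 0 has no parent; just mark it, as the diff does.
                self.roots.insert(0);
                return;
            }
            let mut current_slot = new_root;
            while current_slot != prev_root {
                self.roots.insert(current_slot);
                current_slot = self.parent[&current_slot];
            }
        }

        // Mirrors the new Blocktree::is_root(slot).
        fn is_root(&self, slot: u64) -> bool {
            self.roots.contains(&slot)
        }
    }

    fn main() {
        // Same chain as test_set_root below: 0 <- 2 <- 4 <- 7 <- 12 <- 15
        let parent: HashMap<u64, u64> =
            [(2, 0), (4, 2), (7, 4), (12, 7), (15, 12)].into_iter().collect();
        let mut ledger = Roots { parent, roots: HashSet::new() };
        ledger.set_root(0, 0);
        ledger.set_root(4, 0);
        assert!(ledger.is_root(0) && ledger.is_root(2) && ledger.is_root(4));
        assert!(!ledger.is_root(7) && !ledger.is_root(15));
        ledger.set_root(15, 4);
        assert!(ledger.is_root(7) && ledger.is_root(12) && ledger.is_root(15));
    }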


@@ -113,7 +113,7 @@ impl Blocktree {
// Open the database
let db = Database::open(&ledger_path)?;
let batch_processor = Arc::new(RwLock::new(db.batch_processor()));
let batch_processor = unsafe { Arc::new(RwLock::new(db.batch_processor())) };
// Create the metadata column family
let meta_cf = db.column();
@@ -804,24 +804,32 @@ impl Blocktree {
}
pub fn is_root(&self, slot: u64) -> bool {
if let Ok(Some(root_slot)) = self.db.get::<cf::Root>(()) {
root_slot == slot
if let Ok(Some(true)) = self.db.get::<cf::Root>(slot) {
true
} else {
false
}
}
pub fn set_root(&self, slot: u64) -> Result<()> {
self.db.put::<cf::Root>((), &slot)?;
pub fn set_root(&self, new_root: u64, prev_root: u64) -> Result<()> {
let mut current_slot = new_root;
unsafe {
let mut batch_processor = self.db.batch_processor();
let mut write_batch = batch_processor.batch()?;
if new_root == 0 {
write_batch.put::<cf::Root>(0, &true)?;
} else {
while current_slot != prev_root {
write_batch.put::<cf::Root>(current_slot, &true)?;
current_slot = self.meta(current_slot).unwrap().unwrap().parent_slot;
}
}
batch_processor.write(write_batch)?;
}
Ok(())
}
pub fn get_root(&self) -> Result<u64> {
let root_opt = self.db.get::<cf::Root>(())?;
Ok(root_opt.unwrap_or(0))
}
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
let mut results = vec![];
@@ -3112,6 +3120,40 @@ pub mod tests {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_set_root() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree.set_root(0, 0).unwrap();
let chained_slots = vec![0, 2, 4, 7, 12, 15];
// Make a chain of slots
let all_blobs = make_chaining_slot_entries(&chained_slots, 10);
// Insert the chain of slots into the ledger
for (slot_blobs, _) in all_blobs {
blocktree.insert_data_blobs(&slot_blobs[..]).unwrap();
}
blocktree.set_root(4, 0).unwrap();
for i in &chained_slots[0..3] {
assert!(blocktree.is_root(*i));
}
for i in &chained_slots[3..] {
assert!(!blocktree.is_root(*i));
}
blocktree.set_root(15, 4).unwrap();
for i in chained_slots {
assert!(blocktree.is_root(i));
}
drop(blocktree);
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
mod erasure {
use super::*;
use crate::blocktree::meta::ErasureMetaStatus;


@@ -279,7 +279,12 @@ where
}
}
pub fn batch_processor(&self) -> BatchProcessor<B> {
// Note this returns an object that can be used to directly write to multiple column families.
// This circumvents the synchronization around APIs in Blocktree that use
// blocktree.batch_processor, so this API should only be used if the caller is sure they
// are writing to data in columns that will not be corrupted by any simultaneous blocktree
// operations.
pub unsafe fn batch_processor(&self) -> BatchProcessor<B> {
BatchProcessor {
backend: Arc::clone(&self.backend),
}
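
To illustrate the contract described in the comment above, here is a small standalone sketch; the names are illustrative and not the real Database/BatchProcessor types. The ordinary write path is serialized behind a shared RwLock, while the unsafe constructor hands back an unsynchronized processor and shifts responsibility for avoiding conflicting writers onto the caller, which matches how the new set_root wraps its batch_processor call in the blocktree diff above.

    use std::sync::{Arc, RwLock};

    struct BatchProcessor;

    struct Database {
        // The processor that ordinary Blocktree APIs share behind a lock.
        shared_processor: Arc<RwLock<BatchProcessor>>,
    }

    impl Database {
        // Unsafe escape hatch: returns a processor that bypasses the shared lock,
        // so the caller must guarantee no simultaneous operation writes the same columns.
        unsafe fn batch_processor(&self) -> BatchProcessor {
            BatchProcessor
        }
    }

    fn main() {
        let db = Database {
            shared_processor: Arc::new(RwLock::new(BatchProcessor)),
        };

        // Normal, synchronized path: take the lock first.
        let guard = db.shared_processor.write().unwrap();
        drop(guard);

        // Escape hatch: the unsafe block is the caller's promise that nothing else
        // is writing the same columns concurrently.
        let _unsynchronized = unsafe { db.batch_processor() };
    }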


@@ -121,17 +121,21 @@ impl TypedColumn<Kvs> for cf::Orphans {
impl Column<Kvs> for cf::Root {
const NAME: &'static str = super::ROOT_CF;
type Index = ();
type Index = u64;
fn key(_: ()) -> Key {
Key::default()
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(_: &Key) {}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::Root {
type Type = u64;
type Type = bool;
}
impl Column<Kvs> for cf::SlotMeta {


@@ -182,17 +182,21 @@ impl TypedColumn<Rocks> for cf::Orphans {
impl Column<Rocks> for cf::Root {
const NAME: &'static str = super::ROOT_CF;
type Index = ();
type Index = u64;
fn key(_: ()) -> Vec<u8> {
vec![0; 8]
fn key(slot: u64) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(_: &[u8]) {}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
}
impl TypedColumn<Rocks> for cf::Root {
type Type = u64;
type Type = bool;
}
impl Column<Rocks> for cf::SlotMeta {
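
As an aside, here is a tiny standalone check of the 8-byte big-endian slot key used by the Rocks implementation above; it assumes the byteorder crate, which is where these BigEndian calls come from. Big-endian encoding also keeps bytewise key ordering identical to numeric slot ordering, which is presumably the reason for it, though the diff itself does not say so.

    use byteorder::{BigEndian, ByteOrder};

    fn main() {
        // Encode a slot the same way cf::Root::key does for RocksDB: 8 big-endian bytes.
        let slot: u64 = 15;
        let mut key = vec![0u8; 8];
        BigEndian::write_u64(&mut key[..], slot);
        assert_eq!(key, vec![0, 0, 0, 0, 0, 0, 0, 15]);

        // index() reverses the encoding.
        assert_eq!(BigEndian::read_u64(&key[..8]), slot);

        // Bytewise comparison of big-endian keys matches numeric slot order.
        let mut next_key = vec![0u8; 8];
        BigEndian::write_u64(&mut next_key[..], slot + 1);
        assert!(key < next_key);
    }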


@@ -149,6 +149,8 @@ pub fn process_blocktree(
vec![(slot, meta, bank, entry_height, last_entry_hash)]
};
blocktree.set_root(0, 0).expect("Couldn't set first root");
let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule(), 0);
let mut fork_info = vec![];
@@ -235,7 +237,7 @@ pub fn process_blocktree(
.unwrap();
// only process full slots in blocktree_processor, replay_stage
// handles any partials
if next_meta.is_full() {
let next_bank = Arc::new(Bank::new_from_parent(
&bank,
@@ -285,6 +287,7 @@ mod tests {
use crate::blocktree::tests::entries_to_blobs;
use crate::entry::{create_ticks, next_entry, next_entry_mut, Entry};
use crate::genesis_utils::{create_genesis_block, create_genesis_block_with_leader};
use solana_runtime::epoch_schedule::EpochSchedule;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::InstructionError;
use solana_sdk::pubkey::Pubkey;
@@ -409,7 +412,7 @@ mod tests {
info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);
blocktree.set_root(4).unwrap();
blocktree.set_root(4, 0).unwrap();
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
@@ -483,8 +486,8 @@ mod tests {
info!("last_fork1_entry.hash: {:?}", last_fork1_entry_hash);
info!("last_fork2_entry.hash: {:?}", last_fork2_entry_hash);
blocktree.set_root(0).unwrap();
blocktree.set_root(1).unwrap();
blocktree.set_root(0, 0).unwrap();
blocktree.set_root(1, 0).unwrap();
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
@@ -530,6 +533,63 @@ mod tests {
}
}
#[test]
fn test_process_blocktree_epoch_boundary_root() {
solana_logger::setup();
let (genesis_block, _mint_keypair) = create_genesis_block(10_000);
let ticks_per_slot = genesis_block.ticks_per_slot;
// Create a new ledger with slot 0 full of ticks
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_block);
let mut last_entry_hash = blockhash;
let blocktree =
Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
// Let last_slot be the last slot in the first two epochs
let epoch_schedule = get_epoch_schedule(&genesis_block, None);
let last_slot = epoch_schedule.get_last_slot_in_epoch(1);
// Create a single chain of slots with all indexes in the range [0, last_slot + 1]
for i in 1..=last_slot + 1 {
last_entry_hash = fill_blocktree_slot_with_ticks(
&blocktree,
ticks_per_slot,
i,
i - 1,
last_entry_hash,
);
}
// Set a root on the last slot of the last confirmed epoch
blocktree.set_root(last_slot, 0).unwrap();
// Set a root on the next slot after the confirmed epoch
blocktree.set_root(last_slot + 1, last_slot).unwrap();
// Check that we can properly restart the ledger and that the leader scheduler doesn't fail
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1); // There is one fork
assert_eq!(
bank_forks_info[0],
BankForksInfo {
bank_slot: last_slot + 1, // Head is last_slot + 1
entry_height: ticks_per_slot * (last_slot + 2),
}
);
// The latest root should have purged all its parents
assert!(&bank_forks[last_slot + 1]
.parents()
.iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>()
.is_empty());
}
#[test]
fn test_first_err() {
assert_eq!(first_err(&[Ok(())]), Ok(()));
@@ -1111,4 +1171,12 @@ mod tests {
bank.squash();
}
}
fn get_epoch_schedule(
genesis_block: &GenesisBlock,
account_paths: Option<String>,
) -> EpochSchedule {
let bank = Bank::new_with_paths(&genesis_block, account_paths);
bank.epoch_schedule().clone()
}
}


@@ -136,7 +136,7 @@ impl RepairService {
&cluster_info,
completed_slots_receiver,
);
Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
Self::generate_repairs(blocktree, root, MAX_REPAIR_LENGTH)
}
}
};
@@ -207,11 +207,14 @@ impl RepairService {
Ok(repairs)
}
fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
fn generate_repairs(
blocktree: &Blocktree,
root: u64,
max_repairs: usize,
) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let slot = blocktree.get_root()?;
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, root);
// TODO: Incorporate gossip to determine priorities for repair?
@@ -382,7 +385,7 @@ mod test {
blobs.extend(blobs2);
blocktree.write_blobs(&blobs).unwrap();
assert_eq!(
RepairService::generate_repairs(&blocktree, 2).unwrap(),
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![
RepairType::HighestBlob(0, 0),
RepairType::Orphan(0),
@@ -408,7 +411,7 @@ mod test {
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&blocktree, 2).unwrap(),
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(0)]
);
}
@@ -447,12 +450,12 @@ mod test {
.collect();
assert_eq!(
RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(&blocktree, expected.len() - 2).unwrap()[..],
RepairService::generate_repairs(&blocktree, 0, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
}
@@ -479,7 +482,7 @@ mod test {
let expected: Vec<RepairType> = vec![RepairType::HighestBlob(0, num_entries_per_slot)];
assert_eq!(
RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
RepairService::generate_repairs(&blocktree, 0, std::usize::MAX).unwrap(),
expected
);
}


@@ -97,11 +97,6 @@ impl ReplayStage {
let my_id = *my_id;
let mut ticks_per_slot = 0;
let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id);
if let Some(root) = locktower.root() {
blocktree
.set_root(root)
.expect("blocktree.set_root() failed at replay_stage startup");
}
// Start the replay stage loop
let leader_schedule_cache = leader_schedule_cache.clone();
let vote_account = *vote_account;
@@ -321,9 +316,10 @@ impl ReplayStage {
.map(|bank| bank.slot())
.collect::<Vec<_>>();
rooted_slots.push(root_bank.slot());
let old_root = bank_forks.read().unwrap().root();
bank_forks.write().unwrap().set_root(new_root);
leader_schedule_cache.set_root(new_root);
blocktree.set_root(new_root)?;
blocktree.set_root(new_root, old_root)?;
Self::handle_new_root(&bank_forks, progress);
root_slot_sender.send(rooted_slots)?;
}