process_blocktree() now halts forks at the first partial slot

Michael Vines 2019-02-28 12:09:19 -08:00
parent d889e77fba
commit 6cf6a1ccc3
7 changed files with 157 additions and 37 deletions
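In outline, the change makes process_blocktree() stop descending a fork as soon as it reaches a slot whose ticks are not all present, recording the fork state it had reached instead of clearing the partial entries out of the blocktree. The following self-contained Rust sketch models only that halting rule; SlotMeta, ForkTip, walk_forks, and is_complete here are simplified stand-ins, not the real solana types in the diff below:

    // Illustrative sketch only: a fork halts at the first partial slot (or leaf).
    use std::collections::HashMap;

    struct SlotMeta {
        is_complete: bool,    // every tick for this slot is present in the ledger
        next_slots: Vec<u64>, // child slots that chain off this one
    }

    #[derive(Debug, PartialEq)]
    struct ForkTip {
        bank_id: u64, // slot at which this fork was halted
    }

    fn walk_forks(slots: &HashMap<u64, SlotMeta>) -> Vec<ForkTip> {
        let mut fork_info = Vec::new();
        let mut pending_slots = vec![0u64]; // start from the root slot
        while let Some(slot) = pending_slots.pop() {
            let meta = &slots[&slot];
            // Halt this fork at the first partial slot, or when it has no children.
            if !meta.is_complete || meta.next_slots.is_empty() {
                fork_info.push(ForkTip { bank_id: slot });
                continue;
            }
            // Otherwise keep descending into every child slot.
            pending_slots.extend(meta.next_slots.iter().copied());
            // Reverse-sort so the lowest slot is processed next, as in the patch.
            pending_slots.sort_by(|a, b| b.cmp(a));
        }
        fork_info
    }

    fn main() {
        let mut slots = HashMap::new();
        slots.insert(0, SlotMeta { is_complete: true, next_slots: vec![1, 2] });
        slots.insert(1, SlotMeta { is_complete: false, next_slots: vec![3] }); // partial
        slots.insert(2, SlotMeta { is_complete: true, next_slots: vec![] });   // leaf
        slots.insert(3, SlotMeta { is_complete: true, next_slots: vec![] });
        // Slot 3 is never reached because its parent (slot 1) is partial.
        assert_eq!(walk_forks(&slots), vec![ForkTip { bank_id: 1 }, ForkTip { bank_id: 2 }]);
    }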

View File

@@ -28,6 +28,18 @@ impl BankForks {
}
}
pub fn new_from_banks(initial_banks: &[Arc<Bank>]) -> Self {
let mut banks = HashMap::new();
let working_bank = initial_banks[0].clone();
for bank in initial_banks {
banks.insert(bank.slot(), bank.clone());
}
Self {
banks,
working_bank,
}
}
// TODO: use the bank's own ID instead of receiving a parameter?
pub fn insert(&mut self, bank_id: u64, bank: Bank) {
let mut bank = Arc::new(bank);
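Later in this commit, the blocktree_processor hunk unzips the per-fork banks out of fork_info and hands them to this new constructor, so the first bank in the slice becomes the working bank:

    let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
    let bank_forks = BankForks::new_from_banks(&banks);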

View File

@@ -1290,7 +1290,7 @@ impl Iterator for EntryIterator
// Creates a new ledger with slot 0 full of ticks (and only ticks).
//
// Returns the last_id that can be used to start slot 1 entries with.
// Returns the last_id that can be used to append entries with.
pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Result<Hash> {
let ticks_per_slot = genesis_block.ticks_per_slot;
Blocktree::destroy(ledger_path)?;
@@ -1298,7 +1298,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res
// Fill slot 0 with ticks that link back to the genesis_block to bootstrap the ledger.
let blocktree = Blocktree::open_config(ledger_path, ticks_per_slot)?;
let entries = crate::entry::create_ticks(genesis_block.ticks_per_slot, genesis_block.last_id());
let entries = crate::entry::create_ticks(ticks_per_slot, genesis_block.last_id());
blocktree.write_entries(0, 0, 0, &entries)?;
Ok(entries.last().unwrap().id)
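The reworded comment matches how the new integration test later in this commit uses the returned hash: entries can be appended to a later slot chained off it, not only slot 1. A hedged sketch of that pattern (ledger_path as an assumed &str, plus ticks_per_slot, genesis_block, and n assumed in scope; the individual calls all appear elsewhere in this diff):

    // Sketch: build slot 0, then append n ticks to slot 1 starting from the returned last_id.
    let last_id = create_new_ledger(ledger_path, &genesis_block)?;
    let blocktree = Blocktree::open_config(ledger_path, ticks_per_slot)?;
    let entries = crate::entry::create_ticks(n, last_id);
    blocktree.write_entries(1, 0, 0, &entries)?;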

View File

@@ -9,6 +9,7 @@ use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::timing::duration_as_ms;
use solana_sdk::timing::MAX_ENTRY_IDS;
use std::sync::Arc;
use std::time::Instant;
pub fn process_entry(bank: &Bank, entry: &Entry) -> Result<()> {
@@ -113,23 +114,25 @@ pub fn process_blocktree(
info!("processing ledger...");
// Setup bank for slot 0
let (mut bank_forks, mut pending_slots) = {
let bank = Bank::new_with_paths(&genesis_block, account_paths);
let mut pending_slots = {
let slot = 0;
let bank = Arc::new(Bank::new_with_paths(&genesis_block, account_paths));
let entry_height = 0;
let last_entry_id = bank.last_id();
(
BankForks::new(slot, bank),
vec![(slot, entry_height, last_entry_id)],
)
vec![(slot, bank, entry_height, last_entry_id)]
};
let mut bank_forks_info = vec![];
let mut fork_info = vec![];
while !pending_slots.is_empty() {
let (slot, mut entry_height, mut last_entry_id) = pending_slots.pop().unwrap();
let (slot, starting_bank, starting_entry_height, mut last_entry_id) =
pending_slots.pop().unwrap();
let bank = bank_forks[slot].clone();
let bank = Arc::new(Bank::new_from_parent_and_id(
&starting_bank,
leader_schedule_utils::slot_leader_at(slot, &starting_bank),
starting_bank.slot(),
));
let mut entry_height = starting_entry_height;
// Load the metadata for this slot
let meta = blocktree
@@ -181,32 +184,50 @@ pub fn process_blocktree(
let slot_complete =
leader_schedule_utils::num_ticks_left_in_slot(&bank, bank.tick_height()) == 0;
if !slot_complete {
// Slot was not complete, clear out any partial entries
// TODO: Walk |meta.next_slots| and clear all child slots too?
blocktree.reset_slot_consumed(slot).map_err(|err| {
warn!("Failed to reset partial slot {}: {:?}", slot, err);
BankError::LedgerVerificationFailed
})?;
if !slot_complete || meta.next_slots.is_empty() {
// Reached the end of this fork. Record the final entry height and last entry id
bank_forks_info.push(BankForksInfo {
let bfi = BankForksInfo {
bank_id: slot,
entry_height,
last_entry_id,
next_blob_index: meta.consumed,
});
entry_height: starting_entry_height,
last_entry_id: starting_bank.last_id(),
next_blob_index: 0,
};
fork_info.push((starting_bank, bfi));
continue;
}
// reached end of slot, look for next slots
// TODO merge with locktower, voting
bank.squash();
if meta.next_slots.is_empty() {
// Reached the end of this fork. Record the final entry height and last entry id
let bfi = BankForksInfo {
bank_id: slot,
entry_height,
last_entry_id: bank.last_id(),
next_blob_index: meta.consumed,
};
fork_info.push((bank, bfi));
continue;
}
// This is a fork point, create a new child bank for each fork
pending_slots.extend(meta.next_slots.iter().map(|next_slot| {
let leader = leader_schedule_utils::slot_leader_at(*next_slot, &bank);
let child_bank = Bank::new_from_parent_and_id(&bank, leader, *next_slot);
let child_bank = Arc::new(Bank::new_from_parent_and_id(
&bank,
leader_schedule_utils::slot_leader_at(*next_slot, &bank),
*next_slot,
));
trace!("Add child bank for slot={}", next_slot);
bank_forks.insert(*next_slot, child_bank);
(*next_slot, entry_height, last_entry_id)
// bank_forks.insert(*next_slot, child_bank);
(*next_slot, child_bank, entry_height, last_entry_id)
}));
// reverse sort by slot, so the next slot to be processed can be pop()ed
@@ -214,6 +235,8 @@ pub fn process_blocktree(
pending_slots.sort_by(|a, b| b.0.cmp(&a.0));
}
let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
let bank_forks = BankForks::new_from_banks(&banks);
info!(
"processed ledger in {}ms, forks={}...",
duration_as_ms(&now.elapsed()),
@@ -275,7 +298,7 @@ mod tests {
let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot)
.expect("Expected to successfully open database ledger");
let expected_last_entry_id;
let expected_last_entry_id = last_id;
// Write slot 1
// slot 1, points at slot 0. Missing one tick
@@ -286,7 +309,6 @@ mod tests {
last_id = entries.last().unwrap().id;
entries.pop();
expected_last_entry_id = entries.last().unwrap().id;
let blobs = entries_to_blobs(&entries, slot, parent_slot);
blocktree.insert_data_blobs(blobs.iter()).unwrap();
@@ -303,9 +325,9 @@ mod tests {
bank_forks_info[0],
BankForksInfo {
bank_id: 1, // never finished first slot
entry_height: 2 * ticks_per_slot - 1,
entry_height: ticks_per_slot,
last_entry_id: expected_last_entry_id,
next_blob_index: ticks_per_slot - 1,
next_blob_index: 0,
}
);
}
@@ -379,7 +401,7 @@ mod tests {
// Ensure bank_forks holds the right banks
for info in bank_forks_info {
assert_eq!(bank_forks[info.bank_id].last_id(), info.last_entry_id)
assert_eq!(bank_forks[info.bank_id].slot(), info.bank_id);
}
}
@@ -488,6 +510,28 @@ mod tests {
assert_eq!(bank.last_id(), entries.last().unwrap().id);
}
#[test]
fn test_process_ledger_with_one_tick_per_slot() {
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(123);
genesis_block.ticks_per_slot = 1;
let (ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block);
let blocktree = Blocktree::open(&ledger_path).unwrap();
let (bank_forks, bank_forks_info) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1);
assert_eq!(
bank_forks_info[0],
BankForksInfo {
bank_id: 0,
entry_height: 1,
}
);
let bank = bank_forks[0].clone();
assert_eq!(bank.tick_height(), 0);
}
#[test]
fn test_par_process_entries_tick() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(1000);

View File

@@ -195,6 +195,7 @@ impl ReplayStage {
bank_forks_info[0].next_blob_index,
)
};
assert_eq!(bank.last_id(), last_entry_id); // TODO: remove last_entry_id, this assert proves it's unnecessary
// Update Tpu and other fullnode components with the current bank
let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
@@ -213,6 +214,7 @@ impl ReplayStage {
}
current_blob_index = 0;
}
assert_eq!(current_blob_index, 0); // TODO: remove next_blob_index, this assert proves it's unnecessary
// Send a rotation notification back to Fullnode to initialize the TPU to the right
// state. After this point, the bank.tick_height() is live, which means it can

View File

@@ -203,7 +203,6 @@ pub mod tests {
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
#[test]
fn test_tvu_exit() {
@@ -219,7 +218,7 @@ pub mod tests {
let bank_forks_info = vec![BankForksInfo {
bank_id: 0,
entry_height: 0,
last_entry_id: Hash::default(),
last_entry_id: bank_forks.working_bank().last_id(),
next_blob_index: 0,
}];

View File

@@ -4,6 +4,7 @@ extern crate solana;
use log::*;
use solana::blob_fetch_stage::BlobFetchStage;
use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
use solana::blocktree_processor;
use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo};
use solana::contact_info::ContactInfo;
@@ -43,6 +44,68 @@ fn read_ledger(ledger_path: &str, ticks_per_slot: u64) -> Vec<Entry> {
.collect()
}
#[test]
fn test_start_with_partial_slot_in_ledger() {
solana_logger::setup();
let leader_keypair = Arc::new(Keypair::new());
let ticks_per_slot = 4;
let (mut genesis_block, _mint_keypair) =
GenesisBlock::new_with_leader(10_000, leader_keypair.pubkey(), 500);
genesis_block.ticks_per_slot = ticks_per_slot;
for i in 0..ticks_per_slot {
info!("Ledger will contain {} ticks in slot 1...", i);
let (ledger_path, last_id) = create_new_tmp_ledger!(&genesis_block);
// Write `i` extra ticks into ledger to create a partially filled slot
{
let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap();
let entries = solana::entry::create_ticks(i, last_id);
blocktree.write_entries(1, 0, 0, &entries).unwrap();
}
let leader = Fullnode::new(
Node::new_localhost_with_pubkey(leader_keypair.pubkey()),
&leader_keypair,
&ledger_path,
VotingKeypair::new_local(&leader_keypair),
None,
&FullnodeConfig::default(),
);
let (rotation_sender, rotation_receiver) = channel();
let leader_exit = leader.run(Some(rotation_sender));
// Wait for the fullnode to rotate twice, indicating that it was able to ingest the ledger
// and work with it
assert_eq!(
(FullnodeReturnType::LeaderToLeaderRotation, 1),
rotation_receiver.recv().unwrap()
);
assert_eq!(
(FullnodeReturnType::LeaderToLeaderRotation, 2),
rotation_receiver.recv().unwrap()
);
info!("Pass");
leader_exit();
// Ensure the ledger is still valid
{
let blocktree = Blocktree::open_config(&ledger_path, ticks_per_slot).unwrap();
let (_bank_forks, bank_forks_info) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1);
// The node processed two slots, ensure entry_height reflects that
assert!(bank_forks_info[0].entry_height >= ticks_per_slot * 2);
}
remove_dir_all(ledger_path).unwrap();
}
}
#[test]
fn test_multi_node_ledger_window() -> result::Result<()> {
solana_logger::setup();

View File

@@ -19,7 +19,6 @@ use solana::tvu::{Sockets, Tvu};
use solana::voting_keypair::VotingKeypair;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::fs::remove_dir_all;
@@ -86,8 +85,9 @@ fn test_replay() {
let ticks_per_slot = genesis_block.ticks_per_slot;
let tvu_addr = target1.info.tvu;
let mut cur_hash = Hash::default();
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
let bank = Bank::new(&genesis_block);
let mut cur_hash = bank.last_id();
let bank_forks = BankForks::new(0, bank);
let bank_forks_info = vec![BankForksInfo {
bank_id: 0,
entry_height: 0,