Add a way to make a DAG of checkpointed Banks

This commit is contained in:
Greg Fitzgerald 2019-02-16 20:58:07 -07:00
parent d68b6ea7b1
commit 9fa8105ae8
6 changed files with 143 additions and 16 deletions

View File

@ -98,6 +98,8 @@ pub struct Bank {
last_id_queue: RwLock<LastIdQueue>,
subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
parent: Option<Arc<Bank>>,
}
impl Default for Bank {
@ -107,6 +109,7 @@ impl Default for Bank {
last_id_queue: RwLock::new(LastIdQueue::default()),
status_cache: RwLock::new(BankStatusCache::default()),
subscriptions: RwLock::new(None),
parent: None,
}
}
}
@ -119,6 +122,17 @@ impl Bank {
bank
}
pub fn new_from_parent(parent: Arc<Bank>) -> Self {
let mut bank = Self::default();
bank.parent = Some(parent);
bank
}
/// Return the more recent checkpoint of this bank instance.
pub fn parent(&self) -> Option<Arc<Bank>> {
self.parent.clone()
}
pub fn set_subscriptions(&self, subscriptions: Arc<RpcSubscriptions>) {
let mut sub = self.subscriptions.write().unwrap();
*sub = Some(subscriptions)
@ -130,6 +144,7 @@ impl Bank {
status_cache: RwLock::new(self.status_cache.read().unwrap().clone()),
last_id_queue: RwLock::new(self.last_id_queue.read().unwrap().clone()),
subscriptions: RwLock::new(None),
parent: self.parent.clone(),
}
}

74
src/bank_forks.rs Normal file
View File

@ -0,0 +1,74 @@
//! The `bank_forks` module implments BankForks a DAG of checkpointed Banks
use crate::bank::Bank;
use std::collections::HashMap;
use std::sync::Arc;
pub struct BankForks {
working_bank_id: u64,
banks: HashMap<u64, Arc<Bank>>,
}
impl BankForks {
pub fn new(bank: Bank) -> Self {
let mut banks = HashMap::new();
let working_bank_id = bank.tick_height();
banks.insert(working_bank_id, Arc::new(bank));
Self {
working_bank_id,
banks,
}
}
pub fn working_bank(&self) -> Arc<Bank> {
self.banks[&self.working_bank_id].clone()
}
pub fn finalized_bank(&self) -> Arc<Bank> {
let mut bank = self.working_bank();
while let Some(parent) = bank.parent() {
bank = parent;
}
bank
}
pub fn insert(&mut self, bank: Bank) -> u64 {
let bank_id = bank.tick_height();
self.banks.insert(bank_id, Arc::new(bank));
bank_id
}
pub fn set_working_bank_id(&mut self, bank_id: u64) {
if self.banks.contains_key(&bank_id) {
self.working_bank_id = bank_id;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use solana_sdk::hash::Hash;
#[test]
fn test_bank_forks_root() {
let bank = Bank::default();
let tick_height = bank.tick_height();
let bank_forks = BankForks::new(bank);
assert_eq!(bank_forks.working_bank().tick_height(), tick_height);
assert_eq!(bank_forks.finalized_bank().tick_height(), tick_height);
}
#[test]
fn test_bank_forks_parent() {
let bank = Bank::default();
let finalized_bank_id = bank.tick_height();
let mut bank_forks = BankForks::new(bank);
let child_bank = Bank::new_from_parent(bank_forks.working_bank());
child_bank.register_tick(&Hash::default());
let child_bank_id = bank_forks.insert(child_bank);
bank_forks.set_working_bank_id(child_bank_id);
assert_eq!(bank_forks.working_bank().tick_height(), child_bank_id);
assert_eq!(bank_forks.finalized_bank().tick_height(), finalized_bank_id);
}
}

View File

@ -1,4 +1,5 @@
use crate::bank::{Bank, BankError, Result};
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::counter::Counter;
use crate::entry::{Entry, EntrySlice};
@ -188,12 +189,12 @@ where
}
pub fn process_blocktree(
bank: &Bank,
bank_forks: &BankForks,
blocktree: &Blocktree,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<(u64, Hash)> {
let entries = blocktree.read_ledger().expect("opening ledger");
process_ledger(&bank, entries, leader_scheduler)
process_ledger(&bank_forks.working_bank(), entries, leader_scheduler)
}
#[cfg(test)]

View File

@ -1,6 +1,7 @@
//! The `fullnode` module hosts all the fullnode microservices.
use crate::bank::Bank;
use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, BlocktreeConfig};
use crate::blocktree_processor;
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
@ -143,7 +144,6 @@ impl Fullnode {
);
let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank);
let blocktree = Arc::new(blocktree);
node.info.wallclock = timestamp();
@ -456,36 +456,75 @@ impl Fullnode {
}
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn new_bank_from_ledger(
ledger_path: &str,
ledger_config: &BlocktreeConfig,
fn new_banks_from_blocktree(
blocktree_path: &str,
blocktree_config: &BlocktreeConfig,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> (Bank, u64, Hash, Blocktree, SyncSender<bool>, Receiver<bool>) {
) -> (
BankForks,
u64,
Hash,
Blocktree,
SyncSender<bool>,
Receiver<bool>,
) {
let (blocktree, ledger_signal_sender, ledger_signal_receiver) =
Blocktree::open_with_config_signal(ledger_path, ledger_config)
Blocktree::open_with_config_signal(blocktree_path, blocktree_config)
.expect("Expected to successfully open database ledger");
let genesis_block =
GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block");
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
let bank = Bank::new(&genesis_block);
let bank_forks = BankForks::new(bank);
leader_scheduler
.write()
.unwrap()
.update_tick_height(0, &bank);
.update_tick_height(0, &bank_forks.finalized_bank());
let now = Instant::now();
info!("processing ledger...");
let (entry_height, last_entry_id) =
blocktree_processor::process_blocktree(&bank, &blocktree, leader_scheduler)
blocktree_processor::process_blocktree(&bank_forks, &blocktree, leader_scheduler)
.expect("process_blocktree");
info!(
"processed {} ledger entries in {}ms, tick_height={}...",
entry_height,
duration_as_ms(&now.elapsed()),
bank.tick_height()
bank_forks.working_bank().tick_height()
);
(
bank,
bank_forks,
entry_height,
last_entry_id,
blocktree,
ledger_signal_sender,
ledger_signal_receiver,
)
}
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn new_bank_from_ledger(
ledger_path: &str,
ledger_config: &BlocktreeConfig,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> (
Arc<Bank>,
u64,
Hash,
Blocktree,
SyncSender<bool>,
Receiver<bool>,
) {
let (
bank_forks,
entry_height,
last_entry_id,
blocktree,
ledger_signal_sender,
ledger_signal_receiver,
) = new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler);
(
bank_forks.working_bank(),
entry_height,
last_entry_id,
blocktree,

View File

@ -11,6 +11,7 @@
pub mod counter;
pub mod accounts;
pub mod bank;
pub mod bank_forks;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod bloom;

View File

@ -467,7 +467,6 @@ mod test {
let (rotation_sender, rotation_receiver) = channel();
let meta = blocktree.meta(0).unwrap().unwrap();
let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank);
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_id,
@ -573,7 +572,6 @@ mod test {
&leader_scheduler,
);
let bank = Arc::new(bank);
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
@ -700,7 +698,6 @@ mod test {
.expect("First slot metadata must exist");
let voting_keypair = Arc::new(voting_keypair);
let bank = Arc::new(bank);
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),