Route BankForks into the ReplayStage

Michael Vines, 2019-02-21 11:37:48 -08:00; committed by Grimes
parent f0f55af35b
commit b501090443
6 changed files with 209 additions and 201 deletions

src/fullnode.rs

@@ -13,13 +13,11 @@ use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::Tpu;
use crate::tvu::{Sockets, Tvu, TvuRotationReceiver};
use crate::tvu::{Sockets, Tvu, TvuRotationInfo, TvuRotationReceiver};
use crate::voting_keypair::VotingKeypair;
use log::Level;
use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::timestamp;
@@ -100,7 +98,6 @@ pub struct Fullnode {
rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>,
gossip_service: GossipService,
bank_forks: Arc<RwLock<BankForks>>,
sigverify_disabled: bool,
tpu_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
@@ -224,46 +221,10 @@ impl Fullnode {
// Setup channel for rotation indications
let (rotation_sender, rotation_receiver) = channel();
// TODO: All this into Tpu/ReplayStage....
if bank_forks_info.len() != 1 {
warn!("TODO: figure out what to do with multiple bank forks");
}
let (bank, entry_height, last_entry_id) = {
let mut bank_forks = bank_forks.write().unwrap();
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
(
bank_forks.working_bank(),
bank_forks_info[0].entry_height,
bank_forks_info[0].last_entry_id,
)
};
// Figure which node should generate the next tick
let (next_leader, next_slot) = {
let next_tick = bank.tick_height() + 1;
let leader_scheduler = leader_scheduler.read().unwrap();
let next_slot = leader_scheduler.tick_height_to_slot(next_tick);
let next_leader = leader_scheduler
.get_leader_for_slot(next_slot)
.expect("Leader not known after processing bank");
trace!(
"node {:?} scheduled as leader for slot {}",
next_leader,
next_slot,
);
(next_leader, next_slot)
};
// END TODO
let tvu = Tvu::new(
voting_keypair_option,
&bank_forks,
entry_height,
last_entry_id,
&bank_forks_info,
&cluster_info,
sockets,
blocktree.clone(),
@@ -277,9 +238,9 @@ impl Fullnode {
);
let tpu = Tpu::new(id, &cluster_info);
let mut fullnode = Self {
inc_new_counter_info!("fullnode-new", 1);
Self {
id,
bank_forks,
sigverify_disabled: config.sigverify_disabled,
gossip_service,
rpc_service: Some(rpc_service),
@@ -291,22 +252,19 @@ impl Fullnode {
rotation_receiver,
blocktree,
leader_scheduler,
};
// TODO: This first rotate should come from the Tvu/ReplayStage
fullnode.rotate(&bank, next_leader, next_slot, &last_entry_id);
inc_new_counter_info!("fullnode-new", 1);
fullnode
}
}
fn rotate(
&mut self,
bank: &Arc<Bank>,
leader: Pubkey,
slot: u64,
last_entry_id: &Hash,
) -> FullnodeReturnType {
if leader == self.id {
fn rotate(&mut self, rotation_info: TvuRotationInfo) -> FullnodeReturnType {
trace!(
"{:?}: rotate for slot={} to leader={:?} using last_entry_id={:?}",
self.id,
rotation_info.slot,
rotation_info.leader_id,
rotation_info.last_entry_id,
);
if rotation_info.leader_id == self.id {
let transition = match self.node_services.tpu.is_leader() {
Some(was_leader) => {
if was_leader {
@@ -319,9 +277,8 @@ impl Fullnode {
}
None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
};
let tpu_bank = Arc::new(Bank::new_from_parent(bank, &leader));
self.node_services.tpu.switch_to_leader(
&tpu_bank,
Arc::new(rotation_info.bank),
PohServiceConfig::default(),
self.tpu_sockets
.iter()
@@ -331,8 +288,8 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone broadcast socket"),
self.sigverify_disabled,
slot,
last_entry_id,
rotation_info.slot,
rotation_info.last_entry_id,
&self.blocktree,
&self.leader_scheduler,
);
@@ -340,7 +297,7 @@ impl Fullnode {
} else {
debug!("{:?} rotating to validator role", self.id);
self.node_services.tpu.switch_to_forwarder(
leader,
rotation_info.leader_id,
self.tpu_sockets
.iter()
.map(|s| s.try_clone().expect("Failed to clone TPU sockets"))
@@ -368,20 +325,9 @@ impl Fullnode {
}
match self.rotation_receiver.recv_timeout(timeout) {
Ok((bank_id, slot, leader)) => {
trace!(
"{:?}: rotate for slot={} to leader={:?} using bank={}",
self.id,
slot,
leader,
bank_id
);
// TODO: Uncomment next line once `bank_id` has a valid id in it
//self.bank_forks.write().set_working_bank_id(bank_id);
let bank = self.bank_forks.read().unwrap().working_bank();
let transition = self.rotate(&bank, leader, slot, &bank.last_id());
Ok(rotation_info) => {
let slot = rotation_info.slot;
let transition = self.rotate(rotation_info);
debug!("role transition complete: {:?}", transition);
if let Some(ref rotation_notifier) = rotation_notifier {
rotation_notifier.send((transition, slot)).unwrap();
@@ -417,7 +363,7 @@ impl Fullnode {
}
#[allow(clippy::trivially_copy_pass_by_ref)]
fn new_banks_from_blocktree(
pub fn new_banks_from_blocktree(
blocktree_path: &str,
blocktree_config: &BlocktreeConfig,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -440,34 +386,6 @@ fn new_banks_from_blocktree(
)
}
// TODO: Remove this function from tests
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn new_bank_from_ledger(
ledger_path: &str,
ledger_config: &BlocktreeConfig,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> (Arc<Bank>, u64, Hash, Blocktree, Receiver<bool>) {
let (mut bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path, ledger_config, leader_scheduler);
// This helper won't handle multiple banks
assert_eq!(bank_forks_info.len(), 1);
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
let (working_bank, entry_height, last_entry_id) = (
bank_forks.working_bank(),
bank_forks_info[0].entry_height,
bank_forks_info[0].last_entry_id,
);
(
working_bank,
entry_height,
last_entry_id,
blocktree,
ledger_signal_receiver,
)
}
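With new_bank_from_ledger removed, callers reach the working bank through BankForks directly. A minimal sketch of the replacement pattern, restricted to calls that appear in this commit (new_banks_from_blocktree, set_working_bank_id, working_bank); ledger_path and leader_scheduler are assumed to be in scope:

// Sketch only: ledger_path and leader_scheduler are assumed bindings.
let (mut bank_forks, bank_forks_info, _blocktree, _ledger_signal_receiver) =
    new_banks_from_blocktree(ledger_path, &BlocktreeConfig::default(), &leader_scheduler);
// Select the fork to operate on, then read out the working bank and its metadata.
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
let bank = bank_forks.working_bank();
let entry_height = bank_forks_info[0].entry_height;
let last_entry_id = bank_forks_info[0].last_entry_id;

This is the same sequence the updated tests below now use.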
impl Service for Fullnode {
type JoinReturnType = ();
@@ -494,6 +412,7 @@ mod tests {
use crate::entry::make_consecutive_blobs;
use crate::leader_scheduler::make_active_set_entries;
use crate::streamer::responder;
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
#[test]
@@ -634,6 +553,10 @@ mod tests {
// Wait for the bootstrap leader to transition. Since there are no other nodes in the
// cluster it will continue to be the leader
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 0)
);
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 1)
@@ -689,8 +612,12 @@ mod tests {
Some(&bootstrap_leader_info),
&fullnode_config,
);
assert!(!bootstrap_leader.node_services.tpu.is_leader().unwrap());
let (rotation_sender, rotation_receiver) = channel();
let bootstrap_leader_exit = bootstrap_leader.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToValidatorRotation, 2)
);
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
@@ -702,11 +629,15 @@ mod tests {
&fullnode_config,
);
assert!(validator.node_services.tpu.is_leader().unwrap());
validator.close().expect("Expected leader node to close");
bootstrap_leader
.close()
.expect("Expected validator node to close");
let (rotation_sender, rotation_receiver) = channel();
let validator_exit = validator.run(Some(rotation_sender));
assert_eq!(
rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 2)
);
validator_exit();
bootstrap_leader_exit();
}
for path in ledger_paths {
Blocktree::destroy(&path).expect("Expected successful database destruction");
@@ -792,11 +723,13 @@ mod tests {
// Close the validator so that rocksdb has locks available
validator_exit();
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, _, _, _) = new_bank_from_ledger(
let (bank_forks, bank_forks_info, _, _) = new_banks_from_blocktree(
&validator_ledger_path,
&BlocktreeConfig::default(),
&leader_scheduler,
);
let bank = bank_forks.working_bank();
let entry_height = bank_forks_info[0].entry_height;
assert!(bank.tick_height() >= leader_scheduler.read().unwrap().ticks_per_epoch);

src/replay_stage.rs

@@ -1,7 +1,8 @@
//! The `replay_stage` replays transactions broadcast by the leader.
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::blocktree_processor;
use crate::blocktree_processor::{self, BankForksInfo};
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_scheduler::LeaderScheduler;
@@ -9,7 +10,7 @@ use crate::packet::BlobError;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::tvu::TvuRotationSender;
use crate::tvu::{TvuRotationInfo, TvuRotationSender};
use crate::voting_keypair::VotingKeypair;
use log::Level;
use solana_metrics::counter::Counter;
@@ -172,10 +173,10 @@ impl ReplayStage {
my_id: Pubkey,
voting_keypair: Option<Arc<VotingKeypair>>,
blocktree: Arc<Blocktree>,
bank: Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
bank_forks_info: &[BankForksInfo],
cluster_info: Arc<RwLock<ClusterInfo>>,
exit: Arc<AtomicBool>,
last_entry_id: Hash,
to_leader_sender: &TvuRotationSender,
ledger_signal_receiver: Receiver<bool>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -185,9 +186,42 @@ impl ReplayStage {
let exit_ = exit.clone();
let leader_scheduler_ = leader_scheduler.clone();
let to_leader_sender = to_leader_sender.clone();
let last_entry_id = Arc::new(RwLock::new(last_entry_id));
let subscriptions_ = subscriptions.clone();
let (bank, last_entry_id) = {
let mut bank_forks = bank_forks.write().unwrap();
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
(bank_forks.working_bank(), bank_forks_info[0].last_entry_id)
};
let last_entry_id = Arc::new(RwLock::new(last_entry_id));
let mut current_blob_index = {
let leader_scheduler = leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(bank.tick_height() + 1);
let leader_id = leader_scheduler
.get_leader_for_slot(slot)
.expect("Leader not known after processing bank");
trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,);
// Send a rotation notification back to Fullnode to initialize the TPU to the right
// state
to_leader_sender
.send(TvuRotationInfo {
bank: Bank::new_from_parent(&bank, &leader_id),
last_entry_id: *last_entry_id.read().unwrap(),
slot,
leader_id,
})
.unwrap();
blocktree
.meta(slot)
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0)
};
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
@@ -209,11 +243,6 @@
+ leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot),
)
};
let mut current_blob_index = blocktree
.meta(current_slot.unwrap())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
// Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
// relevant slot to see if there are any available updates
@@ -298,10 +327,12 @@
if my_id == leader_id || my_id == last_leader_id {
to_leader_sender
.send((
0, // TODO: fix hard coded bank_id
next_slot, leader_id,
))
.send(TvuRotationInfo {
bank: Bank::new_from_parent(&bank, &leader_id),
last_entry_id: *last_entry_id.read().unwrap(),
slot: next_slot,
leader_id,
})
.unwrap();
} else if leader_id != last_leader_id {
// TODO: Remove this soon once we boot the leader from ClusterInfo
@@ -355,7 +386,7 @@ mod test {
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::create_ticks;
use crate::entry::{next_entry_mut, Entry};
use crate::fullnode::new_bank_from_ledger;
use crate::fullnode::new_banks_from_blocktree;
use crate::leader_scheduler::{
make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig,
};
@@ -438,24 +469,23 @@
{
// Set up the bank
let blocktree_config = BlocktreeConfig::new(ticks_per_slot);
let (bank, _entry_height, last_entry_id, blocktree, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler);
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(&my_ledger_path, &blocktree_config, &leader_scheduler);
// Set up the replay stage
let (rotation_sender, rotation_receiver) = channel();
let meta = blocktree.meta(0).unwrap().unwrap();
let exit = Arc::new(AtomicBool::new(false));
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_id,
Some(Arc::new(voting_keypair)),
blocktree.clone(),
bank.clone(),
&Arc::new(RwLock::new(bank_forks)),
&bank_forks_info,
Arc::new(RwLock::new(cluster_info_me)),
exit.clone(),
last_entry_id,
&rotation_sender,
l_receiver,
ledger_signal_receiver,
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);
@@ -468,6 +498,7 @@
}
// Write the entries to the ledger, replay_stage should get notified of changes
let meta = blocktree.meta(0).unwrap().unwrap();
blocktree
.write_entries(
DEFAULT_SLOT_HEIGHT,
@@ -478,12 +509,15 @@
.unwrap();
info!("Wait for replay_stage to exit and check return value is correct");
let rotation_info = rotation_receiver
.recv()
.expect("should have signaled leader rotation");
assert_eq!(
(0, 2, my_keypair.pubkey()),
rotation_receiver
.recv()
.expect("should have signaled leader rotation"),
rotation_info.last_entry_id,
bank_forks_info[0].last_entry_id
);
assert_eq!(rotation_info.slot, 2);
assert_eq!(rotation_info.leader_id, my_keypair.pubkey());
info!("Check that the entries on the ledger writer channel are correct");
let mut received_ticks = ledger_writer_recv
@@ -539,24 +573,27 @@
let exit = Arc::new(AtomicBool::new(false));
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let (to_leader_sender, _) = channel();
let (to_leader_sender, _to_leader_receiver) = channel();
{
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, last_entry_id, blocktree, l_receiver) = new_bank_from_ledger(
let (bank_forks, bank_forks_info, blocktree, l_receiver) = new_banks_from_blocktree(
&my_ledger_path,
&BlocktreeConfig::default(),
&leader_scheduler,
);
let bank = bank_forks.working_bank();
let entry_height = bank_forks_info[0].entry_height;
let last_entry_id = bank_forks_info[0].last_entry_id;
let blocktree = Arc::new(blocktree);
let (replay_stage, ledger_writer_recv) = ReplayStage::new(
my_keypair.pubkey(),
Some(voting_keypair.clone()),
blocktree.clone(),
bank.clone(),
&Arc::new(RwLock::new(bank_forks)),
&bank_forks_info,
cluster_info_me.clone(),
exit.clone(),
last_entry_id,
&to_leader_sender,
l_receiver,
&leader_scheduler,
@@ -661,12 +698,12 @@
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
// Set up the replay stage
let (rotation_tx, rotation_rx) = channel();
let (rotation_sender, rotation_receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
{
let (bank, _entry_height, last_entry_id, blocktree, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &blocktree_config, &leader_scheduler);
let (bank_forks, bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path, &blocktree_config, &leader_scheduler);
let bank = bank_forks.working_bank();
let meta = blocktree
.meta(0)
.unwrap()
@@ -678,11 +715,11 @@
my_keypair.pubkey(),
Some(voting_keypair.clone()),
blocktree.clone(),
bank.clone(),
&Arc::new(RwLock::new(bank_forks)),
&bank_forks_info,
cluster_info_me.clone(),
exit.clone(),
last_entry_id,
&rotation_tx,
&rotation_sender,
l_receiver,
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
@@ -720,12 +757,15 @@
}
// Wait for replay_stage to exit and check return value is correct
let rotation_info = rotation_receiver
.recv()
.expect("should have signaled leader rotation");
assert_eq!(
(0, 1, my_keypair.pubkey()),
rotation_rx
.recv()
.expect("should have signaled leader rotation")
rotation_info.last_entry_id,
bank_forks_info[0].last_entry_id
);
assert_eq!(rotation_info.slot, 1);
assert_eq!(rotation_info.leader_id, my_keypair.pubkey());
assert_ne!(expected_last_id, Hash::default());
//replay stage should continue running even after rotation has happened (tvu never goes down)
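Taken together, the ReplayStage and Fullnode changes form a handshake over the rotation channel: ReplayStage::new sends one TvuRotationInfo up front so Fullnode can initialize the TPU to the right state, then Fullnode::run keeps consuming rotations as replay progresses. A self-contained sketch of that pattern, with plain stand-ins for the Bank, Hash, and Pubkey types from solana_runtime/solana_sdk:

use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::time::Duration;

// Stand-ins for solana_runtime::bank::Bank, solana_sdk::hash::Hash and
// solana_sdk::pubkey::Pubkey; illustrative only.
struct Bank;
type Hash = [u8; 32];
type Pubkey = [u8; 32];

#[allow(dead_code)]
struct TvuRotationInfo {
    bank: Bank,          // Bank to use
    last_entry_id: Hash, // last_entry_id of that bank
    slot: u64,           // slot height to initiate a rotation
    leader_id: Pubkey,   // leader upon rotation
}

fn replay_stage(to_leader_sender: Sender<TvuRotationInfo>) {
    // Initial notification, mirroring the send in ReplayStage::new: tell the
    // Fullnode which bank/leader/slot the TPU should start with.
    to_leader_sender
        .send(TvuRotationInfo {
            bank: Bank,
            last_entry_id: [0; 32],
            slot: 0,
            leader_id: [0; 32],
        })
        .unwrap();
    // ...the replay loop sends further rotations as slots complete; the
    // Sender is dropped when the stage exits.
}

fn fullnode_run(rotation_receiver: Receiver<TvuRotationInfo>) {
    loop {
        match rotation_receiver.recv_timeout(Duration::from_secs(1)) {
            // Fullnode::rotate() consumes the whole struct, bank included.
            Ok(rotation_info) => println!("rotate for slot {}", rotation_info.slot),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}

fn main() {
    let (to_leader_sender, rotation_receiver) = channel();
    replay_stage(to_leader_sender);
    fullnode_run(rotation_receiver);
}

Shipping the bank through the channel is what lets Fullnode::rotate drop its own BankForks bookkeeping, along with the hard-coded bank_id of the old tuple.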

src/tpu.rs

@@ -191,13 +191,13 @@ impl Tpu {
#[allow(clippy::too_many_arguments)]
pub fn switch_to_leader(
&mut self,
bank: &Arc<Bank>,
bank: Arc<Bank>,
tick_duration: PohServiceConfig,
transactions_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
sigverify_disabled: bool,
slot: u64,
last_entry_id: &Hash,
last_entry_id: Hash,
blocktree: &Arc<Blocktree>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) {
@@ -234,13 +234,13 @@ impl Tpu {
&bank,
verified_receiver,
tick_duration,
last_entry_id,
&last_entry_id,
max_tick_height,
self.id,
);
let broadcast_service = BroadcastService::new(
bank.clone(),
bank,
broadcast_socket,
self.cluster_info.clone(),
blob_index,

src/tvu.rs

@@ -16,6 +16,7 @@ use crate::bank_forks::BankForks;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::BankForksInfo;
use crate::cluster_info::ClusterInfo;
use crate::leader_scheduler::LeaderScheduler;
use crate::replay_stage::ReplayStage;
@@ -24,6 +25,7 @@ use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState};
use crate::voting_keypair::VotingKeypair;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -33,13 +35,14 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
pub type TvuReturnType = (
u64, // bank_id,
u64, // slot height to initiate a rotation
Pubkey, // leader upon rotation
);
pub type TvuRotationSender = Sender<TvuReturnType>;
pub type TvuRotationReceiver = Receiver<TvuReturnType>;
pub struct TvuRotationInfo {
pub bank: Bank, // Bank to use
pub last_entry_id: Hash, // last_entry_id of that bank
pub slot: u64, // slot height to initiate a rotation
pub leader_id: Pubkey, // leader upon rotation
}
pub type TvuRotationSender = Sender<TvuRotationInfo>;
pub type TvuRotationReceiver = Receiver<TvuRotationInfo>;
pub struct Tvu {
fetch_stage: BlobFetchStage,
@@ -60,18 +63,14 @@ impl Tvu {
/// This service receives messages from a leader in the network and processes the transactions
/// on the bank state.
/// # Arguments
/// * `bank` - The bank state.
/// * `entry_height` - Initial ledger height
/// * `last_entry_id` - Hash of the last entry
/// * `cluster_info` - The cluster_info state.
/// * `sockets` - My fetch, repair, and restransmit sockets
/// * `sockets` - fetch, repair, and retransmit sockets
/// * `blocktree` - the ledger itself
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new(
voting_keypair: Option<Arc<VotingKeypair>>,
bank_forks: &Arc<RwLock<BankForks>>,
entry_height: u64,
last_entry_id: Hash,
bank_forks_info: &[BankForksInfo],
cluster_info: &Arc<RwLock<ClusterInfo>>,
sockets: Sockets,
blocktree: Arc<Blocktree>,
@@ -119,15 +118,14 @@ impl Tvu {
exit.clone(),
);
let bank = bank_forks.read().unwrap().working_bank();
let (replay_stage, mut previous_receiver) = ReplayStage::new(
keypair.pubkey(),
voting_keypair,
blocktree.clone(),
bank.clone(),
&bank_forks,
&bank_forks_info,
cluster_info.clone(),
exit.clone(),
last_entry_id,
to_leader_sender,
ledger_signal_receiver,
&leader_scheduler,
@@ -138,7 +136,7 @@ impl Tvu {
let (blockstream_service, blockstream_receiver) = BlockstreamService::new(
previous_receiver,
blockstream.unwrap().to_string(),
bank.tick_height(),
bank_forks.read().unwrap().working_bank().tick_height(), // TODO: BlockstreamService needs to deal with BankForks somehow still
leader_scheduler,
exit.clone(),
);
@@ -154,7 +152,7 @@ impl Tvu {
Some(blocktree),
&keypair,
&exit.clone(),
entry_height,
bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still
storage_rotate_count,
&cluster_info,
);
@@ -210,6 +208,7 @@ pub mod tests {
use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
#[test]
fn test_tvu_exit() {
@@ -222,6 +221,11 @@ pub mod tests {
let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance);
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
let bank_forks_info = vec![BankForksInfo {
bank_id: 0,
entry_height: 0,
last_entry_id: Hash::default(),
}];
let leader_scheduler_config = LeaderSchedulerConfig::default();
let leader_scheduler =
LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank_forks.working_bank());
@@ -233,7 +237,6 @@ pub mod tests {
cluster_info1.set_leader(leader.info.id);
let cref1 = Arc::new(RwLock::new(cluster_info1));
let cur_hash = Hash::default();
let blocktree_path = get_tmp_ledger_path("test_tvu_exit");
let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
.expect("Expected to successfully open ledger");
@@ -243,8 +246,7 @@ pub mod tests {
let tvu = Tvu::new(
Some(Arc::new(voting_keypair)),
&Arc::new(RwLock::new(bank_forks)),
0,
cur_hash,
&bank_forks_info,
&cref1,
{
Sockets {

tests/multinode.rs

@@ -6,7 +6,7 @@ use solana::blocktree::{
use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo};
use solana::entry::{reconstruct_entries_from_blobs, Entry};
use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType};
use solana::fullnode::{new_banks_from_blocktree, Fullnode, FullnodeConfig, FullnodeReturnType};
use solana::gossip_service::{converge, make_listening_node};
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use solana::result;
@@ -507,8 +507,11 @@ fn test_boot_validator_from_file() -> result::Result<()> {
);
ledger_paths.push(genesis_ledger_path.clone());
let leader_ledger_path =
tmp_copy_ledger(&genesis_ledger_path, "multi_node_basic", &blocktree_config);
let leader_ledger_path = tmp_copy_ledger(
&genesis_ledger_path,
"boot_validator_from_file",
&blocktree_config,
);
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.info.clone();
@@ -521,12 +524,16 @@ fn test_boot_validator_from_file() -> result::Result<()> {
None,
&fullnode_config,
);
let leader_fullnode_exit = leader_fullnode.run(None);
info!("Sending transaction to leader");
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap();
assert_eq!(leader_balance, 1000);
info!("Leader balance verified");
let keypair = Arc::new(Keypair::new());
let validator = Node::new_localhost_with_pubkey(keypair.pubkey());
@@ -546,12 +553,16 @@ fn test_boot_validator_from_file() -> result::Result<()> {
Some(&leader_data),
&fullnode_config,
);
let val_fullnode_exit = val_fullnode.run(None);
info!("Checking validator balance");
let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
assert!(getbal == Some(leader_balance));
info!("Validator balance verified");
val_fullnode.close()?;
leader_fullnode.close()?;
val_fullnode_exit();
leader_fullnode_exit();
for path in ledger_paths {
remove_dir_all(path)?;
@@ -604,6 +615,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
let (leader_data, leader_fullnode) =
create_leader(&ledger_path, leader_keypair.clone(), voting_keypair);
let leader_fullnode_exit = leader_fullnode.run(None);
// lengthen the ledger
let leader_balance =
@@ -612,7 +624,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
assert_eq!(leader_balance, 500);
// restart the leader
leader_fullnode.close()?;
leader_fullnode_exit();
}
// create a "stale" ledger by copying current ledger
@@ -626,6 +638,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
let (leader_data, leader_fullnode) =
create_leader(&ledger_path, leader_keypair.clone(), voting_keypair);
let leader_fullnode_exit = leader_fullnode.run(None);
// lengthen the ledger
let leader_balance =
@@ -634,12 +647,13 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
assert_eq!(leader_balance, 1000);
// restart the leader
leader_fullnode.close()?;
leader_fullnode_exit();
}
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
let (leader_data, leader_fullnode) =
create_leader(&ledger_path, leader_keypair, voting_keypair);
let leader_fullnode_exit = leader_fullnode.run(None);
// start validator from old ledger
let keypair = Arc::new(Keypair::new());
@@ -655,6 +669,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
Some(&leader_data),
&fullnode_config,
);
let val_fullnode_exit = val_fullnode.run(None);
// trigger broadcast, validator should catch up from leader, whose window contains
// the entries missing from the stale ledger
@@ -677,8 +692,8 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(expected));
assert_eq!(getbal, Some(expected));
val_fullnode.close()?;
leader_fullnode.close()?;
val_fullnode_exit();
leader_fullnode_exit();
remove_dir_all(ledger_path)?;
remove_dir_all(stale_ledger_path)?;
@@ -989,7 +1004,10 @@ fn test_leader_to_validator_transition() {
let (rotation_sender, rotation_receiver) = channel();
let leader_exit = leader.run(Some(rotation_sender));
let expected_rotations = vec![(FullnodeReturnType::LeaderToValidatorRotation, 1)];
let expected_rotations = vec![
(FullnodeReturnType::LeaderToLeaderRotation, 0),
(FullnodeReturnType::LeaderToValidatorRotation, 1),
];
for expected_rotation in expected_rotations {
loop {
@@ -1004,12 +1022,13 @@ fn test_leader_to_validator_transition() {
leader_exit();
info!("Check the ledger to make sure it's the right height...");
let bank = new_bank_from_ledger(
let bank_forks = new_banks_from_blocktree(
&leader_ledger_path,
&BlocktreeConfig::default(),
&Arc::new(RwLock::new(LeaderScheduler::default())),
)
.0;
let bank = bank_forks.working_bank();
assert_eq!(
bank.tick_height(),
@@ -1122,10 +1141,18 @@ fn test_leader_validator_basic() {
converge(&leader_info, 2);
info!("Waiting for slot 0 -> slot 1: bootstrap leader and the validator rotate");
assert_eq!(
leader_rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToLeaderRotation, 0),
);
assert_eq!(
leader_rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToValidatorRotation, 1)
);
assert_eq!(
validator_rotation_receiver.recv().unwrap(),
(FullnodeReturnType::LeaderToValidatorRotation, 0)
);
assert_eq!(
validator_rotation_receiver.recv().unwrap(),
(FullnodeReturnType::ValidatorToLeaderRotation, 1)
@@ -1501,6 +1528,7 @@ fn test_full_leader_validator_network() {
for node in nodes {
node.1();
}
info!("Bootstrap leader exit");
bootstrap_leader_exit();
let mut node_entries = vec![];
@@ -1654,7 +1682,7 @@ fn test_broadcast_last_tick() {
loop {
let transition = bootstrap_leader_rotation_receiver.recv().unwrap();
info!("bootstrap leader transition event: {:?}", transition);
if transition.0 == FullnodeReturnType::LeaderToLeaderRotation {
if (FullnodeReturnType::LeaderToLeaderRotation, 1) == transition {
break;
}
}
@@ -1846,7 +1874,7 @@ fn test_fullnode_rotate(
last_entry_id = entries.last().unwrap().id;
}
let mut leader_tick_height_of_next_rotation = ticks_per_slot;
let mut leader_tick_height_of_next_rotation = 0;
let mut leader_should_be_leader = true;
if fullnode_config.leader_scheduler_config.ticks_per_slot == 1 {
// Add another tick to the ledger if the cluster has been configured for 1 tick_per_slot.
@@ -1857,7 +1885,7 @@ fn test_fullnode_rotate(
entries.extend(tick);
last_entry_id = entries.last().unwrap().id;
leader_tick_height_of_next_rotation += 2;
leader_tick_height_of_next_rotation = 2;
if include_validator {
leader_should_be_leader = false;
}

tests/tvu.rs

@ -1,7 +1,7 @@
use log::trace;
use solana::bank_forks::BankForks;
use solana::blocktree::Blocktree;
use solana::blocktree::{get_tmp_ledger_path, BlocktreeConfig};
use solana::blocktree::{get_tmp_ledger_path, Blocktree, BlocktreeConfig};
use solana::blocktree_processor::BankForksInfo;
use solana::cluster_info::{ClusterInfo, Node};
use solana::entry::next_entry_mut;
use solana::entry::EntrySlice;
@@ -86,7 +86,14 @@ fn test_replay() {
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
let tvu_addr = target1.info.tvu;
let mut cur_hash = Hash::default();
let bank_forks = BankForks::new(0, Bank::new(&genesis_block));
let bank_forks_info = vec![BankForksInfo {
bank_id: 0,
entry_height: 0,
last_entry_id: cur_hash,
}];
let bank = bank_forks.working_bank();
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new_with_bank(
&leader_scheduler_config,
@@ -101,20 +108,18 @@ fn test_replay() {
let cref1 = Arc::new(RwLock::new(cluster_info1));
let dr_1 = new_gossip(cref1.clone(), target1.sockets.gossip, exit.clone());
let mut cur_hash = Hash::default();
let blocktree_path = get_tmp_ledger_path("test_replay");
let (blocktree, l_receiver) =
let (blocktree, ledger_signal_receiver) =
Blocktree::open_with_config_signal(&blocktree_path, &blocktree_config)
.expect("Expected to successfully open ledger");
let vote_account_keypair = Arc::new(Keypair::new());
let voting_keypair = VotingKeypair::new_local(&vote_account_keypair);
let (sender, _) = channel();
let (to_leader_sender, _to_leader_receiver) = channel();
let tvu = Tvu::new(
Some(Arc::new(voting_keypair)),
&Arc::new(RwLock::new(bank_forks)),
0,
cur_hash,
&bank_forks_info,
&cref1,
{
Sockets {
@@ -125,10 +130,10 @@ fn test_replay() {
},
Arc::new(blocktree),
STORAGE_ROTATE_TEST_COUNT,
&sender,
&to_leader_sender,
&StorageState::default(),
None,
l_receiver,
ledger_signal_receiver,
leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);