From 36ea08838782db2334bde79bc072155b55a9c59e Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 27 Mar 2019 13:10:33 -0700 Subject: [PATCH] Fix Storage Stage not receiving entries when node is leader (#3528) --- core/src/broadcast_stage.rs | 30 +++++++++++++++++++++++++----- core/src/fullnode.rs | 9 +++++++-- core/src/local_cluster.rs | 4 ++-- core/src/replay_stage.rs | 20 +++++++++----------- core/src/tpu.rs | 3 +++ core/src/tvu.rs | 11 +++++++++-- core/tests/replicator.rs | 23 ++++++++++++++++------- core/tests/tvu.rs | 3 +++ 8 files changed, 74 insertions(+), 29 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 89ca33318e..543e4f483d 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -2,7 +2,7 @@ //! use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; -use crate::entry::EntrySlice; +use crate::entry::{EntrySender, EntrySlice}; #[cfg(feature = "erasure")] use crate::erasure::CodingGenerator; use crate::packet::index_blobs; @@ -41,6 +41,7 @@ impl Broadcast { receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, + storage_entry_sender: &EntrySender, ) -> Result<()> { let timer = Duration::new(1, 0); let (mut bank, entries) = receiver.recv_timeout(timer)?; @@ -87,10 +88,13 @@ impl Broadcast { let blobs: Vec<_> = ventries .into_par_iter() - .flat_map(|p| { + .map_with(storage_entry_sender.clone(), |s, p| { let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); - entries.to_shared_blobs() + let blobs = entries.to_shared_blobs(); + let _ignored = s.send(entries); + blobs }) + .flatten() .collect(); let blob_index = blocktree @@ -186,6 +190,7 @@ impl BroadcastStage { cluster_info: &Arc>, receiver: &Receiver, blocktree: &Arc, + storage_entry_sender: EntrySender, ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); @@ -196,7 +201,13 @@ impl BroadcastStage { }; loop { - if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) { + if let Err(e) = broadcast.run( + &cluster_info, + receiver, + sock, + blocktree, + &storage_entry_sender, + ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { return BroadcastStageReturnType::ChannelDisconnected; @@ -234,6 +245,7 @@ impl BroadcastStage { receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, + storage_entry_sender: EntrySender, ) -> Self { let blocktree = blocktree.clone(); let exit_sender = exit_sender.clone(); @@ -241,7 +253,13 @@ impl BroadcastStage { .name("solana-broadcaster".to_string()) .spawn(move || { let _finalizer = Finalizer::new(exit_sender); - Self::run(&sock, &cluster_info, &receiver, &blocktree) + Self::run( + &sock, + &cluster_info, + &receiver, + &blocktree, + storage_entry_sender, + ) }) .unwrap(); @@ -301,6 +319,7 @@ mod test { let cluster_info = Arc::new(RwLock::new(cluster_info)); let exit_sender = Arc::new(AtomicBool::new(false)); + let (storage_sender, _receiver) = channel(); let bank = Arc::new(Bank::default()); // Start up the broadcast stage @@ -310,6 +329,7 @@ mod test { entry_receiver, &exit_sender, &blocktree, + storage_sender, ); MockBroadcastStage { diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index fab7854243..9607b11734 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -31,7 +31,7 @@ use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction::{Vote, VoteInstruction}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::Receiver; +use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::Result; @@ -203,7 +203,9 @@ impl Fullnode { Some(Arc::new(voting_keypair)) }; - // Setup channel for rotation indications + // Setup channel for sending entries to storage stage + let (sender, receiver) = channel(); + let tvu = Tvu::new( vote_account, voting_keypair, @@ -218,6 +220,8 @@ impl Fullnode { ledger_signal_receiver, &subscriptions, &poh_recorder, + sender.clone(), + receiver, &exit, ); let tpu = Tpu::new( @@ -230,6 +234,7 @@ impl Fullnode { node.sockets.broadcast, config.sigverify_disabled, &blocktree, + sender, &exit, ); diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index 05863f696e..cffa828e46 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -142,12 +142,12 @@ impl LocalCluster { cluster.add_validator(&fullnode_config, *stake); } + discover(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap(); + for _ in 0..num_replicators { cluster.add_replicator(); } - discover(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap(); - cluster } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 978a7861e0..291d54a3c7 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,7 +4,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; -use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; +use crate::entry::{Entry, EntrySender, EntrySlice}; use crate::leader_schedule_utils; use crate::locktower::{Locktower, StakeLockout}; use crate::packet::BlobError; @@ -82,11 +82,11 @@ impl ReplayStage { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, - ) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver) + storage_entry_sender: EntrySender, + ) -> (Self, Receiver<(u64, Pubkey)>) where T: 'static + KeypairUtil + Send + Sync, { - let (forward_entry_sender, forward_entry_receiver) = channel(); let (slot_full_sender, slot_full_receiver) = channel(); trace!("replay stage"); let exit_ = exit.clone(); @@ -121,7 +121,7 @@ impl ReplayStage { &my_id, &mut ticks_per_slot, &mut progress, - &forward_entry_sender, + &storage_entry_sender, &slot_full_sender, )?; @@ -193,11 +193,7 @@ impl ReplayStage { Ok(()) }) .unwrap(); - ( - Self { t_replay }, - slot_full_receiver, - forward_entry_receiver, - ) + (Self { t_replay }, slot_full_receiver) } pub fn start_leader( my_id: &Pubkey, @@ -637,7 +633,8 @@ mod test { let blocktree = Arc::new(blocktree); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); - let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( + let (ledger_writer_sender, ledger_writer_receiver) = channel(); + let (replay_stage, _slot_full_receiver) = ReplayStage::new( &my_keypair.pubkey(), &voting_keypair.pubkey(), Some(voting_keypair.clone()), @@ -648,6 +645,7 @@ mod test { l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, + ledger_writer_sender, ); let vote_ix = VoteInstruction::new_vote(&voting_keypair.pubkey(), Vote::new(0)); @@ -665,7 +663,7 @@ mod test { .write_entries(1, 0, 0, genesis_block.ticks_per_slot, next_tick.clone()) .unwrap(); - let received_tick = ledger_writer_recv + let received_tick = ledger_writer_receiver .recv() .expect("Expected to receive an entry on the ledger writer receiver"); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index c3011b6845..ca2b225f01 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -6,6 +6,7 @@ use crate::blocktree::Blocktree; use crate::broadcast_stage::BroadcastStage; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; +use crate::entry::EntrySender; use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; @@ -37,6 +38,7 @@ impl Tpu { broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, + storage_entry_sender: EntrySender, exit: &Arc, ) -> Self { cluster_info.write().unwrap().set_leader(id); @@ -62,6 +64,7 @@ impl Tpu { entry_receiver, &exit, blocktree, + storage_entry_sender, ); Self { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index f95143a324..599205c12d 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -18,6 +18,7 @@ use crate::blockstream_service::BlockstreamService; use crate::blocktree::Blocktree; use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; +use crate::entry::{EntryReceiver, EntrySender}; use crate::poh_recorder::PohRecorder; use crate::replay_stage::ReplayStage; use crate::retransmit_stage::RetransmitStage; @@ -68,6 +69,8 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, + storage_entry_sender: EntrySender, + storage_entry_receiver: EntryReceiver, exit: &Arc, ) -> Self where @@ -106,7 +109,7 @@ impl Tvu { &exit, ); - let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( + let (replay_stage, slot_full_receiver) = ReplayStage::new( &keypair.pubkey(), vote_account, voting_keypair, @@ -117,6 +120,7 @@ impl Tvu { ledger_signal_receiver, subscriptions, poh_recorder, + storage_entry_sender, ); let blockstream_service = if blockstream.is_some() { @@ -133,7 +137,7 @@ impl Tvu { let storage_stage = StorageStage::new( storage_state, - forward_entry_receiver, + storage_entry_receiver, Some(blocktree), &keypair, &exit, @@ -205,6 +209,7 @@ pub mod tests { let bank = bank_forks.working_bank(); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let voting_keypair = Keypair::new(); + let (storage_entry_sender, storage_entry_receiver) = channel(); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), @@ -225,6 +230,8 @@ pub mod tests { l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, + storage_entry_sender, + storage_entry_receiver, &exit, ); exit.store(true, Ordering::Relaxed); diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs index 1a618f4e9c..f09436a2e1 100644 --- a/core/tests/replicator.rs +++ b/core/tests/replicator.rs @@ -98,17 +98,16 @@ fn download_from_replicator(replicator_info: &ContactInfo) { assert!(received_blob); } -#[test] -fn test_replicator_startup_basic() { +/// Start the cluster with the given configuration and wait till the replicators are discovered +/// Then download blobs from one of them. +fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) { solana_logger::setup(); info!("starting replicator test"); - const NUM_NODES: usize = 2; - let num_replicators = 1; let mut fullnode_config = FullnodeConfig::default(); fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; let cluster = LocalCluster::new_with_config_replicators( - &[100; NUM_NODES], + &vec![100; num_nodes], 10_000, &fullnode_config, num_replicators, @@ -118,10 +117,10 @@ fn test_replicator_startup_basic() { let cluster_nodes = discover( &cluster.entry_point_info.gossip, - NUM_NODES + num_replicators, + num_nodes + num_replicators, ) .unwrap(); - assert_eq!(cluster_nodes.len(), NUM_NODES + num_replicators); + assert_eq!(cluster_nodes.len(), num_nodes + num_replicators); let mut replicator_count = 0; let mut replicator_info = ContactInfo::default(); for node in &cluster_nodes { @@ -136,6 +135,16 @@ fn test_replicator_startup_basic() { download_from_replicator(&replicator_info); } +#[test] +fn test_replicator_startup_1_node() { + run_replicator_startup_basic(1, 1); +} + +#[test] +fn test_replicator_startup_2_nodes() { + run_replicator_startup_basic(2, 1); +} + #[test] fn test_replicator_startup_leader_hang() { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index 38c93297bd..f2ad23a1a5 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -101,6 +101,7 @@ fn test_replay() { let voting_keypair = Keypair::new(); let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); + let (storage_sender, storage_receiver) = channel(); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), @@ -121,6 +122,8 @@ fn test_replay() { ledger_signal_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, + storage_sender, + storage_receiver, &exit, );