Fix Storage Stage not receiving entries when node is leader (#3528)

This commit is contained in:
Sagar Dhawan 2019-03-27 13:10:33 -07:00 committed by GitHub
parent 47b6707c07
commit 36ea088387
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 29 deletions

View File

@ -2,7 +2,7 @@
//! //!
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT}; use crate::cluster_info::{ClusterInfo, ClusterInfoError, DATA_PLANE_FANOUT};
use crate::entry::EntrySlice; use crate::entry::{EntrySender, EntrySlice};
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
use crate::erasure::CodingGenerator; use crate::erasure::CodingGenerator;
use crate::packet::index_blobs; use crate::packet::index_blobs;
@ -41,6 +41,7 @@ impl Broadcast {
receiver: &Receiver<WorkingBankEntries>, receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket, sock: &UdpSocket,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: &EntrySender,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let (mut bank, entries) = receiver.recv_timeout(timer)?; let (mut bank, entries) = receiver.recv_timeout(timer)?;
@ -87,10 +88,13 @@ impl Broadcast {
let blobs: Vec<_> = ventries let blobs: Vec<_> = ventries
.into_par_iter() .into_par_iter()
.flat_map(|p| { .map_with(storage_entry_sender.clone(), |s, p| {
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
entries.to_shared_blobs() let blobs = entries.to_shared_blobs();
let _ignored = s.send(entries);
blobs
}) })
.flatten()
.collect(); .collect();
let blob_index = blocktree let blob_index = blocktree
@ -186,6 +190,7 @@ impl BroadcastStage {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>, receiver: &Receiver<WorkingBankEntries>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender,
) -> BroadcastStageReturnType { ) -> BroadcastStageReturnType {
let me = cluster_info.read().unwrap().my_data().clone(); let me = cluster_info.read().unwrap().my_data().clone();
@ -196,7 +201,13 @@ impl BroadcastStage {
}; };
loop { loop {
if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) { if let Err(e) = broadcast.run(
&cluster_info,
receiver,
sock,
blocktree,
&storage_entry_sender,
) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
return BroadcastStageReturnType::ChannelDisconnected; return BroadcastStageReturnType::ChannelDisconnected;
@ -234,6 +245,7 @@ impl BroadcastStage {
receiver: Receiver<WorkingBankEntries>, receiver: Receiver<WorkingBankEntries>,
exit_sender: &Arc<AtomicBool>, exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender,
) -> Self { ) -> Self {
let blocktree = blocktree.clone(); let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone(); let exit_sender = exit_sender.clone();
@ -241,7 +253,13 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
let _finalizer = Finalizer::new(exit_sender); let _finalizer = Finalizer::new(exit_sender);
Self::run(&sock, &cluster_info, &receiver, &blocktree) Self::run(
&sock,
&cluster_info,
&receiver,
&blocktree,
storage_entry_sender,
)
}) })
.unwrap(); .unwrap();
@ -301,6 +319,7 @@ mod test {
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let exit_sender = Arc::new(AtomicBool::new(false)); let exit_sender = Arc::new(AtomicBool::new(false));
let (storage_sender, _receiver) = channel();
let bank = Arc::new(Bank::default()); let bank = Arc::new(Bank::default());
// Start up the broadcast stage // Start up the broadcast stage
@ -310,6 +329,7 @@ mod test {
entry_receiver, entry_receiver,
&exit_sender, &exit_sender,
&blocktree, &blocktree,
storage_sender,
); );
MockBroadcastStage { MockBroadcastStage {

View File

@ -31,7 +31,7 @@ use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction::{Vote, VoteInstruction}; use solana_vote_api::vote_instruction::{Vote, VoteInstruction};
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver; use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::Result; use std::thread::Result;
@ -203,7 +203,9 @@ impl Fullnode {
Some(Arc::new(voting_keypair)) Some(Arc::new(voting_keypair))
}; };
// Setup channel for rotation indications // Setup channel for sending entries to storage stage
let (sender, receiver) = channel();
let tvu = Tvu::new( let tvu = Tvu::new(
vote_account, vote_account,
voting_keypair, voting_keypair,
@ -218,6 +220,8 @@ impl Fullnode {
ledger_signal_receiver, ledger_signal_receiver,
&subscriptions, &subscriptions,
&poh_recorder, &poh_recorder,
sender.clone(),
receiver,
&exit, &exit,
); );
let tpu = Tpu::new( let tpu = Tpu::new(
@ -230,6 +234,7 @@ impl Fullnode {
node.sockets.broadcast, node.sockets.broadcast,
config.sigverify_disabled, config.sigverify_disabled,
&blocktree, &blocktree,
sender,
&exit, &exit,
); );

View File

@ -142,12 +142,12 @@ impl LocalCluster {
cluster.add_validator(&fullnode_config, *stake); cluster.add_validator(&fullnode_config, *stake);
} }
discover(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap();
for _ in 0..num_replicators { for _ in 0..num_replicators {
cluster.add_replicator(); cluster.add_replicator();
} }
discover(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap();
cluster cluster
} }

View File

@ -4,7 +4,7 @@ use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::blocktree_processor; use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::entry::{Entry, EntrySender, EntrySlice};
use crate::leader_schedule_utils; use crate::leader_schedule_utils;
use crate::locktower::{Locktower, StakeLockout}; use crate::locktower::{Locktower, StakeLockout};
use crate::packet::BlobError; use crate::packet::BlobError;
@ -82,11 +82,11 @@ impl ReplayStage {
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver) storage_entry_sender: EntrySender,
) -> (Self, Receiver<(u64, Pubkey)>)
where where
T: 'static + KeypairUtil + Send + Sync, T: 'static + KeypairUtil + Send + Sync,
{ {
let (forward_entry_sender, forward_entry_receiver) = channel();
let (slot_full_sender, slot_full_receiver) = channel(); let (slot_full_sender, slot_full_receiver) = channel();
trace!("replay stage"); trace!("replay stage");
let exit_ = exit.clone(); let exit_ = exit.clone();
@ -121,7 +121,7 @@ impl ReplayStage {
&my_id, &my_id,
&mut ticks_per_slot, &mut ticks_per_slot,
&mut progress, &mut progress,
&forward_entry_sender, &storage_entry_sender,
&slot_full_sender, &slot_full_sender,
)?; )?;
@ -193,11 +193,7 @@ impl ReplayStage {
Ok(()) Ok(())
}) })
.unwrap(); .unwrap();
( (Self { t_replay }, slot_full_receiver)
Self { t_replay },
slot_full_receiver,
forward_entry_receiver,
)
} }
pub fn start_leader( pub fn start_leader(
my_id: &Pubkey, my_id: &Pubkey,
@ -637,7 +633,8 @@ mod test {
let blocktree = Arc::new(blocktree); let blocktree = Arc::new(blocktree);
let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new( let (ledger_writer_sender, ledger_writer_receiver) = channel();
let (replay_stage, _slot_full_receiver) = ReplayStage::new(
&my_keypair.pubkey(), &my_keypair.pubkey(),
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(voting_keypair.clone()), Some(voting_keypair.clone()),
@ -648,6 +645,7 @@ mod test {
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
ledger_writer_sender,
); );
let vote_ix = VoteInstruction::new_vote(&voting_keypair.pubkey(), Vote::new(0)); let vote_ix = VoteInstruction::new_vote(&voting_keypair.pubkey(), Vote::new(0));
@ -665,7 +663,7 @@ mod test {
.write_entries(1, 0, 0, genesis_block.ticks_per_slot, next_tick.clone()) .write_entries(1, 0, 0, genesis_block.ticks_per_slot, next_tick.clone())
.unwrap(); .unwrap();
let received_tick = ledger_writer_recv let received_tick = ledger_writer_receiver
.recv() .recv()
.expect("Expected to receive an entry on the ledger writer receiver"); .expect("Expected to receive an entry on the ledger writer receiver");

View File

@ -6,6 +6,7 @@ use crate::blocktree::Blocktree;
use crate::broadcast_stage::BroadcastStage; use crate::broadcast_stage::BroadcastStage;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener; use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::entry::EntrySender;
use crate::fetch_stage::FetchStage; use crate::fetch_stage::FetchStage;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service; use crate::service::Service;
@ -37,6 +38,7 @@ impl Tpu {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
sigverify_disabled: bool, sigverify_disabled: bool,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self { ) -> Self {
cluster_info.write().unwrap().set_leader(id); cluster_info.write().unwrap().set_leader(id);
@ -62,6 +64,7 @@ impl Tpu {
entry_receiver, entry_receiver,
&exit, &exit,
blocktree, blocktree,
storage_entry_sender,
); );
Self { Self {

View File

@ -18,6 +18,7 @@ use crate::blockstream_service::BlockstreamService;
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::blocktree_processor::BankForksInfo; use crate::blocktree_processor::BankForksInfo;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::entry::{EntryReceiver, EntrySender};
use crate::poh_recorder::PohRecorder; use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage; use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage; use crate::retransmit_stage::RetransmitStage;
@ -68,6 +69,8 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
storage_entry_sender: EntrySender,
storage_entry_receiver: EntryReceiver,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
) -> Self ) -> Self
where where
@ -106,7 +109,7 @@ impl Tvu {
&exit, &exit,
); );
let (replay_stage, slot_full_receiver, forward_entry_receiver) = ReplayStage::new( let (replay_stage, slot_full_receiver) = ReplayStage::new(
&keypair.pubkey(), &keypair.pubkey(),
vote_account, vote_account,
voting_keypair, voting_keypair,
@ -117,6 +120,7 @@ impl Tvu {
ledger_signal_receiver, ledger_signal_receiver,
subscriptions, subscriptions,
poh_recorder, poh_recorder,
storage_entry_sender,
); );
let blockstream_service = if blockstream.is_some() { let blockstream_service = if blockstream.is_some() {
@ -133,7 +137,7 @@ impl Tvu {
let storage_stage = StorageStage::new( let storage_stage = StorageStage::new(
storage_state, storage_state,
forward_entry_receiver, storage_entry_receiver,
Some(blocktree), Some(blocktree),
&keypair, &keypair,
&exit, &exit,
@ -205,6 +209,7 @@ pub mod tests {
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank); let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let (storage_entry_sender, storage_entry_receiver) = channel();
let tvu = Tvu::new( let tvu = Tvu::new(
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)), Some(Arc::new(voting_keypair)),
@ -225,6 +230,8 @@ pub mod tests {
l_receiver, l_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
storage_entry_sender,
storage_entry_receiver,
&exit, &exit,
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -98,17 +98,16 @@ fn download_from_replicator(replicator_info: &ContactInfo) {
assert!(received_blob); assert!(received_blob);
} }
#[test] /// Start the cluster with the given configuration and wait till the replicators are discovered
fn test_replicator_startup_basic() { /// Then download blobs from one of them.
fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
solana_logger::setup(); solana_logger::setup();
info!("starting replicator test"); info!("starting replicator test");
const NUM_NODES: usize = 2;
let num_replicators = 1;
let mut fullnode_config = FullnodeConfig::default(); let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let cluster = LocalCluster::new_with_config_replicators( let cluster = LocalCluster::new_with_config_replicators(
&[100; NUM_NODES], &vec![100; num_nodes],
10_000, 10_000,
&fullnode_config, &fullnode_config,
num_replicators, num_replicators,
@ -118,10 +117,10 @@ fn test_replicator_startup_basic() {
let cluster_nodes = discover( let cluster_nodes = discover(
&cluster.entry_point_info.gossip, &cluster.entry_point_info.gossip,
NUM_NODES + num_replicators, num_nodes + num_replicators,
) )
.unwrap(); .unwrap();
assert_eq!(cluster_nodes.len(), NUM_NODES + num_replicators); assert_eq!(cluster_nodes.len(), num_nodes + num_replicators);
let mut replicator_count = 0; let mut replicator_count = 0;
let mut replicator_info = ContactInfo::default(); let mut replicator_info = ContactInfo::default();
for node in &cluster_nodes { for node in &cluster_nodes {
@ -136,6 +135,16 @@ fn test_replicator_startup_basic() {
download_from_replicator(&replicator_info); download_from_replicator(&replicator_info);
} }
#[test]
fn test_replicator_startup_1_node() {
run_replicator_startup_basic(1, 1);
}
#[test]
fn test_replicator_startup_2_nodes() {
run_replicator_startup_basic(2, 1);
}
#[test] #[test]
fn test_replicator_startup_leader_hang() { fn test_replicator_startup_leader_hang() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};

View File

@ -101,6 +101,7 @@ fn test_replay() {
let voting_keypair = Keypair::new(); let voting_keypair = Keypair::new();
let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank); create_test_recorder(&bank);
let (storage_sender, storage_receiver) = channel();
let tvu = Tvu::new( let tvu = Tvu::new(
&voting_keypair.pubkey(), &voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)), Some(Arc::new(voting_keypair)),
@ -121,6 +122,8 @@ fn test_replay() {
ledger_signal_receiver, ledger_signal_receiver,
&Arc::new(RpcSubscriptions::default()), &Arc::new(RpcSubscriptions::default()),
&poh_recorder, &poh_recorder,
storage_sender,
storage_receiver,
&exit, &exit,
); );