diff --git a/replicator/src/main.rs b/replicator/src/main.rs
index 0d1324411..af2deeae6 100644
--- a/replicator/src/main.rs
+++ b/replicator/src/main.rs
@@ -82,7 +82,7 @@ fn main() {
 
     let leader_info = NodeInfo::new_entry_point(&network_addr);
 
-    let replicator = Replicator::new(ledger_path, node, &leader_info, &keypair).unwrap();
+    let replicator = Replicator::new(ledger_path, node, &leader_info, &keypair, None).unwrap();
 
     replicator.join();
 }
diff --git a/src/replicator.rs b/src/replicator.rs
index a431c97d2..b8519071c 100644
--- a/src/replicator.rs
+++ b/src/replicator.rs
@@ -6,7 +6,7 @@ use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
 use crate::db_ledger::DbLedger;
 use crate::gossip_service::GossipService;
 use crate::leader_scheduler::LeaderScheduler;
-use crate::result::Result;
+use crate::result::{self, Result};
 use crate::rpc_request::{RpcClient, RpcRequest, RpcRequestHandler};
 use crate::service::Service;
 use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
@@ -35,7 +35,7 @@ use std::sync::mpsc::channel;
 use std::sync::{Arc, RwLock};
 use std::thread::sleep;
 use std::thread::JoinHandle;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 pub struct Replicator {
     gossip_service: GossipService,
@@ -99,15 +99,27 @@ fn get_entry_heights_from_last_id(
 }
 
 impl Replicator {
+    /// Returns a Result that contains a replicator on success
+    ///
+    /// # Arguments
+    /// * `ledger_path` - (Not actually optional) path to where the ledger will be stored.
+    /// Causes panic if none
+    /// * `node` - The replicator node
+    /// * `leader_info` - NodeInfo representing the leader
+    /// * `keypair` - Keypair for this replicator
+    /// * `timeout` - (optional) timeout for polling for leader/downloading the ledger. Defaults to
+    /// 30 seconds
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
         ledger_path: Option<&str>,
         node: Node,
         leader_info: &NodeInfo,
         keypair: &Keypair,
+        timeout: Option<Duration>,
     ) -> Result<Self> {
         let exit = Arc::new(AtomicBool::new(false));
         let done = Arc::new(AtomicBool::new(false));
+        let timeout = timeout.unwrap_or_else(|| Duration::new(30, 0));
 
         info!("Replicator: id: {}", keypair.pubkey());
         info!("Creating cluster info....");
@@ -138,7 +150,7 @@ impl Replicator {
         );
 
         info!("polling for leader");
-        let leader = Self::poll_for_leader(&cluster_info)?;
+        let leader = Self::poll_for_leader(&cluster_info, timeout)?;
 
         info!("Got leader: {:?}", leader);
 
@@ -161,6 +173,8 @@ impl Replicator {
         // todo: pull blobs off the retransmit_receiver and recycle them?
         let (retransmit_sender, retransmit_receiver) = channel();
 
+        let (entry_sender, entry_receiver) = channel();
+
         let t_window = window_service(
             db_ledger.clone(),
             cluster_info.clone(),
@@ -168,7 +182,7 @@ impl Replicator {
             entry_height,
             max_entry_height,
             blob_fetch_receiver,
-            None,
+            Some(entry_sender),
             retransmit_sender,
             repair_socket,
             Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
@@ -178,10 +192,25 @@ impl Replicator {
         );
 
         info!("window created, waiting for ledger download done");
+        let start = Instant::now();
+        let mut received_so_far = 0;
+
         while !done.load(Ordering::Relaxed) {
             sleep(Duration::from_millis(100));
+
+            let elapsed = start.elapsed();
+            received_so_far += entry_receiver.try_recv().map(|v| v.len()).unwrap_or(0);
+
+            if received_so_far == 0 && elapsed > timeout {
+                return Err(result::Error::IO(io::Error::new(
+                    ErrorKind::TimedOut,
+                    "Timed out waiting to receive any blocks",
+                )));
+            }
         }
 
+        info!("Done receiving entries from window_service");
+
         let mut node_info = node.info.clone();
         node_info.tvu = "0.0.0.0:0".parse().unwrap();
         {
@@ -279,16 +308,30 @@ impl Replicator {
         self.entry_height
     }
 
-    fn poll_for_leader(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<NodeInfo> {
-        for _ in 0..30 {
+    /// Polls gossip for the top leader, sleeping 900ms between attempts.
+    ///
+    /// Returns a `TimedOut` IO error once `timeout` has elapsed without a leader being found.
+    fn poll_for_leader(
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        timeout: Duration,
+    ) -> Result<NodeInfo> {
+        let start = Instant::now();
+        loop {
             if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
                 return Ok(l.clone());
             }
 
+            let elapsed = start.elapsed();
+            if elapsed > timeout {
+                return Err(result::Error::IO(io::Error::new(
+                    ErrorKind::TimedOut,
+                    "Timed out waiting for leader",
+                )));
+            }
+
             sleep(Duration::from_millis(900));
             info!("{}", cluster_info.read().unwrap().node_info_trace());
         }
-        Err(Error::new(ErrorKind::Other, "Couldn't find leader"))?
     }
 
     fn poll_for_last_id_and_entry_height(
@@ -355,7 +398,6 @@ impl Replicator {
 
 #[cfg(test)]
 mod tests {
-
     use crate::replicator::sample_file;
     use solana_sdk::hash::Hash;
     use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -429,5 +471,4 @@ mod tests {
         let res = sample_file(&in_path, &samples);
         assert!(res.is_err());
     }
-
 }
diff --git a/tests/replicator.rs b/tests/replicator.rs
index 6b335da0d..5e589b5d1 100644
--- a/tests/replicator.rs
+++ b/tests/replicator.rs
@@ -132,6 +132,7 @@ fn test_replicator_startup() {
             replicator_node,
             &leader_info,
             &replicator_keypair,
+            None,
         )
         .unwrap();
 
@@ -217,3 +218,116 @@ fn test_replicator_startup() {
     let _ignored = remove_dir_all(&leader_ledger_path);
     let _ignored = remove_dir_all(&replicator_ledger_path);
 }
+
+#[test]
+fn test_replicator_startup_leader_hang() {
+    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+    use std::time::Duration;
+
+    solana_logger::setup();
+    info!("starting replicator test");
+
+    let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger");
+    let leader_ledger_path = "replicator_test_leader_ledger";
+
+    {
+        let replicator_keypair = Keypair::new();
+
+        info!("starting replicator node");
+        let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
+
+        let fake_gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
+        let leader_info = NodeInfo::new_entry_point(&fake_gossip);
+
+        let replicator_res = Replicator::new(
+            Some(replicator_ledger_path),
+            replicator_node,
+            &leader_info,
+            &replicator_keypair,
+            Some(Duration::from_secs(3)),
+        );
+
+        assert!(replicator_res.is_err());
+    }
+
+    let _ignored = DbLedger::destroy(&leader_ledger_path);
+    let _ignored = DbLedger::destroy(&replicator_ledger_path);
+    let _ignored = remove_dir_all(&leader_ledger_path);
+    let _ignored = remove_dir_all(&replicator_ledger_path);
+}
+
+#[test]
+fn test_replicator_startup_ledger_hang() {
+    use std::net::UdpSocket;
+
+    solana_logger::setup();
+    info!("starting replicator test");
+    let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger");
+
+    info!("starting leader node");
+    let leader_keypair = Arc::new(Keypair::new());
+    let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
+    let leader_info = leader_node.info.clone();
+
+    let leader_ledger_path = "replicator_test_leader_ledger";
+    let (_, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1);
+
+    let validator_ledger_path =
+        tmp_copy_ledger(&leader_ledger_path, "replicator_test_validator_ledger");
+
+    {
+        let signer_proxy =
+            VoteSignerProxy::new(&leader_keypair, Box::new(LocalVoteSigner::default()));
+
+        let _ = Fullnode::new(
+            leader_node,
+            &leader_ledger_path,
+            leader_keypair,
+            Arc::new(signer_proxy),
+            None,
+            false,
+            LeaderScheduler::from_bootstrap_leader(leader_info.id.clone()),
+            None,
+        );
+
+        let validator_keypair = Arc::new(Keypair::new());
+        let signer_proxy =
+            VoteSignerProxy::new(&validator_keypair, Box::new(LocalVoteSigner::default()));
+        let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
+
+        let _ = Fullnode::new(
+            validator_node,
+            &validator_ledger_path,
+            validator_keypair,
+            Arc::new(signer_proxy),
+            Some(leader_info.gossip),
+            false,
+            LeaderScheduler::from_bootstrap_leader(leader_info.id),
+            None,
+        );
+
+        info!("starting replicator node");
+        let bad_keys = Keypair::new();
+        let mut replicator_node = Node::new_localhost_with_pubkey(bad_keys.pubkey());
+
+        // Pass bad TVU sockets to prevent successful ledger download
+        replicator_node.sockets.tvu = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
+
+        let leader_info = NodeInfo::new_entry_point(&leader_info.gossip);
+
+        let replicator_res = Replicator::new(
+            Some(replicator_ledger_path),
+            replicator_node,
+            &leader_info,
+            &bad_keys,
+            Some(Duration::from_secs(3)),
+        );
+
+        assert!(replicator_res.is_err());
+    }
+
+    let _ignored = DbLedger::destroy(&leader_ledger_path);
+    let _ignored = DbLedger::destroy(&replicator_ledger_path);
+    let _ignored = remove_dir_all(&leader_ledger_path);
+    let _ignored = remove_dir_all(&replicator_ledger_path);
+}