diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index e7a028e8db..1c16e141f8 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -3,6 +3,7 @@ use crate::cluster_info::{Node, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::fullnode::{Fullnode, FullnodeConfig}; use crate::gossip_service::discover; +use crate::replicator::Replicator; use crate::service::Service; use solana_client::client::create_client; use solana_client::thin_client::{retry_get_balance, ThinClient}; @@ -23,6 +24,9 @@ pub struct LocalCluster { pub entry_point_info: ContactInfo, pub ledger_paths: Vec, fullnodes: Vec, + replicators: Vec, + genesis_ledger_path: String, + genesis_block: GenesisBlock, } impl LocalCluster { @@ -35,6 +39,15 @@ impl LocalCluster { node_stakes: &[u64], cluster_lamports: u64, fullnode_config: &FullnodeConfig, + ) -> Self { + Self::new_with_config_replicators(node_stakes, cluster_lamports, fullnode_config, 0) + } + + pub fn new_with_config_replicators( + node_stakes: &[u64], + cluster_lamports: u64, + fullnode_config: &FullnodeConfig, + num_replicators: usize, ) -> Self { let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); @@ -48,6 +61,7 @@ impl LocalCluster { ledger_paths.push(leader_ledger_path.clone()); let voting_keypair = Keypair::new(); let leader_contact_info = leader_node.info.clone(); + let leader_server = Fullnode::new( leader_node, &leader_keypair, @@ -57,54 +71,30 @@ impl LocalCluster { None, fullnode_config, ); - let mut fullnodes = vec![leader_server]; - let mut client = create_client( - leader_contact_info.client_facing_addr(), - FULLNODE_PORT_RANGE, - ); - for stake in &node_stakes[1..] { - // Must have enough tokens to fund vote account and set delegate - assert!(*stake > 2); - let validator_keypair = Arc::new(Keypair::new()); - let voting_keypair = Keypair::new(); - let validator_pubkey = validator_keypair.pubkey(); - let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey()); - let ledger_path = tmp_copy_blocktree!(&genesis_ledger_path); - ledger_paths.push(ledger_path.clone()); - // Send each validator some lamports to vote - let validator_balance = - Self::transfer(&mut client, &mint_keypair, &validator_pubkey, *stake); - info!( - "validator {} balance {}", - validator_pubkey, validator_balance - ); + let fullnodes = vec![leader_server]; - Self::create_and_fund_vote_account( - &mut client, - &voting_keypair, - &validator_keypair, - stake - 1, - ) - .unwrap(); - let validator_server = Fullnode::new( - validator_node, - &validator_keypair, - &ledger_path, - &voting_keypair.pubkey(), - voting_keypair, - Some(&leader_contact_info), - fullnode_config, - ); - fullnodes.push(validator_server); - } - discover(&leader_contact_info.gossip, node_stakes.len()).unwrap(); - Self { + let mut cluster = Self { funding_keypair: mint_keypair, entry_point_info: leader_contact_info, fullnodes, + replicators: vec![], ledger_paths, + genesis_ledger_path, + genesis_block, + }; + + for stake in &node_stakes[1..] { + cluster.add_validator(&fullnode_config, *stake); } + + for _ in 0..num_replicators { + cluster.add_replicator(); + } + + discover(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap(); + + cluster } pub fn exit(&self) { @@ -118,6 +108,84 @@ impl LocalCluster { while let Some(node) = self.fullnodes.pop() { node.join().unwrap(); } + + while let Some(node) = self.replicators.pop() { + node.close(); + } + } + + fn add_validator(&mut self, fullnode_config: &FullnodeConfig, stake: u64) { + let mut client = create_client( + self.entry_point_info.client_facing_addr(), + FULLNODE_PORT_RANGE, + ); + + // Must have enough tokens to fund vote account and set delegate + assert!(stake > 2); + let validator_keypair = Arc::new(Keypair::new()); + let voting_keypair = Keypair::new(); + let validator_pubkey = validator_keypair.pubkey(); + let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey()); + let ledger_path = tmp_copy_blocktree!(&self.genesis_ledger_path); + self.ledger_paths.push(ledger_path.clone()); + + // Send each validator some lamports to vote + let validator_balance = + Self::transfer(&mut client, &self.funding_keypair, &validator_pubkey, stake); + info!( + "validator {} balance {}", + validator_pubkey, validator_balance + ); + + Self::create_and_fund_vote_account( + &mut client, + &voting_keypair, + &validator_keypair, + stake - 1, + ) + .unwrap(); + + let validator_server = Fullnode::new( + validator_node, + &validator_keypair, + &ledger_path, + &voting_keypair.pubkey(), + voting_keypair, + Some(&self.entry_point_info), + fullnode_config, + ); + + self.fullnodes.push(validator_server); + } + + fn add_replicator(&mut self) { + let replicator_keypair = Arc::new(Keypair::new()); + let mut client = create_client( + self.entry_point_info.client_facing_addr(), + FULLNODE_PORT_RANGE, + ); + + Self::transfer( + &mut client, + &self.funding_keypair, + &replicator_keypair.pubkey(), + 1, + ); + let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey()); + + let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&self.genesis_block); + let replicator = Replicator::new( + &replicator_ledger_path, + replicator_node, + self.entry_point_info.clone(), + replicator_keypair, + None, + ) + .unwrap(); + + self.ledger_paths.push(replicator_ledger_path); + + self.replicators.push(replicator); } fn close(&mut self) { @@ -216,7 +284,10 @@ mod test { #[test] fn test_local_cluster_start_and_exit() { solana_logger::setup(); - let _cluster = LocalCluster::new(1, 100, 3); + let num_nodes = 1; + let cluster = LocalCluster::new(num_nodes, 100, 3); + assert_eq!(cluster.fullnodes.len(), num_nodes); + assert_eq!(cluster.replicators.len(), 0); } #[test] @@ -224,6 +295,15 @@ mod test { solana_logger::setup(); let mut fullnode_exit = FullnodeConfig::default(); fullnode_exit.rpc_config.enable_fullnode_exit = true; - let _cluster = LocalCluster::new_with_config(&[3], 100, &fullnode_exit); + const NUM_NODES: usize = 1; + let num_replicators = 1; + let cluster = LocalCluster::new_with_config_replicators( + &[3; NUM_NODES], + 100, + &fullnode_exit, + num_replicators, + ); + assert_eq!(cluster.fullnodes.len(), NUM_NODES); + assert_eq!(cluster.replicators.len(), num_replicators); } } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index a44c757e65..4c95ea535c 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -9,7 +9,6 @@ use crate::repair_service::RepairSlotRange; use crate::result::Result; use crate::service::Service; use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; -use crate::streamer::BlobReceiver; use crate::window_service::WindowService; use rand::thread_rng; use rand::Rng; @@ -35,13 +34,15 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; +use std::thread::spawn; +use std::thread::JoinHandle; use std::time::{Duration, Instant}; pub struct Replicator { gossip_service: GossipService, fetch_stage: BlobFetchStage, window_service: WindowService, - pub retransmit_receiver: BlobReceiver, + t_retransmit: JoinHandle<()>, exit: Arc, slot: u64, ledger_path: String, @@ -174,7 +175,6 @@ impl Replicator { let (blob_fetch_sender, blob_fetch_receiver) = channel(); let fetch_stage = BlobFetchStage::new_multi_socket(blob_sockets, &blob_fetch_sender, &exit); - // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); let window_service = WindowService::new( @@ -187,11 +187,20 @@ impl Replicator { repair_slot_range, ); + // receive blobs from retransmit and drop them. + let exit2 = exit.clone(); + let t_retransmit = spawn(move || loop { + let _ = retransmit_receiver.recv_timeout(Duration::from_secs(1)); + if exit2.load(Ordering::Relaxed) { + break; + } + }); + Ok(Self { gossip_service, fetch_stage, window_service, - retransmit_receiver, + t_retransmit, exit, slot, ledger_path: ledger_path.to_string(), @@ -329,14 +338,7 @@ impl Replicator { self.gossip_service.join().unwrap(); self.fetch_stage.join().unwrap(); self.window_service.join().unwrap(); - - // Drain the queue here to prevent self.retransmit_receiver from being dropped - // before the window_service thread is joined - let mut retransmit_queue_count = 0; - while let Ok(_blob) = self.retransmit_receiver.recv_timeout(Duration::new(1, 0)) { - retransmit_queue_count += 1; - } - debug!("retransmit channel count: {}", retransmit_queue_count); + self.t_retransmit.join().unwrap(); } pub fn entry_height(&self) -> u64 { diff --git a/tests/replicator.rs b/tests/replicator.rs index 61c2ca8436..a45e012252 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -4,18 +4,15 @@ extern crate log; #[macro_use] extern crate solana; -use solana::blocktree::{ - create_new_tmp_ledger, get_tmp_ledger_path, tmp_copy_blocktree, Blocktree, -}; -use solana::cluster_info::{Node, FULLNODE_PORT_RANGE}; +use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree}; +use solana::cluster_info::Node; use solana::contact_info::ContactInfo; use solana::fullnode::{Fullnode, FullnodeConfig}; +use solana::local_cluster::LocalCluster; use solana::replicator::Replicator; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; -use solana_client::client::create_client; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::signature::{Keypair, KeypairUtil}; -use solana_sdk::system_transaction::SystemTransaction; use std::fs::remove_dir_all; use std::sync::Arc; use std::time::Duration; @@ -24,124 +21,12 @@ use std::time::Duration; fn test_replicator_startup_basic() { solana_logger::setup(); info!("starting replicator test"); - let replicator_ledger_path = &get_tmp_ledger_path!(); - info!("starting leader node"); - let leader_keypair = Arc::new(Keypair::new()); - let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey()); - let leader_info = leader_node.info.clone(); - - let (genesis_block, mint_keypair) = - GenesisBlock::new_with_leader(1_000_000_000, &leader_info.id, 42); - let (leader_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - - let validator_ledger_path = tmp_copy_blocktree!(&leader_ledger_path); - - { - let voting_keypair = Keypair::new(); - - let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; - let leader = Fullnode::new( - leader_node, - &leader_keypair, - &leader_ledger_path, - &voting_keypair.pubkey(), - voting_keypair, - None, - &fullnode_config, - ); - - debug!("Looking for leader on gossip..."); - solana::gossip_service::discover(&leader_info.gossip, 1).unwrap(); - - let validator_keypair = Arc::new(Keypair::new()); - let voting_keypair = Keypair::new(); - - let mut leader_client = - create_client(leader_info.client_facing_addr(), FULLNODE_PORT_RANGE); - let blockhash = leader_client.get_recent_blockhash(); - debug!("blockhash: {:?}", blockhash); - - leader_client - .transfer(10, &mint_keypair, &validator_keypair.pubkey(), &blockhash) - .unwrap(); - - let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey()); - - let validator = Fullnode::new( - validator_node, - &validator_keypair, - &validator_ledger_path, - &voting_keypair.pubkey(), - voting_keypair, - Some(&leader_info), - &fullnode_config, - ); - - let bob = Keypair::new(); - - info!("starting transfers.."); - for i in 0..64 { - debug!("transfer {}", i); - let blockhash = leader_client.get_recent_blockhash(); - let mut transaction = - SystemTransaction::new_account(&mint_keypair, &bob.pubkey(), 1, blockhash, 0); - leader_client - .retry_transfer(&mint_keypair, &mut transaction, 5) - .unwrap(); - debug!( - "transfer {}: mint balance={:?}, bob balance={:?}", - i, - leader_client.get_balance(&mint_keypair.pubkey()), - leader_client.get_balance(&bob.pubkey()), - ); - } - - let replicator_keypair = Arc::new(Keypair::new()); - - info!("giving replicator lamports.."); - - let blockhash = leader_client.get_recent_blockhash(); - // Give the replicator some lamports - let mut tx = SystemTransaction::new_account( - &mint_keypair, - &replicator_keypair.pubkey(), - 1, - blockhash, - 0, - ); - leader_client - .retry_transfer(&mint_keypair, &mut tx, 5) - .unwrap(); - - info!("starting replicator node"); - let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey()); - let _replicator_info = replicator_node.info.clone(); - - let leader_info = ContactInfo::new_gossip_entry_point(&leader_info.gossip); - - let replicator = Replicator::new( - replicator_ledger_path, - replicator_node, - leader_info, - replicator_keypair, - None, - ) - .unwrap(); - - info!("started replicator.."); - - replicator.close(); - validator.close().unwrap(); - leader.close().unwrap(); - } - - info!("cleanup"); - Blocktree::destroy(&leader_ledger_path).expect("Expected successful database destruction"); - Blocktree::destroy(&replicator_ledger_path).expect("Expected successful database destruction"); - let _ignored = remove_dir_all(&leader_ledger_path); - let _ignored = remove_dir_all(&replicator_ledger_path); + const NUM_NODES: usize = 2; + let mut fullnode_config = FullnodeConfig::default(); + fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; + let _cluster = + LocalCluster::new_with_config_replicators(&[100; NUM_NODES], 10_000, &fullnode_config, 1); } #[test]