diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 4b669ad45..d31ffda25 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -725,6 +725,12 @@ impl ClusterInfo { orders } + pub fn window_index_request_bytes(&self, ix: u64) -> Result> { + let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); + let out = serialize(&req)?; + Ok(out) + } + pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication, as indicated // by a valid tvu port location @@ -734,8 +740,7 @@ impl ClusterInfo { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].gossip; // send the request to the peer's gossip port - let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); - let out = serialize(&req)?; + let out = self.window_index_request_bytes(ix)?; submit( influxdb::Point::new("cluster-info") @@ -1038,7 +1043,7 @@ impl ClusterInfo { let my_info = me.read().unwrap().my_data().clone(); inc_new_counter_info!("cluster_info-window-request-recv", 1); trace!( - "{}: received RequestWindowIndex {} {} ", + "{}: received RequestWindowIndex from: {} index: {} ", self_id, from.id, ix, diff --git a/tests/replicator.rs b/tests/replicator.rs index 475c50c2c..951d5dded 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -4,26 +4,31 @@ extern crate log; #[macro_use] extern crate serde_json; +use bincode::deserialize; use solana::client::mk_client; -use solana::cluster_info::{Node, NodeInfo}; +use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::create_vote_account::*; use solana::db_ledger::DbLedger; +use solana::entry::Entry; use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; -use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger, tmp_copy_ledger}; +use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, tmp_copy_ledger}; use solana::replicator::Replicator; use solana::rpc_request::{RpcClient, RpcRequest}; +use solana::streamer::blob_receiver; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Transaction; use std::fs::remove_dir_all; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; use std::sync::Arc; -use std::thread::sleep; use std::time::Duration; #[test] fn test_replicator_startup() { + solana_logger::setup(); info!("starting replicator test"); let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger"); @@ -115,6 +120,7 @@ fn test_replicator_startup() { info!("starting replicator node"); let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); + let replicator_info = replicator_node.info.clone(); let leader_info = NodeInfo::new_entry_point(&leader_info.gossip); @@ -126,38 +132,51 @@ fn test_replicator_startup() { ) .unwrap(); - // Poll the ledger dir to see that some is downloaded - let mut num_entries = 0; - for _ in 0..60 { - match read_ledger(replicator_ledger_path, true) { - Ok(entries) => { - for _ in entries { - num_entries += 1; - } - info!("{} entries", num_entries); - if num_entries > 0 { - break; - } - } - Err(e) => { - info!("error reading ledger: {:?}", e); - } - } - sleep(Duration::from_millis(300)); + // Create a client which downloads from the replicator and see that it + // can respond with blobs. + let tn = Node::new_localhost(); + let cluster_info = ClusterInfo::new(tn.info.clone()); + let repair_index = 1; + let req = cluster_info + .window_index_request_bytes(repair_index) + .unwrap(); - // Do a transfer to make sure new entries are created which - // stimulates the repair process - let last_id = leader_client.get_last_id(); - leader_client - .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) - .unwrap(); + let exit = Arc::new(AtomicBool::new(false)); + let (s_reader, r_reader) = channel(); + let repair_socket = Arc::new(tn.sockets.repair); + let t_receiver = blob_receiver(repair_socket.clone(), exit.clone(), s_reader); + + info!( + "Sending repair requests from: {} to: {}", + tn.info.id, replicator_info.gossip + ); + + let mut num_txs = 0; + for _ in 0..5 { + repair_socket.send_to(&req, replicator_info.gossip).unwrap(); + + let x = r_reader.recv_timeout(Duration::new(1, 0)); + + if let Ok(blobs) = x { + for b in blobs { + let br = b.read().unwrap(); + assert!(br.index().unwrap() == repair_index); + let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap(); + info!("entry: {:?}", entry); + num_txs = entry.transactions.len(); + } + break; + } } + exit.store(true, Ordering::Relaxed); + t_receiver.join().unwrap(); // The replicator will not submit storage proofs if // chacha is not enabled #[cfg(feature = "chacha")] { use solana::rpc_request::{RpcClient, RpcRequest}; + use std::thread::sleep; let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc); let mut non_zero_pubkeys = false; @@ -177,7 +196,7 @@ fn test_replicator_startup() { } // Check that some ledger was downloaded - assert!(num_entries > 0); + assert!(num_txs != 0); stop_local_vote_signer_service(t_signer, &signer_exit); replicator.close();