Switch test to send a repair request to try to download from the replicator

Removes the need for read_ledger in the test and also exercises the
replicator download path.
This commit is contained in:
Stephen Akridge 2019-01-09 11:19:04 -08:00 committed by sakridge
parent 28431ff22c
commit 73eca72f14
2 changed files with 55 additions and 31 deletions

View File

@ -725,6 +725,12 @@ impl ClusterInfo {
orders orders
} }
/// Builds the serialized wire bytes for a `RequestWindowIndex` message
/// asking peers for the blob at window index `ix`, stamped with this
/// node's own contact info so responders know where to reply.
pub fn window_index_request_bytes(&self, ix: u64) -> Result<Vec<u8>> {
    let request = Protocol::RequestWindowIndex(self.my_data().clone(), ix);
    Ok(serialize(&request)?)
}
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> { pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication, as indicated // find a peer that appears to be accepting replication, as indicated
// by a valid tvu port location // by a valid tvu port location
@ -734,8 +740,7 @@ impl ClusterInfo {
} }
let n = thread_rng().gen::<usize>() % valid.len(); let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].gossip; // send the request to the peer's gossip port let addr = valid[n].gossip; // send the request to the peer's gossip port
let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); let out = self.window_index_request_bytes(ix)?;
let out = serialize(&req)?;
submit( submit(
influxdb::Point::new("cluster-info") influxdb::Point::new("cluster-info")
@ -1038,7 +1043,7 @@ impl ClusterInfo {
let my_info = me.read().unwrap().my_data().clone(); let my_info = me.read().unwrap().my_data().clone();
inc_new_counter_info!("cluster_info-window-request-recv", 1); inc_new_counter_info!("cluster_info-window-request-recv", 1);
trace!( trace!(
"{}: received RequestWindowIndex {} {} ", "{}: received RequestWindowIndex from: {} index: {} ",
self_id, self_id,
from.id, from.id,
ix, ix,

View File

@ -4,26 +4,31 @@ extern crate log;
#[macro_use] #[macro_use]
extern crate serde_json; extern crate serde_json;
use bincode::deserialize;
use solana::client::mk_client; use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo}; use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::create_vote_account::*; use solana::create_vote_account::*;
use solana::db_ledger::DbLedger; use solana::db_ledger::DbLedger;
use solana::entry::Entry;
use solana::fullnode::Fullnode; use solana::fullnode::Fullnode;
use solana::leader_scheduler::LeaderScheduler; use solana::leader_scheduler::LeaderScheduler;
use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger, tmp_copy_ledger}; use solana::ledger::{create_tmp_genesis, get_tmp_ledger_path, tmp_copy_ledger};
use solana::replicator::Replicator; use solana::replicator::Replicator;
use solana::rpc_request::{RpcClient, RpcRequest}; use solana::rpc_request::{RpcClient, RpcRequest};
use solana::streamer::blob_receiver;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc; use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
#[test] #[test]
fn test_replicator_startup() { fn test_replicator_startup() {
solana_logger::setup();
info!("starting replicator test"); info!("starting replicator test");
let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger"); let replicator_ledger_path = &get_tmp_ledger_path("replicator_test_replicator_ledger");
@ -115,6 +120,7 @@ fn test_replicator_startup() {
info!("starting replicator node"); info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let replicator_info = replicator_node.info.clone();
let leader_info = NodeInfo::new_entry_point(&leader_info.gossip); let leader_info = NodeInfo::new_entry_point(&leader_info.gossip);
@ -126,38 +132,51 @@ fn test_replicator_startup() {
) )
.unwrap(); .unwrap();
// Poll the ledger dir to see that some is downloaded // Create a client which downloads from the replicator and see that it
let mut num_entries = 0; // can respond with blobs.
for _ in 0..60 { let tn = Node::new_localhost();
match read_ledger(replicator_ledger_path, true) { let cluster_info = ClusterInfo::new(tn.info.clone());
Ok(entries) => { let repair_index = 1;
for _ in entries { let req = cluster_info
num_entries += 1; .window_index_request_bytes(repair_index)
} .unwrap();
info!("{} entries", num_entries);
if num_entries > 0 {
break;
}
}
Err(e) => {
info!("error reading ledger: {:?}", e);
}
}
sleep(Duration::from_millis(300));
// Do a transfer to make sure new entries are created which let exit = Arc::new(AtomicBool::new(false));
// stimulates the repair process let (s_reader, r_reader) = channel();
let last_id = leader_client.get_last_id(); let repair_socket = Arc::new(tn.sockets.repair);
leader_client let t_receiver = blob_receiver(repair_socket.clone(), exit.clone(), s_reader);
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap(); info!(
"Sending repair requests from: {} to: {}",
tn.info.id, replicator_info.gossip
);
let mut num_txs = 0;
for _ in 0..5 {
repair_socket.send_to(&req, replicator_info.gossip).unwrap();
let x = r_reader.recv_timeout(Duration::new(1, 0));
if let Ok(blobs) = x {
for b in blobs {
let br = b.read().unwrap();
assert!(br.index().unwrap() == repair_index);
let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap();
info!("entry: {:?}", entry);
num_txs = entry.transactions.len();
}
break;
}
} }
exit.store(true, Ordering::Relaxed);
t_receiver.join().unwrap();
// The replicator will not submit storage proofs if // The replicator will not submit storage proofs if
// chacha is not enabled // chacha is not enabled
#[cfg(feature = "chacha")] #[cfg(feature = "chacha")]
{ {
use solana::rpc_request::{RpcClient, RpcRequest}; use solana::rpc_request::{RpcClient, RpcRequest};
use std::thread::sleep;
let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc); let rpc_client = RpcClient::new_from_socket(validator_node_info.rpc);
let mut non_zero_pubkeys = false; let mut non_zero_pubkeys = false;
@ -177,7 +196,7 @@ fn test_replicator_startup() {
} }
// Check that some ledger was downloaded // Check that some ledger was downloaded
assert!(num_entries > 0); assert!(num_txs != 0);
stop_local_vote_signer_service(t_signer, &signer_exit); stop_local_vote_signer_service(t_signer, &signer_exit);
replicator.close(); replicator.close();