Add test for replicator ledger download

Add an interface to query the storage slot a
  replicator is holding on storage_addr port.
Fix logic to poll the blocktree until all
  replicated slots are filled.
Add test logic to ask replicator what slot it
  is replicating and then download an entry in
  the slot.
This commit is contained in:
Stephen Akridge 2019-03-14 12:57:02 -07:00 committed by sakridge
parent 07f4dd385d
commit ee58c1f960
4 changed files with 215 additions and 28 deletions

View File

@ -65,8 +65,8 @@ pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result<V
let now = Instant::now();
let mut i = 0;
while now.elapsed() < Duration::from_secs(30) {
let rpc_peers = spy_ref.read().unwrap().rpc_peers();
if rpc_peers.len() >= num_nodes {
let tvu_peers = spy_ref.read().unwrap().tvu_peers();
if tvu_peers.len() >= num_nodes {
info!(
"discover success in {}s...\n{}",
now.elapsed().as_secs(),
@ -75,7 +75,7 @@ pub fn discover(gossip_addr: &SocketAddr, num_nodes: usize) -> std::io::Result<V
exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap();
return Ok(rpc_peers);
return Ok(tvu_peers);
}
if i % 20 == 0 {
info!(

View File

@ -196,7 +196,7 @@ impl LocalCluster {
&replicator_keypair.pubkey(),
1,
);
let replicator_node = Node::new_localhost_with_pubkey(&replicator_keypair.pubkey());
let replicator_node = Node::new_localhost_replicator(&replicator_keypair.pubkey());
let (replicator_ledger_path, _blockhash) = create_new_tmp_ledger!(&self.genesis_block);
let replicator = Replicator::new(

View File

@ -5,11 +5,15 @@ use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use crate::contact_info::ContactInfo;
use crate::gossip_service::GossipService;
use crate::packet::to_shared_blob;
use crate::repair_service::RepairSlotRange;
use crate::result::Result;
use crate::service::Service;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
use crate::streamer::receiver;
use crate::streamer::responder;
use crate::window_service::WindowService;
use bincode::deserialize;
use rand::thread_rng;
use rand::Rng;
use solana_client::rpc_client::RpcClient;
@ -27,9 +31,10 @@ use std::io::Seek;
use std::io::SeekFrom;
use std::io::{Error, ErrorKind};
use std::mem::size_of;
use std::net::UdpSocket;
use std::net::{SocketAddr, UdpSocket};
use std::path::Path;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
@ -38,25 +43,29 @@ use std::thread::spawn;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
#[derive(Serialize, Deserialize)]
pub enum ReplicatorRequest {
GetSlotHeight(SocketAddr),
}
pub struct Replicator {
gossip_service: GossipService,
fetch_stage: BlobFetchStage,
window_service: WindowService,
t_retransmit: JoinHandle<()>,
thread_handles: Vec<JoinHandle<()>>,
exit: Arc<AtomicBool>,
slot: u64,
ledger_path: String,
keypair: Arc<Keypair>,
signature: ring::signature::Signature,
cluster_entrypoint: ContactInfo,
node_info: ContactInfo,
cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_data_file_encrypted: PathBuf,
sampling_offsets: Vec<u64>,
hash: Hash,
blocktree: Arc<Blocktree>,
#[cfg(feature = "chacha")]
num_chacha_blocks: usize,
#[cfg(feature = "chacha")]
blocktree: Arc<Blocktree>,
}
pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<Hash> {
@ -108,6 +117,54 @@ fn get_entry_heights_from_blockhash(
segment_index * ENTRIES_PER_SEGMENT
}
/// Spawn the service threads that answer `ReplicatorRequest`s arriving on the
/// replicator's storage socket, returning their join handles.
///
/// Three threads are started: a packet receiver, a blob responder, and a
/// processor that decodes each request and replies with this replicator's
/// `slot`. All of them run until `exit` is set.
fn create_request_processor(
    socket: UdpSocket,
    exit: &Arc<AtomicBool>,
    slot: u64,
) -> Vec<JoinHandle<()>> {
    let (packet_sender, packet_receiver) = channel();
    let (blob_sender, blob_receiver) = channel();
    let storage_socket = Arc::new(socket);

    // Pulls raw request packets off the storage port.
    let t_receiver = receiver(
        storage_socket.clone(),
        exit,
        packet_sender,
        "replicator-receiver",
    );
    // Sends reply blobs back out of the same socket.
    let t_responder = responder("replicator-responder", storage_socket.clone(), blob_receiver);

    let processor_exit = exit.clone();
    let t_processor = spawn(move || loop {
        // Time out at least once a second so the exit flag is observed promptly.
        if let Ok(packets) = packet_receiver.recv_timeout(Duration::from_secs(1)) {
            for packet in &packets.read().unwrap().packets {
                let req: result::Result<ReplicatorRequest, Box<bincode::ErrorKind>> =
                    deserialize(&packet.data[..packet.meta.size]);
                match req {
                    Ok(ReplicatorRequest::GetSlotHeight(from)) => {
                        // Reply to `from` with the slot this replicator is storing.
                        if let Ok(blob) = to_shared_blob(slot, from) {
                            let _ = blob_sender.send(vec![blob]);
                        }
                    }
                    Err(e) => {
                        info!("invalid request: {:?}", e);
                    }
                }
            }
        }
        if processor_exit.load(Ordering::Relaxed) {
            break;
        }
    });

    vec![t_receiver, t_responder, t_processor]
}
impl Replicator {
/// Returns a Result that contains a replicator on success
///
@ -194,6 +251,9 @@ impl Replicator {
repair_slot_range,
);
let mut thread_handles =
create_request_processor(node.sockets.storage.unwrap(), &exit, slot);
// receive blobs from retransmit and drop them.
let exit2 = exit.clone();
let t_retransmit = spawn(move || loop {
@ -202,31 +262,40 @@ impl Replicator {
break;
}
});
thread_handles.push(t_retransmit);
let exit3 = exit.clone();
let blocktree1 = blocktree.clone();
let t_replicate = spawn(move || loop {
Self::wait_for_ledger_download(slot, &blocktree1, &exit3, &node_info, &cluster_info);
if exit3.load(Ordering::Relaxed) {
break;
}
});
thread_handles.push(t_replicate);
Ok(Self {
gossip_service,
fetch_stage,
window_service,
t_retransmit,
thread_handles,
exit,
slot,
ledger_path: ledger_path.to_string(),
keypair: keypair.clone(),
signature,
cluster_entrypoint,
node_info,
cluster_info,
ledger_data_file_encrypted: PathBuf::default(),
sampling_offsets: vec![],
hash: Hash::default(),
blocktree,
#[cfg(feature = "chacha")]
num_chacha_blocks: 0,
#[cfg(feature = "chacha")]
blocktree,
})
}
pub fn run(&mut self) {
self.wait_for_ledger_download();
self.encrypt_ledger()
.expect("ledger encrypt not successful");
loop {
@ -239,30 +308,61 @@ impl Replicator {
}
}
fn wait_for_ledger_download(&self) {
fn wait_for_ledger_download(
start_slot: u64,
blocktree: &Arc<Blocktree>,
exit: &Arc<AtomicBool>,
node_info: &ContactInfo,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
info!("window created, waiting for ledger download done");
let _start = Instant::now();
let mut _received_so_far = 0;
let mut current_slot = start_slot;
let mut done = false;
loop {
if let Ok(entries) = self.blocktree.get_slot_entries(self.slot, 0, None) {
if !entries.is_empty() {
loop {
if let Ok(meta) = blocktree.meta(current_slot) {
if let Some(meta) = meta {
if meta.is_rooted {
current_slot += 1;
info!("current slot: {}", current_slot);
} else {
break;
}
} else {
break;
}
} else {
break;
}
if current_slot >= start_slot + ENTRIES_PER_SEGMENT {
info!("current slot: {} start: {}", current_slot, start_slot);
done = true;
break;
}
}
if done {
break;
}
if exit.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_secs(1));
}
info!("Done receiving entries from window_service");
let mut contact_info = self.node_info.clone();
// Remove replicator from the data plane
let mut contact_info = node_info.clone();
contact_info.tvu = "0.0.0.0:0".parse().unwrap();
{
let mut cluster_info_w = self.cluster_info.write().unwrap();
let mut cluster_info_w = cluster_info.write().unwrap();
cluster_info_w.insert_self(contact_info);
}
info!("Done downloading ledger at {}", self.ledger_path);
}
fn encrypt_ledger(&mut self) -> Result<()> {
@ -345,7 +445,9 @@ impl Replicator {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.window_service.join().unwrap();
self.t_retransmit.join().unwrap();
for handle in self.thread_handles {
handle.join().unwrap();
}
}
pub fn entry_height(&self) -> u64 {

View File

@ -4,49 +4,134 @@ extern crate log;
#[macro_use]
extern crate solana;
use bincode::{deserialize, serialize};
use solana::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree, Blocktree};
use solana::cluster_info::Node;
use solana::cluster_info::{ClusterInfo, Node};
use solana::contact_info::ContactInfo;
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::gossip_service::discover;
use solana::local_cluster::LocalCluster;
use solana::replicator::Replicator;
use solana::replicator::ReplicatorRequest;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
use solana::streamer::blob_receiver;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::fs::remove_dir_all;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
/// Ask the replicator listening at `to` which slot it is replicating.
///
/// Binds an ephemeral UDP socket, sends a serialized
/// `ReplicatorRequest::GetSlotHeight` carrying our reply address, and waits
/// (up to 10 attempts, 5 s read timeout each) for the deserialized answer.
/// Panics if no reply ever arrives — this is test-support code.
fn get_slot_height(to: SocketAddr) -> u64 {
    let reply_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    reply_socket
        .set_read_timeout(Some(Duration::from_secs(5)))
        .unwrap();
    let request = ReplicatorRequest::GetSlotHeight(reply_socket.local_addr().unwrap());
    let request_bytes = serialize(&request).unwrap();
    let mut attempts = 0;
    while attempts < 10 {
        reply_socket.send_to(&request_bytes, to).unwrap();
        let mut reply = [0; 1024];
        match reply_socket.recv_from(&mut reply) {
            // First reply wins; its payload is the slot height.
            Ok((nbytes, _from)) => return deserialize(&reply[..nbytes]).unwrap(),
            // Timed out — back off briefly before resending.
            Err(_) => sleep(Duration::from_millis(500)),
        }
        attempts += 1;
    }
    panic!("Couldn't get slot height!");
}
/// Exercise the replicator's repair path end-to-end: ask it which slot it
/// holds via the storage port, then send window-repair requests to its
/// gossip address and assert that a valid ledger entry blob comes back.
fn download_from_replicator(replicator_info: &ContactInfo) {
    // Create a client which downloads from the replicator and see that it
    // can respond with blobs.
    let tn = Node::new_localhost();
    let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
    let mut repair_index = get_slot_height(replicator_info.storage_addr);
    info!("repair index: {}", repair_index);
    // NOTE(review): the fetched slot height is logged and then immediately
    // discarded — index 0 is requested instead, presumably because entry 0
    // of the replicated segment is always present. Confirm this is intended.
    repair_index = 0;
    let req = cluster_info
        .window_index_request_bytes(0, repair_index)
        .unwrap();
    let exit = Arc::new(AtomicBool::new(false));
    let (s_reader, r_reader) = channel();
    let repair_socket = Arc::new(tn.sockets.repair);
    // Receiver thread collects any blobs the replicator sends back on the
    // repair socket.
    let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader);
    info!(
        "Sending repair requests from: {} to: {}",
        tn.info.id, replicator_info.gossip
    );
    let mut received_blob = false;
    // Retry the repair request a few times; stop after the first batch of
    // blobs arrives within the 1 s window.
    for _ in 0..5 {
        repair_socket.send_to(&req, replicator_info.gossip).unwrap();
        let x = r_reader.recv_timeout(Duration::new(1, 0));
        if let Ok(blobs) = x {
            for b in blobs {
                let br = b.read().unwrap();
                assert!(br.index() == repair_index);
                let entry: Entry = deserialize(&br.data()[..br.meta.size]).unwrap();
                info!("entry: {:?}", entry);
                // A real entry should never carry the default (all-zero) hash.
                assert_ne!(entry.hash, Hash::default());
                received_blob = true;
            }
            break;
        }
    }
    // Shut the receiver thread down before asserting, so a failure doesn't
    // leak the thread.
    exit.store(true, Ordering::Relaxed);
    t_receiver.join().unwrap();
    assert!(received_blob);
}
// NOTE(review): this block is reproduced from a rendered diff and still
// contains both the pre- and post-change version of several lines (the
// `_cluster`/`cluster` binding, the hard-coded `1` replicator count, the
// two `discover(...)` calls with their matching asserts, and the two
// `replicator_count` asserts). Keep only the newer line of each pair when
// restoring compilable source.
#[test]
fn test_replicator_startup_basic() {
    solana_logger::setup();
    info!("starting replicator test");
    // Two fullnodes plus one replicator make up the local cluster.
    const NUM_NODES: usize = 2;
    let num_replicators = 1;
    let mut fullnode_config = FullnodeConfig::default();
    // Rotate storage quickly so the test reaches the storage path fast.
    fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
    let _cluster = LocalCluster::new_with_config_replicators( // pre-change line
    let cluster = LocalCluster::new_with_config_replicators( // post-change line
        &[100; NUM_NODES],
        10_000,
        &fullnode_config,
        1, // pre-change line: hard-coded replicator count
        num_replicators, // post-change line
        DEFAULT_TICKS_PER_SLOT,
        DEFAULT_SLOTS_PER_EPOCH,
    );
    // Wait until gossip has discovered every fullnode and replicator.
    let cluster_nodes = discover(&cluster.entry_point_info.gossip, 3).unwrap(); // pre-change line
    assert_eq!(cluster_nodes.len(), 3); // pre-change line
    let cluster_nodes = discover(
        &cluster.entry_point_info.gossip,
        NUM_NODES + num_replicators,
    )
    .unwrap();
    assert_eq!(cluster_nodes.len(), NUM_NODES + num_replicators);
    // A replicator is identified by advertising a valid storage address.
    let mut replicator_count = 0;
    let mut replicator_info = ContactInfo::default();
    for node in &cluster_nodes {
        info!("storage: {:?} rpc: {:?}", node.storage_addr, node.rpc);
        if ContactInfo::is_valid_address(&node.storage_addr) {
            replicator_count += 1;
            replicator_info = node.clone();
        }
    }
    assert_eq!(replicator_count, 1); // pre-change line
    assert_eq!(replicator_count, num_replicators);
    // Finally, fetch an entry from the replicator over the repair protocol.
    download_from_replicator(&replicator_info);
}
#[test]