From cc48773b03e77ca4bff6c34b863fc45d7e503965 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Mon, 17 Jun 2019 18:12:13 -0700 Subject: [PATCH] Add "download from replicator" utility (#4709) automerge --- core/src/cluster_info.rs | 49 ++++++++------- core/src/repair_service.rs | 2 +- core/src/replicator.rs | 125 ++++++++++++++++++++++++++++++++++++- core/src/window_service.rs | 2 +- core/tests/replicator.rs | 93 ++++----------------------- 5 files changed, 161 insertions(+), 110 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 088f3d7d1c..fc0cd63c91 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -815,33 +815,34 @@ impl ClusterInfo { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].gossip; // send the request to the peer's gossip port - let out = { - match repair_request { - RepairType::Blob(slot, blob_index) => { - datapoint_debug!( - "cluster_info-repair", - ("repair-slot", *slot, i64), - ("repair-ix", *blob_index, i64) - ); - self.window_index_request_bytes(*slot, *blob_index)? - } - RepairType::HighestBlob(slot, blob_index) => { - datapoint_debug!( - "cluster_info-repair_highest", - ("repair-highest-slot", *slot, i64), - ("repair-highest-ix", *blob_index, i64) - ); - self.window_highest_index_request_bytes(*slot, *blob_index)? - } - RepairType::Orphan(slot) => { - datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64)); - self.orphan_bytes(*slot)? - } - } - }; + let out = self.map_repair_request(repair_request)?; Ok((addr, out)) } + pub fn map_repair_request(&self, repair_request: &RepairType) -> Result> { + match repair_request { + RepairType::Blob(slot, blob_index) => { + datapoint_debug!( + "cluster_info-repair", + ("repair-slot", *slot, i64), + ("repair-ix", *blob_index, i64) + ); + Ok(self.window_index_request_bytes(*slot, *blob_index)?) + } + RepairType::HighestBlob(slot, blob_index) => { + datapoint_debug!( + "cluster_info-repair_highest", + ("repair-highest-slot", *slot, i64), + ("repair-highest-ix", *blob_index, i64) + ); + Ok(self.window_highest_index_request_bytes(*slot, *blob_index)?) + } + RepairType::Orphan(slot) => { + datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64)); + Ok(self.orphan_bytes(*slot)?) + } + } + } // If the network entrypoint hasn't been discovered yet, add it to the crds table fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, Bloom, SocketAddr, CrdsValue)>) { match &self.entrypoint { diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 6cad6b8351..74d26b4b40 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -195,7 +195,7 @@ impl RepairService { } // Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end - fn generate_repairs_in_range( + pub fn generate_repairs_in_range( blocktree: &Blocktree, max_repairs: usize, repair_range: &RepairSlotRange, diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 5be3ce960c..7862528f30 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -5,11 +5,12 @@ use crate::cluster_info::{ClusterInfo, Node}; use crate::contact_info::ContactInfo; use crate::gossip_service::GossipService; use crate::packet::to_shared_blob; -use crate::repair_service::{RepairSlotRange, RepairStrategy}; +use crate::repair_service::{RepairService, RepairSlotRange, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; -use crate::streamer::{receiver, responder}; +use crate::streamer::{blob_receiver, receiver, responder}; use crate::window_service::WindowService; +use crate::{repair_service, window_service}; use bincode::deserialize; use rand::thread_rng; use rand::Rng; @@ -599,6 +600,126 @@ impl Replicator { "Couldn't get blockhash or slot", ))? } + + /// Ask a replicator to populate a given blocktree with its segment. + /// Return the slot at the start of the replicator's segment + /// + /// It is recommended to use a temporary blocktree for this since the download will not verify + /// blobs received and might impact the chaining of blobs across slots + pub fn download_from_replicator( + cluster_info: &Arc>, + replicator_info: &ContactInfo, + blocktree: &Arc, + ) -> Result<(u64)> { + // Create a client which downloads from the replicator and see that it + // can respond with blobs. + let start_slot = Self::get_replicator_segment_slot(replicator_info.storage_addr); + info!("Replicator download: start at {}", start_slot); + + let exit = Arc::new(AtomicBool::new(false)); + let (s_reader, r_reader) = channel(); + let repair_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); + let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader); + let id = cluster_info.read().unwrap().id(); + info!( + "Sending repair requests from: {} to: {}", + cluster_info.read().unwrap().my_data().id, + replicator_info.gossip + ); + let repair_slot_range = RepairSlotRange { + start: start_slot, + end: start_slot + SLOTS_PER_SEGMENT, + }; + // try for upto 180 seconds //TODO needs tuning if segments are huge + for _ in 0..120 { + // Strategy used by replicators + let repairs = RepairService::generate_repairs_in_range( + blocktree, + repair_service::MAX_REPAIR_LENGTH, + &repair_slot_range, + ); + //iter over the repairs and send them + if let Ok(repairs) = repairs { + let reqs: Vec<_> = repairs + .into_iter() + .filter_map(|repair_request| { + cluster_info + .read() + .unwrap() + .map_repair_request(&repair_request) + .map(|result| ((replicator_info.gossip, result), repair_request)) + .ok() + }) + .collect(); + + for ((to, req), repair_request) in reqs { + if let Ok(local_addr) = repair_socket.local_addr() { + datapoint_info!( + "replicator_download", + ("repair_request", format!("{:?}", repair_request), String), + ("to", to.to_string(), String), + ("from", local_addr.to_string(), String), + ("id", id.to_string(), String) + ); + } + repair_socket + .send_to(&req, replicator_info.gossip) + .unwrap_or_else(|e| { + error!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + } + let res = r_reader.recv_timeout(Duration::new(1, 0)); + if let Ok(blobs) = res { + window_service::process_blobs(&blobs, blocktree)?; + } + // check if all the slots in the segment are complete + if Self::segment_complete(start_slot, blocktree) { + break; + } + sleep(Duration::from_millis(500)); + } + exit.store(true, Ordering::Relaxed); + t_receiver.join().unwrap(); + + // check if all the slots in the segment are complete + if !Self::segment_complete(start_slot, blocktree) { + Err(io::Error::new( + ErrorKind::Other, + "Unable to download the full segment", + ))? + } + Ok(start_slot) + } + + fn segment_complete(start_slot: u64, blocktree: &Arc) -> bool { + for slot in start_slot..(start_slot + SLOTS_PER_SEGMENT) { + if !blocktree.is_full(slot) { + return false; + } + } + true + } + + fn get_replicator_segment_slot(to: SocketAddr) -> u64 { + let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + socket + .set_read_timeout(Some(Duration::from_secs(5))) + .unwrap(); + + let req = ReplicatorRequest::GetSlotHeight(socket.local_addr().unwrap()); + let serialized_req = bincode::serialize(&req).unwrap(); + for _ in 0..10 { + socket.send_to(&serialized_req, to).unwrap(); + let mut buf = [0; 1024]; + if let Ok((size, _addr)) = socket.recv_from(&mut buf) { + return deserialize(&buf[..size]).unwrap(); + } + sleep(Duration::from_millis(500)); + } + panic!("Couldn't get slot height!"); + } } #[cfg(test)] diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 4e879a4358..0c34a5a5f8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -50,7 +50,7 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) } /// Process a blob: Add blob to the ledger window. -fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result<()> { +pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result<()> { // make an iterator for insert_data_blobs() let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect(); diff --git a/core/tests/replicator.rs b/core/tests/replicator.rs index 4eb677ab66..9cd4dcfd71 100644 --- a/core/tests/replicator.rs +++ b/core/tests/replicator.rs @@ -4,98 +4,19 @@ extern crate log; #[macro_use] extern crate solana; -use bincode::{deserialize, serialize}; -use solana::blocktree::{create_new_tmp_ledger, Blocktree}; +use solana::blocktree::{create_new_tmp_ledger, get_tmp_ledger_path, Blocktree}; use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use solana::contact_info::ContactInfo; use solana::gossip_service::discover_cluster; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::replicator::Replicator; -use solana::replicator::ReplicatorRequest; use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT; -use solana::streamer::blob_receiver; use solana::validator::ValidatorConfig; use solana_client::thin_client::create_client; use solana_sdk::genesis_block::create_genesis_block; -use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; -use std::net::SocketAddr; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::Arc; -use std::thread::sleep; -use std::time::Duration; - -fn get_slot_height(to: SocketAddr) -> u64 { - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - socket - .set_read_timeout(Some(Duration::from_secs(5))) - .unwrap(); - - let req = ReplicatorRequest::GetSlotHeight(socket.local_addr().unwrap()); - let serialized_req = serialize(&req).unwrap(); - for _ in 0..10 { - socket.send_to(&serialized_req, to).unwrap(); - let mut buf = [0; 1024]; - if let Ok((size, _addr)) = socket.recv_from(&mut buf) { - return deserialize(&buf[..size]).unwrap(); - } - sleep(Duration::from_millis(500)); - } - panic!("Couldn't get slot height!"); -} - -fn download_from_replicator(replicator_info: &ContactInfo) { - // Create a client which downloads from the replicator and see that it - // can respond with blobs. - let tn = Node::new_localhost(); - let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); - let mut repair_index = get_slot_height(replicator_info.storage_addr); - info!("repair index: {}", repair_index); - - repair_index = 0; - let req = cluster_info - .window_index_request_bytes(0, repair_index) - .unwrap(); - - let exit = Arc::new(AtomicBool::new(false)); - let (s_reader, r_reader) = channel(); - let repair_socket = Arc::new(tn.sockets.repair); - let t_receiver = blob_receiver(repair_socket.clone(), &exit, s_reader); - - info!( - "Sending repair requests from: {} to: {}", - tn.info.id, replicator_info.gossip - ); - - let mut received_blob = false; - for _ in 0..5 { - repair_socket.send_to(&req, replicator_info.gossip).unwrap(); - - let x = r_reader.recv_timeout(Duration::new(1, 0)); - - if let Ok(blobs) = x { - for b in blobs { - let br = b.read().unwrap(); - assert!(br.index() == repair_index); - info!("br: {:?}", br); - let entries = Blocktree::deserialize_blob_data(&br.data()).unwrap(); - for entry in &entries { - info!("entry: {:?}", entry); - assert_ne!(entry.hash, Hash::default()); - received_blob = true; - } - } - break; - } - } - exit.store(true, Ordering::Relaxed); - t_receiver.join().unwrap(); - - assert!(received_blob); -} +use std::sync::{Arc, RwLock}; /// Start the cluster with the given configuration and wait till the replicators are discovered /// Then download blobs from one of them. @@ -134,7 +55,15 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) { } assert_eq!(replicator_count, num_replicators); - download_from_replicator(&replicator_info); + let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + cluster_nodes[0].clone(), + ))); + let path = get_tmp_ledger_path("test"); + let blocktree = Arc::new(Blocktree::open(&path).unwrap()); + assert_eq!( + Replicator::download_from_replicator(&cluster_info, &replicator_info, &blocktree).unwrap(), + 0 + ); } #[test]