diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs index cadde6ba9b..f823bfeae9 100644 --- a/src/bin/replicator.rs +++ b/src/bin/replicator.rs @@ -14,8 +14,10 @@ use solana::signature::{Keypair, KeypairUtil}; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; use std::process::exit; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; fn main() { logger::setup(); @@ -75,6 +77,7 @@ fn main() { println!("my node: {:?}", node); let exit = Arc::new(AtomicBool::new(false)); + let done = Arc::new(AtomicBool::new(false)); let network_addr = matches .value_of("network") @@ -83,7 +86,21 @@ fn main() { // TODO: ask network what slice we should store let entry_height = 0; - let replicator = Replicator::new(entry_height, &exit, ledger_path, node, network_addr); + let replicator = Replicator::new( + entry_height, + 5, + &exit, + ledger_path, + node, + network_addr, + done.clone(), + ); + + while !done.load(Ordering::Relaxed) { + sleep(Duration::from_millis(100)); + } + + println!("Done downloading ledger"); replicator.join(); } diff --git a/src/replicator.rs b/src/replicator.rs index e3bcfb00a1..c1245c1ab2 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -25,10 +25,12 @@ pub struct Replicator { impl Replicator { pub fn new( entry_height: u64, + max_entry_height: u64, exit: &Arc, ledger_path: Option<&str>, node: Node, network_addr: Option, + done: Arc, ) -> Replicator { let window = window::new_window_from_entries(&[], entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); @@ -57,10 +59,12 @@ impl Replicator { crdt.clone(), shared_window.clone(), entry_height, + max_entry_height, blob_fetch_receiver, entry_window_sender, retransmit_sender, repair_socket, + done, ); let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); @@ -121,6 +125,7 @@ mod tests { let replicator_ledger_path = "replicator_test_replicator_ledger"; let exit = Arc::new(AtomicBool::new(false)); + let done = Arc::new(AtomicBool::new(false)); let leader_ledger_path = "replicator_test_leader_ledger"; let (mint, leader_ledger_path) = genesis(leader_ledger_path, 100); @@ -155,10 +160,12 @@ mod tests { let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); let replicator = Replicator::new( entry_height, + 1, &exit, Some(replicator_ledger_path), replicator_node, Some(network_addr), + done.clone(), ); let mut num_entries = 0; @@ -183,6 +190,7 @@ mod tests { .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) .unwrap(); } + assert_eq!(done.load(Ordering::Relaxed), true); assert!(num_entries > 0); exit.store(true, Ordering::Relaxed); replicator.join(); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 60ad922438..c1469cf5c2 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -7,7 +7,7 @@ use log::Level; use result::{Error, Result}; use service::Service; use std::net::UdpSocket; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; @@ -80,14 +80,17 @@ impl RetransmitStage { let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver); let (entry_sender, entry_receiver) = channel(); + let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( crdt.clone(), window, entry_height, + 0, fetch_stage_receiver, entry_sender, retransmit_sender, repair_socket, + done, ); ( diff --git a/src/window.rs b/src/window.rs index fe68fe957c..83fe0b985f 100644 --- a/src/window.rs +++ b/src/window.rs @@ -58,6 +58,7 @@ pub trait WindowUtil { times: usize, consumed: u64, received: u64, + max_entry_height: u64, ) -> Vec<(SocketAddr, Vec)>; fn print(&self, id: &Pubkey, consumed: u64) -> String; @@ -99,6 +100,7 @@ impl WindowUtil for Window { times: usize, consumed: u64, received: u64, + max_entry_height: u64, ) -> Vec<(SocketAddr, Vec)> { let rcrdt = crdt.read().unwrap(); let leader_rotation_interval = rcrdt.get_leader_rotation_interval(); @@ -108,7 +110,12 @@ impl WindowUtil for Window { let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id); let num_peers = rcrdt.table.len() as u64; - let max_repair = calculate_max_repair(num_peers, consumed, received, times, is_next_leader); + let max_repair = if max_entry_height == 0 { + calculate_max_repair(num_peers, consumed, received, times, is_next_leader) + } else { + max_entry_height + 1 + }; + let idxs = self.clear_slots(consumed, max_repair); let reqs: Vec<_> = idxs .into_iter() diff --git a/src/window_service.rs b/src/window_service.rs index 6c50d913f9..45874b53ed 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -9,7 +9,7 @@ use rand::{thread_rng, Rng}; use result::{Error, Result}; use signature::Pubkey; use std::net::UdpSocket; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; @@ -149,11 +149,13 @@ fn recv_window( recycler: &BlobRecycler, consumed: &mut u64, received: &mut u64, + max_ix: u64, r: &BlobReceiver, s: &EntrySender, retransmit: &BlobSender, pending_retransmits: &mut bool, leader_rotation_interval: u64, + done: Arc, ) -> Result<()> { let timer = Duration::from_millis(200); let mut dq = r.recv_timeout(timer)?; @@ -202,6 +204,13 @@ fn recv_window( continue; } + // For downloading storage blobs, + // we only want up to a certain index + // then stop + if max_ix != 0 && pix > max_ix { + continue; + } + trace!("{} window pix: {} size: {}", id, pix, meta_size); window.write().unwrap().process_blob( @@ -216,6 +225,11 @@ fn recv_window( pending_retransmits, leader_rotation_interval, ); + + // Send a signal when we hit the max entry_height + if max_ix != 0 && *consumed == (max_ix + 1) { + done.store(true, Ordering::Relaxed); + } } if log_enabled!(Level::Trace) { trace!("{}", window.read().unwrap().print(id, *consumed)); @@ -240,10 +254,12 @@ pub fn window_service( crdt: Arc>, window: SharedWindow, entry_height: u64, + max_entry_height: u64, r: BlobReceiver, s: EntrySender, retransmit: BlobSender, repair_socket: Arc, + done: Arc, ) -> JoinHandle> { Builder::new() .name("solana-window".to_string()) @@ -284,11 +300,13 @@ pub fn window_service( &recycler, &mut consumed, &mut received, + max_entry_height, &r, &s, &retransmit, &mut pending_retransmits, leader_rotation_interval, + done.clone(), ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -318,7 +336,7 @@ pub fn window_service( trace!("{} let's repair! times = {}", id, times); let mut window = window.write().unwrap(); - let reqs = window.repair(&crdt, &id, times, consumed, received); + let reqs = window.repair(&crdt, &id, times, consumed, received, max_entry_height); for (to, req) in reqs { repair_socket.send_to(&req, to).unwrap_or_else(|e| { info!("{} repair req send_to({}) error {:?}", id, to, e); @@ -378,14 +396,17 @@ mod test { let (s_window, r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); + let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( subs, win, 0, + 0, r_reader, s_window, s_retransmit, Arc::new(tn.sockets.repair), + done, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -437,14 +458,17 @@ mod test { let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); + let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( subs.clone(), win, 0, + 0, r_reader, s_window, s_retransmit, Arc::new(tn.sockets.repair), + done, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -491,14 +515,17 @@ mod test { let (s_window, _r_window) = channel(); let (s_retransmit, r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); + let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( subs.clone(), win, 0, + 0, r_reader, s_window, s_retransmit, Arc::new(tn.sockets.repair), + done, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -610,14 +637,17 @@ mod test { let (s_window, _r_window) = channel(); let (s_retransmit, _r_retransmit) = channel(); let win = Arc::new(RwLock::new(default_window())); + let done = Arc::new(AtomicBool::new(false)); let t_window = window_service( subs, win, 0, + 0, r_reader, s_window, s_retransmit, Arc::new(tn.sockets.repair), + done, ); let t_responder = {