Add max entry height to download for replicator

This commit is contained in:
Stephen Akridge 2018-09-24 14:10:51 -07:00 committed by sakridge
parent bb7ecc7cd9
commit a5b28349ed
5 changed files with 71 additions and 6 deletions

View File

@ -14,8 +14,10 @@ use solana::signature::{Keypair, KeypairUtil};
use std::fs::File;
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
fn main() {
logger::setup();
@ -75,6 +77,7 @@ fn main() {
println!("my node: {:?}", node);
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let network_addr = matches
.value_of("network")
@ -83,7 +86,21 @@ fn main() {
// TODO: ask network what slice we should store
let entry_height = 0;
let replicator = Replicator::new(entry_height, &exit, ledger_path, node, network_addr);
let replicator = Replicator::new(
entry_height,
5,
&exit,
ledger_path,
node,
network_addr,
done.clone(),
);
while !done.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
println!("Done downloading ledger");
replicator.join();
}

View File

@ -25,10 +25,12 @@ pub struct Replicator {
impl Replicator {
pub fn new(
entry_height: u64,
max_entry_height: u64,
exit: &Arc<AtomicBool>,
ledger_path: Option<&str>,
node: Node,
network_addr: Option<SocketAddr>,
done: Arc<AtomicBool>,
) -> Replicator {
let window = window::new_window_from_entries(&[], entry_height, &node.info);
let shared_window = Arc::new(RwLock::new(window));
@ -57,10 +59,12 @@ impl Replicator {
crdt.clone(),
shared_window.clone(),
entry_height,
max_entry_height,
blob_fetch_receiver,
entry_window_sender,
retransmit_sender,
repair_socket,
done,
);
let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path);
@ -121,6 +125,7 @@ mod tests {
let replicator_ledger_path = "replicator_test_replicator_ledger";
let exit = Arc::new(AtomicBool::new(false));
let done = Arc::new(AtomicBool::new(false));
let leader_ledger_path = "replicator_test_leader_ledger";
let (mint, leader_ledger_path) = genesis(leader_ledger_path, 100);
@ -155,10 +160,12 @@ mod tests {
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let replicator = Replicator::new(
entry_height,
1,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);
let mut num_entries = 0;
@ -183,6 +190,7 @@ mod tests {
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();

View File

@ -7,7 +7,7 @@ use log::Level;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
@ -80,14 +80,17 @@ impl RetransmitStage {
let t_retransmit = retransmitter(retransmit_socket, crdt.clone(), retransmit_receiver);
let (entry_sender, entry_receiver) = channel();
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
crdt.clone(),
window,
entry_height,
0,
fetch_stage_receiver,
entry_sender,
retransmit_sender,
repair_socket,
done,
);
(

View File

@ -58,6 +58,7 @@ pub trait WindowUtil {
times: usize,
consumed: u64,
received: u64,
max_entry_height: u64,
) -> Vec<(SocketAddr, Vec<u8>)>;
fn print(&self, id: &Pubkey, consumed: u64) -> String;
@ -99,6 +100,7 @@ impl WindowUtil for Window {
times: usize,
consumed: u64,
received: u64,
max_entry_height: u64,
) -> Vec<(SocketAddr, Vec<u8>)> {
let rcrdt = crdt.read().unwrap();
let leader_rotation_interval = rcrdt.get_leader_rotation_interval();
@ -108,7 +110,12 @@ impl WindowUtil for Window {
let is_next_leader = rcrdt.get_scheduled_leader(next_leader_rotation) == Some(*id);
let num_peers = rcrdt.table.len() as u64;
let max_repair = calculate_max_repair(num_peers, consumed, received, times, is_next_leader);
let max_repair = if max_entry_height == 0 {
calculate_max_repair(num_peers, consumed, received, times, is_next_leader)
} else {
max_entry_height + 1
};
let idxs = self.clear_slots(consumed, max_repair);
let reqs: Vec<_> = idxs
.into_iter()

View File

@ -9,7 +9,7 @@ use rand::{thread_rng, Rng};
use result::{Error, Result};
use signature::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
@ -149,11 +149,13 @@ fn recv_window(
recycler: &BlobRecycler,
consumed: &mut u64,
received: &mut u64,
max_ix: u64,
r: &BlobReceiver,
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
done: Arc<AtomicBool>,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
@ -202,6 +204,13 @@ fn recv_window(
continue;
}
// For downloading storage blobs,
// we only want up to a certain index
// then stop
if max_ix != 0 && pix > max_ix {
continue;
}
trace!("{} window pix: {} size: {}", id, pix, meta_size);
window.write().unwrap().process_blob(
@ -216,6 +225,11 @@ fn recv_window(
pending_retransmits,
leader_rotation_interval,
);
// Send a signal when we hit the max entry_height
if max_ix != 0 && *consumed == (max_ix + 1) {
done.store(true, Ordering::Relaxed);
}
}
if log_enabled!(Level::Trace) {
trace!("{}", window.read().unwrap().print(id, *consumed));
@ -240,10 +254,12 @@ pub fn window_service(
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
max_entry_height: u64,
r: BlobReceiver,
s: EntrySender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
done: Arc<AtomicBool>,
) -> JoinHandle<Option<WindowServiceReturnType>> {
Builder::new()
.name("solana-window".to_string())
@ -284,11 +300,13 @@ pub fn window_service(
&recycler,
&mut consumed,
&mut received,
max_entry_height,
&r,
&s,
&retransmit,
&mut pending_retransmits,
leader_rotation_interval,
done.clone(),
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -318,7 +336,7 @@ pub fn window_service(
trace!("{} let's repair! times = {}", id, times);
let mut window = window.write().unwrap();
let reqs = window.repair(&crdt, &id, times, consumed, received);
let reqs = window.repair(&crdt, &id, times, consumed, received, max_entry_height);
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
@ -378,14 +396,17 @@ mod test {
let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs,
win,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
done,
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -437,14 +458,17 @@ mod test {
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs.clone(),
win,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
done,
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -491,14 +515,17 @@ mod test {
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs.clone(),
win,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
done,
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -610,14 +637,17 @@ mod test {
let (s_window, _r_window) = channel();
let (s_retransmit, _r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs,
win,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
done,
);
let t_responder = {