Initialize Window, not SharedWindow

Wrap with Arc<RwLock>> when/if needed, no earlier.
This commit is contained in:
Greg Fitzgerald 2018-09-07 16:08:37 -06:00
parent 7f669094de
commit fc64e1853c
7 changed files with 44 additions and 63 deletions

View File

@ -731,14 +731,8 @@ fn converge(
spy_crdt.insert(&leader); spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let window = default_window(); let window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new( let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
&spy_ref,
window.clone(),
None,
gossip_socket,
exit_signal.clone(),
);
let mut v: Vec<NodeInfo> = vec![]; let mut v: Vec<NodeInfo> = vec![];
//wait for the network to converge, 30 seconds should be plenty //wait for the network to converge, 30 seconds should be plenty
for _ in 0..30 { for _ in 0..30 {

View File

@ -1684,7 +1684,7 @@ mod tests {
#[test] #[test]
fn run_window_request() { fn run_window_request() {
logger::setup(); logger::setup();
let window = default_window(); let window = Arc::new(RwLock::new(default_window()));
let me = NodeInfo::new( let me = NodeInfo::new(
Keypair::new().pubkey(), Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"), socketaddr!("127.0.0.1:1234"),
@ -1767,7 +1767,7 @@ mod tests {
/// test window requests respond with the right blob, and do not overrun /// test window requests respond with the right blob, and do not overrun
#[test] #[test]
fn run_window_request_with_backoff() { fn run_window_request_with_backoff() {
let window = default_window(); let window = Arc::new(RwLock::new(default_window()));
let mut me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let mut me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
me.leader_id = me.id; me.leader_id = me.id;
@ -1874,7 +1874,7 @@ mod tests {
#[test] #[test]
fn protocol_requestupdate_alive() { fn protocol_requestupdate_alive() {
logger::setup(); logger::setup();
let window = default_window(); let window = Arc::new(RwLock::new(default_window()));
let recycler = BlobRecycler::default(); let recycler = BlobRecycler::default();
let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); let node = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));

View File

@ -202,12 +202,13 @@ impl Fullnode {
let blob_recycler = BlobRecycler::default(); let blob_recycler = BlobRecycler::default();
let window = let window =
window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler); window::new_window_from_entries(ledger_tail, entry_height, &node.info, &blob_recycler);
let shared_window = Arc::new(RwLock::new(window));
let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new")));
let ncp = Ncp::new( let ncp = Ncp::new(
&crdt, &crdt,
window.clone(), shared_window.clone(),
ledger_path, ledger_path,
node.sockets.gossip, node.sockets.gossip,
exit.clone(), exit.clone(),
@ -224,7 +225,7 @@ impl Fullnode {
&bank, &bank,
entry_height, entry_height,
crdt, crdt,
window, shared_window,
node.sockets.replicate, node.sockets.replicate,
node.sockets.repair, node.sockets.repair,
node.sockets.retransmit, node.sockets.retransmit,
@ -256,7 +257,7 @@ impl Fullnode {
let broadcast_stage = BroadcastStage::new( let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast, node.sockets.broadcast,
crdt, crdt,
window, shared_window,
entry_height, entry_height,
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, blob_receiver,

View File

@ -169,7 +169,7 @@ pub mod tests {
gossip: UdpSocket, gossip: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> (Ncp, SharedWindow) { ) -> (Ncp, SharedWindow) {
let window = window::default_window(); let window = Arc::new(RwLock::new(window::default_window()));
let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit); let ncp = Ncp::new(&crdt, window.clone(), None, gossip, exit);
(ncp, window) (ncp, window)
} }

View File

@ -326,11 +326,8 @@ pub fn blob_idx_in_window(id: &Pubkey, pix: u64, consumed: u64, received: &mut u
} }
} }
pub fn default_window() -> SharedWindow { pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![ vec![WindowSlot::default(); WINDOW_SIZE as usize]
WindowSlot::default();
WINDOW_SIZE as usize
]))
} }
pub fn index_blobs( pub fn index_blobs(
@ -361,33 +358,29 @@ pub fn initialized_window(
node_info: &NodeInfo, node_info: &NodeInfo,
blobs: Vec<SharedBlob>, blobs: Vec<SharedBlob>,
entry_height: u64, entry_height: u64,
) -> SharedWindow { ) -> Window {
let window = default_window(); let mut window = default_window();
let id = node_info.id; let id = node_info.id;
{ trace!(
let mut win = window.write().unwrap(); "{} initialized window entry_height:{} blobs_len:{}",
id,
entry_height,
blobs.len()
);
trace!( // Index the blobs
"{} initialized window entry_height:{} blobs_len:{}", let mut received = entry_height - blobs.len() as u64;
id, index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window");
entry_height,
blobs.len()
);
// Index the blobs // populate the window, offset by implied index
let mut received = entry_height - blobs.len() as u64; let diff = cmp::max(blobs.len() as isize - window.len() as isize, 0) as usize;
index_blobs(&node_info, &blobs, &mut received).expect("index blobs for initial window"); for b in blobs.into_iter().skip(diff) {
let ix = b.read().unwrap().get_index().expect("blob index");
// populate the window, offset by implied index let pos = (ix % WINDOW_SIZE) as usize;
let diff = cmp::max(blobs.len() as isize - win.len() as isize, 0) as usize; trace!("{} caching {} at {}", id, ix, pos);
for b in blobs.into_iter().skip(diff) { assert!(window[pos].data.is_none());
let ix = b.read().unwrap().get_index().expect("blob index"); window[pos].data = Some(b);
let pos = (ix % WINDOW_SIZE) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b);
}
} }
window window
@ -398,7 +391,7 @@ pub fn new_window_from_entries(
entry_height: u64, entry_height: u64,
node_info: &NodeInfo, node_info: &NodeInfo,
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
) -> SharedWindow { ) -> Window {
// convert to blobs // convert to blobs
let blobs = ledger_tail.to_blobs(&blob_recycler); let blobs = ledger_tail.to_blobs(&blob_recycler);
initialized_window(&node_info, blobs, entry_height) initialized_window(&node_info, blobs, entry_height)
@ -406,8 +399,6 @@ pub fn new_window_from_entries(
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crdt::{Crdt, Node};
use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE};
use signature::Pubkey; use signature::Pubkey;
use std::io; use std::io;
@ -415,12 +406,10 @@ mod test {
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver}; use streamer::{receiver, responder, PacketReceiver};
use window::{ use window::{blob_idx_in_window, calculate_highest_lost_blob_index, WINDOW_SIZE};
blob_idx_in_window, calculate_highest_lost_blob_index, default_window, WINDOW_SIZE,
};
fn get_msgs(r: PacketReceiver, num: &mut usize) { fn get_msgs(r: PacketReceiver, num: &mut usize) {
for _t in 0..5 { for _t in 0..5 {

View File

@ -303,16 +303,13 @@ pub fn window_service(
mod test { mod test {
use crdt::{Crdt, Node}; use crdt::{Crdt, Node};
use logger; use logger;
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_DATA_SIZE}; use packet::{BlobRecycler, PACKET_DATA_SIZE};
use signature::Pubkey;
use std::io;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use streamer::{blob_receiver, receiver, responder, BlobReceiver, PacketReceiver}; use streamer::{blob_receiver, responder, BlobReceiver};
use window::{default_window, WINDOW_SIZE}; use window::default_window;
use window_service::{repair_backoff, window_service}; use window_service::{repair_backoff, window_service};
fn get_blobs(r: BlobReceiver, num: &mut usize) { fn get_blobs(r: BlobReceiver, num: &mut usize) {
@ -353,7 +350,7 @@ mod test {
); );
let (s_window, r_window) = channel(); let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let win = default_window(); let win = Arc::new(RwLock::new(default_window()));
let t_window = window_service( let t_window = window_service(
subs, subs,
win, win,
@ -423,7 +420,7 @@ mod test {
); );
let (s_window, _r_window) = channel(); let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let win = default_window(); let win = Arc::new(RwLock::new(default_window()));
let t_window = window_service( let t_window = window_service(
subs.clone(), subs.clone(),
win, win,
@ -486,7 +483,7 @@ mod test {
); );
let (s_window, _r_window) = channel(); let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let win = default_window(); let win = Arc::new(RwLock::new(default_window()));
let t_window = window_service( let t_window = window_service(
subs.clone(), subs.clone(),
win, win,

View File

@ -41,7 +41,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
spy_crdt.insert(&leader); spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id); spy_crdt.set_leader(leader.id);
let spy_ref = Arc::new(RwLock::new(spy_crdt)); let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window(); let spy_window = Arc::new(RwLock::new(default_window()));
let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone()); let ncp = Ncp::new(&spy_ref, spy_window, None, spy.sockets.gossip, exit.clone());
//wait for the network to converge //wait for the network to converge
let mut converged = false; let mut converged = false;
@ -439,7 +439,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
let (alice, ledger_path) = genesis( let (alice, ledger_path) = genesis(
"leader_restart_validator_start_from_old_ledger", "leader_restart_validator_start_from_old_ledger",
100_000 + 500 * solana::window::MAX_REPAIR_BACKOFF as i64, 100_000 + 500 * solana::window_service::MAX_REPAIR_BACKOFF as i64,
); );
let bob_pubkey = Keypair::new().pubkey(); let bob_pubkey = Keypair::new().pubkey();
@ -487,7 +487,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
// send requests so the validator eventually sees a gap and requests a repair // send requests so the validator eventually sees a gap and requests a repair
let mut expected = 1500; let mut expected = 1500;
let mut client = mk_client(&validator_data); let mut client = mk_client(&validator_data);
for _ in 0..solana::window::MAX_REPAIR_BACKOFF { for _ in 0..solana::window_service::MAX_REPAIR_BACKOFF {
let leader_balance = let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected)) send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(expected))
.unwrap(); .unwrap();