diff --git a/src/bank.rs b/src/bank.rs index 3da9d367df..efe7a74fd4 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -87,11 +87,6 @@ pub struct Bank { /// The number of transactions the bank has processed without error since the /// start of the ledger. transaction_count: AtomicUsize, - - /// The number of Entries the bank has processed without error since start - /// of the ledger, i.e. poor-man's network synchronization - /// TODO: upgrade to U64 when stable? - entry_count: AtomicUsize, } impl Bank { @@ -105,7 +100,6 @@ impl Bank { time_sources: RwLock::new(HashSet::new()), last_time: RwLock::new(Utc.timestamp(0, 0)), transaction_count: AtomicUsize::new(0), - entry_count: AtomicUsize::new(0), }; bank.apply_payment(deposit, &mut bank.balances.write().unwrap()); bank @@ -302,12 +296,13 @@ impl Bank { } /// Process an ordered list of entries. - pub fn process_entries(&self, entries: I) -> Result + pub fn process_entries(&self, entries: I) -> Result where I: IntoIterator, { + let mut entry_count = 0; for entry in entries { - self.entry_count.fetch_add(1, Ordering::Relaxed); + entry_count += 1; if !entry.transactions.is_empty() { for result in self.process_transactions(entry.transactions) { @@ -321,7 +316,7 @@ impl Bank { self.register_entry_id(&entry.id); } } - Ok(self.entry_count()) + Ok(entry_count) } /// Process a Witness Signature. Any payment plans waiting on this signature @@ -435,9 +430,6 @@ impl Bank { pub fn transaction_count(&self) -> usize { self.transaction_count.load(Ordering::Relaxed) } - pub fn entry_count(&self) -> usize { - self.entry_count.load(Ordering::Relaxed) - } } #[cfg(test)] diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index b4f4dbe351..b9751164c6 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -101,9 +101,11 @@ fn main() { bank.register_entry_id(&entry0.id); bank.register_entry_id(&entry1.id); + // entry_height is the network-wide agreed height of the ledger. + // initialize it from the input ledger eprintln!("processing entries..."); - let num_entries = bank.process_entries(entries).expect("process_entries"); - eprintln!("processed {} entries...", num_entries); + let entry_height = bank.process_entries(entries).expect("process_entries"); + eprintln!("processed {} entries...", entry_height); eprintln!("creating networking stack..."); @@ -135,6 +137,7 @@ fn main() { let newtwork_entry_point = ReplicatedData::new_entry_point(testnet_addr); let s = Server::new_validator( bank, + entry_height, repl_data.clone(), UdpSocket::bind(repl_data.requests_addr).unwrap(), UdpSocket::bind("0.0.0.0:0").unwrap(), @@ -160,6 +163,7 @@ fn main() { let server = Server::new_leader( bank, + entry_height, //Some(Duration::from_millis(1000)), None, repl_data.clone(), diff --git a/src/crdt.rs b/src/crdt.rs index 32c3383f50..23654c7061 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -727,7 +727,10 @@ impl Crdt { } } else { assert!(window.read().unwrap()[pos].is_none()); - info!("failed RequestWindowIndex {} {}", ix, from.repair_addr); + info!( + "failed RequestWindowIndex {} {} {}", + ix, pos, from.repair_addr + ); } None diff --git a/src/drone.rs b/src/drone.rs index b4543d3771..edf10392cf 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -248,6 +248,7 @@ mod tests { let server = Server::new_leader( bank, + 0, Some(Duration::from_millis(30)), leader.data.clone(), leader.sockets.requests, diff --git a/src/server.rs b/src/server.rs index 50f976c44b..dc6b9881ca 100644 --- a/src/server.rs +++ b/src/server.rs @@ -46,6 +46,7 @@ impl Server { /// ``` pub fn new_leader( bank: Bank, + entry_height: u64, tick_duration: Option, me: ReplicatedData, requests_socket: UdpSocket, @@ -89,9 +90,9 @@ impl Server { exit.clone(), crdt, window, + entry_height, blob_recycler.clone(), tpu.blob_receiver, - bank.entry_count(), ); thread_hdls.extend(vec![t_broadcast]); @@ -129,6 +130,7 @@ impl Server { /// ``` pub fn new_validator( bank: Bank, + entry_height: u64, me: ReplicatedData, requests_socket: UdpSocket, respond_socket: UdpSocket, @@ -160,6 +162,7 @@ impl Server { let tvu = Tvu::new( bank.clone(), + entry_height, crdt.clone(), window.clone(), replicate_socket, @@ -188,6 +191,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let v = Server::new_validator( bank, + 0, tn.data.clone(), tn.sockets.requests, tn.sockets.respond, diff --git a/src/streamer.rs b/src/streamer.rs index 159a18910b..632cb8ffec 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::Duration; -pub const WINDOW_SIZE: usize = 2 * 1024; +pub const WINDOW_SIZE: u64 = 2 * 1024; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; pub type BlobSender = Sender; @@ -148,8 +148,8 @@ pub fn blob_receiver( fn find_next_missing( locked_window: &Window, crdt: &Arc>, - consumed: &mut usize, - received: &mut usize, + consumed: &mut u64, + received: &mut u64, ) -> Result)>> { if *received <= *consumed { return Err(Error::GenericError); @@ -157,7 +157,7 @@ fn find_next_missing( let window = locked_window.read().unwrap(); let reqs: Vec<_> = (*consumed..*received) .filter_map(|pix| { - let i = pix % WINDOW_SIZE; + let i = (pix % WINDOW_SIZE) as usize; if let &None = &window[i] { let val = crdt.read().unwrap().window_index_request(pix as u64); if let Ok((to, req)) = val { @@ -174,18 +174,18 @@ fn repair_window( locked_window: &Window, crdt: &Arc>, _recycler: &BlobRecycler, - last: &mut usize, + last: &mut u64, times: &mut usize, - consumed: &mut usize, - received: &mut usize, + consumed: &mut u64, + received: &mut u64, ) -> Result<()> { #[cfg(feature = "erasure")] { if erasure::recover( _recycler, &mut locked_window.write().unwrap(), - *consumed, - *received, + *consumed as usize, + *received as usize, ).is_err() { trace!("erasure::recover failed"); @@ -217,8 +217,8 @@ fn recv_window( locked_window: &Window, crdt: &Arc>, recycler: &BlobRecycler, - consumed: &mut usize, - received: &mut usize, + consumed: &mut u64, + received: &mut u64, r: &BlobReceiver, s: &BlobSender, retransmit: &BlobSender, @@ -273,7 +273,7 @@ fn recv_window( while let Some(b) = dq.pop_front() { let (pix, meta_size) = { let p = b.write().expect("'b' write lock in fn recv_window"); - (p.get_index()? as usize, p.meta.size) + (p.get_index()?, p.meta.size) }; if pix > *received { *received = pix; @@ -287,7 +287,7 @@ fn recv_window( ); continue; } - let w = pix % WINDOW_SIZE; + let w = (pix % WINDOW_SIZE) as usize; //TODO, after the block are authenticated //if we get different blocks at the same index //that is a network failure/attack @@ -304,7 +304,7 @@ fn recv_window( } } loop { - let k = *consumed % WINDOW_SIZE; + let k = (*consumed % WINDOW_SIZE) as usize; trace!("k: {} consumed: {}", k, *consumed); if window[k].is_none() { break; @@ -330,19 +330,21 @@ fn recv_window( } else { #[cfg(feature = "erasure")] { - let block_start = *consumed - (*consumed % erasure::NUM_CODED); - let coding_end = block_start + erasure::NUM_CODED; + let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64); + let coding_end = block_start + erasure::NUM_CODED as u64; // We've received all this block's data blobs, go and null out the window now for j in block_start..*consumed { - if let Some(b) = mem::replace(&mut window[j % WINDOW_SIZE], None) { + if let Some(b) = + mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None) + { recycler.recycle(b); } } for j in *consumed..coding_end { - window[j % WINDOW_SIZE] = None; + window[(j % WINDOW_SIZE) as usize] = None; } - *consumed += erasure::MAX_MISSING; + *consumed += erasure::MAX_MISSING as u64; debug!( "skipping processing coding blob k: {} consumed: {}", k, *consumed @@ -361,7 +363,7 @@ fn recv_window( Ok(()) } -fn print_window(locked_window: &Window, consumed: usize) { +fn print_window(locked_window: &Window, consumed: u64) { { let buf: Vec<_> = locked_window .read() @@ -369,7 +371,7 @@ fn print_window(locked_window: &Window, consumed: usize) { .iter() .enumerate() .map(|(i, v)| { - if i == (consumed % WINDOW_SIZE) { + if i == (consumed % WINDOW_SIZE) as usize { "_" } else if v.is_none() { "0" @@ -391,25 +393,25 @@ fn print_window(locked_window: &Window, consumed: usize) { } pub fn default_window() -> Window { - Arc::new(RwLock::new(vec![None; WINDOW_SIZE])) + Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize])) } pub fn window( exit: Arc, crdt: Arc>, window: Window, + entry_height: u64, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, retransmit: BlobSender, - entry_count: usize, ) -> JoinHandle<()> { Builder::new() .name("solana-window".to_string()) .spawn(move || { - let mut consumed = entry_count; - let mut received = entry_count; - let mut last = entry_count; + let mut consumed = entry_height; + let mut received = entry_height; + let mut last = entry_height; let mut times = 0; loop { if exit.load(Ordering::Relaxed) { @@ -459,9 +461,9 @@ fn broadcast( // We could receive more blobs than window slots so // break them up into window-sized chunks to process - let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec()); + let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); - print_window(window, *receive_index as usize); + print_window(window, *receive_index); for mut blobs in blobs_chunked { // Insert the coding blobs into the blob stream @@ -479,7 +481,7 @@ fn broadcast( assert!(blobs.len() <= win.len()); for b in &blobs { let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; + let pos = (ix % WINDOW_SIZE) as usize; if let Some(x) = mem::replace(&mut win[pos], None) { trace!( "popped {} at {}", @@ -492,7 +494,7 @@ fn broadcast( } while let Some(b) = blobs.pop() { let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix as usize) % WINDOW_SIZE; + let pos = (ix % WINDOW_SIZE) as usize; trace!("caching {} at {}", ix, pos); assert!(win[pos].is_none()); win[pos] = Some(b); @@ -531,15 +533,15 @@ pub fn broadcaster( exit: Arc, crdt: Arc>, window: Window, + entry_height: u64, recycler: BlobRecycler, r: BlobReceiver, - entry_count: usize, ) -> JoinHandle<()> { Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - let mut transmit_index = entry_count as u64; - let mut receive_index = entry_count as u64; + let mut transmit_index = entry_height; + let mut receive_index = entry_height; loop { if exit.load(Ordering::Relaxed) { break; @@ -825,11 +827,11 @@ mod test { exit.clone(), subs, win, + 0, resp_recycler.clone(), r_reader, s_window, s_retransmit, - 0, ); let (s_responder, r_responder) = channel(); let t_responder = responder( diff --git a/src/thin_client.rs b/src/thin_client.rs index bb8bd6f83e..51dc3bcb15 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -205,6 +205,7 @@ mod tests { let server = Server::new_leader( bank, + 0, Some(Duration::from_millis(30)), leader.data.clone(), leader.sockets.requests, @@ -249,6 +250,7 @@ mod tests { let server = Server::new_leader( bank, + 0, Some(Duration::from_millis(30)), leader.data.clone(), leader.sockets.requests, diff --git a/src/tvu.rs b/src/tvu.rs index 6671169bb4..bcbec8397c 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -55,6 +55,7 @@ impl Tvu { /// on the bank state. /// # Arguments /// * `bank` - The bank state. + /// * `entry_height` - Initial ledger height, passed to replicate stage /// * `crdt` - The crdt state. /// * `window` - The window state. /// * `replicate_socket` - my replicate socket @@ -63,6 +64,7 @@ impl Tvu { /// * `exit` - The exit signal. pub fn new( bank: Arc, + entry_height: u64, crdt: Arc>, window: Window, replicate_socket: UdpSocket, @@ -82,11 +84,11 @@ impl Tvu { let window_stage = WindowStage::new( crdt, window, + entry_height, retransmit_socket, exit.clone(), blob_recycler.clone(), fetch_stage.blob_receiver, - bank.entry_count(), ); let replicate_stage = @@ -194,6 +196,7 @@ pub mod tests { let tvu = Tvu::new( bank.clone(), + 0, cref1, dr_1.1, target1.sockets.replicate, diff --git a/src/window_stage.rs b/src/window_stage.rs index 3142279eed..46f6eb2b98 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -18,11 +18,11 @@ impl WindowStage { pub fn new( crdt: Arc>, window: Window, + entry_height: u64, retransmit_socket: UdpSocket, exit: Arc, blob_recycler: BlobRecycler, fetch_stage_receiver: BlobReceiver, - entry_count: usize, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -38,11 +38,11 @@ impl WindowStage { exit.clone(), crdt.clone(), window, + entry_height, blob_recycler.clone(), fetch_stage_receiver, blob_sender, retransmit_sender, - entry_count, ); let thread_hdls = vec![t_retransmit, t_window]; diff --git a/tests/multinode.rs b/tests/multinode.rs index 365ea6c6fd..30bd788461 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -32,6 +32,7 @@ fn validator( let replicant_bank = Bank::new(&alice); let mut ts = Server::new_validator( replicant_bank, + 0, validator.data.clone(), validator.sockets.requests, validator.sockets.respond, @@ -105,6 +106,7 @@ fn test_multi_node() { let leader_bank = Bank::new(&alice); let server = Server::new_leader( leader_bank, + 0, None, leader.data.clone(), leader.sockets.requests,