From fbc754ea252723e41827f064d480c3416afc4367 Mon Sep 17 00:00:00 2001 From: Rob Walker Date: Mon, 6 Aug 2018 12:35:38 -0700 Subject: [PATCH] plug in LedgerWindow fixes #872 --- src/bin/bench-tps.rs | 1 + src/crdt.rs | 120 +++++++++++++++++++++++++++++---------- src/entry.rs | 32 ++++++++++- src/fullnode.rs | 4 +- src/ledger.rs | 10 +--- src/ncp.rs | 3 + src/thin_client.rs | 1 + src/tvu.rs | 2 +- tests/data_replicator.rs | 1 + tests/multinode.rs | 1 + 10 files changed, 132 insertions(+), 43 deletions(-) diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 7ca49d9ff..b97ea6288 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -689,6 +689,7 @@ fn converge( let ncp = Ncp::new( &spy_ref, window.clone(), + None, spy_gossip, gossip_send_socket, exit_signal.clone(), diff --git a/src/crdt.rs b/src/crdt.rs index dda8c67ee..8bbc2b28e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -18,6 +18,7 @@ use byteorder::{LittleEndian, ReadBytesExt}; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; use hash::Hash; +use ledger::LedgerWindow; use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE}; use pnet_datalink as datalink; use rand::{thread_rng, RngCore}; @@ -960,6 +961,7 @@ impl Crdt { } fn run_window_request( window: &SharedWindow, + ledger_window: &mut Option<&mut LedgerWindow>, me: &NodeInfo, from: &NodeInfo, ix: u64, @@ -1006,19 +1008,29 @@ impl Crdt { "requested ix {} != blob_ix {}, outside window!", ix, blob_ix ); + // falls through to checking window_ledger } - } else { - inc_new_counter!("crdt-window-request-fail", 1); - assert!(window.read().unwrap()[pos].data.is_none()); - info!( - "{:x}: failed RequestWindowIndex {:x} {} {}", - me.debug_id(), - from.debug_id(), - ix, - pos, - ); } + if let Some(ledger_window) = ledger_window { + if let Ok(entry) = ledger_window.get_entry(ix) { + inc_new_counter!("crdt-window-request-ledger", 1); + + let out = entry.to_blob(blob_recycler, Some(ix), Some(from.id)); + + return Some(out); + } + } + + inc_new_counter!("crdt-window-request-fail", 1); + info!( + "{:x}: failed RequestWindowIndex {:x} {} {}", + me.debug_id(), + from.debug_id(), + ix, + pos, + ); + None } @@ -1026,11 +1038,14 @@ impl Crdt { fn handle_blob( obj: &Arc>, window: &SharedWindow, + ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, blob: &Blob, ) -> Option { match deserialize(&blob.data[..blob.meta.size]) { - Ok(request) => Crdt::handle_protocol(request, obj, window, blob_recycler), + Ok(request) => { + Crdt::handle_protocol(request, obj, window, ledger_window, blob_recycler) + } Err(_) => { warn!("deserialize crdt packet failed"); None @@ -1042,6 +1057,7 @@ impl Crdt { request: Protocol, obj: &Arc>, window: &SharedWindow, + ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, ) -> Option { match request { @@ -1129,7 +1145,7 @@ impl Crdt { inc_new_counter!("crdt-window-request-address-eq", 1); return None; } - Self::run_window_request(&window, &me, &from, ix, blob_recycler) + Self::run_window_request(&window, ledger_window, &me, &from, ix, blob_recycler) } } } @@ -1138,6 +1154,7 @@ impl Crdt { fn run_listen( obj: &Arc>, window: &SharedWindow, + ledger_window: &mut Option<&mut LedgerWindow>, blob_recycler: &BlobRecycler, requests_receiver: &BlobReceiver, response_sender: &BlobSender, @@ -1148,31 +1165,42 @@ impl Crdt { while let Ok(mut more) = requests_receiver.try_recv() { reqs.append(&mut more); } - let resp: VecDeque<_> = reqs - .iter() - .filter_map(|b| Self::handle_blob(obj, window, blob_recycler, &b.read().unwrap())) - .collect(); - response_sender.send(resp)?; - while let Some(r) = reqs.pop_front() { - blob_recycler.recycle(r); + let mut resps = VecDeque::new(); + while let Some(req) = reqs.pop_front() { + if let Some(resp) = Self::handle_blob( + obj, + window, + ledger_window, + blob_recycler, + &req.read().unwrap(), + ) { + resps.push_back(resp); + } + blob_recycler.recycle(req); } + response_sender.send(resps)?; Ok(()) } pub fn listen( obj: Arc>, window: SharedWindow, + ledger_path: Option<&str>, blob_recycler: BlobRecycler, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { let debug_id = obj.read().unwrap().debug_id(); + + let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap()); + Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { let e = Self::run_listen( &obj, &window, + &mut ledger_window.as_mut(), &blob_recycler, &requests_receiver, &response_sender, @@ -1314,11 +1342,14 @@ mod tests { parse_port_or_addr, Crdt, CrdtError, NodeInfo, Protocol, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, }; - use hash::Hash; + use entry::Entry; + use hash::{hash, Hash}; + use ledger::{LedgerWindow, LedgerWriter}; use logger; use packet::BlobRecycler; use result::Error; use signature::{KeyPair, KeyPairUtil, PublicKey}; + use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -1827,6 +1858,7 @@ mod tests { /// test window requests respond with the right blob, and do not overrun #[test] fn run_window_request() { + logger::setup(); let window = default_window(); let me = NodeInfo::new( KeyPair::new().pubkey(), @@ -1837,19 +1869,48 @@ mod tests { "127.0.0.1:1238".parse().unwrap(), ); let recycler = BlobRecycler::default(); - let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); assert!(rv.is_none()); let out = recycler.allocate(); out.write().unwrap().meta.size = 200; window.write().unwrap()[0].data = Some(out); - let rv = Crdt::run_window_request(&window, &me, &me, 0, &recycler); + let rv = Crdt::run_window_request(&window, &mut None, &me, &me, 0, &recycler); assert!(rv.is_some()); let v = rv.unwrap(); //test we copied the blob assert_eq!(v.read().unwrap().meta.size, 200); let len = window.read().unwrap().len() as u64; - let rv = Crdt::run_window_request(&window, &me, &me, len, &recycler); + let rv = Crdt::run_window_request(&window, &mut None, &me, &me, len, &recycler); assert!(rv.is_none()); + + fn tmp_ledger(name: &str) -> String { + let keypair = KeyPair::new(); + + let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey()); + + let mut writer = LedgerWriter::new(&path, true).unwrap(); + let zero = Hash::default(); + let one = hash(&zero.as_ref()); + writer + .write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec()) + .unwrap(); + path + } + + let ledger_path = tmp_ledger("run_window_request"); + let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap(); + + let rv = Crdt::run_window_request( + &window, + &mut Some(&mut ledger_window), + &me, + &me, + 1, + &recycler, + ); + assert!(rv.is_some()); + + remove_dir_all(ledger_path).unwrap(); } /// test window requests respond with the right blob, and do not overrun @@ -1865,7 +1926,7 @@ mod tests { let recycler = BlobRecycler::default(); // Simulate handling a repair request from mock_peer - let rv = Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler); + let rv = Crdt::run_window_request(&window, &mut None, &me, &mock_peer, 0, &recycler); assert!(rv.is_none()); let blob = recycler.allocate(); let blob_size = 200; @@ -1874,8 +1935,9 @@ mod tests { let num_requests: u32 = 64; for i in 0..num_requests { - let shared_blob = - Crdt::run_window_request(&window, &me, &mock_peer, 0, &recycler).unwrap(); + let shared_blob = Crdt::run_window_request( + &window, &mut None, &me, &mock_peer, 0, &recycler, + ).unwrap(); let blob = shared_blob.read().unwrap(); // Test we copied the blob assert_eq!(blob.meta.size, blob_size); @@ -1935,13 +1997,13 @@ mod tests { let obj = Arc::new(RwLock::new(crdt)); let request = Protocol::RequestUpdates(1, node.clone()); - assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none()); + assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); let request = Protocol::RequestUpdates(1, node_with_same_addr.clone()); - assert!(Crdt::handle_protocol(request, &obj, &window, &recycler).is_none()); + assert!(Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler).is_none()); let request = Protocol::RequestUpdates(1, node_with_diff_addr.clone()); - Crdt::handle_protocol(request, &obj, &window, &recycler); + Crdt::handle_protocol(request, &obj, &window, &mut None, &recycler); let me = obj.write().unwrap(); diff --git a/src/entry.rs b/src/entry.rs index bdd53d050..124db01ab 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -2,10 +2,12 @@ //! unique ID that is the hash of the Entry before it, plus the hash of the //! transactions within it. Entries cannot be reordered, and its field `num_hashes` //! represents an approximate amount of time since the last Entry was created. -use bincode::serialized_size; +use bincode::{serialize_into, serialized_size}; use hash::{extend_and_hash, hash, Hash}; -use packet::BLOB_DATA_SIZE; +use packet::{BlobRecycler, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; +use signature::PublicKey; +use std::io::Cursor; use transaction::Transaction; /// Each Entry contains three pieces of data. The `num_hashes` field is the number @@ -76,6 +78,32 @@ impl Entry { entry } + pub fn to_blob( + &self, + blob_recycler: &BlobRecycler, + idx: Option, + id: Option, + ) -> SharedBlob { + let blob = blob_recycler.allocate(); + { + let mut blob_w = blob.write().unwrap(); + let pos = { + let mut out = Cursor::new(blob_w.data_mut()); + serialize_into(&mut out, &self).expect("failed to serialize output"); + out.position() as usize + }; + blob_w.set_size(pos); + + if let Some(idx) = idx { + blob_w.set_index(idx).expect("set_index()"); + } + if let Some(id) = id { + blob_w.set_id(id).expect("set_id()"); + } + } + blob + } + pub fn will_fit(transactions: Vec) -> bool { serialized_size(&Entry { num_hashes: 0, diff --git a/src/fullnode.rs b/src/fullnode.rs index 289067219..f443eaf7a 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -227,8 +227,6 @@ impl FullNode { let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); - // let mut ledger_writer = LedgerWriter::new(ledger_path); - let (tpu, blob_receiver) = Tpu::new( keypair, &bank, @@ -244,6 +242,7 @@ impl FullNode { let ncp = Ncp::new( &crdt, window.clone(), + Some(ledger_path), node.sockets.gossip, node.sockets.gossip_send, exit.clone(), @@ -324,6 +323,7 @@ impl FullNode { let ncp = Ncp::new( &crdt, window.clone(), + ledger_path, node.sockets.gossip, node.sockets.gossip_send, exit.clone(), diff --git a/src/ledger.rs b/src/ledger.rs index 8333f3657..3949a8d5c 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -399,15 +399,7 @@ impl Block for [Entry] { fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque) { for entry in self { - let blob = blob_recycler.allocate(); - let pos = { - let mut bd = blob.write().unwrap(); - let mut out = Cursor::new(bd.data_mut()); - serialize_into(&mut out, &entry).expect("failed to serialize output"); - out.position() as usize - }; - assert!(pos <= BLOB_DATA_SIZE, "pos: {}", pos); - blob.write().unwrap().set_size(pos); + let blob = entry.to_blob(blob_recycler, None, None); q.push_back(blob); } } diff --git a/src/ncp.rs b/src/ncp.rs index a31ed00d4..ee4e7736f 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -20,6 +20,7 @@ impl Ncp { pub fn new( crdt: &Arc>, window: streamer::SharedWindow, + ledger_path: Option<&str>, gossip_listen_socket: UdpSocket, gossip_send_socket: UdpSocket, exit: Arc, @@ -47,6 +48,7 @@ impl Ncp { let t_listen = Crdt::listen( crdt.clone(), window, + ledger_path, blob_recycler.clone(), request_receiver, response_sender.clone(), @@ -95,6 +97,7 @@ mod tests { let d = Ncp::new( &c, w, + None, tn.sockets.gossip, tn.sockets.gossip_send, exit.clone(), diff --git a/src/thin_client.rs b/src/thin_client.rs index c1ce0ebe3..0e10d0361 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -350,6 +350,7 @@ mod tests { assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); server.join().unwrap(); + remove_dir_all(ledger_path).unwrap(); } // sleep(Duration::from_millis(300)); is unstable diff --git a/src/tvu.rs b/src/tvu.rs index c1406cd79..85ee0b115 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -170,7 +170,7 @@ pub mod tests { ) -> Result<(Ncp, SharedWindow)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; + let ncp = Ncp::new(&crdt, window.clone(), None, listen, send_sock, exit)?; Ok((ncp, window)) } diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index f0dd6f840..6ff4d49d5 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -24,6 +24,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let d = Ncp::new( &c.clone(), w, + None, tn.sockets.gossip, tn.sockets.gossip_send, exit, diff --git a/tests/multinode.rs b/tests/multinode.rs index abc13a7f7..956215256 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -43,6 +43,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { let ncp = Ncp::new( &spy_ref, spy_window, + None, spy.sockets.gossip, spy.sockets.gossip_send, exit.clone(),