diff --git a/src/blob_fetch_stage.rs b/src/blob_fetch_stage.rs new file mode 100644 index 000000000..82a88bd3d --- /dev/null +++ b/src/blob_fetch_stage.rs @@ -0,0 +1,47 @@ +//! The `blob_fetch_stage` pulls blobs from UDP sockets and sends it to a channel. + +use packet; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::Arc; +use std::thread::JoinHandle; +use streamer; + +pub struct BlobFetchStage { + pub blob_receiver: streamer::BlobReceiver, + pub thread_hdls: Vec>, +} + +impl BlobFetchStage { + pub fn new( + socket: UdpSocket, + exit: Arc, + blob_recycler: packet::BlobRecycler, + ) -> Self { + Self::new_multi_socket(vec![socket], exit, blob_recycler) + } + pub fn new_multi_socket( + sockets: Vec, + exit: Arc, + blob_recycler: packet::BlobRecycler, + ) -> Self { + let (blob_sender, blob_receiver) = channel(); + let thread_hdls: Vec<_> = sockets + .into_iter() + .map(|socket| { + streamer::blob_receiver( + exit.clone(), + blob_recycler.clone(), + socket, + blob_sender.clone(), + ).expect("blob receiver init") + }) + .collect(); + + BlobFetchStage { + blob_receiver, + thread_hdls, + } + } +} diff --git a/src/crdt.rs b/src/crdt.rs index 51684e40a..b7e6ef591 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -31,7 +31,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; -use streamer::{BlobReceiver, BlobSender}; +use streamer::{BlobReceiver, BlobSender, Window}; pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); @@ -265,7 +265,7 @@ impl Crdt { /// We need to avoid having obj locked while doing any io, such as the `send_to` pub fn broadcast( obj: &Arc>, - window: &Arc>>>, + window: &Window, s: &UdpSocket, transmit_index: &mut u64, received_index: u64, @@ -532,7 +532,7 @@ impl Crdt { .unwrap() } fn run_window_request( - window: &Arc>>>, + window: &Window, from: &ReplicatedData, ix: u64, blob_recycler: &BlobRecycler, @@ -566,7 +566,7 @@ impl Crdt { //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, - window: &Arc>>>, + window: &Window, blob_recycler: &BlobRecycler, blob: &Blob, ) -> Option { @@ -638,7 +638,7 @@ impl Crdt { /// Process messages from the network fn run_listen( obj: &Arc>, - window: &Arc>>>, + window: &Window, blob_recycler: &BlobRecycler, requests_receiver: &BlobReceiver, response_sender: &BlobSender, @@ -660,7 +660,7 @@ impl Crdt { } pub fn listen( obj: Arc>, - window: Arc>>>, + window: Window, blob_recycler: BlobRecycler, requests_receiver: BlobReceiver, response_sender: BlobSender, @@ -699,6 +699,7 @@ pub struct Sockets { pub respond: UdpSocket, pub broadcast: UdpSocket, pub repair: UdpSocket, + pub retransmit: UdpSocket, } pub struct TestNode { @@ -716,6 +717,7 @@ impl TestNode { let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let repair = UdpSocket::bind("0.0.0.0:0").unwrap(); + let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let pubkey = KeyPair::new().pubkey(); let data = ReplicatedData::new( pubkey, @@ -736,6 +738,7 @@ impl TestNode { respond, broadcast, repair, + retransmit, }, } } diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index ad71882d7..4eb5f8102 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -10,7 +10,7 @@ use streamer; pub struct FetchStage { pub packet_receiver: streamer::PacketReceiver, - pub thread_hdl: JoinHandle<()>, + pub thread_hdls: Vec>, } impl FetchStage { @@ -18,14 +18,30 @@ impl FetchStage { socket: UdpSocket, exit: Arc, packet_recycler: packet::PacketRecycler, + ) -> Self { + Self::new_multi_socket(vec![socket], exit, packet_recycler) + } + pub fn new_multi_socket( + sockets: Vec, + exit: Arc, + packet_recycler: packet::PacketRecycler, ) -> Self { let (packet_sender, packet_receiver) = channel(); - let thread_hdl = - streamer::receiver(socket, exit.clone(), packet_recycler.clone(), packet_sender); + let thread_hdls: Vec<_> = sockets + .into_iter() + .map(|socket| { + streamer::receiver( + socket, + exit.clone(), + packet_recycler.clone(), + packet_sender.clone(), + ) + }) + .collect(); FetchStage { packet_receiver, - thread_hdl, + thread_hdls, } } } diff --git a/src/lib.rs b/src/lib.rs index e1c89bdbc..7df345c1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ pub mod counter; pub mod bank; pub mod banking_stage; +pub mod blob_fetch_stage; pub mod budget; pub mod crdt; pub mod entry; @@ -43,6 +44,7 @@ pub mod timing; pub mod tpu; pub mod transaction; pub mod tvu; +pub mod window_stage; pub mod write_stage; extern crate bincode; extern crate byteorder; diff --git a/src/server.rs b/src/server.rs index a769c322b..821ebcd78 100644 --- a/src/server.rs +++ b/src/server.rs @@ -114,11 +114,16 @@ impl Server { /// | | Bank | | /// | `------` | /// | ^ | - /// .--------. | | | .------------. - /// | | | .-----. .--+--. .-----. | | | - /// | Leader |--->| NCP +-->| TVU +-->| NCP +------>| Validators | - /// | | | `-----` `-----` `-----` | | | - /// `--------` | | `------------` + /// .--------. | | | .------------. + /// | | | .--+--. | | | + /// | Leader |<------------->| TVU +<--------------->| | + /// | | | `-----` | | Validators | + /// | | | ^ | | | + /// | | | | | | | + /// | | | .--+--. | | | + /// | |<------------->| NCP +<--------------->| | + /// | | | `-----` | | | + /// `--------` | | `------------` /// `-------------------------------` /// ``` pub fn new_validator( @@ -127,7 +132,7 @@ impl Server { requests_socket: UdpSocket, respond_socket: UdpSocket, replicate_socket: UdpSocket, - gossip_socket: UdpSocket, + gossip_listen_socket: UdpSocket, repair_socket: UdpSocket, leader_repl_data: ReplicatedData, exit: Arc, @@ -136,16 +141,36 @@ impl Server { let mut thread_hdls = vec![]; let rpu = Rpu::new(bank.clone(), requests_socket, respond_socket, exit.clone()); thread_hdls.extend(rpu.thread_hdls); + + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + crdt.write() + .expect("'crdt' write lock in pub fn replicate") + .set_leader(leader_repl_data.id); + crdt.write() + .expect("'crdt' write lock before insert() in pub fn replicate") + .insert(&leader_repl_data); + let window = streamer::default_window(); + let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let retransmit_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); + let ncp = Ncp::new( + crdt.clone(), + window.clone(), + gossip_listen_socket, + gossip_send_socket, + exit.clone(), + ).expect("Ncp::new"); + let tvu = Tvu::new( bank.clone(), - me, - gossip_socket, + crdt.clone(), + window.clone(), replicate_socket, repair_socket, - leader_repl_data, + retransmit_socket, exit.clone(), ); thread_hdls.extend(tvu.thread_hdls); + thread_hdls.extend(ncp.thread_hdls); Server { thread_hdls } } } diff --git a/src/streamer.rs b/src/streamer.rs index 71c4877e5..072fc39d9 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -18,6 +18,7 @@ pub type PacketReceiver = mpsc::Receiver; pub type PacketSender = mpsc::Sender; pub type BlobSender = mpsc::Sender>; pub type BlobReceiver = mpsc::Receiver>; +pub type Window = Arc>>>; fn recv_loop( sock: &UdpSocket, @@ -143,7 +144,7 @@ pub fn blob_receiver( } fn find_next_missing( - locked_window: &Arc>>>, + locked_window: &Window, crdt: &Arc>, consumed: &mut usize, received: &mut usize, @@ -168,7 +169,7 @@ fn find_next_missing( } fn repair_window( - locked_window: &Arc>>>, + locked_window: &Window, crdt: &Arc>, _recycler: &BlobRecycler, last: &mut usize, @@ -211,7 +212,7 @@ fn repair_window( } fn recv_window( - locked_window: &Arc>>>, + locked_window: &Window, crdt: &Arc>, recycler: &BlobRecycler, consumed: &mut usize, @@ -353,7 +354,7 @@ fn recv_window( Ok(()) } -fn print_window(locked_window: &Arc>>>, consumed: usize) { +fn print_window(locked_window: &Window, consumed: usize) { { let buf: Vec<_> = locked_window .read() @@ -382,14 +383,14 @@ fn print_window(locked_window: &Arc>>>, consumed: } } -pub fn default_window() -> Arc>>> { +pub fn default_window() -> Window { Arc::new(RwLock::new(vec![None; WINDOW_SIZE])) } pub fn window( exit: Arc, crdt: Arc>, - window: Arc>>>, + window: Window, recycler: BlobRecycler, r: BlobReceiver, s: BlobSender, @@ -432,7 +433,7 @@ pub fn window( fn broadcast( crdt: &Arc>, - window: &Arc>>>, + window: &Window, recycler: &BlobRecycler, r: &BlobReceiver, sock: &UdpSocket, @@ -517,7 +518,7 @@ pub fn broadcaster( sock: UdpSocket, exit: Arc, crdt: Arc>, - window: Arc>>>, + window: Window, recycler: BlobRecycler, r: BlobReceiver, ) -> JoinHandle<()> { diff --git a/src/tpu.rs b/src/tpu.rs index 45b898f5c..8ef5bde97 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -85,11 +85,11 @@ impl Tpu { record_stage.entry_receiver, ); let mut thread_hdls = vec![ - fetch_stage.thread_hdl, banking_stage.thread_hdl, record_stage.thread_hdl, write_stage.thread_hdl, ]; + thread_hdls.extend(fetch_stage.thread_hdls.into_iter()); thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); Tpu { blob_receiver: write_stage.blob_receiver, diff --git a/src/tvu.rs b/src/tvu.rs index fe8e9fb48..1f1d76b8a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -21,16 +21,16 @@ //! - TODO Validation messages are sent back to the leader use bank::Bank; -use crdt::{Crdt, ReplicatedData}; -use ncp::Ncp; +use blob_fetch_stage::BlobFetchStage; +use crdt::Crdt; use packet; use replicate_stage::ReplicateStage; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::JoinHandle; use streamer; +use window_stage::WindowStage; pub struct Tvu { pub thread_hdls: Vec>, @@ -41,98 +41,45 @@ impl Tvu { /// on the bank state. /// # Arguments /// * `bank` - The bank state. - /// * `me` - my configuration - /// * `gossip` - my gossisp socket - /// * `replicate` - my replicate socket - /// * `leader` - leader configuration + /// * `crdt` - The crdt state. + /// * `window` - The window state. + /// * `replicate_socket` - my replicate socket + /// * `repair_socket` - my repair socket + /// * `retransmit_socket` - my retransmit socket /// * `exit` - The exit signal. pub fn new( bank: Arc, - me: ReplicatedData, - gossip_listen_socket: UdpSocket, - replicate: UdpSocket, + crdt: Arc>, + window: streamer::Window, + replicate_socket: UdpSocket, repair_socket: UdpSocket, - leader: ReplicatedData, + retransmit_socket: UdpSocket, exit: Arc, ) -> Self { - //replicate pipeline - let crdt = Arc::new(RwLock::new(Crdt::new(me))); - crdt.write() - .expect("'crdt' write lock in pub fn replicate") - .set_leader(leader.id); - crdt.write() - .expect("'crdt' write lock before insert() in pub fn replicate") - .insert(&leader); - let window = streamer::default_window(); - let gossip_send_socket = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - let ncp = Ncp::new( - crdt.clone(), - window.clone(), - gossip_listen_socket, - gossip_send_socket, - exit.clone(), - ).expect("Ncp::new"); - - // TODO pull this socket out through the public interface - // make sure we are on the same interface - let mut local = replicate.local_addr().expect("tvu: get local address"); - local.set_port(0); - let write = UdpSocket::bind(local).expect("tvu: bind to local socket"); - let blob_recycler = packet::BlobRecycler::default(); - let (blob_sender, blob_receiver) = channel(); - let t_blob_receiver = streamer::blob_receiver( + let fetch_stage = BlobFetchStage::new_multi_socket( + vec![replicate_socket, repair_socket], exit.clone(), blob_recycler.clone(), - replicate, - blob_sender.clone(), - ).expect("tvu: blob receiver creation"); - let (window_sender, window_receiver) = channel(); - let (retransmit_sender, retransmit_receiver) = channel(); - - let t_retransmit = streamer::retransmitter( - write, - exit.clone(), - crdt.clone(), - blob_recycler.clone(), - retransmit_receiver, ); - let t_repair_receiver = streamer::blob_receiver( - exit.clone(), - blob_recycler.clone(), - repair_socket, - blob_sender.clone(), - ).expect("tvu: blob repair receiver fail"); - //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction - let t_window = streamer::window( - exit.clone(), - crdt.clone(), + let window_stage = WindowStage::new( + crdt, window, - blob_recycler.clone(), - blob_receiver, - window_sender, - retransmit_sender, - ); - - let replicate_stage = ReplicateStage::new( - bank.clone(), + retransmit_socket, exit.clone(), - window_receiver, blob_recycler.clone(), + fetch_stage.blob_receiver, ); - let mut threads = vec![ - //replicate threads - t_blob_receiver, - t_retransmit, - t_window, - t_repair_receiver, - replicate_stage.thread_hdl, - ]; - threads.extend(ncp.thread_hdls.into_iter()); + let replicate_stage = + ReplicateStage::new(bank, exit, window_stage.blob_receiver, blob_recycler); + + let mut threads = vec![replicate_stage.thread_hdl]; + threads.extend(fetch_stage.thread_hdls.into_iter()); + threads.extend(window_stage.thread_hdls.into_iter()); Tvu { thread_hdls: threads, } @@ -162,14 +109,15 @@ pub mod tests { use transaction::Transaction; use tvu::Tvu; - fn new_replicator( + fn new_ncp( crdt: Arc>, listen: UdpSocket, exit: Arc, - ) -> Result { + ) -> Result<(Ncp, streamer::Window)> { let window = streamer::default_window(); let send_sock = UdpSocket::bind("0.0.0.0:0").expect("bind 0"); - Ncp::new(crdt, window, listen, send_sock, exit) + let ncp = Ncp::new(crdt, window.clone(), listen, send_sock, exit)?; + Ok((ncp, window)) } /// Test that message sent from leader to target1 and replicated to target2 #[test] @@ -185,7 +133,7 @@ pub mod tests { crdt_l.set_leader(leader.data.id); let cref_l = Arc::new(RwLock::new(crdt_l)); - let dr_l = new_replicator(cref_l, leader.sockets.gossip, exit.clone()).unwrap(); + let dr_l = new_ncp(cref_l, leader.sockets.gossip, exit.clone()).unwrap(); //start crdt2 let mut crdt2 = Crdt::new(target2.data.clone()); @@ -193,7 +141,7 @@ pub mod tests { crdt2.set_leader(leader.data.id); let leader_id = leader.data.id; let cref2 = Arc::new(RwLock::new(crdt2)); - let dr_2 = new_replicator(cref2, target2.sockets.gossip, exit.clone()).unwrap(); + let dr_2 = new_ncp(cref2, target2.sockets.gossip, exit.clone()).unwrap(); // setup some blob services to send blobs into the socket // to simulate the source peer and get blobs out of the socket to @@ -221,13 +169,21 @@ pub mod tests { let mint = Mint::new(starting_balance); let replicate_addr = target1.data.replicate_addr; let bank = Arc::new(Bank::new(&mint)); + + //start crdt1 + let mut crdt1 = Crdt::new(target1.data.clone()); + crdt1.insert(&leader.data); + crdt1.set_leader(leader.data.id); + let cref1 = Arc::new(RwLock::new(crdt1)); + let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); + let tvu = Tvu::new( bank.clone(), - target1.data, - target1.sockets.gossip, + cref1, + dr_1.1, target1.sockets.replicate, target1.sockets.repair, - leader.data, + target1.sockets.retransmit, exit.clone(), ); @@ -292,10 +248,13 @@ pub mod tests { for t in tvu.thread_hdls { t.join().expect("join"); } - for t in dr_l.thread_hdls { + for t in dr_l.0.thread_hdls { t.join().expect("join"); } - for t in dr_2.thread_hdls { + for t in dr_2.0.thread_hdls { + t.join().expect("join"); + } + for t in dr_1.0.thread_hdls { t.join().expect("join"); } t_receiver.join().expect("join"); diff --git a/src/window_stage.rs b/src/window_stage.rs new file mode 100644 index 000000000..36264a88d --- /dev/null +++ b/src/window_stage.rs @@ -0,0 +1,52 @@ +//! The `window_stage` maintains the blob window + +use crdt::Crdt; +use packet; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; +use streamer; + +pub struct WindowStage { + pub blob_receiver: streamer::BlobReceiver, + pub thread_hdls: Vec>, +} + +impl WindowStage { + pub fn new( + crdt: Arc>, + window: streamer::Window, + retransmit_socket: UdpSocket, + exit: Arc, + blob_recycler: packet::BlobRecycler, + fetch_stage_receiver: streamer::BlobReceiver, + ) -> Self { + let (retransmit_sender, retransmit_receiver) = channel(); + + let t_retransmit = streamer::retransmitter( + retransmit_socket, + exit.clone(), + crdt.clone(), + blob_recycler.clone(), + retransmit_receiver, + ); + let (blob_sender, blob_receiver) = channel(); + let t_window = streamer::window( + exit.clone(), + crdt.clone(), + window, + blob_recycler.clone(), + fetch_stage_receiver, + blob_sender, + retransmit_sender, + ); + let thread_hdls = vec![t_retransmit, t_window]; + + WindowStage { + blob_receiver, + thread_hdls, + } + } +}