From 444adcd1ca04fcf42a9e916a9263fa0b25207fd3 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Wed, 18 Apr 2018 12:02:54 -0700 Subject: [PATCH] update --- src/accountant_skel.rs | 47 +++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index 34de14261..4d776f334 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -22,7 +22,7 @@ use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; @@ -280,7 +280,7 @@ impl AccountantSkel { let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::process( + let e = Self::process( &skel, &verified_receiver, &blob_sender, @@ -294,9 +294,22 @@ impl AccountantSkel { Ok(vec![t_receiver, t_responder, t_server, t_verifier]) } - /// Create a UDP microservice that forwards messages the given AccountantSkel. - /// This service receives messages from a leader in the network - /// Set `exit` to shutdown its threads. + /// This service receives messages from a leader in the network and processes the transactions + /// on the accountant state. + /// # Arguments + /// * `obj` - The accoutnant state. + /// * `rsubs` - The subscribers. + /// * `exit` - The exit signal. + /// # Remarks + /// The pipeline is constructed as follows + /// 1. receive blobs from the network, these are out of order + /// 2. verify blobs, PoH, signatures + /// 3. reconstruct consequitive window + /// a. order the blobs + /// b. use erasure coding to reconstruct missing blobs + /// c. ask the network for missing blobs + /// 4. process the transaction state machine + /// 5. respond with the hash of the state back to the leader pub fn replicate( obj: &Arc>>, rsubs: Subscribers, @@ -323,26 +336,26 @@ impl AccountantSkel { blob_recycler.clone(), retransmit_receiver, ); - - - let t_window = - streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender); + //TODO + //the packets comming out of blob_receiver need to be sent to the GPU and verified + //then sent to the window, which does the erasure coding reconstruction + let t_window = streamer::window( + exit.clone(), + blob_recycler.clone(), + blob_receiver, + window_sender, + retransmit_sender, + ); let skel = obj.clone(); let t_server = spawn(move || loop { - let e = AccountantSkel::replicate( - &skel, - &verified_receiver, - &blob_sender, - &blob_recycler, - ); + let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - Ok(vec![t_receiver, t_responder, t_server, t_verifier]) + Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server]) } - } #[cfg(test)]