This commit is contained in:
Anatoly Yakovenko 2018-04-18 12:02:54 -07:00 committed by Stephen Akridge
parent 69ac305883
commit 444adcd1ca
1 changed files with 30 additions and 17 deletions

View File

@ -22,7 +22,7 @@ use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
@ -280,7 +280,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
@ -294,9 +294,22 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service receives messages from a leader in the network
/// Set `exit` to shutdown its threads.
/// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state.
/// # Arguments
/// * `obj` - The accoutnant state.
/// * `rsubs` - The subscribers.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows
/// 1. receive blobs from the network, these are out of order
/// 2. verify blobs, PoH, signatures
/// 3. reconstruct consequitive window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
@ -323,26 +336,26 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
blob_recycler.clone(),
retransmit_receiver,
);
let t_window =
streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender);
//TODO
//the packets comming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);
let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::replicate(
&skel,
&verified_receiver,
&blob_sender,
&blob_recycler,
);
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
}
}
#[cfg(test)]