state replication

This commit is contained in:
Anatoly Yakovenko 2018-04-17 19:26:19 -07:00 committed by Stephen Akridge
parent 266f85f607
commit 2ff57df2a0
2 changed files with 56 additions and 2 deletions

View File

@ -247,6 +247,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
}
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
pub fn serve(
obj: &Arc<Mutex<AccountantSkel<W>>>,
@ -269,6 +270,57 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver);
let (verified_sender, verified_receiver) = channel();
let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::blob_verifier(&blob_receiver, &verified_sender);
if e.is_err() && exit_.load(Ordering::Relaxed) {
break;
}
});
let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
&skel,
&verified_receiver,
&blob_sender,
&packet_recycler,
&blob_recycler,
);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}
/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service receives messages from a leader in the network
/// Set `exit` to shutdown its threads.
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
addr: &str,
exit: Arc<AtomicBool>,
) -> Result<Vec<JoinHandle<()>>> {
let read = UdpSocket::bind(rsubs.me.addr)?;
// make sure we are on the same interface
let mut local = read.local_addr()?;
local.set_port(0);
let write = UdpSocket::bind(local)?;
let blob_recycler = packet::BlobRecycler::default();
let (blob_sender, blob_receiver) = channel();
let t_blob_receiver =
streamer::blob_receiver(exit.clone(), blob_recycler.clone(), read, blob_sender)?;
let (window_sender, window_receiver) = channel();
let subs = Arc::new(RwLock::new(rsubs));
let t_window =
streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver);
let (verified_sender, verified_receiver) = channel();
let exit_ = exit.clone();
let t_verifier = spawn(move || loop {
let e = Self::verifier(&packet_receiver, &verified_sender);
@ -292,6 +344,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}
}
#[cfg(test)]

View File

@ -40,18 +40,19 @@ impl Node {
pub struct Subscribers {
data: Vec<Node>,
me: Node,
pub me: Node,
pub leader: Node,
}
impl Subscribers {
pub fn new(me: Node, leader: Node) -> Subscribers {
pub fn new(me: Node, leader: Node, network: &[Node]) -> Subscribers {
let mut h = Subscribers {
data: vec![],
me: me.clone(),
leader: leader.clone(),
};
h.insert(&[me, leader]);
h.insert(network);
h
}