hbbft/examples/network/commst.rs

98 lines
3.3 KiB
Rust
Raw Normal View History

//! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via
//! `crossbeam_channel::unbounded()`.
2018-10-29 07:36:56 -07:00
use std::io;
use std::net::TcpStream;
2018-07-31 02:57:12 -07:00
use bincode;
2018-04-30 08:55:51 -07:00
use crossbeam;
use crossbeam_channel::{Receiver, Sender};
2018-10-29 07:36:56 -07:00
use log::{debug, info};
2018-10-24 02:38:14 -07:00
use serde::{de::DeserializeOwned, Serialize};
2018-10-10 07:11:27 -07:00
use hbbft::SourcedMessage;
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, M> {
/// The transmit side of the multiple producer channel from comms threads.
2018-05-10 08:50:07 -07:00
tx: &'a Sender<SourcedMessage<M, usize>>,
/// The receive side of the channel to the comms thread.
2018-05-10 08:50:07 -07:00
rx: &'a Receiver<M>,
/// The socket IO task.
2018-07-31 02:57:12 -07:00
stream: TcpStream,
/// The index of this comms task for identification against its remote node.
2018-04-30 08:55:51 -07:00
pub node_index: usize,
}
2018-10-24 02:38:14 -07:00
impl<'a, M: Serialize + DeserializeOwned + Send + 'a> CommsTask<'a, M> {
2018-04-30 08:55:51 -07:00
pub fn new(
2018-05-10 08:50:07 -07:00
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
2018-04-30 08:55:51 -07:00
stream: TcpStream,
node_index: usize,
) -> Self {
debug!(
"Creating comms task #{} for {:?}",
node_index,
stream.peer_addr().unwrap()
);
CommsTask {
2018-04-29 06:27:40 -07:00
tx,
rx,
2018-07-31 02:57:12 -07:00
stream,
2018-04-30 08:55:51 -07:00
node_index,
}
}
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(mut self) -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
// Borrow parts of `self` before entering the thread binding scope.
2018-05-10 08:50:07 -07:00
let tx = self.tx;
let rx = self.rx;
let mut stream1 = match self.stream.try_clone() {
Ok(stream) => stream,
Err(e) => return Err(Box::new(e)),
};
let node_index = self.node_index;
2018-05-10 08:50:07 -07:00
crossbeam::scope(move |scope| {
// Local comms receive loop thread.
scope.spawn(move |_| {
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
// Forward the message to the remote node.
2018-07-31 02:57:12 -07:00
bincode::serialize_into(&mut stream1, &message)
.expect("message serialization failed");
}
});
// Remote comms receive loop.
debug!("Starting remote RX loop for node {}", node_index);
loop {
2018-07-31 02:57:12 -07:00
match bincode::deserialize_from(&mut self.stream) {
Ok(message) => {
2018-04-30 08:55:51 -07:00
tx.send(SourcedMessage {
source: node_index,
2018-07-31 02:57:12 -07:00
message,
})
.unwrap();
2018-04-30 08:55:51 -07:00
}
2018-07-31 02:57:12 -07:00
Err(err) => {
if let bincode::ErrorKind::Io(ref io_err) = *err {
if io_err.kind() == io::ErrorKind::UnexpectedEof {
info!("Node {} disconnected.", node_index);
break;
}
}
2018-07-31 02:57:12 -07:00
panic!("Node {} - Deserialization error {:?}", node_index, err);
}
}
}
})
}
}