2018-05-05 06:39:32 -07:00
|
|
|
//! The local message delivery system.
|
|
|
|
use crossbeam::{Scope, ScopedJoinHandle};
|
|
|
|
use crossbeam_channel;
|
|
|
|
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
|
2018-10-10 07:11:27 -07:00
|
|
|
use hbbft::{SourcedMessage, Target, TargetedMessage};
|
2018-05-05 06:39:32 -07:00
|
|
|
|
|
|
|
/// The queue functionality for messages sent between algorithm instances.
|
|
|
|
/// The messaging struct allows for targeted message exchange between comms
|
|
|
|
/// tasks on one side and algo tasks on the other.
|
2018-05-10 08:50:07 -07:00
|
|
|
pub struct Messaging<M> {
|
2018-05-05 06:39:32 -07:00
|
|
|
/// Transmit sides of message channels to comms threads.
|
2018-05-10 08:50:07 -07:00
|
|
|
txs_to_comms: Vec<Sender<M>>,
|
2018-05-05 06:39:32 -07:00
|
|
|
/// Receive side of the routed message channel from comms threads.
|
2018-05-10 08:50:07 -07:00
|
|
|
rx_from_comms: Receiver<SourcedMessage<M, usize>>,
|
2018-05-08 07:20:32 -07:00
|
|
|
/// Transmit sides of message channels to algo thread.
|
2018-05-10 08:50:07 -07:00
|
|
|
tx_to_algo: Sender<SourcedMessage<M, usize>>,
|
2018-05-05 06:39:32 -07:00
|
|
|
/// Receive side of the routed message channel from comms threads.
|
2018-05-10 08:50:07 -07:00
|
|
|
rx_from_algo: Receiver<TargetedMessage<M, usize>>,
|
2018-05-05 06:39:32 -07:00
|
|
|
|
|
|
|
/// RX handles to be used by comms tasks.
|
2018-05-10 08:50:07 -07:00
|
|
|
rxs_to_comms: Vec<Receiver<M>>,
|
2018-05-05 06:39:32 -07:00
|
|
|
/// TX handle to be used by comms tasks.
|
2018-05-10 08:50:07 -07:00
|
|
|
tx_from_comms: Sender<SourcedMessage<M, usize>>,
|
2018-05-08 07:20:32 -07:00
|
|
|
/// RX handles to be used by algo task.
|
2018-05-10 08:50:07 -07:00
|
|
|
rx_to_algo: Receiver<SourcedMessage<M, usize>>,
|
2018-05-08 07:20:32 -07:00
|
|
|
/// TX handle to be used by algo task.
|
2018-05-10 08:50:07 -07:00
|
|
|
tx_from_algo: Sender<TargetedMessage<M, usize>>,
|
2018-05-05 06:39:32 -07:00
|
|
|
|
|
|
|
/// Control channel used to stop the listening thread.
|
|
|
|
stop_tx: Sender<()>,
|
|
|
|
stop_rx: Receiver<()>,
|
|
|
|
}
|
|
|
|
|
2018-05-10 08:50:07 -07:00
|
|
|
impl<M: Send> Messaging<M> {
|
2018-05-05 06:39:32 -07:00
|
|
|
/// Initialises all the required TX and RX handles for the case on a total
|
|
|
|
/// number `num_nodes` of consensus nodes.
|
|
|
|
pub fn new(num_nodes: usize) -> Self {
|
2018-05-10 08:50:07 -07:00
|
|
|
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<M>()).collect();
|
2018-05-05 06:39:32 -07:00
|
|
|
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
|
2018-05-10 08:50:07 -07:00
|
|
|
let rxs_to_comms: Vec<Receiver<M>> =
|
2018-05-05 06:39:32 -07:00
|
|
|
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
|
|
|
|
let (tx_from_comms, rx_from_comms) = unbounded();
|
2018-05-08 07:20:32 -07:00
|
|
|
|
|
|
|
let (tx_to_algo, rx_to_algo) = unbounded();
|
2018-05-05 06:39:32 -07:00
|
|
|
let (tx_from_algo, rx_from_algo) = unbounded();
|
|
|
|
|
|
|
|
let (stop_tx, stop_rx) = bounded(1);
|
|
|
|
|
|
|
|
Messaging {
|
|
|
|
// internally used handles
|
|
|
|
txs_to_comms,
|
|
|
|
rx_from_comms,
|
2018-05-08 07:20:32 -07:00
|
|
|
tx_to_algo,
|
2018-05-05 06:39:32 -07:00
|
|
|
rx_from_algo,
|
|
|
|
|
|
|
|
// externally used handles
|
|
|
|
rxs_to_comms,
|
|
|
|
tx_from_comms,
|
2018-05-08 07:20:32 -07:00
|
|
|
rx_to_algo,
|
2018-05-05 06:39:32 -07:00
|
|
|
tx_from_algo,
|
|
|
|
|
|
|
|
stop_tx,
|
|
|
|
stop_rx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-10 08:50:07 -07:00
|
|
|
pub fn rxs_to_comms(&self) -> &Vec<Receiver<M>> {
|
2018-05-05 06:39:32 -07:00
|
|
|
&self.rxs_to_comms
|
|
|
|
}
|
|
|
|
|
2018-05-10 08:50:07 -07:00
|
|
|
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<M, usize>> {
|
2018-05-05 06:39:32 -07:00
|
|
|
&self.tx_from_comms
|
|
|
|
}
|
|
|
|
|
2018-05-10 08:50:07 -07:00
|
|
|
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<M, usize>> {
|
2018-05-08 07:20:32 -07:00
|
|
|
&self.rx_to_algo
|
2018-05-05 06:39:32 -07:00
|
|
|
}
|
|
|
|
|
2018-05-10 08:50:07 -07:00
|
|
|
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<M, usize>> {
|
2018-05-05 06:39:32 -07:00
|
|
|
&self.tx_from_algo
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Gives the ownership of the handle to stop the message receive loop.
|
|
|
|
pub fn stop_tx(&self) -> Sender<()> {
|
|
|
|
self.stop_tx.to_owned()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Spawns the message delivery thread in a given thread scope.
|
|
|
|
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
|
|
|
|
where
|
2018-05-10 08:50:07 -07:00
|
|
|
M: Clone + 'a,
|
2018-05-05 06:39:32 -07:00
|
|
|
{
|
|
|
|
let txs_to_comms = self.txs_to_comms.to_owned();
|
|
|
|
let rx_from_comms = self.rx_from_comms.to_owned();
|
2018-05-08 07:20:32 -07:00
|
|
|
let tx_to_algo = self.tx_to_algo.to_owned();
|
2018-05-05 06:39:32 -07:00
|
|
|
let rx_from_algo = self.rx_from_algo.to_owned();
|
|
|
|
|
|
|
|
let stop_rx = self.stop_rx.to_owned();
|
|
|
|
let mut stop = false;
|
|
|
|
|
|
|
|
// TODO: `select_loop!` seems to really confuse Clippy.
|
|
|
|
#[cfg_attr(
|
|
|
|
feature = "cargo-clippy",
|
|
|
|
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
|
|
|
|
)]
|
|
|
|
scope.spawn(move || {
|
|
|
|
let mut result = Ok(());
|
|
|
|
// This loop forwards messages according to their metadata.
|
|
|
|
while !stop && result.is_ok() {
|
|
|
|
select_loop! {
|
2018-05-10 08:50:07 -07:00
|
|
|
recv(rx_from_algo, tm) => {
|
|
|
|
match tm.target {
|
|
|
|
Target::All => {
|
2018-05-05 06:39:32 -07:00
|
|
|
// Send the message to all remote nodes, stopping at
|
|
|
|
// the first error.
|
|
|
|
result = txs_to_comms.iter()
|
|
|
|
.fold(Ok(()), |result, tx| {
|
|
|
|
if result.is_ok() {
|
2018-05-10 08:50:07 -07:00
|
|
|
tx.send(tm.message.clone())
|
2018-05-08 07:20:32 -07:00
|
|
|
} else {
|
2018-05-05 06:39:32 -07:00
|
|
|
result
|
|
|
|
}
|
|
|
|
}).map_err(Error::from);
|
|
|
|
},
|
2018-05-10 08:50:07 -07:00
|
|
|
Target::Node(i) => {
|
2018-05-05 06:39:32 -07:00
|
|
|
result = if i < txs_to_comms.len() {
|
2018-05-10 08:50:07 -07:00
|
|
|
txs_to_comms[i].send(tm.message)
|
2018-05-05 06:39:32 -07:00
|
|
|
.map_err(Error::from)
|
2018-05-08 07:20:32 -07:00
|
|
|
} else {
|
2018-05-05 06:39:32 -07:00
|
|
|
Err(Error::NoSuchTarget)
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
|
|
|
recv(rx_from_comms, message) => {
|
|
|
|
// Send the message to all algorithm instances, stopping at
|
|
|
|
// the first error.
|
2018-05-08 07:20:32 -07:00
|
|
|
result = tx_to_algo.send(message.clone()).map_err(Error::from)
|
2018-05-05 06:39:32 -07:00
|
|
|
},
|
|
|
|
recv(stop_rx, _) => {
|
|
|
|
// Flag the thread ready to exit.
|
|
|
|
stop = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} // end of select_loop!
|
|
|
|
result
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Errors that can stop the message delivery thread.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Error {
    /// A message was addressed to a node index for which no channel exists.
    NoSuchTarget,
    /// Sending a message over one of the channels failed.
    SendError,
}

impl ::std::fmt::Display for Error {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let description = match *self {
            Error::NoSuchTarget => "no such target node",
            Error::SendError => "failed to send a message over a channel",
        };
        f.write_str(description)
    }
}

impl ::std::error::Error for Error {}
|
|
|
|
|
|
|
|
impl<T> From<crossbeam_channel::SendError<T>> for Error {
|
|
|
|
fn from(_: crossbeam_channel::SendError<T>) -> Error {
|
|
|
|
Error::SendError
|
|
|
|
}
|
|
|
|
}
|