Update the crossbeam dependency to 0.5 (#361)

* updated crossbeam to 0.5

* removed an obsolete Clippy directive
This commit is contained in:
Vladimir Komendantskiy 2018-12-10 17:10:13 +00:00 committed by GitHub
parent ceb416a6e1
commit 99d01bf428
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 80 deletions

View File

@ -38,8 +38,8 @@ tiny-keccak = "1.4"
[dev-dependencies]
colored = "1.6"
crossbeam = "0.3.2"
crossbeam-channel = "0.1"
crossbeam = "0.5"
crossbeam-channel = "0.3"
docopt = "1.0"
itertools = "0.7"
serde_derive = "1.0.55"

View File

@ -12,17 +12,6 @@ use serde::{de::DeserializeOwned, Serialize};
use hbbft::SourcedMessage;
/// Errors that can occur in a comms task.
/// Currently wraps only I/O failures from the underlying socket.
/// NOTE(review): this definition appears on the removal side of the diff —
/// the commit replaces it with `Box<dyn Any + Send>` join-error propagation.
#[derive(Debug)]
pub enum Error {
// Underlying socket/stream I/O failure.
IoError(io::Error),
}
// Enables `?` on `io::Result` values inside the comms task by converting
// `io::Error` into the task's own error type.
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
}
}
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, M: 'a> {
@ -59,16 +48,19 @@ impl<'a, M: Serialize + DeserializeOwned + Send + 'a> CommsTask<'a, M> {
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(mut self) -> Result<(), Error> {
pub fn run(mut self) -> Result<(), Box<dyn std::any::Any + Send + 'static>> {
// Borrow parts of `self` before entering the thread binding scope.
let tx = self.tx;
let rx = self.rx;
let mut stream1 = self.stream.try_clone()?;
let mut stream1 = match self.stream.try_clone() {
Ok(stream) => stream,
Err(e) => return Err(Box::new(e)),
};
let node_index = self.node_index;
crossbeam::scope(move |scope| {
// Local comms receive loop thread.
scope.spawn(move || {
scope.spawn(move |_| {
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
@ -99,7 +91,6 @@ impl<'a, M: Serialize + DeserializeOwned + Send + 'a> CommsTask<'a, M> {
}
}
}
});
Ok(())
})
}
}

View File

@ -1,6 +1,6 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select_loop, unbounded, Receiver, Sender};
use crossbeam::thread::{Scope, ScopedJoinHandle};
use crossbeam_channel::{self, bounded, select, unbounded, Receiver, Sender};
use hbbft::{SourcedMessage, Target, TargetedMessage};
/// The queue functionality for messages sent between algorithm instances.
@ -85,9 +85,12 @@ impl<M: Send> Messaging<M> {
}
/// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
pub fn spawn<'a, 'scope>(
&self,
scope: &'scope Scope<'a>,
) -> ScopedJoinHandle<'scope, Result<(), Error>>
where
M: Clone + 'a,
M: 'a + Clone,
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
@ -97,46 +100,45 @@ impl<M: Send> Messaging<M> {
let stop_rx = self.stop_rx.to_owned();
let mut stop = false;
// TODO: `select_loop!` seems to really confuse Clippy.
#[cfg_attr(
feature = "cargo-clippy",
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
)]
scope.spawn(move || {
scope.spawn(move |_| {
let mut result = Ok(());
// This loop forwards messages according to their metadata.
while !stop && result.is_ok() {
select_loop! {
recv(rx_from_algo, tm) => {
match tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at
// the first error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)
};
select! {
recv(rx_from_algo) -> tm => {
if let Ok(tm) = tm {
match tm.target {
Target::All => {
// Send the message to all remote nodes, stopping at the first
// error.
result = txs_to_comms.iter()
.fold(Ok(()), |result, tx| {
if result.is_ok() {
tx.send(tm.message.clone())
} else {
result
}
}).map_err(Error::from);
},
Target::Node(i) => {
result = if i < txs_to_comms.len() {
txs_to_comms[i].send(tm.message)
.map_err(Error::from)
} else {
Err(Error::NoSuchTarget)
};
}
}
}
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
result = tx_to_algo.send(message.clone()).map_err(Error::from)
recv(rx_from_comms) -> message => {
if let Ok(message) = message {
// Send the message to all algorithm instances, stopping at the first
// error.
result = tx_to_algo.send(message.clone()).map_err(Error::from)
}
},
recv(stop_rx, _) => {
recv(stop_rx) -> _ => {
// Flag the thread ready to exit.
stop = true;
}
@ -150,11 +152,11 @@ impl<M: Send> Messaging<M> {
#[derive(Clone, Debug)]
pub enum Error {
NoSuchTarget,
SendError,
Send,
}
impl<T> From<crossbeam_channel::SendError<T>> for Error {
fn from(_: crossbeam_channel::SendError<T>) -> Error {
Error::SendError
Error::Send
}
}

View File

@ -39,7 +39,7 @@ use std::fmt::Debug;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{io, iter, process, thread, time};
use std::{iter, process, thread, time};
use crossbeam;
use crypto::poly::Poly;
@ -51,24 +51,6 @@ use hbbft::{DistAlgorithm, NetworkInfo, SourcedMessage};
use network::messaging::Messaging;
use network::{commst, connection};
/// Errors that can occur while running a consensus node: either a socket
/// I/O failure or an error bubbled up from a comms task.
/// NOTE(review): this definition appears on the removal side of the diff —
/// the commit deletes it in favor of returning the scope's boxed join error.
#[derive(Debug)]
pub enum Error {
// Underlying socket/stream I/O failure.
IoError(io::Error),
// Failure propagated from a `commst::CommsTask`.
CommsError(commst::Error),
}
// Enables `?` on `io::Result` values in node setup code.
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
}
}
// Enables `?` on comms-task results when joining communication threads.
impl From<commst::Error> for Error {
fn from(err: commst::Error) -> Error {
Error::CommsError(err)
}
}
/// This is a structure to start a consensus node.
pub struct Node<T> {
/// Incoming connection socket.
@ -92,7 +74,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
}
/// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, Error> {
pub fn run(&self) -> Result<T, Box<(dyn std::any::Any + Send + 'static)>> {
let value = &self.value;
let (our_str, connections) = connection::make(&self.addr, &self.remotes);
let mut node_strs: Vec<String> = iter::once(our_str.clone())
@ -138,7 +120,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
// broadcast the proposed value. There is no remote node
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let broadcast_handle = scope.spawn(move || {
let broadcast_handle = scope.spawn(move |_| {
let mut broadcast =
Broadcast::new(Arc::new(netinfo), 0).expect("failed to instantiate broadcast");
@ -183,7 +165,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
let node_index = if c.node_str < our_str { i } else { i + 1 };
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || {
scope.spawn(move |_| {
match commst::CommsTask::<Message>::new(
tx_from_comms,
rx_to_comms,
@ -200,7 +182,7 @@ impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> +
// Wait for the broadcast instances to finish before stopping the
// messaging task.
broadcast_handle.join();
let _ = broadcast_handle.join();
// Wait another second so that pending messages get sent out.
thread::sleep(time::Duration::from_secs(1));