Add/Remove nodes dynamically (not working).

This commit is contained in:
c0gent 2018-07-11 20:12:35 -07:00
parent c5c259109f
commit 987aa9a7f9
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
8 changed files with 216 additions and 591 deletions

View File

@ -42,12 +42,9 @@ bytes = "*"
uuid = { version = "0.6", features = ["v4", "serde"] }
byteorder = "*"
# tokio-serde-bincode = "*"
#
[dependencies.hbbft]
version = "*"
# git = "https://github.com/c0gent/hbbft"
# branch = "master"
path = "../hbbft"
git = "https://github.com/c0gent/hbbft"
branch = "c0gent-hydrabadger"
# path = "../hbbft"
features = ["serialization-protobuf"]

View File

@ -3,7 +3,23 @@
An experimental peer-to-peer client using the [Honey Badger Byzantine Fault
Tolerant consensus algorithm](https://github.com/poanetwork/hbbft).
### Status
### Usage
* Initial project layout based on [hbbft
examples](https://github.com/poanetwork/hbbft/tree/master/examples)
#### Running a test peer
1. `git clone https://github.com/c0gent/hydrabadger`
2. `cd hydrabadger`
3. `./peer0`
#### Additional local peers
1. Open a new terminal window.
2. `cd {...}/hydrabadger`
3. `./peer1`
4. Repeat 1 and 2
5. `./peer2`
Each peer will generate a number of random transactions at regular intervals,
process them accordingly, and output complete batches.

View File

@ -42,27 +42,34 @@ use serde::{Serializer, Deserializer, Serialize, Deserialize};
use serde_bytes;
use bincode::{self, serialize_into, deserialize_from, serialize, deserialize};
use tokio_serde_bincode::{ReadBincode, WriteBincode};
use hbbft::{
broadcast::{Broadcast, BroadcastMessage},
crypto::{SecretKeySet, poly::Poly},
crypto::{SecretKeySet, poly::Poly, PublicKey, PublicKeySet},
messaging::{DistAlgorithm, NetworkInfo, SourcedMessage, Target, TargetedMessage},
proto::message::BroadcastProto,
honey_badger::HoneyBadger,
honey_badger::{Message},
queueing_honey_badger::{QueueingHoneyBadger, Input, Batch, Change},
dynamic_honey_badger::Message,
queueing_honey_badger::{Error as QhbError, QueueingHoneyBadger, Input, Batch, Change},
// dynamic_honey_badger::{Error as DhbError, DynamicHoneyBadger, Input, Batch, Change, Message},
};
const BATCH_SIZE: usize = 150;
const TXN_BYTES: usize = 10;
const NEW_TXNS_PER_INTERVAL: usize = 20;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "{}", _0)]
#[fail(display = "Io error: {}", _0)]
Io(std::io::Error),
#[fail(display = "{}", _0)]
#[fail(display = "Serde error: {}", _0)]
Serde(bincode::Error),
#[fail(display = "Error polling hydrabadger internal receiver")]
HydrabadgerPoll,
// FIXME: Make honeybadger error thread safe.
#[fail(display = "QueuingHoneyBadger propose error")]
QhbPropose,
#[fail(display = "DynamicHoneyBadger error")]
Dhb // FIXME: ^^^^ DhbError
}
impl From<std::io::Error> for Error {
@ -86,12 +93,12 @@ impl Transaction {
/// Messages sent over the network between nodes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WireMessageKind {
Hello,
Hello(PublicKey),
Goodbye,
// Message,
#[serde(with = "serde_bytes")]
Bytes(Bytes),
Message(Message<Uuid>),
// TargetedMessage(TargetedMessage<Uuid>),
// Transaction()
}
@ -99,25 +106,25 @@ pub enum WireMessageKind {
/// Messages sent over the network between nodes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WireMessage {
// src_uid: Uuid,
kind: WireMessageKind,
}
impl WireMessage {
pub fn hello(/*src_uid: Uuid*/) -> WireMessage {
WireMessage {
// src_uid,
kind: WireMessageKind::Hello,
}
pub fn hello(pub_key: PublicKey) -> WireMessage {
WireMessage { kind: WireMessageKind::Hello(pub_key), }
}
// pub fn src_uid(&self) -> &Uuid {
// &self.src_uid
// }
pub fn message(msg: Message<Uuid>) -> WireMessage {
WireMessage { kind: WireMessageKind::Message(msg), }
}
pub fn kind(&self) -> &WireMessageKind {
&self.kind
}
pub fn into_kind(self) -> WireMessageKind {
self.kind
}
}
@ -125,6 +132,10 @@ impl WireMessage {
#[derive(Clone, Debug)]
pub enum InternalMessageKind {
Wire(WireMessage),
// NewTransaction(Transaction),
NewTransactions(Vec<Transaction>),
IncomingHbMessage(Message<Uuid>),
HbInput(Input<Vec<Transaction>, Uuid>),
}
@ -136,11 +147,20 @@ pub struct InternalMessage {
}
impl InternalMessage {
pub fn new(src_uid: Uuid, kind: InternalMessageKind) -> InternalMessage {
InternalMessage { src_uid, kind, }
}
pub fn wire(src_uid: Uuid, wire_message: WireMessage) -> InternalMessage {
InternalMessage {
src_uid,
kind: InternalMessageKind::Wire(wire_message),
}
InternalMessage::new(src_uid, InternalMessageKind::Wire(wire_message))
}
pub fn new_transactions(src_uid: Uuid, txns: Vec<Transaction>) -> InternalMessage {
InternalMessage::new(src_uid, InternalMessageKind::NewTransactions(txns))
}
pub fn incoming_hb_message(src_uid: Uuid, msg: Message<Uuid>) -> InternalMessage {
InternalMessage::new(src_uid, InternalMessageKind::IncomingHbMessage(msg))
}
pub fn src_uid(&self) -> &Uuid {
@ -150,6 +170,10 @@ impl InternalMessage {
pub fn kind(&self) -> &InternalMessageKind {
&self.kind
}
pub fn into_parts(self) -> (Uuid, InternalMessageKind) {
(self.src_uid, self.kind)
}
}
@ -169,17 +193,6 @@ type InternalTx = mpsc::UnboundedSender<InternalMessage>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalRx = mpsc::UnboundedReceiver<InternalMessage>;
// type PeerTxs = Arc<RwLock<HashMap<SocketAddr, Tx>>>;
/// A serialized message with a sender and the timestamp of arrival.
#[derive(Eq, PartialEq, Debug)]
struct TimestampedMessage {
time: Duration,
sender_id: Uuid,
target: Target<Uuid>,
message: Vec<u8>,
}
/// A stream/sink of `WireMessage`s connected to a socket.
#[derive(Debug)]
@ -279,15 +292,16 @@ impl Peer {
internal_tx: InternalTx) -> Peer {
// Get the client socket address
let addr = wire_messages.socket().peer_addr().unwrap();
let uid = Uuid::new_v4();
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
let guard = hb.peer_txs.write().unwrap().insert(addr, tx);
let guard = hb.peer_txs.write().unwrap().insert(uid, tx);
Peer {
uid: Uuid::new_v4(),
uid,
wire_messages,
hb,
rx,
@ -299,9 +313,9 @@ impl Peer {
/// Sends a message to all connected peers.
fn wire_to_all(&mut self, msg: &WireMessage) {
// Now, send the message to all other peers
for (addr, tx) in self.hb.peer_txs.read().unwrap().iter() {
for (uid, tx) in self.hb.peer_txs.read().unwrap().iter() {
// Don't send the message to ourselves
if *addr != self.addr {
if *uid != self.uid {
// The send only fails if the rx half has been dropped,
// however this is impossible as the `tx` half will be
// removed from the map before the `rx` is dropped.
@ -325,7 +339,6 @@ impl Future for Peer {
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
// Ensure the loop can't hog the thread for too long:
const MESSAGES_PER_TICK: usize = 10;
// Receive all messages from peers.
@ -338,10 +351,7 @@ impl Future for Peer {
// be flushed to the socket (right below).
self.wire_messages.start_send(v)?;
// If this is the last iteration, the loop will break even
// though there could still be messages to read. Because we did
// not reach `Async::NotReady`, we have to notify ourselves
// in order to tell the executor to schedule the task again.
// Exceeded max messages per tick, schedule notification:
if i + 1 == MESSAGES_PER_TICK {
task::current().notify();
}
@ -358,10 +368,14 @@ impl Future for Peer {
trace!("Received message: {:?}", message);
if let Some(msg) = message {
match msg.kind() {
WireMessageKind::Hello => error!("Duplicate `WireMessage::Hello` \
match msg.into_kind() {
WireMessageKind::Hello(_pub_key) => error!("Duplicate `WireMessage::Hello` \
received from '{}'", self.uid),
_ => (),
WireMessageKind::Message(msg) => {
self.internal_tx.unbounded_send(InternalMessage::incoming_hb_message(
self.uid, msg)).unwrap();
},
_ => unimplemented!(),
}
} else {
// EOF was reached. The remote client has disconnected. There is
@ -382,7 +396,13 @@ impl Future for Peer {
impl Drop for Peer {
fn drop(&mut self) {
self.hb.peer_txs.write().unwrap().remove(&self.addr);
self.hb.peer_txs.write().unwrap().remove(&self.uid);
// let hb = self.hb.clone();
// FIXME: Consider simply sending the 'change' input through the
// internal channel.
self.hb.qhb.write().unwrap().input(Input::Change(Change::Remove(self.uid)))
.expect("Error adding new peer to HB");
}
}
@ -394,11 +414,14 @@ struct HydrabadgerInner {
/// Incoming connection socket.
addr: SocketAddr,
sk_set: SecretKeySet,
pk_set: PublicKeySet,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_txs: RwLock<HashMap<SocketAddr, WireTx>>,
peer_txs: RwLock<HashMap<Uuid, WireTx>>,
/// Honey badger.
dhb: RwLock<QueueingHoneyBadger<Transaction, Uuid>>,
qhb: RwLock<QueueingHoneyBadger<Vec<Transaction>, Uuid>>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_tx: InternalTx,
@ -418,9 +441,11 @@ pub struct Hydrabadger {
impl Hydrabadger {
/// Returns a new Hydrabadger node.
pub fn new(addr: SocketAddr, _value: Option<Vec<u8>>) -> Self {
let sk_set = SecretKeySet::random(0, &mut rand::thread_rng());
let pk_set = sk_set.public_keys();
let uid = Uuid::new_v4();
// TODO: This needs to be updated based on network size:
let sk_threshold = 0;
let sk_set = SecretKeySet::random(sk_threshold, &mut rand::thread_rng());
let pk_set = sk_set.public_keys();
let node_ids: BTreeSet<_> = iter::once(uid).collect();
@ -431,9 +456,9 @@ impl Hydrabadger {
pk_set.clone(),
);
let dhb = RwLock::new(QueueingHoneyBadger::builder(netinfo)
let qhb = RwLock::new(QueueingHoneyBadger::builder(netinfo)
// Default: 100:
.batch_size(50)
.batch_size(BATCH_SIZE)
// Default: 3:
.max_future_epochs(3)
.build());
@ -444,8 +469,10 @@ impl Hydrabadger {
inner: Arc::new(HydrabadgerInner {
uid,
addr,
sk_set,
pk_set,
peer_txs: RwLock::new(HashMap::new()),
dhb,
qhb,
peer_internal_tx,
peer_out_queue: RwLock::new(VecDeque::new()),
batch_out_queue: RwLock::new(VecDeque::new()),
@ -461,29 +488,104 @@ impl Future for Hydrabadger {
/// Polls the internal message receiver until all txs are dropped.
fn poll(&mut self) -> Poll<(), Error> {
match self.peer_internal_rx.poll() {
Ok(Async::Ready(Some(i_msg))) => match i_msg.kind() {
InternalMessageKind::Wire(w_msg) => match w_msg.kind() {
WireMessageKind::Hello => {
info!("Adding node ('{}') to honey badger.", i_msg.src_uid);
},
_ => {},
// Ensure the loop can't hog the thread for too long:
const MESSAGES_PER_TICK: usize = 50;
// Handle incoming internal messages:
for i in 0..MESSAGES_PER_TICK {
match self.peer_internal_rx.poll() {
Ok(Async::Ready(Some(i_msg))) => {
let (src_uid, w_msg) = i_msg.into_parts();
let mut qhb = self.inner.qhb.write().unwrap();
let epoch = 0; // ?
match w_msg {
InternalMessageKind::Wire(w_msg) => match w_msg.into_kind() {
WireMessageKind::Hello(pub_key) => {
info!("Adding node ('{}') to honey badger.", src_uid);
// qhb.handle_message(Message::HoneyBadger(epoch, HbMessage<NodeUid>),)
qhb.input(Input::Change(Change::Add(src_uid, pub_key)))
.expect("Error adding new peer to HB");
},
_ => {},
},
InternalMessageKind::NewTransactions(txns) => {
info!("Pushing {} new user transactions to queue.", txns.len());
// let mut qhb = self.inner.qhb.write().unwrap();
qhb.input(Input::User(txns))
.expect("Error inputing transactions into `HoneyBadger`");
// Turn the HB crank:
if qhb.queue().len() >= BATCH_SIZE {
////// KEEPME (FIXME: fix HB error type):
// qhb.propose().map_err(|_err| Error::QhbPropose)?;
//////
qhb.propose().unwrap();
}
},
InternalMessageKind::IncomingHbMessage(msg) => {
info!("Handling incoming message: {:?}", msg);
// let mut qhb = self.inner.qhb.write().unwrap();
qhb.handle_message(&src_uid, msg).unwrap();
},
InternalMessageKind::HbInput(hb_input) => {
qhb.input(hb_input)
.expect("Error inputting to HB");
}
}
// Exceeded max messages per tick, schedule notification:
if i + 1 == MESSAGES_PER_TICK {
task::current().notify();
}
},
},
Ok(Async::Ready(None)) => {
// The sending ends have all dropped.
return Ok(Async::Ready(()));
},
Ok(Async::NotReady) => {},
Err(()) => return Err(Error::HydrabadgerPoll),
};
Ok(Async::Ready(None)) => {
// The sending ends have all dropped.
return Ok(Async::Ready(()));
},
Ok(Async::NotReady) => {},
Err(()) => return Err(Error::HydrabadgerPoll),
};
}
// Get a mutable reference to HB:
let mut qhb = self.inner.qhb.write().unwrap();
// Forward outgoing messages:
let peer_txs = self.inner.peer_txs.read().unwrap();
for (i, hb_msg) in qhb.message_iter().enumerate() {
info!("Forwarding message: {:?}", hb_msg);
match hb_msg.target {
Target::Node(n_uid) => {
peer_txs.get(&n_uid).unwrap().unbounded_send(
WireMessage::message(hb_msg.message)).unwrap();
},
Target::All => {
for (n_uid, tx) in peer_txs.iter().filter(|(&uid, _)| uid != self.inner.uid) {
tx.unbounded_send(WireMessage::message(hb_msg.message.clone())).unwrap();
}
},
}
// Exceeded max messages per tick, schedule notification:
if i + 1 == MESSAGES_PER_TICK {
task::current().notify();
}
}
// Check for batch outputs:
for batch in qhb.output_iter() {
// self.batch_out_queue.push_back(txn);
info!("BATCH OUTPUT: {:?}", batch);
}
Ok(Async::NotReady)
}
}
fn process_incoming(socket: TcpStream, hb: Arc<HydrabadgerInner>) -> impl Future<Item = (), Error = ()> {
/// Returns a future that handles incoming connections on `socket`.
fn process_incoming(socket: TcpStream, hb: Arc<HydrabadgerInner>)
-> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
let wire_messages = WireMessages::new(socket);
@ -495,11 +597,14 @@ fn process_incoming(socket: TcpStream, hb: Arc<HydrabadgerInner>) -> impl Future
let peer = Peer::new(hb, w_messages, peer_internal_tx.clone());
match msg_opt {
Some(msg) => match msg.kind() {
WireMessageKind::Hello => {
peer.internal_tx.unbounded_send(InternalMessage::wire(peer.uid, msg))
Some(msg) => match msg.into_kind() {
WireMessageKind::Hello(pub_key) => {
// Relay hello message (this could be simplified).
peer.internal_tx.unbounded_send(
InternalMessage::wire(peer.uid, WireMessage::hello(pub_key)))
.map_err(|err| error!("Unable to send on internal tx. \
Internal rx has dropped: {}", err)).unwrap();
Internal rx has dropped: {}", err))
.unwrap();
},
_ => {
error!("Peer connected without sending `WireMessageKind::Hello`.");
@ -527,7 +632,7 @@ pub fn node(hydrabadger: Hydrabadger, remotes: HashSet<SocketAddr>)
let hb = hydrabadger.inner.clone();
let listen = socket.incoming()
.map_err(|err| error!("failed to accept socket; error = {:?}", err))
.map_err(|err| error!("Error accepting socket: {:?}", err))
.for_each(move |socket| {
tokio::spawn(process_incoming(socket, hb.clone()));
Ok(())
@ -544,7 +649,7 @@ pub fn node(hydrabadger: Hydrabadger, remotes: HashSet<SocketAddr>)
// Wrap the socket with the frame delimiter and codec:
let mut wire_messages = WireMessages::new(socket);
match wire_messages.send_msg(WireMessage::hello()) {
match wire_messages.send_msg(WireMessage::hello(hb.pk_set.public_key())) {
Ok(_) => {
let peer_internal_tx = hb.peer_internal_tx.clone();
Either::A(Peer::new(hb, wire_messages, peer_internal_tx))
@ -564,10 +669,20 @@ pub fn node(hydrabadger: Hydrabadger, remotes: HashSet<SocketAddr>)
let peer_txs = hb.peer_txs.read().unwrap();
trace!("Peer list:");
for (peer_addr, mut pb) in peer_txs.iter() {
trace!(" peer_addr: {}", peer_addr);
}
trace!(" peer_addr: {}", peer_addr); }
// TODO: Send txns.
let qhb = hb.qhb.read().unwrap();
// If no nodes are connected, ignore new transactions:
if qhb.dyn_hb().netinfo().num_nodes() > 1 {
info!("Generating and inputting {} random transactions...", NEW_TXNS_PER_INTERVAL);
// Send some random transactions to our internal HB instance.
let txns: Vec<_> = (0..NEW_TXNS_PER_INTERVAL).map(|_| Transaction::random(TXN_BYTES)).collect();
hb.peer_internal_tx.unbounded_send(
InternalMessage::new_transactions(hb.uid, txns)
).unwrap();
} else {
info!("No nodes connected...");
}
Ok(())
})

View File

@ -1,101 +0,0 @@
//! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via
//! `crossbeam_channel::unbounded()`.
use crossbeam;
use crossbeam_channel::{Receiver, Sender};
use std::io;
use std::net::TcpStream;
use hbbft::messaging::SourcedMessage;
use hbbft::proto_io::{self, ProtoIo};
use hbbft::protobuf::Message;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "{}", _0)]
IoError(io::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
}
}
/// A communication task connects a remote node to the thread that manages the
/// consensus algorithm.
pub struct CommsTask<'a, P: 'a, M: 'a> {
/// The transmit side of the multiple producer channel from comms threads.
tx: &'a Sender<SourcedMessage<M, usize>>,
/// The receive side of the channel to the comms thread.
rx: &'a Receiver<M>,
/// The socket IO task.
io: ProtoIo<TcpStream, P>,
/// The index of this comms task for identification against its remote node.
pub node_index: usize,
}
impl<'a, P: Message + 'a, M: Into<P> + From<P> + Send + 'a> CommsTask<'a, P, M> {
pub fn new(
tx: &'a Sender<SourcedMessage<M, usize>>,
rx: &'a Receiver<M>,
stream: TcpStream,
node_index: usize,
) -> Self {
debug!(
"Creating comms task #{} for {:?}",
node_index,
stream.peer_addr().unwrap()
);
CommsTask {
tx,
rx,
io: ProtoIo::from_stream(stream),
node_index,
}
}
/// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests.
pub fn run(mut self) -> Result<(), Error> {
// Borrow parts of `self` before entering the thread binding scope.
let tx = self.tx;
let rx = self.rx;
let mut io1 = self.io.try_clone()?;
let node_index = self.node_index;
crossbeam::scope(move |scope| {
// Local comms receive loop thread.
scope.spawn(move || {
loop {
// Receive a multicast message from the manager thread.
let message = rx.recv().unwrap();
// Forward the message to the remote node.
io1.send(&message.into()).unwrap();
}
});
// Remote comms receive loop.
debug!("Starting remote RX loop for node {}", node_index);
loop {
match self.io.recv() {
Ok(message) => {
tx.send(SourcedMessage {
source: node_index,
message: message.into(),
});
}
Err(proto_io::Error(proto_io::ErrorKind::Protobuf(e), _)) => {
warn!("Node {} - Protobuf error {}", node_index, e)
}
Err(e) => {
warn!("Node {} - Critical error {:?}", node_index, e);
break;
}
}
}
});
Ok(())
}
}

View File

@ -1,54 +0,0 @@
//! Connection data and initiation routines.
use std::collections::{BTreeMap, HashSet};
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
#[derive(Debug)]
pub struct Connection {
pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
pub node_str: String,
}
impl Connection {
pub fn new(stream: TcpStream, node_str: String) -> Self {
Connection {
// Create a read buffer of 1K bytes.
reader: BufReader::with_capacity(1024, stream.try_clone().unwrap()),
stream,
node_str,
}
}
}
/// Connect this node to remote peers. A vector of successful connections is returned, as well as
/// our own node ID.
pub fn make(bind_address: &SocketAddr, remote_addresses: &HashSet<SocketAddr>)
-> (String, Vec<Connection>) {
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(bind_address).expect("start listener");
let here_str = format!("{}", bind_address);
// Use a `BTreeMap` to make sure we all iterate in the same order.
let remote_by_str: BTreeMap<String, _> = remote_addresses
.iter()
.map(|addr| (format!("{}", addr), addr))
.filter(|(there_str, _)| *there_str != here_str)
.collect();
// Wait for all nodes with larger addresses to connect.
let connections = remote_by_str
.into_iter()
.map(|(there_str, address)| {
let tcp_conn = if here_str < there_str {
listener.accept().expect("failed to connect").0
} else {
TcpStream::connect(address).expect("failed to connect")
};
Connection::new(tcp_conn, there_str.to_string())
})
.collect();
(here_str, connections)
}

View File

@ -1,159 +0,0 @@
//! The local message delivery system.
use crossbeam::{Scope, ScopedJoinHandle};
// use crossbeam_channel;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use hbbft::messaging::{SourcedMessage, Target, TargetedMessage};
#[derive(Clone, Debug, Fail)]
pub enum Error {
#[fail(display = "Invalid messaging target: '{}'", _0)]
NoSuchTarget(usize),
}
/// The queue functionality for messages sent between algorithm instances.
/// The messaging struct allows for targeted message exchange between comms
/// tasks on one side and algo tasks on the other.
pub struct Messaging<M> {
/// Transmit sides of message channels to comms threads.
txs_to_comms: Vec<Sender<M>>,
/// Receive side of the routed message channel from comms threads.
rx_from_comms: Receiver<SourcedMessage<M, usize>>,
/// Transmit sides of message channels to algo thread.
tx_to_algo: Sender<SourcedMessage<M, usize>>,
/// Receive side of the routed message channel from comms threads.
rx_from_algo: Receiver<TargetedMessage<M, usize>>,
/// RX handles to be used by comms tasks.
rxs_to_comms: Vec<Receiver<M>>,
/// TX handle to be used by comms tasks.
tx_from_comms: Sender<SourcedMessage<M, usize>>,
/// RX handles to be used by algo task.
rx_to_algo: Receiver<SourcedMessage<M, usize>>,
/// TX handle to be used by algo task.
tx_from_algo: Sender<TargetedMessage<M, usize>>,
/// Control channel used to stop the listening thread.
stop_tx: Sender<()>,
stop_rx: Receiver<()>,
}
impl<M: Send> Messaging<M> {
/// Initialises all the required TX and RX handles for the case on a total
/// number `num_nodes` of consensus nodes.
pub fn new(num_nodes: usize) -> Self {
let to_comms: Vec<_> = (0..num_nodes).map(|_| unbounded::<M>()).collect();
let txs_to_comms = to_comms.iter().map(|&(ref tx, _)| tx.to_owned()).collect();
let rxs_to_comms: Vec<Receiver<M>> =
to_comms.iter().map(|&(_, ref rx)| rx.to_owned()).collect();
let (tx_from_comms, rx_from_comms) = unbounded();
let (tx_to_algo, rx_to_algo) = unbounded();
let (tx_from_algo, rx_from_algo) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
Messaging {
// internally used handles
txs_to_comms,
rx_from_comms,
tx_to_algo,
rx_from_algo,
// externally used handles
rxs_to_comms,
tx_from_comms,
rx_to_algo,
tx_from_algo,
stop_tx,
stop_rx,
}
}
pub fn rxs_to_comms(&self) -> &Vec<Receiver<M>> {
&self.rxs_to_comms
}
pub fn tx_from_comms(&self) -> &Sender<SourcedMessage<M, usize>> {
&self.tx_from_comms
}
pub fn rx_to_algo(&self) -> &Receiver<SourcedMessage<M, usize>> {
&self.rx_to_algo
}
pub fn tx_from_algo(&self) -> &Sender<TargetedMessage<M, usize>> {
&self.tx_from_algo
}
/// Gives the ownership of the handle to stop the message receive loop.
pub fn stop_tx(&self) -> Sender<()> {
self.stop_tx.to_owned()
}
/// Spawns the message delivery thread in a given thread scope.
pub fn spawn<'a>(&self, scope: &Scope<'a>) -> ScopedJoinHandle<Result<(), Error>>
where
M: Clone + 'a,
{
let txs_to_comms = self.txs_to_comms.to_owned();
let rx_from_comms = self.rx_from_comms.to_owned();
let tx_to_algo = self.tx_to_algo.to_owned();
let rx_from_algo = self.rx_from_algo.to_owned();
let stop_rx = self.stop_rx.to_owned();
let mut stop = false;
// TODO: `select_loop!` seems to really confuse Clippy.
#[cfg_attr(
feature = "cargo-clippy",
allow(never_loop, if_let_redundant_pattern_matching, deref_addrof)
)]
scope.spawn(move || {
let mut result = Ok(());
// This loop forwards messages according to their metadata.
while !stop && result.is_ok() {
select! {
recv(rx_from_algo, tm_opt) => {
match tm_opt {
Some(tm) => match tm.target {
Target::All => {
// Send the message to all remote nodes.
for tx in txs_to_comms.iter() {
tx.send(tm.message.clone())
}
},
Target::Node(i) => {
// if i < txs_to_comms.len() {
// txs_to_comms[i].send(tm.message);
// } else {
// result = Err(Error::NoSuchTarget);
// }
match txs_to_comms.get(i) {
Some(tx) => tx.send(tm.message),
None => { result = Err(Error::NoSuchTarget(i)); }
}
}
}
_ => (),
}
},
recv(rx_from_comms, message) => {
// Send the message to all algorithm instances, stopping at
// the first error.
if let Some(msg) = message {
tx_to_algo.send(msg)
}
},
recv(stop_rx, _) => {
// Flag the thread ready to exit.
stop = true;
}
}
} // end of select_loop!
result
})
}
}

View File

@ -1,9 +0,0 @@
// pub mod comms_task;
// pub mod connection;
// pub mod messaging;
// pub mod node;
// TODO: De-glob:
// pub use self::comms_task::CommsTask;
// pub use self::messaging::Messaging;
// pub use self::node::Node;

View File

@ -1,180 +0,0 @@
//! A hydrabadger consensus node.
//!
//! Code heavily borrowed from: https://github.com/poanetwork/hbbft/blob/master/examples/network/node.rs
//!
use crossbeam;
use std::collections::{BTreeSet, HashSet};
use std::fmt::Debug;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::rc::Rc;
use std::{io, iter, process, thread, time};
use hbbft::broadcast::{Broadcast, BroadcastMessage};
use hbbft::crypto::poly::Poly;
use hbbft::crypto::SecretKeySet;
use hbbft::messaging::{DistAlgorithm, NetworkInfo, SourcedMessage};
use hbbft::proto::message::BroadcastProto;
use network::comms_task;
use network::connection;
use network::messaging::Messaging;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "{}", _0)]
IoError(io::Error),
#[fail(display = "{}", _0)]
CommsError(comms_task::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
}
}
impl From<comms_task::Error> for Error {
fn from(err: comms_task::Error) -> Error {
Error::CommsError(err)
}
}
pub struct Node<T> {
/// Incoming connection socket.
addr: SocketAddr,
/// Sockets of remote nodes.
remotes: HashSet<SocketAddr>,
/// Optionally, a value to be broadcast by this node.
value: Option<T>,
}
impl<T> Node<T>
where T: Clone + Debug + AsRef<[u8]> + PartialEq +
Send + Sync + From<Vec<u8>> + Into<Vec<u8>> {
/// Consensus node constructor. It only initialises initial parameters.
pub fn new(addr: SocketAddr, remotes: HashSet<SocketAddr>, value: Option<T>) -> Self {
Node {
addr,
remotes,
value,
}
}
/// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, Error> {
let value = &self.value;
let (our_str, connections) = connection::make(&self.addr, &self.remotes);
let mut node_strs: Vec<String> = iter::once(our_str.clone())
.chain(connections.iter().map(|c| c.node_str.clone()))
.collect();
node_strs.sort();
let our_id = node_strs.binary_search(&our_str).unwrap();
let all_ids: BTreeSet<_> = (0..node_strs.len()).collect();
// FIXME: This example doesn't call algorithms that use cryptography. However the keys are
// required by the interface to all algorithms in Honey Badger. Therefore we set placeholder
// keys here. A fully-featured application would need to take appropriately initialized keys
// from elsewhere.
let secret_key_set = SecretKeySet::from(Poly::zero());
let secret_key = secret_key_set.secret_key_share(our_id as u64);
let public_key_set = secret_key_set.public_keys();
let netinfo = NetworkInfo::new(our_id, all_ids.clone(), secret_key, public_key_set);
if value.is_some() != (our_id == 0) {
panic!("Exactly the first node must propose a value.");
}
// Initialise the message delivery system and obtain TX and RX handles.
let messaging: Messaging<BroadcastMessage> = Messaging::new(all_ids.len());
let rxs_to_comms = messaging.rxs_to_comms();
let tx_from_comms = messaging.tx_from_comms();
let rx_to_algo = messaging.rx_to_algo();
let tx_from_algo = messaging.tx_from_algo();
let stop_tx = messaging.stop_tx();
// All spawned threads will have exited by the end of the scope.
crossbeam::scope(|scope| {
// Start the centralised message delivery system.
let _msg_handle = messaging.spawn(scope);
// Associate a broadcast instance with this node. This instance will
// broadcast the proposed value. There is no remote node
// corresponding to this instance, and no dedicated comms task. The
// node index is 0.
let broadcast_handle = scope.spawn(move || {
let mut broadcast =
Broadcast::new(Rc::new(netinfo), 0).expect("failed to instantiate broadcast");
if let Some(v) = value {
broadcast.input(v.clone().into()).expect("propose value");
for msg in broadcast.message_iter() {
tx_from_algo.send(msg);
}
}
loop {
// Receive a message from the socket IO task.
let message = rx_to_algo.recv().expect("receive from algo");
let SourcedMessage { source: i, message } = message;
debug!("{} received from {}: {:?}", our_id, i, message);
broadcast
.handle_message(&i, message)
.expect("handle broadcast message");
for msg in broadcast.message_iter() {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
tx_from_algo.send(msg);
}
if let Some(output) = broadcast.next_output() {
println!(
"Broadcast succeeded! Node {} output: {}",
our_id,
String::from_utf8(output).unwrap()
);
break;
}
}
});
// Start a comms task for each connection. Node indices of those
// tasks are 1 through N where N is the number of connections.
for (i, c) in connections.iter().enumerate() {
// Receive side of a single-consumer channel from algorithm
// actor tasks to the comms task.
let node_index = if c.node_str < our_str { i } else { i + 1 };
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move || {
match comms_task::CommsTask::<BroadcastProto, BroadcastMessage>::new(
tx_from_comms,
rx_to_comms,
// FIXME: handle error
c.stream.try_clone().unwrap(),
node_index,
).run()
{
Ok(_) => debug!("Comms task {} succeeded", node_index),
Err(e) => error!("Comms task {}: {:?}", node_index, e),
}
});
}
// Wait for the broadcast instances to finish before stopping the
// messaging task.
broadcast_handle.join();
// Wait another second so that pending messages get sent out.
thread::sleep(time::Duration::from_secs(1));
// Stop the messaging task.
stop_tx.send(());
process::exit(0);
}) // end of thread scope
}
}