Poll internal messages receiver.

This commit is contained in:
c0gent 2018-07-11 15:25:27 -07:00
parent c1a7bc8953
commit c5c259109f
6 changed files with 133 additions and 90 deletions

View File

@ -9,8 +9,8 @@ autobins = false
# path = "src/bin/simulation.rs" # path = "src/bin/simulation.rs"
[[bin]] [[bin]]
name = "network" name = "peer_node"
path = "src/bin/network_test.rs" path = "src/bin/peer_node.rs"
[dependencies] [dependencies]
log = "*" log = "*"

2
peer0
View File

@ -1,3 +1,3 @@
#/bin/bash #/bin/bash
HYDRABADGER_LOG=error,info,debug cargo run --bin network -- -b localhost:3000 HYDRABADGER_LOG=error,info,debug cargo run --bin peer_node -- -b localhost:3000

2
peer1
View File

@ -1,3 +1,3 @@
#/bin/bash #/bin/bash
HYDRABADGER_LOG=error,info,debug cargo run --bin network -- -b localhost:3001 -r localhost:3000 HYDRABADGER_LOG=error,info,debug cargo run --bin peer_node -- -b localhost:3001 -r localhost:3000

2
peer2
View File

@ -1,4 +1,4 @@
#/bin/bash #/bin/bash
HYDRABADGER_LOG=error,info,debug cargo run --bin network -- -b localhost:3002 -r localhost:3000 -r localhost:3001 HYDRABADGER_LOG=error,info,debug cargo run --bin peer_node -- -b localhost:3002 -r localhost:3000 -r localhost:3001

View File

@ -36,11 +36,11 @@ fn arg_matches<'a>() -> ArgMatches<'a> {
.takes_value(true) .takes_value(true)
.multiple(true) .multiple(true)
.number_of_values(1)) .number_of_values(1))
.arg(Arg::with_name("broadcast-value") // .arg(Arg::with_name("broadcast-value")
.long("broadcast-value") // .long("broadcast-value")
.value_name("BROADCAST_VALUE") // .value_name("BROADCAST_VALUE")
.help("Specifies the value to broadcast to other nodes.") // .help("Specifies a value to propose to other nodes.")
.takes_value(true)) // .takes_value(true))
.get_matches() .get_matches()
} }
@ -87,10 +87,10 @@ fn main() {
None => HashSet::new(), None => HashSet::new(),
}; };
let broadcast_value = matches.value_of("broadcast-value") // let broadcast_value = matches.value_of("broadcast-value")
.map(|bv| bv.as_bytes().to_vec()); // .map(|bv| bv.as_bytes().to_vec());
let hb = Hydrabadger::new(bind_address, broadcast_value); let hb = Hydrabadger::new(bind_address, None);
hydrabadger::run_node(hb, remote_addresses); hydrabadger::run_node(hb, remote_addresses);
// match mine() { // match mine() {

View File

@ -9,7 +9,7 @@
use crossbeam; use crossbeam;
use std::{ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
sync::{Arc, RwLock}, sync::{Arc, RwLock, Mutex},
{self, iter, process, thread, time}, {self, iter, process, thread, time},
collections::{BTreeSet, HashSet, HashMap, VecDeque}, collections::{BTreeSet, HashSet, HashMap, VecDeque},
fmt::Debug, fmt::Debug,
@ -50,8 +50,8 @@ use hbbft::{
messaging::{DistAlgorithm, NetworkInfo, SourcedMessage, Target, TargetedMessage}, messaging::{DistAlgorithm, NetworkInfo, SourcedMessage, Target, TargetedMessage},
proto::message::BroadcastProto, proto::message::BroadcastProto,
honey_badger::HoneyBadger, honey_badger::HoneyBadger,
dynamic_honey_badger::{/*DynamicHoneyBadger,*/ /*Input, Batch,*/ Message, /*Change*/}, honey_badger::{Message},
queueing_honey_badger::{QueueingHoneyBadger, Input, Batch, /*Message,*/ Change}, queueing_honey_badger::{QueueingHoneyBadger, Input, Batch, Change},
}; };
@ -61,6 +61,8 @@ pub enum Error {
Io(std::io::Error), Io(std::io::Error),
#[fail(display = "{}", _0)] #[fail(display = "{}", _0)]
Serde(bincode::Error), Serde(bincode::Error),
#[fail(display = "Error polling hydrabadger internal receiver")]
HydrabadgerPoll,
} }
impl From<std::io::Error> for Error { impl From<std::io::Error> for Error {
@ -151,19 +153,19 @@ impl InternalMessage {
} }
/// Transmit half of the message channel. /// Transmit half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireTx = mpsc::UnboundedSender<WireMessage>; type WireTx = mpsc::UnboundedSender<WireMessage>;
/// Receive half of the message channel. /// Receive half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireRx = mpsc::UnboundedReceiver<WireMessage>; type WireRx = mpsc::UnboundedReceiver<WireMessage>;
/// Transmit half of the message channel. /// Transmit half of the internal message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalTx = mpsc::UnboundedSender<InternalMessage>; type InternalTx = mpsc::UnboundedSender<InternalMessage>;
/// Receive half of the message channel. /// Receive half of the internal message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalRx = mpsc::UnboundedReceiver<InternalMessage>; type InternalRx = mpsc::UnboundedReceiver<InternalMessage>;
@ -259,12 +261,13 @@ struct Peer {
/// Handle to the shared message state. /// Handle to the shared message state.
// txs: PeerTxs, // txs: PeerTxs,
hb: Arc<Hydrabadger>, hb: Arc<HydrabadgerInner>,
/// Receive half of the message channel. /// Receive half of the message channel.
rx: WireRx, rx: WireRx,
peer_internal_tx: InternalTx, /// Channel to `Hydrabadger`.
internal_tx: InternalTx,
/// Peer socket address. /// Peer socket address.
addr: SocketAddr, addr: SocketAddr,
@ -272,8 +275,8 @@ struct Peer {
impl Peer { impl Peer {
/// Create a new instance of `Peer`. /// Create a new instance of `Peer`.
fn new(hb: Arc<Hydrabadger>, wire_messages: WireMessages, fn new(hb: Arc<HydrabadgerInner>, wire_messages: WireMessages,
peer_internal_tx: InternalTx) -> Peer { internal_tx: InternalTx) -> Peer {
// Get the client socket address // Get the client socket address
let addr = wire_messages.socket().peer_addr().unwrap(); let addr = wire_messages.socket().peer_addr().unwrap();
@ -288,13 +291,13 @@ impl Peer {
wire_messages, wire_messages,
hb, hb,
rx, rx,
peer_internal_tx, internal_tx,
addr, addr,
} }
} }
/// Sends a message to all connected peers. /// Sends a message to all connected peers.
fn send_to_all(&mut self, msg: &WireMessage) { fn wire_to_all(&mut self, msg: &WireMessage) {
// Now, send the message to all other peers // Now, send the message to all other peers
for (addr, tx) in self.hb.peer_txs.read().unwrap().iter() { for (addr, tx) in self.hb.peer_txs.read().unwrap().iter() {
// Don't send the message to ourselves // Don't send the message to ourselves
@ -352,16 +355,18 @@ impl Future for Peer {
// Read new messages from the socket // Read new messages from the socket
while let Async::Ready(message) = self.wire_messages.poll()? { while let Async::Ready(message) = self.wire_messages.poll()? {
info!("Received message: {:?}", message); trace!("Received message: {:?}", message);
if let Some(msg) = message { if let Some(msg) = message {
match msg.kind() { match msg.kind() {
WireMessageKind::Hello => info!("HELLO RECEIVED from '{}'", self.uid), WireMessageKind::Hello => error!("Duplicate `WireMessage::Hello` \
received from '{}'", self.uid),
_ => (), _ => (),
} }
} else { } else {
// EOF was reached. The remote client has disconnected. There is // EOF was reached. The remote client has disconnected. There is
// nothing more to do. // nothing more to do.
info!("Peer ('{}') disconnected.", self.uid);
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
} }
@ -382,19 +387,12 @@ impl Drop for Peer {
} }
/// The `Arc`-wrapped portion of `Hydrabadger`.
struct HoneyBadgerTask { struct HydrabadgerInner {
}
pub struct Hydrabadger {
/// Node uid: /// Node uid:
uid: Uuid, uid: Uuid,
/// Incoming connection socket. /// Incoming connection socket.
addr: SocketAddr, addr: SocketAddr,
// value: Option<Vec<u8>>,
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_txs: RwLock<HashMap<SocketAddr, WireTx>>, peer_txs: RwLock<HashMap<SocketAddr, WireTx>>,
@ -404,24 +402,25 @@ pub struct Hydrabadger {
// TODO: Use a bounded tx/rx (find a sensible upper bound): // TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_tx: InternalTx, peer_internal_tx: InternalTx,
peer_internal_rx: InternalRx,
peer_out_queue: RwLock<VecDeque<TargetedMessage<Message<usize>, usize>>>, peer_out_queue: RwLock<VecDeque<TargetedMessage<Message<usize>, usize>>>,
batch_out_queue: RwLock<VecDeque<Batch<Transaction, usize>>>, batch_out_queue: RwLock<VecDeque<Batch<Transaction, usize>>>,
} }
/// The core API for creation of and event handling for a `HoneyBadger` instance.
pub struct Hydrabadger {
inner: Arc<HydrabadgerInner>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_rx: InternalRx,
}
impl Hydrabadger { impl Hydrabadger {
/// Returns a new Hydrabadger node. /// Returns a new Hydrabadger node.
pub fn new(addr: SocketAddr, _value: Option<Vec<u8>>) -> Self { pub fn new(addr: SocketAddr, _value: Option<Vec<u8>>) -> Self {
// let node_count_good = node_count_total - node_count_faulty;
// let txns = (0..txn_count).map(|_| Transaction::new(txn_bytes));
let sk_set = SecretKeySet::random(0, &mut rand::thread_rng()); let sk_set = SecretKeySet::random(0, &mut rand::thread_rng());
let pk_set = sk_set.public_keys(); let pk_set = sk_set.public_keys();
let uid = Uuid::new_v4(); let uid = Uuid::new_v4();
// let mut all_ids = BTreeSet::new();
// all_ids.insert(id);
// let sk_share = 0;
let node_ids: BTreeSet<_> = iter::once(uid).collect(); let node_ids: BTreeSet<_> = iter::once(uid).collect();
@ -433,64 +432,109 @@ impl Hydrabadger {
); );
let dhb = RwLock::new(QueueingHoneyBadger::builder(netinfo) let dhb = RwLock::new(QueueingHoneyBadger::builder(netinfo)
// Default: 100:
.batch_size(50) .batch_size(50)
.max_future_epochs(0) // Default: 3:
.max_future_epochs(3)
.build()); .build());
/*.build().expect("Error creating `QueueingHoneyBadger`");*/
let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded(); let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded();
Hydrabadger { Hydrabadger {
uid, inner: Arc::new(HydrabadgerInner {
addr, uid,
// value, addr,
peer_txs: RwLock::new(HashMap::new()), peer_txs: RwLock::new(HashMap::new()),
dhb, dhb,
peer_internal_tx,
// peer_in_queue: RwLock::new(VecDeque::new()), peer_out_queue: RwLock::new(VecDeque::new()),
peer_internal_tx, batch_out_queue: RwLock::new(VecDeque::new()),
}),
peer_internal_rx, peer_internal_rx,
peer_out_queue: RwLock::new(VecDeque::new()),
batch_out_queue: RwLock::new(VecDeque::new()),
} }
} }
}
pub fn connect(&self) { impl Future for Hydrabadger {
type Item = ();
type Error = Error;
/// Polls the internal message receiver until all txs are dropped.
fn poll(&mut self) -> Poll<(), Error> {
match self.peer_internal_rx.poll() {
Ok(Async::Ready(Some(i_msg))) => match i_msg.kind() {
InternalMessageKind::Wire(w_msg) => match w_msg.kind() {
WireMessageKind::Hello => {
info!("Adding node ('{}') to honey badger.", i_msg.src_uid);
},
_ => {},
},
},
Ok(Async::Ready(None)) => {
// The sending ends have all dropped.
return Ok(Async::Ready(()));
},
Ok(Async::NotReady) => {},
Err(()) => return Err(Error::HydrabadgerPoll),
};
Ok(Async::NotReady)
} }
} }
fn process_incoming(socket: TcpStream, hb: Arc<HydrabadgerInner>) -> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
let wire_messages = WireMessages::new(socket);
wire_messages.into_future()
.map_err(|(e, _)| e)
.and_then(move |(msg_opt, w_messages)| {
let hb = hb.clone();
let peer_internal_tx = hb.peer_internal_tx.clone();
let peer = Peer::new(hb, w_messages, peer_internal_tx.clone());
match msg_opt {
Some(msg) => match msg.kind() {
WireMessageKind::Hello => {
peer.internal_tx.unbounded_send(InternalMessage::wire(peer.uid, msg))
.map_err(|err| error!("Unable to send on internal tx. \
Internal rx has dropped: {}", err)).unwrap();
},
_ => {
error!("Peer connected without sending `WireMessageKind::Hello`.");
return Either::A(future::ok(()));
},
},
None => {
// The remote client closed the connection without sending
// a hello message.
return Either::A(future::ok(()));
},
};
Either::B(peer)
})
.map_err(|err| error!("Connection error = {:?}", err))
}
/// Binds to a host address and returns a future which starts the node. /// Binds to a host address and returns a future which starts the node.
pub fn node(hb: Hydrabadger, remotes: HashSet<SocketAddr>) pub fn node(hydrabadger: Hydrabadger, remotes: HashSet<SocketAddr>)
-> impl Future<Item = (), Error = ()> { -> impl Future<Item = (), Error = ()> {
let socket = TcpListener::bind(&hb.addr).unwrap(); let socket = TcpListener::bind(&hydrabadger.inner.addr).unwrap();
info!("Listening on: {}", hb.addr); info!("Listening on: {}", hydrabadger.inner.addr);
// let peer_txs = hb.peer_txs.clone(); let hb = hydrabadger.inner.clone();
let hydrabadger = Arc::new(hb);
let hb = hydrabadger.clone();
let listen = socket.incoming() let listen = socket.incoming()
.map_err(|e| error!("failed to accept socket; error = {:?}", e)) .map_err(|err| error!("failed to accept socket; error = {:?}", err))
.for_each(move |socket| { .for_each(move |socket| {
let peer_addr = socket.peer_addr().unwrap(); tokio::spawn(process_incoming(socket, hb.clone()));
info!("Incoming connection from '{}'", peer_addr);
let wire_messages = WireMessages::new(socket);
tokio::spawn(Peer::new(hb.clone(), wire_messages, hb.peer_internal_tx.clone())
.map_err(|e| {
error!("Connection error = {:?}", e);
})
);
Ok(()) Ok(())
}); });
// let peer_txs = hb.peer_txs.clone(); let uid = hydrabadger.inner.uid.clone();
let uid = hydrabadger.uid.clone(); let hb = hydrabadger.inner.clone();
let hb = hydrabadger.clone();
let connect = future::lazy(move || { let connect = future::lazy(move || {
for remote_addr in remotes.iter() { for remote_addr in remotes.iter() {
let hb = hb.clone(); let hb = hb.clone();
@ -513,26 +557,25 @@ pub fn node(hb: Hydrabadger, remotes: HashSet<SocketAddr>)
Ok(()) Ok(())
}); });
let hb = hydrabadger.clone(); let hb = hydrabadger.inner.clone();
let list = Interval::new(Instant::now(), Duration::from_millis(3000)) let generate_txns = Interval::new(Instant::now(), Duration::from_millis(3000))
.for_each(move |_| { .for_each(move |_| {
let hb = hb.clone(); let hb = hb.clone();
let peer_txs = hb.peer_txs.read().unwrap(); let peer_txs = hb.peer_txs.read().unwrap();
// info!("Peer list:"); trace!("Peer list:");
for (peer_addr, mut pb) in peer_txs.iter() { for (peer_addr, mut pb) in peer_txs.iter() {
info!(" peer_addr: {}", peer_addr); trace!(" peer_addr: {}", peer_addr);
} }
// TODO: Send txns instead. // TODO: Send txns.
Ok(()) Ok(())
}) })
.map_err(|err| { .map_err(|err| error!("List connection inverval error: {:?}", err));
error!("List connection inverval error: {:?}", err);
});
listen.join3(connect, list).map(|(_, _, _)| ()) let hydrabadger = hydrabadger.map_err(|err| error!("Hydrabadger internal error: {:?}", err));
listen.join4(connect, generate_txns, hydrabadger).map(|(_, _, _, _)| ())
} }