hydrabadger/src/peer.rs

660 lines
21 KiB
Rust
Raw Permalink Normal View History

2018-07-17 14:06:23 -07:00
//! A peer network node.
2018-07-22 01:29:10 -07:00
#![allow(unused_imports, dead_code, unused_variables, unused_mut)]
2018-07-17 14:06:23 -07:00
2018-12-07 08:44:48 -08:00
use crate::hydrabadger::{Error, Hydrabadger};
use crate::{
2019-08-13 00:38:55 -07:00
Contribution, InAddr, InternalMessage, NodeId, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, WireRx, WireTx,
2018-12-07 08:44:48 -08:00
};
2018-09-28 06:13:31 -07:00
use futures::sync::mpsc;
2019-08-13 00:38:55 -07:00
use hbbft::{crypto::PublicKey, dynamic_honey_badger::Input as HbInput};
2018-09-28 06:13:31 -07:00
use serde::{Deserialize, Serialize};
2018-07-17 14:06:23 -07:00
use std::{
2018-09-28 06:13:31 -07:00
borrow::Borrow,
2018-07-17 14:06:23 -07:00
collections::{
2018-07-17 15:58:48 -07:00
hash_map::{Iter as HashMapIter, Values as HashMapValues},
HashMap,
2018-07-17 14:06:23 -07:00
},
};
use tokio::prelude::*;
2018-07-17 14:06:23 -07:00
/// The state for each connected client.
2018-12-13 20:19:19 -08:00
pub struct PeerHandler<C: Contribution, N: NodeId> {
// Peer nid.
nid: Option<N>,
2018-07-17 14:06:23 -07:00
// The incoming stream of messages:
2018-12-13 20:19:19 -08:00
wire_msgs: WireMessages<C, N>,
2018-07-17 14:06:23 -07:00
/// Handle to the shared message state.
2018-12-13 20:19:19 -08:00
hdb: Hydrabadger<C, N>,
2018-07-17 14:06:23 -07:00
// TODO: Consider adding back a separate clone of `peer_internal_tx`. Is
// there any difference if capacity isn't an issue? -- doubtful
/// Receive half of the message channel.
2018-12-13 20:19:19 -08:00
rx: WireRx<C, N>,
2018-07-17 14:06:23 -07:00
/// Peer socket address.
out_addr: OutAddr,
}
2018-12-13 20:19:19 -08:00
impl<C: Contribution, N: NodeId> PeerHandler<C, N> {
2018-07-17 14:06:23 -07:00
/// Create a new instance of `Peer`.
2018-09-28 06:13:31 -07:00
pub fn new(
2018-12-13 20:19:19 -08:00
pub_info: Option<(N, InAddr, PublicKey)>,
hdb: Hydrabadger<C, N>,
mut wire_msgs: WireMessages<C, N>,
) -> PeerHandler<C, N> {
2018-07-17 14:06:23 -07:00
// Get the client socket address
let out_addr = OutAddr(wire_msgs.socket().peer_addr().unwrap());
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded();
2018-12-13 20:19:19 -08:00
pub_info.as_ref().map(|(nid, _, _)| nid.clone());
2018-11-19 12:10:47 -08:00
2018-12-13 20:19:19 -08:00
let nid = match pub_info {
Some((ref nid, _, pk)) => {
2018-11-19 12:10:47 -08:00
wire_msgs.set_peer_public_key(pk);
2018-12-13 20:19:19 -08:00
Some(nid.clone())
2018-11-19 12:10:47 -08:00
}
None => None,
};
2018-07-17 14:06:23 -07:00
// Add an entry for this `Peer` in the shared state map.
hdb.peers_mut().add(out_addr, tx, pub_info);
2018-07-17 14:06:23 -07:00
PeerHandler {
2018-12-13 20:19:19 -08:00
nid,
2018-07-17 14:06:23 -07:00
wire_msgs,
hdb,
rx,
out_addr,
}
}
2018-12-13 20:19:19 -08:00
pub(crate) fn hdb(&self) -> &Hydrabadger<C, N> {
&self.hdb
2018-07-17 14:06:23 -07:00
}
pub(crate) fn out_addr(&self) -> &OutAddr {
&self.out_addr
2018-07-17 14:06:23 -07:00
}
}
/// A future representing the client connection.
2018-12-13 20:19:19 -08:00
impl<C: Contribution, N: NodeId> Future for PeerHandler<C, N> {
2018-07-17 14:06:23 -07:00
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
const MESSAGES_PER_TICK: usize = 10;
// Receive all messages from peers.
for i in 0..MESSAGES_PER_TICK {
// Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
// safe.
match self.rx.poll().unwrap() {
Async::Ready(Some(v)) => {
// Buffer the message. Once all messages are buffered, they will
// be flushed to the socket (right below).
self.wire_msgs.start_send(v)?;
// Exceeded max messages per tick, schedule notification:
if i + 1 == MESSAGES_PER_TICK {
task::current().notify();
}
}
_ => break,
}
}
// Flush the write buffer to the socket
let _ = self.wire_msgs.poll_complete()?;
// Read new messages from the socket
while let Async::Ready(message) = self.wire_msgs.poll()? {
trace!("Received message: {:?}", message);
if let Some(msg) = message {
match msg.into_kind() {
2018-12-13 20:19:19 -08:00
WireMessageKind::HelloRequestChangeAdd(src_nid, _in_addr, _pub_key) => {
2018-09-28 06:13:31 -07:00
error!(
"Duplicate `WireMessage::HelloRequestChangeAdd` \
2018-12-13 20:19:19 -08:00
received from '{:?}'",
src_nid
2018-09-28 06:13:31 -07:00
);
}
2018-12-13 20:19:19 -08:00
WireMessageKind::WelcomeReceivedChangeAdd(src_nid, pk, net_state) => {
self.nid = Some(src_nid.clone());
2018-11-19 12:10:47 -08:00
self.wire_msgs.set_peer_public_key(pk);
2018-09-28 06:13:31 -07:00
self.hdb.send_internal(InternalMessage::wire(
2018-12-13 20:19:19 -08:00
Some(src_nid.clone()),
2018-09-28 06:13:31 -07:00
self.out_addr,
2019-08-13 00:38:55 -07:00
WireMessage::welcome_received_change_add(
src_nid.clone(),
pk,
net_state,
),
2018-09-28 06:13:31 -07:00
));
}
2018-12-13 20:19:19 -08:00
WireMessageKind::HelloFromValidator(src_nid, in_addr, pk, net_state) => {
self.nid = Some(src_nid.clone());
2018-11-19 12:10:47 -08:00
self.wire_msgs.set_peer_public_key(pk);
self.hdb.send_internal(InternalMessage::wire(
2018-12-13 20:19:19 -08:00
Some(src_nid.clone()),
2018-11-19 12:10:47 -08:00
self.out_addr,
2019-08-13 00:38:55 -07:00
WireMessage::hello_from_validator(
src_nid.clone(),
in_addr,
pk,
net_state,
),
2018-11-19 12:10:47 -08:00
));
}
2018-12-13 20:19:19 -08:00
WireMessageKind::Message(src_nid, msg) => {
if let Some(peer_nid) = self.nid.as_ref() {
debug_assert_eq!(src_nid, *peer_nid);
2018-07-23 12:01:13 -07:00
}
2018-09-28 06:13:31 -07:00
self.hdb.send_internal(InternalMessage::hb_message(
2018-12-13 20:19:19 -08:00
src_nid,
2018-09-28 06:13:31 -07:00
self.out_addr,
msg,
))
}
2018-12-13 20:19:19 -08:00
WireMessageKind::Transaction(src_nid, txn) => {
if let Some(peer_nid) = self.nid.as_ref() {
debug_assert_eq!(src_nid, *peer_nid);
2018-07-23 12:34:08 -07:00
}
self.hdb.send_internal(InternalMessage::hb_contribution(
2018-12-13 20:19:19 -08:00
src_nid,
2018-09-28 06:13:31 -07:00
self.out_addr,
txn,
2018-09-28 06:13:31 -07:00
))
}
2018-09-28 06:13:31 -07:00
kind => self.hdb.send_internal(InternalMessage::wire(
2018-12-13 20:19:19 -08:00
self.nid.clone(),
2018-09-28 06:13:31 -07:00
self.out_addr,
kind.into(),
)),
2018-07-17 14:06:23 -07:00
}
} else {
2018-12-13 20:19:19 -08:00
info!("Peer ({}: '{:?}') disconnected.", self.out_addr, self.nid);
2018-07-17 14:06:23 -07:00
return Ok(Async::Ready(()));
}
}
Ok(Async::NotReady)
}
}
2018-12-13 20:19:19 -08:00
impl<C: Contribution, N: NodeId> Drop for PeerHandler<C, N> {
2018-07-17 14:06:23 -07:00
fn drop(&mut self) {
2018-09-28 06:13:31 -07:00
debug!(
2018-12-13 20:19:19 -08:00
"Removing peer ({}: '{:?}') from the list of peers.",
2018-09-28 06:13:31 -07:00
self.out_addr,
2018-12-13 20:19:19 -08:00
self.nid.clone().unwrap()
2018-09-28 06:13:31 -07:00
);
2018-07-17 14:06:23 -07:00
// Remove peer transmitter from the lists:
self.hdb.peers_mut().remove(&self.out_addr);
2018-12-13 20:19:19 -08:00
if let Some(nid) = self.nid.clone() {
2018-09-28 06:13:31 -07:00
debug!(
2018-12-13 20:19:19 -08:00
"Sending peer ({}: '{:?}') disconnect internal message.",
2018-09-28 06:13:31 -07:00
self.out_addr,
2018-12-13 20:19:19 -08:00
self.nid.clone().unwrap()
2018-09-28 06:13:31 -07:00
);
self.hdb
2018-12-13 20:19:19 -08:00
.send_internal(InternalMessage::peer_disconnect(nid, self.out_addr));
2018-07-17 14:06:23 -07:00
}
}
}
#[derive(Clone, Debug)]
#[allow(dead_code)]
2018-12-13 20:19:19 -08:00
enum State<N> {
Handshaking,
2018-07-22 01:29:10 -07:00
PendingJoinInfo {
2018-12-13 20:19:19 -08:00
nid: N,
2018-07-22 01:29:10 -07:00
in_addr: InAddr,
pk: PublicKey,
},
EstablishedObserver {
2018-12-13 20:19:19 -08:00
nid: N,
2018-07-22 01:29:10 -07:00
in_addr: InAddr,
pk: PublicKey,
},
EstablishedValidator {
2018-12-13 20:19:19 -08:00
nid: N,
in_addr: InAddr,
pk: PublicKey,
},
2018-07-17 14:06:23 -07:00
}
/// Nodes of the network.
#[derive(Clone, Debug)]
2018-12-13 20:19:19 -08:00
pub struct Peer<C: Contribution, N: NodeId> {
2018-07-17 14:06:23 -07:00
out_addr: OutAddr,
2018-12-13 20:19:19 -08:00
tx: WireTx<C, N>,
state: State<N>,
2018-07-17 14:06:23 -07:00
}
2018-12-13 20:19:19 -08:00
impl<C: Contribution, N: NodeId> Peer<C, N> {
2018-07-17 14:06:23 -07:00
/// Returns a new `Peer`
2018-09-28 06:13:31 -07:00
fn new(
out_addr: OutAddr,
2018-12-13 20:19:19 -08:00
tx: WireTx<C, N>,
pub_info: Option<(N, InAddr, PublicKey)>,
) -> Peer<C, N> {
let state = match pub_info {
None => State::Handshaking,
2018-12-13 20:19:19 -08:00
Some((nid, in_addr, pk)) => State::EstablishedValidator { nid, in_addr, pk },
};
2018-07-17 14:06:23 -07:00
Peer {
out_addr,
tx,
state,
2018-07-17 14:06:23 -07:00
}
}
2018-07-22 01:29:10 -07:00
/// Sets a peer state to `State::PendingJoinInfo` and stores public info.
2018-12-13 20:19:19 -08:00
fn set_pending(&mut self, pub_info: (N, InAddr, PublicKey)) {
2018-07-22 01:29:10 -07:00
self.state = match self.state {
2018-09-28 06:13:31 -07:00
State::Handshaking => State::PendingJoinInfo {
2018-12-13 20:19:19 -08:00
nid: pub_info.0,
2018-09-28 06:13:31 -07:00
in_addr: pub_info.1,
pk: pub_info.2,
2018-07-22 01:29:10 -07:00
},
2018-09-28 06:13:31 -07:00
_ => panic!(
"Peer::set_pending: Can only set pending when \
peer state is `Handshaking`."
),
2018-07-22 01:29:10 -07:00
};
}
/// Sets a peer state to `State::EstablishedObserver` and stores public info.
fn establish_observer(&mut self) {
self.state = match self.state {
2019-08-13 00:38:55 -07:00
State::PendingJoinInfo {
ref nid,
in_addr,
pk,
} => State::EstablishedObserver {
nid: nid.clone(),
in_addr,
pk,
},
2018-09-28 06:13:31 -07:00
_ => panic!(
"Peer::establish_observer: Can only establish observer when \
peer state is`PendingJoinInfo`."
),
2018-07-22 01:29:10 -07:00
};
}
/// Sets a peer state to `State::EstablishedValidator` and stores public info.
2018-12-13 20:19:19 -08:00
fn establish_validator(&mut self, pub_info: Option<(N, InAddr, PublicKey)>) {
2018-07-22 01:29:10 -07:00
self.state = match self.state {
State::Handshaking => match pub_info {
2018-09-28 06:13:31 -07:00
Some(pi) => State::EstablishedValidator {
2018-12-13 20:19:19 -08:00
nid: pi.0,
2018-09-28 06:13:31 -07:00
in_addr: pi.1,
pk: pi.2,
2018-07-22 01:29:10 -07:00
},
None => {
2018-09-28 06:13:31 -07:00
panic!(
"Peer::establish_validator: `pub_info` must be supplied \
when establishing a validator from `Handshaking`."
);
}
2018-07-22 01:29:10 -07:00
},
2019-08-13 00:38:55 -07:00
State::EstablishedObserver {
ref nid,
in_addr,
pk,
} => {
if pub_info.is_some() {
2018-09-28 06:13:31 -07:00
panic!(
"Peer::establish_validator: `pub_info` must be `None` \
when upgrading an observer node."
);
2018-07-22 01:29:10 -07:00
}
2019-08-13 00:38:55 -07:00
State::EstablishedValidator {
nid: nid.clone(),
in_addr,
pk,
}
2018-09-28 06:13:31 -07:00
}
_ => panic!(
"Peer::establish_validator: Can only establish validator when \
peer state is`Handshaking` or `EstablishedObserver`."
),
};
}
2018-07-17 14:06:23 -07:00
/// Returns the peer's unique identifier.
2018-12-13 20:19:19 -08:00
pub fn node_id(&self) -> Option<&N> {
match self.state {
State::Handshaking => None,
2018-12-13 20:19:19 -08:00
State::PendingJoinInfo { ref nid, .. } => Some(nid),
State::EstablishedObserver { ref nid, .. } => Some(nid),
State::EstablishedValidator { ref nid, .. } => Some(nid),
}
2018-07-17 14:06:23 -07:00
}
/// Returns the peer's unique identifier.
pub fn out_addr(&self) -> &OutAddr {
&self.out_addr
2018-07-17 14:06:23 -07:00
}
/// Returns the peer's public key.
pub fn public_key(&self) -> Option<&PublicKey> {
match self.state {
State::Handshaking => None,
2018-07-22 01:29:10 -07:00
State::PendingJoinInfo { ref pk, .. } => Some(pk),
State::EstablishedObserver { ref pk, .. } => Some(pk),
State::EstablishedValidator { ref pk, .. } => Some(pk),
}
2018-07-17 14:06:23 -07:00
}
/// Returns the peer's incoming (listening) socket address.
pub fn in_addr(&self) -> Option<&InAddr> {
match self.state {
State::Handshaking => None,
2018-07-22 01:29:10 -07:00
State::PendingJoinInfo { ref in_addr, .. } => Some(in_addr),
State::EstablishedObserver { ref in_addr, .. } => Some(in_addr),
State::EstablishedValidator { ref in_addr, .. } => Some(in_addr),
}
2018-07-17 14:06:23 -07:00
}
2018-07-17 15:58:48 -07:00
/// Returns the peer's public info if established.
2018-12-13 20:19:19 -08:00
pub fn pub_info(&self) -> Option<(&N, &InAddr, &PublicKey)> {
match self.state {
State::Handshaking => None,
2018-09-28 06:13:31 -07:00
State::EstablishedObserver {
2018-12-13 20:19:19 -08:00
ref nid,
2018-09-28 06:13:31 -07:00
ref in_addr,
ref pk,
2018-12-13 20:19:19 -08:00
} => Some((nid, in_addr, pk)),
2018-09-28 06:13:31 -07:00
State::PendingJoinInfo {
2018-12-13 20:19:19 -08:00
ref nid,
2018-09-28 06:13:31 -07:00
ref in_addr,
ref pk,
2018-12-13 20:19:19 -08:00
} => Some((nid, in_addr, pk)),
2018-09-28 06:13:31 -07:00
State::EstablishedValidator {
2018-12-13 20:19:19 -08:00
ref nid,
2018-09-28 06:13:31 -07:00
ref in_addr,
ref pk,
2018-12-13 20:19:19 -08:00
} => Some((nid, in_addr, pk)),
2018-07-22 01:29:10 -07:00
}
}
/// Returns true if this peer is pending.
pub fn is_pending(&self) -> bool {
match self.state {
State::PendingJoinInfo { .. } => true,
_ => false,
}
}
/// Returns true if this peer is an established observer.
pub fn is_observer(&self) -> bool {
match self.state {
State::EstablishedObserver { .. } => true,
_ => false,
}
}
/// Returns true if this peer is an established validator.
pub fn is_validator(&self) -> bool {
match self.state {
2018-07-22 01:29:10 -07:00
State::EstablishedValidator { .. } => true,
_ => false,
}
2018-07-17 15:58:48 -07:00
}
2018-07-17 14:06:23 -07:00
/// Returns the peer's wire transmitter.
2018-12-13 20:19:19 -08:00
pub fn tx(&self) -> &WireTx<C, N> {
&self.tx
2018-07-17 14:06:23 -07:00
}
}
/// Peer nodes of the network.
//
// TODO: Keep a separate `HashSet` of validator `OutAddrs` to avoid having to
// iterate through entire list.
2018-07-17 14:06:23 -07:00
#[derive(Debug)]
2018-12-13 20:19:19 -08:00
pub struct Peers<C: Contribution, N: NodeId> {
peers: HashMap<OutAddr, Peer<C, N>>,
out_addrs: HashMap<N, OutAddr>,
2018-11-19 12:43:22 -08:00
local_addr: InAddr,
2018-07-17 14:06:23 -07:00
}
2018-12-13 20:19:19 -08:00
impl<C: Contribution, N: NodeId> Peers<C, N> {
2018-07-17 14:06:23 -07:00
/// Returns a new empty list of peers.
2018-12-13 20:19:19 -08:00
pub(crate) fn new(local_addr: InAddr) -> Peers<C, N> {
2018-07-17 14:06:23 -07:00
Peers {
peers: HashMap::with_capacity(64),
out_addrs: HashMap::with_capacity(64),
2018-11-19 12:43:22 -08:00
local_addr,
2018-07-17 14:06:23 -07:00
}
}
/// Adds a peer to the list.
2018-09-28 06:13:31 -07:00
pub(crate) fn add(
&mut self,
out_addr: OutAddr,
2018-12-13 20:19:19 -08:00
tx: WireTx<C, N>,
pub_info: Option<(N, InAddr, PublicKey)>,
2018-09-28 06:13:31 -07:00
) {
2018-07-17 15:58:48 -07:00
let peer = Peer::new(out_addr, tx, pub_info);
2018-12-13 20:19:19 -08:00
if let State::EstablishedValidator { ref nid, .. } = peer.state {
self.out_addrs.insert(nid.clone(), peer.out_addr);
2018-07-17 14:06:23 -07:00
}
self.peers.insert(peer.out_addr, peer);
}
2018-07-22 01:29:10 -07:00
/// Attempts to set peer as pending-join-info, storing `pub_info`.
///
/// Returns `true` if the peer was already pending.
///
/// ### Panics
///
/// Peer state must be `Handshaking`.
///
/// TODO: Error handling...
2018-09-28 06:13:31 -07:00
pub(crate) fn set_pending<O: Borrow<OutAddr>>(
&mut self,
out_addr: O,
2018-12-13 20:19:19 -08:00
pub_info: (N, InAddr, PublicKey),
2018-09-28 06:13:31 -07:00
) -> bool {
let peer = self.peers.get_mut(out_addr.borrow()).expect(&format!(
"Peers::set_pending: \
No peer found with outgoing address: {}",
out_addr.borrow()
));
2019-08-13 00:38:55 -07:00
match self
.out_addrs
.insert(pub_info.0.clone(), *out_addr.borrow())
{
2018-07-22 01:29:10 -07:00
Some(_out_addr_pub) => {
2018-09-28 06:13:31 -07:00
let pi_pub = peer
.pub_info()
2018-07-22 01:29:10 -07:00
.expect("Peers::set_pending: internal consistency error");
2018-09-28 06:13:31 -07:00
assert!(
pub_info.0 == *pi_pub.0 && pub_info.1 == *pi_pub.1 && pub_info.2 == *pi_pub.2
);
2018-07-22 01:29:10 -07:00
assert!(peer.is_validator());
return true;
2018-09-28 06:13:31 -07:00
}
2018-07-22 01:29:10 -07:00
None => peer.set_pending(pub_info),
}
2018-07-23 12:34:08 -07:00
// false
panic!("Peer::set_pending: Do not use yet.");
2018-07-22 01:29:10 -07:00
}
/// Attempts to establish a peer as an observer.
///
/// ### Panics
///
/// Peer state must be `Handshaking`.
///
/// TODO: Error handling...
pub(crate) fn establish_observer<O: Borrow<OutAddr>>(&mut self, out_addr: O) {
2018-09-28 06:13:31 -07:00
let peer = self.peers.get_mut(out_addr.borrow()).expect(&format!(
"Peers::establish_observer: \
No peer found with outgoing address: {}",
out_addr.borrow()
));
2018-07-23 12:34:08 -07:00
// peer.establish_observer()
panic!("Peer::set_pending: Do not use yet.");
2018-07-22 01:29:10 -07:00
}
/// Attempts to establish a peer as a validator, storing `pub_info`.
///
2018-07-22 01:29:10 -07:00
/// Returns `true` if the peer was already an established validator.
///
/// ### Panics
///
/// Peer state must be `Handshaking` or `EstablishedObserver`.
///
/// TODO: Error handling...
2018-09-28 06:13:31 -07:00
pub(crate) fn establish_validator<O: Borrow<OutAddr>>(
&mut self,
out_addr: O,
2018-12-13 20:19:19 -08:00
pub_info: (N, InAddr, PublicKey),
2018-09-28 06:13:31 -07:00
) -> bool {
let peer = self.peers.get_mut(out_addr.borrow()).expect(&format!(
"Peers::establish_validator: \
No peer found with outgoing address: {}",
out_addr.borrow()
));
2019-08-13 00:38:55 -07:00
match self
.out_addrs
.insert(pub_info.0.clone(), *out_addr.borrow())
{
2018-07-22 01:29:10 -07:00
Some(_out_addr_pub) => {
2018-09-28 06:13:31 -07:00
let pi_pub = peer
.pub_info()
.expect("Peers::establish_validator: internal consistency error");
2018-09-28 06:13:31 -07:00
assert!(
pub_info.0 == *pi_pub.0 && pub_info.1 == *pi_pub.1 && pub_info.2 == *pi_pub.2
);
assert!(peer.is_validator());
return true;
2018-09-28 06:13:31 -07:00
}
2018-07-22 01:29:10 -07:00
None => peer.establish_validator(Some(pub_info)),
}
false
}
2018-12-13 20:19:19 -08:00
pub(crate) fn wire_to_all(&self, msg: WireMessage<C, N>) {
2018-12-07 08:44:48 -08:00
for (_p_addr, peer) in self
.peers
2018-11-19 12:43:22 -08:00
.iter()
.filter(|(&p_addr, _)| p_addr != OutAddr(self.local_addr.0))
{
peer.tx().unbounded_send(msg.clone()).unwrap();
}
}
2018-12-13 20:19:19 -08:00
pub(crate) fn wire_to_validators(&self, msg: WireMessage<C, N>) {
2018-11-19 12:43:22 -08:00
// for peer in peers.validators()
// .filter(|p| p.out_addr() != &OutAddr(self.hdb.addr().0)) {
// peer.tx().unbounded_send(msg.clone()).unwrap();
// }
// FIXME: TEMPORARILY WIRE TO ALL FOR NOW.
self.wire_to_all(msg)
}
2018-12-13 20:19:19 -08:00
/// Sends a `WireMessage` to the target specified by `tar_nid`.
2018-11-19 12:43:22 -08:00
///
/// If the target is not an established node, the message will be returned
/// along with an incremented retry count.
2018-12-07 08:44:48 -08:00
pub(crate) fn wire_to(
&self,
2018-12-13 20:19:19 -08:00
tar_nid: N,
msg: WireMessage<C, N>,
2018-12-07 08:44:48 -08:00
retry_count: usize,
2018-12-13 20:19:19 -08:00
) -> Option<(N, WireMessage<C, N>, usize)> {
match self.get_by_nid(&tar_nid) {
2018-11-19 12:43:22 -08:00
Some(p) => {
p.tx().unbounded_send(msg).unwrap();
None
2018-12-07 08:44:48 -08:00
}
2018-11-19 12:43:22 -08:00
None => {
info!(
2018-12-13 20:19:19 -08:00
"Node '{:?}' is not yet established. Queueing message for now (retry_count: {}).",
tar_nid, retry_count
2018-11-19 12:43:22 -08:00
);
2018-12-13 20:19:19 -08:00
Some((tar_nid, msg, retry_count + 1))
2018-11-19 12:43:22 -08:00
}
}
}
2018-07-17 14:06:23 -07:00
/// Removes a peer the list if it exists.
pub(crate) fn remove<O: Borrow<OutAddr>>(&mut self, out_addr: O) {
let peer = self.peers.remove(out_addr.borrow());
if let Some(p) = peer {
2018-12-13 20:19:19 -08:00
if let Some(nid) = p.node_id() {
self.out_addrs.remove(&nid);
2018-07-17 14:06:23 -07:00
}
}
}
2018-12-13 20:19:19 -08:00
pub(crate) fn get<O: Borrow<OutAddr>>(&self, out_addr: O) -> Option<&Peer<C, N>> {
2018-07-17 14:06:23 -07:00
self.peers.get(out_addr.borrow())
}
2018-12-13 20:19:19 -08:00
pub(crate) fn get_by_nid<U: Borrow<N>>(&self, nid: U) -> Option<&Peer<C, N>> {
2018-09-28 06:13:31 -07:00
self.out_addrs
2018-12-13 20:19:19 -08:00
.get(nid.borrow())
2018-09-28 06:13:31 -07:00
.and_then(|addr| self.get(addr))
2018-07-17 14:06:23 -07:00
}
/// Returns an Iterator over the list of peers.
2018-12-13 20:19:19 -08:00
pub(crate) fn iter(&self) -> HashMapIter<OutAddr, Peer<C, N>> {
2018-07-17 14:06:23 -07:00
self.peers.iter()
}
2018-07-17 15:58:48 -07:00
/// Returns an Iterator over the list of peers.
2018-12-13 20:19:19 -08:00
pub fn peers(&self) -> HashMapValues<OutAddr, Peer<C, N>> {
2018-07-17 15:58:48 -07:00
self.peers.values()
}
/// Returns an iterator over the list of validators.
2018-12-13 20:19:19 -08:00
pub fn validators(&self) -> impl Iterator<Item = &Peer<C, N>> {
self.peers.values().filter(|p| p.is_validator())
}
2018-07-17 14:06:23 -07:00
/// Returns the current number of connected peers.
pub fn count_total(&self) -> usize {
2018-07-17 14:06:23 -07:00
self.peers.len()
}
/// Returns the current number of connected and established validators.
///
/// This is semi-expensive (O(n)).
pub fn count_validators(&self) -> usize {
self.validators().count()
}
pub(crate) fn contains_in_addr<I: Borrow<InAddr>>(&self, in_addr: I) -> bool {
for peer in self.peers.values() {
if let Some(peer_in_addr) = peer.in_addr() {
if peer_in_addr == in_addr.borrow() {
return true;
}
}
}
false
}
2018-07-17 14:06:23 -07:00
}