Add a node id type parameter.

This commit is contained in:
c0gent 2018-12-13 20:19:19 -08:00
parent 0e28bf33cf
commit 42919d730a
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
8 changed files with 449 additions and 438 deletions

View File

@ -10,7 +10,7 @@ extern crate serde_derive;
use chrono::Local;
use clap::{App, Arg, ArgMatches};
use hydrabadger::{Blockchain, Config, Hydrabadger, MiningError};
use hydrabadger::{Blockchain, Config, Hydrabadger, MiningError, Uid};
use rand::Rng;
use std::collections::HashSet;
use std::env;
@ -161,7 +161,7 @@ fn main() {
cfg.output_extra_delay_ms = oed.parse().expect("Invalid output extra delay.");
}
let hb = Hydrabadger::new(bind_address, cfg);
let hb = Hydrabadger::new(bind_address, cfg, Uid::new());
let gen_txn = |txn_gen_count, txn_gen_bytes| {
(0..txn_gen_count)

View File

@ -9,7 +9,7 @@ use super::{Error, Hydrabadger, InputOrMessage, State, StateDsct, StateMachine};
use crate::peer::Peers;
use crate::{
key_gen, BatchTx, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, NodeId,
};
use crossbeam::queue::SegQueue;
use hbbft::{
@ -22,28 +22,28 @@ use std::{cell::RefCell, collections::HashMap};
use tokio::{self, prelude::*};
/// Hydrabadger event (internal message) handler.
pub struct Handler<T: Contribution> {
hdb: Hydrabadger<T>,
pub struct Handler<C: Contribution, N: NodeId> {
hdb: Hydrabadger<C, N>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_rx: InternalRx<T>,
peer_internal_rx: InternalRx<C, N>,
/// Outgoing wire message queue.
wire_queue: SegQueue<(Uid, WireMessage<T>, usize)>,
wire_queue: SegQueue<(N, WireMessage<C, N>, usize)>,
/// Output from HoneyBadger.
step_queue: SegQueue<Step<T>>,
step_queue: SegQueue<Step<C, N>>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
batch_tx: BatchTx<T>,
batch_tx: BatchTx<C, N>,
/// Distributed synchronous key generation instances.
//
// TODO: Move these to separate threads/tasks.
key_gens: RefCell<HashMap<Uid, key_gen::Machine>>,
key_gens: RefCell<HashMap<Uid, key_gen::Machine<N>>>,
}
impl<T: Contribution> Handler<T> {
impl<C: Contribution, N: NodeId> Handler<C, N> {
pub(super) fn new(
hdb: Hydrabadger<T>,
peer_internal_rx: InternalRx<T>,
batch_tx: BatchTx<T>,
) -> Handler<T> {
hdb: Hydrabadger<C, N>,
peer_internal_rx: InternalRx<C, N>,
batch_tx: BatchTx<C, N>,
) -> Handler<C, N> {
Handler {
hdb,
peer_internal_rx,
@ -56,11 +56,11 @@ impl<T: Contribution> Handler<T> {
fn handle_new_established_peer(
&self,
src_uid: Uid,
src_nid: N,
src_pk: PublicKey,
request_change_add: bool,
state: &mut StateMachine<T>,
peers: &Peers<T>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
match state.discriminant() {
StateDsct::Disconnected | StateDsct::DeterminingNetworkState => {
@ -79,9 +79,9 @@ impl<T: Contribution> Handler<T> {
// validator), input the change into HB and broadcast, etc.
if request_change_add {
let dhb = state.dhb_mut().unwrap();
info!("Change-Adding ('{}') to honey badger.", src_uid);
info!("Change-Adding ('{:?}') to honey badger.", src_nid);
let step = dhb
.vote_to_add(src_uid, src_pk)
.vote_to_add(src_nid, src_pk)
.expect("Error adding new peer to HB");
self.step_queue.push(step);
}
@ -90,7 +90,7 @@ impl<T: Contribution> Handler<T> {
Ok(())
}
fn handle_iom(&self, iom: InputOrMessage<T>, state: &mut StateMachine<T>) -> Result<(), Error> {
fn handle_iom(&self, iom: InputOrMessage<C, N>, state: &mut StateMachine<C, N>) -> Result<(), Error> {
trace!("hydrabadger::Handler: About to handle_iom: {:?}", iom);
if let Some(step_res) = state.handle_iom(iom) {
let step = step_res.map_err(Error::HbStep)?;
@ -103,16 +103,16 @@ impl<T: Contribution> Handler<T> {
/// Handles a received `Part`.
fn handle_key_gen_part(
&self,
src_uid: &Uid,
src_nid: &N,
part: Part,
state: &mut StateMachine<T>,
peers: &Peers<T>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
match state.state {
State::KeyGen {
ref mut key_gen, ..
} => {
key_gen.handle_key_gen_part(src_uid, part, peers);
key_gen.handle_key_gen_part(src_nid, part, peers);
}
State::DeterminingNetworkState {
ref network_state, ..
@ -132,10 +132,10 @@ impl<T: Contribution> Handler<T> {
/// Handles a received `Ack`.
fn handle_key_gen_ack(
&self,
src_uid: &Uid,
src_nid: &N,
ack: Ack,
state: &mut StateMachine<T>,
peers: &Peers<T>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
let mut complete = false;
@ -143,14 +143,14 @@ impl<T: Contribution> Handler<T> {
State::KeyGen {
ref mut key_gen, ..
} => {
if key_gen.handle_key_gen_ack(src_uid, ack, peers)? {
if key_gen.handle_key_gen_ack(src_nid, ack, peers)? {
complete = true;
}
}
State::Validator { .. } | State::Observer { .. } => {
error!(
"Additional unhandled `Ack` received from '{}': \n{:?}",
src_uid, ack
"Additional unhandled `Ack` received from '{:?}': \n{:?}",
src_nid, ack
);
}
_ => panic!("::handle_key_gen_ack: State must be `GeneratingKeys`."),
@ -165,9 +165,9 @@ impl<T: Contribution> Handler<T> {
&self,
instance_id: key_gen::InstanceId,
msg: key_gen::Message,
src_uid: &Uid,
state: &mut StateMachine<T>,
peers: &Peers<T>,
src_nid: &N,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
use crate::key_gen::{InstanceId, MessageKind};
@ -180,10 +180,10 @@ impl<T: Contribution> Handler<T> {
match msg.into_kind() {
MessageKind::Part(part) => {
kg.handle_key_gen_part(src_uid, part, peers);
kg.handle_key_gen_part(src_nid, part, peers);
}
MessageKind::Ack(ack) => {
kg.handle_key_gen_ack(src_uid, ack, peers)?;
kg.handle_key_gen_ack(src_nid, ack, peers)?;
}
}
}
@ -192,10 +192,10 @@ impl<T: Contribution> Handler<T> {
}
InstanceId::BuiltIn => match msg.into_kind() {
MessageKind::Part(part) => {
self.handle_key_gen_part(src_uid, part, state, peers)?;
self.handle_key_gen_part(src_nid, part, state, peers)?;
}
MessageKind::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)?;
self.handle_key_gen_ack(src_nid, ack, state, peers)?;
}
},
}
@ -207,9 +207,9 @@ impl<T: Contribution> Handler<T> {
// 'unestablished' nodes.
fn handle_join_plan(
&self,
jp: JoinPlan<Uid>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
jp: JoinPlan<N>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
debug!("Join plan: \n{:?}", jp);
@ -237,9 +237,9 @@ impl<T: Contribution> Handler<T> {
// TODO: Create a type for `net_info`.
fn instantiate_hb(
&self,
jp_opt: Option<JoinPlan<Uid>>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
jp_opt: Option<JoinPlan<N>>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
let mut iom_queue_opt = None;
@ -251,7 +251,7 @@ impl<T: Contribution> Handler<T> {
Some(jp) => {
let epoch = jp.next_epoch();
iom_queue_opt = Some(state.set_observer(
*self.hdb.uid(),
self.hdb.node_id().clone(),
self.hdb.secret_key().clone(),
jp,
self.hdb.config(),
@ -261,7 +261,7 @@ impl<T: Contribution> Handler<T> {
}
None => {
iom_queue_opt = Some(state.set_validator(
*self.hdb.uid(),
self.hdb.node_id().clone(),
self.hdb.secret_key().clone(),
peers,
self.hdb.config(),
@ -301,11 +301,11 @@ impl<T: Contribution> Handler<T> {
/// without including this node.
fn reset_peer_connections(
&self,
_state: &mut StateMachine<T>,
peers: &Peers<T>,
_state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
peers.wire_to_validators(WireMessage::hello_request_change_add(
*self.hdb.uid(),
self.hdb.node_id().clone(),
*self.hdb.addr(),
self.hdb.secret_key().public_key(),
));
@ -314,9 +314,9 @@ impl<T: Contribution> Handler<T> {
fn handle_net_state(
&self,
net_state: NetworkState,
state: &mut StateMachine<T>,
peers: &Peers<T>,
net_state: NetworkState<N>,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
let peer_infos;
match net_state {
@ -382,7 +382,7 @@ impl<T: Contribution> Handler<T> {
tokio::spawn(self.hdb.clone().connect_outgoing(
peer_info.in_addr.0,
local_sk,
Some((peer_info.uid, peer_info.in_addr, peer_info.pk)),
Some((peer_info.nid.clone(), peer_info.in_addr, peer_info.pk)),
false,
));
}
@ -392,9 +392,9 @@ impl<T: Contribution> Handler<T> {
fn handle_peer_disconnect(
&self,
src_uid: Uid,
state: &mut StateMachine<T>,
peers: &Peers<T>,
src_nid: N,
state: &mut StateMachine<C, N>,
peers: &Peers<C, N>,
) -> Result<(), Error> {
state.update_peer_connection_dropped(peers);
@ -414,7 +414,7 @@ impl<T: Contribution> Handler<T> {
// Observers cannot vote.
}
State::Validator { ref mut dhb } => {
let step = dhb.as_mut().unwrap().vote_to_remove(&src_uid)?;
let step = dhb.as_mut().unwrap().vote_to_remove(&src_nid)?;
self.step_queue.push(step);
}
}
@ -423,13 +423,13 @@ impl<T: Contribution> Handler<T> {
fn handle_internal_message(
&self,
i_msg: InternalMessage<T>,
state: &mut StateMachine<T>,
i_msg: InternalMessage<C, N>,
state: &mut StateMachine<C, N>,
) -> Result<(), Error> {
// let mut state_guard = self.hdb.state_mut();
// let mut state = &mut state_guard;
let (src_uid, src_out_addr, w_msg) = i_msg.into_parts();
let (src_nid, src_out_addr, w_msg) = i_msg.into_parts();
match w_msg {
// New incoming connection:
@ -462,7 +462,7 @@ impl<T: Contribution> Handler<T> {
.unwrap()
.tx()
.unbounded_send(WireMessage::welcome_received_change_add(
*self.hdb.uid(),
self.hdb.node_id().clone(),
self.hdb.secret_key().public_key(),
net_state,
))
@ -470,7 +470,7 @@ impl<T: Contribution> Handler<T> {
// Modify state accordingly:
self.handle_new_established_peer(
src_uid.unwrap(),
src_nid.unwrap(),
// src_out_addr,
src_pk,
request_change_add,
@ -484,7 +484,7 @@ impl<T: Contribution> Handler<T> {
// This message must be immediately followed by either a
// `WireMessage::HelloFromValidator` or
// `WireMessage::WelcomeReceivedChangeAdd`.
debug_assert!(src_uid.is_none());
debug_assert!(src_nid.is_none());
let peers = self.hdb.peers();
state.update_peer_connection_added(&peers);
@ -499,17 +499,17 @@ impl<T: Contribution> Handler<T> {
}
InternalMessageKind::HbMessage(msg) => {
self.handle_iom(InputOrMessage::Message(src_uid.unwrap(), msg), state)?;
self.handle_iom(InputOrMessage::Message(src_nid.unwrap(), msg), state)?;
}
InternalMessageKind::PeerDisconnect => {
let dropped_src_uid = src_uid.unwrap();
let dropped_src_nid = src_nid.unwrap();
info!(
"Peer disconnected: ({}: '{}').",
src_out_addr, dropped_src_uid
"Peer disconnected: ({}: '{:?}').",
src_out_addr, dropped_src_nid
);
let peers = self.hdb.peers();
self.handle_peer_disconnect(dropped_src_uid, state, &peers)?;
self.handle_peer_disconnect(dropped_src_nid, state, &peers)?;
}
InternalMessageKind::NewKeyGenInstance(tx) => {
@ -520,7 +520,7 @@ impl<T: Contribution> Handler<T> {
// tx.unbounded_send(key_gen::Message::instance_id().unwrap();
let instance_id = key_gen::InstanceId::User(new_id.clone());
let key_gen = key_gen::Machine::generate(
self.hdb.uid(),
self.hdb.node_id(),
self.hdb.secret_key().clone(),
&peers,
tx,
@ -534,18 +534,18 @@ impl<T: Contribution> Handler<T> {
// relevant details for a peer (generally preceeding other
// messages which may arrive before `Welcome...`.
WireMessageKind::HelloFromValidator(
src_uid_new,
src_nid_new,
src_in_addr,
src_pk,
net_state,
) => {
debug!("Received hello from {}", src_uid_new);
debug!("Received hello from {:?}", src_nid_new);
let mut peers = self.hdb.peers_mut();
match peers
.establish_validator(src_out_addr, (src_uid_new, src_in_addr, src_pk))
.establish_validator(src_out_addr, (src_nid_new.clone(), src_in_addr, src_pk))
{
true => debug_assert!(src_uid_new == src_uid.unwrap()),
false => debug_assert!(src_uid.is_none()),
true => debug_assert!(src_nid_new == src_nid.unwrap()),
false => debug_assert!(src_nid.is_none()),
}
// Modify state accordingly:
@ -553,15 +553,15 @@ impl<T: Contribution> Handler<T> {
}
// New outgoing connection response:
WireMessageKind::WelcomeReceivedChangeAdd(src_uid_new, src_pk, net_state) => {
WireMessageKind::WelcomeReceivedChangeAdd(src_nid_new, src_pk, net_state) => {
debug!("Received NetworkState: \n{:?}", net_state);
assert!(src_uid_new == src_uid.unwrap());
assert!(src_nid_new == src_nid.unwrap());
let mut peers = self.hdb.peers_mut();
// Set new (outgoing-connection) peer's public info:
peers.establish_validator(
src_out_addr,
(src_uid_new, InAddr(src_out_addr.0), src_pk),
(src_nid_new.clone(), InAddr(src_out_addr.0), src_pk),
);
// Modify state accordingly:
@ -569,7 +569,7 @@ impl<T: Contribution> Handler<T> {
// Modify state accordingly:
self.handle_new_established_peer(
src_uid_new,
src_nid_new,
// src_out_addr,
src_pk,
false,
@ -582,7 +582,7 @@ impl<T: Contribution> Handler<T> {
self.handle_key_gen_message(
instance_id,
msg,
&src_uid.unwrap(),
&src_nid.unwrap(),
state,
&self.hdb.peers(),
)?;
@ -606,7 +606,7 @@ impl<T: Contribution> Handler<T> {
}
}
impl<T: Contribution> Future for Handler<T> {
impl<C: Contribution, N: NodeId> Future for Handler<C, N> {
type Item = ();
type Error = Error;
@ -641,15 +641,15 @@ impl<T: Contribution> Future for Handler<T> {
let peers = self.hdb.peers();
// Process outgoing wire queue:
while let Some((tar_uid, msg, retry_count)) = self.wire_queue.try_pop() {
while let Some((tar_nid, msg, retry_count)) = self.wire_queue.try_pop() {
if retry_count < WIRE_MESSAGE_RETRY_MAX {
info!(
"Sending queued message from retry queue (retry_count: {})",
retry_count
);
peers.wire_to(tar_uid, msg, retry_count);
peers.wire_to(tar_nid, msg, retry_count);
} else {
info!("Discarding queued message for '{}': {:?}", tar_uid, msg);
info!("Discarding queued message for '{:?}': {:?}", tar_nid, msg);
}
}
@ -683,7 +683,7 @@ impl<T: Contribution> Future for Handler<T> {
ChangeState::InProgress(_change) => {}
ChangeState::Complete(change) => match change {
DhbChange::NodeChange(pub_keys) => {
if let Some(pk) = pub_keys.get(self.hdb.uid()) {
if let Some(pk) = pub_keys.get(self.hdb.node_id()) {
assert_eq!(*pk, self.hdb.secret_key().public_key());
assert!(state.dhb().unwrap().netinfo().is_validator());
if state.discriminant() == StateDsct::Observer {
@ -730,15 +730,15 @@ impl<T: Contribution> Future for Handler<T> {
for hb_msg in step.messages.drain(..) {
trace!("hydrabadger::Handler: Forwarding message: {:?}", hb_msg);
match hb_msg.target {
Target::Node(p_uid) => {
Target::Node(p_nid) => {
peers.wire_to(
p_uid,
WireMessage::message(*self.hdb.uid(), hb_msg.message),
p_nid,
WireMessage::message(self.hdb.node_id().clone(), hb_msg.message),
0,
);
}
Target::All => {
peers.wire_to_all(WireMessage::message(*self.hdb.uid(), hb_msg.message));
peers.wire_to_all(WireMessage::message(self.hdb.node_id().clone(), hb_msg.message));
}
}
}

View File

@ -1,11 +1,12 @@
//! A hydrabadger consensus node.
//!
use serde::de::DeserializeOwned;
use super::{Error, Handler, StateDsct, StateMachine};
use crate::peer::{PeerHandler, Peers};
use crate::{
key_gen, BatchRx, Change, Contribution, EpochRx, EpochTx, InAddr, InternalMessage, InternalTx,
OutAddr, Uid, WireMessage, WireMessageKind, WireMessages,
OutAddr, WireMessage, WireMessageKind, WireMessages, NodeId,
};
use futures::{
future::{self, Either},
@ -78,25 +79,26 @@ impl Default for Config {
/// The `Arc` wrapped portion of `Hydrabadger`.
///
/// Shared all over the place.
struct Inner<T: Contribution> {
/// Node uid:
uid: Uid,
struct Inner<C: Contribution, N: NodeId> {
/// Node nid:
// nid: Uid,
nid: N,
/// Incoming connection socket.
addr: InAddr,
/// This node's secret key.
secret_key: SecretKey,
peers: RwLock<Peers<T>>,
peers: RwLock<Peers<C, N>>,
/// The current state containing HB when connected.
state: RwLock<StateMachine<T>>,
state: RwLock<StateMachine<C, N>>,
/// A reference to the last known state discriminant. May be stale when read.
state_dsct_stale: Arc<AtomicUsize>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_tx: InternalTx<T>,
peer_internal_tx: InternalTx<C, N>,
/// The earliest epoch from which we have not yet received output.
//
@ -104,7 +106,7 @@ struct Inner<T: Contribution> {
current_epoch: Mutex<u64>,
// TODO: Create a separate type which uses a hashmap internally and allows
// for Tx removal. Altenratively just `Option` wrap Txs.
// for Tx removal. Alternatively just `Option` wrap Txs.
epoch_listeners: RwLock<Vec<EpochTx>>,
config: Config,
@ -112,16 +114,16 @@ struct Inner<T: Contribution> {
/// A `HoneyBadger` network node.
#[derive(Clone)]
pub struct Hydrabadger<T: Contribution> {
inner: Arc<Inner<T>>,
handler: Arc<Mutex<Option<Handler<T>>>>,
batch_rx: Arc<Mutex<Option<BatchRx<T>>>>,
pub struct Hydrabadger<C: Contribution, N: NodeId> {
inner: Arc<Inner<C, N>>,
handler: Arc<Mutex<Option<Handler<C, N>>>>,
batch_rx: Arc<Mutex<Option<BatchRx<C, N>>>>,
}
impl<T: Contribution> Hydrabadger<T> {
impl<C: Contribution, N: NodeId + DeserializeOwned + 'static> Hydrabadger<C, N> {
/// Returns a new Hydrabadger node.
pub fn new(addr: SocketAddr, cfg: Config) -> Self {
let uid = Uid::new();
pub fn new(addr: SocketAddr, cfg: Config, nid: N) -> Self {
// let nid = Uid::new();
let secret_key = SecretKey::rand(&mut rand::OsRng::new().expect("Unable to create rng"));
let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded();
@ -129,7 +131,7 @@ impl<T: Contribution> Hydrabadger<T> {
info!("");
info!("Local Hydrabadger Node: ");
info!(" UID: {}", uid);
info!(" UID: {:?}", nid);
info!(" Socket Address: {}", addr);
info!(" Public Key: {:?}", secret_key.public_key());
@ -143,7 +145,7 @@ impl<T: Contribution> Hydrabadger<T> {
let state_dsct_stale = state.dsct.clone();
let inner = Arc::new(Inner {
uid,
nid,
addr: InAddr(addr),
secret_key,
peers: RwLock::new(Peers::new(InAddr(addr))),
@ -167,27 +169,27 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Returns a new Hydrabadger node.
pub fn with_defaults(addr: SocketAddr) -> Self {
Hydrabadger::new(addr, Config::default())
pub fn with_defaults(addr: SocketAddr, nid: N) -> Self {
Hydrabadger::new(addr, Config::default(), nid)
}
/// Returns the pre-created handler.
pub fn handler(&self) -> Option<Handler<T>> {
pub fn handler(&self) -> Option<Handler<C, N>> {
self.handler.lock().take()
}
/// Returns the batch output receiver.
pub fn batch_rx(&self) -> Option<BatchRx<T>> {
pub fn batch_rx(&self) -> Option<BatchRx<C, N>> {
self.batch_rx.lock().take()
}
/// Returns a reference to the inner state.
pub fn state(&self) -> RwLockReadGuard<StateMachine<T>> {
pub fn state(&self) -> RwLockReadGuard<StateMachine<C, N>> {
self.inner.state.read()
}
/// Returns a mutable reference to the inner state.
pub(crate) fn state_mut(&self) -> RwLockWriteGuard<StateMachine<T>> {
pub(crate) fn state_mut(&self) -> RwLockWriteGuard<StateMachine<C, N>> {
self.inner.state.write()
}
@ -204,12 +206,12 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Returns a reference to the peers list.
pub fn peers(&self) -> RwLockReadGuard<Peers<T>> {
pub fn peers(&self) -> RwLockReadGuard<Peers<C, N>> {
self.inner.peers.read()
}
/// Returns a mutable reference to the peers list.
pub(crate) fn peers_mut(&self) -> RwLockWriteGuard<Peers<T>> {
pub(crate) fn peers_mut(&self) -> RwLockWriteGuard<Peers<C, N>> {
self.inner.peers.write()
}
@ -257,7 +259,7 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Sends a message on the internal tx.
pub(crate) fn send_internal(&self, msg: InternalMessage<T>) {
pub(crate) fn send_internal(&self, msg: InternalMessage<C, N>) {
if let Err(err) = self.inner.peer_internal_tx.unbounded_send(msg) {
error!(
"Unable to send on internal tx. Internal rx has dropped: {}",
@ -268,10 +270,10 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Handles a incoming batch of user transactions.
pub fn propose_user_contribution(&self, txn: T) -> Result<(), Error> {
pub fn propose_user_contribution(&self, txn: C) -> Result<(), Error> {
if self.is_validator() {
self.send_internal(InternalMessage::hb_contribution(
self.inner.uid,
self.inner.nid.clone(),
OutAddr(*self.inner.addr),
txn,
));
@ -282,10 +284,10 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Casts a vote for a change in the validator set or configuration.
pub fn vote_for(&self, change: Change) -> Result<(), Error> {
pub fn vote_for(&self, change: Change<N>) -> Result<(), Error> {
if self.is_validator() {
self.send_internal(InternalMessage::hb_vote(
self.inner.uid,
self.inner.nid.clone(),
OutAddr(*self.inner.addr),
change,
));
@ -300,7 +302,7 @@ impl<T: Contribution> Hydrabadger<T> {
pub fn new_key_gen_instance(&self) -> mpsc::UnboundedReceiver<key_gen::Message> {
let (tx, rx) = mpsc::unbounded();
self.send_internal(InternalMessage::new_key_gen_instance(
self.inner.uid,
self.inner.nid.clone(),
OutAddr(*self.inner.addr),
tx,
));
@ -310,7 +312,7 @@ impl<T: Contribution> Hydrabadger<T> {
/// Returns a future that handles incoming connections on `socket`.
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone());
let wire_msgs: WireMessages<C, N> = WireMessages::new(socket, self.inner.secret_key.clone());
wire_msgs
.into_future()
@ -321,10 +323,10 @@ impl<T: Contribution> Hydrabadger<T> {
match msg_opt {
Some(msg) => match msg.into_kind() {
// The only correct entry point:
WireMessageKind::HelloRequestChangeAdd(peer_uid, peer_in_addr, peer_pk) => {
WireMessageKind::HelloRequestChangeAdd(peer_nid, peer_in_addr, peer_pk) => {
// Also adds a `Peer` to `self.peers`.
let peer_h = PeerHandler::new(
Some((peer_uid, peer_in_addr, peer_pk)),
Some((peer_nid.clone(), peer_in_addr, peer_pk)),
self.clone(),
w_messages,
);
@ -333,7 +335,7 @@ impl<T: Contribution> Hydrabadger<T> {
peer_h
.hdb()
.send_internal(InternalMessage::new_incoming_connection(
peer_uid,
peer_nid.clone(),
*peer_h.out_addr(),
peer_in_addr,
peer_pk,
@ -365,10 +367,10 @@ impl<T: Contribution> Hydrabadger<T> {
self,
remote_addr: SocketAddr,
local_sk: SecretKey,
pub_info: Option<(Uid, InAddr, PublicKey)>,
pub_info: Option<(N, InAddr, PublicKey)>,
is_optimistic: bool,
) -> impl Future<Item = (), Error = ()> {
let uid = self.inner.uid;
let nid = self.inner.nid.clone();
let in_addr = self.inner.addr;
info!("Initiating outgoing connection to: {}", remote_addr);
@ -380,7 +382,7 @@ impl<T: Contribution> Hydrabadger<T> {
// Wrap the socket with the frame delimiter and codec:
let mut wire_msgs = WireMessages::new(socket, local_sk);
let wire_hello_result = wire_msgs.send_msg(WireMessage::hello_request_change_add(
uid, in_addr, local_pk,
nid, in_addr, local_pk,
));
match wire_hello_result {
Ok(_) => {
@ -410,7 +412,7 @@ impl<T: Contribution> Hydrabadger<T> {
fn generate_contributions(
self,
gen_txns: Option<fn(usize, usize) -> T>,
gen_txns: Option<fn(usize, usize) -> C>,
) -> impl Future<Item = (), Error = ()> {
if let Some(gen_txns) = gen_txns {
let epoch_stream = self.register_epoch_listener();
@ -436,7 +438,7 @@ impl<T: Contribution> Hydrabadger<T> {
);
hdb.send_internal(InternalMessage::hb_contribution(
hdb.inner.uid,
hdb.inner.nid.clone(),
OutAddr(*hdb.inner.addr),
txns,
));
@ -495,7 +497,7 @@ impl<T: Contribution> Hydrabadger<T> {
pub fn node(
self,
remotes: Option<HashSet<SocketAddr>>,
gen_txns: Option<fn(usize, usize) -> T>,
gen_txns: Option<fn(usize, usize) -> C>,
) -> impl Future<Item = (), Error = ()> {
let socket = TcpListener::bind(&self.inner.addr).unwrap();
info!("Listening on: {}", self.inner.addr);
@ -541,7 +543,7 @@ impl<T: Contribution> Hydrabadger<T> {
pub fn run_node(
self,
remotes: Option<HashSet<SocketAddr>>,
gen_txns: Option<fn(usize, usize) -> T>,
gen_txns: Option<fn(usize, usize) -> C>,
) {
tokio::run(self.node(remotes, gen_txns));
}
@ -550,15 +552,15 @@ impl<T: Contribution> Hydrabadger<T> {
&self.inner.addr
}
pub fn uid(&self) -> &Uid {
&self.inner.uid
pub fn node_id(&self) -> &N {
&self.inner.nid
}
pub fn secret_key(&self) -> &SecretKey {
&self.inner.secret_key
}
pub fn to_weak(&self) -> HydrabadgerWeak<T> {
pub fn to_weak(&self) -> HydrabadgerWeak<C, N> {
HydrabadgerWeak {
inner: Arc::downgrade(&self.inner),
handler: Arc::downgrade(&self.handler),
@ -567,14 +569,14 @@ impl<T: Contribution> Hydrabadger<T> {
}
}
pub struct HydrabadgerWeak<T: Contribution> {
inner: Weak<Inner<T>>,
handler: Weak<Mutex<Option<Handler<T>>>>,
batch_rx: Weak<Mutex<Option<BatchRx<T>>>>,
pub struct HydrabadgerWeak<C: Contribution, N: NodeId> {
inner: Weak<Inner<C, N>>,
handler: Weak<Mutex<Option<Handler<C, N>>>>,
batch_rx: Weak<Mutex<Option<BatchRx<C, N>>>>,
}
impl<T: Contribution> HydrabadgerWeak<T> {
pub fn upgrade(self) -> Option<Hydrabadger<T>> {
impl<C: Contribution, N: NodeId> HydrabadgerWeak<C, N> {
pub fn upgrade(self) -> Option<Hydrabadger<C, N>> {
self.inner.upgrade().and_then(|inner| {
self.handler.upgrade().and_then(|handler| {
self.batch_rx.upgrade().and_then(|batch_rx| {

View File

@ -3,7 +3,7 @@
use super::Error;
use crate::hydrabadger::hydrabadger::Hydrabadger;
use crate::peer::Peers;
use crate::{Contribution, NetworkState, Uid, WireMessage};
use crate::{Contribution, NetworkState, Uid, WireMessage, NodeId};
use crossbeam::queue::SegQueue;
use futures::sync::mpsc;
use hbbft::{
@ -55,30 +55,30 @@ impl Message {
/// Key generation state.
#[derive(Debug)]
pub(super) enum State {
pub(super) enum State<N> {
AwaitingPeers {
required_peers: Vec<Uid>,
available_peers: Vec<Uid>,
required_peers: Vec<N>,
available_peers: Vec<N>,
},
Generating {
sync_key_gen: Option<SyncKeyGen<Uid>>,
sync_key_gen: Option<SyncKeyGen<N>>,
public_key: Option<PublicKey>,
public_keys: BTreeMap<Uid, PublicKey>,
public_keys: BTreeMap<N, PublicKey>,
part_count: usize,
ack_count: usize,
},
Complete {
sync_key_gen: Option<SyncKeyGen<Uid>>,
sync_key_gen: Option<SyncKeyGen<N>>,
public_key: Option<PublicKey>,
},
}
/// Forwards an `Ack` to a `SyncKeyGen` instance.
fn handle_ack(uid: &Uid, ack: Ack, ack_count: &mut usize, sync_key_gen: &mut SyncKeyGen<Uid>) {
trace!("KEY GENERATION: Handling ack from '{}'...", uid);
fn handle_ack<N: NodeId>(nid: &N, ack: Ack, ack_count: &mut usize, sync_key_gen: &mut SyncKeyGen<N>) {
trace!("KEY GENERATION: Handling ack from '{:?}'...", nid);
let ack_outcome = sync_key_gen
.handle_ack(uid, ack.clone())
.handle_ack(nid, ack.clone())
.expect("Failed to handle Ack.");
match ack_outcome {
AckOutcome::Invalid(fault) => error!("Error handling ack: '{:?}':\n{:?}", ack, fault),
@ -88,12 +88,12 @@ fn handle_ack(uid: &Uid, ack: Ack, ack_count: &mut usize, sync_key_gen: &mut Syn
/// Forwards all queued `Ack`s to a `SyncKeyGen` instance if `part_count` is
/// sufficient.
fn handle_queued_acks<T: Contribution>(
ack_queue: &SegQueue<(Uid, Ack)>,
fn handle_queued_acks<C: Contribution, N: NodeId>(
ack_queue: &SegQueue<(N, Ack)>,
part_count: usize,
ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>,
peers: &Peers<T>,
sync_key_gen: &mut SyncKeyGen<N>,
peers: &Peers<C, N>,
) {
if part_count == peers.count_validators() + 1 {
trace!("KEY GENERATION: Handling queued acks...");
@ -102,29 +102,29 @@ fn handle_queued_acks<T: Contribution>(
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
while let Some((uid, ack)) = ack_queue.try_pop() {
handle_ack(&uid, ack, ack_count, sync_key_gen);
while let Some((nid, ack)) = ack_queue.try_pop() {
handle_ack(&nid, ack, ack_count, sync_key_gen);
}
}
}
/// Manages the key generation state.
#[derive(Debug)]
pub struct Machine {
state: State,
ack_queue: SegQueue<(Uid, Ack)>,
pub struct Machine<N> {
state: State<N>,
ack_queue: SegQueue<(N, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
}
impl Machine {
impl<N: NodeId> Machine<N> {
/// Creates and returns a new `Machine` in the `AwaitingPeers`
/// state.
pub fn awaiting_peers(
ack_queue: SegQueue<(Uid, Ack)>,
ack_queue: SegQueue<(N, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
) -> Machine {
) -> Machine<N> {
Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
@ -138,13 +138,13 @@ impl Machine {
/// Creates and returns a new `Machine` in the `Generating`
/// state.
pub fn generate<T: Contribution>(
local_uid: &Uid,
pub fn generate<C: Contribution>(
local_nid: &N,
local_sk: SecretKey,
peers: &Peers<T>,
peers: &Peers<C, N>,
event_tx: mpsc::UnboundedSender<Message>,
instance_id: InstanceId,
) -> Result<Machine, Error> {
) -> Result<Machine<N>, Error> {
let mut m = Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
@ -155,7 +155,7 @@ impl Machine {
instance_id: instance_id.clone(),
};
let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?;
let (part, ack) = m.set_generating_keys(local_nid, local_sk, peers)?;
peers.wire_to_validators(WireMessage::key_gen_part(instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_ack(instance_id, ack));
@ -164,11 +164,11 @@ impl Machine {
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys<T: Contribution>(
pub(super) fn set_generating_keys<C: Contribution>(
&mut self,
local_uid: &Uid,
local_nid: &N,
local_sk: SecretKey,
peers: &Peers<T>,
peers: &Peers<C, N>,
) -> Result<(Part, Ack), Error> {
let (part, ack);
self.state = match self.state {
@ -176,19 +176,19 @@ impl Machine {
// let threshold = config.keygen_peer_count / 3;
let threshold = peers.count_validators() / 3;
let mut public_keys: BTreeMap<Uid, PublicKey> = peers
let mut public_keys: BTreeMap<N, PublicKey> = peers
.validators()
.map(|p| p.pub_info().map(|(uid, _, pk)| (*uid, *pk)).unwrap())
.map(|p| p.pub_info().map(|(nid, _, pk)| (nid.clone(), *pk)).unwrap())
.collect();
let pk = local_sk.public_key();
public_keys.insert(*local_uid, pk);
public_keys.insert(local_nid.clone(), pk);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let (mut sync_key_gen, opt_part) = SyncKeyGen::new(
&mut rng,
*local_uid,
local_nid.clone(),
local_sk,
public_keys.clone(),
threshold,
@ -198,7 +198,7 @@ impl Machine {
trace!("KEY GENERATION: Handling our own `Part`...");
ack = match sync_key_gen
.handle_part(&mut rng, &local_uid, part.clone())
.handle_part(&mut rng, &local_nid, part.clone())
.expect("Handling our own Part has failed")
{
PartOutcome::Valid(Some(ack)) => ack,
@ -209,7 +209,7 @@ impl Machine {
};
trace!("KEY GENERATION: Queueing our own `Ack`...");
self.ack_queue.push((*local_uid, ack.clone()));
self.ack_queue.push((local_nid.clone(), ack.clone()));
State::Generating {
sync_key_gen: Some(sync_key_gen),
@ -230,27 +230,27 @@ impl Machine {
/// Notify this key generation instance that peers have been added.
//
// TODO: Move some of this logic back to handler.
pub(super) fn add_peers<T: Contribution>(
pub(super) fn add_peers<C: Contribution>(
&mut self,
peers: &Peers<T>,
hdb: &Hydrabadger<T>,
net_state: NetworkState,
peers: &Peers<C, N>,
hdb: &Hydrabadger<C, N>,
net_state: NetworkState<N>,
) -> Result<(), Error> {
match self.state {
State::AwaitingPeers { .. } => {
if peers.count_validators() >= hdb.config().keygen_peer_count {
info!("BEGINNING KEY GENERATION");
let local_uid = *hdb.uid();
let local_nid = hdb.node_id().clone();
let local_in_addr = *hdb.addr();
let local_pk = hdb.secret_key().public_key();
let (part, ack) =
self.set_generating_keys(&local_uid, hdb.secret_key().clone(), peers)?;
self.set_generating_keys(&local_nid, hdb.secret_key().clone(), peers)?;
trace!("KEY GENERATION: Sending initial parts and our own ack.");
peers.wire_to_validators(WireMessage::hello_from_validator(
local_uid,
local_nid,
local_in_addr,
local_pk,
net_state,
@ -279,11 +279,11 @@ impl Machine {
}
/// Handles a received `Part`.
pub(super) fn handle_key_gen_part<T: Contribution>(
pub(super) fn handle_key_gen_part<C: Contribution>(
&mut self,
src_uid: &Uid,
src_nid: &N,
part: Part,
peers: &Peers<T>,
peers: &Peers<C, N>,
) {
match self.state {
State::Generating {
@ -293,10 +293,10 @@ impl Machine {
..
} => {
// TODO: Move this match block into a function somewhere for re-use:
trace!("KEY GENERATION: Handling part from '{}'...", src_uid);
trace!("KEY GENERATION: Handling part from '{:?}'...", src_nid);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let skg = sync_key_gen.as_mut().unwrap();
let ack = match skg.handle_part(&mut rng, src_uid, part) {
let ack = match skg.handle_part(&mut rng, src_nid, part) {
Ok(PartOutcome::Valid(Some(ack))) => ack,
Ok(PartOutcome::Invalid(faults)) => panic!(
"Invalid part \
@ -316,11 +316,11 @@ impl Machine {
*part_count += 1;
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
self.ack_queue.push((src_nid.clone(), ack.clone()));
trace!(
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid
"KEY GENERATION: Part from '{:?}' acknowledged. Broadcasting ack...",
src_nid
);
peers.wire_to_validators(WireMessage::key_gen_ack(self.instance_id.clone(), ack));
@ -339,13 +339,13 @@ impl Machine {
}
/// Handles a received `Ack`.
pub(super) fn handle_key_gen_ack<T: Contribution>(
pub(super) fn handle_key_gen_ack<C: Contribution>(
&mut self,
src_uid: &Uid,
src_nid: &N,
ack: Ack,
peers: &Peers<T>,
peers: &Peers<C, N>,
) -> Result<bool, Error> {
let mut complete: Option<(SyncKeyGen<Uid>, PublicKey)> = None;
let mut complete: Option<(SyncKeyGen<N>, PublicKey)> = None;
match self.state {
State::Generating {
@ -359,7 +359,7 @@ impl Machine {
let skg = sync_key_gen.as_mut().unwrap();
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
self.ack_queue.push((src_nid.clone(), ack.clone()));
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, peers);
};
@ -395,7 +395,7 @@ impl Machine {
}
/// Returns the state of this key generation instance.
pub(super) fn state(&self) -> &State {
pub(super) fn state(&self) -> &State<N> {
&self.state
}
@ -409,7 +409,7 @@ impl Machine {
/// Returns the `SyncKeyGen` instance and `PublicKey` if this key
/// generation instance is complete.
pub(super) fn complete(&mut self) -> Option<(SyncKeyGen<Uid>, PublicKey)> {
pub(super) fn complete(&mut self) -> Option<(SyncKeyGen<N>, PublicKey)> {
match self.state {
State::Complete {
ref mut sync_key_gen,

View File

@ -3,9 +3,10 @@ mod hydrabadger;
pub mod key_gen;
mod state;
use rand::Rand;
use self::handler::Handler;
use self::state::{State, StateMachine};
use crate::{Change, Message, Uid};
use crate::{Change, Message};
use bincode;
use hbbft::{dynamic_honey_badger::Error as DhbError, sync_key_gen::Error as SyncKeyGenError};
use std;
@ -18,10 +19,10 @@ pub const WIRE_MESSAGE_RETRY_MAX: usize = 10;
/// A HoneyBadger input or message.
#[derive(Clone, Debug)]
pub enum InputOrMessage<T> {
Change(Change),
pub enum InputOrMessage<T, N: Ord + Rand> {
Change(Change<N>),
Contribution(T),
Message(Uid, Message),
Message(N, Message<N>),
}
// TODO: Move this up to `lib.rs` or, preferably, create another error type

View File

@ -7,7 +7,7 @@
use super::{key_gen, Config, Error, InputOrMessage};
use crate::peer::Peers;
use crate::{ActiveNetworkInfo, Contribution, NetworkNodeInfo, NetworkState, Step, Uid};
use crate::{ActiveNetworkInfo, Contribution, NetworkNodeInfo, NetworkState, Step, NodeId};
use crossbeam::queue::SegQueue;
use hbbft::{
crypto::{PublicKey, SecretKey},
@ -65,26 +65,26 @@ impl From<usize> for StateDsct {
/// The current hydrabadger state.
//
pub enum State<T: Contribution> {
pub enum State<C: Contribution, N: NodeId> {
Disconnected {},
DeterminingNetworkState {
ack_queue: Option<SegQueue<(Uid, Ack)>>,
iom_queue: Option<SegQueue<InputOrMessage<T>>>,
network_state: Option<NetworkState>,
ack_queue: Option<SegQueue<(N, Ack)>>,
iom_queue: Option<SegQueue<InputOrMessage<C, N>>>,
network_state: Option<NetworkState<N>>,
},
KeyGen {
key_gen: key_gen::Machine,
iom_queue: Option<SegQueue<InputOrMessage<T>>>,
key_gen: key_gen::Machine<N>,
iom_queue: Option<SegQueue<InputOrMessage<C, N>>>,
},
Observer {
dhb: Option<DynamicHoneyBadger<T, Uid>>,
dhb: Option<DynamicHoneyBadger<C, N>>,
},
Validator {
dhb: Option<DynamicHoneyBadger<T, Uid>>,
dhb: Option<DynamicHoneyBadger<C, N>>,
},
}
impl<T: Contribution> State<T> {
impl<C: Contribution, N: NodeId> State<C, N> {
/// Returns the state discriminant.
pub(super) fn discriminant(&self) -> StateDsct {
match self {
@ -97,14 +97,14 @@ impl<T: Contribution> State<T> {
}
}
pub struct StateMachine<T: Contribution> {
pub(crate) state: State<T>,
pub struct StateMachine<C: Contribution, N: NodeId> {
pub(crate) state: State<C, N>,
pub(crate) dsct: Arc<AtomicUsize>,
}
impl<T: Contribution> StateMachine<T> {
impl<C: Contribution, N: NodeId> StateMachine<C, N> {
/// Returns a new `State::Disconnected`.
pub(super) fn disconnected() -> StateMachine<T> {
pub(super) fn disconnected() -> StateMachine<C, N> {
StateMachine {
state: State::Disconnected {},
dsct: Arc::new(AtomicUsize::new(0)),
@ -172,7 +172,7 @@ impl<T: Contribution> StateMachine<T> {
/// Sets state to `DeterminingNetworkState` if
/// `AwaitingMorePeersForKeyGeneration`, otherwise panics.
pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo) {
pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo<N>) {
self.state = match self.state {
State::KeyGen {
ref mut iom_queue, ..
@ -197,19 +197,19 @@ impl<T: Contribution> StateMachine<T> {
#[must_use]
pub(super) fn set_observer(
&mut self,
local_uid: Uid,
local_nid: N,
local_sk: SecretKey,
jp: JoinPlan<Uid>,
jp: JoinPlan<N>,
_cfg: &Config,
step_queue: &SegQueue<Step<T>>,
) -> Result<SegQueue<InputOrMessage<T>>, Error> {
step_queue: &SegQueue<Step<C, N>>,
) -> Result<SegQueue<InputOrMessage<C, N>>, Error> {
let iom_queue_ret;
self.state = match self.state {
State::DeterminingNetworkState {
ref mut iom_queue, ..
} => {
let (dhb, dhb_step) =
DynamicHoneyBadger::new_joining(local_uid, local_sk, jp, StdRng::new()?)?;
DynamicHoneyBadger::new_joining(local_nid, local_sk, jp, StdRng::new()?)?;
step_queue.push(dhb_step);
iom_queue_ret = iom_queue.take().unwrap();
@ -249,12 +249,12 @@ impl<T: Contribution> StateMachine<T> {
#[must_use]
pub(super) fn set_validator(
&mut self,
local_uid: Uid,
local_nid: N,
local_sk: SecretKey,
peers: &Peers<T>,
peers: &Peers<C, N>,
cfg: &Config,
_step_queue: &SegQueue<Step<T>>,
) -> Result<SegQueue<InputOrMessage<T>>, Error> {
_step_queue: &SegQueue<Step<C, N>>,
) -> Result<SegQueue<InputOrMessage<C, N>>, Error> {
let iom_queue_ret;
self.state = match self.state {
State::KeyGen {
@ -273,13 +273,13 @@ impl<T: Contribution> StateMachine<T> {
assert!(peers.count_validators() >= cfg.keygen_peer_count);
let mut node_ids: BTreeMap<Uid, PublicKey> = peers
let mut node_ids: BTreeMap<N, PublicKey> = peers
.validators()
.map(|p| (p.uid().cloned().unwrap(), p.public_key().cloned().unwrap()))
.map(|p| (p.node_id().cloned().unwrap(), p.public_key().cloned().unwrap()))
.collect();
node_ids.insert(local_uid, local_sk.public_key());
node_ids.insert(local_nid.clone(), local_sk.public_key());
let netinfo = NetworkInfo::new(local_uid, sk_share, pk_set, local_sk, node_ids);
let netinfo = NetworkInfo::new(local_nid, sk_share, pk_set, local_sk, node_ids);
let dhb = DynamicHoneyBadger::builder()
.era(cfg.start_epoch)
@ -331,7 +331,7 @@ impl<T: Contribution> StateMachine<T> {
/// Sets state to `DeterminingNetworkState` if `Disconnected`, otherwise does
/// nothing.
pub(super) fn update_peer_connection_added(&mut self, _peers: &Peers<T>) {
pub(super) fn update_peer_connection_added(&mut self, _peers: &Peers<C, N>) {
self.state = match self.state {
State::Disconnected {} => {
info!("Setting state: `DeterminingNetworkState`.");
@ -347,7 +347,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Sets state to `Disconnected` if peer count is zero, otherwise does nothing.
pub(super) fn update_peer_connection_dropped(&mut self, peers: &Peers<T>) {
pub(super) fn update_peer_connection_dropped(&mut self, peers: &Peers<C, N>) {
self.state = match self.state {
State::DeterminingNetworkState { .. } => {
if peers.count_total() == 0 {
@ -385,14 +385,14 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns the network state, if possible.
pub(super) fn network_state(&self, peers: &Peers<T>) -> NetworkState {
pub(super) fn network_state(&self, peers: &Peers<C, N>) -> NetworkState<N> {
use super::key_gen::State as KeyGenState;
let peer_infos = peers
.peers()
.filter_map(|peer| {
peer.pub_info()
.map(|(&uid, &in_addr, &pk)| NetworkNodeInfo { uid, in_addr, pk })
.map(|(nid, &in_addr, &pk)| NetworkNodeInfo { nid: nid.clone(), in_addr, pk })
})
.collect::<Vec<_>>();
match self.state {
@ -416,7 +416,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the internal HB instance.
pub fn dhb(&self) -> Option<&DynamicHoneyBadger<T, Uid>> {
pub fn dhb(&self) -> Option<&DynamicHoneyBadger<C, N>> {
match self.state {
State::Observer { ref dhb, .. } => dhb.as_ref(),
State::Validator { ref dhb, .. } => dhb.as_ref(),
@ -425,7 +425,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the internal HB instance.
pub(super) fn dhb_mut(&mut self) -> Option<&mut DynamicHoneyBadger<T, Uid>> {
pub(super) fn dhb_mut(&mut self) -> Option<&mut DynamicHoneyBadger<C, N>> {
match self.state {
State::Observer { ref mut dhb, .. } => dhb.as_mut(),
State::Validator { ref mut dhb, .. } => dhb.as_mut(),
@ -434,7 +434,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the key generation instance.
pub(super) fn key_gen(&self) -> Option<&key_gen::Machine> {
pub(super) fn key_gen(&self) -> Option<&key_gen::Machine<N>> {
match self.state {
State::KeyGen { ref key_gen, .. } => Some(key_gen),
_ => None,
@ -442,7 +442,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the key generation instance.
pub(super) fn key_gen_mut(&mut self) -> Option<&mut key_gen::Machine> {
pub(super) fn key_gen_mut(&mut self) -> Option<&mut key_gen::Machine<N>> {
match self.state {
State::KeyGen {
ref mut key_gen, ..
@ -456,8 +456,8 @@ impl<T: Contribution> StateMachine<T> {
/// Cannot be called while disconnected or connection-pending.
pub(super) fn handle_iom(
&mut self,
iom: InputOrMessage<T>,
) -> Option<Result<Step<T>, DhbError>> {
iom: InputOrMessage<C, N>,
) -> Option<Result<Step<C, N>, DhbError>> {
match self.state {
State::Observer { ref mut dhb, .. } | State::Validator { ref mut dhb, .. } => {
trace!("State::handle_iom: Handling: {:?}", iom);
@ -466,7 +466,7 @@ impl<T: Contribution> StateMachine<T> {
match iom {
InputOrMessage::Contribution(contrib) => dhb.propose(contrib),
InputOrMessage::Change(change) => dhb.vote_for(change),
InputOrMessage::Message(src_uid, msg) => dhb.handle_message(&src_uid, msg),
InputOrMessage::Message(src_nid, msg) => dhb.handle_message(&src_nid, msg),
}
});

View File

@ -68,7 +68,7 @@ use hbbft::{
Change as DhbChange, DynamicHoneyBadger, JoinPlan, Message as DhbMessage,
},
sync_key_gen::{Ack, Part},
Contribution as HbbftContribution, DaStep as MessagingStep,
Contribution as HbbftContribution, DaStep as MessagingStep, NodeIdT,
};
use rand::{Rand, Rng};
use serde::{de::DeserializeOwned, Serialize};
@ -97,27 +97,27 @@ pub use hbbft::dynamic_honey_badger::Batch;
/// Transmit half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireTx<T> = mpsc::UnboundedSender<WireMessage<T>>;
type WireTx<C, N> = mpsc::UnboundedSender<WireMessage<C, N>>;
/// Receive half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type WireRx<T> = mpsc::UnboundedReceiver<WireMessage<T>>;
type WireRx<C, N> = mpsc::UnboundedReceiver<WireMessage<C, N>>;
/// Transmit half of the internal message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalTx<T> = mpsc::UnboundedSender<InternalMessage<T>>;
type InternalTx<C, N> = mpsc::UnboundedSender<InternalMessage<C, N>>;
/// Receive half of the internal message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type InternalRx<T> = mpsc::UnboundedReceiver<InternalMessage<T>>;
type InternalRx<C, N> = mpsc::UnboundedReceiver<InternalMessage<C, N>>;
/// Transmit half of the batch output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
type BatchTx<T> = mpsc::UnboundedSender<Batch<T, Uid>>;
type BatchTx<C, N> = mpsc::UnboundedSender<Batch<C, N>>;
/// Receive half of the batch output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
pub type BatchRx<T> = mpsc::UnboundedReceiver<Batch<T, Uid>>;
pub type BatchRx<C, N> = mpsc::UnboundedReceiver<Batch<C, N>>;
/// Transmit half of the epoch number output channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
@ -137,6 +137,10 @@ impl<C> Contribution for C where
{
}
pub trait NodeId: NodeIdT + Serialize + DeserializeOwned + Rand + 'static {}
impl<N> NodeId for N where N: NodeIdT + Serialize + DeserializeOwned + Rand + 'static {}
/// A unique identifier.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct Uid(pub(crate) Uuid);
@ -166,9 +170,9 @@ impl fmt::Debug for Uid {
}
}
type Message = DhbMessage<Uid>;
type Step<T> = MessagingStep<DynamicHoneyBadger<T, Uid>>;
type Change = DhbChange<Uid>;
type Message<N> = DhbMessage<N>;
type Step<C, N> = MessagingStep<DynamicHoneyBadger<C, N>>;
type Change<N> = DhbChange<N>;
/// A peer's incoming (listening) address.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
@ -206,22 +210,22 @@ impl fmt::Display for OutAddr {
/// Nodes of the network.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkNodeInfo {
pub(crate) uid: Uid,
pub struct NetworkNodeInfo<N> {
pub(crate) nid: N,
pub(crate) in_addr: InAddr,
pub(crate) pk: PublicKey,
}
type ActiveNetworkInfo = (Vec<NetworkNodeInfo>, PublicKeySet, BTreeMap<Uid, PublicKey>);
type ActiveNetworkInfo<N> = (Vec<NetworkNodeInfo<N>>, PublicKeySet, BTreeMap<N, PublicKey>);
/// The current state of the network.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetworkState {
pub enum NetworkState<N: Ord + Rand> {
None,
Unknown(Vec<NetworkNodeInfo>),
AwaitingMorePeersForKeyGeneration(Vec<NetworkNodeInfo>),
GeneratingKeys(Vec<NetworkNodeInfo>, BTreeMap<Uid, PublicKey>),
Active(ActiveNetworkInfo),
Unknown(Vec<NetworkNodeInfo<N>>),
AwaitingMorePeersForKeyGeneration(Vec<NetworkNodeInfo<N>>),
GeneratingKeys(Vec<NetworkNodeInfo<N>>, BTreeMap<N, PublicKey>),
Active(ActiveNetworkInfo<N>),
}
/// Messages sent over the network between nodes.
@ -229,12 +233,12 @@ pub enum NetworkState {
/// [`Message`](enum.WireMessageKind.html#variant.Message) variants are among
/// those verified.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum WireMessageKind<T> {
HelloFromValidator(Uid, InAddr, PublicKey, NetworkState),
HelloRequestChangeAdd(Uid, InAddr, PublicKey),
WelcomeReceivedChangeAdd(Uid, PublicKey, NetworkState),
pub enum WireMessageKind<C, N: Ord + Rand> {
HelloFromValidator(N, InAddr, PublicKey, NetworkState<N>),
HelloRequestChangeAdd(N, InAddr, PublicKey),
WelcomeReceivedChangeAdd(N, PublicKey, NetworkState<N>),
RequestNetworkState,
NetworkState(NetworkState),
NetworkState(NetworkState<N>),
Goodbye,
#[serde(with = "serde_bytes")]
// TODO(c0gent): Remove.
@ -243,89 +247,89 @@ pub enum WireMessageKind<T> {
///
/// All received messages are verified against the senders public key
/// using an attached signature.
Message(Uid, Message),
Message(N, Message<N>),
// TODO(c0gent): Remove.
Transaction(Uid, T),
Transaction(N, C),
/// Messages used during synchronous key generation.
KeyGen(key_gen::InstanceId, key_gen::Message),
JoinPlan(JoinPlan<Uid>),
JoinPlan(JoinPlan<N>),
}
/// Messages sent over the network between nodes.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WireMessage<T> {
kind: WireMessageKind<T>,
pub struct WireMessage<C, N: Ord + Rand> {
kind: WireMessageKind<C, N>,
}
impl<T: Contribution> WireMessage<T> {
impl<C: Contribution, N: NodeId> WireMessage<C, N> {
pub fn hello_from_validator(
src_uid: Uid,
src_uid: N,
in_addr: InAddr,
pk: PublicKey,
net_state: NetworkState,
) -> WireMessage<T> {
net_state: NetworkState<N>,
) -> WireMessage<C, N> {
WireMessageKind::HelloFromValidator(src_uid, in_addr, pk, net_state).into()
}
/// Returns a `HelloRequestChangeAdd` variant.
pub fn hello_request_change_add(
src_uid: Uid,
src_uid: N,
in_addr: InAddr,
pk: PublicKey,
) -> WireMessage<T> {
) -> WireMessage<C, N> {
WireMessageKind::HelloRequestChangeAdd(src_uid, in_addr, pk).into()
}
/// Returns a `WelcomeReceivedChangeAdd` variant.
pub fn welcome_received_change_add(
src_uid: Uid,
src_uid: N,
pk: PublicKey,
net_state: NetworkState,
) -> WireMessage<T> {
net_state: NetworkState<N>,
) -> WireMessage<C, N> {
WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state).into()
}
/// Returns an `Input` variant.
pub fn transaction(src_uid: Uid, txn: T) -> WireMessage<T> {
pub fn transaction(src_uid: N, txn: C) -> WireMessage<C, N> {
WireMessageKind::Transaction(src_uid, txn).into()
}
/// Returns a `Message` variant.
pub fn message(src_uid: Uid, msg: Message) -> WireMessage<T> {
pub fn message(src_uid: N, msg: Message<N>) -> WireMessage<C, N> {
WireMessageKind::Message(src_uid, msg).into()
}
pub fn key_gen(instance_id: key_gen::InstanceId, msg: key_gen::Message) -> WireMessage<T> {
pub fn key_gen(instance_id: key_gen::InstanceId, msg: key_gen::Message) -> WireMessage<C, N> {
WireMessageKind::KeyGen(instance_id, msg).into()
}
pub fn key_gen_part(instance_id: key_gen::InstanceId, part: Part) -> WireMessage<T> {
pub fn key_gen_part(instance_id: key_gen::InstanceId, part: Part) -> WireMessage<C, N> {
// WireMessageKind::KeyGenPart(part).into()
WireMessage::key_gen(instance_id, key_gen::Message::part(part))
}
pub fn key_gen_ack(instance_id: key_gen::InstanceId, ack: Ack) -> WireMessage<T> {
pub fn key_gen_ack(instance_id: key_gen::InstanceId, ack: Ack) -> WireMessage<C, N> {
// WireMessageKind::KeyGenAck(outcome).into()
WireMessage::key_gen(instance_id, key_gen::Message::ack(ack))
}
pub fn join_plan(jp: JoinPlan<Uid>) -> WireMessage<T> {
pub fn join_plan(jp: JoinPlan<N>) -> WireMessage<C, N> {
WireMessageKind::JoinPlan(jp).into()
}
/// Returns the wire message kind.
pub fn kind(&self) -> &WireMessageKind<T> {
pub fn kind(&self) -> &WireMessageKind<C, N> {
&self.kind
}
/// Consumes this `WireMessage` into its kind.
pub fn into_kind(self) -> WireMessageKind<T> {
pub fn into_kind(self) -> WireMessageKind<C, N> {
self.kind
}
}
impl<T: Contribution> From<WireMessageKind<T>> for WireMessage<T> {
fn from(kind: WireMessageKind<T>) -> WireMessage<T> {
impl<C: Contribution, N: NodeId> From<WireMessageKind<C, N>> for WireMessage<C, N> {
fn from(kind: WireMessageKind<C, N>) -> WireMessage<C, N> {
WireMessage { kind }
}
}
@ -338,20 +342,22 @@ pub struct SignedWireMessage {
}
/// A stream/sink of `WireMessage`s connected to a socket.
pub struct WireMessages<T: Contribution> {
pub struct WireMessages<C: Contribution, N: NodeId> {
framed: Framed<TcpStream, LengthDelimitedCodec>,
local_sk: SecretKey,
peer_pk: Option<PublicKey>,
_t: PhantomData<T>,
_c: PhantomData<C>,
_n: PhantomData<N>,
}
impl<T: Contribution> WireMessages<T> {
pub fn new(socket: TcpStream, local_sk: SecretKey) -> WireMessages<T> {
impl<C: Contribution, N: NodeId + DeserializeOwned> WireMessages<C, N> {
pub fn new(socket: TcpStream, local_sk: SecretKey) -> WireMessages<C, N> {
WireMessages {
framed: Framed::new(socket, LengthDelimitedCodec::new()),
local_sk,
peer_pk: None,
_t: PhantomData,
_c: PhantomData,
_n: PhantomData,
}
}
@ -364,15 +370,15 @@ impl<T: Contribution> WireMessages<T> {
self.framed.get_ref()
}
pub fn send_msg(&mut self, msg: WireMessage<T>) -> Result<(), Error> {
pub fn send_msg(&mut self, msg: WireMessage<C, N>) -> Result<(), Error> {
self.start_send(msg)?;
let _ = self.poll_complete()?;
Ok(())
}
}
impl<T: Contribution> Stream for WireMessages<T> {
type Item = WireMessage<T>;
impl<C: Contribution, N: NodeId + DeserializeOwned> Stream for WireMessages<C, N> {
type Item = WireMessage<C, N>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -380,7 +386,7 @@ impl<T: Contribution> Stream for WireMessages<T> {
Some(frame) => {
let s_msg: SignedWireMessage =
bincode::deserialize(&frame.freeze()).map_err(Error::Serde)?;
let msg: WireMessage<T> =
let msg: WireMessage<C, N> =
bincode::deserialize(&s_msg.message).map_err(Error::Serde)?;
// Verify signature for certain variants.
@ -403,8 +409,8 @@ impl<T: Contribution> Stream for WireMessages<T> {
}
}
impl<T: Contribution> Sink for WireMessages<T> {
type SinkItem = WireMessage<T>;
impl<C: Contribution, N: NodeId + Serialize> Sink for WireMessages<C, N> {
type SinkItem = WireMessage<C, N>;
type SinkError = Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
@ -438,11 +444,11 @@ impl<T: Contribution> Sink for WireMessages<T> {
/// A message between internal threads/tasks.
#[derive(Clone, Debug)]
pub enum InternalMessageKind<T: Contribution> {
Wire(WireMessage<T>),
HbMessage(Message),
HbContribution(T),
HbChange(Change),
pub enum InternalMessageKind<C: Contribution, N: NodeId> {
Wire(WireMessage<C, N>),
HbMessage(Message<N>),
HbContribution(C),
HbChange(Change<N>),
PeerDisconnect,
NewIncomingConnection(InAddr, PublicKey, bool),
NewOutgoingConnection,
@ -451,18 +457,18 @@ pub enum InternalMessageKind<T: Contribution> {
/// A message between internal threads/tasks.
#[derive(Clone, Debug)]
pub struct InternalMessage<T: Contribution> {
src_uid: Option<Uid>,
pub struct InternalMessage<C: Contribution, N: NodeId> {
src_uid: Option<N>,
src_addr: OutAddr,
kind: InternalMessageKind<T>,
kind: InternalMessageKind<C, N>,
}
impl<T: Contribution> InternalMessage<T> {
impl<C: Contribution, N: NodeId> InternalMessage<C, N> {
pub fn new(
src_uid: Option<Uid>,
src_uid: Option<N>,
src_addr: OutAddr,
kind: InternalMessageKind<T>,
) -> InternalMessage<T> {
kind: InternalMessageKind<C, N>,
) -> InternalMessage<C, N> {
InternalMessage {
src_uid,
src_addr,
@ -471,23 +477,23 @@ impl<T: Contribution> InternalMessage<T> {
}
/// Returns a new `InternalMessage` without a uid.
pub fn new_without_uid(src_addr: OutAddr, kind: InternalMessageKind<T>) -> InternalMessage<T> {
pub fn new_without_uid(src_addr: OutAddr, kind: InternalMessageKind<C, N>) -> InternalMessage<C, N> {
InternalMessage::new(None, src_addr, kind)
}
pub fn wire(
src_uid: Option<Uid>,
src_uid: Option<N>,
src_addr: OutAddr,
wire_message: WireMessage<T>,
) -> InternalMessage<T> {
wire_message: WireMessage<C, N>,
) -> InternalMessage<C, N> {
InternalMessage::new(src_uid, src_addr, InternalMessageKind::Wire(wire_message))
}
pub fn hb_message(src_uid: Uid, src_addr: OutAddr, msg: Message) -> InternalMessage<T> {
pub fn hb_message(src_uid: N, src_addr: OutAddr, msg: Message<N>) -> InternalMessage<C, N> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbMessage(msg))
}
pub fn hb_contribution(src_uid: Uid, src_addr: OutAddr, contrib: T) -> InternalMessage<T> {
pub fn hb_contribution(src_uid: N, src_addr: OutAddr, contrib: C) -> InternalMessage<C, N> {
InternalMessage::new(
Some(src_uid),
src_addr,
@ -495,7 +501,7 @@ impl<T: Contribution> InternalMessage<T> {
)
}
pub fn hb_vote(src_uid: Uid, src_addr: OutAddr, change: Change) -> InternalMessage<T> {
pub fn hb_vote(src_uid: N, src_addr: OutAddr, change: Change<N>) -> InternalMessage<C, N> {
InternalMessage::new(
Some(src_uid),
src_addr,
@ -503,17 +509,17 @@ impl<T: Contribution> InternalMessage<T> {
)
}
pub fn peer_disconnect(src_uid: Uid, src_addr: OutAddr) -> InternalMessage<T> {
pub fn peer_disconnect(src_uid: N, src_addr: OutAddr) -> InternalMessage<C, N> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::PeerDisconnect)
}
pub fn new_incoming_connection(
src_uid: Uid,
src_uid: N,
src_addr: OutAddr,
src_in_addr: InAddr,
src_pk: PublicKey,
request_change_add: bool,
) -> InternalMessage<T> {
) -> InternalMessage<C, N> {
InternalMessage::new(
Some(src_uid),
src_addr,
@ -522,10 +528,10 @@ impl<T: Contribution> InternalMessage<T> {
}
pub fn new_key_gen_instance(
src_uid: Uid,
src_uid: N,
src_addr: OutAddr,
tx: mpsc::UnboundedSender<key_gen::Message>,
) -> InternalMessage<T> {
) -> InternalMessage<C, N> {
InternalMessage::new(
Some(src_uid),
src_addr,
@ -533,12 +539,12 @@ impl<T: Contribution> InternalMessage<T> {
)
}
pub fn new_outgoing_connection(src_addr: OutAddr) -> InternalMessage<T> {
pub fn new_outgoing_connection(src_addr: OutAddr) -> InternalMessage<C, N> {
InternalMessage::new_without_uid(src_addr, InternalMessageKind::NewOutgoingConnection)
}
/// Returns the source unique identifier this message was received in.
pub fn src_uid(&self) -> Option<&Uid> {
pub fn src_uid(&self) -> Option<&N> {
self.src_uid.as_ref()
}
@ -548,12 +554,12 @@ impl<T: Contribution> InternalMessage<T> {
}
/// Returns the internal message kind.
pub fn kind(&self) -> &InternalMessageKind<T> {
pub fn kind(&self) -> &InternalMessageKind<C, N> {
&self.kind
}
/// Consumes this `InternalMessage` into its parts.
pub fn into_parts(self) -> (Option<Uid>, OutAddr, InternalMessageKind<T>) {
pub fn into_parts(self) -> (Option<N>, OutAddr, InternalMessageKind<C, N>) {
(self.src_uid, self.src_addr, self.kind)
}
}

View File

@ -5,11 +5,13 @@
use crate::hydrabadger::{Error, Hydrabadger};
use crate::{
Contribution, InAddr, InternalMessage, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, WireRx, WireTx,
WireMessages, WireRx, WireTx, NodeId,
};
use futures::sync::mpsc;
use hbbft::crypto::PublicKey;
use hbbft::dynamic_honey_badger::Input as HbInput;
use hbbft::{
crypto::PublicKey,
dynamic_honey_badger::Input as HbInput,
};
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
@ -21,44 +23,44 @@ use std::{
use tokio::prelude::*;
/// The state for each connected client.
pub struct PeerHandler<T: Contribution> {
// Peer uid.
uid: Option<Uid>,
pub struct PeerHandler<C: Contribution, N: NodeId> {
// Peer nid.
nid: Option<N>,
// The incoming stream of messages:
wire_msgs: WireMessages<T>,
wire_msgs: WireMessages<C, N>,
/// Handle to the shared message state.
hdb: Hydrabadger<T>,
hdb: Hydrabadger<C, N>,
// TODO: Consider adding back a separate clone of `peer_internal_tx`. Is
// there any difference if capacity isn't an issue? -- doubtful
/// Receive half of the message channel.
rx: WireRx<T>,
rx: WireRx<C, N>,
/// Peer socket address.
out_addr: OutAddr,
}
impl<T: Contribution> PeerHandler<T> {
impl<C: Contribution, N: NodeId> PeerHandler<C, N> {
/// Create a new instance of `Peer`.
pub fn new(
pub_info: Option<(Uid, InAddr, PublicKey)>,
hdb: Hydrabadger<T>,
mut wire_msgs: WireMessages<T>,
) -> PeerHandler<T> {
pub_info: Option<(N, InAddr, PublicKey)>,
hdb: Hydrabadger<C, N>,
mut wire_msgs: WireMessages<C, N>,
) -> PeerHandler<C, N> {
// Get the client socket address
let out_addr = OutAddr(wire_msgs.socket().peer_addr().unwrap());
// Create a channel for this peer
let (tx, rx) = mpsc::unbounded();
pub_info.as_ref().map(|(uid, _, _)| *uid);
pub_info.as_ref().map(|(nid, _, _)| nid.clone());
let uid = match pub_info {
Some((uid, _, pk)) => {
let nid = match pub_info {
Some((ref nid, _, pk)) => {
wire_msgs.set_peer_public_key(pk);
Some(uid)
Some(nid.clone())
}
None => None,
};
@ -67,7 +69,7 @@ impl<T: Contribution> PeerHandler<T> {
hdb.peers_mut().add(out_addr, tx, pub_info);
PeerHandler {
uid,
nid,
wire_msgs,
hdb,
rx,
@ -75,7 +77,7 @@ impl<T: Contribution> PeerHandler<T> {
}
}
pub(crate) fn hdb(&self) -> &Hydrabadger<T> {
pub(crate) fn hdb(&self) -> &Hydrabadger<C, N> {
&self.hdb
}
@ -85,7 +87,7 @@ impl<T: Contribution> PeerHandler<T> {
}
/// A future representing the client connection.
impl<T: Contribution> Future for PeerHandler<T> {
impl<C: Contribution, N: NodeId> Future for PeerHandler<C, N> {
type Item = ();
type Error = Error;
@ -120,61 +122,61 @@ impl<T: Contribution> Future for PeerHandler<T> {
if let Some(msg) = message {
match msg.into_kind() {
WireMessageKind::HelloRequestChangeAdd(src_uid, _in_addr, _pub_key) => {
WireMessageKind::HelloRequestChangeAdd(src_nid, _in_addr, _pub_key) => {
error!(
"Duplicate `WireMessage::HelloRequestChangeAdd` \
received from '{}'",
src_uid
received from '{:?}'",
src_nid
);
}
WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state) => {
self.uid = Some(src_uid);
WireMessageKind::WelcomeReceivedChangeAdd(src_nid, pk, net_state) => {
self.nid = Some(src_nid.clone());
self.wire_msgs.set_peer_public_key(pk);
self.hdb.send_internal(InternalMessage::wire(
Some(src_uid),
Some(src_nid.clone()),
self.out_addr,
WireMessage::welcome_received_change_add(src_uid, pk, net_state),
WireMessage::welcome_received_change_add(src_nid.clone(), pk, net_state),
));
}
WireMessageKind::HelloFromValidator(src_uid, in_addr, pk, net_state) => {
self.uid = Some(src_uid);
WireMessageKind::HelloFromValidator(src_nid, in_addr, pk, net_state) => {
self.nid = Some(src_nid.clone());
self.wire_msgs.set_peer_public_key(pk);
self.hdb.send_internal(InternalMessage::wire(
Some(src_uid),
Some(src_nid.clone()),
self.out_addr,
WireMessage::hello_from_validator(src_uid, in_addr, pk, net_state),
WireMessage::hello_from_validator(src_nid.clone(), in_addr, pk, net_state),
));
}
WireMessageKind::Message(src_uid, msg) => {
if let Some(peer_uid) = self.uid.as_ref() {
debug_assert_eq!(src_uid, *peer_uid);
WireMessageKind::Message(src_nid, msg) => {
if let Some(peer_nid) = self.nid.as_ref() {
debug_assert_eq!(src_nid, *peer_nid);
}
self.hdb.send_internal(InternalMessage::hb_message(
src_uid,
src_nid,
self.out_addr,
msg,
))
}
WireMessageKind::Transaction(src_uid, txn) => {
if let Some(peer_uid) = self.uid.as_ref() {
debug_assert_eq!(src_uid, *peer_uid);
WireMessageKind::Transaction(src_nid, txn) => {
if let Some(peer_nid) = self.nid.as_ref() {
debug_assert_eq!(src_nid, *peer_nid);
}
self.hdb.send_internal(InternalMessage::hb_contribution(
src_uid,
src_nid,
self.out_addr,
txn,
))
}
kind => self.hdb.send_internal(InternalMessage::wire(
self.uid,
self.nid.clone(),
self.out_addr,
kind.into(),
)),
}
} else {
info!("Peer ({}: '{:?}') disconnected.", self.out_addr, self.uid);
info!("Peer ({}: '{:?}') disconnected.", self.out_addr, self.nid);
return Ok(Async::Ready(()));
}
}
@ -183,45 +185,45 @@ impl<T: Contribution> Future for PeerHandler<T> {
}
}
impl<T: Contribution> Drop for PeerHandler<T> {
impl<C: Contribution, N: NodeId> Drop for PeerHandler<C, N> {
fn drop(&mut self) {
debug!(
"Removing peer ({}: '{}') from the list of peers.",
"Removing peer ({}: '{:?}') from the list of peers.",
self.out_addr,
self.uid.unwrap()
self.nid.clone().unwrap()
);
// Remove peer transmitter from the lists:
self.hdb.peers_mut().remove(&self.out_addr);
if let Some(uid) = self.uid {
if let Some(nid) = self.nid.clone() {
debug!(
"Sending peer ({}: '{}') disconnect internal message.",
"Sending peer ({}: '{:?}') disconnect internal message.",
self.out_addr,
self.uid.unwrap()
self.nid.clone().unwrap()
);
self.hdb
.send_internal(InternalMessage::peer_disconnect(uid, self.out_addr));
.send_internal(InternalMessage::peer_disconnect(nid, self.out_addr));
}
}
}
#[derive(Clone, Debug)]
#[allow(dead_code)]
enum State {
enum State<N> {
Handshaking,
PendingJoinInfo {
uid: Uid,
nid: N,
in_addr: InAddr,
pk: PublicKey,
},
EstablishedObserver {
uid: Uid,
nid: N,
in_addr: InAddr,
pk: PublicKey,
},
EstablishedValidator {
uid: Uid,
nid: N,
in_addr: InAddr,
pk: PublicKey,
},
@ -229,22 +231,22 @@ enum State {
/// Nodes of the network.
#[derive(Clone, Debug)]
pub struct Peer<T: Contribution> {
pub struct Peer<C: Contribution, N: NodeId> {
out_addr: OutAddr,
tx: WireTx<T>,
state: State,
tx: WireTx<C, N>,
state: State<N>,
}
impl<T: Contribution> Peer<T> {
impl<C: Contribution, N: NodeId> Peer<C, N> {
/// Returns a new `Peer`
fn new(
out_addr: OutAddr,
tx: WireTx<T>,
pub_info: Option<(Uid, InAddr, PublicKey)>,
) -> Peer<T> {
tx: WireTx<C, N>,
pub_info: Option<(N, InAddr, PublicKey)>,
) -> Peer<C, N> {
let state = match pub_info {
None => State::Handshaking,
Some((uid, in_addr, pk)) => State::EstablishedValidator { uid, in_addr, pk },
Some((nid, in_addr, pk)) => State::EstablishedValidator { nid, in_addr, pk },
};
Peer {
@ -255,10 +257,10 @@ impl<T: Contribution> Peer<T> {
}
/// Sets a peer state to `State::PendingJoinInfo` and stores public info.
fn set_pending(&mut self, pub_info: (Uid, InAddr, PublicKey)) {
fn set_pending(&mut self, pub_info: (N, InAddr, PublicKey)) {
self.state = match self.state {
State::Handshaking => State::PendingJoinInfo {
uid: pub_info.0,
nid: pub_info.0,
in_addr: pub_info.1,
pk: pub_info.2,
},
@ -272,8 +274,8 @@ impl<T: Contribution> Peer<T> {
/// Sets a peer state to `State::EstablishedObserver` and stores public info.
fn establish_observer(&mut self) {
self.state = match self.state {
State::PendingJoinInfo { uid, in_addr, pk } => {
State::EstablishedObserver { uid, in_addr, pk }
State::PendingJoinInfo { ref nid, in_addr, pk } => {
State::EstablishedObserver { nid: nid.clone(), in_addr, pk }
}
_ => panic!(
"Peer::establish_observer: Can only establish observer when \
@ -283,11 +285,11 @@ impl<T: Contribution> Peer<T> {
}
/// Sets a peer state to `State::EstablishedValidator` and stores public info.
fn establish_validator(&mut self, pub_info: Option<(Uid, InAddr, PublicKey)>) {
fn establish_validator(&mut self, pub_info: Option<(N, InAddr, PublicKey)>) {
self.state = match self.state {
State::Handshaking => match pub_info {
Some(pi) => State::EstablishedValidator {
uid: pi.0,
nid: pi.0,
in_addr: pi.1,
pk: pi.2,
},
@ -298,14 +300,14 @@ impl<T: Contribution> Peer<T> {
);
}
},
State::EstablishedObserver { uid, in_addr, pk } => {
State::EstablishedObserver { ref nid, in_addr, pk } => {
if pub_info.is_some() {
panic!(
"Peer::establish_validator: `pub_info` must be `None` \
when upgrading an observer node."
);
}
State::EstablishedValidator { uid, in_addr, pk }
State::EstablishedValidator { nid: nid.clone(), in_addr, pk }
}
_ => panic!(
"Peer::establish_validator: Can only establish validator when \
@ -315,12 +317,12 @@ impl<T: Contribution> Peer<T> {
}
/// Returns the peer's unique identifier.
pub fn uid(&self) -> Option<&Uid> {
pub fn node_id(&self) -> Option<&N> {
match self.state {
State::Handshaking => None,
State::PendingJoinInfo { ref uid, .. } => Some(uid),
State::EstablishedObserver { ref uid, .. } => Some(uid),
State::EstablishedValidator { ref uid, .. } => Some(uid),
State::PendingJoinInfo { ref nid, .. } => Some(nid),
State::EstablishedObserver { ref nid, .. } => Some(nid),
State::EstablishedValidator { ref nid, .. } => Some(nid),
}
}
@ -350,24 +352,24 @@ impl<T: Contribution> Peer<T> {
}
/// Returns the peer's public info if established.
pub fn pub_info(&self) -> Option<(&Uid, &InAddr, &PublicKey)> {
pub fn pub_info(&self) -> Option<(&N, &InAddr, &PublicKey)> {
match self.state {
State::Handshaking => None,
State::EstablishedObserver {
ref uid,
ref nid,
ref in_addr,
ref pk,
} => Some((uid, in_addr, pk)),
} => Some((nid, in_addr, pk)),
State::PendingJoinInfo {
ref uid,
ref nid,
ref in_addr,
ref pk,
} => Some((uid, in_addr, pk)),
} => Some((nid, in_addr, pk)),
State::EstablishedValidator {
ref uid,
ref nid,
ref in_addr,
ref pk,
} => Some((uid, in_addr, pk)),
} => Some((nid, in_addr, pk)),
}
}
@ -396,7 +398,7 @@ impl<T: Contribution> Peer<T> {
}
/// Returns the peer's wire transmitter.
pub fn tx(&self) -> &WireTx<T> {
pub fn tx(&self) -> &WireTx<C, N> {
&self.tx
}
}
@ -406,15 +408,15 @@ impl<T: Contribution> Peer<T> {
// TODO: Keep a separate `HashSet` of validator `OutAddrs` to avoid having to
// iterate through entire list.
#[derive(Debug)]
pub struct Peers<T: Contribution> {
peers: HashMap<OutAddr, Peer<T>>,
out_addrs: HashMap<Uid, OutAddr>,
pub struct Peers<C: Contribution, N: NodeId> {
peers: HashMap<OutAddr, Peer<C, N>>,
out_addrs: HashMap<N, OutAddr>,
local_addr: InAddr,
}
impl<T: Contribution> Peers<T> {
impl<C: Contribution, N: NodeId> Peers<C, N> {
/// Returns a new empty list of peers.
pub(crate) fn new(local_addr: InAddr) -> Peers<T> {
pub(crate) fn new(local_addr: InAddr) -> Peers<C, N> {
Peers {
peers: HashMap::with_capacity(64),
out_addrs: HashMap::with_capacity(64),
@ -426,12 +428,12 @@ impl<T: Contribution> Peers<T> {
pub(crate) fn add(
&mut self,
out_addr: OutAddr,
tx: WireTx<T>,
pub_info: Option<(Uid, InAddr, PublicKey)>,
tx: WireTx<C, N>,
pub_info: Option<(N, InAddr, PublicKey)>,
) {
let peer = Peer::new(out_addr, tx, pub_info);
if let State::EstablishedValidator { uid, .. } = peer.state {
self.out_addrs.insert(uid, peer.out_addr);
if let State::EstablishedValidator { ref nid, .. } = peer.state {
self.out_addrs.insert(nid.clone(), peer.out_addr);
}
self.peers.insert(peer.out_addr, peer);
}
@ -448,14 +450,14 @@ impl<T: Contribution> Peers<T> {
pub(crate) fn set_pending<O: Borrow<OutAddr>>(
&mut self,
out_addr: O,
pub_info: (Uid, InAddr, PublicKey),
pub_info: (N, InAddr, PublicKey),
) -> bool {
let peer = self.peers.get_mut(out_addr.borrow()).expect(&format!(
"Peers::set_pending: \
No peer found with outgoing address: {}",
out_addr.borrow()
));
match self.out_addrs.insert(pub_info.0, *out_addr.borrow()) {
match self.out_addrs.insert(pub_info.0.clone(), *out_addr.borrow()) {
Some(_out_addr_pub) => {
let pi_pub = peer
.pub_info()
@ -503,14 +505,14 @@ impl<T: Contribution> Peers<T> {
pub(crate) fn establish_validator<O: Borrow<OutAddr>>(
&mut self,
out_addr: O,
pub_info: (Uid, InAddr, PublicKey),
pub_info: (N, InAddr, PublicKey),
) -> bool {
let peer = self.peers.get_mut(out_addr.borrow()).expect(&format!(
"Peers::establish_validator: \
No peer found with outgoing address: {}",
out_addr.borrow()
));
match self.out_addrs.insert(pub_info.0, *out_addr.borrow()) {
match self.out_addrs.insert(pub_info.0.clone(), *out_addr.borrow()) {
Some(_out_addr_pub) => {
let pi_pub = peer
.pub_info()
@ -526,7 +528,7 @@ impl<T: Contribution> Peers<T> {
false
}
pub(crate) fn wire_to_all(&self, msg: WireMessage<T>) {
pub(crate) fn wire_to_all(&self, msg: WireMessage<C, N>) {
for (_p_addr, peer) in self
.peers
.iter()
@ -536,7 +538,7 @@ impl<T: Contribution> Peers<T> {
}
}
pub(crate) fn wire_to_validators(&self, msg: WireMessage<T>) {
pub(crate) fn wire_to_validators(&self, msg: WireMessage<C, N>) {
// for peer in peers.validators()
// .filter(|p| p.out_addr() != &OutAddr(self.hdb.addr().0)) {
// peer.tx().unbounded_send(msg.clone()).unwrap();
@ -546,27 +548,27 @@ impl<T: Contribution> Peers<T> {
self.wire_to_all(msg)
}
/// Sends a `WireMessage` to the target specified by `tar_uid`.
/// Sends a `WireMessage` to the target specified by `tar_nid`.
///
/// If the target is not an established node, the message will be returned
/// along with an incremented retry count.
pub(crate) fn wire_to(
&self,
tar_uid: Uid,
msg: WireMessage<T>,
tar_nid: N,
msg: WireMessage<C, N>,
retry_count: usize,
) -> Option<(Uid, WireMessage<T>, usize)> {
match self.get_by_uid(&tar_uid) {
) -> Option<(N, WireMessage<C, N>, usize)> {
match self.get_by_nid(&tar_nid) {
Some(p) => {
p.tx().unbounded_send(msg).unwrap();
None
}
None => {
info!(
"Node '{}' is not yet established. Queueing message for now (retry_count: {}).",
tar_uid, retry_count
"Node '{:?}' is not yet established. Queueing message for now (retry_count: {}).",
tar_nid, retry_count
);
Some((tar_uid, msg, retry_count + 1))
Some((tar_nid, msg, retry_count + 1))
}
}
}
@ -575,34 +577,34 @@ impl<T: Contribution> Peers<T> {
pub(crate) fn remove<O: Borrow<OutAddr>>(&mut self, out_addr: O) {
let peer = self.peers.remove(out_addr.borrow());
if let Some(p) = peer {
if let Some(uid) = p.uid() {
self.out_addrs.remove(&uid);
if let Some(nid) = p.node_id() {
self.out_addrs.remove(&nid);
}
}
}
pub(crate) fn get<O: Borrow<OutAddr>>(&self, out_addr: O) -> Option<&Peer<T>> {
pub(crate) fn get<O: Borrow<OutAddr>>(&self, out_addr: O) -> Option<&Peer<C, N>> {
self.peers.get(out_addr.borrow())
}
pub(crate) fn get_by_uid<U: Borrow<Uid>>(&self, uid: U) -> Option<&Peer<T>> {
pub(crate) fn get_by_nid<U: Borrow<N>>(&self, nid: U) -> Option<&Peer<C, N>> {
self.out_addrs
.get(uid.borrow())
.get(nid.borrow())
.and_then(|addr| self.get(addr))
}
/// Returns an Iterator over the list of peers.
pub(crate) fn iter(&self) -> HashMapIter<OutAddr, Peer<T>> {
pub(crate) fn iter(&self) -> HashMapIter<OutAddr, Peer<C, N>> {
self.peers.iter()
}
/// Returns an Iterator over the list of peers.
pub fn peers(&self) -> HashMapValues<OutAddr, Peer<T>> {
pub fn peers(&self) -> HashMapValues<OutAddr, Peer<C, N>> {
self.peers.values()
}
/// Returns an iterator over the list of validators.
pub fn validators(&self) -> impl Iterator<Item = &Peer<T>> {
pub fn validators(&self) -> impl Iterator<Item = &Peer<C, N>> {
self.peers.values().filter(|p| p.is_validator())
}