Update to Rust 2018.

This commit is contained in:
c0gent 2018-12-07 08:44:48 -08:00
parent e5b9c8cbbc
commit 25a126aab4
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
8 changed files with 374 additions and 288 deletions

View File

@ -3,6 +3,7 @@ name = "hydrabadger"
version = "0.1.0"
authors = ["c0gent <nsan1129@gmail.com>"]
autobins = false
edition = "2018"
# [[bin]]
# name = "simulation"

View File

@ -4,26 +4,22 @@
//! * Do not make state changes directly in this module (use closures, etc.).
//!
use std::{
collections::HashMap,
cell::RefCell,
use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, State, StateDsct, StateMachine};
use crate::peer::Peers;
use crate::{
key_gen, BatchTx, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind,
};
use crossbeam::queue::SegQueue;
use tokio::{self, prelude::*};
use hbbft::{
crypto::PublicKey,
dynamic_honey_badger::{ChangeState, JoinPlan, Change as DhbChange},
dynamic_honey_badger::{Change as DhbChange, ChangeState, JoinPlan},
sync_key_gen::{Ack, Part},
Target,
};
use peer::Peers;
use {
Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
key_gen};
use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
use std::{cell::RefCell, collections::HashMap};
use tokio::{self, prelude::*};
/// Hydrabadger event (internal message) handler.
pub struct Handler<T: Contribution> {
@ -43,7 +39,11 @@ pub struct Handler<T: Contribution> {
}
impl<T: Contribution> Handler<T> {
pub(super) fn new(hdb: Hydrabadger<T>, peer_internal_rx: InternalRx<T>, batch_tx: BatchTx<T>) -> Handler<T> {
pub(super) fn new(
hdb: Hydrabadger<T>,
peer_internal_rx: InternalRx<T>,
batch_tx: BatchTx<T>,
) -> Handler<T> {
Handler {
hdb,
peer_internal_rx,
@ -69,7 +69,9 @@ impl<T: Contribution> Handler<T> {
StateDsct::KeyGen => {
// TODO: Should network state simply be stored within key_gen?
let net_state = state.network_state(&peers);
state.key_gen_mut().unwrap()
state
.key_gen_mut()
.unwrap()
.add_peers(peers, &self.hdb, net_state)?;
}
StateDsct::Observer | StateDsct::Validator => {
@ -78,7 +80,8 @@ impl<T: Contribution> Handler<T> {
if request_change_add {
let dhb = state.dhb_mut().unwrap();
info!("Change-Adding ('{}') to honey badger.", src_uid);
let step = dhb.vote_to_add(src_uid, src_pk)
let step = dhb
.vote_to_add(src_uid, src_pk)
.expect("Error adding new peer to HB");
self.step_queue.push(step);
}
@ -98,18 +101,24 @@ impl<T: Contribution> Handler<T> {
}
/// Handles a received `Part`.
fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>, peers: &Peers<T>)
-> Result<(), Error>
{
fn handle_key_gen_part(
&self,
src_uid: &Uid,
part: Part,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
match state.state {
State::KeyGen { ref mut key_gen, .. } => {
State::KeyGen {
ref mut key_gen, ..
} => {
key_gen.handle_key_gen_part(src_uid, part, peers);
}
State::DeterminingNetworkState { ref network_state, .. } => {
match network_state.is_some() {
true => unimplemented!(),
false => unimplemented!(),
}
State::DeterminingNetworkState {
ref network_state, ..
} => match network_state.is_some() {
true => unimplemented!(),
false => unimplemented!(),
},
ref s => panic!(
"::handle_key_gen_part: State must be `GeneratingKeys`. \
@ -131,7 +140,9 @@ impl<T: Contribution> Handler<T> {
let mut complete = false;
match state.state {
State::KeyGen { ref mut key_gen, .. } => {
State::KeyGen {
ref mut key_gen, ..
} => {
if key_gen.handle_key_gen_ack(src_uid, ack, peers)? {
complete = true;
}
@ -158,7 +169,7 @@ impl<T: Contribution> Handler<T> {
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
use key_gen::{MessageKind, InstanceId};
use crate::key_gen::{InstanceId, MessageKind};
match instance_id {
InstanceId::User(id) => {
@ -179,16 +190,14 @@ impl<T: Contribution> Handler<T> {
None => error!("KeyGen message received with invalid instance"),
}
}
InstanceId::BuiltIn => {
match msg.into_kind() {
MessageKind::Part(part) => {
self.handle_key_gen_part(src_uid, part, state, peers)?;
}
MessageKind::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)?;
}
InstanceId::BuiltIn => match msg.into_kind() {
MessageKind::Part(part) => {
self.handle_key_gen_part(src_uid, part, state, peers)?;
}
}
MessageKind::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)?;
}
},
}
Ok(())
@ -295,8 +304,11 @@ impl<T: Contribution> Handler<T> {
_state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
peers.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(),
*self.hdb.addr(), self.hdb.secret_key().public_key()));
peers.wire_to_validators(WireMessage::hello_request_change_add(
*self.hdb.uid(),
*self.hdb.addr(),
self.hdb.secret_key().public_key(),
));
Ok(())
}
@ -324,7 +336,10 @@ impl<T: Contribution> Handler<T> {
let mut reset_fresh = false;
match state.state {
State::DeterminingNetworkState { ref mut network_state, .. } => {
State::DeterminingNetworkState {
ref mut network_state,
..
} => {
*network_state = Some(NetworkState::Active(net_info.clone()));
}
State::KeyGen { ref key_gen, .. } => {
@ -504,8 +519,13 @@ impl<T: Contribution> Handler<T> {
let new_id = Uid::new();
// tx.unbounded_send(key_gen::Message::instance_id().unwrap();
let instance_id = key_gen::InstanceId::User(new_id.clone());
let key_gen = key_gen::Machine::generate(self.hdb.uid(), self.hdb.secret_key().clone(),
&peers, tx, instance_id)?;
let key_gen = key_gen::Machine::generate(
self.hdb.uid(),
self.hdb.secret_key().clone(),
&peers,
tx,
instance_id,
)?;
self.key_gens.borrow_mut().insert(new_id, key_gen);
}
@ -559,7 +579,13 @@ impl<T: Contribution> Handler<T> {
}
WireMessageKind::KeyGen(instance_id, msg) => {
self.handle_key_gen_message(instance_id, msg, &src_uid.unwrap(), state, &self.hdb.peers())?;
self.handle_key_gen_message(
instance_id,
msg,
&src_uid.unwrap(),
state,
&self.hdb.peers(),
)?;
}
// Output by validators when a batch with a `ChangeState`
@ -712,9 +738,7 @@ impl<T: Contribution> Future for Handler<T> {
);
}
Target::All => {
peers.wire_to_all(
WireMessage::message(*self.hdb.uid(), hb_msg.message),
);
peers.wire_to_all(WireMessage::message(*self.hdb.uid(), hb_msg.message));
}
}
}

View File

@ -1,6 +1,19 @@
//! A hydrabadger consensus node.
//!
use super::{Error, Handler, StateDsct, StateMachine};
use crate::peer::{PeerHandler, Peers};
use crate::{
key_gen, BatchRx, Change, Contribution, EpochRx, EpochTx, InAddr, InternalMessage, InternalTx,
OutAddr, Uid, WireMessage, WireMessageKind, WireMessages,
};
use futures::{
future::{self, Either},
sync::mpsc,
};
use hbbft::crypto::{PublicKey, SecretKey};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use rand::{self, Rand};
use std::{
collections::HashSet,
net::SocketAddr,
@ -10,27 +23,12 @@ use std::{
},
time::{Duration, Instant},
};
use rand::{self, Rand};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use futures::{
future::{self, Either},
sync::mpsc,
};
use tokio::{
self,
net::{TcpListener, TcpStream},
prelude::*,
timer::{Interval, Delay},
timer::{Delay, Interval},
};
use hbbft::{
crypto::{PublicKey, SecretKey},
};
use peer::{PeerHandler, Peers};
use {
Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx, key_gen,
};
use super::{Error, Handler, StateMachine, StateDsct};
// The number of random transactions to generate per interval.
const DEFAULT_TXN_GEN_COUNT: usize = 5;
@ -302,7 +300,10 @@ impl<T: Contribution> Hydrabadger<T> {
pub fn new_key_gen_instance(&self) -> mpsc::UnboundedReceiver<key_gen::Message> {
let (tx, rx) = mpsc::unbounded();
self.send_internal(InternalMessage::new_key_gen_instance(
self.inner.uid, OutAddr(*self.inner.addr), tx));
self.inner.uid,
OutAddr(*self.inner.addr),
tx,
));
rx
}
@ -396,16 +397,21 @@ impl<T: Contribution> Hydrabadger<T> {
})
.map_err(move |err| {
if is_optimistic {
warn!("Unable to connect to: {} ({e:?}: {e})", remote_addr, e=err);
warn!(
"Unable to connect to: {} ({e:?}: {e})",
remote_addr,
e = err
);
} else {
error!("Error connecting to: {} ({e:?}: {e})", remote_addr, e=err);
error!("Error connecting to: {} ({e:?}: {e})", remote_addr, e = err);
}
})
}
fn generate_contributions(self, gen_txns: Option<fn(usize, usize) -> T>)
-> impl Future<Item = (), Error = ()>
{
fn generate_contributions(
self,
gen_txns: Option<fn(usize, usize) -> T>,
) -> impl Future<Item = (), Error = ()> {
if let Some(gen_txns) = gen_txns {
let epoch_stream = self.register_epoch_listener();
let gen_delay = self.inner.config.txn_gen_interval;
@ -440,7 +446,6 @@ impl<T: Contribution> Hydrabadger<T> {
.map_err(|err| panic!("Contribution generation error: {:?}", err));
Either::A(gen_cntrb)
} else {
Either::B(future::ok(()))
}
@ -510,10 +515,12 @@ impl<T: Contribution> Hydrabadger<T> {
let local_sk = hdb.inner.secret_key.clone();
let connect = future::lazy(move || {
for &remote_addr in remotes.iter().filter(|&&ra| ra != hdb.inner.addr.0) {
tokio::spawn(
hdb.clone()
.connect_outgoing(remote_addr, local_sk.clone(), None, true),
);
tokio::spawn(hdb.clone().connect_outgoing(
remote_addr,
local_sk.clone(),
None,
true,
));
}
Ok(())
});
@ -570,8 +577,12 @@ impl<T: Contribution> HydrabadgerWeak<T> {
pub fn upgrade(self) -> Option<Hydrabadger<T>> {
self.inner.upgrade().and_then(|inner| {
self.handler.upgrade().and_then(|handler| {
self.batch_rx.upgrade().and_then(|batch_rx|{
Some(Hydrabadger { inner, handler, batch_rx })
self.batch_rx.upgrade().and_then(|batch_rx| {
Some(Hydrabadger {
inner,
handler,
batch_rx,
})
})
})
})

View File

@ -1,17 +1,17 @@
//! Synchronous distributed key generation.
use futures::sync::mpsc;
use hydrabadger::hydrabadger::Hydrabadger;
use super::Error;
use crate::hydrabadger::hydrabadger::Hydrabadger;
use crate::peer::Peers;
use crate::{Contribution, NetworkState, Uid, WireMessage};
use crossbeam::queue::SegQueue;
use futures::sync::mpsc;
use hbbft::{
crypto::{PublicKey, SecretKey},
sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen},
};
use peer::Peers;
use std::{collections::BTreeMap};
use rand;
use super::Error;
use {Contribution, NetworkState, Uid, WireMessage};
use std::collections::BTreeMap;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum InstanceId {
@ -45,15 +45,14 @@ impl Message {
}
pub fn kind(&self) -> &MessageKind {
&self.kind
&self.kind
}
pub fn into_kind(self) -> MessageKind {
self.kind
self.kind
}
}
/// Key generation state.
#[derive(Debug)]
pub(super) enum State {
@ -70,20 +69,17 @@ pub(super) enum State {
ack_count: usize,
},
Complete {
sync_key_gen: Option<SyncKeyGen<Uid>>,
sync_key_gen: Option<SyncKeyGen<Uid>>,
public_key: Option<PublicKey>,
},
}
/// Forwards an `Ack` to a `SyncKeyGen` instance.
fn handle_ack(
uid: &Uid,
ack: Ack,
ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>,
) {
fn handle_ack(uid: &Uid, ack: Ack, ack_count: &mut usize, sync_key_gen: &mut SyncKeyGen<Uid>) {
trace!("KEY GENERATION: Handling ack from '{}'...", uid);
let ack_outcome = sync_key_gen.handle_ack(uid, ack.clone()).expect("Failed to handle Ack.");
let ack_outcome = sync_key_gen
.handle_ack(uid, ack.clone())
.expect("Failed to handle Ack.");
match ack_outcome {
AckOutcome::Invalid(fault) => error!("Error handling ack: '{:?}':\n{:?}", ack, fault),
AckOutcome::Valid => *ack_count += 1,
@ -122,52 +118,52 @@ pub struct Machine {
}
impl Machine {
/// Creates and returns a new `Machine` in the `AwaitingPeers`
/// state.
pub fn awaiting_peers(
ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
) -> Machine {
Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue,
event_tx,
instance_id,
}
}
/// Creates and returns a new `Machine` in the `AwaitingPeers`
/// state.
pub fn awaiting_peers(
ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
) -> Machine {
Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue,
event_tx,
instance_id,
}
}
/// Creates and returns a new `Machine` in the `Generating`
/// state.
pub fn generate<T: Contribution>(
local_uid: &Uid,
/// Creates and returns a new `Machine` in the `Generating`
/// state.
pub fn generate<T: Contribution>(
local_uid: &Uid,
local_sk: SecretKey,
peers: &Peers<T>,
event_tx: mpsc::UnboundedSender<Message>,
instance_id: InstanceId,
) -> Result<Machine, Error> {
let mut m = Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue: SegQueue::new(),
event_tx: Some(event_tx),
instance_id: instance_id.clone(),
};
let mut m = Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue: SegQueue::new(),
event_tx: Some(event_tx),
instance_id: instance_id.clone(),
};
let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?;
let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?;
peers.wire_to_validators(WireMessage::key_gen_part(instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_part(instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_ack(instance_id, ack));
Ok(m)
}
Ok(m)
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys<T: Contribution>(
&mut self,
local_uid: &Uid,
@ -176,9 +172,7 @@ impl Machine {
) -> Result<(Part, Ack), Error> {
let (part, ack);
self.state = match self.state {
State::AwaitingPeers {
..
} => {
State::AwaitingPeers { .. } => {
// let threshold = config.keygen_peer_count / 3;
let threshold = peers.count_validators() / 3;
@ -192,24 +186,32 @@ impl Machine {
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let (mut sync_key_gen, opt_part) =
SyncKeyGen::new(&mut rng, *local_uid, local_sk, public_keys.clone(), threshold)
.map_err(Error::SyncKeyGenNew)?;
let (mut sync_key_gen, opt_part) = SyncKeyGen::new(
&mut rng,
*local_uid,
local_sk,
public_keys.clone(),
threshold,
)
.map_err(Error::SyncKeyGenNew)?;
part = opt_part.expect("This node is not a validator (somehow)!");
trace!("KEY GENERATION: Handling our own `Part`...");
ack = match sync_key_gen.handle_part(&mut rng, &local_uid, part.clone())
.expect("Handling our own Part has failed") {
ack = match sync_key_gen
.handle_part(&mut rng, &local_uid, part.clone())
.expect("Handling our own Part has failed")
{
PartOutcome::Valid(Some(ack)) => ack,
PartOutcome::Invalid(faults) => panic!(
"Invalid part (FIXME: handle): {:?}", faults),
PartOutcome::Invalid(faults) => {
panic!("Invalid part (FIXME: handle): {:?}", faults)
}
PartOutcome::Valid(None) => panic!("No Ack produced when handling Part."),
};
trace!("KEY GENERATION: Queueing our own `Ack`...");
self.ack_queue.push((*local_uid, ack.clone()));
State::Generating {
State::Generating {
sync_key_gen: Some(sync_key_gen),
public_key: Some(pk),
public_keys,
@ -228,14 +230,14 @@ impl Machine {
/// Notify this key generation instance that peers have been added.
//
// TODO: Move some of this logic back to handler.
pub(super) fn add_peers<T: Contribution>(
pub(super) fn add_peers<T: Contribution>(
&mut self,
peers: &Peers<T>,
hdb: &Hydrabadger<T>,
net_state: NetworkState,
) -> Result<(), Error> {
match self.state {
State::AwaitingPeers { .. } => {
match self.state {
State::AwaitingPeers { .. } => {
if peers.count_validators() >= hdb.config().keygen_peer_count {
info!("BEGINNING KEY GENERATION");
@ -243,23 +245,24 @@ impl Machine {
let local_in_addr = *hdb.addr();
let local_pk = hdb.secret_key().public_key();
let (part, ack) = self.set_generating_keys(
&local_uid,
hdb.secret_key().clone(),
peers,
)?;
let (part, ack) =
self.set_generating_keys(&local_uid, hdb.secret_key().clone(), peers)?;
trace!("KEY GENERATION: Sending initial parts and our own ack.");
peers.wire_to_validators(
WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_pk,
net_state,
),
);
peers.wire_to_validators(WireMessage::key_gen_part(self.instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_ack(self.instance_id.clone(), ack));
peers.wire_to_validators(WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_pk,
net_state,
));
peers.wire_to_validators(WireMessage::key_gen_part(
self.instance_id.clone(),
part,
));
peers.wire_to_validators(WireMessage::key_gen_ack(
self.instance_id.clone(),
ack,
));
}
}
State::Generating { .. } => {
@ -271,17 +274,17 @@ impl Machine {
State::Complete { .. } => {
warn!("Ignoring new established peer signal while key gen `State::Complete`.");
}
}
Ok(())
}
}
Ok(())
}
/// Handles a received `Part`.
pub(super) fn handle_key_gen_part<T: Contribution>(
&mut self,
src_uid: &Uid,
part: Part,
peers: &Peers<T>
) {
/// Handles a received `Part`.
pub(super) fn handle_key_gen_part<T: Contribution>(
&mut self,
src_uid: &Uid,
part: Part,
peers: &Peers<T>,
) {
match self.state {
State::Generating {
ref mut sync_key_gen,
@ -292,7 +295,7 @@ impl Machine {
// TODO: Move this match block into a function somewhere for re-use:
trace!("KEY GENERATION: Handling part from '{}'...", src_uid);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let mut skg = sync_key_gen.as_mut().unwrap();
let skg = sync_key_gen.as_mut().unwrap();
let ack = match skg.handle_part(&mut rng, src_uid, part) {
Ok(PartOutcome::Valid(Some(ack))) => ack,
Ok(PartOutcome::Invalid(faults)) => panic!(
@ -352,19 +355,21 @@ impl Machine {
ref mut ack_count,
..
} => {
{
let mut skg = sync_key_gen.as_mut().unwrap();
{
let skg = sync_key_gen.as_mut().unwrap();
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, peers);
};
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, peers);
};
let node_n = peers.count_validators() + 1;
let node_n = peers.count_validators() + 1;
if sync_key_gen.as_ref().unwrap().count_complete() == node_n && *ack_count >= node_n * node_n {
let skg = sync_key_gen.take().unwrap();
if sync_key_gen.as_ref().unwrap().count_complete() == node_n
&& *ack_count >= node_n * node_n
{
let skg = sync_key_gen.take().unwrap();
info!("KEY GENERATION: All acks received and handled.");
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
@ -378,39 +383,45 @@ impl Machine {
}
match complete {
Some((sync_key_gen, public_key)) => {
self.state = State::Complete { sync_key_gen: Some(sync_key_gen), public_key: Some(public_key) };
Ok(true)
},
None => Ok(false),
Some((sync_key_gen, public_key)) => {
self.state = State::Complete {
sync_key_gen: Some(sync_key_gen),
public_key: Some(public_key),
};
Ok(true)
}
None => Ok(false),
}
}
/// Returns the state of this key generation instance.
pub(super) fn state(&self) -> &State {
&self.state
&self.state
}
/// Returns true if this key generation instance is awaiting more peers.
pub(super) fn is_awaiting_peers(&self) -> bool {
match self.state {
State::AwaitingPeers { .. } => true,
_ => false,
}
match self.state {
State::AwaitingPeers { .. } => true,
_ => false,
}
}
/// Returns the `SyncKeyGen` instance and `PublicKey` if this key
/// generation instance is complete.
pub(super) fn complete(&mut self) -> Option<(SyncKeyGen<Uid>, PublicKey)> {
match self.state {
State::Complete { ref mut sync_key_gen, ref mut public_key } => {
sync_key_gen.take().and_then(|skg| public_key.take().map(|pk| (skg, pk)))
}
_ => None
}
match self.state {
State::Complete {
ref mut sync_key_gen,
ref mut public_key,
} => sync_key_gen
.take()
.and_then(|skg| public_key.take().map(|pk| (skg, pk))),
_ => None,
}
}
pub(super) fn event_tx(&self) -> Option<&mpsc::UnboundedSender<Message>> {
self.event_tx.as_ref()
}
}
}

View File

@ -1,17 +1,14 @@
mod handler;
mod hydrabadger;
mod state;
pub mod key_gen;
mod state;
use std;
use bincode;
use hbbft::{
dynamic_honey_badger::Error as DhbError,
sync_key_gen::Error as SyncKeyGenError,
};
use {Change, Message, Uid};
use self::handler::Handler;
use self::state::{State, StateMachine};
use crate::{Change, Message, Uid};
use bincode;
use hbbft::{dynamic_honey_badger::Error as DhbError, sync_key_gen::Error as SyncKeyGenError};
use std;
pub use self::hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
pub use self::state::StateDsct;

View File

@ -5,19 +5,22 @@
#![allow(dead_code)]
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use super::{key_gen, Config, Error, InputOrMessage};
use crate::peer::Peers;
use crate::{ActiveNetworkInfo, Contribution, NetworkNodeInfo, NetworkState, Step, Uid};
use crossbeam::queue::SegQueue;
use rand::StdRng;
use hbbft::{
crypto::{PublicKey, SecretKey},
dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan, Error as DhbError},
dynamic_honey_badger::{DynamicHoneyBadger, Error as DhbError, JoinPlan},
sync_key_gen::Ack,
NetworkInfo,
};
use peer::Peers;
use rand::StdRng;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::{collections::BTreeMap, fmt};
use super::{Config, Error, InputOrMessage, key_gen};
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo};
/// A `State` discriminant.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -60,7 +63,6 @@ impl From<usize> for StateDsct {
}
}
/// The current hydrabadger state.
//
pub enum State<T: Contribution> {
@ -95,7 +97,6 @@ impl<T: Contribution> State<T> {
}
}
pub struct StateMachine<T: Contribution> {
pub(crate) state: State<T>,
pub(crate) dsct: Arc<AtomicUsize>,
@ -105,16 +106,22 @@ impl<T: Contribution> StateMachine<T> {
/// Returns a new `State::Disconnected`.
pub(super) fn disconnected() -> StateMachine<T> {
StateMachine {
state: State::Disconnected { },
state: State::Disconnected {},
dsct: Arc::new(AtomicUsize::new(0)),
}
}
/// Sets the publicly visible state discriminant and returns the previous value.
fn set_state_discriminant(&self) -> StateDsct {
let sd = StateDsct::from(self.dsct.swap(self.state.discriminant().into(),
Ordering::Release));
info!("State has been set from '{}' to '{}'.", sd, self.state.discriminant());
let sd = StateDsct::from(
self.dsct
.swap(self.state.discriminant().into(), Ordering::Release),
);
info!(
"State has been set from '{}' to '{}'.",
sd,
self.state.discriminant()
);
sd
}
@ -124,9 +131,12 @@ impl<T: Contribution> StateMachine<T> {
State::Disconnected {} => {
info!("Setting state: `KeyGen`.");
State::KeyGen {
key_gen: key_gen::Machine::awaiting_peers(SegQueue::new(), None,
key_gen::InstanceId::BuiltIn),
iom_queue: Some(SegQueue::new())
key_gen: key_gen::Machine::awaiting_peers(
SegQueue::new(),
None,
key_gen::InstanceId::BuiltIn,
),
iom_queue: Some(SegQueue::new()),
}
}
State::DeterminingNetworkState {
@ -140,9 +150,12 @@ impl<T: Contribution> StateMachine<T> {
);
info!("Setting state: `KeyGen`.");
State::KeyGen {
key_gen: key_gen::Machine::awaiting_peers(ack_queue.take().unwrap(), None,
key_gen::InstanceId::BuiltIn),
iom_queue: iom_queue.take() ,
key_gen: key_gen::Machine::awaiting_peers(
ack_queue.take().unwrap(),
None,
key_gen::InstanceId::BuiltIn,
),
iom_queue: iom_queue.take(),
}
}
ref s => {
@ -161,7 +174,9 @@ impl<T: Contribution> StateMachine<T> {
/// `AwaitingMorePeersForKeyGeneration`, otherwise panics.
pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo) {
self.state = match self.state {
State::KeyGen { ref mut iom_queue, .. } => {
State::KeyGen {
ref mut iom_queue, ..
} => {
info!("Setting state: `DeterminingNetworkState`.");
State::DeterminingNetworkState {
ack_queue: Some(SegQueue::new()),
@ -169,7 +184,9 @@ impl<T: Contribution> StateMachine<T> {
network_state: Some(NetworkState::Active(net_info)),
}
}
_ => panic!("Cannot reset network state when state is not `AwaitingMorePeersForKeyGeneration`."),
_ => panic!(
"Cannot reset network state when state is not `AwaitingMorePeersForKeyGeneration`."
),
};
self.set_state_discriminant();
}
@ -191,8 +208,8 @@ impl<T: Contribution> StateMachine<T> {
State::DeterminingNetworkState {
ref mut iom_queue, ..
} => {
let (dhb, dhb_step) = DynamicHoneyBadger::new_joining(local_uid, local_sk, jp,
StdRng::new()?)?;
let (dhb, dhb_step) =
DynamicHoneyBadger::new_joining(local_uid, local_sk, jp, StdRng::new()?)?;
step_queue.push(dhb_step);
iom_queue_ret = iom_queue.take().unwrap();
@ -245,7 +262,8 @@ impl<T: Contribution> StateMachine<T> {
ref mut iom_queue,
..
} => {
let (sync_key_gen, public_key) = key_gen.complete().expect("Key generation incomplete");
let (sync_key_gen, public_key) =
key_gen.complete().expect("Key generation incomplete");
// let mut sync_key_gen = sync_key_gen.take().unwrap();
assert_eq!(public_key, local_sk.public_key());
@ -378,31 +396,19 @@ impl<T: Contribution> StateMachine<T> {
})
.collect::<Vec<_>>();
match self.state {
State::KeyGen { ref key_gen, .. } => {
match key_gen.state() {
KeyGenState::AwaitingPeers { .. } => {
NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos)
},
KeyGenState::Generating { ref public_keys, .. } => {
NetworkState::GeneratingKeys(peer_infos, public_keys.clone())
},
_ => NetworkState::Unknown(peer_infos),
State::KeyGen { ref key_gen, .. } => match key_gen.state() {
KeyGenState::AwaitingPeers { .. } => {
NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos)
}
}
KeyGenState::Generating {
ref public_keys, ..
} => NetworkState::GeneratingKeys(peer_infos, public_keys.clone()),
_ => NetworkState::Unknown(peer_infos),
},
State::Observer { ref dhb } | State::Validator { ref dhb } => {
// FIXME: Ensure that `peer_info` matches `NetworkInfo` from HB.
let pk_set = dhb
.as_ref()
.unwrap()
.netinfo()
.public_key_set()
.clone();
let pk_map = dhb
.as_ref()
.unwrap()
.netinfo()
.public_key_map()
.clone();
let pk_set = dhb.as_ref().unwrap().netinfo().public_key_set().clone();
let pk_map = dhb.as_ref().unwrap().netinfo().public_key_map().clone();
NetworkState::Active((peer_infos, pk_set, pk_map))
}
_ => NetworkState::Unknown(peer_infos),
@ -438,7 +444,9 @@ impl<T: Contribution> StateMachine<T> {
/// Returns a reference to the key generation instance.
pub(super) fn key_gen_mut(&mut self) -> Option<&mut key_gen::Machine> {
match self.state {
State::KeyGen { ref mut key_gen, .. } => Some(key_gen),
State::KeyGen {
ref mut key_gen, ..
} => Some(key_gen),
_ => None,
}
}
@ -472,7 +480,7 @@ impl<T: Contribution> StateMachine<T> {
return step_opt;
}
| State::KeyGen { ref iom_queue, .. }
State::KeyGen { ref iom_queue, .. }
| State::DeterminingNetworkState { ref iom_queue, .. } => {
trace!("State::handle_iom: Queueing: {:?}", iom);
iom_queue.as_ref().unwrap().push(iom);

View File

@ -1,9 +1,19 @@
#![cfg_attr(feature = "nightly", feature(alloc_system))]
#![cfg_attr(feature = "nightly", feature(proc_macro))]
#![cfg_attr(feature = "cargo-clippy",
allow(large_enum_variant, new_without_default_derive, expect_fun_call, or_fun_call,
useless_format, cyclomatic_complexity, needless_pass_by_value, module_inception,
match_bool))]
#![cfg_attr(
feature = "cargo-clippy",
allow(
large_enum_variant,
new_without_default_derive,
expect_fun_call,
or_fun_call,
useless_format,
cyclomatic_complexity,
needless_pass_by_value,
module_inception,
match_bool
)
)]
#[cfg(feature = "nightly")]
extern crate alloc_system;
@ -52,6 +62,14 @@ pub mod peer;
use bytes::{Bytes, BytesMut};
use futures::{sync::mpsc, AsyncSink, StartSend};
use hbbft::{
crypto::{PublicKey, PublicKeySet, SecretKey, Signature},
dynamic_honey_badger::{
Change as DhbChange, DynamicHoneyBadger, JoinPlan, Message as DhbMessage,
},
sync_key_gen::{Ack, Part},
Contribution as HbbftContribution, DaStep as MessagingStep,
};
use rand::{Rand, Rng};
use serde::{de::DeserializeOwned, Serialize};
use std::{
@ -61,23 +79,21 @@ use std::{
net::SocketAddr,
ops::Deref,
};
use tokio::{io, net::TcpStream, prelude::*, codec::{Framed, LengthDelimitedCodec}};
use uuid::Uuid;
use hbbft::{
crypto::{PublicKey, PublicKeySet, SecretKey, Signature},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage, DynamicHoneyBadger, Change as DhbChange},
sync_key_gen::{Ack, Part},
DaStep as MessagingStep,
Contribution as HbbftContribution,
use tokio::{
codec::{Framed, LengthDelimitedCodec},
io,
net::TcpStream,
prelude::*,
};
use uuid::Uuid;
pub use blockchain::{Blockchain, MiningError};
pub use hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
pub use crate::blockchain::{Blockchain, MiningError};
pub use crate::hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
// TODO: Create a separate, library-wide error type.
pub use hydrabadger::Error;
pub use crate::hydrabadger::key_gen;
pub use crate::hydrabadger::Error;
pub use crate::hydrabadger::StateDsct;
pub use hbbft::dynamic_honey_badger::Batch;
pub use hydrabadger::StateDsct;
pub use hydrabadger::key_gen;
/// Transmit half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
@ -111,15 +127,15 @@ type EpochTx = mpsc::UnboundedSender<u64>;
// TODO: Use a bounded tx/rx (find a sensible upper bound):
pub type EpochRx = mpsc::UnboundedReceiver<u64>;
pub trait Contribution:
HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{}
{
}
impl<C> Contribution for C where
C: HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{}
{
}
/// A unique identifier.
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
@ -369,9 +385,10 @@ impl<T: Contribution> Stream for WireMessages<T> {
// Verify signature for certain variants.
match msg.kind {
| WireMessageKind::Message(..)
| WireMessageKind::KeyGen(..) => {
let peer_pk = self.peer_pk.ok_or(Error::VerificationMessageReceivedUnknownPeer)?;
WireMessageKind::Message(..) | WireMessageKind::KeyGen(..) => {
let peer_pk = self
.peer_pk
.ok_or(Error::VerificationMessageReceivedUnknownPeer)?;
if !peer_pk.verify(&s_msg.sig, &s_msg.message) {
return Err(Error::InvalidSignature);
}
@ -471,11 +488,19 @@ impl<T: Contribution> InternalMessage<T> {
}
pub fn hb_contribution(src_uid: Uid, src_addr: OutAddr, contrib: T) -> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbContribution(contrib))
InternalMessage::new(
Some(src_uid),
src_addr,
InternalMessageKind::HbContribution(contrib),
)
}
pub fn hb_vote(src_uid: Uid, src_addr: OutAddr, change: Change) -> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbChange(change))
InternalMessage::new(
Some(src_uid),
src_addr,
InternalMessageKind::HbChange(change),
)
}
pub fn peer_disconnect(src_uid: Uid, src_addr: OutAddr) -> InternalMessage<T> {
@ -496,11 +521,16 @@ impl<T: Contribution> InternalMessage<T> {
)
}
pub fn new_key_gen_instance(src_uid: Uid, src_addr: OutAddr,
tx: mpsc::UnboundedSender<key_gen::Message>) -> InternalMessage<T>
{
InternalMessage::new(Some(src_uid), src_addr,
InternalMessageKind::NewKeyGenInstance(tx))
pub fn new_key_gen_instance(
src_uid: Uid,
src_addr: OutAddr,
tx: mpsc::UnboundedSender<key_gen::Message>,
) -> InternalMessage<T> {
InternalMessage::new(
Some(src_uid),
src_addr,
InternalMessageKind::NewKeyGenInstance(tx),
)
}
pub fn new_outgoing_connection(src_addr: OutAddr) -> InternalMessage<T> {

View File

@ -2,10 +2,14 @@
#![allow(unused_imports, dead_code, unused_variables, unused_mut)]
use crate::hydrabadger::{Error, Hydrabadger};
use crate::{
Contribution, InAddr, InternalMessage, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, WireRx, WireTx,
};
use futures::sync::mpsc;
use hbbft::crypto::PublicKey;
use hbbft::dynamic_honey_badger::Input as HbInput;
use hydrabadger::{Error, Hydrabadger};
use serde::{Deserialize, Serialize};
use std::{
borrow::Borrow,
@ -15,10 +19,6 @@ use std::{
},
};
use tokio::prelude::*;
use {
Contribution, InAddr, InternalMessage, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, WireRx, WireTx,
};
/// The state for each connected client.
pub struct PeerHandler<T: Contribution> {
@ -527,7 +527,8 @@ impl<T: Contribution> Peers<T> {
}
pub(crate) fn wire_to_all(&self, msg: WireMessage<T>) {
for (_p_addr, peer) in self.peers
for (_p_addr, peer) in self
.peers
.iter()
.filter(|(&p_addr, _)| p_addr != OutAddr(self.local_addr.0))
{
@ -549,14 +550,17 @@ impl<T: Contribution> Peers<T> {
///
/// If the target is not an established node, the message will be returned
/// along with an incremented retry count.
pub(crate) fn wire_to(&self, tar_uid: Uid, msg: WireMessage<T>, retry_count: usize)
-> Option<(Uid, WireMessage<T>, usize)>
{
pub(crate) fn wire_to(
&self,
tar_uid: Uid,
msg: WireMessage<T>,
retry_count: usize,
) -> Option<(Uid, WireMessage<T>, usize)> {
match self.get_by_uid(&tar_uid) {
Some(p) => {
p.tx().unbounded_send(msg).unwrap();
None
},
}
None => {
info!(
"Node '{}' is not yet established. Queueing message for now (retry_count: {}).",