Disentangle and extract sync key gen functionality from handler and state.

This commit is contained in:
c0gent 2018-11-26 10:15:39 -08:00
parent 0a2d062955
commit 5efd4e65af
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
5 changed files with 442 additions and 327 deletions

View File

@ -4,27 +4,21 @@
//! * Do not make state changes directly in this module (use closures, etc.). //! * Do not make state changes directly in this module (use closures, etc.).
//! //!
#![allow(unused_imports, dead_code, unused_variables, unused_mut, unused_assignments,
unreachable_code)]
use super::WIRE_MESSAGE_RETRY_MAX; use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct}; use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
use crossbeam::queue::SegQueue; use crossbeam::queue::SegQueue;
use hbbft::{ use hbbft::{
crypto::{PublicKey, PublicKeySet}, crypto::PublicKey,
dynamic_honey_badger::{ChangeState, JoinPlan, Message as DhbMessage, Change as DhbChange}, dynamic_honey_badger::{ChangeState, JoinPlan, Change as DhbChange},
sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen}, sync_key_gen::{Ack, Part},
Target, Epoched, Target,
}; };
use peer::Peers; use peer::Peers;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::{self, prelude::*}; use tokio::{self, prelude::*};
use { use {
Change, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx, Message, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkNodeInfo, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
}; };
use rand;
/// Hydrabadger event (internal message) handler. /// Hydrabadger event (internal message) handler.
pub struct Handler<T: Contribution> { pub struct Handler<T: Contribution> {
@ -53,55 +47,22 @@ impl<T: Contribution> Handler<T> {
fn handle_new_established_peer( fn handle_new_established_peer(
&self, &self,
src_uid: Uid, src_uid: Uid,
_src_addr: OutAddr,
src_pk: PublicKey, src_pk: PublicKey,
request_change_add: bool, request_change_add: bool,
state: &mut StateMachine<T>, state: &mut StateMachine<T>,
peers: &Peers<T>, peers: &Peers<T>,
) -> Result<(), Error> { ) -> Result<(), Error> {
match state.state { match state.discriminant() {
State::Disconnected { .. } | State::DeterminingNetworkState { .. } => { StateDsct::Disconnected | StateDsct::DeterminingNetworkState => {
state.update_peer_connection_added(&peers); state.update_peer_connection_added(&peers);
} }
State::AwaitingMorePeersForKeyGeneration { .. } => { StateDsct::KeyGen => {
if peers.count_validators() >= self.hdb.config().keygen_peer_count { // TODO: Should network state simply be stored within key_gen?
info!("== BEGINNING KEY GENERATION =="); let net_state = state.network_state(&peers);
state.key_gen_mut().unwrap()
let local_uid = *self.hdb.uid(); .handle_new_established_peer(peers, &self.hdb, net_state)?;
let local_in_addr = *self.hdb.addr();
let local_sk = self.hdb.secret_key().public_key();
let (part, ack) = state.set_generating_keys(
&local_uid,
self.hdb.secret_key().clone(),
peers,
self.hdb.config(),
)?;
info!("KEY GENERATION: Sending initial parts and our own ack.");
peers.wire_to_validators(
WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_sk,
state.network_state(&peers),
),
);
peers.wire_to_validators(WireMessage::key_gen_part(part));
// FIXME: QUEUE ACKS UNTIL PARTS ARE ALL RECEIVED:
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
}
} }
State::GeneratingKeys { .. } => { StateDsct::Observer | StateDsct::Validator => {
// This *could* be called multiple times when initially
// establishing outgoing connections. Do nothing for now.
warn!(
"hydrabadger::Handler::handle_new_established_peer: Ignoring new established \
peer signal while `StateDsct::GeneratingKeys`."
);
}
State::Observer { .. } | State::Validator { .. } => {
// If the new peer sends a request-change-add (to be a // If the new peer sends a request-change-add (to be a
// validator), input the change into HB and broadcast, etc. // validator), input the change into HB and broadcast, etc.
if request_change_add { if request_change_add {
@ -126,88 +87,10 @@ impl<T: Contribution> Handler<T> {
Ok(()) Ok(())
} }
fn handle_ack(
&self,
uid: &Uid,
ack: Ack,
sync_key_gen: &mut SyncKeyGen<Uid>,
ack_count: &mut usize,
) {
info!("KEY GENERATION: Handling ack from '{}'...", uid);
let ack_outcome = sync_key_gen.handle_ack(uid, ack.clone()).expect("Failed to handle Ack.");
match ack_outcome {
AckOutcome::Invalid(fault) => error!("Error handling ack: '{:?}':\n{:?}", ack, fault),
AckOutcome::Valid => *ack_count += 1,
}
}
fn handle_queued_acks(
&self,
ack_queue: &SegQueue<(Uid, Ack)>,
sync_key_gen: &mut SyncKeyGen<Uid>,
part_count: usize,
ack_count: &mut usize,
) {
if part_count == self.hdb.config().keygen_peer_count + 1 {
info!("KEY GENERATION: Handling queued acks...");
debug!(" Peers complete: {}", sync_key_gen.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
while let Some((uid, ack)) = ack_queue.try_pop() {
self.handle_ack(&uid, ack, sync_key_gen, ack_count);
}
}
}
fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>) { fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>) {
match state.state { match state.state {
State::GeneratingKeys { State::KeyGen { ref mut key_gen, .. } => {
ref mut sync_key_gen, key_gen.handle_key_gen_part(src_uid, part, &self.hdb);
ref ack_queue,
ref mut part_count,
ref mut ack_count,
..
} => {
// TODO: Move this match block into a function somewhere for re-use:
info!("KEY GENERATION: Handling part from '{}'...", src_uid);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let mut skg = sync_key_gen.as_mut().unwrap();
let ack = match skg.handle_part(&mut rng, src_uid, part) {
Ok(PartOutcome::Valid(Some(ack))) => ack,
Ok(PartOutcome::Invalid(faults)) => panic!(
"Invalid part \
(FIXME: handle): {:?}",
faults
),
Ok(PartOutcome::Valid(None)) => {
error!("`DynamicHoneyBadger::handle_part` returned `None`.");
return;
}
Err(err) => {
error!("Error handling Part: {:?}", err);
return;
}
};
*part_count += 1;
info!("KEY GENERATION: Queueing `Ack`.");
ack_queue.as_ref().unwrap().push((*src_uid, ack.clone()));
self.handle_queued_acks(ack_queue.as_ref().unwrap(), skg, *part_count, ack_count);
let peers = self.hdb.peers();
info!(
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid
);
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
} }
State::DeterminingNetworkState { ref network_state, .. } => { State::DeterminingNetworkState { ref network_state, .. } => {
match network_state.is_some() { match network_state.is_some() {
@ -230,32 +113,12 @@ impl<T: Contribution> Handler<T> {
state: &mut StateMachine<T>, state: &mut StateMachine<T>,
peers: &Peers<T>, peers: &Peers<T>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut keygen_is_complete = false; let mut complete = false;
match state.state { match state.state {
State::GeneratingKeys { State::KeyGen { ref mut key_gen, .. } => {
ref mut sync_key_gen, if key_gen.handle_key_gen_ack(src_uid, ack, &self.hdb)? {
ref ack_queue, complete = true;
ref part_count,
ref mut ack_count,
..
} => {
let mut skg = sync_key_gen.as_mut().unwrap();
info!("KEY GENERATION: Queueing `Ack`.");
ack_queue.as_ref().unwrap().push((*src_uid, ack.clone()));
self.handle_queued_acks(ack_queue.as_ref().unwrap(), skg, *part_count, ack_count);
let node_n = self.hdb.config().keygen_peer_count + 1;
if skg.count_complete() == node_n && *ack_count >= node_n * node_n {
info!("KEY GENERATION: All acks received and handled.");
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
assert!(skg.is_ready());
keygen_is_complete = true;
} }
} }
State::Validator { .. } | State::Observer { .. } => { State::Validator { .. } | State::Observer { .. } => {
@ -266,7 +129,7 @@ impl<T: Contribution> Handler<T> {
} }
_ => panic!("::handle_key_gen_ack: State must be `GeneratingKeys`."), _ => panic!("::handle_key_gen_ack: State must be `GeneratingKeys`."),
} }
if keygen_is_complete { if complete {
self.instantiate_hb(None, state, peers)?; self.instantiate_hb(None, state, peers)?;
} }
Ok(()) Ok(())
@ -290,7 +153,7 @@ impl<T: Contribution> Handler<T> {
info!("Received join plan."); info!("Received join plan.");
self.instantiate_hb(Some(jp), state, peers)?; self.instantiate_hb(Some(jp), state, peers)?;
} }
StateDsct::AwaitingMorePeersForKeyGeneration | StateDsct::GeneratingKeys => { StateDsct::KeyGen => {
panic!( panic!(
"hydrabadger::Handler::handle_join_plan: Received join plan while \ "hydrabadger::Handler::handle_join_plan: Received join plan while \
`{}`", `{}`",
@ -314,7 +177,7 @@ impl<T: Contribution> Handler<T> {
match state.discriminant() { match state.discriminant() {
StateDsct::Disconnected => unimplemented!(), StateDsct::Disconnected => unimplemented!(),
StateDsct::DeterminingNetworkState | StateDsct::GeneratingKeys => { StateDsct::DeterminingNetworkState | StateDsct::KeyGen => {
info!("== INSTANTIATING HONEY BADGER =="); info!("== INSTANTIATING HONEY BADGER ==");
match jp_opt { match jp_opt {
Some(jp) => { Some(jp) => {
@ -343,7 +206,6 @@ impl<T: Contribution> Handler<T> {
.map_err(|_| Error::InstantiateHbListenerDropped)?; .map_err(|_| Error::InstantiateHbListenerDropped)?;
} }
} }
StateDsct::AwaitingMorePeersForKeyGeneration => unimplemented!(),
StateDsct::Observer => { StateDsct::Observer => {
// TODO: Add checks to ensure that `net_info` is consistent // TODO: Add checks to ensure that `net_info` is consistent
// with HB's netinfo. // with HB's netinfo.
@ -371,7 +233,7 @@ impl<T: Contribution> Handler<T> {
/// without including this node. /// without including this node.
fn reset_peer_connections( fn reset_peer_connections(
&self, &self,
state: &mut StateMachine<T>, _state: &mut StateMachine<T>,
peers: &Peers<T>, peers: &Peers<T>,
) -> Result<(), Error> { ) -> Result<(), Error> {
peers.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(), peers.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(),
@ -395,24 +257,28 @@ impl<T: Contribution> Handler<T> {
peer_infos = p_infos; peer_infos = p_infos;
state.set_awaiting_more_peers(); state.set_awaiting_more_peers();
} }
NetworkState::GeneratingKeys(p_infos, public_keys) => { NetworkState::GeneratingKeys(p_infos, _public_keys) => {
peer_infos = p_infos; peer_infos = p_infos;
} }
NetworkState::Active(net_info) => { NetworkState::Active(net_info) => {
peer_infos = net_info.0.clone(); peer_infos = net_info.0.clone();
let mut reset_fresh = false;
match state.state { match state.state {
State::DeterminingNetworkState { ref mut network_state, .. } => { State::DeterminingNetworkState { ref mut network_state, .. } => {
*network_state = Some(NetworkState::Active(net_info)); *network_state = Some(NetworkState::Active(net_info.clone()));
} }
State::AwaitingMorePeersForKeyGeneration { .. } => { State::KeyGen { ref key_gen, .. } => {
// Key generation has completed and we were not a part if key_gen.is_awaiting_peers() {
// of it. Need to restart as a freshly connecting node. reset_fresh = true;
state.set_determining_network_state_active(net_info); } else {
self.reset_peer_connections(state, peers)?; panic!(
"Handler::net_state: Received `NetworkState::Active` while `{}`.",
state.discriminant()
);
}
} }
State::Disconnected { .. } State::Disconnected { .. } => {
| State::GeneratingKeys { .. } => {
panic!( panic!(
"Handler::net_state: Received `NetworkState::Active` while `{}`.", "Handler::net_state: Received `NetworkState::Active` while `{}`.",
state.discriminant() state.discriminant()
@ -420,6 +286,12 @@ impl<T: Contribution> Handler<T> {
} }
_ => {} _ => {}
} }
if reset_fresh {
// Key generation has completed and we were not a part
// of it. Need to restart as a freshly connecting node.
state.set_determining_network_state_active(net_info);
self.reset_peer_connections(state, peers)?;
}
} }
NetworkState::None => panic!("`NetworkState::None` received."), NetworkState::None => panic!("`NetworkState::None` received."),
} }
@ -461,12 +333,7 @@ impl<T: Contribution> Handler<T> {
State::DeterminingNetworkState { .. } => { State::DeterminingNetworkState { .. } => {
// unimplemented!(); // unimplemented!();
} }
State::AwaitingMorePeersForKeyGeneration { .. } => { State::KeyGen { .. } => {
// info!("Removing peer ({}: '{}') from await list.",
// src_out_addr, src_uid.unwrap());
// state.peer_connection_dropped(&*self.hdb.peers());
}
State::GeneratingKeys { .. } => {
// Do something here (possibly panic). // Do something here (possibly panic).
} }
State::Observer { .. } => { State::Observer { .. } => {
@ -527,7 +394,7 @@ impl<T: Contribution> Handler<T> {
// Modify state accordingly: // Modify state accordingly:
self.handle_new_established_peer( self.handle_new_established_peer(
src_uid.unwrap(), src_uid.unwrap(),
src_out_addr, // src_out_addr,
src_pk, src_pk,
request_change_add, request_change_add,
state, state,
@ -609,7 +476,7 @@ impl<T: Contribution> Handler<T> {
// Modify state accordingly: // Modify state accordingly:
self.handle_new_established_peer( self.handle_new_established_peer(
src_uid_new, src_uid_new,
src_out_addr, // src_out_addr,
src_pk, src_pk,
false, false,
state, state,
@ -747,14 +614,14 @@ impl<T: Contribution> Future for Handler<T> {
// Send the batch along its merry way: // Send the batch along its merry way:
if !self.batch_tx.is_closed() { if !self.batch_tx.is_closed() {
if let Err(err) = self.batch_tx.unbounded_send(batch) { if let Err(_err) = self.batch_tx.unbounded_send(batch) {
error!("Unable to send batch output. Shutting down..."); error!("Unable to send batch output. Shutting down...");
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} else { } else {
// Notify epoch listeners that a batch has been output. // Notify epoch listeners that a batch has been output.
let mut dropped_listeners = Vec::new(); let mut dropped_listeners = Vec::new();
for (i, listener) in self.hdb.epoch_listeners().iter().enumerate() { for (i, listener) in self.hdb.epoch_listeners().iter().enumerate() {
if let Err(err) = listener.unbounded_send(batch_epoch + 1) { if let Err(_err) = listener.unbounded_send(batch_epoch + 1) {
dropped_listeners.push(i); dropped_listeners.push(i);
error!("Epoch listener {} has dropped.", i); error!("Epoch listener {} has dropped.", i);
} }

View File

@ -1,24 +1,19 @@
//! A hydrabadger consensus node. //! A hydrabadger consensus node.
//! //!
#![allow(unused_imports, dead_code, unused_variables, unused_mut, unused_assignments,
unreachable_code)]
use futures::{ use futures::{
future::{self, Either}, future::{self, Either},
sync::mpsc, sync::mpsc,
}; };
use hbbft::{ use hbbft::{
crypto::{PublicKey, SecretKey}, crypto::{PublicKey, SecretKey},
dynamic_honey_badger::Input as DhbInput,
}; };
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer::{PeerHandler, Peers}; use peer::{PeerHandler, Peers};
use rand::{self, Rand}; use rand::{self, Rand};
use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::HashSet, collections::HashSet,
net::{SocketAddr, ToSocketAddrs}, net::SocketAddr,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Weak, Arc, Weak,
@ -35,7 +30,7 @@ use {
Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage, Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx, WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx,
}; };
use super::{Error, Handler, StateMachine, State, StateDsct}; use super::{Error, Handler, StateMachine, StateDsct};
// The number of random transactions to generate per interval. // The number of random transactions to generate per interval.
const DEFAULT_TXN_GEN_COUNT: usize = 5; const DEFAULT_TXN_GEN_COUNT: usize = 5;
@ -128,10 +123,6 @@ pub struct Hydrabadger<T: Contribution> {
impl<T: Contribution> Hydrabadger<T> { impl<T: Contribution> Hydrabadger<T> {
/// Returns a new Hydrabadger node. /// Returns a new Hydrabadger node.
pub fn new(addr: SocketAddr, cfg: Config) -> Self { pub fn new(addr: SocketAddr, cfg: Config) -> Self {
use chrono::Local;
use env_logger;
use std::env;
let uid = Uid::new(); let uid = Uid::new();
let secret_key = SecretKey::rand(&mut rand::thread_rng()); let secret_key = SecretKey::rand(&mut rand::thread_rng());

324
src/hydrabadger/key_gen.rs Normal file
View File

@ -0,0 +1,324 @@
use hydrabadger::hydrabadger::Hydrabadger;
use crossbeam::queue::SegQueue;
use hbbft::{
crypto::{PublicKey, SecretKey},
sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen},
};
use peer::Peers;
use std::{collections::BTreeMap};
use rand;
use super::{Config, Error};
use {Contribution, NetworkState, Uid, WireMessage};
/// Key generation state.
#[derive(Debug)]
pub(super) enum State {
AwaitingPeers {
required_peers: Vec<Uid>,
available_peers: Vec<Uid>,
},
Generating {
sync_key_gen: Option<SyncKeyGen<Uid>>,
public_key: Option<PublicKey>,
public_keys: BTreeMap<Uid, PublicKey>,
part_count: usize,
ack_count: usize,
},
Complete {
sync_key_gen: Option<SyncKeyGen<Uid>>,
public_key: Option<PublicKey>,
},
}
fn handle_ack(
uid: &Uid,
ack: Ack,
ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>,
) {
info!("KEY GENERATION: Handling ack from '{}'...", uid);
let ack_outcome = sync_key_gen.handle_ack(uid, ack.clone()).expect("Failed to handle Ack.");
match ack_outcome {
AckOutcome::Invalid(fault) => error!("Error handling ack: '{:?}':\n{:?}", ack, fault),
AckOutcome::Valid => *ack_count += 1,
}
}
fn handle_queued_acks<T: Contribution>(
ack_queue: &SegQueue<(Uid, Ack)>,
part_count: usize,
ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>,
hdb: &Hydrabadger<T>,
) {
if part_count == hdb.config().keygen_peer_count + 1 {
info!("KEY GENERATION: Handling queued acks...");
debug!(" Peers complete: {}", sync_key_gen.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
while let Some((uid, ack)) = ack_queue.try_pop() {
handle_ack(&uid, ack, ack_count, sync_key_gen);
}
}
}
/// Manages the key generation state.
#[derive(Debug)]
pub struct KeyGenMachine {
state: State,
ack_queue: SegQueue<(Uid, Ack)>,
}
impl KeyGenMachine {
/// Creates and returns a new `KeyGenMachine` in the `AwaitingPeers`
/// state.
pub fn awaiting_peers(ack_queue: SegQueue<(Uid, Ack)>)
-> KeyGenMachine
{
KeyGenMachine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue: ack_queue,
}
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys<T: Contribution>(
&mut self,
local_uid: &Uid,
local_sk: SecretKey,
peers: &Peers<T>,
config: &Config,
) -> Result<(Part, Ack), Error> {
let (part, ack);
self.state = match self.state {
State::AwaitingPeers {
..
} => {
let threshold = config.keygen_peer_count / 3;
let mut public_keys: BTreeMap<Uid, PublicKey> = peers
.validators()
.map(|p| p.pub_info().map(|(uid, _, pk)| (*uid, *pk)).unwrap())
.collect();
let pk = local_sk.public_key();
public_keys.insert(*local_uid, pk);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let (mut sync_key_gen, opt_part) =
SyncKeyGen::new(&mut rng, *local_uid, local_sk, public_keys.clone(), threshold)
.map_err(Error::SyncKeyGenNew)?;
part = opt_part.expect("This node is not a validator (somehow)!");
info!("KEY GENERATION: Handling our own `Part`...");
ack = match sync_key_gen.handle_part(&mut rng, &local_uid, part.clone())
.expect("Handling our own Part has failed") {
PartOutcome::Valid(Some(ack)) => ack,
PartOutcome::Invalid(faults) => panic!(
"Invalid part \
(FIXME: handle): {:?}",
faults
),
PartOutcome::Valid(None) => panic!("No Ack produced when handling Part."),
};
info!("KEY GENERATION: Queueing our own `Ack`...");
self.ack_queue.push((*local_uid, ack.clone()));
State::Generating {
sync_key_gen: Some(sync_key_gen),
public_key: Some(pk),
public_keys,
part_count: 1,
ack_count: 0,
}
}
_ => panic!(
"State::set_generating_keys: \
Must be State::AwaitingMorePeersForKeyGeneration"
),
};
Ok((part, ack))
}
pub(super) fn handle_new_established_peer<T: Contribution>(
&mut self,
peers: &Peers<T>,
hdb: &Hydrabadger<T>,
net_state: NetworkState,
) -> Result<(), Error> {
match self.state {
State::AwaitingPeers { .. } => {
if peers.count_validators() >= hdb.config().keygen_peer_count {
info!("== BEGINNING KEY GENERATION ==");
let local_uid = *hdb.uid();
let local_in_addr = *hdb.addr();
let local_sk = hdb.secret_key().public_key();
let (part, ack) = self.set_generating_keys(
&local_uid,
hdb.secret_key().clone(),
peers,
hdb.config(),
)?;
info!("KEY GENERATION: Sending initial parts and our own ack.");
peers.wire_to_validators(
WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_sk,
net_state,
),
);
peers.wire_to_validators(WireMessage::key_gen_part(part));
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
}
}
State::Generating { .. } => {
// This *could* be called multiple times when initially
// establishing outgoing connections. Do nothing for now.
warn!("Ignoring new established peer signal while key gen `State::Generating`.");
}
State::Complete { .. } => {
warn!("Ignoring new established peer signal while key gen `State::Complete`.");
}
}
Ok(())
}
pub(super) fn handle_key_gen_part<T: Contribution>(&mut self, src_uid: &Uid, part: Part, hdb: &Hydrabadger<T>) {
match self.state {
State::Generating {
ref mut sync_key_gen,
ref mut part_count,
ref mut ack_count,
..
} => {
// TODO: Move this match block into a function somewhere for re-use:
info!("KEY GENERATION: Handling part from '{}'...", src_uid);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let mut skg = sync_key_gen.as_mut().unwrap();
let ack = match skg.handle_part(&mut rng, src_uid, part) {
Ok(PartOutcome::Valid(Some(ack))) => ack,
Ok(PartOutcome::Invalid(faults)) => panic!(
"Invalid part \
(FIXME: handle): {:?}",
faults
),
Ok(PartOutcome::Valid(None)) => {
error!("`DynamicHoneyBadger::handle_part` returned `None`.");
return;
}
Err(err) => {
error!("Error handling Part: {:?}", err);
return;
}
};
*part_count += 1;
info!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
let peers = hdb.peers();
info!(
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid
);
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, hdb)
}
ref s => panic!(
"::handle_key_gen_part: State must be `GeneratingKeys`. \
State: \n{:?} \n\n[FIXME: Enqueue these parts!]\n\n",
s
),
}
}
pub(super) fn handle_key_gen_ack<T: Contribution>(
&mut self,
src_uid: &Uid,
ack: Ack,
hdb: &Hydrabadger<T>,
) -> Result<bool, Error> {
let mut complete: Option<(SyncKeyGen<Uid>, PublicKey)> = None;
match self.state {
State::Generating {
ref mut sync_key_gen,
ref mut public_key,
ref part_count,
ref mut ack_count,
..
} => {
let node_n = {
let mut skg = sync_key_gen.as_mut().unwrap();
info!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, hdb);
hdb.config().keygen_peer_count + 1
};
if sync_key_gen.as_ref().unwrap().count_complete() == node_n && *ack_count >= node_n * node_n {
let skg = sync_key_gen.take().unwrap();
info!("KEY GENERATION: All acks received and handled.");
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
assert!(skg.is_ready());
complete = public_key.take().map(|pk| (skg, pk))
}
}
_ => panic!("::handle_key_gen_ack: KeyGen state must be `Generating`."),
}
match complete {
Some((sync_key_gen, public_key)) => {
self.state = State::Complete { sync_key_gen: Some(sync_key_gen), public_key: Some(public_key) };
Ok(true)
},
None => Ok(false),
}
}
pub(super) fn state(&self) -> &State {
&self.state
}
pub(super) fn is_awaiting_peers(&self) -> bool {
match self.state {
State::AwaitingPeers { .. } => true,
_ => false,
}
}
pub(super) fn complete(&mut self) -> Option<(SyncKeyGen<Uid>, PublicKey)> {
match self.state {
State::Complete { ref mut sync_key_gen, ref mut public_key } => {
sync_key_gen.take().and_then(|skg| public_key.take().map(|pk| (skg, pk)))
}
_ => None
}
}
}

View File

@ -1,6 +1,7 @@
mod handler; mod handler;
mod hydrabadger; mod hydrabadger;
mod state; mod state;
mod key_gen;
use std; use std;
use bincode; use bincode;

View File

@ -11,12 +11,12 @@ use crossbeam::queue::SegQueue;
use hbbft::{ use hbbft::{
crypto::{PublicKey, SecretKey}, crypto::{PublicKey, SecretKey},
dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan, Error as DhbError}, dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan, Error as DhbError},
sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen}, sync_key_gen::Ack,
NetworkInfo, NetworkInfo,
}; };
use peer::Peers; use peer::Peers;
use std::{collections::BTreeMap, fmt}; use std::{collections::BTreeMap, fmt};
use rand; use super::key_gen::KeyGenMachine;
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo}; use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo};
/// A `State` discriminant. /// A `State` discriminant.
@ -24,8 +24,7 @@ use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo};
pub enum StateDsct { pub enum StateDsct {
Disconnected, Disconnected,
DeterminingNetworkState, DeterminingNetworkState,
AwaitingMorePeersForKeyGeneration, KeyGen,
GeneratingKeys,
Observer, Observer,
Validator, Validator,
} }
@ -41,10 +40,9 @@ impl From<StateDsct> for usize {
match dsct { match dsct {
StateDsct::Disconnected => 0, StateDsct::Disconnected => 0,
StateDsct::DeterminingNetworkState => 1, StateDsct::DeterminingNetworkState => 1,
StateDsct::AwaitingMorePeersForKeyGeneration => 2, StateDsct::KeyGen => 4,
StateDsct::GeneratingKeys => 3, StateDsct::Observer => 10,
StateDsct::Observer => 4, StateDsct::Validator => 11,
StateDsct::Validator => 5,
} }
} }
} }
@ -54,23 +52,15 @@ impl From<usize> for StateDsct {
match val { match val {
0 => StateDsct::Disconnected, 0 => StateDsct::Disconnected,
1 => StateDsct::DeterminingNetworkState, 1 => StateDsct::DeterminingNetworkState,
2 => StateDsct::AwaitingMorePeersForKeyGeneration, 4 => StateDsct::KeyGen,
3 => StateDsct::GeneratingKeys, 10 => StateDsct::Observer,
4 => StateDsct::Observer, 11 => StateDsct::Validator,
5 => StateDsct::Validator,
_ => panic!("Invalid state discriminant."), _ => panic!("Invalid state discriminant."),
} }
} }
} }
// pub struct KeyGen {
// ack_queue: Option<SegQueue<(Uid, Ack)>>,
// iom_queue: Option<SegQueue<InputOrMessage<T>>>,
// }
/// The current hydrabadger state. /// The current hydrabadger state.
// //
pub enum State<T: Contribution> { pub enum State<T: Contribution> {
@ -80,21 +70,8 @@ pub enum State<T: Contribution> {
iom_queue: Option<SegQueue<InputOrMessage<T>>>, iom_queue: Option<SegQueue<InputOrMessage<T>>>,
network_state: Option<NetworkState>, network_state: Option<NetworkState>,
}, },
AwaitingMorePeersForKeyGeneration { KeyGen {
// Queued input to HoneyBadger: key_gen: KeyGenMachine,
ack_queue: Option<SegQueue<(Uid, Ack)>>,
iom_queue: Option<SegQueue<InputOrMessage<T>>>,
},
GeneratingKeys {
sync_key_gen: Option<SyncKeyGen<Uid>>,
public_key: Option<PublicKey>,
public_keys: BTreeMap<Uid, PublicKey>,
ack_queue: Option<SegQueue<(Uid, Ack)>>,
part_count: usize,
ack_count: usize,
// Queued input to HoneyBadger:
iom_queue: Option<SegQueue<InputOrMessage<T>>>, iom_queue: Option<SegQueue<InputOrMessage<T>>>,
}, },
Observer { Observer {
@ -111,10 +88,7 @@ impl<T: Contribution> State<T> {
match self { match self {
State::Disconnected { .. } => StateDsct::Disconnected, State::Disconnected { .. } => StateDsct::Disconnected,
State::DeterminingNetworkState { .. } => StateDsct::DeterminingNetworkState, State::DeterminingNetworkState { .. } => StateDsct::DeterminingNetworkState,
State::AwaitingMorePeersForKeyGeneration { .. } => { State::KeyGen { .. } => StateDsct::KeyGen,
StateDsct::AwaitingMorePeersForKeyGeneration
}
State::GeneratingKeys { .. } => StateDsct::GeneratingKeys,
State::Observer { .. } => StateDsct::Observer, State::Observer { .. } => StateDsct::Observer,
State::Validator { .. } => StateDsct::Validator, State::Validator { .. } => StateDsct::Validator,
} }
@ -148,10 +122,10 @@ impl<T: Contribution> StateMachine<T> {
pub(super) fn set_awaiting_more_peers(&mut self) { pub(super) fn set_awaiting_more_peers(&mut self) {
self.state = match self.state { self.state = match self.state {
State::Disconnected {} => { State::Disconnected {} => {
info!("Setting state: `AwaitingMorePeersForKeyGeneration`."); info!("Setting state: `KeyGen`.");
State::AwaitingMorePeersForKeyGeneration { State::KeyGen {
ack_queue: Some(SegQueue::new()), key_gen: KeyGenMachine::awaiting_peers(SegQueue::new()),
iom_queue: Some(SegQueue::new()), iom_queue: Some(SegQueue::new())
} }
} }
State::DeterminingNetworkState { State::DeterminingNetworkState {
@ -163,10 +137,10 @@ impl<T: Contribution> StateMachine<T> {
!network_state.is_some(), !network_state.is_some(),
"State::set_awaiting_more_peers: Network is active!" "State::set_awaiting_more_peers: Network is active!"
); );
info!("Setting state: `AwaitingMorePeersForKeyGeneration`."); info!("Setting state: `KeyGen`.");
State::AwaitingMorePeersForKeyGeneration { State::KeyGen {
ack_queue: ack_queue.take(), key_gen: KeyGenMachine::awaiting_peers(ack_queue.take().unwrap()),
iom_queue: iom_queue.take(), iom_queue: iom_queue.take() ,
} }
} }
ref s => { ref s => {
@ -185,10 +159,10 @@ impl<T: Contribution> StateMachine<T> {
/// `AwaitingMorePeersForKeyGeneration`, otherwise panics. /// `AwaitingMorePeersForKeyGeneration`, otherwise panics.
pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo) { pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo) {
self.state = match self.state { self.state = match self.state {
State::AwaitingMorePeersForKeyGeneration { ref mut ack_queue, ref mut iom_queue } => { State::KeyGen { ref mut iom_queue, .. } => {
info!("Setting state: `DeterminingNetworkState`."); info!("Setting state: `DeterminingNetworkState`.");
State::DeterminingNetworkState { State::DeterminingNetworkState {
ack_queue: ack_queue.take(), ack_queue: Some(SegQueue::new()),
iom_queue: iom_queue.take(), iom_queue: iom_queue.take(),
network_state: Some(NetworkState::Active(net_info)), network_state: Some(NetworkState::Active(net_info)),
} }
@ -198,71 +172,6 @@ impl<T: Contribution> StateMachine<T> {
self.set_state_discriminant(); self.set_state_discriminant();
} }
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys(
&mut self,
local_uid: &Uid,
local_sk: SecretKey,
peers: &Peers<T>,
config: &Config,
) -> Result<(Part, Ack), Error> {
let (part, ack);
self.state = match self.state {
State::AwaitingMorePeersForKeyGeneration {
ref mut iom_queue,
ref mut ack_queue,
} => {
let threshold = config.keygen_peer_count / 3;
let mut public_keys: BTreeMap<Uid, PublicKey> = peers
.validators()
.map(|p| p.pub_info().map(|(uid, _, pk)| (*uid, *pk)).unwrap())
.collect();
let pk = local_sk.public_key();
public_keys.insert(*local_uid, pk);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let (mut sync_key_gen, opt_part) =
SyncKeyGen::new(&mut rng, *local_uid, local_sk, public_keys.clone(), threshold)
.map_err(Error::SyncKeyGenNew)?;
part = opt_part.expect("This node is not a validator (somehow)!");
info!("KEY GENERATION: Handling our own `Part`...");
ack = match sync_key_gen.handle_part(&mut rng, &local_uid, part.clone())
.expect("Handling our own Part has failed") {
PartOutcome::Valid(Some(ack)) => ack,
PartOutcome::Invalid(faults) => panic!(
"Invalid part \
(FIXME: handle): {:?}",
faults
),
PartOutcome::Valid(None) => panic!("No Ack produced when handling Part."),
};
info!("KEY GENERATION: Queueing our own `Ack`...");
ack_queue.as_ref().unwrap().push((*local_uid, ack.clone()));
State::GeneratingKeys {
sync_key_gen: Some(sync_key_gen),
public_key: Some(pk),
public_keys,
ack_queue: ack_queue.take(),
part_count: 1,
ack_count: 0,
iom_queue: iom_queue.take(),
}
}
_ => panic!(
"State::set_generating_keys: \
Must be State::AwaitingMorePeersForKeyGeneration"
),
};
self.set_state_discriminant();
Ok((part, ack))
}
/// Changes the variant (in-place) of this `State` to `Observer`. /// Changes the variant (in-place) of this `State` to `Observer`.
// //
// TODO: Add proper error handling: // TODO: Add proper error handling:
@ -330,14 +239,14 @@ impl<T: Contribution> StateMachine<T> {
) -> Result<SegQueue<InputOrMessage<T>>, Error> { ) -> Result<SegQueue<InputOrMessage<T>>, Error> {
let iom_queue_ret; let iom_queue_ret;
self.state = match self.state { self.state = match self.state {
State::GeneratingKeys { State::KeyGen {
ref mut sync_key_gen, ref mut key_gen,
mut public_key,
ref mut iom_queue, ref mut iom_queue,
.. ..
} => { } => {
let mut sync_key_gen = sync_key_gen.take().unwrap(); let (sync_key_gen, public_key) = key_gen.complete().expect("Key generation incomplete");
assert_eq!(public_key.take().unwrap(), local_sk.public_key()); // let mut sync_key_gen = sync_key_gen.take().unwrap();
assert_eq!(public_key, local_sk.public_key());
let (pk_set, sk_share_opt) = let (pk_set, sk_share_opt) =
sync_key_gen.generate().map_err(Error::SyncKeyGenGenerate)?; sync_key_gen.generate().map_err(Error::SyncKeyGenGenerate)?;
@ -434,15 +343,16 @@ impl<T: Contribution> StateMachine<T> {
assert_eq!(peers.count_total(), 0); assert_eq!(peers.count_total(), 0);
return; return;
} }
State::AwaitingMorePeersForKeyGeneration { .. } => { State::KeyGen { ref key_gen, .. } => {
debug!( if key_gen.is_awaiting_peers() {
"Ignoring peer disconnection when \ debug!(
`State::AwaitingMorePeersForKeyGeneration`." "Ignoring peer disconnection when \
); `State::AwaitingMorePeersForKeyGeneration`."
return; );
} return;
State::GeneratingKeys { .. } => { } else {
panic!("FIXME: RESTART KEY GENERATION PROCESS AFTER PEER DISCONNECTS."); panic!("FIXME: RESTART KEY GENERATION PROCESS AFTER PEER DISCONNECTS.");
}
} }
State::Observer { .. } => { State::Observer { .. } => {
debug!("Ignoring peer disconnection when `State::Observer`."); debug!("Ignoring peer disconnection when `State::Observer`.");
@ -458,6 +368,8 @@ impl<T: Contribution> StateMachine<T> {
/// Returns the network state, if possible. /// Returns the network state, if possible.
pub(super) fn network_state(&self, peers: &Peers<T>) -> NetworkState { pub(super) fn network_state(&self, peers: &Peers<T>) -> NetworkState {
use super::key_gen::State as KeyGenState;
let peer_infos = peers let peer_infos = peers
.peers() .peers()
.filter_map(|peer| { .filter_map(|peer| {
@ -466,12 +378,17 @@ impl<T: Contribution> StateMachine<T> {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
match self.state { match self.state {
State::AwaitingMorePeersForKeyGeneration { .. } => { State::KeyGen { ref key_gen, .. } => {
NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos) match key_gen.state() {
KeyGenState::AwaitingPeers { .. } => {
NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos)
},
KeyGenState::Generating { ref public_keys, .. } => {
NetworkState::GeneratingKeys(peer_infos, public_keys.clone())
},
_ => NetworkState::Unknown(peer_infos),
}
} }
State::GeneratingKeys {
ref public_keys, ..
} => NetworkState::GeneratingKeys(peer_infos, public_keys.clone()),
State::Observer { ref dhb } | State::Validator { ref dhb } => { State::Observer { ref dhb } | State::Validator { ref dhb } => {
// FIXME: Ensure that `peer_info` matches `NetworkInfo` from HB. // FIXME: Ensure that `peer_info` matches `NetworkInfo` from HB.
let pk_set = dhb let pk_set = dhb
@ -510,6 +427,22 @@ impl<T: Contribution> StateMachine<T> {
} }
} }
/// Returns a reference to the key generation instance.
pub(super) fn key_gen(&self) -> Option<&KeyGenMachine> {
match self.state {
State::KeyGen { ref key_gen, .. } => Some(key_gen),
_ => None,
}
}
/// Returns a reference to the key generation instance.
pub(super) fn key_gen_mut(&mut self) -> Option<&mut KeyGenMachine> {
match self.state {
State::KeyGen { ref mut key_gen, .. } => Some(key_gen),
_ => None,
}
}
/// Presents a message, vote or contribution to HoneyBadger or queues it for later. /// Presents a message, vote or contribution to HoneyBadger or queues it for later.
/// ///
/// Cannot be called while disconnected or connection-pending. /// Cannot be called while disconnected or connection-pending.
@ -539,8 +472,7 @@ impl<T: Contribution> StateMachine<T> {
return step_opt; return step_opt;
} }
State::AwaitingMorePeersForKeyGeneration { ref iom_queue, .. } | State::KeyGen { ref iom_queue, .. }
| State::GeneratingKeys { ref iom_queue, .. }
| State::DeterminingNetworkState { ref iom_queue, .. } => { | State::DeterminingNetworkState { ref iom_queue, .. } => {
trace!("State::handle_iom: Queueing: {:?}", iom); trace!("State::handle_iom: Queueing: {:?}", iom);
iom_queue.as_ref().unwrap().push(iom); iom_queue.as_ref().unwrap().push(iom);