Add voting method, some cleanups. (#13)

* Remove DistAlgorithm usage.

* Deduplicate DHB handling code.

* Add Hydrabadger::vote_for.

* Add DHB errors back into Error; they are Sync now.
This commit is contained in:
Andreas Fackler 2018-11-13 15:22:55 +01:00 committed by Nick Sanders
parent db28cadbfa
commit 51d526c51c
6 changed files with 76 additions and 122 deletions

View File

@ -13,8 +13,7 @@ use crossbeam::queue::SegQueue;
use hbbft::{
crypto::{PublicKey, PublicKeySet},
dynamic_honey_badger::{ChangeState, JoinPlan, Message as DhbMessage, Change as DhbChange,
Input as DhbInput, NodeChange},
// queueing_honey_badger::{Change as QhbChange, Input as QhbInput},
NodeChange},
sync_key_gen::{Ack, AckOutcome, Part, PartOutcome, SyncKeyGen},
Target, Epoched,
};
@ -23,7 +22,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use tokio::{self, prelude::*};
use {
Contribution, InAddr, Input, InternalMessage, InternalMessageKind, InternalRx, Message,
Change, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx, Message,
NetworkNodeInfo, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
};
use rand;
@ -154,31 +153,10 @@ impl<T: Contribution> Handler<T> {
Ok(())
}
fn handle_input(&self, input: Input<T>, state: &mut State<T>) -> Result<(), Error> {
trace!("hydrabadger::Handler: About to input....");
if let Some(step_res) = state.input(input) {
let step = step_res.map_err(|err| {
error!("Honey Badger input error: {:?}", err);
Error::HbStepError
})?;
trace!("hydrabadger::Handler: Input step result added to queue....");
self.step_queue.push(step);
}
Ok(())
}
fn handle_message(
&self,
msg: Message,
src_uid: &Uid,
state: &mut State<T>,
) -> Result<(), Error> {
trace!("hydrabadger::Handler: About to handle_message: {:?}", msg);
if let Some(step_res) = state.handle_message(src_uid, msg) {
let step = step_res.map_err(|err| {
error!("Honey Badger handle_message error: {:?}", err);
Error::HbStepError
})?;
fn handle_iom(&self, iom: InputOrMessage<T>, state: &mut State<T>) -> Result<(), Error> {
trace!("hydrabadger::Handler: About to handle_iom: {:?}", iom);
if let Some(step_res) = state.handle_iom(iom) {
let step = step_res.map_err(Error::HbStep)?;
trace!("hydrabadger::Handler: Message step result added to queue....");
self.step_queue.push(step);
}
@ -416,14 +394,7 @@ impl<T: Contribution> Handler<T> {
// Handle previously queued input and messages:
if let Some(iom_queue) = iom_queue_opt {
while let Some(iom) = iom_queue.try_pop() {
match iom {
InputOrMessage::Input(input) => {
self.handle_input(input, state)?;
}
InputOrMessage::Message(uid, msg) => {
self.handle_message(msg, &uid, state)?;
}
}
self.handle_iom(iom, state)?;
}
}
Ok(())
@ -604,12 +575,16 @@ impl<T: Contribution> Handler<T> {
self.hdb.set_state_discriminant(state.discriminant());
}
InternalMessageKind::HbInput(input) => {
self.handle_input(input, state)?;
InternalMessageKind::HbContribution(contrib) => {
self.handle_iom(InputOrMessage::Contribution(contrib), state)?;
}
InternalMessageKind::HbChange(change) => {
self.handle_iom(InputOrMessage::Change(change), state)?;
}
InternalMessageKind::HbMessage(msg) => {
self.handle_message(msg, src_uid.as_ref().unwrap(), state)?;
self.handle_iom(InputOrMessage::Message(src_uid.unwrap(), msg), state)?;
}
InternalMessageKind::PeerDisconnect => {

View File

@ -33,8 +33,8 @@ use tokio::{
timer::{Interval, Delay},
};
use {
Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage, WireMessageKind,
WireMessages, BatchRx, EpochTx, EpochRx,
Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx,
};
// The number of random transactions to generate per interval.
@ -286,10 +286,10 @@ impl<T: Contribution> Hydrabadger<T> {
/// Handles a incoming batch of user transactions.
pub fn propose_user_contribution(&self, txn: T) -> Result<(), Error> {
if self.is_validator() {
self.send_internal(InternalMessage::hb_input(
self.send_internal(InternalMessage::hb_contribution(
self.inner.uid,
OutAddr(*self.inner.addr),
DhbInput::User(txn),
txn,
));
Ok(())
} else {
@ -297,6 +297,20 @@ impl<T: Contribution> Hydrabadger<T> {
}
}
/// Casts a vote for a change in the validator set or configuration.
pub fn vote_for(&self, change: Change) -> Result<(), Error> {
if self.is_validator() {
self.send_internal(InternalMessage::hb_vote(
self.inner.uid,
OutAddr(*self.inner.addr),
change,
));
Ok(())
} else {
Err(Error::VoteForNotValidator)
}
}
/// Returns a future that handles incoming connections on `socket`.
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
@ -419,10 +433,10 @@ impl<T: Contribution> Hydrabadger<T> {
self.inner.config.txn_gen_bytes,
);
hdb.send_internal(InternalMessage::hb_input(
hdb.send_internal(InternalMessage::hb_contribution(
hdb.inner.uid,
OutAddr(*hdb.inner.addr),
DhbInput::User(txns),
txns,
));
}
Ok(())

View File

@ -7,11 +7,10 @@ use self::state::State;
use bincode;
use hbbft::{
dynamic_honey_badger::Error as DhbError,
// queueing_honey_badger::Error as QhbError,
sync_key_gen::Error as SyncKeyGenError,
};
use std;
use {Input, Message, Uid};
use {Change, Message, Uid};
pub use self::hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
pub use self::state::StateDsct;
@ -22,7 +21,8 @@ pub const WIRE_MESSAGE_RETRY_MAX: usize = 10;
/// A HoneyBadger input or message.
#[derive(Clone, Debug)]
pub enum InputOrMessage<T> {
Input(Input<T>),
Change(Change),
Contribution(T),
Message(Uid, Message),
}
@ -34,24 +34,18 @@ pub enum Error {
Serde(bincode::Error),
#[fail(display = "Error polling hydrabadger internal receiver")]
HydrabadgerHandlerPoll,
// FIXME: Make honeybadger error thread safe.
#[fail(display = "QueuingHoneyBadger propose error")]
QhbPart,
/// TEMPORARY UNTIL WE FIX HB ERROR TYPES:
#[fail(display = "DynamicHoneyBadger error")]
Dhb(()),
/// TEMPORARY UNTIL WE FIX HB ERROR TYPES:
#[fail(display = "QueuingHoneyBadger error [FIXME]")]
Qhb(()),
/// TEMPORARY UNTIL WE FIX HB ERROR TYPES:
#[fail(display = "QueuingHoneyBadger step error")]
HbStepError,
Dhb(DhbError),
#[fail(display = "DynamicHoneyBadger step error")]
HbStep(DhbError),
#[fail(display = "Error creating SyncKeyGen: {}", _0)]
SyncKeyGenNew(SyncKeyGenError),
#[fail(display = "Error generating keys: {}", _0)]
SyncKeyGenGenerate(SyncKeyGenError),
#[fail(display = "Unable to push user transactions, this node is not a validator")]
ProposeUserContributionNotValidator,
#[fail(display = "Unable to vote for a change, this node is not a validator")]
VoteForNotValidator,
#[fail(display = "Unable to transmit epoch status to listener, listener receiver dropped")]
InstantiateHbListenerDropped,
}
@ -63,7 +57,7 @@ impl From<std::io::Error> for Error {
}
impl From<DhbError> for Error {
fn from(_err: DhbError) -> Error {
Error::Dhb(())
fn from(err: DhbError) -> Error {
Error::Dhb(err)
}
}

View File

@ -11,12 +11,12 @@ use hbbft::{
crypto::{PublicKey, SecretKey},
dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan, Error as DhbError},
sync_key_gen::{Ack, Part, PartOutcome, SyncKeyGen},
DistAlgorithm, NetworkInfo,
NetworkInfo,
};
use peer::Peers;
use std::{collections::BTreeMap, fmt};
use rand;
use {Contribution, Input, Message, NetworkNodeInfo, NetworkState, Step, Uid};
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid};
/// A `State` discriminant.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -463,62 +463,31 @@ impl<T: Contribution> State<T> {
}
}
/// Presents input to HoneyBadger or queues it for later.
/// Presents a message, vote or contribution to HoneyBadger or queues it for later.
///
/// Cannot be called while disconnected or connection-pending.
pub(super) fn input(&mut self, input: Input<T>) -> Option<Result<Step<T>, DhbError>> {
match self {
State::Observer { ref mut dhb, .. } | State::Validator { ref mut dhb, .. } => {
trace!("State::input: Inputting: {:?}", input);
let step_opt = Some(dhb.as_mut().unwrap().handle_input(input));
match step_opt {
Some(ref step) => match step {
Ok(s) => trace!("State::input: QHB output: {:?}", s.output),
Err(err) => error!("State::input: QHB output error: {:?}", err),
},
None => trace!("State::input: QHB Output is `None`"),
}
return step_opt;
}
State::AwaitingMorePeersForKeyGeneration { ref iom_queue, .. }
| State::GeneratingKeys { ref iom_queue, .. }
| State::DeterminingNetworkState { ref iom_queue, .. } => {
trace!("State::input: Queueing input: {:?}", input);
iom_queue
.as_ref()
.unwrap()
.push(InputOrMessage::Input(input));
}
s => panic!(
"State::handle_message: Must be connected in order to input to \
honey badger. State: {}",
s.discriminant()
),
}
None
}
/// Presents a message to HoneyBadger or queues it for later.
///
/// Cannot be called while disconnected or connection-pending.
pub(super) fn handle_message(
pub(super) fn handle_iom(
&mut self,
src_uid: &Uid,
msg: Message,
iom: InputOrMessage<T>,
) -> Option<Result<Step<T>, DhbError>> {
match self {
State::Observer { ref mut dhb, .. } | State::Validator { ref mut dhb, .. } => {
trace!("State::handle_message: Handling message: {:?}", msg);
let step_opt = Some(dhb.as_mut().unwrap().handle_message(src_uid, msg));
trace!("State::handle_iom: Handling: {:?}", iom);
let step_opt = Some({
let dhb = dhb.as_mut().unwrap();
match iom {
InputOrMessage::Contribution(contrib) => dhb.propose(contrib),
InputOrMessage::Change(change) => dhb.vote_for(change),
InputOrMessage::Message(src_uid, msg) => dhb.handle_message(&src_uid, msg),
}
});
match step_opt {
Some(ref step) => match step {
Ok(s) => trace!("State::handle_message: QHB output: {:?}", s.output),
Err(err) => error!("State::handle_message: QHB output error: {:?}", err),
Ok(s) => trace!("State::handle_iom: DHB output: {:?}", s.output),
Err(err) => error!("State::handle_iom: DHB output error: {:?}", err),
},
None => trace!("State::handle_message: QHB Output is `None`"),
None => trace!("State::handle_iom: DHB Output is `None`"),
}
return step_opt;
@ -526,14 +495,11 @@ impl<T: Contribution> State<T> {
State::AwaitingMorePeersForKeyGeneration { ref iom_queue, .. }
| State::GeneratingKeys { ref iom_queue, .. }
| State::DeterminingNetworkState { ref iom_queue, .. } => {
trace!("State::handle_message: Queueing message: {:?}", msg);
iom_queue
.as_ref()
.unwrap()
.push(InputOrMessage::Message(*src_uid, msg));
trace!("State::handle_iom: Queueing: {:?}", iom);
iom_queue.as_ref().unwrap().push(iom);
}
s => panic!(
"State::handle_message: Must be connected in order to input to \
"State::handle_iom: Must be connected in order to input to \
honey badger. State: {}",
s.discriminant()
),

View File

@ -65,7 +65,7 @@ use tokio::{io, net::TcpStream, prelude::*, codec::{Framed, LengthDelimitedCodec
use uuid::Uuid;
use hbbft::{
crypto::{PublicKey, PublicKeySet},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage, DynamicHoneyBadger, Input as DhbInput},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage, DynamicHoneyBadger, Change as DhbChange},
sync_key_gen::{Ack, Part},
DaStep as MessagingStep,
Contribution as HbbftContribution,
@ -151,7 +151,7 @@ impl fmt::Debug for Uid {
type Message = DhbMessage<Uid>;
type Step<T> = MessagingStep<DynamicHoneyBadger<T, Uid>>;
type Input<T> = DhbInput<T, Uid>;
type Change = DhbChange<Uid>;
/// A peer's incoming (listening) address.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
@ -388,7 +388,8 @@ impl<T: Contribution> Sink for WireMessages<T> {
pub enum InternalMessageKind<T: Contribution> {
Wire(WireMessage<T>),
HbMessage(Message),
HbInput(Input<T>),
HbContribution(T),
HbChange(Change),
PeerDisconnect,
NewIncomingConnection(InAddr, PublicKey, bool),
NewOutgoingConnection,
@ -432,8 +433,12 @@ impl<T: Contribution> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbMessage(msg))
}
pub fn hb_input(src_uid: Uid, src_addr: OutAddr, input: Input<T>) -> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbInput(input))
pub fn hb_contribution(src_uid: Uid, src_addr: OutAddr, contrib: T) -> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbContribution(contrib))
}
pub fn hb_vote(src_uid: Uid, src_addr: OutAddr, change: Change) -> InternalMessage<T> {
InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbChange(change))
}
pub fn peer_disconnect(src_uid: Uid, src_addr: OutAddr) -> InternalMessage<T> {

View File

@ -143,10 +143,10 @@ impl<T: Contribution> Future for PeerHandler<T> {
debug_assert_eq!(src_uid, *peer_uid);
}
self.hdb.send_internal(InternalMessage::hb_input(
self.hdb.send_internal(InternalMessage::hb_contribution(
src_uid,
self.out_addr,
HbInput::User(txn),
txn,
))
}
kind => self.hdb.send_internal(InternalMessage::wire(