Allow peer network restart, wrap `State` in `StateMachine`.

This commit is contained in:
c0gent 2018-11-15 15:21:33 -08:00
parent e4670cb951
commit a6279c6952
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
5 changed files with 118 additions and 67 deletions

View File

@ -8,7 +8,7 @@
unreachable_code)]
use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, State, StateDsct};
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
use crossbeam::queue::SegQueue;
use hbbft::{
crypto::{PublicKey, PublicKeySet},
@ -90,15 +90,15 @@ impl<T: Contribution> Handler<T> {
_src_addr: OutAddr,
src_pk: PublicKey,
request_change_add: bool,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
match state.discriminant() {
StateDsct::Disconnected | StateDsct::DeterminingNetworkState => {
match state.state {
State::Disconnected { .. } | State::DeterminingNetworkState { .. } => {
state.update_peer_connection_added(&peers);
self.hdb.set_state_discriminant(state.discriminant());
}
StateDsct::AwaitingMorePeersForKeyGeneration => {
State::AwaitingMorePeersForKeyGeneration { .. } => {
if peers.count_validators() >= self.hdb.config().keygen_peer_count {
info!("== BEGINNING KEY GENERATION ==");
@ -130,7 +130,7 @@ impl<T: Contribution> Handler<T> {
self.wire_to_validators(WireMessage::key_gen_part_ack(ack), peers);
}
}
StateDsct::GeneratingKeys { .. } => {
State::GeneratingKeys { .. } => {
// This *could* be called multiple times when initially
// establishing outgoing connections. Do nothing for now.
warn!(
@ -138,7 +138,7 @@ impl<T: Contribution> Handler<T> {
peer signal while `StateDsct::GeneratingKeys`."
);
}
StateDsct::Observer | StateDsct::Validator => {
State::Observer { .. } | State::Validator { .. } => {
// If the new peer sends a request-change-add (to be a
// validator), input the change into HB and broadcast, etc.
if request_change_add {
@ -153,7 +153,7 @@ impl<T: Contribution> Handler<T> {
Ok(())
}
fn handle_iom(&self, iom: InputOrMessage<T>, state: &mut State<T>) -> Result<(), Error> {
fn handle_iom(&self, iom: InputOrMessage<T>, state: &mut StateMachine<T>) -> Result<(), Error> {
trace!("hydrabadger::Handler: About to handle_iom: {:?}", iom);
if let Some(step_res) = state.handle_iom(iom) {
let step = step_res.map_err(Error::HbStep)?;
@ -198,8 +198,8 @@ impl<T: Contribution> Handler<T> {
}
}
fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut State<T>) {
match state {
fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>) {
match state.state {
State::GeneratingKeys {
ref mut sync_key_gen,
ref ack_queue,
@ -246,11 +246,13 @@ impl<T: Contribution> Handler<T> {
debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count);
}
State::DeterminingNetworkState { network_state, .. } => match network_state.is_some() {
true => unimplemented!(),
false => unimplemented!(),
State::DeterminingNetworkState { ref network_state, .. } => {
match network_state.is_some() {
true => unimplemented!(),
false => unimplemented!(),
}
},
s => panic!(
ref s => panic!(
"::handle_key_gen_part: State must be `GeneratingKeys`. \
State: \n{:?} \n\n[FIXME: Enqueue these parts!]\n\n",
s.discriminant()
@ -262,11 +264,11 @@ impl<T: Contribution> Handler<T> {
&self,
src_uid: &Uid,
ack: Ack,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
let mut keygen_is_complete = false;
match state {
match state.state {
State::GeneratingKeys {
ref mut sync_key_gen,
ref ack_queue,
@ -312,7 +314,7 @@ impl<T: Contribution> Handler<T> {
fn handle_join_plan(
&self,
jp: JoinPlan<Uid>,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
debug!("Join plan: \n{:?}", jp);
@ -342,7 +344,7 @@ impl<T: Contribution> Handler<T> {
fn instantiate_hb(
&self,
jp_opt: Option<JoinPlan<Uid>>,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
let mut iom_queue_opt = None;
@ -402,10 +404,24 @@ impl<T: Contribution> Handler<T> {
Ok(())
}
/// Resets all connections with peers.
///
/// Used when state gets out of sync such as when key generation completed
/// without including this node.
fn reset_peer_connections(
&self,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
self.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(),
*self.hdb.addr(), self.hdb.secret_key().public_key()), peers);
Ok(())
}
fn handle_net_state(
&self,
net_state: NetworkState,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
let peer_infos;
@ -425,15 +441,19 @@ impl<T: Contribution> Handler<T> {
}
NetworkState::Active(net_info) => {
peer_infos = net_info.0.clone();
match state {
State::DeterminingNetworkState {
ref mut network_state,
..
} => {
match state.state {
State::DeterminingNetworkState { ref mut network_state, .. } => {
*network_state = Some(NetworkState::Active(net_info));
}
State::AwaitingMorePeersForKeyGeneration { .. } => {
// Key generation has completed and we were not a part
// of it. Need to restart as a freshly connecting node.
state.set_determining_network_state_active(net_info);
self.hdb.set_state_discriminant(state.discriminant());
self.reset_peer_connections(state, peers)?;
}
State::Disconnected { .. }
| State::AwaitingMorePeersForKeyGeneration { .. }
| State::GeneratingKeys { .. } => {
panic!(
"Handler::net_state: Received `NetworkState::Active` while `{}`.",
@ -442,7 +462,6 @@ impl<T: Contribution> Handler<T> {
}
_ => {}
}
}
NetworkState::None => panic!("`NetworkState::None` received."),
}
@ -470,7 +489,7 @@ impl<T: Contribution> Handler<T> {
fn handle_peer_disconnect(
&self,
src_uid: Uid,
state: &mut State<T>,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
state.update_peer_connection_dropped(peers);
@ -478,7 +497,7 @@ impl<T: Contribution> Handler<T> {
// TODO: Send a node removal (Change-Remove) vote?
match state {
match state.state {
State::Disconnected { .. } => {
panic!("Received `WireMessageKind::PeerDisconnect` while disconnected.");
}
@ -509,7 +528,7 @@ impl<T: Contribution> Handler<T> {
fn handle_internal_message(
&self,
i_msg: InternalMessage<T>,
state: &mut State<T>,
state: &mut StateMachine<T>,
) -> Result<(), Error> {
let (src_uid, src_out_addr, w_msg) = i_msg.into_parts();
@ -524,7 +543,7 @@ impl<T: Contribution> Handler<T> {
let net_state;
match state {
match state.state {
State::Disconnected {} => {
state.set_awaiting_more_peers();
self.hdb.set_state_discriminant(state.discriminant());
@ -539,9 +558,6 @@ impl<T: Contribution> Handler<T> {
_ => net_state = state.network_state(&peers),
}
// // Get the current `NetworkState`:
// let net_state = state.network_state(&peers);
// Send response to remote peer:
peers
.get(&src_out_addr)
@ -622,7 +638,7 @@ impl<T: Contribution> Handler<T> {
self.handle_net_state(net_state, state, &peers)?;
}
// New outgoing connection:
// New outgoing connection response:
WireMessageKind::WelcomeReceivedChangeAdd(src_uid_new, src_pk, net_state) => {
debug!("Received NetworkState: \n{:?}", net_state);
assert!(src_uid_new == src_uid.unwrap());

View File

@ -4,7 +4,6 @@
#![allow(unused_imports, dead_code, unused_variables, unused_mut, unused_assignments,
unreachable_code)]
use super::{Error, Handler, State, StateDsct};
use futures::{
future::{self, Either},
sync::mpsc,
@ -36,6 +35,7 @@ use {
Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx,
};
use super::{Error, Handler, StateMachine, State, StateDsct};
// The number of random transactions to generate per interval.
const DEFAULT_TXN_GEN_COUNT: usize = 5;
@ -97,10 +97,8 @@ struct Inner<T: Contribution> {
peers: RwLock<Peers<T>>,
/// The current state containing HB when connected.
state: RwLock<State<T>>,
// TODO: Move this into a new state struct.
state_dsct: AtomicUsize,
state: RwLock<StateMachine<T>>,
state_dsct: Arc<AtomicUsize>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_tx: InternalTx<T>,
@ -150,13 +148,16 @@ impl<T: Contribution> Hydrabadger<T> {
let current_epoch = cfg.start_epoch;
let state = StateMachine::disconnected();
let state_dsct = state.dsct.clone();
let inner = Arc::new(Inner {
uid,
addr: InAddr(addr),
secret_key,
peers: RwLock::new(Peers::new()),
state: RwLock::new(State::disconnected()),
state_dsct: AtomicUsize::new(0),
state: RwLock::new(state),
state_dsct,
peer_internal_tx,
config: cfg,
current_epoch: Mutex::new(current_epoch),
@ -190,12 +191,12 @@ impl<T: Contribution> Hydrabadger<T> {
}
/// Returns a reference to the inner state.
pub fn state(&self) -> RwLockReadGuard<State<T>> {
pub fn state(&self) -> RwLockReadGuard<StateMachine<T>> {
self.inner.state.read()
}
/// Returns a mutable reference to the inner state.
pub(crate) fn state_mut(&self) -> RwLockWriteGuard<State<T>> {
pub(crate) fn state_mut(&self) -> RwLockWriteGuard<StateMachine<T>> {
self.inner.state.write()
}

View File

@ -2,15 +2,15 @@ mod handler;
mod hydrabadger;
mod state;
use self::handler::Handler;
use self::state::State;
use std;
use bincode;
use hbbft::{
dynamic_honey_badger::Error as DhbError,
sync_key_gen::Error as SyncKeyGenError,
};
use std;
use {Change, Message, Uid};
use self::handler::Handler;
use self::state::{State, StateMachine};
pub use self::hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
pub use self::state::StateDsct;

View File

@ -5,6 +5,7 @@
#![allow(dead_code)]
use std::sync::{Arc, atomic::AtomicUsize};
use super::{Config, Error, InputOrMessage};
use crossbeam::queue::SegQueue;
use hbbft::{
@ -16,7 +17,7 @@ use hbbft::{
use peer::Peers;
use std::{collections::BTreeMap, fmt};
use rand;
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid};
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo};
/// A `State` discriminant.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -113,15 +114,26 @@ impl<T: Contribution> State<T> {
State::Validator { .. } => StateDsct::Validator,
}
}
}
pub struct StateMachine<T: Contribution> {
pub(crate) state: State<T>,
pub(crate) dsct: Arc<AtomicUsize>,
}
impl<T: Contribution> StateMachine<T> {
/// Returns a new `State::Disconnected`.
pub(super) fn disconnected() -> State<T> {
State::Disconnected { /*secret_key: secret_key*/ }
pub(super) fn disconnected() -> StateMachine<T> {
StateMachine {
state: State::Disconnected { },
dsct: Arc::new(AtomicUsize::new(0)),
}
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_awaiting_more_peers(&mut self) {
*self = match self {
self.state = match self.state {
State::Disconnected {} => {
info!("Setting state: `AwaitingMorePeersForKeyGeneration`.");
State::AwaitingMorePeersForKeyGeneration {
@ -144,7 +156,7 @@ impl<T: Contribution> State<T> {
iom_queue: iom_queue.take(),
}
}
s => {
ref s => {
debug!(
"State::set_awaiting_more_peers: Attempted to set \
`State::AwaitingMorePeersForKeyGeneration` while {}.",
@ -155,6 +167,22 @@ impl<T: Contribution> State<T> {
};
}
/// Sets state to `DeterminingNetworkState` if
/// `AwaitingMorePeersForKeyGeneration`, otherwise panics.
pub(super) fn set_determining_network_state_active(&mut self, net_info: ActiveNetworkInfo) {
self.state = match self.state {
State::AwaitingMorePeersForKeyGeneration { ref mut ack_queue, ref mut iom_queue } => {
info!("Setting state: `DeterminingNetworkState`.");
State::DeterminingNetworkState {
ack_queue: ack_queue.take(),
iom_queue: iom_queue.take(),
network_state: Some(NetworkState::Active(net_info)),
}
}
_ => panic!("Cannot reset network state when state is not `AwaitingMorePeersForKeyGeneration`."),
};
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys(
&mut self,
@ -164,7 +192,7 @@ impl<T: Contribution> State<T> {
config: &Config,
) -> Result<(Part, Ack), Error> {
let (part, ack);
*self = match self {
self.state = match self.state {
State::AwaitingMorePeersForKeyGeneration {
ref mut iom_queue,
ref mut ack_queue,
@ -233,7 +261,7 @@ impl<T: Contribution> State<T> {
step_queue: &SegQueue<Step<T>>,
) -> Result<SegQueue<InputOrMessage<T>>, Error> {
let iom_queue_ret;
*self = match self {
self.state = match self.state {
State::DeterminingNetworkState {
ref mut iom_queue, ..
} => {
@ -263,7 +291,7 @@ impl<T: Contribution> State<T> {
State::Observer { dhb: Some(dhb) }
}
s => panic!(
ref s => panic!(
"State::set_observer: State must be `GeneratingKeys`. \
State: {}",
s.discriminant()
@ -285,7 +313,7 @@ impl<T: Contribution> State<T> {
_step_queue: &SegQueue<Step<T>>,
) -> Result<SegQueue<InputOrMessage<T>>, Error> {
let iom_queue_ret;
*self = match self {
self.state = match self.state {
State::GeneratingKeys {
ref mut sync_key_gen,
mut public_key,
@ -333,7 +361,7 @@ impl<T: Contribution> State<T> {
iom_queue_ret = iom_queue.take().unwrap();
State::Validator { dhb: Some(dhb) }
}
s => panic!(
ref s => panic!(
"State::set_validator: State must be `GeneratingKeys`. State: {}",
s.discriminant()
),
@ -343,12 +371,12 @@ impl<T: Contribution> State<T> {
#[must_use]
pub(super) fn promote_to_validator(&mut self) -> Result<(), Error> {
*self = match self {
self.state = match self.state {
State::Observer { ref mut dhb } => {
info!("=== PROMOTING NODE TO VALIDATOR ===");
State::Validator { dhb: dhb.take() }
}
s => panic!(
ref s => panic!(
"State::promote_to_validator: State must be `Observer`. State: {}",
s.discriminant()
),
@ -359,8 +387,7 @@ impl<T: Contribution> State<T> {
/// Sets state to `DeterminingNetworkState` if `Disconnected`, otherwise does
/// nothing.
pub(super) fn update_peer_connection_added(&mut self, _peers: &Peers<T>) {
let _dsct = self.discriminant();
*self = match self {
self.state = match self.state {
State::Disconnected {} => {
info!("Setting state: `DeterminingNetworkState`.");
State::DeterminingNetworkState {
@ -375,7 +402,7 @@ impl<T: Contribution> State<T> {
/// Sets state to `Disconnected` if peer count is zero, otherwise does nothing.
pub(super) fn update_peer_connection_dropped(&mut self, peers: &Peers<T>) {
*self = match self {
self.state = match self.state {
State::DeterminingNetworkState { .. } => {
if peers.count_total() == 0 {
State::Disconnected {}
@ -418,7 +445,7 @@ impl<T: Contribution> State<T> {
.map(|(&uid, &in_addr, &pk)| NetworkNodeInfo { uid, in_addr, pk })
})
.collect::<Vec<_>>();
match self {
match self.state {
State::AwaitingMorePeersForKeyGeneration { .. } => {
NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos)
}
@ -447,7 +474,7 @@ impl<T: Contribution> State<T> {
/// Returns a reference to the internal HB instance.
pub fn dhb(&self) -> Option<&DynamicHoneyBadger<T, Uid>> {
match self {
match self.state {
State::Observer { ref dhb, .. } => dhb.as_ref(),
State::Validator { ref dhb, .. } => dhb.as_ref(),
_ => None,
@ -456,7 +483,7 @@ impl<T: Contribution> State<T> {
/// Returns a reference to the internal HB instance.
pub(super) fn dhb_mut(&mut self) -> Option<&mut DynamicHoneyBadger<T, Uid>> {
match self {
match self.state {
State::Observer { ref mut dhb, .. } => dhb.as_mut(),
State::Validator { ref mut dhb, .. } => dhb.as_mut(),
_ => None,
@ -470,7 +497,7 @@ impl<T: Contribution> State<T> {
&mut self,
iom: InputOrMessage<T>,
) -> Option<Result<Step<T>, DhbError>> {
match self {
match self.state {
State::Observer { ref mut dhb, .. } | State::Validator { ref mut dhb, .. } => {
trace!("State::handle_iom: Handling: {:?}", iom);
let step_opt = Some({
@ -498,7 +525,7 @@ impl<T: Contribution> State<T> {
trace!("State::handle_iom: Queueing: {:?}", iom);
iom_queue.as_ref().unwrap().push(iom);
}
s => panic!(
ref s => panic!(
"State::handle_iom: Must be connected in order to input to \
honey badger. State: {}",
s.discriminant()
@ -506,4 +533,9 @@ impl<T: Contribution> State<T> {
}
None
}
/// Returns the state discriminant.
pub(super) fn discriminant(&self) -> StateDsct {
self.state.discriminant()
}
}

View File

@ -195,6 +195,8 @@ pub struct NetworkNodeInfo {
pub(crate) pk: PublicKey,
}
type ActiveNetworkInfo = (Vec<NetworkNodeInfo>, PublicKeySet, BTreeMap<Uid, PublicKey>);
/// The current state of the network.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NetworkState {
@ -202,7 +204,7 @@ pub enum NetworkState {
Unknown(Vec<NetworkNodeInfo>),
AwaitingMorePeersForKeyGeneration(Vec<NetworkNodeInfo>),
GeneratingKeys(Vec<NetworkNodeInfo>, BTreeMap<Uid, PublicKey>),
Active((Vec<NetworkNodeInfo>, PublicKeySet, BTreeMap<Uid, PublicKey>)),
Active(ActiveNetworkInfo),
}
/// Messages sent over the network between nodes.