Move instance id to `WireMessage`, route user instance messages.

This commit is contained in:
c0gent 2018-11-29 03:56:53 -08:00
parent 4ad23df2a3
commit 7cf63d8a42
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
4 changed files with 83 additions and 81 deletions

View File

@ -21,8 +21,7 @@ use peer::Peers;
use { use {
Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx, Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx, NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
key_gen, key_gen};
};
use super::WIRE_MESSAGE_RETRY_MAX; use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct}; use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
@ -97,12 +96,12 @@ impl<T: Contribution> Handler<T> {
} }
/// Handles a received `Part`. /// Handles a received `Part`.
fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>) fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut StateMachine<T>, peers: &Peers<T>)
-> Result<(), Error> -> Result<(), Error>
{ {
match state.state { match state.state {
State::KeyGen { ref mut key_gen, .. } => { State::KeyGen { ref mut key_gen, .. } => {
key_gen.handle_key_gen_part(src_uid, part, &self.hdb); key_gen.handle_key_gen_part(src_uid, part, peers);
} }
State::DeterminingNetworkState { ref network_state, .. } => { State::DeterminingNetworkState { ref network_state, .. } => {
match network_state.is_some() { match network_state.is_some() {
@ -131,7 +130,7 @@ impl<T: Contribution> Handler<T> {
match state.state { match state.state {
State::KeyGen { ref mut key_gen, .. } => { State::KeyGen { ref mut key_gen, .. } => {
if key_gen.handle_key_gen_ack(src_uid, ack, &self.hdb)? { if key_gen.handle_key_gen_ack(src_uid, ack, peers)? {
complete = true; complete = true;
} }
} }
@ -151,52 +150,43 @@ impl<T: Contribution> Handler<T> {
fn handle_key_gen_message( fn handle_key_gen_message(
&self, &self,
instance_id: key_gen::InstanceId,
msg: key_gen::Message, msg: key_gen::Message,
src_uid: &Uid, src_uid: &Uid,
state: &mut StateMachine<T>, state: &mut StateMachine<T>,
peers: &Peers<T>, peers: &Peers<T>,
) -> Result<(), Error> { ) -> Result<(), Error> {
use key_gen::{InstanceId, MessageKind}; use key_gen::{MessageKind, InstanceId};
let (instance_id, kind) = msg.into_parts(); let kind = msg.into_kind();
match instance_id { match instance_id {
InstanceId::User(_id) => { InstanceId::User(id) => {
// let key_gens = self.key_gens.borrow_mut(); let mut key_gens = self.key_gens.borrow_mut();
// let key_gen = match key_gens.get(&id) { match key_gens.get_mut(&id) {
// Some(kg) => { Some(ref mut kg) => match kind {
// // Key gen proposal: MessageKind::Part(part) => {
// MessageKind::Part(part) => { kg.handle_key_gen_part(src_uid, part, peers);
// self.handle_key_gen_part(src_uid, part, state)?; }
// } MessageKind::Ack(ack) => {
// // Key gen proposal acknowledgement: kg.handle_key_gen_ack(src_uid, ack, peers)?;
// // }
// // FIXME: Queue until all parts have been sent. // MessageKind::InstanceId => panic!("InstanceId should not be sent \
// MessageKind::Ack(ack) => { // for BuiltIn key gen instances"),
// self.handle_key_gen_ack(src_uid, ack, state, &self.hdb.peers())?; }
// } None => error!("KeyGen message received with invalid instance"),
}
// MessageKind::InstanceId => panic!("InstanceId should not be sent \
// for BuiltIn key gen instances"),
// }
// None => error!("KeyGen message received with invalid instance"),
// }
} }
InstanceId::BuiltIn => { InstanceId::BuiltIn => {
match kind { match kind {
// Key gen proposal:
MessageKind::Part(part) => { MessageKind::Part(part) => {
self.handle_key_gen_part(src_uid, part, state)?; self.handle_key_gen_part(src_uid, part, state, peers)?;
} }
// Key gen proposal acknowledgement:
//
// FIXME: Queue until all parts have been sent.
MessageKind::Ack(ack) => { MessageKind::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)?; self.handle_key_gen_ack(src_uid, ack, state, peers)?;
} }
// MessageKind::InstanceId => panic!("InstanceId should not be sent \
MessageKind::InstanceId => panic!("InstanceId should not be sent \ // for BuiltIn key gen instances"),
for BuiltIn key gen instances"),
} }
} }
} }
@ -512,8 +502,10 @@ impl<T: Contribution> Handler<T> {
let peers = self.hdb.peers(); let peers = self.hdb.peers();
let new_id = Uid::new(); let new_id = Uid::new();
tx.unbounded_send(key_gen::Message::instance_id(key_gen::InstanceId::User(new_id.clone()))).unwrap(); // tx.unbounded_send(key_gen::Message::instance_id().unwrap();
let key_gen = key_gen::Machine::generate(self.hdb.uid(), self.hdb.secret_key().clone(), &peers, tx)?; let instance_id = key_gen::InstanceId::User(new_id.clone());
let key_gen = key_gen::Machine::generate(self.hdb.uid(), self.hdb.secret_key().clone(),
&peers, tx, instance_id)?;
self.key_gens.borrow_mut().insert(new_id, key_gen); self.key_gens.borrow_mut().insert(new_id, key_gen);
} }
@ -566,8 +558,8 @@ impl<T: Contribution> Handler<T> {
)?; )?;
} }
WireMessageKind::KeyGen(msg) => { WireMessageKind::KeyGen(instance_id, msg) => {
self.handle_key_gen_message(msg, &src_uid.unwrap(), state, &self.hdb.peers())?; self.handle_key_gen_message(instance_id, msg, &src_uid.unwrap(), state, &self.hdb.peers())?;
} }
// Output by validators when a batch with a `ChangeState` // Output by validators when a batch with a `ChangeState`

View File

@ -13,7 +13,6 @@ use rand;
use super::Error; use super::Error;
use {Contribution, NetworkState, Uid, WireMessage}; use {Contribution, NetworkState, Uid, WireMessage};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum InstanceId { pub enum InstanceId {
BuiltIn, BuiltIn,
@ -25,47 +24,47 @@ pub enum InstanceId {
pub enum MessageKind { pub enum MessageKind {
Part(Part), Part(Part),
Ack(Ack), Ack(Ack),
InstanceId, // InstanceId(InstanceId),
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message { pub struct Message {
instance_id: InstanceId, // instance_id: InstanceId,
kind: MessageKind, kind: MessageKind,
} }
impl Message { impl Message {
pub fn part(instance_id: InstanceId, part: Part) -> Message { pub fn part(/*instance_id: InstanceId, */part: Part) -> Message {
Message { Message {
instance_id, // instance_id,
kind: MessageKind::Part(part), kind: MessageKind::Part(part),
} }
} }
pub fn ack(instance_id: InstanceId, ack: Ack) -> Message { pub fn ack(/*instance_id: InstanceId, */ack: Ack) -> Message {
Message { Message {
instance_id, // instance_id,
kind: MessageKind::Ack(ack), kind: MessageKind::Ack(ack),
} }
} }
pub fn instance_id(instance_id: InstanceId) -> Message { // pub fn instance_id(instance_id: InstanceId) -> Message {
Message { // Message {
instance_id, // // instance_id,
kind: MessageKind::InstanceId, // kind: MessageKind::InstanceId(instance_id),
} // }
} // }
pub fn kind(&self) -> &MessageKind { pub fn kind(&self) -> &MessageKind {
&self.kind &self.kind
} }
// pub fn into_kind(self) -> MessageKind { // pub fn into_parts(self) -> (InstanceId, MessageKind) {
// self.kind // (self.instance_id, self.kind)
// } // }
pub fn into_parts(self) -> (InstanceId, MessageKind) { pub fn into_kind(self) -> MessageKind {
(self.instance_id, self.kind) self.kind
} }
} }
@ -113,9 +112,9 @@ fn handle_queued_acks<T: Contribution>(
part_count: usize, part_count: usize,
ack_count: &mut usize, ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>, sync_key_gen: &mut SyncKeyGen<Uid>,
hdb: &Hydrabadger<T>, peers: &Peers<T>,
) { ) {
if part_count == hdb.config().keygen_peer_count + 1 { if part_count == peers.count_validators() + 1 {
trace!("KEY GENERATION: Handling queued acks..."); trace!("KEY GENERATION: Handling queued acks...");
debug!(" Peers complete: {}", sync_key_gen.count_complete()); debug!(" Peers complete: {}", sync_key_gen.count_complete());
@ -134,6 +133,7 @@ pub struct Machine {
state: State, state: State,
ack_queue: SegQueue<(Uid, Ack)>, ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>, event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
} }
impl Machine { impl Machine {
@ -141,7 +141,8 @@ impl Machine {
/// state. /// state.
pub fn awaiting_peers( pub fn awaiting_peers(
ack_queue: SegQueue<(Uid, Ack)>, ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>> event_tx: Option<mpsc::UnboundedSender<Message>>,
instance_id: InstanceId,
) -> Machine { ) -> Machine {
Machine { Machine {
state: State::AwaitingPeers { state: State::AwaitingPeers {
@ -150,6 +151,7 @@ impl Machine {
}, },
ack_queue, ack_queue,
event_tx, event_tx,
instance_id,
} }
} }
@ -160,7 +162,7 @@ impl Machine {
local_sk: SecretKey, local_sk: SecretKey,
peers: &Peers<T>, peers: &Peers<T>,
event_tx: mpsc::UnboundedSender<Message>, event_tx: mpsc::UnboundedSender<Message>,
// peer_count: usize, instance_id: InstanceId,
) -> Result<Machine, Error> { ) -> Result<Machine, Error> {
let mut m = Machine { let mut m = Machine {
state: State::AwaitingPeers { state: State::AwaitingPeers {
@ -169,12 +171,13 @@ impl Machine {
}, },
ack_queue: SegQueue::new(), ack_queue: SegQueue::new(),
event_tx: Some(event_tx), event_tx: Some(event_tx),
instance_id: instance_id.clone(),
}; };
let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?; let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?;
peers.wire_to_validators(WireMessage::key_gen_part(InstanceId::BuiltIn, part)); peers.wire_to_validators(WireMessage::key_gen_part(instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack)); peers.wire_to_validators(WireMessage::key_gen_ack(instance_id, ack));
Ok(m) Ok(m)
} }
@ -273,8 +276,8 @@ impl Machine {
net_state, net_state,
), ),
); );
peers.wire_to_validators(WireMessage::key_gen_part(InstanceId::BuiltIn, part)); peers.wire_to_validators(WireMessage::key_gen_part(self.instance_id.clone(), part));
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack)); peers.wire_to_validators(WireMessage::key_gen_ack(self.instance_id.clone(), ack));
} }
} }
State::Generating { .. } => { State::Generating { .. } => {
@ -291,7 +294,12 @@ impl Machine {
} }
/// Handles a received `Part`. /// Handles a received `Part`.
pub(super) fn handle_key_gen_part<T: Contribution>(&mut self, src_uid: &Uid, part: Part, hdb: &Hydrabadger<T>) { pub(super) fn handle_key_gen_part<T: Contribution>(
&mut self,
src_uid: &Uid,
part: Part,
peers: &Peers<T>
) {
match self.state { match self.state {
State::Generating { State::Generating {
ref mut sync_key_gen, ref mut sync_key_gen,
@ -325,18 +333,18 @@ impl Machine {
trace!("KEY GENERATION: Queueing `Ack`."); trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone())); self.ack_queue.push((*src_uid, ack.clone()));
let peers = hdb.peers(); // let peers = hdb.peers();
trace!( trace!(
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...", "KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid src_uid
); );
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack)); peers.wire_to_validators(WireMessage::key_gen_ack(self.instance_id.clone(), ack));
debug!(" Peers complete: {}", skg.count_complete()); debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count); debug!(" Part count: {}", part_count);
debug!(" Ack count: {}", ack_count); debug!(" Ack count: {}", ack_count);
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, hdb) handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, peers)
} }
ref s => panic!( ref s => panic!(
"::handle_key_gen_part: State must be `GeneratingKeys`. \ "::handle_key_gen_part: State must be `GeneratingKeys`. \
@ -351,7 +359,8 @@ impl Machine {
&mut self, &mut self,
src_uid: &Uid, src_uid: &Uid,
ack: Ack, ack: Ack,
hdb: &Hydrabadger<T>, // hdb: &Hydrabadger<T>,
peers: &Peers<T>,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let mut complete: Option<(SyncKeyGen<Uid>, PublicKey)> = None; let mut complete: Option<(SyncKeyGen<Uid>, PublicKey)> = None;
@ -363,17 +372,17 @@ impl Machine {
ref mut ack_count, ref mut ack_count,
.. ..
} => { } => {
let node_n = { {
let mut skg = sync_key_gen.as_mut().unwrap(); let mut skg = sync_key_gen.as_mut().unwrap();
trace!("KEY GENERATION: Queueing `Ack`."); trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone())); self.ack_queue.push((*src_uid, ack.clone()));
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, hdb); handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, peers);
hdb.config().keygen_peer_count + 1
}; };
let node_n = peers.count_validators() + 1;
if sync_key_gen.as_ref().unwrap().count_complete() == node_n && *ack_count >= node_n * node_n { if sync_key_gen.as_ref().unwrap().count_complete() == node_n && *ack_count >= node_n * node_n {
let skg = sync_key_gen.take().unwrap(); let skg = sync_key_gen.take().unwrap();
info!("KEY GENERATION: All acks received and handled."); info!("KEY GENERATION: All acks received and handled.");

View File

@ -123,7 +123,8 @@ impl<T: Contribution> StateMachine<T> {
State::Disconnected {} => { State::Disconnected {} => {
info!("Setting state: `KeyGen`."); info!("Setting state: `KeyGen`.");
State::KeyGen { State::KeyGen {
key_gen: key_gen::Machine::awaiting_peers(SegQueue::new(), None), key_gen: key_gen::Machine::awaiting_peers(SegQueue::new(), None,
key_gen::InstanceId::BuiltIn),
iom_queue: Some(SegQueue::new()) iom_queue: Some(SegQueue::new())
} }
} }
@ -138,7 +139,8 @@ impl<T: Contribution> StateMachine<T> {
); );
info!("Setting state: `KeyGen`."); info!("Setting state: `KeyGen`.");
State::KeyGen { State::KeyGen {
key_gen: key_gen::Machine::awaiting_peers(ack_queue.take().unwrap(), None), key_gen: key_gen::Machine::awaiting_peers(ack_queue.take().unwrap(), None,
key_gen::InstanceId::BuiltIn),
iom_queue: iom_queue.take() , iom_queue: iom_queue.take() ,
} }
} }
@ -294,7 +296,6 @@ impl<T: Contribution> StateMachine<T> {
Ok(iom_queue_ret) Ok(iom_queue_ret)
} }
#[must_use]
pub(super) fn promote_to_validator(&mut self) -> Result<(), Error> { pub(super) fn promote_to_validator(&mut self) -> Result<(), Error> {
self.state = match self.state { self.state = match self.state {
State::Observer { ref mut dhb } => { State::Observer { ref mut dhb } => {

View File

@ -231,7 +231,7 @@ pub enum WireMessageKind<T> {
// TODO(c0gent): Remove. // TODO(c0gent): Remove.
Transaction(Uid, T), Transaction(Uid, T),
/// Messages used during synchronous key generation. /// Messages used during synchronous key generation.
KeyGen(key_gen::Message), KeyGen(key_gen::InstanceId, key_gen::Message),
JoinPlan(JoinPlan<Uid>), JoinPlan(JoinPlan<Uid>),
} }
@ -279,18 +279,18 @@ impl<T: Contribution> WireMessage<T> {
WireMessageKind::Message(src_uid, msg).into() WireMessageKind::Message(src_uid, msg).into()
} }
pub fn key_gen(msg: key_gen::Message) -> WireMessage<T> { pub fn key_gen(instance_id: key_gen::InstanceId, msg: key_gen::Message) -> WireMessage<T> {
WireMessageKind::KeyGen(msg).into() WireMessageKind::KeyGen(instance_id, msg).into()
} }
pub fn key_gen_part(instance_id: key_gen::InstanceId, part: Part) -> WireMessage<T> { pub fn key_gen_part(instance_id: key_gen::InstanceId, part: Part) -> WireMessage<T> {
// WireMessageKind::KeyGenPart(part).into() // WireMessageKind::KeyGenPart(part).into()
WireMessage::key_gen(key_gen::Message::part(instance_id, part)) WireMessage::key_gen(instance_id, key_gen::Message::part(part))
} }
pub fn key_gen_ack(instance_id: key_gen::InstanceId, ack: Ack) -> WireMessage<T> { pub fn key_gen_ack(instance_id: key_gen::InstanceId, ack: Ack) -> WireMessage<T> {
// WireMessageKind::KeyGenAck(outcome).into() // WireMessageKind::KeyGenAck(outcome).into()
WireMessage::key_gen(key_gen::Message::ack(instance_id, ack)) WireMessage::key_gen(instance_id, key_gen::Message::ack(ack))
} }
pub fn join_plan(jp: JoinPlan<Uid>) -> WireMessage<T> { pub fn join_plan(jp: JoinPlan<Uid>) -> WireMessage<T> {