Allow creation of arbitrary key generation instances.

Create using `Hydrabadger::new_key_gen_instance`.
This commit is contained in:
c0gent 2018-11-28 18:44:05 -08:00
parent e3b9d50251
commit 4ad23df2a3
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
6 changed files with 237 additions and 100 deletions

View File

@ -4,9 +4,13 @@
//! * Do not make state changes directly in this module (use closures, etc.).
//!
use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
use std::{
collections::HashMap,
cell::RefCell,
};
use crossbeam::queue::SegQueue;
use tokio::{self, prelude::*};
use hbbft::{
crypto::PublicKey,
dynamic_honey_badger::{ChangeState, JoinPlan, Change as DhbChange},
@ -14,24 +18,27 @@ use hbbft::{
Target,
};
use peer::Peers;
use tokio::{self, prelude::*};
use {
Contribution, InAddr, InternalMessage, InternalMessageKind, InternalRx,
NetworkState, OutAddr, Step, Uid, WireMessage, WireMessageKind, BatchTx,
KeyGenMessage,
key_gen,
};
use super::WIRE_MESSAGE_RETRY_MAX;
use super::{Error, Hydrabadger, InputOrMessage, StateMachine, State, StateDsct};
/// Hydrabadger event (internal message) handler.
pub struct Handler<T: Contribution> {
hdb: Hydrabadger<T>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
peer_internal_rx: InternalRx<T>,
// Outgoing wire message queue:
/// Outgoing wire message queue.
wire_queue: SegQueue<(Uid, WireMessage<T>, usize)>,
// Output from HoneyBadger:
/// Output from HoneyBadger.
step_queue: SegQueue<Step<T>>,
// TODO: Use a bounded tx/rx (find a sensible upper bound):
batch_tx: BatchTx<T>,
/// Distributed synchronous key generation instances.
key_gens: RefCell<HashMap<Uid, key_gen::Machine>>,
}
impl<T: Contribution> Handler<T> {
@ -42,6 +49,7 @@ impl<T: Contribution> Handler<T> {
wire_queue: SegQueue::new(),
step_queue: SegQueue::new(),
batch_tx,
key_gens: RefCell::new(HashMap::new()),
}
}
@ -143,28 +151,59 @@ impl<T: Contribution> Handler<T> {
fn handle_key_gen_message(
&self,
msg: KeyGenMessage,
msg: key_gen::Message,
src_uid: &Uid,
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
match msg {
// Key gen proposal:
KeyGenMessage::Part(part) => {
self.handle_key_gen_part(src_uid, part, state)
}
use key_gen::{InstanceId, MessageKind};
// Key gen proposal acknowledgement:
//
// FIXME: Queue until all parts have been sent.
KeyGenMessage::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)
let (instance_id, kind) = msg.into_parts();
match instance_id {
InstanceId::User(_id) => {
// let key_gens = self.key_gens.borrow_mut();
// let key_gen = match key_gens.get(&id) {
// Some(kg) => {
// // Key gen proposal:
// MessageKind::Part(part) => {
// self.handle_key_gen_part(src_uid, part, state)?;
// }
// // Key gen proposal acknowledgement:
// //
// // FIXME: Queue until all parts have been sent.
// MessageKind::Ack(ack) => {
// self.handle_key_gen_ack(src_uid, ack, state, &self.hdb.peers())?;
// }
// MessageKind::InstanceId => panic!("InstanceId should not be sent \
// for BuiltIn key gen instances"),
// }
// None => error!("KeyGen message received with invalid instance"),
// }
}
InstanceId::BuiltIn => {
match kind {
// Key gen proposal:
MessageKind::Part(part) => {
self.handle_key_gen_part(src_uid, part, state)?;
}
// Key gen proposal acknowledgement:
//
// FIXME: Queue until all parts have been sent.
MessageKind::Ack(ack) => {
self.handle_key_gen_ack(src_uid, ack, state, peers)?;
}
MessageKind::InstanceId => panic!("InstanceId should not be sent \
for BuiltIn key gen instances"),
}
}
}
Ok(())
}
// This may be called spuriously and only need be handled by
// 'unestablished' nodes.
fn handle_join_plan(
@ -382,6 +421,9 @@ impl<T: Contribution> Handler<T> {
i_msg: InternalMessage<T>,
state: &mut StateMachine<T>,
) -> Result<(), Error> {
// let mut state_guard = self.hdb.state_mut();
// let mut state = &mut state_guard;
let (src_uid, src_out_addr, w_msg) = i_msg.into_parts();
match w_msg {
@ -465,6 +507,16 @@ impl<T: Contribution> Handler<T> {
self.handle_peer_disconnect(dropped_src_uid, state, &peers)?;
}
InternalMessageKind::NewKeyGenInstance(tx) => {
// TODO: Spawn these instances in a separate thread/task.
let peers = self.hdb.peers();
let new_id = Uid::new();
tx.unbounded_send(key_gen::Message::instance_id(key_gen::InstanceId::User(new_id.clone()))).unwrap();
let key_gen = key_gen::Machine::generate(self.hdb.uid(), self.hdb.secret_key().clone(), &peers, tx)?;
self.key_gens.borrow_mut().insert(new_id, key_gen);
}
InternalMessageKind::Wire(w_msg) => match w_msg.into_kind() {
// This is sent on the wire to ensure that we have all of the
// relevant details for a peer (generally preceeding other
@ -514,19 +566,6 @@ impl<T: Contribution> Handler<T> {
)?;
}
// // Key gen proposal:
// WireMessageKind::KeyGenPart(part) => {
// self.handle_key_gen_part(&src_uid.unwrap(), part, state);
// }
// // Key gen proposal acknowledgement:
// //
// // FIXME: Queue until all parts have been sent.
// WireMessageKind::KeyGenAck(ack) => {
// let peers = self.hdb.peers();
// self.handle_key_gen_ack(&src_uid.unwrap(), ack, state, &peers)?;
// }
WireMessageKind::KeyGen(msg) => {
self.handle_key_gen_message(msg, &src_uid.unwrap(), state, &self.hdb.peers())?;
}
@ -558,9 +597,7 @@ impl<T: Contribution> Future for Handler<T> {
// Ensure the loop can't hog the thread for too long:
const MESSAGES_PER_TICK: usize = 50;
trace!("hydrabadger::Handler::poll: Locking 'state' for writing...");
let mut state = self.hdb.state_mut();
trace!("hydrabadger::Handler::poll: 'state' locked for writing.");
// Handle incoming internal messages:
for i in 0..MESSAGES_PER_TICK {
@ -600,6 +637,8 @@ impl<T: Contribution> Future for Handler<T> {
trace!("hydrabadger::Handler: Processing step queue....");
// let mut state = self.hdb.state_mut();
// Process all honey badger output batches:
while let Some(mut step) = self.step_queue.try_pop() {
for batch in step.output.drain(..) {

View File

@ -1,16 +1,6 @@
//! A hydrabadger consensus node.
//!
use futures::{
future::{self, Either},
sync::mpsc,
};
use hbbft::{
crypto::{PublicKey, SecretKey},
};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer::{PeerHandler, Peers};
use rand::{self, Rand};
use std::{
collections::HashSet,
net::SocketAddr,
@ -20,15 +10,25 @@ use std::{
},
time::{Duration, Instant},
};
use rand::{self, Rand};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use futures::{
future::{self, Either},
sync::mpsc,
};
use tokio::{
self,
net::{TcpListener, TcpStream},
prelude::*,
timer::{Interval, Delay},
};
use hbbft::{
crypto::{PublicKey, SecretKey},
};
use peer::{PeerHandler, Peers};
use {
Change, Contribution, InAddr, InternalMessage, InternalTx, OutAddr, Uid, WireMessage,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx,
WireMessageKind, WireMessages, BatchRx, EpochTx, EpochRx, key_gen,
};
use super::{Error, Handler, StateMachine, StateDsct};
@ -297,6 +297,15 @@ impl<T: Contribution> Hydrabadger<T> {
}
}
/// Begins a synchronous distributed key generation instance and returns a
/// stream which may be polled for events and messages.
pub fn new_key_gen_instance(&self) -> mpsc::UnboundedReceiver<key_gen::Message> {
let (tx, rx) = mpsc::unbounded();
self.send_internal(InternalMessage::new_key_gen_instance(
self.inner.uid, OutAddr(*self.inner.addr), tx));
rx
}
/// Returns a future that handles incoming connections on `socket`.
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
@ -559,7 +568,7 @@ pub struct HydrabadgerWeak<T: Contribution> {
impl<T: Contribution> HydrabadgerWeak<T> {
pub fn upgrade(self) -> Option<Hydrabadger<T>> {
self.inner.upgrade() .and_then(|inner| {
self.inner.upgrade().and_then(|inner| {
self.handler.upgrade().and_then(|handler| {
self.batch_rx.upgrade().and_then(|batch_rx|{
Some(Hydrabadger { inner, handler, batch_rx })

View File

@ -1,4 +1,6 @@
//! Synchronous distributed key generation.
use futures::sync::mpsc;
use hydrabadger::hydrabadger::Hydrabadger;
use crossbeam::queue::SegQueue;
use hbbft::{
@ -8,10 +10,66 @@ use hbbft::{
use peer::Peers;
use std::{collections::BTreeMap};
use rand;
use super::{Config, Error};
use super::Error;
use {Contribution, NetworkState, Uid, WireMessage};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum InstanceId {
BuiltIn,
User(Uid),
}
/// Messages used during synchronous key generation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MessageKind {
Part(Part),
Ack(Ack),
InstanceId,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message {
instance_id: InstanceId,
kind: MessageKind,
}
impl Message {
pub fn part(instance_id: InstanceId, part: Part) -> Message {
Message {
instance_id,
kind: MessageKind::Part(part),
}
}
pub fn ack(instance_id: InstanceId, ack: Ack) -> Message {
Message {
instance_id,
kind: MessageKind::Ack(ack),
}
}
pub fn instance_id(instance_id: InstanceId) -> Message {
Message {
instance_id,
kind: MessageKind::InstanceId,
}
}
pub fn kind(&self) -> &MessageKind {
&self.kind
}
// pub fn into_kind(self) -> MessageKind {
// self.kind
// }
pub fn into_parts(self) -> (InstanceId, MessageKind) {
(self.instance_id, self.kind)
}
}
/// Key generation state.
#[derive(Debug)]
pub(super) enum State {
@ -40,7 +98,7 @@ fn handle_ack(
ack_count: &mut usize,
sync_key_gen: &mut SyncKeyGen<Uid>,
) {
info!("KEY GENERATION: Handling ack from '{}'...", uid);
trace!("KEY GENERATION: Handling ack from '{}'...", uid);
let ack_outcome = sync_key_gen.handle_ack(uid, ack.clone()).expect("Failed to handle Ack.");
match ack_outcome {
AckOutcome::Invalid(fault) => error!("Error handling ack: '{:?}':\n{:?}", ack, fault),
@ -58,7 +116,7 @@ fn handle_queued_acks<T: Contribution>(
hdb: &Hydrabadger<T>,
) {
if part_count == hdb.config().keygen_peer_count + 1 {
info!("KEY GENERATION: Handling queued acks...");
trace!("KEY GENERATION: Handling queued acks...");
debug!(" Peers complete: {}", sync_key_gen.count_complete());
debug!(" Part count: {}", part_count);
@ -72,40 +130,71 @@ fn handle_queued_acks<T: Contribution>(
/// Manages the key generation state.
#[derive(Debug)]
pub struct KeyGenMachine {
pub struct Machine {
state: State,
ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>,
}
impl KeyGenMachine {
/// Creates and returns a new `KeyGenMachine` in the `AwaitingPeers`
impl Machine {
/// Creates and returns a new `Machine` in the `AwaitingPeers`
/// state.
pub fn awaiting_peers(ack_queue: SegQueue<(Uid, Ack)>)
-> KeyGenMachine
{
KeyGenMachine {
pub fn awaiting_peers(
ack_queue: SegQueue<(Uid, Ack)>,
event_tx: Option<mpsc::UnboundedSender<Message>>
) -> Machine {
Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue: ack_queue,
ack_queue,
event_tx,
}
}
/// Creates and returns a new `Machine` in the `Generating`
/// state.
pub fn generate<T: Contribution>(
local_uid: &Uid,
local_sk: SecretKey,
peers: &Peers<T>,
event_tx: mpsc::UnboundedSender<Message>,
// peer_count: usize,
) -> Result<Machine, Error> {
let mut m = Machine {
state: State::AwaitingPeers {
required_peers: Vec::new(),
available_peers: Vec::new(),
},
ack_queue: SegQueue::new(),
event_tx: Some(event_tx),
};
let (part, ack) = m.set_generating_keys(local_uid, local_sk, peers)?;
peers.wire_to_validators(WireMessage::key_gen_part(InstanceId::BuiltIn, part));
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack));
Ok(m)
}
/// Sets the state to `AwaitingMorePeersForKeyGeneration`.
pub(super) fn set_generating_keys<T: Contribution>(
&mut self,
local_uid: &Uid,
local_sk: SecretKey,
peers: &Peers<T>,
config: &Config,
// config: &Config,
// threshold: usize,
) -> Result<(Part, Ack), Error> {
let (part, ack);
self.state = match self.state {
State::AwaitingPeers {
..
} => {
let threshold = config.keygen_peer_count / 3;
// let threshold = config.keygen_peer_count / 3;
let threshold = peers.count_validators() / 3;
let mut public_keys: BTreeMap<Uid, PublicKey> = peers
.validators()
@ -122,19 +211,16 @@ impl KeyGenMachine {
.map_err(Error::SyncKeyGenNew)?;
part = opt_part.expect("This node is not a validator (somehow)!");
info!("KEY GENERATION: Handling our own `Part`...");
trace!("KEY GENERATION: Handling our own `Part`...");
ack = match sync_key_gen.handle_part(&mut rng, &local_uid, part.clone())
.expect("Handling our own Part has failed") {
PartOutcome::Valid(Some(ack)) => ack,
PartOutcome::Invalid(faults) => panic!(
"Invalid part \
(FIXME: handle): {:?}",
faults
),
"Invalid part (FIXME: handle): {:?}", faults),
PartOutcome::Valid(None) => panic!("No Ack produced when handling Part."),
};
info!("KEY GENERATION: Queueing our own `Ack`...");
trace!("KEY GENERATION: Queueing our own `Ack`...");
self.ack_queue.push((*local_uid, ack.clone()));
State::Generating {
@ -154,6 +240,8 @@ impl KeyGenMachine {
}
/// Notify this key generation instance that peers have been added.
//
// TODO: Move some of this logic back to handler.
pub(super) fn add_peers<T: Contribution>(
&mut self,
peers: &Peers<T>,
@ -163,30 +251,30 @@ impl KeyGenMachine {
match self.state {
State::AwaitingPeers { .. } => {
if peers.count_validators() >= hdb.config().keygen_peer_count {
info!("== BEGINNING KEY GENERATION ==");
info!("BEGINNING KEY GENERATION");
let local_uid = *hdb.uid();
let local_in_addr = *hdb.addr();
let local_sk = hdb.secret_key().public_key();
let local_pk = hdb.secret_key().public_key();
let (part, ack) = self.set_generating_keys(
&local_uid,
hdb.secret_key().clone(),
peers,
hdb.config(),
// hdb.config().keygen_peer_count / 3,
)?;
info!("KEY GENERATION: Sending initial parts and our own ack.");
trace!("KEY GENERATION: Sending initial parts and our own ack.");
peers.wire_to_validators(
WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_sk,
local_pk,
net_state,
),
);
peers.wire_to_validators(WireMessage::key_gen_part(part));
peers.wire_to_validators(WireMessage::key_gen_ack(ack));
peers.wire_to_validators(WireMessage::key_gen_part(InstanceId::BuiltIn, part));
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack));
}
}
State::Generating { .. } => {
@ -212,7 +300,7 @@ impl KeyGenMachine {
..
} => {
// TODO: Move this match block into a function somewhere for re-use:
info!("KEY GENERATION: Handling part from '{}'...", src_uid);
trace!("KEY GENERATION: Handling part from '{}'...", src_uid);
let mut rng = rand::OsRng::new().expect("Creating OS Rng has failed");
let mut skg = sync_key_gen.as_mut().unwrap();
let ack = match skg.handle_part(&mut rng, src_uid, part) {
@ -234,15 +322,15 @@ impl KeyGenMachine {
*part_count += 1;
info!("KEY GENERATION: Queueing `Ack`.");
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
let peers = hdb.peers();
info!(
trace!(
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid
);
peers.wire_to_validators(WireMessage::key_gen_ack(ack));
peers.wire_to_validators(WireMessage::key_gen_ack(InstanceId::BuiltIn, ack));
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
@ -278,7 +366,7 @@ impl KeyGenMachine {
let node_n = {
let mut skg = sync_key_gen.as_mut().unwrap();
info!("KEY GENERATION: Queueing `Ack`.");
trace!("KEY GENERATION: Queueing `Ack`.");
self.ack_queue.push((*src_uid, ack.clone()));
handle_queued_acks(&self.ack_queue, *part_count, ack_count, skg, hdb);

View File

@ -1,7 +1,7 @@
mod handler;
mod hydrabadger;
mod state;
mod key_gen;
pub mod key_gen;
use std;
use bincode;

View File

@ -6,7 +6,6 @@
#![allow(dead_code)]
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use super::{Config, Error, InputOrMessage};
use crossbeam::queue::SegQueue;
use hbbft::{
crypto::{PublicKey, SecretKey},
@ -16,7 +15,7 @@ use hbbft::{
};
use peer::Peers;
use std::{collections::BTreeMap, fmt};
use super::key_gen::KeyGenMachine;
use super::{Config, Error, InputOrMessage, key_gen};
use {Contribution, NetworkNodeInfo, NetworkState, Step, Uid, ActiveNetworkInfo};
/// A `State` discriminant.
@ -71,7 +70,7 @@ pub enum State<T: Contribution> {
network_state: Option<NetworkState>,
},
KeyGen {
key_gen: KeyGenMachine,
key_gen: key_gen::Machine,
iom_queue: Option<SegQueue<InputOrMessage<T>>>,
},
Observer {
@ -124,7 +123,7 @@ impl<T: Contribution> StateMachine<T> {
State::Disconnected {} => {
info!("Setting state: `KeyGen`.");
State::KeyGen {
key_gen: KeyGenMachine::awaiting_peers(SegQueue::new()),
key_gen: key_gen::Machine::awaiting_peers(SegQueue::new(), None),
iom_queue: Some(SegQueue::new())
}
}
@ -139,7 +138,7 @@ impl<T: Contribution> StateMachine<T> {
);
info!("Setting state: `KeyGen`.");
State::KeyGen {
key_gen: KeyGenMachine::awaiting_peers(ack_queue.take().unwrap()),
key_gen: key_gen::Machine::awaiting_peers(ack_queue.take().unwrap(), None),
iom_queue: iom_queue.take() ,
}
}
@ -428,7 +427,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the key generation instance.
pub(super) fn key_gen(&self) -> Option<&KeyGenMachine> {
pub(super) fn key_gen(&self) -> Option<&key_gen::Machine> {
match self.state {
State::KeyGen { ref key_gen, .. } => Some(key_gen),
_ => None,
@ -436,7 +435,7 @@ impl<T: Contribution> StateMachine<T> {
}
/// Returns a reference to the key generation instance.
pub(super) fn key_gen_mut(&mut self) -> Option<&mut KeyGenMachine> {
pub(super) fn key_gen_mut(&mut self) -> Option<&mut key_gen::Machine> {
match self.state {
State::KeyGen { ref mut key_gen, .. } => Some(key_gen),
_ => None,

View File

@ -77,6 +77,7 @@ pub use hydrabadger::{Config, Hydrabadger, HydrabadgerWeak};
pub use hydrabadger::Error;
pub use hbbft::dynamic_honey_badger::Batch;
pub use hydrabadger::StateDsct;
pub use hydrabadger::key_gen;
/// Transmit half of the wire message channel.
// TODO: Use a bounded tx/rx (find a sensible upper bound):
@ -114,8 +115,8 @@ pub type EpochRx = mpsc::UnboundedReceiver<u64>;
pub trait Contribution:
HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{
}
{}
impl<C> Contribution for C where
C: HbbftContribution + Clone + Debug + Serialize + DeserializeOwned + 'static
{}
@ -207,13 +208,6 @@ pub enum NetworkState {
Active(ActiveNetworkInfo),
}
/// Messages used during synchronous key generation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum KeyGenMessage {
Part(Part),
Ack(Ack),
}
/// Messages sent over the network between nodes.
///
/// [`Message`](enum.WireMessageKind.html#variant.Message) variants are among
@ -237,7 +231,7 @@ pub enum WireMessageKind<T> {
// TODO(c0gent): Remove.
Transaction(Uid, T),
/// Messages used during synchronous key generation.
KeyGen(KeyGenMessage),
KeyGen(key_gen::Message),
JoinPlan(JoinPlan<Uid>),
}
@ -285,18 +279,18 @@ impl<T: Contribution> WireMessage<T> {
WireMessageKind::Message(src_uid, msg).into()
}
pub fn key_gen(msg: KeyGenMessage) -> WireMessage<T> {
pub fn key_gen(msg: key_gen::Message) -> WireMessage<T> {
WireMessageKind::KeyGen(msg).into()
}
pub fn key_gen_part(part: Part) -> WireMessage<T> {
pub fn key_gen_part(instance_id: key_gen::InstanceId, part: Part) -> WireMessage<T> {
// WireMessageKind::KeyGenPart(part).into()
WireMessage::key_gen(KeyGenMessage::Part(part))
WireMessage::key_gen(key_gen::Message::part(instance_id, part))
}
pub fn key_gen_ack(ack: Ack) -> WireMessage<T> {
pub fn key_gen_ack(instance_id: key_gen::InstanceId, ack: Ack) -> WireMessage<T> {
// WireMessageKind::KeyGenAck(outcome).into()
WireMessage::key_gen(KeyGenMessage::Ack(ack))
WireMessage::key_gen(key_gen::Message::ack(instance_id, ack))
}
pub fn join_plan(jp: JoinPlan<Uid>) -> WireMessage<T> {
@ -435,6 +429,7 @@ pub enum InternalMessageKind<T: Contribution> {
PeerDisconnect,
NewIncomingConnection(InAddr, PublicKey, bool),
NewOutgoingConnection,
NewKeyGenInstance(mpsc::UnboundedSender<key_gen::Message>),
}
/// A message between internal threads/tasks.
@ -501,6 +496,13 @@ impl<T: Contribution> InternalMessage<T> {
)
}
pub fn new_key_gen_instance(src_uid: Uid, src_addr: OutAddr,
tx: mpsc::UnboundedSender<key_gen::Message>) -> InternalMessage<T>
{
InternalMessage::new(Some(src_uid), src_addr,
InternalMessageKind::NewKeyGenInstance(tx))
}
pub fn new_outgoing_connection(src_addr: OutAddr) -> InternalMessage<T> {
InternalMessage::new_without_uid(src_addr, InternalMessageKind::NewOutgoingConnection)
}