Move wire messaging code to `Peers`.

This commit is contained in:
c0gent 2018-11-19 12:43:22 -08:00
parent 5fab376f9b
commit 796745d2a9
No known key found for this signature in database
GPG Key ID: 9CC25E71A743E892
3 changed files with 55 additions and 48 deletions

View File

@ -50,39 +50,6 @@ impl<T: Contribution> Handler<T> {
}
}
fn wire_to_all(&self, msg: WireMessage<T>, peers: &Peers<T>) {
for (_p_addr, peer) in peers
.iter()
.filter(|(&p_addr, _)| p_addr != OutAddr(self.hdb.addr().0))
{
peer.tx().unbounded_send(msg.clone()).unwrap();
}
}
fn wire_to_validators(&self, msg: WireMessage<T>, peers: &Peers<T>) {
// for peer in peers.validators()
// .filter(|p| p.out_addr() != &OutAddr(self.hdb.addr().0)) {
// peer.tx().unbounded_send(msg.clone()).unwrap();
// }
// FIXME(DEBUG): TEMPORARILY WIRE TO ALL FOR NOW:
self.wire_to_all(msg, peers)
}
// `tar_uid` of `None` sends to all peers.
fn wire_to(&self, tar_uid: Uid, msg: WireMessage<T>, retry_count: usize, peers: &Peers<T>) {
match peers.get_by_uid(&tar_uid) {
Some(p) => p.tx().unbounded_send(msg).unwrap(),
None => {
info!(
"Node '{}' is not yet established. Queueing message for now (retry_count: {}).",
tar_uid, retry_count
);
self.wire_queue.push((tar_uid, msg, retry_count + 1))
}
}
}
fn handle_new_established_peer(
&self,
src_uid: Uid,
@ -112,19 +79,18 @@ impl<T: Contribution> Handler<T> {
)?;
info!("KEY GENERATION: Sending initial parts and our own ack.");
self.wire_to_validators(
peers.wire_to_validators(
WireMessage::hello_from_validator(
local_uid,
local_in_addr,
local_sk,
state.network_state(&peers),
),
peers,
);
self.wire_to_validators(WireMessage::key_gen_part(part), peers);
peers.wire_to_validators(WireMessage::key_gen_part(part));
// FIXME: QUEUE ACKS UNTIL PARTS ARE ALL RECEIVED:
self.wire_to_validators(WireMessage::key_gen_part_ack(ack), peers);
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
}
}
State::GeneratingKeys { .. } => {
@ -237,7 +203,7 @@ impl<T: Contribution> Handler<T> {
"KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...",
src_uid
);
self.wire_to_validators(WireMessage::key_gen_part_ack(ack), &peers);
peers.wire_to_validators(WireMessage::key_gen_part_ack(ack));
debug!(" Peers complete: {}", skg.count_complete());
debug!(" Part count: {}", part_count);
@ -408,8 +374,8 @@ impl<T: Contribution> Handler<T> {
state: &mut StateMachine<T>,
peers: &Peers<T>,
) -> Result<(), Error> {
self.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(),
*self.hdb.addr(), self.hdb.secret_key().public_key()), peers);
peers.wire_to_validators(WireMessage::hello_request_change_add(*self.hdb.uid(),
*self.hdb.addr(), self.hdb.secret_key().public_key()));
Ok(())
}
@ -725,7 +691,7 @@ impl<T: Contribution> Future for Handler<T> {
"Sending queued message from retry queue (retry_count: {})",
retry_count
);
self.wire_to(tar_uid, msg, retry_count, &peers);
peers.wire_to(tar_uid, msg, retry_count);
} else {
info!("Discarding queued message for '{}': {:?}", tar_uid, msg);
}
@ -750,7 +716,7 @@ impl<T: Contribution> Future for Handler<T> {
if let Some(jp) = batch.join_plan() {
// FIXME: Only sent to unconnected nodes:
debug!("Outputting join plan: {:?}", jp);
self.wire_to_all(WireMessage::join_plan(jp), &peers);
peers.wire_to_all(WireMessage::join_plan(jp));
}
match batch.change() {
@ -806,17 +772,15 @@ impl<T: Contribution> Future for Handler<T> {
trace!("hydrabadger::Handler: Forwarding message: {:?}", hb_msg);
match hb_msg.target {
Target::Node(p_uid) => {
self.wire_to(
peers.wire_to(
p_uid,
WireMessage::message(*self.hdb.uid(), hb_msg.message),
0,
&peers,
);
}
Target::All => {
self.wire_to_all(
peers.wire_to_all(
WireMessage::message(*self.hdb.uid(), hb_msg.message),
&peers,
);
}
}

View File

@ -157,7 +157,7 @@ impl<T: Contribution> Hydrabadger<T> {
uid,
addr: InAddr(addr),
secret_key,
peers: RwLock::new(Peers::new()),
peers: RwLock::new(Peers::new(InAddr(addr))),
state: RwLock::new(state),
state_dsct_stale,
peer_internal_tx,

View File

@ -409,14 +409,16 @@ impl<T: Contribution> Peer<T> {
pub struct Peers<T: Contribution> {
peers: HashMap<OutAddr, Peer<T>>,
out_addrs: HashMap<Uid, OutAddr>,
local_addr: InAddr,
}
impl<T: Contribution> Peers<T> {
/// Returns a new empty list of peers.
pub(crate) fn new() -> Peers<T> {
pub(crate) fn new(local_addr: InAddr) -> Peers<T> {
Peers {
peers: HashMap::with_capacity(64),
out_addrs: HashMap::with_capacity(64),
local_addr,
}
}
@ -524,6 +526,47 @@ impl<T: Contribution> Peers<T> {
false
}
pub(crate) fn wire_to_all(&self, msg: WireMessage<T>) {
for (_p_addr, peer) in self.peers
.iter()
.filter(|(&p_addr, _)| p_addr != OutAddr(self.local_addr.0))
{
peer.tx().unbounded_send(msg.clone()).unwrap();
}
}
pub(crate) fn wire_to_validators(&self, msg: WireMessage<T>) {
// for peer in peers.validators()
// .filter(|p| p.out_addr() != &OutAddr(self.hdb.addr().0)) {
// peer.tx().unbounded_send(msg.clone()).unwrap();
// }
// FIXME: TEMPORARILY WIRE TO ALL FOR NOW.
self.wire_to_all(msg)
}
/// Sends a `WireMessage` to the target specified by `tar_uid`.
///
/// If the target is not an established node, the message will be returned
/// along with an incremented retry count.
pub(crate) fn wire_to(&self, tar_uid: Uid, msg: WireMessage<T>, retry_count: usize)
-> Option<(Uid, WireMessage<T>, usize)>
{
match self.get_by_uid(&tar_uid) {
Some(p) => {
p.tx().unbounded_send(msg).unwrap();
None
},
None => {
info!(
"Node '{}' is not yet established. Queueing message for now (retry_count: {}).",
tar_uid, retry_count
);
Some((tar_uid, msg, retry_count + 1))
}
}
}
/// Removes a peer the list if it exists.
pub(crate) fn remove<O: Borrow<OutAddr>>(&mut self, out_addr: O) {
let peer = self.peers.remove(out_addr.borrow());