broadcast VALUE remote message handler written

This commit is contained in:
Vladimir Komendantskiy 2018-04-25 14:07:16 +01:00
parent 5428d928cc
commit c7020e7b6a
3 changed files with 108 additions and 28 deletions

View File

@ -1,5 +1,5 @@
//! Reliable broadcast algorithm instance.
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::{Arc, Mutex};
@ -13,23 +13,96 @@ use reed_solomon_erasure::ReedSolomon;
use crossbeam_channel::{Sender, Receiver, SendError, RecvError};
use messaging::{Target, TargetedMessage, SourcedMessage,
ProposedValue, QMessage, MessageLoopState,
Handler};
NodeUid, ProposedValue, QMessage, MessageLoopState,
Handler, LocalMessage, RemoteMessage, AlgoMessage,
RemoteNode};
use messaging;
pub struct Broadcast {
uid: NodeUid,
num_nodes: usize,
root_hash: Option<Vec<u8>>,
leaf_values: Vec<Option<Box<[u8]>>>,
leaf_values_num: usize,
}
impl Broadcast {
pub fn new() -> Self {
Broadcast {}
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
Broadcast {
uid,
num_nodes,
root_hash: None,
leaf_values: vec![None; num_nodes],
leaf_values_num: 0
}
}
pub fn handle<E>(&self, m: QMessage, tx: Sender<QMessage>) ->
pub fn handle<E>(&mut self, m: QMessage, tx: Sender<QMessage>) ->
Result<MessageLoopState, E>
where E: From<Error> + From<messaging::Error>
{
Err(Error::NotImplemented).map_err(E::from)
match m {
QMessage::Local(LocalMessage {
dst: _,
message
}) => {
match message {
AlgoMessage::Broadcast(value) => {
Err(Error::NotImplemented).map_err(E::from)
}
_ => Err(Error::UnexpectedMessage).map_err(E::from)
}
},
QMessage::Remote(RemoteMessage {
node: RemoteNode::Node(uid),
message
}) => {
if let Message::Broadcast(b) = message { match b {
BroadcastMessage::Value(p) => {
if uid != self.uid {
// Ignore value messages from unrelated remote nodes.
Ok(MessageLoopState::Processing(VecDeque::new()))
}
else {
if let None = self.root_hash {
self.root_hash = Some(p.root_hash.clone());
debug!("Node {} Value root hash {:?}",
self.uid, HexBytes(&p.root_hash));
}
if let &Some(ref h) = &self.root_hash {
if p.validate(h.as_slice()) {
// Save the leaf value for reconstructing
// the tree later.
self.leaf_values[index_of_proof(&p)] =
Some(p.value.clone().into_boxed_slice());
self.leaf_values_num += 1;
}
}
// Enqueue a broadcast of an echo of this proof.
Ok(MessageLoopState::Processing(VecDeque::from(
vec![RemoteMessage {
node: RemoteNode::All,
message: Message::Broadcast(
BroadcastMessage::Echo(p))
}]
)))
}
},
_ => Err(Error::NotImplemented).map_err(E::from)
}
}
else {
Err(Error::UnexpectedMessage).map_err(E::from)
}
},
_ => Err(Error::UnexpectedMessage).map_err(E::from)
}
}
}
@ -122,6 +195,7 @@ pub enum Error {
ReedSolomon(rse::Error),
Send(SendError<TargetedMessage<ProposedValue>>),
Recv(RecvError),
UnexpectedMessage,
NotImplemented
}

View File

@ -55,9 +55,9 @@ pub enum AlgoMessage {
/// A message sent between algorithm instances.
pub struct LocalMessage {
/// Identifier of the message destination algorithm.
dst: Algorithm,
pub dst: Algorithm,
/// Payload
message: AlgoMessage
pub message: AlgoMessage
}
/// The message destinations corresponding to a remote node `i`. It can be

View File

@ -88,6 +88,7 @@ impl<'a> TestNode<'a>
let tx = tx.clone();
let self_uid = self.uid;
scope.spawn(move || {
debug!("Node {} receiver {} starting", self_uid, uid);
while let Ok(message) = rx.recv() {
// FIXME: error handling
tx.send(QMessage::Remote(RemoteMessage {
@ -98,6 +99,7 @@ impl<'a> TestNode<'a>
debug!("Node {} receiver {} terminated", self_uid, uid);
});
}
// Start the local message loop.
let _ = self.message_loop.run();
});
@ -125,16 +127,16 @@ fn proposed_value(n: usize) -> ProposedValue {
vec![b; 10]
}
fn node_addr(node_index: usize) -> SocketAddr {
fn node_addr(node_index: usize) -> NodeUid {
format!("127.0.0.1:{}", node_index).parse().unwrap()
}
/// Creates a vector of test nodes but does not run them.
/// Creates test nodes but does not run them.
fn create_test_nodes(num_nodes: usize,
net: &NetSim<Message<Vec<u8>>>) ->
Vec<TestNode>
HashMap<NodeUid, (TestNode, HashMap<NodeUid, Broadcast>)>
{
let mut nodes = Vec::new();
let mut nodes = HashMap::new();
for n in 0..num_nodes {
let value = proposed_value(n);
let mut txs = HashMap::new();
@ -150,7 +152,17 @@ fn create_test_nodes(num_nodes: usize,
rxs.insert(addr, net.rx(m, n));
}
let uid = node_addr(n);
nodes.push(TestNode::new(uid, num_nodes, txs, rxs, Some(value)));
// Create a broadcast algorithm instance for each node.
let mut broadcast_instances = HashMap::new();
for k in 0..num_nodes {
let them_uid = node_addr(k);
broadcast_instances.insert(them_uid, Broadcast::new(them_uid,
num_nodes));
}
nodes.insert(uid, (TestNode::new(uid, num_nodes, txs, rxs, Some(value)),
broadcast_instances));
}
nodes
}
@ -160,26 +172,20 @@ fn test_4_broadcast_nodes() {
simple_logger::init_with_level(log::Level::Debug).unwrap();
const NUM_NODES: usize = 4;
let mut node_uids = Vec::new();
for i in 0..NUM_NODES {
node_uids.push(node_addr(i));
}
let node_uids_r = &node_uids;
// Create algorithm instances. FIXME.
let bi0 = Arc::new(Broadcast::new());
let net: NetSim<Message<Vec<u8>>> = NetSim::new(NUM_NODES);
let nodes = create_test_nodes(NUM_NODES, &net);
let mut join_handles: HashMap<NodeUid, _> = HashMap::new();
crossbeam::scope(|scope| {
let bi0 = &bi0;
for node in nodes.iter() {
join_handles.insert(node.uid, scope.spawn(move || {
node.add_handler(Algorithm::Broadcast(node_uids_r[0]),
bi0.deref());
// Run the test nodes, each in its own thread.
for (uid, (node, broadcast_instances)) in nodes.iter() {
join_handles.insert(*uid, scope.spawn(move || {
// Register broadcast instance handlers with the message loop.
for (instance_uid, instance) in broadcast_instances {
node.add_handler(Algorithm::Broadcast(*instance_uid),
instance);
}
debug!("Running {:?}", node.uid);
node.run()
}));