added a broadcast stage struct and drafted messaging between broadcast tasks

This commit is contained in:
Vladimir Komendantskiy 2018-03-16 18:12:14 +00:00
parent cb4c55a5c5
commit 86e133d2b4
6 changed files with 143 additions and 30 deletions

View File

@ -2,51 +2,84 @@
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use std::sync::mpsc::{channel, Receiver};
//use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use proto::Message;
use merkle::*;
use proto::*;
use std::marker::{Send, Sync};
mod stage;
use self::stage::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
echos: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
readys: HashSet<Vec<u8>>
/// The receive end of the comms channel. The transmit end is stored in
/// `stage`.
receiver: Receiver<Message<T>>,
/// Shared state of the broadcast stage.
stage: Stage<T>
}
impl<T: Debug> BroadcastTask<T> {
pub fn new(stream: TcpStream) -> Self {
impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
pub fn new(stream: TcpStream,
receiver: Receiver<Message<T>>,
stage: Stage<T>) -> Self {
BroadcastTask {
task: Task::new(stream),
values: Default::default(),
echos: Default::default(),
readys: Default::default()
receiver: receiver,
stage: stage
}
}
fn on_message_received(&mut self, message: Message<T>)
-> Result<(), Error>
{
info!("Message received: {:?}", message);
Ok(())
// else {
// warn!("Unexpected message type");
// return Err(Error::ProtocolError);
// }
// info!("Message received: {:?}", message);
if let Message::Broadcast(b) = message {
match b {
BroadcastMessage::Value(proof) => {
self.stage.values.insert(proof.root_hash.clone(), proof.clone());
Ok(())
},
BroadcastMessage::Echo(proof) => {
self.echos.insert(proof.root_hash.clone(), proof.clone());
Ok(())
},
BroadcastMessage::Ready(root_hash) => {
self.readys.insert(root_hash);
Ok(())
}
}
}
else {
warn!("Unexpected message type");
return Err(Error::ProtocolError);
}
}
/// Receiver of messages from other broadcast tasks.
///
/// TODO: This is only a placeholder.
pub fn receiver_thread(&self) {
::std::thread::spawn(move || {
loop {
let message = self.receiver.recv().unwrap();
info!("Task {:?} received message {:?}",
self.task.stream.peer_addr().unwrap(),
message);
}
});
}
}
impl<T: Debug + From<Vec<u8>>> MessageLoop for BroadcastTask<T> {
impl<T: Clone + Debug + From<Vec<u8>> + Send + Sync + 'static>
MessageLoop for BroadcastTask<T> {
fn run(&mut self) {
self.receiver_thread();
loop {
match self.task.receive_message() {
Ok(message) => self.on_message_received(message).unwrap(),
@ -57,5 +90,6 @@ impl<T: Debug + From<Vec<u8>>> MessageLoop for BroadcastTask<T> {
}
}
}
//interthread_receiver.join().unwrap();
}
}

31
src/broadcast/stage.rs Normal file
View File

@ -0,0 +1,31 @@
//! Broadcast stage of the consensus algorithm.
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;
use merkle::*;
use proto::*;
pub struct Stage<T> {
/// Tx channels to communicate with all tasks.
pub senders: Vec<Sender<Message<T>>>,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
pub values: Mutex<HashMap<Vec<u8>, Proof<T>>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
pub echos: Mutex<HashMap<Vec<u8>, Proof<T>>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
pub readys: Mutex<HashSet<Vec<u8>>>
}
impl<T> Stage<T> {
pub fn new(senders: Vec<Sender<Message<T>>>) -> Self {
Stage {
senders: senders,
values: Default::default(),
echos: Default::default(),
readys: Default::default()
}
}
}

View File

@ -13,5 +13,6 @@ mod errors;
mod proto;
mod task;
pub mod node;
pub mod broadcast;
pub mod agreement;

39
src/node.rs Normal file
View File

@ -0,0 +1,39 @@
//! Networking controls of the consensus node.
use std::sync::mpsc;
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use broadcast::*;
/// This is a structure to start a consensus node.
pub struct Node {
/// Incoming connection socket.
addr: SocketAddr,
/// Connection sockets of remote nodes. TODO.
remotes: Vec<SocketAddr>
}
impl Node {
pub fn new(addr: SocketAddr, remotes: Vec<SocketAddr>) -> Self {
Node {addr, remotes}
}
pub fn run(&self) {
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(&self.addr).unwrap();
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
// TODO: spawn a thread for the connected socket
}
Err(e) => {
warn!("Failed to connect: {}", e);
}
}
}
}
}

View File

@ -1,6 +1,8 @@
//! Construction of messages from protobuf buffers.
#![feature(optin_builtin_traits)]
pub mod message;
use std::marker::{Send, Sync};
use ring::digest::Algorithm;
use merkle::proof::{Proof, Lemma, Positioned};
use protobuf::Message as ProtobufMessage;
@ -10,15 +12,18 @@ use protobuf::core::parse_from_bytes;
/// Kinds of message sent by nodes participating in consensus.
#[derive (Clone, Debug, PartialEq)]
pub enum Message<T> {
pub enum Message<T: Send + Sync> {
Broadcast(BroadcastMessage<T>),
Agreement(AgreementMessage)
}
//unsafe impl<T: Send + Sync> Send for Message<T> { }
//impl<T: Send + Sync> !Sync for Message<T> { }
/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[derive (Clone, Debug, PartialEq)]
pub enum BroadcastMessage<T> {
pub enum BroadcastMessage<T: Send + Sync> {
Value(Proof<T>),
Echo(Proof<T>),
Ready(Vec<u8>)
@ -30,10 +35,13 @@ pub enum AgreementMessage {
// TODO
}
impl<T> Message<T> {
impl<T: Send + Sync> Message<T> {
/// Translation from protobuf to the regular type.
///
/// TODO: add an `Algorithm` field to `MessageProto`.
/// TODO: add an `Algorithm` field to `MessageProto`. Either `Algorithm` has
/// to be fully serialised and sent as a whole, or it can be passed over
/// using an ID and the `Eq` instance to discriminate the finite set of
/// algorithms in `ring::digest`.
pub fn from_proto(mut proto: message::MessageProto)
-> Option<Self>
where T: From<Vec<u8>>
@ -94,7 +102,7 @@ impl<T> Message<T> {
}
}
impl<T> BroadcastMessage<T> {
impl<T: Send + Sync> BroadcastMessage<T> {
pub fn into_proto(self) -> BroadcastProto
where T: Into<Vec<u8>>
{
@ -165,10 +173,10 @@ impl ProofProto {
match proof {
Proof {
algorithm, // TODO: use
root_hash,
lemma,
value,
..
} => {
proto.set_root_hash(root_hash);
proto.set_lemma(LemmaProto::into_proto(lemma));

View File

@ -65,7 +65,7 @@ pub trait MessageLoop {
}
pub struct Task {
stream: TcpStream,
pub stream: TcpStream,
buffer: [u8; 1024],
}