added shared state of the broadcast stage to the main consensus node loop

This commit is contained in:
Vladimir Komendantskiy 2018-03-16 21:04:06 +00:00
parent 86e133d2b4
commit 4353b1bb3e
4 changed files with 35 additions and 16 deletions

View File

@ -2,32 +2,33 @@
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::net::{TcpStream, TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver};
//use errors::ResultExt;
use task::{Error, MessageLoop, Task};
use proto::*;
use std::marker::{Send, Sync};
mod stage;
pub mod stage;
use self::stage::*;
/// A broadcast task is an instance of `Task`, a message-handling task with a
/// main loop.
pub struct BroadcastTask<T> {
pub struct BroadcastTask<T: Send + Sync> {
/// The underlying task that handles sending and receiving messages.
task: Task,
/// The receive end of the comms channel. The transmit end is stored in
/// `stage`.
receiver: Receiver<Message<T>>,
/// Shared state of the broadcast stage.
stage: Stage<T>
stage: Arc<Mutex<Stage<T>>>
}
impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
pub fn new(stream: TcpStream,
receiver: Receiver<Message<T>>,
stage: Stage<T>) -> Self {
stage: Arc<Mutex<Stage<T>>>) -> Self {
BroadcastTask {
task: Task::new(stream),
receiver: receiver,
@ -40,6 +41,8 @@ impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
{
// info!("Message received: {:?}", message);
if let Message::Broadcast(b) = message {
Ok(())
/*
match b {
BroadcastMessage::Value(proof) => {
self.stage.values.insert(proof.root_hash.clone(), proof.clone());
@ -54,6 +57,7 @@ impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
Ok(())
}
}
*/
}
else {
warn!("Unexpected message type");
@ -67,10 +71,10 @@ impl<T: Clone + Debug + Send + Sync + 'static> BroadcastTask<T> {
pub fn receiver_thread(&self) {
::std::thread::spawn(move || {
loop {
let message = self.receiver.recv().unwrap();
info!("Task {:?} received message {:?}",
self.task.stream.peer_addr().unwrap(),
message);
// let message = self.receiver.recv().unwrap();
// info!("Task {:?} received message {:?}",
// self.task.stream.peer_addr().unwrap(),
// message);
}
});
}

View File

@ -1,25 +1,26 @@
//! Broadcast stage of the consensus algorithm.
use std::collections::{HashMap, HashSet};
use std::marker::{Send, Sync};
use std::sync::mpsc::{channel, Sender};
use std::sync::Mutex;
use merkle::*;
use proto::*;
pub struct Stage<T> {
pub struct Stage<T: Send + Sync> {
/// Tx channels to communicate with all tasks.
pub senders: Vec<Sender<Message<T>>>,
/// Messages of type Value received so far, keyed with the root hash for
/// easy access.
pub values: Mutex<HashMap<Vec<u8>, Proof<T>>>,
pub values: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Echo received so far, keyed with the root hash for
/// easy access.
pub echos: Mutex<HashMap<Vec<u8>, Proof<T>>>,
pub echos: HashMap<Vec<u8>, Proof<T>>,
/// Messages of type Ready received so far. That is, the root hashes in
/// those messages.
pub readys: Mutex<HashSet<Vec<u8>>>
pub readys: HashSet<Vec<u8>>
}
impl<T> Stage<T> {
impl<T: Send + Sync> Stage<T> {
pub fn new(senders: Vec<Sender<Message<T>>>) -> Self {
Stage {
senders: senders,

View File

@ -2,8 +2,13 @@
use std::sync::mpsc;
use std::fmt::Debug;
use std::collections::{HashMap, HashSet};
use std::marker::{Send, Sync};
use std::net::{TcpStream, TcpListener, SocketAddr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, Receiver, channel};
use broadcast::*;
use broadcast::stage::Stage as BroadcastStage;
use proto::Message;
/// This is a structure to start a consensus node.
pub struct Node {
@ -18,15 +23,24 @@ impl Node {
Node {addr, remotes}
}
pub fn run(&self) {
pub fn run<T: Clone + Debug + Send + Sync + 'static>(&self) {
// Listen for incoming connections on a given TCP port.
let listener = TcpListener::bind(&self.addr).unwrap();
let broadcast_stage: Arc<Mutex<BroadcastStage<T>>> =
Arc::new(Mutex::new(BroadcastStage::new(Vec::new())));
for stream in listener.incoming() {
match stream {
Ok(stream) => {
info!("New connection from {:?}",
stream.peer_addr().unwrap());
let (tx, rx): (Sender<Message<T>>, Receiver<Message<T>>) =
channel();
let stage = Arc::clone(&broadcast_stage);
// Insert the transmit handle connected to this task into
// the shared list of senders.
stage.lock().unwrap().senders.push(tx);
let task = BroadcastTask::new(stream, rx, stage);
// TODO: spawn a thread for the connected socket
}

View File

@ -86,7 +86,7 @@ impl Task where Self: MessageLoop {
}
pub fn receive_message<T>(&mut self) -> Result<Message<T>, Error>
where T: From<Vec<u8>>
where T: From<Vec<u8>> + Send + Sync
{
self.stream.read_exact(&mut self.buffer[0..4])?;
let frame_start = decode_u32_from_be(&self.buffer[0..4])?;
@ -112,7 +112,7 @@ impl Task where Self: MessageLoop {
pub fn send_message<T>(&mut self, message: Message<T>)
-> Result<(), Error>
where T: Into<Vec<u8>>
where T: Into<Vec<u8>> + Send + Sync
{
let mut buffer: [u8; 4] = [0; 4];
// Wrap stream