added Error return types of run() functions, and handled returned errors

This commit is contained in:
Vladimir Komendantskiy 2018-04-06 17:01:14 +01:00
parent fdb41d393f
commit bc071d7a09
2 changed files with 47 additions and 22 deletions

View File

@ -1,6 +1,7 @@
//! Comms task structure. A comms task communicates with a remote node through a //! Comms task structure. A comms task communicates with a remote node through a
//! socket. Local communication with coordinating threads is made via //! socket. Local communication with coordinating threads is made via
//! `crossbeam_channel::unbounded()`. //! `crossbeam_channel::unbounded()`.
use std::io;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use std::net::TcpStream; use std::net::TcpStream;
@ -12,6 +13,15 @@ use proto_io;
use proto_io::CodecIo; use proto_io::CodecIo;
use messaging::SourcedMessage; use messaging::SourcedMessage;
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) }
}
/// A communication task connects a remote node to the thread that manages the /// A communication task connects a remote node to the thread that manages the
/// consensus algorithm. /// consensus algorithm.
pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync +
@ -51,11 +61,11 @@ where Vec<u8>: From<T>
/// The main socket IO loop and an asynchronous thread responding to manager /// The main socket IO loop and an asynchronous thread responding to manager
/// thread requests. /// thread requests.
pub fn run(&mut self) { pub fn run(&mut self) -> Result<(), Error> {
// Borrow parts of `self` before entering the thread binding scope. // Borrow parts of `self` before entering the thread binding scope.
let tx = Arc::new(self.tx); let tx = Arc::new(self.tx);
let rx = Arc::new(self.rx); let rx = Arc::new(self.rx);
let mut io1 = self.io.try_clone().unwrap(); // FIXME: handle errors let mut io1 = self.io.try_clone()?;
let node_index = self.node_index; let node_index = self.node_index;
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
@ -80,8 +90,7 @@ where Vec<u8>: From<T>
SourcedMessage { SourcedMessage {
source: node_index, source: node_index,
message message
}) }).unwrap();
.unwrap()
}, },
Err(proto_io::Error::ProtobufError(e)) => Err(proto_io::Error::ProtobufError(e)) =>
warn!("Node {} - Protobuf error {}", node_index, e), warn!("Node {} - Protobuf error {}", node_index, e),
@ -92,5 +101,6 @@ where Vec<u8>: From<T>
} }
} }
}); });
Ok(())
} }
} }

View File

@ -1,18 +1,32 @@
//! Networking controls of the consensus node. //! Networking controls of the consensus node.
use std::io;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt::Debug; use std::fmt::Debug;
use std::hash::Hash; use std::hash::Hash;
use std::marker::{Send, Sync}; use std::marker::{Send, Sync};
use std::net::SocketAddr; use std::net::SocketAddr;
use crossbeam; use crossbeam;
use crossbeam_channel::{unbounded, Sender, Receiver};
use connection; use connection;
use broadcast; use broadcast;
use proto::Message;
use commst; use commst;
use messaging::Messaging; use messaging::Messaging;
#[derive(Debug)]
pub enum Error {
IoError(io::Error),
CommsError(commst::Error),
NotImplemented
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error { Error::IoError(err) }
}
impl From<commst::Error> for Error {
fn from(err: commst::Error) -> Error { Error::CommsError(err) }
}
/// This is a structure to start a consensus node. /// This is a structure to start a consensus node.
pub struct Node<T> { pub struct Node<T> {
/// Incoming connection socket. /// Incoming connection socket.
@ -36,7 +50,7 @@ where Vec<u8>: From<T>
} }
/// Consensus node procedure implementing HoneyBadgerBFT. /// Consensus node procedure implementing HoneyBadgerBFT.
pub fn run(&self) -> Result<T, ()> pub fn run(&self) -> Result<T, Error>
{ {
let value = &self.value; let value = &self.value;
let connections = connection::make(&self.addr, &self.remotes); let connections = connection::make(&self.addr, &self.remotes);
@ -71,7 +85,7 @@ where Vec<u8>: From<T>
debug!("Broadcast instance 0 succeeded: {}", debug!("Broadcast instance 0 succeeded: {}",
String::from_utf8(Vec::from(t)).unwrap()); String::from_utf8(Vec::from(t)).unwrap());
}, },
Err(_) => error!("Sender broadcast instance failed") Err(e) => error!("Broadcast instance 0: {:?}", e)
} }
}); });
@ -85,12 +99,16 @@ where Vec<u8>: From<T>
let node_index = i + 1; let node_index = i + 1;
scope.spawn(move || { scope.spawn(move || {
commst::CommsTask::new(from_comms_tx, match commst::CommsTask::new(from_comms_tx,
to_comms_rx, to_comms_rx,
// FIXME: handle error // FIXME: handle error
c.stream.try_clone().unwrap(), c.stream.try_clone().unwrap(),
node_index) node_index)
.run(); .run()
{
Ok(_) => debug!("Comms task {} succeeded", node_index),
Err(e) => error!("Comms task {}: {:?}", node_index, e)
}
}); });
@ -107,20 +125,17 @@ where Vec<u8>: From<T>
Ok(t) => { Ok(t) => {
debug!("Broadcast instance {} succeeded: {}", debug!("Broadcast instance {} succeeded: {}",
node_index, node_index,
String::from_utf8( String::from_utf8(Vec::from(t)).unwrap());
Vec::from(t)
).unwrap());
}, },
Err(_) => error!("Broadcast instance {} failed", i) Err(e) => error!("Broadcast instance {}: {:?}",
node_index, e)
} }
}); });
} }
// TODO: continue the implementation of the asynchronous common // TODO: continue the implementation of the asynchronous common
// subset algorithm. // subset algorithm.
Err(Error::NotImplemented)
}); // end of thread scope }) // end of thread scope
Err(())
} }
} }