diff --git a/src/commst.rs b/src/commst.rs index 2449300..3195b1c 100644 --- a/src/commst.rs +++ b/src/commst.rs @@ -1,6 +1,7 @@ //! Comms task structure. A comms task communicates with a remote node through a //! socket. Local communication with coordinating threads is made via //! `crossbeam_channel::unbounded()`. +use std::io; use std::fmt::Debug; use std::sync::Arc; use std::net::TcpStream; @@ -12,6 +13,15 @@ use proto_io; use proto_io::CodecIo; use messaging::SourcedMessage; +#[derive(Debug)] +pub enum Error { + IoError(io::Error), +} + +impl From for Error { + fn from(err: io::Error) -> Error { Error::IoError(err) } +} + /// A communication task connects a remote node to the thread that manages the /// consensus algorithm. pub struct CommsTask<'a, T: 'a + Clone + Debug + Send + Sync + @@ -51,11 +61,11 @@ where Vec: From /// The main socket IO loop and an asynchronous thread responding to manager /// thread requests. - pub fn run(&mut self) { + pub fn run(&mut self) -> Result<(), Error> { // Borrow parts of `self` before entering the thread binding scope. let tx = Arc::new(self.tx); let rx = Arc::new(self.rx); - let mut io1 = self.io.try_clone().unwrap(); // FIXME: handle errors + let mut io1 = self.io.try_clone()?; let node_index = self.node_index; crossbeam::scope(|scope| { @@ -80,8 +90,7 @@ where Vec: From SourcedMessage { source: node_index, message - }) - .unwrap() + }).unwrap(); }, Err(proto_io::Error::ProtobufError(e)) => warn!("Node {} - Protobuf error {}", node_index, e), @@ -92,5 +101,6 @@ where Vec: From } } }); + Ok(()) } } diff --git a/src/node.rs b/src/node.rs index 175028b..6cf6c75 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,18 +1,32 @@ //! Networking controls of the consensus node. +use std::io; use std::collections::HashSet; use std::fmt::Debug; use std::hash::Hash; use std::marker::{Send, Sync}; use std::net::SocketAddr; use crossbeam; -use crossbeam_channel::{unbounded, Sender, Receiver}; use connection; use broadcast; -use proto::Message; use commst; use messaging::Messaging; +#[derive(Debug)] +pub enum Error { + IoError(io::Error), + CommsError(commst::Error), + NotImplemented +} + +impl From for Error { + fn from(err: io::Error) -> Error { Error::IoError(err) } +} + +impl From for Error { + fn from(err: commst::Error) -> Error { Error::CommsError(err) } +} + /// This is a structure to start a consensus node. pub struct Node { /// Incoming connection socket. @@ -36,7 +50,7 @@ where Vec: From } /// Consensus node procedure implementing HoneyBadgerBFT. - pub fn run(&self) -> Result + pub fn run(&self) -> Result { let value = &self.value; let connections = connection::make(&self.addr, &self.remotes); @@ -71,7 +85,7 @@ where Vec: From debug!("Broadcast instance 0 succeeded: {}", String::from_utf8(Vec::from(t)).unwrap()); }, - Err(_) => error!("Sender broadcast instance failed") + Err(e) => error!("Broadcast instance 0: {:?}", e) } }); @@ -85,12 +99,16 @@ where Vec: From let node_index = i + 1; scope.spawn(move || { - commst::CommsTask::new(from_comms_tx, - to_comms_rx, - // FIXME: handle error - c.stream.try_clone().unwrap(), - node_index) - .run(); + match commst::CommsTask::new(from_comms_tx, + to_comms_rx, + // FIXME: handle error + c.stream.try_clone().unwrap(), + node_index) + .run() + { + Ok(_) => debug!("Comms task {} succeeded", node_index), + Err(e) => error!("Comms task {}: {:?}", node_index, e) + } }); @@ -107,20 +125,17 @@ where Vec: From Ok(t) => { debug!("Broadcast instance {} succeeded: {}", node_index, - String::from_utf8( - Vec::from(t) - ).unwrap()); + String::from_utf8(Vec::from(t)).unwrap()); }, - Err(_) => error!("Broadcast instance {} failed", i) + Err(e) => error!("Broadcast instance {}: {:?}", + node_index, e) } }); } // TODO: continue the implementation of the asynchronous common // subset algorithm. - - }); // end of thread scope - - Err(()) + Err(Error::NotImplemented) + }) // end of thread scope } }