Handle errors consistently

Error handling is still clumsy. We should switch to something like
`error-chain` or `Result<T, Box<Error>>`, but until then, we can
at least be consistent across modules.
This commit is contained in:
Greg Fitzgerald 2018-07-05 14:37:13 -06:00 committed by Greg Fitzgerald
parent 0f7fdd71cc
commit b95db62be3
4 changed files with 62 additions and 39 deletions

View File

@ -1,7 +1,7 @@
use crdt::ReplicatedData; use crdt::{CrdtError, ReplicatedData};
use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::distributions::{Distribution, Weighted, WeightedChoice};
use rand::thread_rng; use rand::thread_rng;
use result::{Error, Result}; use result::Result;
use signature::PublicKey; use signature::PublicKey;
use std; use std;
use std::collections::HashMap; use std::collections::HashMap;
@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
if options.is_empty() { if options.is_empty() {
return Err(Error::CrdtTooSmall); Err(CrdtError::TooSmall)?;
} }
let n = ((self.random)() as usize) % options.len(); let n = ((self.random)() as usize) % options.len();
@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
if options.len() < 1 { if options.len() < 1 {
return Err(Error::CrdtTooSmall); Err(CrdtError::TooSmall)?;
} }
let mut weighted_peers = vec![]; let mut weighted_peers = vec![];

View File

@ -42,6 +42,11 @@ const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// minimum membership table size before we start purging dead nodes /// minimum membership table size before we start purging dead nodes
const MIN_TABLE_SIZE: usize = 2; const MIN_TABLE_SIZE: usize = 2;
#[derive(Debug, PartialEq, Eq)]
pub enum CrdtError {
TooSmall,
}
pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr { pub fn parse_port_or_addr(optstr: Option<String>) -> SocketAddr {
let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address");
if let Some(addrstr) = optstr { if let Some(addrstr) = optstr {
@ -393,7 +398,7 @@ impl Crdt {
.collect(); .collect();
if nodes.len() < 1 { if nodes.len() < 1 {
warn!("crdt too small"); warn!("crdt too small");
return Err(Error::CrdtTooSmall); Err(CrdtError::TooSmall)?;
} }
trace!("nodes table {}", nodes.len()); trace!("nodes table {}", nodes.len());
@ -433,13 +438,10 @@ impl Crdt {
.collect(); .collect();
trace!("broadcast results {}", errs.len()); trace!("broadcast results {}", errs.len());
for e in errs { for e in errs {
match e { if let Err(e) = &e {
Err(e) => { error!("broadcast result {:?}", e);
error!("broadcast result {:?}", e);
return Err(Error::IO(e));
}
_ => (),
} }
e?;
*transmit_index += 1; *transmit_index += 1;
} }
Ok(()) Ok(())
@ -491,13 +493,10 @@ impl Crdt {
}) })
.collect(); .collect();
for e in errs { for e in errs {
match e { if let Err(e) = &e {
Err(e) => { error!("broadcast result {:?}", e);
info!("retransmit error {:?}", e);
return Err(Error::IO(e));
}
_ => (),
} }
e?;
} }
Ok(()) Ok(())
} }
@ -541,7 +540,7 @@ impl Crdt {
.filter(|r| r.id != self.me && r.repair_addr != daddr) .filter(|r| r.id != self.me && r.repair_addr != daddr)
.collect(); .collect();
if valid.is_empty() { if valid.is_empty() {
return Err(Error::CrdtTooSmall); Err(CrdtError::TooSmall)?;
} }
let n = (Self::random() as usize) % valid.len(); let n = (Self::random() as usize) % valid.len();
let addr = valid[n].gossip_addr.clone(); let addr = valid[n].gossip_addr.clone();
@ -566,18 +565,14 @@ impl Crdt {
let choose_peer_result = choose_peer_strategy.choose_peer(options); let choose_peer_result = choose_peer_strategy.choose_peer(options);
let v = match choose_peer_result { if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result {
Ok(peer) => peer, trace!(
Err(Error::CrdtTooSmall) => { "crdt too small for gossip {:?} {}",
trace!( &self.me[..4],
"crdt too small for gossip {:?} {}", self.table.len()
&self.me[..4], );
self.table.len()
);
return Err(Error::CrdtTooSmall);
}
Err(e) => return Err(e),
}; };
let v = choose_peer_result?;
let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
@ -1014,7 +1009,9 @@ impl TestNode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; use crdt::{
parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE,
};
use logger; use logger;
use packet::BlobRecycler; use packet::BlobRecycler;
use result::Error; use result::Error;
@ -1140,7 +1137,7 @@ mod tests {
); );
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let rv = crdt.window_index_request(0); let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall)); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new( let nxt = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.1:1234".parse().unwrap(), "127.0.0.1:1234".parse().unwrap(),
@ -1151,7 +1148,7 @@ mod tests {
); );
crdt.insert(&nxt); crdt.insert(&nxt);
let rv = crdt.window_index_request(0); let rv = crdt.window_index_request(0);
assert_matches!(rv, Err(Error::CrdtTooSmall)); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt = ReplicatedData::new( let nxt = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(), "127.0.0.2:1234".parse().unwrap(),
@ -1202,7 +1199,7 @@ mod tests {
); );
let mut crdt = Crdt::new(me.clone()); let mut crdt = Crdt::new(me.clone());
let rv = crdt.gossip_request(); let rv = crdt.gossip_request();
assert_matches!(rv, Err(Error::CrdtTooSmall)); assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall)));
let nxt1 = ReplicatedData::new( let nxt1 = ReplicatedData::new(
KeyPair::new().pubkey(), KeyPair::new().pubkey(),
"127.0.0.2:1234".parse().unwrap(), "127.0.0.2:1234".parse().unwrap(),

View File

@ -2,9 +2,13 @@
use bank; use bank;
use bincode; use bincode;
use crdt;
#[cfg(feature = "erasure")]
use erasure;
use serde_json; use serde_json;
use std; use std;
use std::any::Any; use std::any::Any;
use streamer;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -16,10 +20,11 @@ pub enum Error {
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>), Serialize(std::boxed::Box<bincode::ErrorKind>),
BankError(bank::BankError), BankError(bank::BankError),
CrdtError(crdt::CrdtError),
WindowError(streamer::WindowError),
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
SendError, SendError,
Services,
CrdtTooSmall,
GenericError,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@ -39,6 +44,22 @@ impl std::convert::From<bank::BankError> for Error {
Error::BankError(e) Error::BankError(e)
} }
} }
impl std::convert::From<crdt::CrdtError> for Error {
fn from(e: crdt::CrdtError) -> Error {
Error::CrdtError(e)
}
}
impl std::convert::From<streamer::WindowError> for Error {
fn from(e: streamer::WindowError) -> Error {
Error::WindowError(e)
}
}
#[cfg(feature = "erasure")]
impl std::convert::From<erasure::ErasureError> for Error {
fn from(e: erasure::ErasureError) -> Error {
Error::ErasureError(e)
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error { impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
fn from(_e: std::sync::mpsc::SendError<T>) -> Error { fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
Error::SendError Error::SendError

View File

@ -6,7 +6,7 @@ use erasure;
use packet::{ use packet::{
Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE,
}; };
use result::{Error, Result}; use result::Result;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::mem; use std::mem;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
@ -23,6 +23,11 @@ pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>; pub type BlobReceiver = Receiver<SharedBlobs>;
pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>; pub type Window = Arc<RwLock<Vec<Option<SharedBlob>>>>;
#[derive(Debug, PartialEq, Eq)]
pub enum WindowError {
GenericError,
}
fn recv_loop( fn recv_loop(
sock: &UdpSocket, sock: &UdpSocket,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
@ -152,7 +157,7 @@ fn find_next_missing(
received: &mut u64, received: &mut u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> { ) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
if *received <= *consumed { if *received <= *consumed {
return Err(Error::GenericError); Err(WindowError::GenericError)?;
} }
let window = locked_window.read().unwrap(); let window = locked_window.read().unwrap();
let reqs: Vec<_> = (*consumed..*received) let reqs: Vec<_> = (*consumed..*received)
@ -575,7 +580,7 @@ fn broadcast(
&mut window.write().unwrap(), &mut window.write().unwrap(),
*receive_index as usize, *receive_index as usize,
blobs_len, blobs_len,
).map_err(|_| Error::GenericError)?; )?;
} }
*receive_index += blobs_len as u64; *receive_index += blobs_len as u64;