From b95db62be3e3d9c8435de1823df8e3d89b9fde56 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 5 Jul 2018 14:37:13 -0600 Subject: [PATCH] Handle errors consistently Error handling is still clumsy. We should switch to something like `error-chain` or `Result>`, but until then, we can at least be consistent across modules. --- src/choose_gossip_peer_strategy.rs | 8 ++--- src/crdt.rs | 55 ++++++++++++++---------------- src/result.rs | 27 +++++++++++++-- src/streamer.rs | 11 ++++-- 4 files changed, 62 insertions(+), 39 deletions(-) diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs index dec6a0aee..7a0850ca2 100644 --- a/src/choose_gossip_peer_strategy.rs +++ b/src/choose_gossip_peer_strategy.rs @@ -1,7 +1,7 @@ -use crdt::ReplicatedData; +use crdt::{CrdtError, ReplicatedData}; use rand::distributions::{Distribution, Weighted, WeightedChoice}; use rand::thread_rng; -use result::{Error, Result}; +use result::Result; use signature::PublicKey; use std; use std::collections::HashMap; @@ -29,7 +29,7 @@ impl<'a, 'b> ChooseRandomPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { if options.is_empty() { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let n = ((self.random)() as usize) % options.len(); @@ -174,7 +174,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> { impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> { fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> { if options.len() < 1 { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let mut weighted_peers = vec![]; diff --git a/src/crdt.rs b/src/crdt.rs index f9ef05a2b..4beb3073b 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -42,6 +42,11 @@ const GOSSIP_SLEEP_MILLIS: u64 = 100; /// minimum membership table size before we start purging dead nodes const MIN_TABLE_SIZE: usize = 2; +#[derive(Debug, PartialEq, Eq)] +pub enum CrdtError { + TooSmall, +} + pub fn parse_port_or_addr(optstr: Option) -> SocketAddr { let daddr: SocketAddr = "0.0.0.0:8000".parse().expect("default socket address"); if let Some(addrstr) = optstr { @@ -393,7 +398,7 @@ impl Crdt { .collect(); if nodes.len() < 1 { warn!("crdt too small"); - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } trace!("nodes table {}", nodes.len()); @@ -433,13 +438,10 @@ impl Crdt { .collect(); trace!("broadcast results {}", errs.len()); for e in errs { - match e { - Err(e) => { - error!("broadcast result {:?}", e); - return Err(Error::IO(e)); - } - _ => (), + if let Err(e) = &e { + error!("broadcast result {:?}", e); } + e?; *transmit_index += 1; } Ok(()) @@ -491,13 +493,10 @@ impl Crdt { }) .collect(); for e in errs { - match e { - Err(e) => { - info!("retransmit error {:?}", e); - return Err(Error::IO(e)); - } - _ => (), + if let Err(e) = &e { + error!("broadcast result {:?}", e); } + e?; } Ok(()) } @@ -541,7 +540,7 @@ impl Crdt { .filter(|r| r.id != self.me && r.repair_addr != daddr) .collect(); if valid.is_empty() { - return Err(Error::CrdtTooSmall); + Err(CrdtError::TooSmall)?; } let n = (Self::random() as usize) % valid.len(); let addr = valid[n].gossip_addr.clone(); @@ -566,18 +565,14 @@ impl Crdt { let choose_peer_result = choose_peer_strategy.choose_peer(options); - let v = match choose_peer_result { - Ok(peer) => peer, - Err(Error::CrdtTooSmall) => { - trace!( - "crdt too small for gossip {:?} {}", - &self.me[..4], - self.table.len() - ); - return Err(Error::CrdtTooSmall); - } - Err(e) => return Err(e), + if let Err(Error::CrdtError(CrdtError::TooSmall)) = &choose_peer_result { + trace!( + "crdt too small for gossip {:?} {}", + &self.me[..4], + self.table.len() + ); }; + let v = choose_peer_result?; let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0); let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone()); @@ -1014,7 +1009,9 @@ impl TestNode { #[cfg(test)] mod tests { - use crdt::{parse_port_or_addr, Crdt, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE}; + use crdt::{ + parse_port_or_addr, Crdt, CrdtError, ReplicatedData, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, + }; use logger; use packet::BlobRecycler; use result::Error; @@ -1140,7 +1137,7 @@ mod tests { ); let mut crdt = Crdt::new(me.clone()); let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.1:1234".parse().unwrap(), @@ -1151,7 +1148,7 @@ mod tests { ); crdt.insert(&nxt); let rv = crdt.window_index_request(0); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), @@ -1202,7 +1199,7 @@ mod tests { ); let mut crdt = Crdt::new(me.clone()); let rv = crdt.gossip_request(); - assert_matches!(rv, Err(Error::CrdtTooSmall)); + assert_matches!(rv, Err(Error::CrdtError(CrdtError::TooSmall))); let nxt1 = ReplicatedData::new( KeyPair::new().pubkey(), "127.0.0.2:1234".parse().unwrap(), diff --git a/src/result.rs b/src/result.rs index e31e45e2d..af056ba35 100644 --- a/src/result.rs +++ b/src/result.rs @@ -2,9 +2,13 @@ use bank; use bincode; +use crdt; +#[cfg(feature = "erasure")] +use erasure; use serde_json; use std; use std::any::Any; +use streamer; #[derive(Debug)] pub enum Error { @@ -16,10 +20,11 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), BankError(bank::BankError), + CrdtError(crdt::CrdtError), + WindowError(streamer::WindowError), + #[cfg(feature = "erasure")] + ErasureError(erasure::ErasureError), SendError, - Services, - CrdtTooSmall, - GenericError, } pub type Result = std::result::Result; @@ -39,6 +44,22 @@ impl std::convert::From for Error { Error::BankError(e) } } +impl std::convert::From for Error { + fn from(e: crdt::CrdtError) -> Error { + Error::CrdtError(e) + } +} +impl std::convert::From for Error { + fn from(e: streamer::WindowError) -> Error { + Error::WindowError(e) + } +} +#[cfg(feature = "erasure")] +impl std::convert::From for Error { + fn from(e: erasure::ErasureError) -> Error { + Error::ErasureError(e) + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/src/streamer.rs b/src/streamer.rs index 91674657f..9e215a773 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -6,7 +6,7 @@ use erasure; use packet::{ Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedBlobs, SharedPackets, BLOB_SIZE, }; -use result::{Error, Result}; +use result::Result; use std::collections::VecDeque; use std::mem; use std::net::{SocketAddr, UdpSocket}; @@ -23,6 +23,11 @@ pub type BlobSender = Sender; pub type BlobReceiver = Receiver; pub type Window = Arc>>>; +#[derive(Debug, PartialEq, Eq)] +pub enum WindowError { + GenericError, +} + fn recv_loop( sock: &UdpSocket, exit: &Arc, @@ -152,7 +157,7 @@ fn find_next_missing( received: &mut u64, ) -> Result)>> { if *received <= *consumed { - return Err(Error::GenericError); + Err(WindowError::GenericError)?; } let window = locked_window.read().unwrap(); let reqs: Vec<_> = (*consumed..*received) @@ -575,7 +580,7 @@ fn broadcast( &mut window.write().unwrap(), *receive_index as usize, blobs_len, - ).map_err(|_| Error::GenericError)?; + )?; } *receive_index += blobs_len as u64;