Remove failure from zebra-chain, zebra-network.

Failure uses a distinct Fail trait rather than the standard library's
Error trait, which causes a lot of interoperability problems with tower
and other Error-using crates.  Since failure was created, the standard
library's Error trait was improved, and its conveniences are now
available without the custom Fail trait using `thiserror` (for easy
error derives) and `anyhow` (for a better boxed Error).
This commit is contained in:
Henry de Valence 2019-10-15 20:38:26 -07:00 committed by Deirdre Connolly
parent 199038e6b8
commit f6e62b0f5e
13 changed files with 146 additions and 142 deletions

View File

@ -8,8 +8,8 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
thiserror = "1"
byteorder = "1.3"
chrono = "0.4"
failure = "0.1"
#hex = "0.4" This conflicts with tracing-subscriber?
sha2 = "0.8"

View File

@ -1,9 +1,6 @@
//! Blockchain-related datastructures for Zebra. 🦓
#![deny(missing_docs)]
#[macro_use]
extern crate failure;
mod merkle_tree;
mod sha256d_writer;

View File

@ -10,25 +10,19 @@ use std::io;
use std::net::{IpAddr, SocketAddr};
use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt};
use thiserror::Error;
/// A serialization error.
// XXX refine error types -- better to use boxed errors?
#[derive(Fail, Debug)]
#[derive(Error, Debug)]
pub enum SerializationError {
/// An underlying IO error.
#[fail(display = "io error {}", _0)]
IoError(io::Error),
#[error("io error")]
Io(#[from] io::Error),
/// The data to be deserialized was malformed.
// XXX refine errors
#[fail(display = "parse error: {}", _0)]
ParseError(&'static str),
}
// Allow upcasting io::Error to SerializationError
impl From<io::Error> for SerializationError {
fn from(e: io::Error) -> Self {
Self::IoError(e)
}
#[error("parse error: {0}")]
Parse(&'static str),
}
/// Consensus-critical serialization for Zcash.

View File

@ -13,7 +13,7 @@ bytes = "0.4"
rand = "0.7"
byteorder = "1.3"
chrono = "0.4"
failure = "0.1"
thiserror = "1"
serde = { version = "1", features = ["serde_derive"] }
pin-project = "0.4"
# indexmap has rayon support for parallel iteration,

View File

@ -7,8 +7,6 @@ extern crate pin_project;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate tracing;
#[macro_use]
extern crate bitflags;

View File

@ -4,30 +4,12 @@
mod client;
/// Asynchronously connects to peers.
mod connector;
/// Peer-related errors.
mod error;
/// Handles inbound requests from the network to our node.
mod server;
pub use client::PeerClient;
pub use connector::PeerConnector;
pub use error::{PeerError, SharedPeerError};
pub use server::PeerServer;
/// An error related to a peer connection.
#[derive(Fail, Debug, Clone)]
pub enum PeerError {
/// Wrapper around `failure::Error` that can be `Clone`.
#[fail(display = "{}", _0)]
Inner(std::sync::Arc<failure::Error>),
}
impl From<failure::Error> for PeerError {
fn from(e: failure::Error) -> PeerError {
PeerError::Inner(std::sync::Arc::new(e))
}
}
// XXX hack
impl Into<crate::BoxedStdError> for PeerError {
fn into(self) -> crate::BoxedStdError {
Box::new(format_err!("dropped error info").compat())
}
}

View File

@ -12,7 +12,7 @@ use tower::Service;
use crate::protocol::internal::{Request, Response};
use super::{server::ErrorSlot, PeerError};
use super::{error::ErrorSlot, SharedPeerError};
/// The "client" duplex half of a peer connection.
pub struct PeerClient {
@ -29,12 +29,12 @@ pub struct PeerClient {
#[derive(Debug)]
pub(super) struct ClientRequest(
pub(super) Request,
pub(super) oneshot::Sender<Result<Response, PeerError>>,
pub(super) oneshot::Sender<Result<Response, SharedPeerError>>,
);
impl Service<Request> for PeerClient {
type Response = Response;
type Error = PeerError;
type Error = SharedPeerError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@ -52,6 +52,7 @@ impl Service<Request> for PeerClient {
fn call(&mut self, req: Request) -> Self::Future {
use futures::future::FutureExt;
use tracing_futures::Instrument;
let (tx, rx) = oneshot::channel();
match self.server_tx.try_send(ClientRequest(req, tx)) {
Err(e) => {
@ -68,16 +69,15 @@ impl Service<Request> for PeerClient {
panic!("called call without poll_ready");
}
}
// need a bit of yoga to get result types to align,
// because the oneshot future can error
Ok(()) => rx
.map(|val| match val {
Ok(Ok(rsp)) => Ok(rsp),
Ok(Err(e)) => Err(e),
Err(_) => Err(format_err!("oneshot died").into()),
Ok(()) => {
// The reciever end of the oneshot is itself a future.
rx.map(|oneshot_recv_result| {
oneshot_recv_result
.expect("ClientRequest oneshot sender must not be dropped before send")
})
.instrument(self.span.clone())
.boxed(),
.boxed()
}
}
}
}

View File

@ -5,7 +5,6 @@ use std::{
};
use chrono::Utc;
use failure::Error;
use futures::channel::mpsc;
use tokio::{codec::Framed, net::TcpStream, prelude::*};
use tower::Service;
@ -21,10 +20,7 @@ use crate::{
BoxedStdError, Config, Network,
};
use super::{
client::PeerClient,
server::{ErrorSlot, PeerServer, ServerState},
};
use super::{error::ErrorSlot, server::ServerState, PeerClient, PeerError, PeerServer};
/// A [`Service`] that connects to a remote peer and constructs a client/server pair.
pub struct PeerConnector<S> {
@ -64,12 +60,12 @@ where
impl<S> Service<SocketAddr> for PeerConnector<S>
where
S: Service<Request, Response = Response> + Clone + Send + 'static,
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send,
S::Error: Send + Into<BoxedStdError>,
{
type Response = PeerClient;
type Error = Error;
type Error = PeerError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -93,7 +89,7 @@ where
debug!("opening tcp stream");
let mut stream = Framed::new(
TcpStream::connect(addr).await?,
TcpStream::connect(addr).await.expect("PeerError does not contain an io::Error variant, but this code will be removed in the next PR, so there's no need to handle this error"),
Codec::builder().for_network(network).finish(),
);
@ -121,7 +117,7 @@ where
let remote_version = stream
.next()
.await
.ok_or_else(|| format_err!("stream closed during handshake"))??;
.ok_or_else(|| PeerError::ConnectionClosed)??;
debug!(
?remote_version,
@ -132,7 +128,7 @@ where
let remote_verack = stream
.next()
.await
.ok_or_else(|| format_err!("stream closed during handshake"))??;
.ok_or_else(|| PeerError::ConnectionClosed)??;
debug!(?remote_verack, "got remote peer's verack");

View File

@ -0,0 +1,53 @@
use std::sync::{Arc, Mutex};
use thiserror::Error;
use zebra_chain::serialization::SerializationError;
/// A wrapper around `Arc<PeerError>` that implements `Error`.
#[derive(Error, Debug, Clone)]
#[error("{0}")]
pub struct SharedPeerError(#[from] Arc<PeerError>);
/// An error related to peer connection handling.
#[derive(Error, Debug)]
pub enum PeerError {
/// The remote peer closed the connection.
#[error("Peer closed connection")]
ConnectionClosed,
/// The [`PeerClient`] half of the [`PeerClient`]/[`PeerServer`] pair died before
/// the [`PeerServer`] half did.
#[error("PeerClient instance died")]
DeadPeerClient,
/// The [`PeerServer`] half of the [`PeerServer`]/[`PeerClient`] pair died before
/// the [`PeerClient`] half did.
#[error("PeerServer instance died")]
DeadPeerServer,
/// The remote peer did not respond to a [`PeerClient`] request in time.
#[error("Client request timed out")]
ClientRequestTimeout,
/// A serialization error occurred while reading or writing a message.
#[error("Serialization error")]
Serialization(#[from] SerializationError),
/// A badly-behaved remote peer sent a handshake message after the handshake was
/// already complete.
#[error("Remote peer sent handshake messages after handshake")]
DuplicateHandshake,
/// This node's internal services were overloaded, so the connection was dropped
/// to shed load.
#[error("Internal services over capacity")]
Overloaded,
}
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(pub(super) Arc<Mutex<Option<SharedPeerError>>>);
impl ErrorSlot {
pub fn try_get_error(&self) -> Option<SharedPeerError> {
self.0
.lock()
.expect("error mutex should be unpoisoned")
.as_ref()
.map(|e| e.clone())
}
}

View File

@ -1,6 +1,5 @@
use std::sync::{Arc, Mutex};
use failure::Error;
use futures::{
channel::{mpsc, oneshot},
future::{self, Either},
@ -11,6 +10,7 @@ use tokio::{
timer::{delay_for, Delay},
};
use tower::Service;
use zebra_chain::serialization::SerializationError;
use crate::{
constants,
@ -21,26 +21,13 @@ use crate::{
BoxedStdError,
};
use super::{client::ClientRequest, PeerError};
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(Arc<Mutex<Option<PeerError>>>);
impl ErrorSlot {
pub fn try_get_error(&self) -> Option<PeerError> {
self.0
.lock()
.expect("error mutex should be unpoisoned")
.as_ref()
.map(|e| e.clone())
}
}
use super::{client::ClientRequest, error::ErrorSlot, PeerError, SharedPeerError};
pub(super) enum ServerState {
/// Awaiting a client request or a peer message.
AwaitingRequest,
/// Awaiting a peer message we can interpret as a client request.
AwaitingResponse(Request, oneshot::Sender<Result<Response, PeerError>>),
AwaitingResponse(Request, oneshot::Sender<Result<Response, SharedPeerError>>),
/// A failure has occurred and we are shutting down the server.
Failed,
}
@ -62,15 +49,14 @@ pub struct PeerServer<S, Tx> {
impl<S, Tx> PeerServer<S, Tx>
where
S: Service<Request, Response = Response>,
S: Service<Request, Response = Response, Error = BoxedStdError>,
S::Error: Into<BoxedStdError>,
Tx: Sink<Message> + Unpin,
Tx::Error: Into<Error>,
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// Run this peer server to completion.
pub async fn run<Rx>(mut self, mut peer_rx: Rx)
where
Rx: Stream<Item = Result<Message, Error>> + Unpin,
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
{
// At a high level, the event loop we want is as follows: we check for any
// incoming messages from the remote peer, check if they should be interpreted
@ -122,9 +108,7 @@ where
.as_mut()
.expect("timeout must be set while awaiting response");
match future::select(peer_rx.next(), timer_ref).await {
Either::Left((None, _)) => {
self.fail_with(format_err!("peer closed connection").into())
}
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
// XXX switch back to hard failure when we parse all message types
//Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Err(peer_err)), _timer)) => error!(%peer_err),
@ -139,7 +123,8 @@ where
// Re-matching lets us take ownership of tx
self.state = match self.state {
ServerState::AwaitingResponse(_, tx) => {
let _ = tx.send(Err(format_err!("request timed out").into()));
let e = PeerError::ClientRequestTimeout;
let _ = tx.send(Err(Arc::new(e).into()));
ServerState::AwaitingRequest
}
_ => panic!("unreachable"),
@ -179,7 +164,7 @@ where
if guard.is_some() {
panic!("called fail_with on already-failed server state");
} else {
*guard = Some(e);
*guard = Some(Arc::new(e).into());
}
// Drop the guard immediately to release the mutex.
std::mem::drop(guard);
@ -215,13 +200,13 @@ where
.peer_tx
.send(Message::GetAddr)
.await
.map_err(|e| e.into().into())
.map_err(|e| e.into())
.map(|()| AwaitingResponse(GetPeers, tx)),
(AwaitingRequest, PushPeers(addrs)) => self
.peer_tx
.send(Message::Addr(addrs))
.await
.map_err(|e| e.into().into())
.map_err(|e| e.into())
.map(|()| {
// PushPeers does not have a response, so we return OK as
// soon as we send the request. Sending through a oneshot
@ -280,17 +265,17 @@ where
// These messages are transport-related, handle them separately:
match msg {
Message::Version { .. } => {
self.fail_with(format_err!("got version message after handshake").into());
self.fail_with(PeerError::DuplicateHandshake);
return;
}
Message::Verack { .. } => {
self.fail_with(format_err!("got verack message after handshake").into());
self.fail_with(PeerError::DuplicateHandshake);
return;
}
Message::Ping(nonce) => {
match self.peer_tx.send(Message::Pong(nonce)).await {
Ok(()) => {}
Err(e) => self.fail_with(e.into().into()),
Err(e) => self.fail_with(e.into()),
}
return;
}
@ -317,26 +302,31 @@ where
/// of connected peers.
async fn drive_peer_request(&mut self, req: Request) {
trace!(?req);
use tower::ServiceExt;
if let Err(e) = self
.svc
.ready()
.await
.map_err(|e| Error::from_boxed_compat(e.into()))
{
self.fail_with(e.into());
use tower::{load_shed::error::Overloaded, ServiceExt};
if let Err(_) = self.svc.ready().await {
// Treat all service readiness errors as Overloaded
self.fail_with(PeerError::Overloaded);
}
match self
.svc
.call(req)
.await
.map_err(|e| Error::from_boxed_compat(e.into()))
{
Err(e) => self.fail_with(e.into()),
Ok(Response::Ok) => { /* generic success, do nothing */ }
Ok(Response::Peers(addrs)) => {
let rsp = match self.svc.call(req).await {
Err(e) => {
if e.is::<Overloaded>() {
self.fail_with(PeerError::Overloaded);
} else {
// We could send a reject to the remote peer.
error!(%e);
}
return;
}
Ok(rsp) => rsp,
};
match rsp {
Response::Ok => { /* generic success, do nothing */ }
Response::Peers(addrs) => {
if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
self.fail_with(e.into().into());
self.fail_with(e.into());
}
}
}

View File

@ -4,11 +4,10 @@ use std::{
task::{Context, Poll},
};
use failure::Error;
use tokio::prelude::*;
use tower::discover::{Change, Discover};
use crate::peer::PeerClient;
use crate::peer::{PeerClient, PeerError};
/// A [`tower::discover::Discover`] implementation to report new `PeerClient`s.
///
@ -22,7 +21,7 @@ pub struct PeerDiscover {
impl Discover for PeerDiscover {
type Key = SocketAddr;
type Service = PeerClient;
type Error = Error;
type Error = PeerError;
fn poll_discover(
self: Pin<&mut Self>,

View File

@ -5,12 +5,13 @@ use std::io::{Cursor, Read, Write};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use failure::Error;
use tokio::codec::{Decoder, Encoder};
use zebra_chain::{
block::{Block, BlockHeader, BlockHeaderHash},
serialization::{ReadZcashExt, WriteZcashExt, ZcashDeserialize, ZcashSerialize},
serialization::{
ReadZcashExt, SerializationError as Error, WriteZcashExt, ZcashDeserialize, ZcashSerialize,
},
transaction::Transaction,
types::{BlockHeight, Sha256dChecksum},
};
@ -273,7 +274,7 @@ impl Codec {
// FilterAdd => {}
// FilterClear => {}
// MerkleBlock => {}
_ => bail!("unimplemented message type"),
_ => return Err(Error::Parse("unimplemented message type")),
}
Ok(())
}
@ -296,6 +297,7 @@ impl Decoder for Codec {
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
use Error::Parse;
match self.state {
DecodeState::Head => {
// First check that the src buffer contains an entire header.
@ -316,14 +318,12 @@ impl Decoder for Codec {
let checksum = Sha256dChecksum(header_reader.read_4_bytes()?);
trace!(?self.state, ?magic, ?command, body_len, ?checksum, "read header from src buffer");
ensure!(
magic == self.builder.network.magic(),
"supplied magic did not meet expectations"
);
ensure!(
body_len < self.builder.max_len,
"body length exceeded maximum size",
);
if magic != self.builder.network.magic() {
return Err(Parse("supplied magic did not meet expectations"));
}
if body_len >= self.builder.max_len {
return Err(Parse("body length exceeded maximum size"));
}
// Reserve buffer space for the expected body and the following header.
src.reserve(body_len + HEADER_LEN);
@ -354,10 +354,11 @@ impl Decoder for Codec {
let body = src.split_to(body_len);
self.state = DecodeState::Head;
ensure!(
checksum == Sha256dChecksum::from(&body[..]),
"supplied message checksum does not match computed checksum"
);
if checksum != Sha256dChecksum::from(&body[..]) {
return Err(Parse(
"supplied message checksum does not match computed checksum",
));
}
let body_reader = Cursor::new(&body);
match &command {
@ -381,7 +382,7 @@ impl Decoder for Codec {
b"filteradd\0\0\0" => self.read_filteradd(body_reader),
b"filterclear\0" => self.read_filterclear(body_reader),
b"merkleblock\0" => self.read_merkleblock(body_reader),
_ => bail!("unknown command"),
_ => return Err(Parse("unknown command")),
}
// We need Ok(Some(msg)) to signal that we're done decoding.
// This is also convenient for tracing the parse result.
@ -415,7 +416,7 @@ impl Codec {
relay: match reader.read_u8()? {
0 => false,
1 => true,
_ => bail!("non-bool value supplied in relay field"),
_ => return Err(Error::Parse("non-bool value supplied in relay field")),
},
})
}
@ -433,8 +434,7 @@ impl Codec {
}
fn read_reject<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("reject");
bail!("unimplemented message type")
return Err(Error::Parse("reject messages are not implemented"));
}
fn read_addr<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
@ -544,28 +544,23 @@ impl Codec {
}
fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("mempool");
bail!("unimplemented message type")
return Err(Error::Parse("mempool messages are not implemented"));
}
fn read_filterload<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("filterload");
bail!("unimplemented message type")
return Err(Error::Parse("filterload messages are not implemented"));
}
fn read_filteradd<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("filteradd");
bail!("unimplemented message type")
return Err(Error::Parse("filteradd messages are not implemented"));
}
fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("filterclear");
bail!("unimplemented message type")
return Err(Error::Parse("filterclear messages are not implemented"));
}
fn read_merkleblock<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
trace!("merkleblock");
bail!("unimplemented message type")
return Err(Error::Parse("merkleblock messages are not implemented"));
}
}

View File

@ -63,7 +63,7 @@ impl ZcashDeserialize for InventoryHash {
1 => Ok(InventoryHash::Tx(TransactionHash(bytes))),
2 => Ok(InventoryHash::Block(BlockHeaderHash(bytes))),
3 => Ok(InventoryHash::FilteredBlock(BlockHeaderHash(bytes))),
_ => Err(SerializationError::ParseError("invalid inventory code")),
_ => Err(SerializationError::Parse("invalid inventory code")),
}
}
}