Revert "leverage return value for propagating errors"

This reverts commit e6cb20e13f.
This commit is contained in:
teor 2021-02-23 10:54:48 +10:00 committed by Jane Lusby
parent 72e2e83828
commit 78f162733d
1 changed files with 131 additions and 115 deletions

View File

@ -378,10 +378,7 @@ where
} }
Either::Left((Some(Err(e)), _)) => self.fail_with(e), Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(msg)), _)) => { Either::Left((Some(Ok(msg)), _)) => {
match self.handle_message_as_request(msg).await { self.handle_message_as_request(msg).await
Ok(()) => {}
Err(e) => self.fail_with(e),
}
} }
Either::Right((None, _)) => { Either::Right((None, _)) => {
trace!("client_rx closed, ending connection"); trace!("client_rx closed, ending connection");
@ -437,10 +434,7 @@ where
if let Some(msg) = request_msg { if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is // do NOT instrument with the request span, this is
// independent work // independent work
match self.handle_message_as_request(msg).await { self.handle_message_as_request(msg).await;
Ok(()) => {}
Err(e) => self.fail_with(e),
}
} else { } else {
// Otherwise, check whether the handler is finished // Otherwise, check whether the handler is finished
// processing messages and update the state. // processing messages and update the state.
@ -571,53 +565,18 @@ where
/// NOTE: the caller should use .instrument(msg.span) to instrument the function. /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
async fn handle_client_request(&mut self, req: InProgressClientRequest) { async fn handle_client_request(&mut self, req: InProgressClientRequest) {
trace!(?req.request); trace!(?req.request);
use Request::*;
use State::*; use State::*;
let InProgressClientRequest { request, tx, span } = req;
if req.tx.is_canceled() { if tx.is_canceled() {
metrics::counter!("peer.canceled", 1); metrics::counter!("peer.canceled", 1);
tracing::debug!("ignoring canceled request"); tracing::debug!("ignoring canceled request");
return; return;
} }
let new_state_result = self._handle_client_request(req).await; // These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
let new_state_result = match (&self.state, request) {
// Updates state or fails.
match new_state_result {
Ok(AwaitingRequest) => {
self.state = AwaitingRequest;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Ok(new_state @ AwaitingResponse { .. }) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Err((e, tx)) => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
}
// unreachable states
Ok(Failed) => unreachable!(
"failed client requests must use fail_with(error) to reach a Failed state."
),
};
}
async fn _handle_client_request(
&mut self,
req: InProgressClientRequest,
) -> Result<
State,
(
SerializationError,
MustUseOneshotSender<Result<Response, SharedPeerError>>,
),
> {
use Request::*;
use State::*;
let InProgressClientRequest { request, tx, span } = req;
match (&self.state, request) {
(Failed, request) => panic!( (Failed, request) => panic!(
"failed connection cannot handle new request: {:?}, client_receiver: {:?}", "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
request, request,
@ -630,23 +589,25 @@ where
self.client_rx self.client_rx
), ),
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await { (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::Peers, handler: Handler::Peers,
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
}, },
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await { (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::Ping(nonce), handler: Handler::Ping(nonce),
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
}, },
(AwaitingRequest, BlocksByHash(hashes)) => { (AwaitingRequest, BlocksByHash(hashes)) => {
@ -657,7 +618,7 @@ where
)) ))
.await .await
{ {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::BlocksByHash { handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()), blocks: Vec::with_capacity(hashes.len()),
@ -666,7 +627,8 @@ where
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
@ -678,7 +640,7 @@ where
)) ))
.await .await
{ {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::TransactionsByHash { handler: Handler::TransactionsByHash {
transactions: Vec::with_capacity(hashes.len()), transactions: Vec::with_capacity(hashes.len()),
@ -687,7 +649,8 @@ where
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
@ -697,13 +660,14 @@ where
.send(Message::GetBlocks { known_blocks, stop }) .send(Message::GetBlocks { known_blocks, stop })
.await .await
{ {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::FindBlocks, handler: Handler::FindBlocks,
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
@ -713,36 +677,33 @@ where
.send(Message::GetHeaders { known_blocks, stop }) .send(Message::GetHeaders { known_blocks, stop })
.await .await
{ {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::FindHeaders, handler: Handler::FindHeaders,
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
(AwaitingRequest, MempoolTransactions) => { (AwaitingRequest, MempoolTransactions) => {
match self.peer_tx.send(Message::Mempool).await { match self.peer_tx.send(Message::Mempool).await {
Ok(()) => Ok( Ok(()) => Ok((
AwaitingResponse { AwaitingResponse {
handler: Handler::MempoolTransactions, handler: Handler::MempoolTransactions,
tx, tx,
span, span,
}, },
), None,
)),
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
(AwaitingRequest, PushTransaction(transaction)) => { (AwaitingRequest, PushTransaction(transaction)) => {
match self.peer_tx.send(Message::Tx(transaction)).await { match self.peer_tx.send(Message::Tx(transaction)).await {
Ok(()) => { Ok(()) => Ok((AwaitingRequest, Some(tx))),
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Ok(AwaitingRequest)
},
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
@ -752,78 +713,110 @@ where
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
.await .await
{ {
Ok(()) => { Ok(()) => Ok((AwaitingRequest, Some(tx))),
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Ok(AwaitingRequest)
},
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
(AwaitingRequest, AdvertiseBlock(hash)) => { (AwaitingRequest, AdvertiseBlock(hash)) => {
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await { match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
Ok(()) => { Ok(()) => Ok((AwaitingRequest, Some(tx))),
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Ok(AwaitingRequest)
},
Err(e) => Err((e, tx)), Err(e) => Err((e, tx)),
} }
} }
} };
// Updates state or fails. Sends the error on the Sender if it is Some.
match new_state_result {
Ok((AwaitingRequest, Some(tx))) => {
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
self.state = AwaitingRequest;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Ok((new_state @ AwaitingResponse { .. }, None)) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
}
Err((e, tx)) => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
}
// unreachable states
Ok((Failed, tx)) => unreachable!(
"failed client requests must use fail_with(error) to reach a Failed state. tx: {:?}",
tx
),
Ok((AwaitingRequest, None)) => unreachable!(
"successful AwaitingRequest states must send a response on tx, but tx is None",
),
Ok((new_state @ AwaitingResponse { .. }, Some(tx))) => unreachable!(
"successful AwaitingResponse states must keep tx, but tx is Some: {:?} for: {:?}",
tx, new_state,
),
};
} }
// This function has its own span, because we're creating a new work // This function has its own span, because we're creating a new work
// context (namely, the work of processing the inbound msg as a request) // context (namely, the work of processing the inbound msg as a request)
#[instrument(name = "msg_as_req", skip(self, msg), fields(%msg))] #[instrument(name = "msg_as_req", skip(self, msg), fields(%msg))]
async fn handle_message_as_request(&mut self, msg: Message) -> Result<(), PeerError> { async fn handle_message_as_request(&mut self, msg: Message) {
trace!(?msg); trace!(?msg);
let req = match msg { let req = match msg {
Message::Ping(nonce) => { Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat"); trace!(?nonce, "responding to heartbeat");
self.peer_tx.send(Message::Pong(nonce)).await?; if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
return Ok(()); self.fail_with(e);
}
return;
} }
// These messages shouldn't be sent outside of a handshake. // These messages shouldn't be sent outside of a handshake.
Message::Version { .. } => Err(PeerError::DuplicateHandshake)?, Message::Version { .. } => {
Message::Verack { .. } => Err(PeerError::DuplicateHandshake)?, self.fail_with(PeerError::DuplicateHandshake);
return;
}
Message::Verack { .. } => {
self.fail_with(PeerError::DuplicateHandshake);
return;
}
// These messages should already be handled as a response if they // These messages should already be handled as a response if they
// could be a response, so if we see them here, they were either // could be a response, so if we see them here, they were either
// sent unsolicited, or they were sent in response to a canceled request // sent unsolicited, or they were sent in response to a canceled request
// that we've already forgotten about. // that we've already forgotten about.
Message::Reject { .. } => { Message::Reject { .. } => {
tracing::debug!("got reject message unsolicited or from canceled request"); tracing::debug!("got reject message unsolicited or from canceled request");
return Ok(()); return;
} }
Message::NotFound { .. } => { Message::NotFound { .. } => {
tracing::debug!("got notfound message unsolicited or from canceled request"); tracing::debug!("got notfound message unsolicited or from canceled request");
return Ok(()); return;
} }
Message::Pong(_) => { Message::Pong(_) => {
tracing::debug!("got pong message unsolicited or from canceled request"); tracing::debug!("got pong message unsolicited or from canceled request");
return Ok(()); return;
} }
Message::Block(_) => { Message::Block(_) => {
tracing::debug!("got block message unsolicited or from canceled request"); tracing::debug!("got block message unsolicited or from canceled request");
return Ok(()); return;
} }
Message::Headers(_) => { Message::Headers(_) => {
tracing::debug!("got headers message unsolicited or from canceled request"); tracing::debug!("got headers message unsolicited or from canceled request");
return Ok(()); return;
} }
// These messages should never be sent by peers. // These messages should never be sent by peers.
Message::FilterLoad { .. } Message::FilterLoad { .. }
| Message::FilterAdd { .. } | Message::FilterAdd { .. }
| Message::FilterClear { .. } => Err(PeerError::UnsupportedMessage( | Message::FilterClear { .. } => {
"got BIP11 message without advertising NODE_BLOOM", self.fail_with(PeerError::UnsupportedMessage(
))?, "got BIP11 message without advertising NODE_BLOOM",
));
return;
}
// Zebra crawls the network proactively, to prevent // Zebra crawls the network proactively, to prevent
// peers from inserting data into our address book. // peers from inserting data into our address book.
Message::Addr(_) => { Message::Addr(_) => {
trace!("ignoring unsolicited addr message"); trace!("ignoring unsolicited addr message");
return Ok(()); return;
} }
Message::Tx(transaction) => Request::PushTransaction(transaction), Message::Tx(transaction) => Request::PushTransaction(transaction),
Message::Inv(items) => match &items[..] { Message::Inv(items) => match &items[..] {
@ -835,7 +828,10 @@ where
{ {
Request::TransactionsByHash(transaction_hashes(&items).collect()) Request::TransactionsByHash(transaction_hashes(&items).collect())
} }
_ => Err(PeerError::WrongMessage("inv with mixed item types"))?, _ => {
self.fail_with(PeerError::WrongMessage("inv with mixed item types"));
return;
}
}, },
Message::GetData(items) => match &items[..] { Message::GetData(items) => match &items[..] {
[InventoryHash::Block(_), rest @ ..] [InventoryHash::Block(_), rest @ ..]
@ -850,7 +846,10 @@ where
{ {
Request::TransactionsByHash(transaction_hashes(&items).collect()) Request::TransactionsByHash(transaction_hashes(&items).collect())
} }
_ => Err(PeerError::WrongMessage("getdata with mixed item types"))?, _ => {
self.fail_with(PeerError::WrongMessage("getdata with mixed item types"));
return;
}
}, },
Message::GetAddr => Request::Peers, Message::GetAddr => Request::Peers,
Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop }, Message::GetBlocks { known_blocks, stop } => Request::FindBlocks { known_blocks, stop },
@ -860,9 +859,7 @@ where
Message::Mempool => Request::MempoolTransactions, Message::Mempool => Request::MempoolTransactions,
}; };
self.drive_peer_request(req).await?; self.drive_peer_request(req).await
Ok(())
} }
/// Given a `req` originating from the peer, drive it to completion and send /// Given a `req` originating from the peer, drive it to completion and send
@ -870,14 +867,15 @@ where
/// processing the request (e.g., the service is shedding load), then we call /// processing the request (e.g., the service is shedding load), then we call
/// fail_with to terminate the entire peer connection, shrinking the number /// fail_with to terminate the entire peer connection, shrinking the number
/// of connected peers. /// of connected peers.
async fn drive_peer_request(&mut self, req: Request) -> Result<(), PeerError> { async fn drive_peer_request(&mut self, req: Request) {
trace!(?req); trace!(?req);
use tower::{load_shed::error::Overloaded, ServiceExt}; use tower::{load_shed::error::Overloaded, ServiceExt};
if self.svc.ready_and().await.is_err() { if self.svc.ready_and().await.is_err() {
// Treat all service readiness errors as Overloaded // Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655) // TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
Err(PeerError::Overloaded)? self.fail_with(PeerError::Overloaded);
return;
} }
let rsp = match self.svc.call(req).await { let rsp = match self.svc.call(req).await {
@ -885,7 +883,7 @@ where
if e.is::<Overloaded>() { if e.is::<Overloaded>() {
tracing::warn!("inbound service is overloaded, closing connection"); tracing::warn!("inbound service is overloaded, closing connection");
metrics::counter!("pool.closed.loadshed", 1); metrics::counter!("pool.closed.loadshed", 1);
Err(PeerError::Overloaded)? self.fail_with(PeerError::Overloaded);
} else { } else {
// We could send a reject to the remote peer, but that might cause // We could send a reject to the remote peer, but that might cause
// them to disconnect, and we might be using them to sync blocks. // them to disconnect, and we might be using them to sync blocks.
@ -896,40 +894,58 @@ where
client_receiver = ?self.client_rx, client_receiver = ?self.client_rx,
"error processing peer request"); "error processing peer request");
} }
return Ok(()); return;
} }
Ok(rsp) => rsp, Ok(rsp) => rsp,
}; };
match rsp { match rsp {
Response::Nil => { /* generic success, do nothing */ }, Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => self.peer_tx.send(Message::Addr(addrs)).await?, Response::Peers(addrs) => {
if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
self.fail_with(e);
}
}
Response::Transactions(transactions) => { Response::Transactions(transactions) => {
// Generate one tx message per transaction. // Generate one tx message per transaction.
for transaction in transactions.into_iter() { for transaction in transactions.into_iter() {
self.peer_tx.send(Message::Tx(transaction)).await?; if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e);
}
} }
} }
Response::Blocks(blocks) => { Response::Blocks(blocks) => {
// Generate one block message per block. // Generate one block message per block.
for block in blocks.into_iter() { for block in blocks.into_iter() {
self.peer_tx.send(Message::Block(block)).await?; if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e);
}
} }
} }
Response::BlockHashes(hashes) => { Response::BlockHashes(hashes) => {
self.peer_tx if let Err(e) = self
.peer_tx
.send(Message::Inv(hashes.into_iter().map(Into::into).collect())) .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await? .await
{
self.fail_with(e)
}
}
Response::BlockHeaders(headers) => {
if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
self.fail_with(e)
}
} }
Response::BlockHeaders(headers) => self.peer_tx.send(Message::Headers(headers)).await?,
Response::TransactionHashes(hashes) => { Response::TransactionHashes(hashes) => {
self.peer_tx if let Err(e) = self
.peer_tx
.send(Message::Inv(hashes.into_iter().map(Into::into).collect())) .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await? .await
{
self.fail_with(e)
}
} }
} }
Ok(())
} }
} }