Call ClientRequest.tx.send() even if there is an error

Previously, tx would be dropped before send if:
- the success case would have used tx to wait for further messages,
- but the response was actually an error.

Instead, send the error on `tx` and call `fail_with()` using the same
error.

To support this change, allow `fail_with()` to take a `PeerError` or a
`SharedPeerError`.
This commit is contained in:
teor 2020-12-16 16:43:19 +10:00 committed by Jane Lusby
parent 28f3186182
commit 3892894ffa
1 changed files with 122 additions and 100 deletions

View File

@ -7,8 +7,7 @@
//! And it's unclear if these assumptions match the `zcashd` implementation. //! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515). //! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::collections::HashSet; use std::{collections::HashSet, fmt, sync::Arc};
use std::sync::Arc;
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
@ -371,7 +370,7 @@ where
Either::Left((None, _)) => { Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed); self.fail_with(PeerError::ConnectionClosed);
} }
Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(msg)), _)) => { Either::Left((Some(Ok(msg)), _)) => {
self.handle_message_as_request(msg).await self.handle_message_as_request(msg).await
} }
@ -405,7 +404,7 @@ where
.await .await
{ {
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(peer_msg)), _cancel)) => { Either::Left((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler. // Try to process the message using the handler.
// This extremely awkward construction avoids // This extremely awkward construction avoids
@ -494,7 +493,10 @@ where
} }
/// Marks the peer as having failed with error `e`. /// Marks the peer as having failed with error `e`.
fn fail_with(&mut self, e: PeerError) { fn fail_with<E>(&mut self, e: E)
where
E: Into<SharedPeerError> + fmt::Display,
{
debug!(%e, "failing peer service with error"); debug!(%e, "failing peer service with error");
// Update the shared error slot // Update the shared error slot
let mut guard = self let mut guard = self
@ -542,123 +544,143 @@ where
// XXX(hdevalence) this is truly horrible, but let's fix it later // XXX(hdevalence) this is truly horrible, but let's fix it later
// Inner match returns Result with the new state or an error. // Inner matches return a Result with a new state or an (error, Option<oneshot::Sender>)
// Outer match updates state or fails. // Middle match returns Result with the new state or the (error, Option<oneshot::Sender>)
// Outer match updates state or fails, and sends the error on the Sender if it is Some
match match (&self.state, request) { match match (&self.state, request) {
(Failed, _) => panic!("failed connection cannot handle requests"), (Failed, _) => panic!("failed connection cannot handle requests"),
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"), (AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
(AwaitingRequest, Peers) => self
.peer_tx (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
.send(Message::GetAddr) Ok(()) => Ok(AwaitingResponse {
.await
.map_err(|e| e.into())
.map(|()| AwaitingResponse {
handler: Handler::Peers, handler: Handler::Peers,
tx, tx,
span, span,
}), }),
(AwaitingRequest, Ping(nonce)) => self Err(e) => Err((e, Some(tx))),
.peer_tx },
.send(Message::Ping(nonce)) (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
.await Ok(()) => Ok(AwaitingResponse {
.map_err(|e| e.into())
.map(|()| AwaitingResponse {
handler: Handler::Ping(nonce), handler: Handler::Ping(nonce),
tx, tx,
span, span,
}), }),
(AwaitingRequest, BlocksByHash(hashes)) => self Err(e) => Err((e, Some(tx))),
.peer_tx },
.send(Message::GetData( (AwaitingRequest, BlocksByHash(hashes)) => {
hashes.iter().map(|h| (*h).into()).collect(), match self
)) .peer_tx
.await .send(Message::GetData(
.map_err(|e| e.into()) hashes.iter().map(|h| (*h).into()).collect(),
.map(|()| AwaitingResponse { ))
handler: Handler::BlocksByHash { .await
blocks: Vec::with_capacity(hashes.len()), {
hashes, Ok(()) => Ok(AwaitingResponse {
}, handler: Handler::BlocksByHash {
tx, blocks: Vec::with_capacity(hashes.len()),
span, hashes,
}), },
(AwaitingRequest, TransactionsByHash(hashes)) => self tx,
.peer_tx span,
.send(Message::GetData( }),
hashes.iter().map(|h| (*h).into()).collect(), Err(e) => Err((e, Some(tx))),
)) }
.await }
.map_err(|e| e.into()) (AwaitingRequest, TransactionsByHash(hashes)) => {
.map(|()| AwaitingResponse { match self
handler: Handler::TransactionsByHash { .peer_tx
transactions: Vec::with_capacity(hashes.len()), .send(Message::GetData(
hashes, hashes.iter().map(|h| (*h).into()).collect(),
}, ))
tx, .await
span, {
}), Ok(()) => Ok(AwaitingResponse {
(AwaitingRequest, FindBlocks { known_blocks, stop }) => self handler: Handler::TransactionsByHash {
.peer_tx transactions: Vec::with_capacity(hashes.len()),
.send(Message::GetBlocks { known_blocks, stop }) hashes,
.await },
.map_err(|e| e.into()) tx,
.map(|()| AwaitingResponse { span,
handler: Handler::FindBlocks, }),
tx, Err(e) => Err((e, Some(tx))),
span, }
}), }
(AwaitingRequest, FindHeaders { known_blocks, stop }) => self (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
.peer_tx match self
.send(Message::GetHeaders { known_blocks, stop }) .peer_tx
.await .send(Message::GetBlocks { known_blocks, stop })
.map_err(|e| e.into()) .await
.map(|()| AwaitingResponse { {
handler: Handler::FindHeaders, Ok(()) => Ok(AwaitingResponse {
tx, handler: Handler::FindBlocks,
span, tx,
}), span,
(AwaitingRequest, MempoolTransactions) => self }),
.peer_tx Err(e) => Err((e, Some(tx))),
.send(Message::Mempool) }
.await }
.map_err(|e| e.into()) (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
.map(|()| AwaitingResponse { match self
handler: Handler::MempoolTransactions, .peer_tx
tx, .send(Message::GetHeaders { known_blocks, stop })
span, .await
}), {
Ok(()) => Ok(AwaitingResponse {
handler: Handler::FindHeaders,
tx,
span,
}),
Err(e) => Err((e, Some(tx))),
}
}
(AwaitingRequest, MempoolTransactions) => {
match self.peer_tx.send(Message::Mempool).await {
Ok(()) => Ok(AwaitingResponse {
handler: Handler::MempoolTransactions,
tx,
span,
}),
Err(e) => Err((e, Some(tx))),
}
}
(AwaitingRequest, PushTransaction(transaction)) => { (AwaitingRequest, PushTransaction(transaction)) => {
// Since we're not waiting for further messages, we need to // Since we're not waiting for further messages, we need to
// send a response before dropping tx. // send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil)); let _ = tx.send(Ok(Response::Nil));
self.peer_tx match self.peer_tx.send(Message::Tx(transaction)).await {
.send(Message::Tx(transaction)) Ok(()) => Ok(AwaitingRequest),
.await Err(e) => Err((e, None)),
.map_err(|e| e.into()) }
.map(|()| AwaitingRequest)
} }
(AwaitingRequest, AdvertiseTransactions(hashes)) => { (AwaitingRequest, AdvertiseTransactions(hashes)) => {
let _ = tx.send(Ok(Response::Nil)); let _ = tx.send(Ok(Response::Nil));
self.peer_tx match self
.peer_tx
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect())) .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
.await .await
.map_err(|e| e.into()) {
.map(|()| AwaitingRequest) Ok(()) => Ok(AwaitingRequest),
Err(e) => Err((e, None)),
}
} }
(AwaitingRequest, AdvertiseBlock(hash)) => { (AwaitingRequest, AdvertiseBlock(hash)) => {
let _ = tx.send(Ok(Response::Nil)); let _ = tx.send(Ok(Response::Nil));
self.peer_tx match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
.send(Message::Inv(vec![hash.into()])) Ok(()) => Ok(AwaitingRequest),
.await Err(e) => Err((e, None)),
.map_err(|e| e.into()) }
.map(|()| AwaitingRequest)
} }
} { } {
Ok(new_state) => { Ok(new_state) => {
self.state = new_state; self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT)); self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
} }
Err(e) => self.fail_with(e), Err((e, Some(tx))) => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
self.fail_with(e);
}
Err((e, None)) => self.fail_with(e),
} }
} }
@ -671,7 +693,7 @@ where
Message::Ping(nonce) => { Message::Ping(nonce) => {
trace!(?nonce, "responding to heartbeat"); trace!(?nonce, "responding to heartbeat");
if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await { if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
self.fail_with(e.into()); self.fail_with(e);
} }
return; return;
} }
@ -800,14 +822,14 @@ where
Response::Nil => { /* generic success, do nothing */ } Response::Nil => { /* generic success, do nothing */ }
Response::Peers(addrs) => { Response::Peers(addrs) => {
if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await { if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
self.fail_with(e.into()); self.fail_with(e);
} }
} }
Response::Transactions(transactions) => { Response::Transactions(transactions) => {
// Generate one tx message per transaction. // Generate one tx message per transaction.
for transaction in transactions.into_iter() { for transaction in transactions.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await { if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
self.fail_with(e.into()); self.fail_with(e);
} }
} }
} }
@ -815,7 +837,7 @@ where
// Generate one block message per block. // Generate one block message per block.
for block in blocks.into_iter() { for block in blocks.into_iter() {
if let Err(e) = self.peer_tx.send(Message::Block(block)).await { if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
self.fail_with(e.into()); self.fail_with(e);
} }
} }
} }
@ -825,12 +847,12 @@ where
.send(Message::Inv(hashes.into_iter().map(Into::into).collect())) .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await .await
{ {
self.fail_with(e.into()) self.fail_with(e)
} }
} }
Response::BlockHeaders(headers) => { Response::BlockHeaders(headers) => {
if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await { if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
self.fail_with(e.into()) self.fail_with(e)
} }
} }
Response::TransactionHashes(hashes) => { Response::TransactionHashes(hashes) => {
@ -839,7 +861,7 @@ where
.send(Message::Inv(hashes.into_iter().map(Into::into).collect())) .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
.await .await
{ {
self.fail_with(e.into()) self.fail_with(e)
} }
} }
} }